基于sqlalchemy對(duì)mysql實(shí)現(xiàn)增刪改查操作
需求場(chǎng)景:
老大讓我利用爬蟲(chóng)爬取的數(shù)據(jù)寫(xiě)到或更新到mysql數(shù)據(jù)庫(kù)中,百度了兩種方法
1 是使用pymysql連接mysql,通過(guò)操作原生的sql語(yǔ)句進(jìn)行增刪改查數(shù)據(jù);
2 是使用sqlalchemy連接mysql,通過(guò)ORM模型建表并操作數(shù)據(jù)庫(kù),不需要寫(xiě)原生的sql語(yǔ)句,相對(duì)簡(jiǎn)單些;
以下就是本次使用sqlalchemy的經(jīng)驗(yàn)之談。
實(shí)現(xiàn)流程:連接數(shù)據(jù)庫(kù)》通過(guò)模型類(lèi)創(chuàng)建表》建立會(huì)話(huà)》執(zhí)行創(chuàng)建表語(yǔ)句》通過(guò)會(huì)話(huà)進(jìn)行增刪改查
from sqlalchemy import exists, Column, Integer, String, ForeignKey, existsfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmaker# 創(chuàng)建的數(shù)據(jù)庫(kù)引擎engine = create_engine('mysql+pymysql://user:pwd@ip/數(shù)據(jù)庫(kù)名?charset=utf8')#創(chuàng)建session類(lèi)型DBSession = sessionmaker(bind=engine)# 實(shí)例化官宣模型 - Base 就是 ORM 模型Base = declarative_base()# 創(chuàng)建服務(wù)單表class ServiceOrder(Base): __tablename__ = ’serviceOrderTable’ id = Column(Integer, primary_key=True, autoincrement=True) serviceOrderId = Column(String(32), nullable=False, index=True, comment=’服務(wù)單ID’) serviceDesc = Column(String(268), comment=’服務(wù)說(shuō)明’) oneLevelName = Column(String(32), comment=’C類(lèi)別’) twoLevelName = Column(String(32), comment=’T子類(lèi)’) threeLevelName = Column(String(32), comment=’I項(xiàng)目’) fourLevelName = Column(String(32), comment=’S子項(xiàng)’) transferTimes = Column(String(32), comment=’轉(zhuǎn)派次數(shù)’) overDueStatus = Column(String(32), comment=’過(guò)期狀態(tài)’) serviceTimeLimit = Column(String(32), comment=’服務(wù)時(shí)限’) serTimeLimitTypeName = Column(String(16), comment=’時(shí)限類(lèi)型’) # 一對(duì)多: # serviceWorkOrder = relationship('ServiceWorkOrder', backref='serviceorder')# 多對(duì)一:多個(gè)服務(wù)工單可以屬于服務(wù)單class ServiceWorkOrder(Base): __tablename__ = ’serviceWorkOrderTable’ id = Column(Integer, primary_key=True, autoincrement=True) serviceWorkOrderId = Column(String(32), nullable=False, index=True, comment=’服務(wù)工單ID’) workOrderName = Column(String(268), comment=’工單名稱(chēng)’) fromId = Column(String(32), comment=’服務(wù)單ID’) createUserSectionName = Column(String(32), comment=’創(chuàng)建人室’) createUserName = Column(String(32), comment=’創(chuàng)建人’) handlerName = Column(String(32), comment=’處理人’) statusName = Column(String(32), comment=’工單狀態(tài)’) createTime = Column(String(32), comment=’創(chuàng)建時(shí)間’) # “多”的一方的book表是通過(guò)外鍵關(guān)聯(lián)到user表的: # serviceOrder_id = Column(Integer, ForeignKey(’serviceOrderTable.id’))# 創(chuàng)建數(shù)據(jù)庫(kù) 如果數(shù)據(jù)庫(kù)已存在 則不會(huì)創(chuàng)建 會(huì)根據(jù)庫(kù)名直接連接已有的庫(kù)def init_db(): Base.metadata.create_all(engine)def drop_db(): Base.metadata.drop_all(engine)def insert_update(): # all_needed_data_lists 是需要插入數(shù)據(jù)庫(kù)的數(shù)據(jù) 格式[{key: value, ... }, { }, { }...] for item in all_needed_data_lists: ServiceOrderRow = ServiceOrder(serviceOrderId=item[’serviceOrderId’], serviceDesc=item[’serviceDesc’], oneLevelName=item[’oneLevelName’], twoLevelName=item[’twoLevelName’], threeLevelName=item[’threeLevelName’], fourLevelName=item[’fourLevelName’], transferTimes=item[’transferTimes’], overDueStatus=item[’overDueStatus’], serviceTimeLimit=item[’serviceTimeLimit’], serTimeLimitTypeName=item[’serTimeLimitTypeName’], ) try: # 利用exists判斷目標(biāo)對(duì)象是否存在,返回True或Faults it_exists = session.query( exists().where(ServiceOrder.serviceOrderId == item[’serviceOrderId’] )).scalar() except Exception as e: self.log.error(e) break try: # 如果不存在,進(jìn)行新增;存在的話(huà)就更新現(xiàn)存的數(shù)據(jù) if not it_exists:session.add(ServiceOrderRow) else:session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId == item[’serviceOrderId’]) .update(item) except Exception as e: self.log.error(e) break try: session.commit() self.log.info(’數(shù)據(jù)更新成功!’) except: session.rollback() self.log.info(’數(shù)據(jù)更新失敗!’)if __name__ == '__main__': # 創(chuàng)建數(shù)據(jù)庫(kù) 如果數(shù)據(jù)庫(kù)已存在 則不會(huì)創(chuàng)建 會(huì)根據(jù)庫(kù)名直接連接已有的庫(kù) init_db() # 創(chuàng)建session對(duì)象,進(jìn)行增刪改查: session = DBSession() # 利用session 增 改數(shù)據(jù) 記得提交 insert_update()
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。
相關(guān)文章:
1. 什么是Access數(shù)據(jù)庫(kù)2. Microsoft Office Access刪除表記錄的方法3. DB2 V9.5工作負(fù)載管理之閾值(THRESHOLD)4. Sql Server 壓縮數(shù)據(jù)庫(kù)日志文件的方法5. Oracle數(shù)據(jù)庫(kù)的兩種授權(quán)收費(fèi)方式詳解6. MariaDB中1045權(quán)限錯(cuò)誤導(dǎo)致拒絕用戶(hù)訪(fǎng)問(wèn)的錯(cuò)誤解決方法7. mybatis plus in方法使用詳解8. 實(shí)例講解MySQL 慢查詢(xún)9. mybatis 運(yùn)行時(shí)加載自定義mapper文件方式10. MySQL 常用函數(shù)總結(jié)

網(wǎng)公網(wǎng)安備