building_compile_config.ini 文件内容
[hd_addr]
host = [ip]
port = [端口]
user = [用户名]
passwd = [密码]
database = [数据库名]
building_mysql.py 文件内容
# -*- coding: utf-8 -*-
"""
@Time : 2022/10/12
@Auth : REN
@File : building_mysql.py
@IDE  : PyCharm
"""
import os
import configparser
import pymysql
class building_mysql:
    """Small convenience wrapper around one PyMySQL connection and cursor.

    Connection settings come from the ``[hd_addr]`` section of
    ``../conf/building_compile_config.ini``.  The path is relative, so it is
    resolved against the current working directory of the process.

    Attributes:
        conn: the open PyMySQL connection.
        cursor: a cursor created from ``conn`` with ``DictCursor`` as the
            cursor class.
    """

    def __init__(self):
        """Read the INI file and open the connection and cursor."""
        config_path = os.path.join("..", "conf", "building_compile_config.ini")
        cf = configparser.ConfigParser()
        cf.read(config_path)

        # Name of the INI section that holds the connection settings.
        item = "hd_addr"
        host = cf.get(item, "host")
        port = int(cf.get(item, "port"))
        user = cf.get(item, "user")
        passwd = cf.get(item, "passwd")
        database = cf.get(item, "database")

        self.conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            # ``password`` is the canonical keyword; ``passwd`` is only a
            # compatibility alias in PyMySQL.
            password=passwd,
            database=database,
            cursorclass=pymysql.cursors.DictCursor,
            charset="utf8",
        )
        self.cursor = self.conn.cursor()

    def insert_with_sql(self, sql, values):
        """Execute a parameterized statement without committing.

        Unlike the update/delete/truncate helpers this method does NOT
        commit; call :meth:`commit` afterwards to persist the change.

        :param sql: statement containing driver placeholders
        :param values: arguments bound to the placeholders
        :return: the fixed string ``"insert successful!"``
        """
        self.cursor.execute(sql, values)
        message = "insert successful!"
        return message

    def query_with_sql(self, sql):
        """Execute ``sql`` and return every row of the result.

        :param sql: complete SQL statement (no parameters are bound)
        :return: whatever ``cursor.fetchall()`` yields for the statement
        """
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    def update_with_sql(self, sql):
        """Execute ``sql`` and commit immediately.

        :param sql: complete SQL statement (no parameters are bound)
        :return: the fixed string ``"update successful!"``
        """
        self.cursor.execute(sql)
        self.conn.commit()
        message = "update successful!"
        return message

    def delete_with_sql(self, sql):
        """Execute ``sql`` and commit immediately.

        :param sql: complete SQL statement (no parameters are bound)
        :return: the fixed string ``"delete successful!"``
        """
        self.cursor.execute(sql)
        self.conn.commit()
        message = "delete successful!"
        return message

    def truncate_with_sql(self, sql):
        """Execute ``sql`` and commit immediately.

        :param sql: complete SQL statement (no parameters are bound)
        :return: the fixed string ``"truncate successful!"``
        """
        self.cursor.execute(sql)
        self.conn.commit()
        res = "truncate successful!"
        return res

    def commit(self):
        """Commit the current transaction on the underlying connection."""
        self.conn.commit()

    def close(self):
        """Close the cursor and the connection, ignoring close-time errors.

        The two resources are closed independently so that a failure while
        closing the cursor can no longer prevent the connection from being
        released.

        :return: None
        """
        for resource in (self.cursor, self.conn):
            try:
                resource.close()
            except Exception:
                # Deliberate best-effort cleanup (e.g. already closed).
                # Only ``Exception`` is swallowed, so KeyboardInterrupt and
                # SystemExit still propagate.
                pass
# -*- coding: utf-8 -*-
"""
@Time : 2022/10/12
@Auth : REN
@File : cs.py
@IDE  : PyCharm
"""
from utils.building_mysql import building_mysql
import json
def insert_db(building_mysql, table, data):
    """Insert one row built from a dictionary into ``table``.

    The values are bound through driver placeholders.  The table name and
    the column names are pasted into the statement as-is, so they must come
    from trusted code, not from user input.

    :param building_mysql: connected wrapper object exposing ``cursor`` and
        ``conn`` (note: this parameter shadows the imported class name)
    :param table: table name
    :param data: mapping of column name to value, e.g. ``{k1: v1, k2: v2}``
    :return: ``True`` when the statement reported affected rows and was
        committed, ``False`` when nothing was committed (no affected rows,
        or an error that was printed and rolled back)
    """
    keys = ", ".join(data.keys())
    values = ", ".join(["%s"] * len(data))
    sql = "INSERT INTO {table}({keys}) VALUES ({values})".format(
        table=table, keys=keys, values=values)
    try:
        # The driver expects the bound arguments as a tuple.
        affected = building_mysql.cursor.execute(sql, tuple(data.values()))
        if affected:
            building_mysql.conn.commit()
        return bool(affected)
    except Exception as e:
        # Best-effort insert: report the problem, undo the transaction and
        # tell the caller through the return value instead of raising.
        print(e)
        building_mysql.conn.rollback()
        return False
if __name__ == "__main__":
    # Demo: insert a single row through insert_db().
    mysql_util = building_mysql()
    table_name = "table_name"
    # Data to insert: column name -> value (placeholder values).
    insert_db_date = {"k1": "v1", "k2": "v2", "k3": "v3"}
    try:
        # Call the custom insert helper.
        insert_db(mysql_util, table_name, insert_db_date)
    finally:
        # Release the connection even when the insert fails.
        mysql_util.close()
# -*- coding: utf-8 -*-
"""
@Time : 2022/10/12
@Auth : REN
@File : cs.py
@IDE  : PyCharm
"""
from utils.building_mysql import building_mysql
def _build_update_sql(keys, table, lis):
    """Return the parameterized UPDATE statement and its argument tuple."""
    if not lis:
        raise ValueError("lis must contain at least one column to update")

    def literal(fragment):
        # These fragments are pasted into the statement verbatim; once
        # arguments are bound the driver %-formats the query, so a literal
        # '%' (e.g. in a LIKE pattern) has to be doubled.
        return str(fragment).replace("%", "%%")

    assignments = ", ".join("{}=%s".format(literal(col)) for col in lis)
    sql = "update {table} set {values} where {keys};".format(
        table=literal(table), keys=literal(keys), values=assignments)
    return sql, tuple(lis.values())


def update_bd(keys, table, lis):
    """Update the rows of ``table`` matching ``keys`` with the values in ``lis``.

    The new values are bound as statement parameters, so quotes, commas,
    braces or the word "where" inside a value no longer corrupt the SQL, and
    ``None`` is sent as SQL NULL.  ``keys`` and ``table`` are still inserted
    as raw SQL text and must come from trusted code.

    A new connection is opened for the call and always closed afterwards.

    :param keys: raw WHERE condition, e.g. ``id='123123'``
    :param table: table name
    :param lis: mapping of column name to new value,
        e.g. ``{'k1': 'v1', 'k2': 'v2'}``
    :return: None
    :raises ValueError: if ``lis`` is empty
    """
    sql, params = _build_update_sql(keys, table, lis)
    mysql_util = building_mysql()
    try:
        mysql_util.cursor.execute(sql, params)
        mysql_util.commit()
    finally:
        mysql_util.close()
if __name__ == "__main__":
    # Demo: update the rows selected by a raw WHERE condition.
    table_name = "table_name"
    keys = "id=123456"
    # Columns to update: column name -> new value (placeholder values).
    update_db_date = {"k1": "v1", "k2": "v2"}
    # Call the custom update helper; it is defined above as update_bd.
    update_bd(keys, table_name, update_db_date)
# -*- coding: utf-8 -*-
"""
@Time : 2022/10/12
@Auth : REN
@File : cs.py
@IDE  : PyCharm
"""
from utils.building_mysql import building_mysql
if __name__ == "__main__":
    # Demo: delete rows.  The statement text is a placeholder to be replaced
    # with a real table name and condition.
    mysql_util = building_mysql()
    sql = "delete from 表名 where 条件"
    try:
        # delete_with_sql() commits; without a commit the database would not
        # be modified.
        mysql_util.delete_with_sql(sql)
    finally:
        # Release the connection even when the statement fails.
        mysql_util.close()
# -*- coding: utf-8 -*-
"""
@Time : 2022/10/12
@Auth : REN
@File : cs.py
@IDE  : PyCharm
"""
from utils.building_mysql import building_mysql
if __name__ == "__main__":
    # Demo: run a SELECT.  The statement text is a placeholder to be replaced
    # with real column names, table name and condition.
    mysql_util = building_mysql()
    sql = "select 字段1、字段2、... from 表名 where 条件"
    try:
        rows = mysql_util.query_with_sql(sql)
        # Show the fetched rows instead of silently discarding them.
        print(rows)
    finally:
        # Release the connection even when the query fails.
        mysql_util.close()