python 操作mysql 单例
参考链接:https://zhuanlan.zhihu.com/p/368561056
参考链接:https://zhuanlan.zhihu.com/p/368561056
pip install mysqlclient
下面这个例子没有考虑mysql连接断开的情况,我怀疑我网站报错500,就是因为这个连接断开。
所以建议使用这个连接几次后就重新建立连接。
新的:
import MySQLdb
from threading import Thread, RLock
def Singleton(cls):
instance = {"cnt": 0}
single_lock = RLock()
def _singleton_wrapper(*args, **kargs):
with single_lock:
instance["cnt"] += 1
if cls not in instance or instance["cnt"] >= 10:
instance[cls] = cls(*args, **kargs)
instance["cnt"] = 0
return instance[cls]
return _singleton_wrapper
@Singleton
class sql_opeartor():
def __init__(self, user_name="root", user_password="123456", dataset_name="blog"):
self.user_name = user_name
self.user_password = user_password
self.database_name = dataset_name
self.db = MySQLdb.connect("localhost", self.user_name, self.user_password, self.database_name, charset='utf8mb4')
def __del__(self):
self.db.close()
def execute(self, sql):
try:
# 执行SQL语句
cur = self.db.cursor()
cur.execute(sql)
# 向数据库提交
cur.commit()
cur.close()
except:
# 发生错误时回滚
self.db.rollback()
def query(self, sql):
try:
# 执行SQL语句
cur = self.db.cursor()
cur.execute(sql)
# 向数据库提交
cur.commit()
except:
# 发生错误时回滚
self.db.rollback()
results_tuple = cur.fetchall()
cur.close()
results_list = []
for result in results_tuple:
ans = []
for item in result:
ans.append(item)
results_list.append(ans)
return results_list
def create_table(self, table_data=''):
sql = "create table {0};".format(table_data)
self.execute(sql)
def delete_table(self, table_name=''):
sql = 'delete from {0}'.format(table_name)
self.execute(sql)
def drop_table(self, table_name):
sql = 'drop table {0}'.format(table_name)
self.execute(sql)
def show_table(self, table_name=''):
sql = 'select * from {0}'.format(table_name)
self.execute(sql)
def show_ta(n):
data = sql_opeartor().query('show tables;')
print(str(n)+ " "+ data[0][0])
if __name__ == '__main__':
n = 10
threads = []
for i in range(n):
t = Thread(target=show_ta, args=(i,))
threads.append(t)
for i in range(n):
threads[i].start()
for i in range(n):
threads[i].join()
旧的:
import MySQLdb
from threading import Thread, RLock
def Singleton(cls):
instance = {}
single_lock = RLock()
def _singleton_wrapper(*args, **kargs):
with single_lock:
if cls not in instance:
instance[cls] = cls(*args, **kargs)
return instance[cls]
return _singleton_wrapper
@Singleton
class sql_opeartor():
def __init__(self, user_name="root", user_password="111111", dataset_name="datasetName"):
self.user_name = user_name
self.user_password = user_password
self.database_name = dataset_name
self.db = MySQLdb.connect("localhost", self.user_name, self.user_password, self.database_name, charset='utf8mb4')
def execute(self, sql):
try:
# 执行SQL语句
cur = self.db.cursor()
cur.execute(sql)
# 向数据库提交
cur.commit()
cur.close()
except:
# 发生错误时回滚
self.db.rollback()
def query(self, sql):
try:
# 执行SQL语句
cur = self.db.cursor()
cur.execute(sql)
# 向数据库提交
cur.commit()
except:
# 发生错误时回滚
self.db.rollback()
results_tuple = cur.fetchall()
cur.close()
results_list = []
for result in results_tuple:
ans = []
for item in result:
ans.append(item)
results_list.append(ans)
return results_list
def create_table(self, table_data=''):
sql = "create table {0};".format(table_data)
self.execute(sql)
def delete_table(self, table_name=''):
sql = 'delete from {0}'.format(table_name)
self.execute(sql)
def drop_table(self, table_name):
sql = 'drop table {0}'.format(table_name)
self.execute(sql)
def show_table(self, table_name=''):
sql = 'select * from {0}'.format(table_name)
self.execute(sql)
def show_ta(n):
data = sql_opeartor().query('show tables;')
print(str(n)+ " "+ data[0][0])
if __name__ == '__main__':
n = 10
threads = []
for i in range(n):
t = Thread(target=show_ta, args=(i,))
threads.append(t)
for i in range(n):
threads[i].start()
for i in range(n):
threads[i].join()