Python 操作 MySQL:单例模式

参考链接:https://zhuanlan.zhihu.com/p/368561056

参考链接:https://zhuanlan.zhihu.com/p/368561056

pip install mysqlclient

下文"旧的"那个例子没有考虑 MySQL 连接断开的情况,我怀疑我的网站报 500 错误,就是因为这个连接断开了。

所以建议同一个连接使用若干次之后就重新建立连接(下文"新的"例子里大约每调用 10 次就重建一次)。

新的:

import MySQLdb
from threading import Thread, RLock

def Singleton(cls):
    """Class decorator that hands out a shared, periodically renewed instance.

    The decorated name becomes a factory function.  The factory builds an
    instance of ``cls`` on its first call and keeps returning it; a call
    counter is bumped on every call and, once it reaches 10, a fresh
    instance is built (from that call's arguments) and the counter restarts.

    Args:
        cls: The class to wrap.

    Returns:
        A callable taking the same arguments as ``cls``.
    """
    guard = RLock()
    # Holds both the call counter (key "cnt") and the live instance (key cls).
    state = {"cnt": 0}

    def _singleton_wrapper(*args, **kargs):
        with guard:
            state["cnt"] = state["cnt"] + 1
            needs_build = cls not in state
            worn_out = state["cnt"] >= 10
            if needs_build or worn_out:
                state[cls] = cls(*args, **kargs)
                state["cnt"] = 0
        return state[cls]

    return _singleton_wrapper

@Singleton
class sql_opeartor():
    """Small helper around one ``MySQLdb`` connection.

    The class is wrapped by ``Singleton``, so callers obtain it with
    ``sql_opeartor()`` and share the underlying connection.  Because the
    connection is shared between threads, every statement is run while
    holding a per-instance lock (MySQLdb documents ``threadsafety = 1``:
    threads may share the module but not a connection).
    """

    # NOTE(review): the default credentials below are hard-coded; pass real
    # ones explicitly or load them from configuration in production code.
    def __init__(self, user_name="root", user_password="123456", dataset_name="blog", host="localhost"):
        """Open the connection.

        Args:
            user_name: MySQL account name.
            user_password: Password for ``user_name``.
            dataset_name: Name of the database (schema) to use.
            host: Server host name; defaults to the previously hard-coded
                ``"localhost"``.
        """
        self.user_name = user_name
        self.user_password = user_password
        self.database_name = dataset_name
        self.host = host
        # Created before connecting so the attribute always exists.
        self._lock = RLock()
        self.db = MySQLdb.connect(self.host, self.user_name, self.user_password, self.database_name, charset='utf8mb4')

    def __del__(self):
        """Close the connection when the instance is garbage collected."""
        # ``db`` is missing when ``connect`` raised inside ``__init__``.
        db = getattr(self, "db", None)
        if db is None:
            return
        try:
            db.close()
        except Exception:
            # Nothing useful can be done in a finalizer (the connection may
            # already be closed or the interpreter may be shutting down).
            pass

    def execute(self, sql):
        """Run a statement that returns no rows and commit it.

        Args:
            sql: The SQL text to run.

        Returns:
            ``True`` when the statement was committed, ``False`` when a
            ``MySQLdb.Error`` occurred and the transaction was rolled back.
        """
        with self._lock:
            try:
                cur = self.db.cursor()
                try:
                    cur.execute(sql)
                finally:
                    cur.close()
                # commit() lives on the connection; cursors have no commit().
                self.db.commit()
            except MySQLdb.Error:
                # Roll back on any database error, as the original intended.
                self.db.rollback()
                return False
        return True

    def query(self, sql):
        """Run a statement and return all of its rows.

        Args:
            sql: The SQL text to run.

        Returns:
            A list with one list of column values per result row.

        Raises:
            MySQLdb.Error: If the statement fails; the transaction is rolled
                back before the error is re-raised.
        """
        with self._lock:
            try:
                cur = self.db.cursor()
                try:
                    cur.execute(sql)
                    rows = cur.fetchall()
                finally:
                    cur.close()
                # End the implicit transaction so this long-lived connection
                # does not keep an old transaction open between queries.
                self.db.commit()
            except MySQLdb.Error:
                self.db.rollback()
                raise
        return [list(row) for row in rows]

    # NOTE: the helpers below interpolate their argument into the SQL text
    # verbatim (identifiers cannot be bound as parameters), so only pass
    # trusted values.

    def create_table(self, table_data=''):
        """Run ``create table <table_data>;`` and return ``execute``'s result."""
        sql = "create table {0};".format(table_data)
        return self.execute(sql)

    def delete_table(self, table_name=''):
        """Delete every row of ``table_name`` (the table itself is kept)."""
        sql = 'delete from {0}'.format(table_name)
        return self.execute(sql)

    def drop_table(self, table_name):
        """Drop ``table_name``."""
        sql = 'drop table {0}'.format(table_name)
        return self.execute(sql)

    def show_table(self, table_name=''):
        """Return every row of ``table_name`` as a list of lists."""
        sql = 'select * from {0}'.format(table_name)
        # A SELECT must go through query(); execute() would discard the rows.
        return self.query(sql)

def show_ta(n):
    """Print *n* followed by the first table name returned by ``show tables``.

    Args:
        n: Label printed in front of the table name (the thread index in
            the demo below).
    """
    rows = sql_opeartor().query('show tables;')
    first_table = rows[0][0]
    print(" ".join((str(n), first_table)))

if __name__ == '__main__':
    # Demo: ten threads all go through the shared sql_opeartor instance.
    n = 10
    threads = [Thread(target=show_ta, args=(i,)) for i in range(n)]

    for worker in threads:
        worker.start()

    # Wait for every worker before the script exits.
    for worker in threads:
        worker.join()


旧的:

import MySQLdb
from threading import Thread, RLock

def Singleton(cls):
    """Class decorator that makes every call return one shared instance.

    The decorated name becomes a factory function.  The first call builds
    the instance from its arguments; later calls return that same object
    and ignore their arguments.

    Args:
        cls: The class to wrap.

    Returns:
        A callable taking the same arguments as ``cls``.
    """
    guard = RLock()
    cache = {}

    def _singleton_wrapper(*args, **kargs):
        # Construction happens under the lock so only one thread builds it.
        with guard:
            already_built = cls in cache
            if not already_built:
                cache[cls] = cls(*args, **kargs)
        return cache[cls]

    return _singleton_wrapper

@Singleton
class sql_opeartor():
    """Small helper around one ``MySQLdb`` connection.

    The class is wrapped by ``Singleton``, so callers obtain it with
    ``sql_opeartor()`` and share the underlying connection.  Because the
    connection is shared between threads, every statement is run while
    holding a per-instance lock (MySQLdb documents ``threadsafety = 1``:
    threads may share the module but not a connection).
    """

    # NOTE(review): the default credentials below are hard-coded; pass real
    # ones explicitly or load them from configuration in production code.
    def __init__(self, user_name="root", user_password="111111", dataset_name="datasetName", host="localhost"):
        """Open the connection.

        Args:
            user_name: MySQL account name.
            user_password: Password for ``user_name``.
            dataset_name: Name of the database (schema) to use.
            host: Server host name; defaults to the previously hard-coded
                ``"localhost"``.
        """
        self.user_name = user_name
        self.user_password = user_password
        self.database_name = dataset_name
        self.host = host
        self._lock = RLock()
        self.db = MySQLdb.connect(self.host, self.user_name, self.user_password, self.database_name, charset='utf8mb4')

    def execute(self, sql):
        """Run a statement that returns no rows and commit it.

        Args:
            sql: The SQL text to run.

        Returns:
            ``True`` when the statement was committed, ``False`` when a
            ``MySQLdb.Error`` occurred and the transaction was rolled back.
        """
        with self._lock:
            try:
                cur = self.db.cursor()
                try:
                    cur.execute(sql)
                finally:
                    cur.close()
                # commit() lives on the connection; cursors have no commit().
                self.db.commit()
            except MySQLdb.Error:
                # Roll back on any database error, as the original intended.
                self.db.rollback()
                return False
        return True

    def query(self, sql):
        """Run a statement and return all of its rows.

        Args:
            sql: The SQL text to run.

        Returns:
            A list with one list of column values per result row.

        Raises:
            MySQLdb.Error: If the statement fails; the transaction is rolled
                back before the error is re-raised.
        """
        with self._lock:
            try:
                cur = self.db.cursor()
                try:
                    cur.execute(sql)
                    rows = cur.fetchall()
                finally:
                    cur.close()
                # End the implicit transaction so this long-lived connection
                # does not keep an old transaction open between queries.
                self.db.commit()
            except MySQLdb.Error:
                self.db.rollback()
                raise
        return [list(row) for row in rows]

    # NOTE: the helpers below interpolate their argument into the SQL text
    # verbatim (identifiers cannot be bound as parameters), so only pass
    # trusted values.

    def create_table(self, table_data=''):
        """Run ``create table <table_data>;`` and return ``execute``'s result."""
        sql = "create table {0};".format(table_data)
        return self.execute(sql)

    def delete_table(self, table_name=''):
        """Delete every row of ``table_name`` (the table itself is kept)."""
        sql = 'delete from {0}'.format(table_name)
        return self.execute(sql)

    def drop_table(self, table_name):
        """Drop ``table_name``."""
        sql = 'drop table {0}'.format(table_name)
        return self.execute(sql)

    def show_table(self, table_name=''):
        """Return every row of ``table_name`` as a list of lists."""
        sql = 'select * from {0}'.format(table_name)
        # A SELECT must go through query(); execute() would discard the rows.
        return self.query(sql)

def show_ta(n):
    """Print *n* followed by the first table name returned by ``show tables``.

    Args:
        n: Label printed in front of the table name (the thread index in
            the demo below).
    """
    rows = sql_opeartor().query('show tables;')
    first_table = rows[0][0]
    print(" ".join((str(n), first_table)))

if __name__ == '__main__':
    # Demo: ten threads all go through the shared sql_opeartor instance.
    n = 10
    threads = [Thread(target=show_ta, args=(i,)) for i in range(n)]

    for worker in threads:
        worker.start()

    # Wait for every worker before the script exits.
    for worker in threads:
        worker.join()


文章目录