> 文章列表 > 用Python并发测试MySQL的分区表

用Python并发测试MySQL的分区表

用Python并发测试MySQL的分区表

用Python并发测试MySQL的分区表

想测试比较一下mysql的分区表和普通表查询性能之间的差别,通过Python进行并发测试,压力查询mysql 的分区表,看看效果。

1、创建分区表

CREATE TABLE `bigdata_part` (`str_info` varchar(16) DEFAULT NULL,`sys_date` datetime DEFAULT NULL,`sys_year` int(11) DEFAULT NULL,`sys_month` int(11) DEFAULT NULL,`sys_week` int(11) DEFAULT NULL,KEY `ind_bigdata_info_date_part` (`str_info`,`sys_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
/*!50100 PARTITION BY RANGE (to_days(`sys_date`))
(PARTITION p2016 VALUES LESS THAN (736695) ENGINE = InnoDB,PARTITION p2017 VALUES LESS THAN (737060) ENGINE = InnoDB,PARTITION p2018 VALUES LESS THAN (737425) ENGINE = InnoDB,PARTITION p2019 VALUES LESS THAN (737790) ENGINE = InnoDB,PARTITION p2020 VALUES LESS THAN (738156) ENGINE = InnoDB,PARTITION p2021 VALUES LESS THAN (738521) ENGINE = InnoDB,PARTITION p2022 VALUES LESS THAN (738886) ENGINE = InnoDB,PARTITION p2023 VALUES LESS THAN (739251) ENGINE = InnoDB,PARTITION pmax VALUES LESS THAN MAXVALUE ENGINE = InnoDB) */

2、准备大量数据

CREATE DEFINER=`root`@`%` PROCEDURE `while_bigdata`()
BEGINDECLARE		i INT;#申明变量DECLARE		str VARCHAR ( 16 );DECLARE		sys_date DATETIME;SET i = 0;#变量赋值WHILEi < 10000000 DO#结束循环的条件: 当i大于10时跳出while循环SET str = upper(substring( md5( rand()), 1, 16 ));SET sys_date = from_unixtime(unix_timestamp( '2016-01-01' )+ floor(rand()*(unix_timestamp( '2023-12-31' )- unix_timestamp( '2016-01-01' )+ 1 )));INSERT INTO bigdata_partVALUES(str,sys_date,YEAR ( sys_date ),MONTH ( sys_date ),WEEK ( sys_date ));#往表添加数据SET i = i + 1;#循环一次,i加一END WHILE;#结束while循环COMMIT;#提交数据END

3、Python并行压力测试

测试思路:
(1)bigdata_test 是保存少量数据,比如1万条数据;
(2)bigdata_part是分区表,大数据量;
(3)从test表中提取数据,到 part表中查询;
(4)测试时间记录到另外一个Oracle数据库中,不影响MySQL的性能;
(5)用thread多线程并发,默认64个并发,可以多找几个客户端进行测试,都在一个客户端上,oracle驱动可能报错。
(6)数据库的并发数,需要调整,oracle和mysql两个库,根据开的线程数,每一个线程就是一个session连接。

import cx_Oracle
import pymysql
import datetime
import time
import threadingclass myThread (threading.Thread):def __init__(self, threadID, name):threading.Thread.__init__(self)self.threadID = threadIDself.name = namedef run(self):print ("开始线程:" + self.name)spend_time(self.name) #延迟1秒print ("退出线程:" + self.name)def spend_time(threadName):db_conn = pymysql.connect(host='127.0.0.1',port=3306,user='root',password='TEST',database='test',)global_db = pymysql.connect(db_conn)cursor_query = global_db.cursor()dblog = cx_Oracle.connect('scott', 'Tiger', '127.0.0.1:1521/oracle12c')log_cursor_query = dblog.cursor()sql_text = "select t.str_info,t.sys_date from bigdata_test t "cursor_query.execute(sql_text)result = cursor_query.fetchall()t1=time.time()#SQL1:测试简单查询,通过索引,CPU压力比较大for bigdata_test in result :sql_text = "select * from bigdata_part t where t.str_info = '"+ bigdata_test[0] +"' and t.sys_date='" + bigdata_test[1].strftime('%Y-%m-%d %H:%M:%S') + "'"#print(sql_text)cursor_query.execute(sql_text)result = cursor_query.fetchall()t1=time.time()#SQL1:测试简单查询,通过索引,CPU压力比较大for bigdata_test in result :#time.sleep(1)#print(bigdata_test[0],bigdata_test[1])#print(datetime.datetime.strftime(bigdata_test[1] , '%Y-%m-%d %H:%M:%S'))#print(type(bigdata_test[1]))sql_text = "select * from bigdata_part t where t.str_info = '"+ bigdata_test[0] +"' and t.sys_date='" + bigdata_test[1].strftime('%Y-%m-%d %H:%M:%S') + "'"#print(sql_text)cursor_query.execute(sql_text)#记录性能日志sql_text = sql_text.replace("'", "''")log_sql_text = "insert into mysql_query values (sysdate,'%s','%s','%f') " % (threadName, sql_text, time.time() - t1)log_cursor_query.execute(log_sql_text)t1 = time.time()dblog.commit()cursor_query.close()global_db.close()log_cursor_query.close()dblog.close()if __name__ == "__main__":#灵活调试线程数量threadnum = 64for i in range(0, threadnum):tname = "Thread-" + str(threadnum) + "-" +str(i)t = myThread(i, tname)t.start()#t.join()print('退出主程序!')

4、小结

以后测试mysql表的查询SQL性能,进行一个简单的并发压力测试,看看调优的效果。