『pyqt5 从0基础开始项目实战』12. 实现多线程循环检测数据的开始和停止(保姆级图文)
目录
欢迎关注 『pyqt5 从0基础开始项目实战』 专栏,持续更新中
欢迎关注 『pyqt5 从0基础开始项目实战』 专栏,持续更新中
最终效果
- 开始执行
- 逐渐停止
- 全部停止
导包和框架代码
提示!!!! 检查一下你的threads.py有没有导入 import time
如果导入了from datetime import time
会导致闪退!!
请查阅上文获取源码,此处只列举第三方库
import os
import sys
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QApplication, QWidget, QDesktopWidget, QHBoxLayout, QVBoxLayout
from PyQt5.QtWidgets import QPushButton, QLineEdit, QTableWidget, QTableWidgetItem, QLabel
from PyQt5.QtWidgets import QMessageBox, QMenu
main.py
避免重复执行
方法很简单,设置一个变量用来记录当前是否已经在执行中。
- 初始化设置常量
# 设置一些常用的常量
RUNNING = 1
STOPPING = 2
STOP = 3class MainWindow(QWidget):def __init__(self):···self.switch = STOP #用于记录当前是否在执行中,如果已经在执行中,不允许重复启动···
在开始按钮的事件最前面添加判断
# 点击开始def event_start_click(self):if self.switch != STOP:QMessageBox.warning(self, "错误", "正在执行获取终止中,请勿重复操作")returnself.switch = RUNNING
- 在结束按钮的事件前面添加判断
# 点击停止def event_stop_click(self):if self.switch != RUNNING:#如果点击停止时不是执行状态QMessageBox.warning(self, "错误", "不在执行中,不需要停止")returnself.switch = STOPPING
- 在我们更新左下角标签的事件中也要修改
def update_status_message(self, message):if message == "已终止":self.switch = STOPself.label_status.setText(message)self.label_status.repaint()
开始与停止事件
- 开始事件
# 点击开始def event_start_click(self):if self.switch != STOP:QMessageBox.warning(self, "错误", "正在执行获取终止中,请勿重复操作")returnself.switch = RUNNING# 1.为每一行创建一个线程去执行 ( 所有的线程记录) [x,x,x,x,]from utils.scheduler import SCHEDULERSCHEDULER.start(BASE_DIR,self,self.task_start_callback,self.task_stop_callback,self.task_counter_callback,self.task_error_counter_callback)# 2.执行中self.update_status_message("执行中")
- 停止事件
# 点击停止def event_stop_click(self):if self.switch != RUNNING:#如果点击停止时不是执行状态QMessageBox.warning(self, "错误", "不在执行中,不需要停止")returnself.switch = STOPPING# 1.执行中的线程逐一终止from utils.scheduler import SCHEDULERSCHEDULER.stop()# 2.更新状态
表格更新事件
# 将状态改为执行中def task_start_callback(self, row_index):# 对表格中的数据进行状态更新cell_status = QTableWidgetItem(STATUS_MAPPING[2])#得到对应行的状态文本cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)#不可选中 不可修改self.table_widget.setItem(row_index, 6, cell_status)#修改状态# 将状态改为待执行def task_stop_callback(self, row_index):cell_status = QTableWidgetItem(STATUS_MAPPING[1])cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(row_index, 6, cell_status)# 成功次数增加def task_counter_callback(self, row_index):# 原有个数+1old_count = self.table_widget.item(row_index, 4).text().strip()new_count = int(old_count) + 1# 表格赋值cell_status = QTableWidgetItem(str(new_count))cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(row_index, 4, cell_status)# 503 次数增加def task_error_counter_callback(self, row_index):# 原有个数+1old_count = self.table_widget.item(row_index, 5).text().strip()new_count = int(old_count) + 1# 表格赋值cell_status = QTableWidgetItem(str(new_count))cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(row_index, 5, cell_status)
scheduler.py
初始化
def __init__(self):self.thread_list = []self.window = Noneself.terminate = False # 标志,点击停止,false表示此时可以启动,true表示正在关闭
开始线程
# 开始线程def start(self, base_dir, window, fn_start, fn_stop, fn_counter, fn_error_counter):self.window = window#这里的window就是我们main传入的窗体对象,通过这个 .window.控件 我们可以调用窗体对象中的所有控件self.terminate = False# 1.获取表格中的所有数据,每一行创建一个线程去执行监控for row_index in range(window.table_widget.rowCount()):# 0/1/2/3asin = window.table_widget.item(row_index, 0).text().strip()#得到型号status_text = window.table_widget.item(row_index, 6).text().strip()#得到状态文本# 日志import oslog_folder = os.path.join(base_dir, 'log')if not os.path.exists(log_folder):#如果日志文件夹不存在就新建os.makedirs(log_folder)log_file_path = os.path.join(log_folder, "{}.log".format(asin))# 日志文件# 只有是待执行的时,才创建线程去执行if status_text != "待执行":continue# 2.每个线程 执行 & 状态实时的显示在表格中 信号+回调from utils.threads import TaskThreadt = TaskThread(self, log_file_path, row_index, asin, window)t.start_signal.connect(fn_start)t.counter_signal.connect(fn_counter)t.error_counter_signal.connect(fn_error_counter)t.stop_signal.connect(fn_stop)t.start()self.thread_list.append(t)
停止线程
#停止线程def stop(self):self.terminate = True#设置标记为true# 创建线程,去监测 thread_list 中的数量 + 实时更新的窗体的label中# self.window.update_status_message("xxx")from utils.threads import StopThreadt = StopThread(self, self.window)t.update_signal.connect(self.window.update_status_message)#窗口更新数据的声明t.start()
在线程列表中删除线程
# 在线程列表中删除线程def destroy_thread(self, thread):self.thread_list.remove(thread)#关闭线程时要把线程从我们正在执行的线程列表中移除
# 在threads.py中的新建线程与停止线程函数 - 开始,创建线程去执行任务(更新表格数据 + 信号 ) - 结束,表格更新 + 结束
我们的思路是在新建线程的同时把线程放入我们的thread_list,线程停止执行后移除thread_list,根据thread_list是否为空,判断当前是否所有的线程都停止了。
TaskThread新建线程
class TaskThread(QThread):start_signal = pyqtSignal(int)#开始信号stop_signal = pyqtSignal(int)#结束信号counter_signal = pyqtSignal(int)#总数统计信号error_counter_signal = pyqtSignal(int)#错误进程信号def __init__(self, scheduler, log_file_path, row_index, asin, *args, kwargs):super().__init__(*args, kwargs)self.scheduler = schedulerself.log_file_path = log_file_path#日志文件self.row_index = row_indexself.asin = asindef run(self):# 触发start_signalself.start_signal.emit(self.row_index)import timeimport randomwhile True:# 停止if self.scheduler.terminate:#终止线程的执行。根据操作系统的调度策略,线程可能会立即终止,也可能不会立即终止。可以在终止()之后使用QThread.wait()。self.stop_signal.emit(self.row_index)# 自己的线程在thread_list中移除掉self.scheduler.destroy_thread(self)returntry:time.sleep(random.randint(1, 3))#随机休眠self.counter_signal.emit(self.row_index)# 日志写入,注意用mode=a 表示在原来的文件末尾续写而不是覆盖with open(self.log_file_path, mode='a', encoding='utf-8') as f:f.write("日志\\n")# 监控的动作# 1.根据型号访问通过bs4获取数据# 2.获取到数据 价格是否小于预期# 3.发送报警(邮件)time.sleep(5)#检测的5秒延时except Exception as e:self.error_counter_signal.emit(self.row_index)#如果错误,提交错误的行号
StopThread停止线程
class StopThread(QThread):update_signal = pyqtSignal(str)#线程传递数据的信号def __init__(self, scheduler, *args, kwargs):super().__init__(*args, kwargs)self.scheduler = schedulerdef run(self):# 1.监测线程数量(总线程数)while True:#死循环,直到所有的线程都结束了才跳出循环running_count = len(self.scheduler.thread_list)#得到存储当前正在运行的线程列表的长度self.update_signal.emit("正在终止({})".format(running_count))#正在终止X个还没有结束的线程if running_count == 0:#如果执行中的线程数为0,说明所有的线程都结束了,退出循环breaktime.sleep(1)self.update_signal.emit("已终止")#提交数据,更新左下角状态
完整代码
main.py
import os
import sys
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QApplication, QWidget, QDesktopWidget, QHBoxLayout, QVBoxLayout
from PyQt5.QtWidgets import QPushButton, QLineEdit, QTableWidget, QTableWidgetItem, QLabel
from PyQt5.QtWidgets import QMessageBox, QMenu
from utils.dialog import LogDialog#拿到在另外一台计算机上执行程序代码所存储的文件路径。
BASE_DIR = os.path.dirname(os.path.realpath(sys.argv[0]))STATUS_MAPPING = {0: "初始化中",1: "待执行",2: "正在执行",3: "完成并提醒",10: "异常并停止",11: "初始化失败",
}# 设置一些常用的常量
RUNNING = 1
STOPPING = 2
STOP = 3class MainWindow(QWidget):def __init__(self):# 用super 继承父类的初始化super().__init__()self.switch = STOP #用于记录当前是否在执行中,如果已经在执行中,不允许重复启动self.txt_asin=None# 设置窗口的窗体标题self.setWindowTitle('发现你走远了的xx系统')# 设置窗体的尺寸self.resize(1228, 450)# 设置窗体位置# 获取整个窗口部分的宽高和左上角坐标信息,返回值是一个QRect类型,(x,y width,height)qr = self.frameGeometry()cp = QDesktopWidget().availableGeometry().center() # 得到屏幕中间的位置信息qr.moveCenter(cp) # 让我们的窗体移动到屏幕中间# 创建窗口总布局layout = QVBoxLayout()# 讲调用方法生成的顶部菜单布局添加到总布局layout.addLayout(self.init_header())layout.addLayout(self.init_form())layout.addLayout(self.init_table())layout.addLayout(self.init_footer())# 给窗体设置元素的排列方式self.setLayout(layout)def init_header(self):# 1.顶部菜单布局header_layout = QHBoxLayout() # 创建顶部菜单布局# 1.1 放入按钮btn_start = QPushButton("开始") # 新建一个开始按钮btn_start.clicked.connect(self.event_start_click)header_layout.addWidget(btn_start) # 将开始按钮添加到顶部菜单布局btn_stop = QPushButton("停止") # 新建一个开始按钮btn_stop.clicked.connect(self.event_stop_click)header_layout.addWidget(btn_stop) # 将开始按钮添加到顶部菜单布局# 1.2 加入弹簧header_layout.addStretch()return header_layoutdef init_form(self):# 2.添加内容布局form_layout = QHBoxLayout() # 创建添加内容布局# 2.1 输入框txt_asin = QLineEdit() # 新建一个输入框对象txt_asin.setText("B07YN82X3B=100") # 设置默认的form数据txt_asin.setPlaceholderText("请输入商品ID和价格,例如:B0818JJQQ8=88") # 设置灰色的提示信息self.txt_asin=txt_asinform_layout.addWidget(txt_asin) # 将输入框加入到布局中# 2.2 添加按钮btn_add = QPushButton("添加") # 新建一个添加按钮btn_add.clicked.connect(self.event_add_click)#新增一个点击事件form_layout.addWidget(btn_add) # 将添加按钮添加到form布局return form_layoutdef init_table(self):# 3.表格数据展示布局table_layout = QHBoxLayout()# 3.1 创建表格self.table_widget=table_widget = QTableWidget(0, 8) # 新建一个0行8列的表格# # 修改表格索引名# item=QTableWidgetItem()# item.setText("标题0")# table_widget.setHorizontalHeaderItem(0,item)# table_widget.setColumnWidth(0,150)#设置水平单元格0号位置的宽度 150## item2=QTableWidgetItem()# item2.setText("网址1")# table_widget.setHorizontalHeaderItem(1,item2)# table_widget.setColumnWidth(1,400)#设置水平单元格1号位置的宽度 400## item3=QTableWidgetItem()# item3.setText("行索引0")# table_widget.setVerticalHeaderItem(0,item3)table_header = [{"field": "asin", "text": "ASIN", 'width': 120},{"field": "title", "text": "标题", 'width': 150},{"field": "url", "text": "URL", 'width': 400},{"field": "price", "text": "底价", 'width': 100},{"field": "success", "text": "成功次数", 'width': 100},{"field": "error", "text": "503次数", 'width': 100},{"field": "status", "text": "状态", 'width': 100},{"field": "frequency", "text": "频率(N秒/次)", 'width': 100},]for idx, info in enumerate(table_header):item = QTableWidgetItem()item.setText(info['text'])table_widget.setHorizontalHeaderItem(idx, item)table_widget.setColumnWidth(idx, info['width'])# 3.2 初始化表格数据# 读取数据文件import jsonfile_path = os.path.join(BASE_DIR, "db", "db.json")with open(file_path, mode='r', encoding='utf-8') as f:data = f.read()data_list = json.loads(data)#读取得到了json数据current_row_count = table_widget.rowCount() # 当前表格有多少行for row_list in data_list:#每有一行json数据,我们就需要遍历一轮增加一行数据table_widget.insertRow(current_row_count)#增加一行# print(row_list) # ['B08166SLDF', 'AMD er', 'https://www.amazon.', 300.0, 0, 166, 1, 5]# 把row_list写入这一行for i, ele in enumerate(row_list):#enumerate中 i表示索引id,ele表示数据值# 一个多目运算 如果i==6(此时是状态的数据) 如果STATUS_MAPPING中能够找到ele的索引,那么我们把ele设置成STATUS_MAPPING中的内容ele = STATUS_MAPPING[ele] if i == 6 else elecell = QTableWidgetItem(str(ele))#注意我们的数据格式转为str,比如说状态的数据可能本身是int类型if i in [0, 4, 5, 6]:# 不可修改cell.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)#指定索引单元格不能选中和修改table_widget.setItem(current_row_count, i, cell)#写入数据到指定单元格current_row_count += 1 #行数+1,这样下次遍历行时会在新的行下面新增一行# 如果不新增行其实也可以用 current_row_count=table_widget.rowCount()替代# # 开启右键复制功能,在表格中点击右键时,自动触发 right_menu 函数table_widget.setContextMenuPolicy(Qt.CustomContextMenu)#设置启用右键菜单table_widget.customContextMenuRequested.connect(self.table_right_menu)#右键菜单的实现函数table_layout.addWidget(table_widget) # 把表格添加到表格布局中return table_layout#右键菜单功能def table_right_menu(self, pos):# 只有选中一行时,才支持右键 selected_item_list可以是一个多行的列表# 我们只处理选中的第一行,所以要 selected_item_list[0] 表示选中的多行中的第一行selected_item_list = self.table_widget.selectedItems()if len(selected_item_list) == 0:returnmenu = QMenu()item_copy = menu.addAction("复制")item_log = menu.addAction("查看日志")item_log_clear = menu.addAction("清除日志")action = menu.exec_(self.table_widget.mapToGlobal(pos))# 获取选中的对象if action == item_copy:# 赋值当前型号 B08166SLDFclipboard = QApplication.clipboard()#新建剪切板对象clipboard.setText(selected_item_list[0].text())#剪切板复制得到ASINif action == item_log:# 查看日志,在对话框中显示日志信息# 获取选中的型号row_index = selected_item_list[0].row()#得到选中第一行的行号asin = self.table_widget.item(row_index, 0).text().strip()#得到选中行的商品ASINdialog = LogDialog(asin)#打开我们写好的日志窗口,针对每个商品创建的日志的保存和修改也在这个窗口中实现dialog.setWindowModality(Qt.ApplicationModal)dialog.exec_()if action == item_log_clear:# 清空日志row_index = selected_item_list[0].row()asin = self.table_widget.item(row_index, 0).text().strip()file_path = os.path.join("log", "{}.log".format(asin))if os.path.exists(file_path):#如果已经存在了日志os.remove(file_path)#删除日志文件def init_footer(self):# 4.底部菜单footer_layout = QHBoxLayout()self.label_status=label_status = QLabel("未检测", self)footer_layout.addWidget(label_status)footer_layout.addStretch() # 添加弹簧,更加美观btn_reset = QPushButton("重新初始化")btn_reset.clicked.connect(self.event_reset_click)#新增一个点击事件footer_layout.addWidget(btn_reset)btn_recheck = QPushButton("重新检测")footer_layout.addWidget(btn_recheck)btn_reset_count = QPushButton("次数清零")btn_reset_count.clicked.connect(self.event_reset_count_click)footer_layout.addWidget(btn_reset_count)btn_delete = QPushButton("删除检测项")btn_delete.clicked.connect(self.event_delete_click)footer_layout.addWidget(btn_delete)btn_alert = QPushButton("SMTP报警配置")btn_alert.clicked.connect(self.event_alert_click)footer_layout.addWidget(btn_alert)btn_proxy = QPushButton("代理IP")btn_proxy.clicked.connect(self.event_proxy_click)footer_layout.addWidget(btn_proxy)return footer_layout# 添加数据按钮事件def event_add_click(self):# 1.获取输入框中的内容text = self.txt_asin.text()text = text.strip()#去掉空格if not text:#如果输入的是空格空字符QMessageBox.warning(self, "错误", "商品的ASIN输入错误")return# B07YN82X3B=100asin, price = text.split("=")#把数据分为id 和 价格price = float(price)#价格转为浮点型print(asin, price)# 2.加入到表格中(型号、底价)new_row_list = [asin, "", "", price, 0, 0, 0, 5]# 写入表格,具体操作和之前初始化表格一样current_row_count = self.table_widget.rowCount() # 当前表格有多少行self.table_widget.insertRow(current_row_count)for i, ele in enumerate(new_row_list):ele = STATUS_MAPPING[ele] if i == 6 else elecell = QTableWidgetItem(str(ele))if i in [0, 4, 5, 6]:# 不可修改cell.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(current_row_count, i, cell)# 3.发送请求自动获取标题# 注意:不能再主线程中做数据获取的事,创建一个线程去做数据获取,爬取到数据再更新到窗体应用(信号)。from utils.threads import NewTaskThread #导入我们写的线程py文件中的类thread = NewTaskThread(current_row_count, asin, self)#初始化线程中类的对象,传入了 表格总共的行数{current_row_count} 和 当前在表格的第几行{asin} 以及自身这个对象 {self}thread.success.connect(self.init_task_success_callback)#添加数据成功事件成功的回调,马上开始初始化数据thread.error.connect(self.init_task_error_callback)#事件失败的回调,弹框提示错误thread.start()#线程开始运行pass# 添加数据成功事件成功的回调,得到数据后马上开始初始化数据def init_task_success_callback(self, row_index, asin, title, url):print("成功",row_index, asin, title, url)# 更新标题cell_title = QTableWidgetItem(title)self.table_widget.setItem(row_index, 1, cell_title)# 更新URLcell_url = QTableWidgetItem(url)self.table_widget.setItem(row_index, 2, cell_url)# 更新状态 成功后未待执行cell_status = QTableWidgetItem(STATUS_MAPPING[1])cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(row_index, 6, cell_status)# 输入框清空self.txt_asin.clear()# 添加数据失败事件成功的回调,得到数据后马上开始初始化数据def init_task_error_callback(self, row_index, asin, title, url):print("错误",row_index, asin, title, url)# 更新标题cell_title = QTableWidgetItem(title)self.table_widget.setItem(row_index, 1, cell_title)# 更新URLcell_url = QTableWidgetItem(url)self.table_widget.setItem(row_index, 2, cell_url)# 更新状态为 初始化失败cell_status = QTableWidgetItem(STATUS_MAPPING[11])cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(row_index, 6, cell_status)def event_reset_click(self):# 1.获取已经选中的行row_list = self.table_widget.selectionModel().selectedRows()if not row_list:#如果没有选中行,提示错误QMessageBox.warning(self, "错误", "请选择要重新初始化的行")return# 2.获取每一行进行重新初始化for row_object in row_list:index = row_object.row()print("选中的行:", index)# 获取信号asin = self.table_widget.item(index, 0).text().strip()#得到了商品id# 状态重新初始化cell_status = QTableWidgetItem(STATUS_MAPPING[0])#把状态设置成初始化中print("选中的行:", index,"---",STATUS_MAPPING[0]) # 选中的行: 1 --- 初始化中cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(index, 6, cell_status)# 创建线程去进行初始化动作from utils.threads import NewTaskThreadthread = NewTaskThread(index, asin, self)#传入 选中的行/商品id/窗体对象thread.success.connect(self.init_task_success_callback)thread.error.connect(self.init_task_error_callback)thread.start()# 点击数量清零def event_reset_count_click(self):# 1.获取已经选中的行row_list = self.table_widget.selectionModel().selectedRows()if not row_list:QMessageBox.warning(self, "错误", "请选择要操作的行")return# 2.获取每一行进行重新初始化for row_object in row_list:index = row_object.row()# print("选中的行:", index)# # 获取信号# asin = self.table_widget.item(index, 0).text().strip()# 状态重新初始化cell_status = QTableWidgetItem(str(0))#得到一个字符串格式的 0cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(index, 4, cell_status)#设置成功次数清零cell_status = QTableWidgetItem(str(0))#得到一个字符串格式的 0cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(index, 5, cell_status)#设置503次数清零# 点击删除def event_delete_click(self):# 1.获取已经选中的行row_list = self.table_widget.selectionModel().selectedRows()if not row_list:QMessageBox.warning(self, "错误", "请选择要操作的行")return# 2.翻转,先删除掉行号大的数据,然后在删除行号小的数据,避免删除行号小的数据变动行号大的数据。row_list.reverse()print (row_list)# 3.删除for row_object in row_list:index = row_object.row()print("当前删除的行是:",index)self.table_widget.removeRow(index)# 点击邮件配置def event_alert_click(self):# 创建弹窗并在弹窗中进行设置from utils.dialog import AlertDialogdialog = AlertDialog()dialog.setWindowModality(Qt.ApplicationModal)dialog.exec_()# 点击代理def event_proxy_click(self):from utils.dialog import ProxyDialogdialog = ProxyDialog()dialog.setWindowModality(Qt.ApplicationModal)dialog.exec_()# 点击开始def event_start_click(self):if self.switch != STOP:QMessageBox.warning(self, "错误", "正在执行获取终止中,请勿重复操作")returnself.switch = RUNNING# 1.为每一行创建一个线程去执行 ( 所有的线程记录) [x,x,x,x,]from utils.scheduler import SCHEDULER# 函数方法作为参数传入SCHEDULERSCHEDULER.start(BASE_DIR,self,self.task_start_callback,self.task_stop_callback,self.task_counter_callback,self.task_error_counter_callback)# 2.左下角标签显示执行中self.update_status_message("执行中")# 将状态改为执行中def task_start_callback(self, row_index):# 对表格中的数据进行状态更新cell_status = QTableWidgetItem(STATUS_MAPPING[2])#得到对应行的状态文本cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)#不可选中 不可修改self.table_widget.setItem(row_index, 6, cell_status)#修改状态# 将状态改为待执行def task_stop_callback(self, row_index):cell_status = QTableWidgetItem(STATUS_MAPPING[1])cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(row_index, 6, cell_status)# 成功次数增加def task_counter_callback(self, row_index):# 原有个数+1old_count = self.table_widget.item(row_index, 4).text().strip()new_count = int(old_count) + 1# 表格赋值cell_status = QTableWidgetItem(str(new_count))cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(row_index, 4, cell_status)# 503 次数增加def task_error_counter_callback(self, row_index):# 原有个数+1old_count = self.table_widget.item(row_index, 5).text().strip()new_count = int(old_count) + 1# 表格赋值cell_status = QTableWidgetItem(str(new_count))cell_status.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)self.table_widget.setItem(row_index, 5, cell_status)# 点击停止def event_stop_click(self):if self.switch != RUNNING:#如果点击停止时不是执行状态QMessageBox.warning(self, "错误", "不在执行中,不需要停止")returnself.switch = STOPPING# 1.执行中的线程逐一终止from utils.scheduler import SCHEDULERSCHEDULER.stop()# 2.更新状态# 接收message信息,更新到左下角信息标签def update_status_message(self, message):if message == "已终止":self.switch = STOPself.label_status.setText(message)self.label_status.repaint()if __name__ == '__main__':app = QApplication(sys.argv) # 实例化一个Application应用,所有的窗口均在其下运行window = MainWindow() # 实例化窗口对象window.show() # 窗口展示sys.exit(app.exec_())# app.exec_()运行主循环,并在退出时返回状态代码。# sys.exit(n)退出您的应用程序并返回n到父进程(通常是您的shell)
threads.py
import time
from PyQt5.QtCore import QThread, pyqtSignalclass NewTaskThread(QThread):# 信号,触发信号,更新窗体中的数据success = pyqtSignal(int, str, str, str) # 成功后向ui对象发送数据的声明error = pyqtSignal(int, str, str, str) # 失败后向ui对象发送数据的声明def __init__(self, row_index, asin, *args, kwargs):super().__init__(*args, kwargs)self.row_index = row_index # 行数self.asin = asin # 商品iddef run(self):""" 具体线程应该做的事"""try:title = "good_title_{}".format(self.asin) # good_title_B07YN82X3Burl = "https://blog.csdn.net/u011027547/{}".format(self.asin)# 获取到title和url,将这个信息填写到 表格上 & 写入文件中。self.success.emit(self.row_index, self.asin, title, url)except Exception as e:print(e)title = "监控项 {} 添加失败。".format(self.asin)self.error.emit(self.row_index, self.asin, title, str(e))class TaskThread(QThread):start_signal = pyqtSignal(int)#开始信号stop_signal = pyqtSignal(int)#结束信号counter_signal = pyqtSignal(int)#总数统计信号error_counter_signal = pyqtSignal(int)#错误进程信号def __init__(self, scheduler, log_file_path, row_index, asin, *args, kwargs):super().__init__(*args, kwargs)self.scheduler = schedulerself.log_file_path = log_file_path#日志文件self.row_index = row_indexself.asin = asindef run(self):# 触发start_signalself.start_signal.emit(self.row_index)import timeimport randomwhile True:# 停止if self.scheduler.terminate:#终止线程的执行。根据操作系统的调度策略,线程可能会立即终止,也可能不会立即终止。可以在终止()之后使用QThread.wait()。self.stop_signal.emit(self.row_index)# 自己的线程在thread_list中移除掉self.scheduler.destroy_thread(self)returntry:time.sleep(random.randint(1, 3))#随机休眠self.counter_signal.emit(self.row_index)# 日志写入,注意用mode=a 表示在原来的文件末尾续写而不是覆盖with open(self.log_file_path, mode='a', encoding='utf-8') as f:f.write("日志\\n")# 监控的动作# 1.根据型号访问通过bs4获取数据# 2.获取到数据 价格是否小于预期# 3.发送报警(邮件)time.sleep(5)#检测的5秒延时except Exception as e:self.error_counter_signal.emit(self.row_index)#如果错误,提交错误的行号class StopThread(QThread):update_signal = pyqtSignal(str)#线程传递数据的信号def __init__(self, scheduler, *args, kwargs):super().__init__(*args, kwargs)self.scheduler = schedulerdef run(self):# 1.监测线程数量(总线程数)while True:#死循环,直到所有的线程都结束了才跳出循环running_count = len(self.scheduler.thread_list)#得到存储当前正在运行的线程列表的长度self.update_signal.emit("正在终止({})".format(running_count))#正在终止X个还没有结束的线程if running_count == 0:#如果执行中的线程数为0,说明所有的线程都结束了,退出循环breaktime.sleep(1)self.update_signal.emit("已终止")#提交数据,更新左下角状态
scheduler.py
class Scheduler(object):def __init__(self):self.thread_list = []self.window = Noneself.terminate = False # 标志,点击停止,false表示此时可以启动,true表示正在关闭# 开始线程def start(self, base_dir, window, fn_start, fn_stop, fn_counter, fn_error_counter):self.window = window # 这里的window就是我们main传入的窗体对象,通过这个 .window.控件 我们可以调用窗体对象中的所有控件self.terminate = False# 1.获取表格中的所有数据,每一行创建一个线程去执行监控for row_index in range(window.table_widget.rowCount()):# 0/1/2/3asin = window.table_widget.item(row_index, 0).text().strip() # 得到型号status_text = window.table_widget.item(row_index, 6).text().strip() # 得到状态文本# 日志import oslog_folder = os.path.join(base_dir, 'log')if not os.path.exists(log_folder): # 如果日志文件夹不存在就新建os.makedirs(log_folder)log_file_path = os.path.join(log_folder, "{}.log".format(asin)) # 日志文件# 只有是待执行的时,才创建线程去执行if status_text != "待执行":continue# 2.每个线程 执行 & 状态实时的显示在表格中 信号+回调from utils.threads import TaskThreadt = TaskThread(self, log_file_path, row_index, asin, window)t.start_signal.connect(fn_start)t.counter_signal.connect(fn_counter)t.error_counter_signal.connect(fn_error_counter)t.stop_signal.connect(fn_stop)t.start()self.thread_list.append(t)# 停止线程def stop(self):self.terminate = True # 设置标记为true# 创建线程,去监测 thread_list 中的数量 + 实时更新的窗体的label中# self.window.update_status_message("xxx")from utils.threads import StopThreadt = StopThread(self, self.window)t.update_signal.connect(self.window.update_status_message) # 窗口更新数据的声明t.start()# 在线程列表中删除线程def destroy_thread(self, thread):self.thread_list.remove(thread) # 关闭线程时要把线程从我们正在执行的线程列表中移除# 单例模式,实例化了这一个对象SCHEDULER,后面的其他py文件引用时也会引用这个实例(所有的共同一个,方便状态的管理)
SCHEDULER = Scheduler()
总结
大家喜欢的话,给个👍,点个关注!给大家分享更多计算机专业学生的求学之路!
版权声明:
发现你走远了@mzh原创作品,转载必须标注原文链接
Copyright 2023 mzh
Crated:2023-3-1
欢迎关注 『pyqt5 从0基础开始项目实战』 专栏,持续更新中
欢迎关注 『pyqt5 从0基础开始项目实战』 专栏,持续更新中
『未完待续』
抖音电脑版