
python学习——多进程
- 进程的定义和状态
- 所需模块及参数解释
-
- 多进程之间的相互通讯
-
- 协程gevent
-
- 练习
进程的定义和状态
- 一个程序(py文件)运行之后,代码+用到的资源成为进程
- 进程是操作系统分配资源的基本单元,不仅可以通过线程完成多任务,进程也是可以的
- 进程和线程之间的区别:
-
- 1.操作系统会给锦城湖分配一块独立的内存地址,来存储进程的相关资源,但是线程没有
-
- 2.线程可以看做是进程的子单位,一个进程可以分为多个线程,同属于一个进程组的线程,共享同一块内存地址中的数据
-
- 进程的状态:
-
-
-
- 等待状态:等在某些条件满足,例如一个程序sleep了,就是处于等待状态
所需模块及参数解释
- 需要用到模块:
multiprocessing
- 导入模块 :
import multiprocessing
或者from multiprocessing import Process
- multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来创建一个进程对象,这个兑现可以理解为一个单独的进程,可以执行另外的事情
- 如果是第一种导入,下面需要使用
multiprocessing.Process
来调用线程,第二种直接使用Process
即可
- Process方法中,需要传入参数,参数分别为:
-
-
-
-
-
kwargs
:为目标函数传递的关键词参数,目标是字典
进程实例常用的方法和属性
方法
-
-
-
join()
是否等待子进程执行结果,或者等待你多少秒
-
terminate()
不管任务是否完成,立即终止子进程
属性
- name:当前进程的别名,默认为Process-N,N是从1开始递增的整数
- pid:当前进程的pid(进程号)
import multiprocessing
import requests
import timedef res_1():url = "https://www.baidu.com"payload = {}requests.request("GET", url, data=payload)time.sleep(1)if __name__ == '__main__':data_1= []for i in range(3):a = multiprocessing.Process(target=res_1)a.start()data_1.append(a)print("进程是否还活着:{}".format(a.is_alive()))for a in data_1:a.join()print("进程的名字:{}".format(a.name))print("进程的pid:{}".format(a.pid))a.terminate()print("终止子进程之后,进程是否还活着:{}".format(a.is_alive()))from multiprocessing import Processdata_2 = []for i in range(3):a = Process(target=res_1)a.start()data_2.append(a)print("进程是否还活着:{}".format(a.is_alive()))for a in data_2:a.join()print("进程的名字:{}".format(a.name))print("进程的pid:{}".format(a.pid))print("运行完之后,进程是否还活着:{}".format(a.is_alive()))
注意事项
- 使用进程时,不添加
if __name__ == '__main__':
就会报错,原因:
-
- 如果写在外面,python开启多进程后,需要复制一份环境,为了创建这个环境,子进程重新创建这个“环境”的时候,会执行所有外面的内容,于是又运行了开启多线程的代码,循环创建,进入无线的递归,因此报错
- 运行时是在进程下起一个子线程来运行,不同进程之间的子线程,不会共享全局变量
多进程之间的相互通讯
- 因为多进程之间是不共享全局变量的,那么针对需要两个进程都需要处理的内容,多进程之间就需要进行相互通讯
- 进程进行相互通讯的方式有:队列、进程池
- 上面说的通讯队列,和
queue.Queue
不同,queue.Queue
是进程内非阻塞队列
- 但是进程通信所需要的
multiprocessing.Queue
,是跨进程的通信队列
- 多进程前者是各自私有,后者是子进程共有
- Process之间有时候需要通信,操作系统就提供了许多机制来实现进程之间的通信
队列:Queue
multiprocessing.Queue
可以实现多进程之间的数据传递,Queue本身就是一个消息队列
- Queue的工作原理是:在父进程中创建两个子进程,一个往Queue中填写数据,一个从Queue中读取数据
- 注意点:进程之间的Queue要单做参数传入,因为进程之间不共享全局变量
import multiprocessing
import requests
import time
from multiprocessing import Queuedef res_1(q):while q.qsize() > 0:url = q.get()payload = {}requests.request("GET", url, data=payload)print("第一个进程执行完毕*")time.sleep(1)def res_2(q):while q.qsize() > 0:url = q.get()payload = {}requests.request("GET", url, data=payload)print("第二个进程执行完毕*")time.sleep(1)if __name__ == '__main__':q = Queue()for i in range(10):q.put("https://www.baidu.com")data = []for i in range(3):a = multiprocessing.Process(target=res_1,args=(q,))a.start()print("进程是否还活着:{}".format(a.is_alive()))print("进程的名字:{}".format(a.name))print("进程的pid:{}".format(a.pid))data.append(a)for a in data:a.join() a.terminate() print("终止子进程之后,进程是否还活着:{}".format(a.is_alive()))
进程池
- 当需要创建的子进程数量不多的时候,可以使用
multiprocessing.Process
,动态生成多个进程
- 但是如果有上百甚至上千个目标,到后东去创建进程的工作量就巨大
- 此时就可以用到
multiprocessing
模块中的pool方法
pool
- 初始化pool时,可以指定一个最大进程数
- 当有新的请求提交到pool中,如果进程池没有满,就会创建一个新的进程用来执行该请求
- 但是如果进程池中的进程数已经达到最大值,那么该请求就会等待
- 直到池中有进程结束,才会用之前的进程来执行新的任务
pool常用的方法
apply_async()
:使用非阻塞的方式调用func
-
- 并行执行,堵塞方式必须等待上一个进程退出才会执行下一个进程
-
-
close()
:关闭pool,使其不再接受新的任务
terminate()
:不管任务是否完成,立即终止
join()
:主进程堵塞,等在子进程的退出,必须在close或者terminate之后使用
from multiprocessing import Pool
import os,time,randomdef worker(msg):t_start = time.time()print("{}开始执行,进程号是{}".format(msg,os.getpid()))time.sleep(random.random()*2)t_stop = time.time()print("{}执行完毕,执行时间{}".format(msg,t_stop-t_start))if __name__ == '__main__':po = Pool(3)for i in range(0,10):po.apply_async(func=worker,args=(i,))print("-------start--------")po.close() po.join() print("--------end--------")
协程gevent
- 线程之下可以创建协程,协程需要用到第三方库:gevent
- 协程存在于线程之中,线程默认不会等待协程执行
- 用户态(存在用户空间的数据)的轻量级线程(微线程)
- 协程能保留上一次状态
- 在单线程实现并发,串行操作,函数间来回切换执行
- 遇到IO操作就切换,IO 操作完了再回来
协程实例常用的方法
spawn()
:开启协程,第一个参数为协程要执行的任务
join()
:让线程等待协程执行
gevent.sleep()
:协程之间切换的条件,耗时等待的情况下才会切换,自动切换
joinall()
:joinall可以用在单人舞等在,也可以用在批量任务等待,只能将列表中的所有等待的时间都执行了才能结束,可用于添加所有协程函数
- 程序运行,首先要考虑协程,然后再考虑线程和进程
- 补丁,所需下面两行代码
-
from gevent import monkey
-
-
练习
- 10000个请求,使用开启2个进程,进程中开启4个线程,线程中开启5个协程来处理
- 一共30个携程
import time
from threading import Thread
from multiprocessing import Process,Queue
import gevent
import requests
def count_time(func):""" 计算函数运行时间的装饰器 """def wapper(*args,kwargs):print("-----start-----")start_t = time.time()func(*args,kwargs)end_t = time.time()print("-----end--------")print("耗时:{}".format(end_t - start_t))return wapperdef green_work(q,gname):"""每个协程的工作函数:param q::param gname::return:"""count = 0while not q.empty():url = q.get(timeout=0.1)payload = {}requests.request("GET", url, data=payload)gevent.sleep(0.1)count +=1time.sleep(1)print("---------协程{}执行了{}任务-----".format(gname,count))def thread_work(q,tname):"""每个此案城的执行任务函数,在线程中开启5个协程:param q: 进程、协程、线程的请求队列:param tname: 协程所属线程的名字:return:"""g_list = []for i in range(5):gname = "{}-g-{}".format(tname,i)print("协程创建--------{}".format(gname))g = gevent.spawn(green_work,q,gname)g_list.append(g)gevent.joinall(g_list)def process_work(q,pname):"""每个进程执行的任务函数,在该进程中开启3个线程创建一个线程:param q: 进程之间通讯的任务队列:param name:线程所属进程的名字:return:"""thread_list = []for i in range(3):tname = "{}-th-{}".format(pname,i)print("创建线程{}".format(tname))t = Thread(target=thread_work,args=(q,tname))thread_list.append(t)t.start()for t in thread_list:t.join()@count_time
def main():q = Queue() for i in range(1000):q.put("https://www.baidu.com")print("队列创建完毕,数量:{}".format(q.qsize()))pro_list = []for i in range(2):pname = "pro-{}".format(i)print("创建进程{}".format(pname))p = Process(target=process_work,args=(q,pname))p.start()pro_list.append(p)for p in pro_list:p.join()if __name__ == '__main__':main()