> 文章列表 > python协程实战

python协程实战

python协程实战

协程简介

协程(Coroutine)又称微线程、纤程,协程不是进程或线程,其执行过程类似于 Python 函数调用,Python 的 asyncio 模块实现的异步IO编程框架中,协程是对使用 async 关键字定义的异步函数的调用;

一个进程包含多个线程,类似于一个人体组织有多种细胞在工作,同样,一个程序可以包含多个协程。多个线程相对独立,线程的切换受系统控制。同样,多个协程也相对独立,但是其切换由程序自己控制。

一个简单例子

我们来使用一个简单的例子了解协程,首先看看下面的代码:

import time
def display(num):time.sleep(1)print(num)
for num in range(10):display(num)

 

很容易看得懂,程序会输出0到9的数字,每隔1秒中输出一个数字,因此整个程序的执行需要大约10秒 时间。值得注意的是,因为没有使用多线程或多进程(并发),程序中只有一个执行单元(只有一个线程在 执行),而 time.sleep(1) 的休眠操作会让整个线程停滞1秒钟,

对于上面的代码来说,在这段时间里面 CPU是闲置的没有做什么事情。

我们再来看看使用协程会发生什么:

import asyncio
async def display(num): # 在函数前使用async关键字,变成异步函数 await asyncio.sleep(1)print(num)

 

异步函数不同于普通函数,调用普通函数会得到返回值,而调用异步函数会得到一个协程对象。我们需要将协程对象放到一个事件循环中才能达到与其他协程对象协作的效果,因为事件循环会负责处理子程 序切换的操作。
简单的说就是让阻塞的子程序让出CPU给可以执行的子程序。

基本概念

异步IO是指程序发起一个IO操作(阻塞等待)后,不用等IO操作结束,可以继续其它操作;做其他事情,当IO操作结束时,会得到通知,然后继续执行。异步IO编程是实现并发的一种方式,适用于IO密集型任务

Python 模块 asyncio 提供了一个异步编程框架,全局的流程图大致如下:

代码介绍

import asyncioasync def test():await asyncio.sleep(1)print('hello 异步')c = test()  # 调用异步函数,得到协程对象-->c
loop = asyncio.get_event_loop()  # 创建事件循环
loop.run_until_complete(c)  # 把协程对象丢给循环,并执行异步函数内部代码

await asyncio.sleep(1):用来模拟耗时的任务

task:对协程对象的进一步封装

async def test():print('hello 异步')
c = test() # 调用异步函数,得到协程对象-->c
loop = asyncio.get_event_loop() # 创建事件循环
task = loop.create_task(c) # 创建task任务
print(task)
loop.run_until_complete(task) # 执行任务

future:代表以后执行或者没有执行的任务,实际上和task没有本质区别

async def func(url):print(f'正在对{url}发起请求:')print(f'请求{url}成功!')c = func('www.baidu.com')  # 函数调用的写成对象--> cloop = asyncio.get_event_loop()  # 创建一个事件循环对象
future_task = asyncio.ensure_future(c)
print(future_task, '未执行')
loop.run_until_complete(future_task)  # 注册加启动
print(future_task, '执行完了')

多任务协程

任务(Task)对象用于封装协程对象,保存了协程运行后的状态,使用 run_until_complete() 方法将任务注册到事件循环;

如果我们想要使用多任务,那么我们就需要同时注册多个任务的列表,可以使用 run_until_complete(asyncio.wait(tasks))

这里的tasks,表示一个任务序列(通常为列表)

注册多个任务也可以使用run_until_complete(asyncio. gather(*tasks))

import asyncio, timeasync def do_some_work(i, n):  # 使用async关键字定义异步函数print('任务{}等待: {}秒'.format(i, n))await asyncio.sleep(n)  # 休眠一段时间return '任务{}在{}秒后返回结束运行'.format(i, n)start_time = time.time()  # 开始时间
tasks = [asyncio.ensure_future(do_some_work(1, 2)),asyncio.ensure_future(do_some_work(2, 1)),asyncio.ensure_future(do_some_work(3, 3))]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:print('任务执行结果: ', task.result())
print('运行时间: ', time.time() - start_time)

以上就是协程的基本使用方法,下面做一个实战熟悉一下效果

协程实战

# -*- coding: utf-8 -*-
# Created by Xue Jian on 4/22/23
import asyncio
import os
import timeimport requests
import jsondef get_pages(limit=21):page_urls = []for i in range(1, limit):url = 'https://game.gtimg.cn/images/lol/act/img/js/hero/{}.js'.format(i)# print(url)page_urls.append(url)return page_urlsheaders = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'
}def get_img(limit):img_urls = []page_urls = get_pages(limit=limit)for page_url in page_urls:res = requests.get(page_url, headers=headers)result = res.content.decode('utf-8')res_dict = json.loads(result)skins = res_dict['skins']img_url = []for hero in skins:# todo 这里item放在外面,只有最后一个结果item = {}item['name'] = hero["heroName"]item['skin_name'] = hero["name"]if hero["mainImg"] == '':continueitem['imgLink'] = hero["mainImg"]# print(item)img_url.append(item)img_urls.extend(img_url)return img_urls'''
{'name': '黑暗之女', 'skin_name': '毒菇梦魇 安妮', 'imgLink': 'https://game.gtimg.cn/images/lol/act/img/skin/big_4c8b9ce8-4d3f-4c05-80b9-3207891f0147.jpg'}
'''async def save_img(index, img_url):"""声明异步协程任务:param index::param img_url::return:"""path = "皮肤/" + img_url['name']if not os.path.exists(path):os.makedirs(path)content = requests.get(img_url['imgLink'], headers=headers).content# 这里就是会发生阻塞的任务  =>  await asyncio.sleep()print('******正在下载第{}张************'.format(index))with open('./皮肤/' + img_url['name'] + '/' + img_url['skin_name'] + str(index) + '.jpg', 'wb') as f:f.write(content)def main(limit):# 创建事件循环loop = asyncio.get_event_loop()# 获取图片url列表img_urls = get_img(limit)print(len(img_urls))# 创建协程任务tasks = [save_img(img[0], img[1]) for img in enumerate(img_urls)]try:loop.run_until_complete(asyncio.wait(tasks))finally:loop.close()"""以下是同步操作首先将 async def save_img(index, img)  改成  def save_img(index, img)将try finally替换成以下for index, img in enumerate(img_urls):save_img(index, img)"""if __name__ == '__main__':start = time.time()main(21)end = time.time()print('cost_timer:::', end - start)