> 文章列表 > aiohttp 异步爬虫实战

aiohttp 异步爬虫实战

aiohttp 异步爬虫实战

一、aiohttp

之前介绍的 asyncio 模块内部实现了对 TCP、UDP、SSL 协议的异步操作,但是对于 HTTP 请求的异步操作来说,我们就需要用到 aiohttp 来实现了。aiohttp 是一个基于 asyncio 的异步 HTTP 网络模块,它既提供了服务端,又提供了客户端。其中我们用服务端可以搭建一个支持异步处理的服务器,用于处理请求并返回响应,类似于 Django、Flask、Tornado 等一些 Web 服务器。而客户端我们就可以用来发起请求,就类似于 requests 来发起一个 HTTP 请求然后获得响应,但 requests 发起的是同步的网络请求,而 aiohttp 则发起的是异步的。

二、基本使用

  1. 基本实例

首先我们来看一个基本的 aiohttp 请求案例,代码如下:

import aiohttp
import asyncio
async def fetch(session, url):async with session.get(url) as response:return await response.text(), response.status
async def main():async with aiohttp.ClientSession() as session:html, status = await fetch(session, 'https://cuiqingcai.com')print(f'html: {html[:100]}...')print(f'status: {status}')
if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(main())

在这里我们使用 aiohttp 来爬取了我的个人博客,获得了源码和响应状态码并输出,运行结果如下:

html: <!DOCTYPE HTML>
<html>
<head>
<meta charset="UTF-8">
<meta name="baidu-tc-verification" content=...
status: 200

这里网页源码过长,只截取输出了一部分,可以看到我们成功获取了网页的源代码及响应状态码 200,也就完成了一次基本的 HTTP 请求,即我们成功使用 aiohttp 通过异步的方式进行了网页的爬取,当然这个操作用之前我们所讲的 requests 同样也可以做到。

我们可以看到其请求方法的定义和之前有了明显的区别,主要有如下几点:

  1. 首先在导入库的时候,我们除了必须要引入 aiohttp 这个库之外,还必须要引入 asyncio 这个库,因为要实现异步爬取需要启动协程,而协程则需要借助于 asyncio 里面的事件循环来执行。除了事件循环,asyncio 里面也提供了很多基础的异步操作

  2. 异步爬取的方法的定义和之前有所不同,在每个异步方法前面统一要加 async 来修饰。with as 语句前面同样需要加 async 来修饰,在 Python 中,with as 语句用于声明一个上下文管理器,能够帮我们自动分配和释放资源,而在异步方法中,with as 前面加上 async 代表声明一个支持异步的上下文管理器

  3. 对于一些返回 coroutine 的操作,前面需要加 await 来修饰,如 response 调用 text 方法,查询 API 可以发现其返回的是coroutine 对象,那么前面就要加 await

  4. 而对于状态码来说,其返回值就是一个数值类型,那么前面就不需要加 await。所以,这里可以按照实际情况处理,参考官方文档说明,看看其对应的返回值是怎样的类型,然后决定加不加 await 就可以了。

  5. 最后,定义完爬取方法之后,实际上是 main 方法调用了 fetch 方法。要运行的话,必须要启用事件循环,事件循环就需要使用 asyncio 库,然后使用 run_until_complete 方法来运行。注意在 Python 3.7 及以后的版本中,我们可以使用asyncio.run(main())来代替最后的启动操作,不需要显式声明事件循环,run 方法内部会自动启动一个事件循环
    。但这里为了兼容更多的 Python版本,依然还是显式声明了事件循环。

  6. URL 参数设置
    对于 URL 参数的设置,我们可以借助于 params 参数,传入一个字典即可示例如下:

import aiohttp
import asyncio
async def main():params = {'name': 'germey', 'age': 25}async with aiohttp.ClientSession() as session:async with session.get('https://httpbin.org/get', params=params) as response:print(await response.text())
if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

运行结果如下:

{"args": {"age": "25","name": "germey"},"headers": {"Accept": "*/*","Accept-Encoding": "gzip, deflate","Host": "httpbin.org","User-Agent": "Python/3.7 aiohttp/3.6.2","X-Amzn-Trace-Id": "Root=1-5e85eed2-d240ac90f4dddf40b4723ef0"},"origin": "17.20.255.122","url": "https://httpbin.org/get?name=germey&age=25"
}

这里可以看到,其实际请求的 URL 为 https://httpbin.org/get?name=germey&age=25,其 URL 请求参数就对应了 params 的内容。

  1. 其他请求类型

另外 aiohttp 还支持其他的请求类型,如 POST、PUT、DELETE 等等,这个和 requests 的使用方式有点类似,示例如下:

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')
  • POST 数据

对于 POST 表单提交,其对应的请求头的 Content-type 为 application/x-www-form-urlencoded,我们可以用如下方式来实现,代码示例如下:

import aiohttp
import asyncio
async def main():data = {'name': 'germey', 'age': 25}async with aiohttp.ClientSession() as session:async with session.post('https://httpbin.org/post', data=data) as response:print(await response.text())
if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

运行结果如下:

{"args": {},"data": "","files": {},"form": {"age": "25","name": "germey"},"headers": {"Accept": "*/*","Accept-Encoding": "gzip, deflate","Content-Length": "18","Content-Type": "application/x-www-form-urlencoded","Host": "httpbin.org","User-Agent": "Python/3.7 aiohttp/3.6.2","X-Amzn-Trace-Id": "Root=1-5e85f0b2-9017ea603a68dc285e0552d0"},"json": null,"origin": "17.20.255.58","url": "https://httpbin.org/post"
}

对于 POST JSON 数据提交,其对应的请求头的 Content-type 为 application/json,我们只需要将 post 方法的 data 参数改成 json 即可,代码示例如下:

async def main():data = {'name': 'germey', 'age': 25}async with aiohttp.ClientSession() as session:async with session.post('https://httpbin.org/post', json=data) as response:print(await response.text())

运行结果如下:

{"args": {},"data": "{\\"name\\": \\"germey\\", \\"age\\": 25}","files": {},"form": {},"headers": {"Accept": "*/*","Accept-Encoding": "gzip, deflate","Content-Length": "29","Content-Type": "application/json","Host": "httpbin.org","User-Agent": "Python/3.7 aiohttp/3.6.2","X-Amzn-Trace-Id": "Root=1-5e85f03e-c91c9a20c79b9780dbed7540"},"json": {"age": 25,"name": "germey"},"origin": "17.20.255.58","url": "https://httpbin.org/post"
}
  • 响应字段

对于响应来说,我们可以用如下的方法分别获取响应的状态码、响应头、响应体、响应体二进制内容、响应体 JSON 结果,代码示例如下:

import aiohttp
import asyncio
async def main():data = {'name': 'germey', 'age': 25}async with aiohttp.ClientSession() as session:async with session.post('https://httpbin.org/post', data=data) as response:print('status:', response.status)print('headers:', response.headers)print('body:', await response.text())print('bytes:', await response.read())print('json:', await response.json())
if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

运行结果如下:

status: 200
headers: <CIMultiDictProxy('Date': 'Thu, 02 Apr 2020 14:13:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '503', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {"args": {},"data": "","files": {},"form": {"age": "25","name": "germey"},"headers": {"Accept": "*/*","Accept-Encoding": "gzip, deflate","Content-Length": "18","Content-Type": "application/x-www-form-urlencoded","Host": "httpbin.org","User-Agent": "Python/3.7 aiohttp/3.6.2","X-Amzn-Trace-Id": "Root=1-5e85f2f1-f55326ff5800b15886c8e029"},"json": null,"origin": "17.20.255.58","url": "https://httpbin.org/post"
}
bytes: b'{\\n  "args": {}, \\n  "data": "", \\n  "files": {}, \\n  "form": {\\n    "age": "25", \\n    "name": "germey"\\n  }, \\n  "headers": {\\n    "Accept": "*/*", \\n    "Accept-Encoding": "gzip, deflate", \\n    "Content-Length": "18", \\n    "Content-Type": "application/x-www-form-urlencoded", \\n    "Host": "httpbin.org", \\n    "User-Agent": "Python/3.7 aiohttp/3.6.2", \\n    "X-Amzn-Trace-Id": "Root=1-5e85f2f1-f55326ff5800b15886c8e029"\\n  }, \\n  "json": null, \\n  "origin": "17.20.255.58", \\n  "url": "https://httpbin.org/post"\\n}\\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '25', 'name': 'germey'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '18', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'httpbin.org', 'User-Agent': 'Python/3.7 aiohttp/3.6.2', 'X-Amzn-Trace-Id': 'Root=1-5e85f2f1-f55326ff5800b15886c8e029'}, 'json': None, 'origin': '17.20.255.58', 'url': 'https://httpbin.org/post'}

这里我们可以看到有些字段前面需要加 await,有的则不需要。其原则是,如果其返回的是一个 coroutine 对象(如 async 修饰的方法),那么前面就要加 await,具体可以看 aiohttp 的 API,其链接为:https://docs.aiohttp.org/en/stable/client_reference.html。

  • 超时设置

对于超时的设置,我们可以借助于 ClientTimeout 对象,比如这里我要设置 1 秒的超时,可以这么来实现:

import aiohttp
import asyncio
async def main():timeout = aiohttp.ClientTimeout(total=1)async with aiohttp.ClientSession(timeout=timeout) as session:async with session.get('https://httpbin.org/get') as response:print('status:', response.status)
if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

如果在 1 秒之内成功获取响应的话,运行结果如下:

200

如果超时的话,会抛出 TimeoutError 异常,其类型为 asyncio.TimeoutError,我们再进行异常捕获即可。另外 ClientTimeout 对象声明时还有其他参数,如 connect、socket_connect 等,详细说明可以参考官方文档:https://docs.aiohttp.org/en/stable/client_quickstart.html#timeouts

  • 并发限制

由于 aiohttp 可以支持非常大的并发,比如上万、十万、百万都是能做到的,但这么大的并发量,目标网站是很可能在短时间内无法响应的,而且很可能瞬时间将目标网站爬挂掉。所以我们需要控制一下爬取的并发量

在一般情况下,我们可以借助于 asyncio 的 Semaphore 来控制并发量,代码示例如下:

import asyncio
import aiohttp
CONCURRENCY = 5
URL = 'https://www.baidu.com'
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None
async def scrape_api():async with semaphore:print('scraping', URL)async with session.get(URL) as response:await asyncio.sleep(1)return await response.text()
async def main():global sessionsession = aiohttp.ClientSession()scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(main())

在这里我们声明了 CONCURRENCY 代表爬取的最大并发量为 5,同时声明爬取的目标 URL 为百度。接着我们借助于 Semaphore 创建了一个信号量对象,赋值为 semaphore,这样我们就可以用它来控制最大并发量了。怎么使用呢?我们这里把它直接放置在对应的爬取方法里面,使用 async with 语句将 semaphore 作为上下文对象即可。这样的话,信号量可以控制进入爬取的最大协程数量,最大数量就是我们声明的 CONCURRENCY 的值。

在 main 方法里面,我们声明了 10000 个 task,传递给 gather 方法运行。倘若不加以限制,这 10000 个 task 会被同时执行,并发数量太大。但有了信号量的控制之后,同时运行的 task 的数量最大会被控制在 5 个,这样就能给 aiohttp 限制速度了

在这里,aiohttp 的基本使用就介绍这么多,更详细的内容还是推荐你到官方文档查阅,链接:https://docs.aiohttp.org/。