Python 异步编程与协程:从入门到精通
一、异步编程基础概念
1.1 同步 vs 异步
- 同步:代码顺序执行,一个任务完成后再执行下一个
- 异步:多个任务可以并发执行,无需等待前一个任务完成
1.2 为什么需要异步编程?
- I/O密集型任务(网络请求、文件读写)
- 高并发应用(Web服务器、爬虫)
- 提高程序响应性
二、协程(Coroutine)基础
2.1 什么是协程?
协程是可以在执行过程中暂停,并在适当时候恢复执行的函数。
# Python 3.5+ 使用 async/await 语法
import asyncio
async def say_hello():
    """Print "Hello", pause one second (simulated I/O), then print "World"."""
    print("Hello")
    await asyncio.sleep(1)  # non-blocking pause simulating an I/O wait
    print("World")

# Run the coroutine. Guarded so importing this module does not start an
# event loop as a side effect (the original ran it at import time).
if __name__ == "__main__":
    asyncio.run(say_hello())
2.2 协程 vs 线程 vs 进程
| 特性 | 协程 | 线程 | 进程 |
|---|---|---|---|
| 创建开销 | 极小 | 中等 | 大 |
| 切换开销 | 极小 | 中等 | 大 |
| 内存占用 | 少 | 中等 | 多 |
| 并发能力 | 高 | 中等 | 低 |
| 数据共享 | 容易 | 需要注意锁 | 复杂 |
三、async/await 详解
3.1 定义异步函数
import asyncio
async def fetch_data(url, delay=2):
    """Simulate fetching *url* over the network.

    Args:
        url: address to (pretend to) fetch.
        delay: simulated network latency in seconds (default 2, matching
            the original hard-coded value; parameterized so callers and
            tests can use a shorter wait).

    Returns:
        A fake payload string derived from the URL.
    """
    print(f"开始获取 {url}")
    await asyncio.sleep(delay)  # simulated network latency
    print(f"完成获取 {url}")
    return f"{url}的数据"
async def main():
    """Contrast sequential awaits with concurrent task execution."""
    # Sequential: the second fetch cannot start until the first finishes.
    first = await fetch_data("https://api.example.com/data1")
    second = await fetch_data("https://api.example.com/data2")
    # Concurrent: both tasks are scheduled before either is awaited,
    # so their simulated delays overlap.
    pending = [
        asyncio.create_task(fetch_data(target))
        for target in ("https://api.example.com/data3",
                       "https://api.example.com/data4")
    ]
    results = await asyncio.gather(*pending)
    print(results)
四、异步上下文管理器
import aiofiles # 需要安装:pip install aiofiles
import asyncio
class AsyncDatabaseConnection:
    """Toy async context manager modelling a database connection.

    ``async with AsyncDatabaseConnection() as db:`` pretends to open the
    connection on entry and close it on exit; ``query`` returns a canned
    result string after a short simulated round trip.
    """

    async def __aenter__(self):
        # Entry of the ``async with`` block: simulate opening the connection.
        print("连接数据库...")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Exit of the block: simulate tearing the connection down.
        print("关闭数据库连接...")
        await asyncio.sleep(0.5)

    async def query(self, sql):
        """Pretend to execute *sql* and return a canned result string."""
        await asyncio.sleep(0.5)
        return f"查询结果: {sql}"
async def main():
    """Open the fake database connection and run a single query."""
    async with AsyncDatabaseConnection() as db:
        rows = await db.query("SELECT * FROM users")
        print(rows)
五、实战:异步Web请求
import aiohttp # 需要安装:pip install aiohttp
import asyncio
import time
async def fetch_url(session, url):
    """Download *url* using the given aiohttp session; return the body text."""
    async with session.get(url) as response:
        body = await response.text()
    return body
async def main():
    """Fetch three deliberately slow endpoints concurrently and time it."""
    targets = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3",
    ]
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        # All requests are in flight at once, so total wall time is roughly
        # the slowest response, not the sum of all three.
        results = await asyncio.gather(
            *(fetch_url(session, target) for target in targets)
        )
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")
    print(f"获取到 {len(results)} 个结果")
六、异步队列和生产者消费者模式
import asyncio
import random
async def producer(queue, name, count=5, delay_range=(0.1, 0.5)):
    """Produce *count* labelled items onto *queue* with random pauses.

    Args:
        queue: asyncio.Queue receiving the items.
        name: label embedded in each item and in the log output.
        count: number of items to produce (default 5, as before).
        delay_range: (min, max) seconds to sleep between puts
            (default (0.1, 0.5), as before); parameterized from the
            original hard-coded constants.
    """
    low, high = delay_range
    for i in range(count):
        item = f"产品-{name}-{i}"
        await queue.put(item)  # suspends if the queue is full (backpressure)
        print(f"生产者 {name} 生产了: {item}")
        await asyncio.sleep(random.uniform(low, high))
async def consumer(queue, name):
while True:
try:
item = await asyncio.wait_for(queue.get(), timeout=2)
print(f"消费者 {name} 消费了: {item}")
queue.task_done()
await asyncio.sleep(random.uniform(0.1, 0.3))
except asyncio.TimeoutError:
break
async def main():
    """Run three producers against two consumers sharing a bounded queue."""
    queue = asyncio.Queue(maxsize=10)
    # Spawn the producer tasks...
    producer_tasks = [
        asyncio.create_task(producer(queue, f"P{i}")) for i in range(3)
    ]
    # ...and the consumer tasks.
    consumer_tasks = [
        asyncio.create_task(consumer(queue, f"C{i}")) for i in range(2)
    ]
    await asyncio.gather(*producer_tasks)  # all items have been produced
    await queue.join()  # every item has been marked task_done()
    # Consumers loop until their timeout; stop them explicitly.
    for task in consumer_tasks:
        task.cancel()
七、高级模式:信号量和限流
import asyncio
class RateLimiter:
    """Async context manager that throttles concurrent requests.

    At most *rate_limit* callers hold a slot at once, and every holder
    keeps its slot for one extra second on exit, which crudely caps
    throughput at roughly *rate_limit* requests per second.
    """

    def __init__(self, rate_limit):
        self.semaphore = asyncio.Semaphore(rate_limit)

    async def __aenter__(self):
        await self.semaphore.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Hold the slot one extra second before releasing it, so new
        # entrants are admitted at most rate_limit per second.
        await asyncio.sleep(1)
        self.semaphore.release()
async def limited_request(url, limiter, delay=0.5):
    """Issue one simulated request for *url*, gated by *limiter*.

    Args:
        url: address to (pretend to) request.
        limiter: any async context manager controlling admission,
            e.g. a RateLimiter or an asyncio.Semaphore.
        delay: simulated request duration in seconds (default 0.5,
            matching the original hard-coded value).

    Returns:
        A fake response string derived from the URL.
    """
    async with limiter:
        print(f"请求: {url}")
        await asyncio.sleep(delay)
        return f"响应: {url}"
async def main():
    """Send ten simulated requests through a shared three-slot limiter."""
    limiter = RateLimiter(rate_limit=3)  # at most 3 requests in flight
    urls = [f"https://example.com/{i}" for i in range(10)]
    results = await asyncio.gather(
        *(limited_request(url, limiter) for url in urls)
    )
八、异步错误处理
import asyncio
async def risky_operation(delay=1, failure_rate=0.3):
    """Sleep *delay* seconds, then succeed or fail at random.

    Args:
        delay: seconds to sleep before deciding (default 1, as before).
        failure_rate: probability of raising (default 0.3, as before).

    Returns:
        The success message string.

    Raises:
        ValueError: with probability *failure_rate*, to demonstrate
            error handling of concurrent tasks.
    """
    # Local import: this tutorial section never imported random at its
    # top (only asyncio), so as a standalone snippet it raised NameError.
    import random

    await asyncio.sleep(delay)
    if random.random() < failure_rate:
        raise ValueError("随机错误发生!")
    return "操作成功"
async def main():
    """Demonstrate two patterns for handling errors from concurrent tasks."""
    tasks = [risky_operation() for _ in range(5)]
    # Method 1: return_exceptions=True makes gather deliver exceptions as
    # ordinary results instead of propagating the first one.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i} 成功: {result}")
    # Method 2: asyncio.as_completed yields awaitables in completion order.
    # BUG FIX: the original passed the same `tasks` list again, but those
    # coroutines were already consumed by gather() above -- awaiting a
    # coroutine twice raises RuntimeError. Create a fresh batch instead.
    fresh_tasks = [risky_operation() for _ in range(5)]
    for coro in asyncio.as_completed(fresh_tasks):
        try:
            result = await coro
            print(f"完成: {result}")
        except Exception as e:
            print(f"错误: {e}")
九、性能优化技巧
9.1 避免阻塞操作
# Bad example: making a blocking call inside a coroutine
async def bad_example():
    import time
    time.sleep(5)  # this blocks the entire event loop!
# Good example
async def good_example():
    await asyncio.sleep(5)  # non-blocking: yields control to the event loop
9.2 使用线程池处理CPU密集型任务
import asyncio
import concurrent.futures
import time
def cpu_intensive_task(n):
    """Simulate a CPU-bound task: return the sum of squares below *n*.

    The original accepted *n* but ignored it, always summing over
    range(10**6); using the parameter makes each task's workload and
    result actually depend on its input.

    Args:
        n: exclusive upper bound of the squared range.

    Returns:
        sum(i*i for i in range(n)) as an int.
    """
    return sum(i * i for i in range(n))
async def main():
    """Offload CPU-bound work to a pool so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    # NOTE(review): threads share the GIL, so pure-Python CPU work does not
    # run in parallel here; a ProcessPoolExecutor would give real
    # parallelism. Kept as a thread pool to match the section's topic.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = [
            loop.run_in_executor(pool, cpu_intensive_task, i)
            for i in range(4)
        ]
        results = await asyncio.gather(*futures)
        print(f"结果: {results}")
十、实战项目:异步Web爬虫
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import time
class AsyncWebCrawler:
    """Concurrent web crawler built on aiohttp, asyncio.Queue and workers."""

    def __init__(self, start_url, max_concurrent=10):
        self.start_url = start_url
        self.visited = set()  # URLs already claimed by some worker
        self.to_visit = asyncio.Queue()  # frontier of URLs still to crawl
        self.semaphore = asyncio.Semaphore(max_concurrent)  # caps in-flight HTTP requests
        self.results = []  # collected pages: dicts with 'url' and 'content'

    async def fetch(self, session, url):
        """Download *url*; return its HTML text, or None on error / non-200."""
        async with self.semaphore:
            try:
                # NOTE(review): aiohttp prefers timeout=aiohttp.ClientTimeout(total=10);
                # a bare number is legacy behavior -- confirm the aiohttp version in use.
                async with session.get(url, timeout=10) as response:
                    if response.status == 200:
                        html = await response.text()
                        return html
            except Exception as e:
                print(f"获取 {url} 失败: {e}")
                return None

    async def parse_links(self, html, base_url):
        """Extract absolute http(s) links from *html*, resolved against *base_url*."""
        soup = BeautifulSoup(html, 'html.parser')
        links = set()
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            absolute_url = urljoin(base_url, href)
            if absolute_url.startswith('http'):
                links.add(absolute_url)
        return links

    async def crawl(self, session, url):
        """Crawl one URL: fetch it, record a snippet, enqueue outgoing links."""
        if url in self.visited:
            return
        self.visited.add(url)
        print(f"爬取: {url}")
        html = await self.fetch(session, url)
        if html:
            self.results.append({
                'url': url,
                'content': html[:100]  # store only the first 100 characters
            })
            links = await self.parse_links(html, url)
            for link in links:
                if link not in self.visited:
                    await self.to_visit.put(link)

    async def worker(self, session):
        """Pull URLs from the frontier until it stays empty for 5 seconds."""
        while True:
            try:
                url = await asyncio.wait_for(
                    self.to_visit.get(),
                    timeout=5
                )
                await self.crawl(session, url)
                self.to_visit.task_done()
            except asyncio.TimeoutError:
                break  # frontier looks drained: this worker retires

    async def run(self, max_pages=50):
        """Crawl from start_url until roughly *max_pages* pages are visited.

        Returns the accumulated ``self.results`` list.
        """
        await self.to_visit.put(self.start_url)
        async with aiohttp.ClientSession() as session:
            # Launch several workers to crawl concurrently.
            workers = [
                asyncio.create_task(self.worker(session))
                for _ in range(10)
            ]
            # Poll until the page budget is reached or the frontier empties.
            # NOTE(review): this loop can exit while workers are mid-crawl
            # (the queue may be momentarily empty) -- confirm intended.
            while (len(self.visited) < max_pages and
            not self.to_visit.empty()):
                await asyncio.sleep(0.1)
            # Wait until every queued URL has been marked task_done().
            await self.to_visit.join()
            # Stop the worker tasks.
            for w in workers:
                w.cancel()
        return self.results
# 使用示例
async def main():
    """Run the crawler on example.com and summarize what it collected."""
    crawler = AsyncWebCrawler("https://example.com")
    start_time = time.time()
    pages = await crawler.run(max_pages=30)
    end_time = time.time()
    print(f"爬取了 {len(pages)} 个页面")
    print(f"总耗时: {end_time - start_time:.2f}秒")
    # Show a short preview of the first few pages.
    for page in pages[:3]:
        print(f"URL: {page['url']}")
        print(f"内容预览: {page['content']}...\n")
十一、调试和测试
11.1 调试异步代码
import asyncio
import logging
# 设置日志
# Verbose logging makes asyncio's internal debug output visible.
logging.basicConfig(level=logging.DEBUG)

async def debug_example():
    """Tiny coroutine used to demonstrate asyncio's debug mode."""
    await asyncio.sleep(1)
    print("执行中...")

# debug=True enables slow-callback warnings and unawaited-coroutine
# detection. Guarded so importing this module has no side effects
# (the original called asyncio.run at import time).
if __name__ == "__main__":
    asyncio.run(debug_example(), debug=True)
11.2 异步单元测试
import pytest
import asyncio
async def async_function():
    """Pause briefly, then produce the answer 42."""
    answer = 42
    await asyncio.sleep(0.1)
    return answer
@pytest.mark.asyncio
async def test_async_function():
    """The coroutine should resolve to 42."""
    assert await async_function() == 42
十二、最佳实践总结
合理使用async/await:只在真正需要异步的地方使用
避免阻塞调用:使用 `await` 而非同步阻塞方法
控制并发数量:使用信号量限制并发
及时清理资源:正确使用async上下文管理器
错误处理:为每个异步任务添加异常处理
性能监控:使用 `asyncio` 的调试工具监控性能
代码组织:保持异步函数简洁,避免过长的协程链
学习路径建议
入门阶段(1-2周)
- 理解同步/异步基本概念
- 掌握async/await基本语法
- 编写简单的异步函数
进阶阶段(2-4周)
- 学习asyncio核心组件
- 掌握任务、队列、信号量
- 实现生产者消费者模式
精通阶段(1-2月)
- 深入理解事件循环
- 性能调优和调试
- 复杂系统架构设计
- 源码阅读和贡献
常见陷阱及解决方案
忘记await
# Wrong: the sleep coroutine is created but never awaited, so it never runs
async def func():
    asyncio.sleep(1)  # missing await -- only emits a "never awaited" warning
# Correct
async def func():
    await asyncio.sleep(1)
在同步代码中调用异步函数
# 错误
result = async_func() # 返回的是coroutine对象
# 正确
result = await async_func()
# 或者在事件循环中
result = asyncio.run(async_func())
过多并发导致资源耗尽
# 使用信号量控制并发
semaphore = asyncio.Semaphore(100)
async with semaphore:
await fetch_data()
通过系统学习这些内容,你将能够熟练运用Python异步编程,构建高性能、高并发的应用程序。记住,实践是最好的老师,多写代码、多调试、多优化!