三亚市文章资讯

Python开发从入门到精通:异步编程与协程

2026-03-26 12:30:02 浏览次数:2
详细信息

Python 异步编程与协程:从入门到精通

一、异步编程基础概念

1.1 同步 vs 异步

1.2 为什么需要异步编程?

二、协程(Coroutine)基础

2.1 什么是协程?

协程是可以在执行过程中暂停,并在适当时候恢复执行的函数。

# Python 3.5+ 使用 async/await 语法
import asyncio

async def say_hello():
    """Print "Hello", pause one second without blocking, then print "World"."""
    words = ("Hello", "World")
    print(words[0])
    await asyncio.sleep(1)  # simulated I/O wait; other tasks may run here
    print(words[1])

# Drive the coroutine to completion on a fresh event loop.
asyncio.run(say_hello())
2.2 协程 vs 线程 vs 进程
特性 协程 线程 进程
创建开销 极小 中等 大
切换开销 极小 中等 大
内存占用 小 中等 大
并发能力 高 中等 低
数据共享 容易 需要注意锁 复杂

三、async/await 详解

3.1 定义异步函数
import asyncio

async def fetch_data(url):
    """Pretend to download *url* and return a placeholder result string.

    Announces start/finish on stdout; the 2-second sleep stands in for
    real network latency.
    """
    print(f"开始获取 {url}")
    await asyncio.sleep(2)  # simulated network delay
    print(f"完成获取 {url}")
    result = f"{url}的数据"
    return result

async def main():
    """Contrast sequential awaits with task-based concurrent execution."""
    # Sequential: each request finishes before the next one starts.
    result1 = await fetch_data("https://api.example.com/data1")
    result2 = await fetch_data("https://api.example.com/data2")

    # Concurrent: schedule both tasks first, then wait for both at once.
    pending = [
        asyncio.create_task(fetch_data(u))
        for u in ("https://api.example.com/data3",
                  "https://api.example.com/data4")
    ]
    print(await asyncio.gather(*pending))

四、异步上下文管理器

import aiofiles  # 需要安装:pip install aiofiles
import asyncio

class AsyncDatabaseConnection:
    """Toy async context manager mimicking a database connection lifecycle."""

    async def __aenter__(self):
        # Simulated connection handshake.
        print("连接数据库...")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Simulated teardown; returns None, so exceptions are not suppressed.
        print("关闭数据库连接...")
        await asyncio.sleep(0.5)

    async def query(self, sql):
        """Pretend to execute *sql* and return a canned result string."""
        await asyncio.sleep(0.5)
        return f"查询结果: {sql}"

async def main():
    """Open the simulated database, run one query, and print its result."""
    async with AsyncDatabaseConnection() as db:
        print(await db.query("SELECT * FROM users"))

五、实战:异步Web请求

import aiohttp  # 需要安装:pip install aiohttp
import asyncio
import time

async def fetch_url(session, url):
    """Fetch *url* with *session* and return the response body as text."""
    async with session.get(url) as response:
        body = await response.text()
    return body

async def main():
    """Fetch three slow endpoints concurrently and report total wall time."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3"
    ]

    start_time = time.time()

    async with aiohttp.ClientSession() as session:
        # One coroutine per URL; gather drives them all concurrently.
        results = await asyncio.gather(
            *(fetch_url(session, url) for url in urls)
        )

    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")
    print(f"获取到 {len(results)} 个结果")

六、异步队列和生产者消费者模式

import asyncio
import random

async def producer(queue, name):
    """Put five labelled items on *queue*, pausing randomly between puts."""
    for item in (f"产品-{name}-{i}" for i in range(5)):
        await queue.put(item)
        print(f"生产者 {name} 生产了: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))  # uneven production rate

async def consumer(queue, name):
    """Drain *queue* until no item arrives for 2 seconds, then stop."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=2)
        except asyncio.TimeoutError:
            break  # queue idle long enough — assume production has ended
        print(f"消费者 {name} 消费了: {item}")
        queue.task_done()
        await asyncio.sleep(random.uniform(0.1, 0.3))  # simulated processing

async def main():
    """Run three producers against two consumers sharing one bounded queue."""
    queue = asyncio.Queue(maxsize=10)

    # Three concurrent item sources.
    producers = [
        asyncio.create_task(producer(queue, f"P{i}"))
        for i in range(3)
    ]

    # Two workers draining the same queue.
    consumers = [
        asyncio.create_task(consumer(queue, f"C{i}"))
        for i in range(2)
    ]

    await asyncio.gather(*producers)  # wait until everything is produced
    await queue.join()                # then until every item is processed

    # The consumers are idle now; shut them down.
    for c in consumers:
        c.cancel()

七、高级模式:信号量和限流

import asyncio

class RateLimiter:
    """Async context manager that admits at most *rate_limit* entries at once.

    A slot is held for one extra second after the protected section ends,
    so the limiter throttles throughput as well as concurrency.
    """

    def __init__(self, rate_limit):
        self.semaphore = asyncio.Semaphore(rate_limit)

    async def __aenter__(self):
        await self.semaphore.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Keep the slot occupied briefly to cap requests-per-second.
        await asyncio.sleep(1)
        self.semaphore.release()

async def limited_request(url, limiter):
    """Issue one simulated request for *url* under *limiter*'s concurrency cap."""
    async with limiter:
        print(f"请求: {url}")
        await asyncio.sleep(0.5)  # pretend network round-trip
        return f"响应: {url}"

async def main():
    """Fire ten requests through a limiter that admits three at a time."""
    limiter = RateLimiter(rate_limit=3)  # at most 3 concurrent requests
    urls = [f"https://example.com/{i}" for i in range(10)]

    # All ten share the same limiter instance, so they queue on its semaphore.
    results = await asyncio.gather(
        *(limited_request(url, limiter) for url in urls)
    )

八、异步错误处理

import asyncio

async def risky_operation():
    """Sleep one second, then fail with ValueError roughly 30% of the time.

    Relies on the module-level `random` import earlier in this file.
    """
    await asyncio.sleep(1)
    roll = random.random()
    if roll < 0.3:
        raise ValueError("随机错误发生!")
    return "操作成功"

async def main():
    """Demonstrate two strategies for collecting results from tasks that may raise."""
    tasks = [risky_operation() for _ in range(5)]

    # Method 1: return_exceptions=True delivers exceptions as result values
    # instead of cancelling the whole batch on the first failure.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i} 成功: {result}")

    # Method 2: as_completed yields awaitables in completion order.
    # Fix: the coroutines in `tasks` were already consumed by gather() above;
    # re-awaiting them raises RuntimeError, so build a fresh batch here.
    fresh_tasks = [risky_operation() for _ in range(5)]
    for coro in asyncio.as_completed(fresh_tasks):
        try:
            result = await coro
            print(f"完成: {result}")
        except Exception as e:
            print(f"错误: {e}")

九、性能优化技巧

9.1 避免阻塞操作
# Anti-example: a blocking call inside a coroutine (kept broken on purpose).
async def bad_example():
    import time
    time.sleep(5)  # this blocks the entire event loop!

# Correct version: yield to the event loop instead of blocking it.
async def good_example():
    await asyncio.sleep(5)  # non-blocking
9.2 使用线程池处理CPU密集型任务
import asyncio
import concurrent.futures
import time

def cpu_intensive_task(n):
    """Simulate a CPU-bound job by summing the first million squares.

    NOTE(review): the parameter *n* is currently unused — it looks intended
    to vary the workload (callers pass 0..3); confirm before relying on
    distinct results per call.
    """
    total = 0
    for i in range(10 ** 6):
        total += i * i
    return total

async def main():
    """Offload four CPU-heavy calls to a thread pool and gather the results.

    NOTE(review): for pure-Python CPU work the GIL prevents real parallelism
    across threads; a ProcessPoolExecutor would scale better — kept as a
    thread pool to match the article's example.
    """
    loop = asyncio.get_running_loop()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        futures = [
            loop.run_in_executor(pool, cpu_intensive_task, i)
            for i in range(4)
        ]
        results = await asyncio.gather(*futures)
        print(f"结果: {results}")

十、实战项目:异步Web爬虫

import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import time

class AsyncWebCrawler:
    """Queue-driven web crawler: worker coroutines pull URLs from a shared
    asyncio.Queue, fetch them through one aiohttp session, and push newly
    discovered links back onto the queue.

    Concurrent HTTP requests are capped by a semaphore (max_concurrent).
    """

    def __init__(self, start_url, max_concurrent=10):
        # start_url: first page to enqueue; max_concurrent: in-flight fetch cap.
        self.start_url = start_url
        self.visited = set()  # URLs already claimed by some worker
        self.to_visit = asyncio.Queue()  # frontier of URLs still to crawl
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []  # collected {'url', 'content'} dicts

    async def fetch(self, session, url):
        """Download *url*; return the HTML text, or None on error/non-200.

        A non-200 status falls through the `async with` and returns None
        implicitly; network/timeout errors are printed and also yield None.
        """
        async with self.semaphore:
            try:
                async with session.get(url, timeout=10) as response:
                    if response.status == 200:
                        html = await response.text()
                        return html
            except Exception as e:
                print(f"获取 {url} 失败: {e}")
                return None

    async def parse_links(self, html, base_url):
        """Extract absolute http(s) links from *html*, resolved against *base_url*."""
        soup = BeautifulSoup(html, 'html.parser')
        links = set()

        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            # Resolve relative hrefs; keep only absolute http(s) URLs.
            absolute_url = urljoin(base_url, href)
            if absolute_url.startswith('http'):
                links.add(absolute_url)

        return links

    async def crawl(self, session, url):
        """Process one URL: mark visited, fetch, record a snippet, enqueue links."""
        if url in self.visited:
            return

        self.visited.add(url)
        print(f"爬取: {url}")

        html = await self.fetch(session, url)
        if html:
            self.results.append({
                'url': url,
                'content': html[:100]  # store only the first 100 characters
            })

            links = await self.parse_links(html, url)
            for link in links:
                if link not in self.visited:
                    await self.to_visit.put(link)

    async def worker(self, session):
        """Worker loop: crawl queued URLs until the queue stays empty for 5s.

        NOTE(review): if crawl() raises (e.g. from parse_links), task_done()
        is skipped and to_visit.join() could hang — confirm error handling.
        """
        while True:
            try:
                url = await asyncio.wait_for(
                    self.to_visit.get(), 
                    timeout=5
                )
                await self.crawl(session, url)
                self.to_visit.task_done()
            except asyncio.TimeoutError:
                break

    async def run(self, max_pages=50):
        """Crawl starting from start_url with 10 workers; return collected results.

        NOTE(review): the max_pages check only exits the monitoring loop —
        workers keep crawling until join()/timeout, so more than max_pages
        pages may actually be visited; confirm whether a hard cap is intended.
        """
        await self.to_visit.put(self.start_url)

        async with aiohttp.ClientSession() as session:
            # Spawn a fixed pool of workers crawling concurrently.
            workers = [
                asyncio.create_task(self.worker(session))
                for _ in range(10)
            ]

            # Poll until the page budget is reached or the frontier drains.
            while (len(self.visited) < max_pages and 
                   not self.to_visit.empty()):
                await asyncio.sleep(0.1)

            # Wait for every queued item to be marked done.
            await self.to_visit.join()

            # Stop the (now idle) worker tasks.
            for w in workers:
                w.cancel()

        return self.results

# 使用示例
async def main():
    """Crawl up to 30 pages from example.com, then print timing and samples."""
    crawler = AsyncWebCrawler("https://example.com")
    start_time = time.time()
    results = await crawler.run(max_pages=30)
    elapsed = time.time() - start_time

    print(f"爬取了 {len(results)} 个页面")
    print(f"总耗时: {elapsed:.2f}秒")
    # Show a short preview of the first three pages collected.
    for result in results[:3]:
        print(f"URL: {result['url']}")
        print(f"内容预览: {result['content']}...\n")

十一、调试和测试

11.1 调试异步代码
import asyncio
import logging

# Verbose logging makes asyncio's internal activity visible while debugging.
logging.basicConfig(level=logging.DEBUG)

async def debug_example():
    """Minimal coroutine used to exercise asyncio's debug mode."""
    await asyncio.sleep(1)
    print("执行中...")

# debug=True enables slow-callback warnings and stricter checks.
asyncio.run(debug_example(), debug=True)
11.2 异步单元测试
import pytest
import asyncio

async def async_function():
    """Wait briefly, then produce the canonical answer."""
    await asyncio.sleep(0.1)
    return 42

@pytest.mark.asyncio
async def test_async_function():
    """pytest-asyncio drives this coroutine test on its own event loop."""
    assert await async_function() == 42

十二、最佳实践总结

1. 合理使用async/await:只在真正需要异步的地方使用
2. 避免阻塞调用:使用await而非同步阻塞方法
3. 控制并发数量:使用信号量限制并发
4. 及时清理资源:正确使用async上下文管理器
5. 错误处理:为每个异步任务添加异常处理
6. 性能监控:使用asyncio的调试工具监控性能
7. 代码组织:保持异步函数简洁,避免过长的协程链

学习路径建议

入门阶段(1-2周)

进阶阶段(2-4周)

精通阶段(1-2月)

常见陷阱及解决方案

忘记await

# Wrong: the coroutine object is created but never awaited, so nothing runs.
async def func():
    asyncio.sleep(1)  # missing await — deliberate example of the bug

# Correct: awaiting actually suspends until the sleep completes.
async def func():
    await asyncio.sleep(1)

在同步代码中调用异步函数

# 错误
result = async_func()  # 返回的是coroutine对象

# 正确
result = await async_func()
# 或者在事件循环中
result = asyncio.run(async_func())

过多并发导致资源耗尽

# 使用信号量控制并发
semaphore = asyncio.Semaphore(100)
async with semaphore:
    await fetch_data()

通过系统学习这些内容,你将能够熟练运用Python异步编程,构建高性能、高并发的应用程序。记住,实践是最好的老师,多写代码、多调试、多优化!

相关推荐