本文探讨了在`asyncio`中处理嵌套异步生成器时,如何通过传统`await`模式导致的串行执行问题。针对`await`的阻塞特性,文章提出并详细阐述了利用`asyncio.queue`和`asyncio.event`构建生产者-消费者模式的解决方案,从而实现任务间的解耦和真正的并发执行,显著提升异步应用的效率和响应性。
在asyncio编程中,await关键字是调度协程的核心机制。当一个协程遇到await表达式时,它会暂停自身的执行,将控制权交还给事件循环,并等待被await的协程完成。一旦被await的协程完成并返回结果,原协程才会从暂停点继续执行。这种机制虽然实现了协作式多任务,但如果设计不当,也可能导致非预期的串行执行。
考虑以下场景:一个异步任务(main)需要从一个异步生成器(sentences_generator)获取数据,然后将数据传递给另一个异步任务(process_sentence)进行处理。如果main函数在每次获取到数据后,都直接await process_sentence的完成,那么在process_sentence执行期间,sentences_generator将无法继续生成新的数据。这违背了我们期望的并发处理,即当process_sentence在处理当前数据时,sentences_generator应该能够同时准备下一批数据。
以下是原始代码示例及其输出,展示了这种串行阻塞行为:
import asyncio
async def stream():
char_string = "Hi. Hello. Hello."
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator():
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
yield sentence
sentence = ""
async def process_sentence(sentence: str):
print("waiting for processing sentence: ", sentence)
await asyncio.sleep(len(sentence)*0.1) # 模拟耗时处理
print("sentence processed!")
async def main():
i=0
async for sentence in sentences_generator():
print("processing sentence: ", i)
await process_sentence(sentence) # 这里的await导致阻塞
i += 1
# asyncio.run(main())原始输出示例:
got char: H got char: i got char: . got sentence: Hi. processing sentence: 0 waiting for processing sentence: Hi. sentence processed! got char: got char: H got char: e got char: y got char: . got sentence: Hey. processing sentence: 1 waiting for processing sentence: Hey. sentence processed! ...
从输出可以看出,只有当process_sentence完全处理完一个句子后,stream和sentences_generator才能继续生成下一个字符和句子。这并不是我们期望的并发效果。
为了实现真正的并发,我们需要解耦数据的生产和消费过程,使它们能够独立运行。asyncio.Queue是实现这种生产者-消费者模式的理想工具。
核心思想:
此外,为了实现优雅的关闭和通知消费者数据已全部生产完毕,我们可以引入asyncio.Event。生产者在完成所有数据生产后设置Event,消费者则可以结合队列是否为空和Event状态来判断何时停止。
我们将修改sentences_generator作为生产者,将生成的句子放入队列;process_sentence作为消费者,从队列中取出句子进行处理。main函数将负责启动这两个独立的协程。
import asyncio
# 定义全局变量用于计数,方便观察
i = 1
async def stream():
char_string = "Hi. Hello. Thank you." # 增加一些内容以更好地展示并发
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
"""
生产者协程:从字符流生成句子,并放入队列。
当所有句子生成完毕后,设置flag通知消费者。
"""
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
await q.put(sentence) # 将生成的句子放入队列
sentence = ""
# 确保最后一个不以标点符号结尾的句子也被处理(如果需要)
if sentence:
print("got sentence: ", sentence)
await q.put(sentence)
flag.set() # 生产完毕,设置事件标志
async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
"""
消费者协程:从队列中获取句子并进行处理。
当队列为空且生产者已设置flag时,停止消费。
"""
global i
while True:
# 检查是否应该停止:队列为空且生产者已完成
if q.empty() and flag.is_set():
break
# 尝试从队列获取项目,如果队列为空则等待
item = await q.get()
print("processing sentence: ", i)
print("waiting for processing sentence: ", item)
await asyncio.sleep(len(item) * 0.1) # 模拟耗时处理
print("sentence processed!")
q.task_done() # 通知队列此任务已完成
i += 1
async def main():
global i
i = 1 # 重置计数器
event = asyncio.Event() # 用于生产者通知消费者结束
queue = asyncio.Queue[str]() # 生产者和消费者之间的通信队列
# 启动生产者和消费者作为独立的协程任务
producer_task = asyncio.create_task(sentences_generator(queue, event))
consumer_task = asyncio.create_task(process_sentence(queue, event))
# 等待所有任务完成
await asyncio.gather(producer_task, consumer_task)
# 可选:等待队列中所有任务被标记为完成,确保所有数据都被处理
await queue.join()
asyncio.run(main())预期输出示例:
got char: H got char: i got char: . got sentence: Hi. got char: got char: H processing sentence: 1 waiting for processing sentence: Hi. got char: e got char: l got char: l got char: o got char: . got sentence: Hello. sentence processed! got char: got char: T processing sentence: 2 waiting for processing sentence: Hello. got char: h got char: a got char: n got char: k got char: got char: y got char: o got char: u got char: . got sentence: Thank you. sentence processed! processing sentence: 3 waiting for processing sentence: Thank you. sentence processed!
从这个输出可以看出,当process_sentence正在处理第一个句子时,stream和sentences_generator已经继续生成了后续的字符和句子,并将其放入队列。这正是我们期望的并发行为。
逻辑至关重要。一个常见的模式是while True循环,内部判断q.empty() and flag.is_set()来决定是否退出。这确保了在生产者完成且队列中所有待处理项都已消费后,消费者才能安全退出。通过将异步任务分解为独立的生产者和消费者,并利用asyncio.Queue进行通信,我们成功地将原本串行执行的逻辑转换为了并发执行。这种模式不仅提高了资源利用率,也使得代码结构更加清晰,易于维护和扩展。在设计复杂的asyncio应用时,当存在数据流动的依赖但又希望实现任务并行时,生产者-消费者模式与asyncio.Queue是解决这类问题的强大工具。
# go
# 工具
# ai
# stream
# 异步任务
# while
# try
# 循环
# Event
# 线程
# 并发
# 事件
# 异步
# 多个
# 为空
# 可以看出
# 都已
# 子时
# 这确
# 第一个
# 才会
# 它是
# 我们可以
相关文章:
大学网站设计制作软件有哪些,如何将网站制作成自己app?
制作销售网站教学视频,销售网站有哪些?
javascript中的try catch异常捕获机制用法分析
C#如何使用XPathNavigator高效查询XML
如何在阿里云部署织梦网站?
建站之星Pro快速搭建教程:模板选择与功能配置指南
高性价比服务器租赁——企业级配置与24小时运维服务
如何彻底删除建站之星生成的Banner?
如何在局域网内绑定自建网站域名?
建站之星伪静态规则如何设置?
宿州网站制作公司兴策,安徽省低保查询网站?
如何高效配置香港服务器实现快速建站?
如何通过西部建站助手安装IIS服务器?
小捣蛋自助建站系统:数据分析与安全设置双核驱动网站优化
金*站制作公司有哪些,金华教育集团官网?
图册素材网站设计制作软件,图册的导出方式有几种?
怀化网站制作公司,怀化新生儿上户网上办理流程?
全景视频制作网站有哪些,全景图怎么做成网页?
公司网站制作价格怎么算,公司办个官网需要多少钱?
c++ stringstream用法详解_c++字符串与数字转换利器
如何通过多用户协作模板快速搭建高效企业网站?
利用JavaScript实现拖拽改变元素大小
台州网站建设制作公司,浙江手机无犯罪记录证明怎么开?
图片制作网站免费软件,有没有免费的网站或软件可以将图片批量转为A4大小的pdf?
建站VPS配置与SEO优化指南:关键词排名提升策略
微网站制作教程,不会写代码,不会编程,怎么样建自己的网站?
广平建站公司哪家专业可靠?如何选择?
制作门户网站的参考文献在哪,小说网站怎么建立?
如何获取免费开源的自助建站系统源码?
公司网站的制作公司,企业网站制作基本流程有哪些?
如何制作一个表白网站视频,关于勇敢表白的小标题?
建站之星后台搭建步骤解析:模板选择与产品管理实操指南
如何通过智能用户系统一键生成高效建站方案?
如何在万网ECS上快速搭建专属网站?
h5在线制作网站电脑版下载,h5网页制作软件?
制作网站的公司有哪些,做一个公司网站要多少钱?
如何选择靠谱的建站公司加盟品牌?
建站之星安装模板失败:服务器环境不兼容?
网页制作模板网站推荐,网页设计海报之类的素材哪里好?
网站制作软件有哪些,制图软件有哪些?
如何选择高效便捷的WAP商城建站系统?
网站制作企业,网站的banner和导航栏是指什么?
如何实现建站之星域名转发设置?
兔展官网 在线制作,怎样制作微信请帖?
jQuery 常见小例汇总
视频网站制作教程,怎么样制作优酷网的小视频?
成都响应式网站开发,dw怎么把手机适应页面变成网页?
建站10G流量真的够用吗?如何应对访问高峰?
电脑免费海报制作网站推荐,招聘海报哪个网站多?
魔毅自助建站系统:模板定制与SEO优化一键生成指南
*请认真填写需求信息,我们会在24小时内与您取得联系。