Loading... > 问题: > > aioamqp官方异步MQ client的消费者实现中的消息回调callback协程是个同步执行的, 如果你的callback执行耗时很长, 那么callback的执行只会一个一个顺序执行; > > 解决方案: > > 将callback的执行体代码修改为异步提交非阻塞执行, 提交一个真正处理业务逻辑的耗时的异步协程到event_loop 中去执行**, 那么整个消费流程就彻底变为异步了; > > ** ```python # author: Michael # email: yangowen@126.com import asyncio import aioamqp import random # 如下是aioamqp官方异步MQ client的消费者实现: # async def callback(channel, body, envelope, properties): # time = random.randint(1, 3) # await asyncio.sleep(time) # print(" [x] %r, cost %s" % (body, time)) # # # async def receive_log(): # try: # transport, protocol = await aioamqp.connect('172.16.50.128', 5672, login='admin', password='admin') # except aioamqp.AmqpClosedConnection: # print("closed connections") # return # # # channel = await protocol.channel() # exchange_name = 'exchange' # # await channel.exchange(exchange_name=exchange_name, type_name='fanout') # # # let RabbitMQ generate a random queue name # result = await channel.queue(queue_name='', exclusive=True) # # queue_name = result['queue'] # await channel.queue_bind(exchange_name=exchange_name, queue_name=queue_name, routing_key='') # # print(' [*] Waiting for logs. To exit press CTRL+C') # # await channel.basic_consume(callback, queue_name=queue_name, no_ack=True) # print('hehe') # # event_loop = asyncio.get_event_loop() # event_loop.run_until_complete(receive_log()) # event_loop.run_forever() # ----------------------------修改callback 为异步执行------------------ # 将真正的回调时需要执行的任务封装成协程后放到事件循环中去执行 async def do(envelope): time = random.randint(1, 5) await asyncio.sleep(time) print(f"cost:{time}, contag:{envelope.consumer_tag}, deltag:{envelope.delivery_tag}") # mq的消息回调callback函数是个同步的协程, 即执行完该协程才会继续去执行下一个回调; # 将其转化为异步的思想是: 代码内容改为异步非阻塞, 提交一个真正处理数据的的协程到事件循环去 async def callback(channel, body, envelope, properties): loop = asyncio.get_event_loop() loop.create_task(do(envelope)) async def receive_log(): try: transport, protocol = await aioamqp.connect('172.16.50.128', 5672, login='admin', password='admin') except aioamqp.AmqpClosedConnection: print("closed connections") return channel = await protocol.channel() exchange_name = 'exchange' await channel.exchange(exchange_name=exchange_name, type_name='fanout') # let RabbitMQ generate a random queue name result = await channel.queue(queue_name='', exclusive=True) queue_name = result['queue'] await channel.queue_bind(exchange_name=exchange_name, queue_name=queue_name, routing_key='') print(' [*] Waiting for logs. To exit press CTRL+C') await channel.basic_consume(callback, queue_name=queue_name, no_ack=True) print('hehe') event_loop = asyncio.get_event_loop() event_loop.run_until_complete(receive_log()) try: # Blocking call interrupted by loop.stop() print('step: loop.run_forever()') loop.run_forever() except KeyboardInterrupt: pass finally: print('step: loop.close()') loop.close() ``` © 允许规范转载