python-如何获取运行任务的队列-celery

我是新来的芹菜,有一个问题.我有一个简单的任务:

@app.task(name='test_install_queue')
def test_install_queue():
    return subprocess.call("exit 0",shell=True)

我稍后在一个测试用例中调用此任务

result = tasks.test_default_queue.apply_async(queue="install")

该任务在队列安装中成功运行(因为我在celery日志中看到了它,并且可以正常完成.但是我想以编程方式从存储在结果中的对象中查找任务test_install_queue在哪个队列中运行的方式.

谢谢!

编辑:

我将任务更改为:

@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
    return self.request.__dict__

然后我按如下方式使用apply_async的结果:

result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]

解决方法是该工作程序(主机名)与该工作程序中初始化的唯一队列的名称相同.

解决方法:

您可以尝试以下方法:

delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
    if queue.exchange.name == delivery_info['exchange'] 
        and queue.routing_key == delivery_info['routing_key']:
            original_queue = queue.name
            break

该方法基于以下假设:您使用默认的芹菜设置,并且您的交换是直接的.如果您需要用于扇出和主题交换的通用解决方案,则必须检查app.amqp.queues中每个已声明队列的路由键.

上一篇:分布式异步任务队列神器-Celery


下一篇:我如何在芹菜中设置回调