• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

在运行时创建 celery 队列,以便发送到该队列的任务被工作人员拾取?

用户头像
it1352
帮助1

问题说明

我正在使用 django 1.4、celery 3.0、rabbitmq

I'm using django 1.4, celery 3.0, rabbitmq

为了描述这个问题,我在一个系统中有许多内容网络,我想要一个队列来处理与每个网络相关的任务.

To describe the problem, I have many content networks in a system and I want a queue for processing tasks related to each of these network.

但是,当系统运行时,内容是动态创建的,因此我需要动态创建队列并让现有工作人员开始处理它们.

However content is created on the fly when the system is live and therefore I need to create queues on the fly and have existing workers start picking up on them.

我已经尝试通过以下方式调度任务(其中内容是 django 模型实例):

I've tried scheduling tasks in the following way (where content is a django model instance):

queue_name = 'content.{}'.format(content.pk)
# E.g. queue_name = content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
add_content.apply_async(args=[content], queue=queue_name)

这将创建一个名为 content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 的队列,并创建一个名为 content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 的新交换code> 和路由键 content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba 并将任务发送到该队列.

This create a queue with name content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba, creates a new exchange with name content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba and routing key content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba and sends a task to that queue.

但是我从来没有看到工人接手这些任务.我当前设置的工人没有监听任何特定的队列(未使用队列名称初始化)并接手发送到默认队列就好了.我的 Celery 设置是:

However I never see the workers picking up on these tasks. Workers that I have currently set up are not listening to any specific queues (not initialized with queue names) and pick up tasks sent to the default queue just fine. My Celery settings are:

BROKER_URL = "amqp://test:password@localhost:5672/vhost"
CELERY_TIMEZONE = 'UTC'
CELERY_ALWAYS_EAGER = False

from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_QUEUES = (
    Queue(CELERY_DEFAULT_QUEUE, Exchange(CELERY_DEFAULT_EXCHANGE),
        routing_key=CELERY_DEFAULT_ROUTING_KEY),
)

CELERY_CREATE_MISSING_QUEUES = True
CELERYD_PREFETCH_MULTIPLIER = 1

知道如何让工作人员接手发送到这个新创建队列的任务吗?

Any idea how I can get the workers to pick up on tasks sent to this newly created queue?

正确答案

#1

你需要告诉workers开始消费新的队列.相关文档在这里.

You need to tell the workers to start consuming the new queues. Relevant docs are here.

从命令行:

$ celery control add_consumer content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba

或者从python内部:

Or from within python:

>>> app.control.add_consumer('content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba', reply=True)

两种形式都接受目标参数,因此如果需要,您可以只告诉个别工作人员有关新队列的信息.

Both forms accept a destination argument, so you can tell individual workers only about the new queues if required.

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /reply/detail/tanhcakbig
系列文章
更多 icon
同类精品
更多 icon
继续加载