
Celery

It’s a task queue focused on real-time processing, with support for task scheduling as well.
Celery distributes its jobs among workers: programs that use the AMQP protocol to connect to a broker.

To use it you need to…

  • Install a broker.
  • Create tasks.
  • Start a worker that executes the tasks.

You can keep track of tasks, their transitions through different states and inspect the return values.

First steps

Use

Broker

First you need to install a broker. RabbitMQ, Redis, or even an SQL database can be used. RabbitMQ is the default broker; you only need to configure it like this…

BROKER_URL = 'amqp://guest:guest@localhost:5672//'

You can configure the broker further (a sketch setting these options follows the list)…

  • Choose the content types it will accept. The CELERY_ACCEPT_CONTENT setting controls this; by default JSON, YAML and pickle are enabled, so disable the ones you don't want.
  • BROKER_CONNECTION_TIMEOUT
  • BROKER_POOL_LIMIT
  • BROKER_USE_SSL
  • BROKER_LOGIN_METHOD
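
A hedged sketch of these settings as they would appear in a configuration module (the values are illustrative assumptions, not recommendations):

BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'
CELERY_ACCEPT_CONTENT = ['json']    # refuse YAML and pickle payloads
BROKER_CONNECTION_TIMEOUT = 4.0     # seconds to wait when establishing a connection
BROKER_POOL_LIMIT = 10              # maximum number of open broker connections
BROKER_USE_SSL = False
BROKER_LOGIN_METHOD = 'AMQPLAIN'    # the default login method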

RabbitMQ must be accessible by a user:

$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

Basic usage

A basic application with Celery could be:

from celery import Celery
 
app = Celery('tasks', broker='amqp://guest@localhost//')
 
@app.task
def add(x, y):
    return x + y

Arguments to create a Celery object are the name of the module (which is used as a prefix for task names) and the broker URL.
To run it you should execute the celery program with the worker argument:

$ celery -A tasks worker --loglevel=info

In production you can run it as a daemon. You can learn about more options using the help parameter:

$ celery worker --help
$ celery help

Stopping the worker

You should never stop a worker with the KILL signal (-9) unless you’ve tried TERM (Ctrl + C) a few times and waited a few minutes to give it a chance to shut down. If you do, tasks may be terminated mid-execution, and they will not be re-run unless you have the acks_late option set.
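
A minimal sketch of a task that opts in to late acknowledgement (long_running is a made-up task name; app is the application from the Basic usage example):

@app.task(acks_late=True)
def long_running(numbers):
    # the message is acknowledged only after this function returns, so if the
    # worker is killed mid-execution the task is redelivered and run again
    return sum(numbers)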

Calling the task and receiving the result

To call a task you can use the delay() method, which is a shortcut to apply_async():

>>> from tasks import add
>>> add.delay(4, 4)

To see the difference between delay() and apply_async():

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

However apply_async() allows more options, like…

  • T.apply_async(countdown=10), executes 10 seconds from now.
  • T.apply_async(countdown=60, expires=120), executes one minute from now, but expires after 2 minutes.
  • add.apply_async((2, 2)), calls the add task with the arguments (2, 2).
  • add.apply_async((2, 2), queue='lopri', countdown=10), to send it to a specific queue.
  • If you use neither apply_async nor delay, like add(2, 2), the function runs locally and no message is sent.

They return an AsyncResult instance, which can be used to check the state of the task, wait for it to finish, get its return value, or, if the task failed, get the exception and traceback. To do that you must store the states in a result backend where they can be retrieved later (SQLAlchemy/Django ORM, Memcached, RabbitMQ (amqp), MongoDB or Redis). To define a RabbitMQ or Redis backend:

app = Celery('tasks', backend='amqp', broker='amqp://')
app = Celery('tasks', backend='redis://localhost', broker='amqp://')

Useful AsyncResult methods (a short usage sketch follows the list):

  • ready(), returns whether the task has finished processing or not.
  • get(timeout=1), turns the asynchronous call into a synchronous one. If you don't want get() to re-raise the task's exception, pass the propagate=False argument.
  • traceback, returns the traceback of the exception.
  • successful() and failed(), return True if the task finished successfully or raised an error, respectively.
  • .id, returns the task id.
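
A minimal usage sketch, assuming the add task and a result backend configured as above:

result = add.delay(4, 4)

result.ready()          # True once the task has finished
result.get(timeout=1)   # 8; blocks until the result is available
result.successful()     # True if the task finished without raising
result.id               # the task id (a uuid string)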

An AsyncResult object also contains the task state. The states can be: PENDING → STARTED → RETRY → STARTED → RETRY → STARTED → SUCCESS. However, the STARTED state is only recorded if CELERY_TRACK_STARTED is enabled or if the track_started argument of the task decorator is True. The PENDING state is not recorded; it is just the default for any unknown id:

>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'

Configuration

You can configure the app using the conf.update() method.

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)

However, you can also define the configuration inside a module. To do that you only need to put the configuration in a module (let's call it celeryconfig.py) and point to it with the config_from_object() method:

app.config_from_object('celeryconfig')

celeryconfig.py could contain the following:

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Europe/Oslo'
CELERY_ENABLE_UTC = True

Storing the configuration in a module allows you to do things like:

CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

More about Celery

Using it in your application

The structure in a project could be…

proj/__init__.py
    /celery.py
    /tasks.py

Where proj/celery.py contains:

from __future__ import absolute_import
 
from celery import Celery
 
app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])
 
# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)
 
if __name__ == '__main__':
    app.start()
  • The include argument to the Celery class lists the task modules that the worker will import.

And proj/tasks.py contains:

from __future__ import absolute_import
 
from proj.celery import app
 
 
@app.task
def add(x, y):
    return x + y
 
 
@app.task
def mul(x, y):
    return x * y
 
 
@app.task
def xsum(numbers):
    return sum(numbers)

To run the worker we execute:

$ celery -A proj worker -l info

Daemonizing workers

The daemonization scripts use the celery multi command, which allows you to start and manage several workers running on the same system.

Task decorator

It accepts arguments like…

  • @task(track_started=True), which enables recording of the STARTED state.
  • @app.task(ignore_result=True), which means calls to this task will not store a result.

Subtasks

A subtask is a serializable signature of a task invocation, so it can be passed to functions and sent at a later time. You can create a subtask for the add task with the arguments (2, 2) and a countdown of 10 seconds like this:

>>> add.subtask((2, 2), countdown=10)
tasks.add(2, 2)

Or like this:

>>> add.s(2, 2)
tasks.add(2, 2)

Subtasks have the delay and apply_async methods.
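
A minimal sketch, assuming the add task from above; a stored subtask can be sent later, and its options can still be overridden at call time:

>>> sig = add.subtask((2, 2), countdown=10)
>>> sig.delay()                      # same as sig.apply_async()
>>> sig.apply_async(queue='lopri')   # overrides options at call time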

Calling tasks

Using subtasks you can define incomplete arguments for a task:

# incomplete partial: add(?, 2)
>>> s2 = add.s(2)

The s2 object then needs another argument; it can be resolved when calling the subtask:

# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10

You can also add keywords to the subtask:

>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.

Cancel the execution

You can also cancel the execution of a task:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

or if you only have the task id:

>>> from proj.celery import app
>>> app.control.revoke(task_id)

Canvas primitives

Once you understand how subtasks manage partial arguments, you can use the canvas primitives to call several tasks at once:

Groups

A group calls a list of tasks in parallel, and it returns a special result instance that lets you inspect the results as a group, and retrieve the return values in order.

>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Partial group

>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

Tasks can be linked together so that after one task returns the other is called:

>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

A partial chain:

# (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

Chains can also be written like this:

>>> (add.s(4, 4) | mul.s(8))().get()
64

Chords

A chord is a group with a callback:

>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90

A group chained to another task will be automatically converted to a chord:

>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90

Since these primitives are all of the subtask type they can be combined almost however you want, e.g:

>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

Celery command

app argument

Indicates the application that will be executed. With --app=proj it searches for:

  • an attribute named proj.app, or
  • an attribute named proj.celery, or
  • any attribute in the module proj where the value is a Celery application, or

If none of these are found it’ll try a submodule named proj.celery:

  • an attribute named proj.celery.app, or
  • an attribute named proj.celery.celery, or
  • any attribute in the module proj.celery where the value is a Celery application.

Routing

You can define a concrete route for a task:

app.conf.update(
    CELERY_ROUTES = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)
# or....
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

You can also make a worker consume from this queue by specifying the -Q option:

$ celery -A proj worker -Q hipri

You may specify multiple queues as a comma-separated list; for example, you can make the worker consume from both this queue and the default queue (named celery):

$ celery -A proj worker -Q hipri,celery

Optimization

If you’re using RabbitMQ then you should install the librabbitmq module, which is an AMQP client implemented in C:

$ pip install librabbitmq

Signals

Signals are events that trigger actions when something occurs in Celery. To connect an action to a signal you do the following (for example, for the after_task_publish signal):

from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))

Some signals identify the sender so you can filter by them. For example the after_task_publish signal uses the task name as a sender, so by providing the sender argument to connect you can connect your handler to be called every time a task with name “proj.tasks.add” is published:

@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))

There are several useful signals already defined.

Logging

Celery logging uses the Python logging module, so an easy way to assign a log configuration is to pass a dictionary to logging.config.dictConfig().

from logging.config import dictConfig

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'simple': {
            'format': '%(levelname)s %(message)s',
            'datefmt': '%y %b %d, %H:%M:%S',
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        },
        'celery': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'celery.log',
            'formatter': 'simple',
            'maxBytes': 1024 * 1024 * 100,  # 100 mb
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery', 'console'],
            'level': 'DEBUG'
        },
    }
}
dictConfig(LOGGING_CONFIG)

Remember to set the CELERYD_HIJACK_ROOT_LOGGER config option to False; this tells Celery not to reconfigure the root logger, so your own configuration stays in place (there are other logging options as well).

There is another way to configure logging: using signals. A handler connected to the setup_logging signal disables Celery's own logging configuration and lets you assign yours. If you would rather augment the default logging configuration, use the after_setup_logger and after_setup_task_logger signals instead.
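
A minimal sketch using the setup_logging signal and the LOGGING_CONFIG dictionary defined above; once a receiver is connected, Celery skips its own logging setup:

from logging.config import dictConfig
from celery.signals import setup_logging

@setup_logging.connect
def configure_logging(**kwargs):
    # Celery does not configure its loggers because this signal has a receiver,
    # so the dictConfig() call here is the only configuration applied
    dictConfig(LOGGING_CONFIG)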

Advanced

Application

Tasks

A task message does not disappear from a queue until it has been acknowledged by a worker. If the worker is killed, the message is redelivered to another worker. By default the worker acknowledges the message in advance, before the task is executed, so a task that has already started is never executed again. However, you can set the acks_late option to have the worker acknowledge the message after the task returns instead.
You create a task using the @task decorator. If you need to combine it with other decorators, it must be the first (outermost) one:

@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y

The name of a task is generated from the name of the function. If you want to assign another name, use the name parameter of the @task decorator. A best practice is to use the module name as a namespace; this way names won’t collide:

@app.task(name='tasks.add')
def add(x, y):
  return x + y

You can get the name of a task using the name property:

add.name

The bind parameter of @task lets the task receive information about its own invocation through the request attribute, accessed via the self argument.
You can also use the .retry() method to send the same task again. This method cuts the execution of the task short because it raises an exception; if you don't want that behaviour, pass throw=False to retry(). There are also the max_retries property and the task's default_retry_delay (30 seconds by default); a sketch using them follows the bullets below.

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)
  • If you don’t care about the results of a task, be sure to set the ignore_result option, as storing results wastes time and resources.
  • Disabling rate limits altogether is recommended if you don’t have any tasks using them, because the rate limit subsystem introduces quite a lot of complexity (set the CELERY_DISABLE_RATE_LIMITS variable to True).
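
A sketch of the retry options mentioned above, reusing the send_twitter_status example (the retry bounds are illustrative values, not recommendations):

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_twitter_status(self, oauth, tweet):
    # self.request carries data about this invocation (id, retries, args, ...)
    print('task %s, attempt %d' % (self.request.id, self.request.retries))
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        # give up after 3 retries, waiting 60 seconds between attempts
        raise self.retry(exc=exc)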

Routing

The CELERY_CREATE_MISSING_QUEUES option set to True allows Celery to create missing queues automatically. The default queue is named celery. To change it you will use:

from kombu import Exchange, Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
)

You can tell a worker to consume from specific queues with the -Q parameter:

$ celery -A proj worker -Q feeds
$ celery -A proj worker -Q feeds,celery

In AMQP, when you create a queue, an exchange and a routing_key with the same name are created as well; the type of the exchange is direct.
If you have several worker servers and want your clients to send tasks selectively, you can, for example, tell Celery that the task 'feed.tasks.import_feed' uses the queue 'feeds':

CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}

You can also use RabbitMQ routing keys:

from kombu import Queue
 
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'

To route a task to the feed_tasks queue, you can add an entry in the CELERY_ROUTES setting:

CELERY_ROUTES = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}

CELERY_QUEUES is a list of Queue instances. If you don’t set the exchange or exchange type values for a key, these will be taken from the CELERY_DEFAULT_EXCHANGE and CELERY_DEFAULT_EXCHANGE_TYPE settings.
You can define exchanges for queues in the following way:

from kombu import Exchange, Queue
 
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'

The destination for a task is decided by the following (in order):

  1. The Routers defined in CELERY_ROUTES.
  2. The routing arguments to Task.apply_async().
  3. Routing related attributes defined on the Task itself.

Routers

A router is a class that decides the routing options for a task. All you need to define a new router is to create a class with a route_for_task method:

class MyRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}
        return None

The options returned by the router are applied to matching task messages. To register the routers:

CELERY_ROUTES = (MyRouter(), )

Or by name…

CELERY_ROUTES = ('myapp.routers.MyRouter', )

If you simply want to map task names to queues:

CELERY_ROUTES = ({'myapp.tasks.compress_video': {
                        'queue': 'video',
                        'routing_key': 'video.compress'
                 }}, )

Other tools

Monitoring

To inspect active tasks:

$ celery -A proj inspect active

You can force workers to enable event messages (used for monitoring tasks and workers):

$ celery -A proj control enable_events

You can disable events:

$ celery -A proj control disable_events

When events are enabled you can then start the event dumper to see what the workers are doing:

$ celery -A proj events --dump

Or the curses interface:

$ celery -A proj events

The celery status command shows a list of online workers in the cluster:

$ celery -A proj status

Testing

With the CELERY_ALWAYS_EAGER setting assigned as True, tasks are executed locally and synchronously instead of being sent to a worker.
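
A minimal sketch of using this in a test, assuming the proj application and the add task defined earlier:

from proj.celery import app
from proj.tasks import add

app.conf.CELERY_ALWAYS_EAGER = True   # tasks run in-process, no broker needed

result = add.delay(2, 2)              # executes immediately, returns an EagerResult
assert result.get() == 4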

Periodic tasks

You can schedule Celery tasks to run at specific times (see the periodic tasks guide). To do so you need a single beat process that sends the tasks to the workers when they are due. The next example uses two files:

  • beat.py, which is executed as: celery -A beat beat
from datetime import timedelta
from tasks import app
 
app.conf.update(
    CELERYBEAT_SCHEDULE={
        'add-every-10-seconds': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=10),
        },
    },
    CELERY_TIMEZONE='UTC',
)
  • tasks.py, which is executed as: celery -A tasks worker
from celery import Celery
import datetime
 
app = Celery('tasks', broker='amqp://guest@192.168.0.100//')
 
@app.task
def add():
    f = open('/home/alfred/a.txt', 'a')
    f.write(str(datetime.datetime.now()))
    f.close()

Little tools

HTTP Callbacks

You can also call Celery tasks from another language or framework, or from outside your application, by using HTTP callback tasks. They use GET/POST data to pass arguments and return the result as a JSON response. The scheme to call a task is:

GET http://example.com/mytask/?arg1=a&arg2=b&arg3=c
POST http://example.com/mytask

You should enable the HTTP dispatch task; to do that, add celery.task.http to CELERY_IMPORTS or start the worker with -I celery.task.http.
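
A hedged sketch of dispatching such a webhook from Python with the URL helper in celery.task.http (available in the old Celery versions this page describes; example.com is a placeholder):

from celery.task.http import URL

# queues a task that performs the GET request and returns an AsyncResult
res = URL('http://example.com/mytask').get_async(arg1='a', arg2='b', arg3='c')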

send_task() function

To call a task that is not registered in the current process, using only its name:

from celery import Celery
celery = Celery()
celery.config_from_object('celeryconfig')
celery.send_task('tasks.add', (2,2))

You can also call a task by name from any language that has an AMQP client.

>>> from celery.execute import send_task
>>> send_task("tasks.add", args=[2, 2], kwargs={})
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>

Log

To use the celery log:

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
logger.debug('Feed %s is already being imported by another worker', feed_url)

Eventlet support

Celery supports Eventlet as an alternative execution pool implementation. To enable it:

$ celery -A proj worker -P eventlet -c 1000

Notes

Purge task queues

You can use the celery purge command to purge all configured task queues:

$ celery -A proj purge

Or programmatically:

>>> from proj.celery import app
>>> app.control.purge()
1753

If you only want to purge messages from a specific queue you have to use the AMQP API or the celery amqp utility:

$ celery -A proj amqp queue.purge <queue name>

Obtain a result with a task id

>>> result = my_task.AsyncResult(task_id)
>>> result.get()

Celery & Django

You'll need to install the package:

$ pip install django-celery

Inside your Django project package you will add a celery.py file:

from __future__ import absolute_import
 
import os
from celery import Celery
from django.conf import settings
 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
 
app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
 
 
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

In the project's __init__.py you will add:

from __future__ import absolute_import
from .celery import app as celery_app

To INSTALLED_APPS you'll add: djcelery
In settings you will add:

CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend'

To start workers:

$ python manage.py celeryd -l info

Notes

  • If you want to use print you must set CELERY_REDIRECT_STDOUTS to False.

Execute a Celery task from C#

using (var channel = connection.CreateModel())
{
    var guid = Guid.NewGuid().ToString();
    string message = String.Format("{{\"id\": \"{0}\", \"task\": \"my.task\", \"args\": [1, 2]}}",  guid);
    var body = Encoding.UTF8.GetBytes(message);
 
    IBasicProperties props = channel.CreateBasicProperties();
    props.ContentType = "application/json";
    props.ContentEncoding = "UTF-8";
    //channel.QueueDeclare("celery", true, true, false, null);
    channel.BasicPublish("celery", "celery", props, body);
 
    Console.WriteLine(" [x] Sent {0}", message);
}

Where the Celery task is:

from celery import Celery
from celery.utils.log import get_task_logger
 
app = Celery('tasks', broker='amqp://guest@192.168.0.100//')
 
@app.task(name='my.task', ignore_result=True)
def add(a, b):
    logger = get_task_logger(__name__)
    logger.critical('tralara lalala ' + str(a + b))