Celery is a task queue focused on real-time processing, while also supporting task scheduling.
Celery distributes its jobs between workers, which are programs that use the AMQP protocol to connect to brokers.
To use it you need…
You can keep track of tasks, their transitions through different states and inspect the return values.
First you need to install a broker. RabbitMQ, Redis or even an SQL database can be used. RabbitMQ is the default broker; you only need to configure it as…
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
You can configure the broker to…
The CELERY_ACCEPT_CONTENT setting controls which content types (serializers) are accepted; by default JSON, YAML and pickle are enabled, so disable the ones you don't want. RabbitMQ must be accessible by a user:
$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
A basic application with Celery could be:
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
Arguments to create a Celery object are the name of the module (which is used as a prefix for task names) and the broker URL.
To run it you should execute the celery program with the worker argument:
$ celery -A tasks worker --loglevel=info
In production you can run it as a daemon. You can see the remaining options using the help parameter:
$ celery worker --help
$ celery help
You should never stop a worker with the KILL signal (-9), unless you’ve tried TERM (Ctrl + C) a few times and waited a few minutes to give it a chance to shut down. If you do, tasks may be terminated mid-execution, and they will not be re-run unless you have the acks_late option set.
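acks_late can be set per task (or globally with the CELERY_ACKS_LATE setting). A minimal sketch of enabling it on a task:

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

# With acks_late the message is acknowledged only after the task finishes,
# so if the worker is killed mid-execution the message can be re-delivered.
@app.task(acks_late=True)
def add(x, y):
    return x + y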
To call a task you can use the delay() method, which is a shortcut to apply_async():
>>> from tasks import add
>>> add.delay(4, 4)
To see the difference between delay() and apply_async():
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
However apply_async() allows more options like…
T.apply_async(countdown=10) executes 10 seconds from now.
T.apply_async(countdown=60, expires=120) executes one minute from now, but expires after 2 minutes.
add.apply_async((2, 2)) calls the add task with the arguments (2, 2).
add.apply_async((2, 2), queue='lopri', countdown=10) sends the task to a specific queue.
Calling the task directly, like add(2, 2), uses neither apply_async nor delay, so no message is sent and the task runs in the current process.
apply_async and delay return an AsyncResult instance, which can be used to check the state of the task, wait for the task to finish, get its return value, or, if the task failed, get the exception and traceback. To do that you must store the states locally or somewhere they can be retrieved later (SQLAlchemy/Django ORM, Memcached, RabbitMQ (amqp), MongoDB, or Redis). To define a RabbitMQ or Redis backend:
app = Celery('tasks', backend='amqp', broker='amqp://')
app = Celery('tasks', backend='redis://localhost', broker='amqp://')
Useful AsyncResult methods:
ready() returns whether the task has finished processing or not.
get(timeout=1) turns the asynchronous call into a synchronous one. If you don't want get() to re-raise the exception of a failed task, pass the propagate=False argument.
traceback gives the traceback from the exception.
successful() and failed() return True if the task finished successfully or raised an error, respectively.
id is the task id.
These are shown together in the sketch below.
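A quick interactive sketch (assuming a result backend is configured and reusing the add task from above):

>>> from tasks import add
>>> res = add.delay(4, 4)
>>> res.ready()         # True once the task has finished processing
True
>>> res.get(timeout=1)  # block until the result is available
8
>>> res.successful()
True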
An AsyncResult object also contains the task state. States can go: PENDING → STARTED → RETRY → STARTED → RETRY → STARTED → SUCCESS. However, the STARTED state is only recorded if CELERY_TRACK_STARTED is enabled or if the track_started argument of the task decorator is True. The PENDING state is not recorded either; it is simply the default for any unknown task id:
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'
You can configure the app using the conf.update() method.
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)
However, you can also define the configuration in a separate module. To do that, put the settings in a module (let's call it celeryconfig.py) and use the config_from_object() method to indicate it:
app.config_from_object('celeryconfig')
celeryconfig.py could have the following content:
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Europe/Oslo'
CELERY_ENABLE_UTC = True
Storing the configuration in a module allows you to do things like:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}
The structure of a project could be…
proj/__init__.py
/celery.py
/tasks.py
Where proj/celery.py contains:
from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()
The include argument to the Celery class lists the task modules to import when the worker starts, so the worker can find the tasks. And proj/tasks.py contains:
from __future__ import absolute_import

from proj.celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)
To start the worker, run:
$ celery -A proj worker -l info
The daemonization scripts use the celery multi command, which lets you start and manage several workers running on the same machine.
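A typical usage sketch (w1 is an arbitrary node name):

$ celery multi start w1 -A proj -l info
$ celery multi restart w1 -A proj -l info
$ celery multi stopwait w1 -A proj -l info   # wait for running tasks to finish before shutting down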
The @app.task decorator accepts arguments like…
@app.task(track_started=True) enables recording of the STARTED state.
@app.task(ignore_result=True) makes the task not store its result.
A subtask is a serialization of a task invocation, so it can be passed to other functions and sent later. You can create a subtask for the add task with the arguments (2, 2) and a countdown of 10 seconds like this:
>>> add.subtask((2, 2), countdown=10)
tasks.add(2, 2)
Or like this:
>>> add.s(2, 2)
tasks.add(2, 2)
Subtasks have the delay and apply_async methods.
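For instance, a small sketch reusing the add task:

>>> s = add.s(2, 2)
>>> s.delay()                     # send the task with the stored arguments
>>> s.apply_async(countdown=10)   # the same, but executed 10 seconds from now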
Using subtasks you can define incomplete arguments for a task:
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
The s2 object then needs another argument; it can be resolved when calling the subtask:
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10
You can also add keywords to the subtask:
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.
You can also cancel the execution of a task:
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
or if you only have the task id:
>>> from proj.celery import app
>>> app.control.revoke(task_id)
Once you understand how subtasks handle partial arguments you can understand the canvas primitives, which call several tasks at once:
A group calls a list of tasks in parallel, and it returns a special result instance that lets you inspect the results as a group, and retrieve the return values in order.
>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
A partial group:
>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Tasks can be linked together so that after one task returns the other is called:
>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
A partial chain:
# (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
Chains can also be written like this:
>>> (add.s(4, 4) | mul.s(8))().get()
64
A chord is a group with a callback:
>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90
A group chained to another task will be automatically converted to a chord:
>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90
Since these primitives are all of the subtask type they can be combined almost however you want, e.g.:
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
The --app argument indicates the application to run. With --app=proj it searches for a Celery app instance in the proj module: an attribute named proj.app or proj.celery, or any attribute whose value is a Celery application. If none of these are found it tries a submodule named proj.celery, applying the same search there.
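For example, this is equivalent to using -A proj:

$ celery --app=proj worker -l info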
You can define a concrete route for a task:
app.conf.update(
    CELERY_ROUTES={
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

# or....
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
You can also make a worker consume from this queue by specifying the -Q option:
$ celery -A proj worker -Q hipri
You may specify multiple queues using a comma-separated list; for example, you can make the worker consume from both the hipri queue and the default queue (named celery):
$ celery -A proj worker -Q hipri,celery
If you’re using RabbitMQ then you should install the librabbitmq module, which is an AMQP client implemented in C:
$ pip install librabbitmq
Signals are events that trigger actions when something occurs in Celery. To connect an action to a signal you do the following (for example, for the after_task_publish signal):
from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))
Some signals identify the sender so you can filter by them. For example the after_task_publish signal uses the task name as a sender, so by providing the sender argument to connect you can connect your handler to be called every time a task with name “proj.tasks.add” is published:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))
There are several useful signals already defined.
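As an illustration (a sketch, not part of the example above), a handler can be connected to the task_failure signal in the same way; the keyword arguments below are among those that signal passes:

from celery.signals import task_failure

@task_failure.connect
def on_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    # sender is the task that failed; log its name, the task id and the exception.
    print('Task {0} ({1}) failed: {2!r}'.format(
        sender.name if sender else 'unknown', task_id, exception))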
To inspect active tasks:
$ celery -A proj inspect active
You can force workers to enable event messages (used for monitoring tasks and workers):
$ celery -A proj control enable_events
You can disable events:
$ celery -A proj control disable_events
When events are enabled you can then start the event dumper to see what the workers are doing:
$ celery -A proj events --dump
Or the curses interface:
$ celery -A proj events
The celery status command shows a list of online workers in the cluster:
$ celery -A proj status
With the CELERY_ALWAYS_EAGER setting enabled, tasks are executed locally and synchronously instead of being sent to a worker, so you can debug them.
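For example, in a test or debugging configuration you could set (a sketch):

app.conf.update(
    CELERY_ALWAYS_EAGER=True,
    # Re-raise exceptions from eagerly executed tasks instead of storing them.
    CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
)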
You can also run Celery tasks at specific times or intervals: see the periodic tasks guide.
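A minimal sketch of a periodic schedule (executed by the celery beat scheduler), reusing the add task as an example:

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=30),
        'args': (16, 16),
    },
}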
You can also call Celery tasks from another language or framework, or from outside your application, by using HTTP callback tasks. They use GET/POST data to pass arguments and return the result as a JSON response. The scheme to call a task is:
GET http://example.com/mytask/?arg1=a&arg2=b&arg3=c
POST http://example.com/mytask
To enable the HTTP dispatch task, add celery.task.http to CELERY_IMPORTS, or start the worker with -I celery.task.http.
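For example, in celeryconfig.py (a sketch of the CELERY_IMPORTS approach):

CELERY_IMPORTS = ('celery.task.http',)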
To call a function task not registered in the current process using only the name:
from celery import Celery

celery = Celery()
celery.config_from_object('celeryconfig')
celery.send_task('tasks.add', (2, 2))
You can also call a task by name from any language that has an AMQP client.
>>> from celery.execute import send_task
>>> send_task("tasks.add", args=[2, 2], kwargs={})
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
To use the Celery task logger:
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
logger.debug('Feed %s is already being imported by another worker', feed_url)
Celery supports Eventlet as an alternative execution pool implementation. To enable it:
$ celery -A proj worker -P eventlet -c 1000
You can use the celery purge command to purge all configured task queues:
$ celery -A proj purge
Or programmatically:
>>> from proj.celery import app
>>> app.control.purge()
1753
If you only want to purge messages from a specific queue you have to use the AMQP API or the celery amqp utility:
$ celery -A proj amqp queue.purge <queue name>
To get the result of a task when you only have its id:
>>> result = my_task.AsyncResult(task_id)
>>> result.get()