Celery is a task queue with a focus on real-time processing, while also supporting task scheduling.
Celery distributes its jobs between workers: programs which use the AMQP protocol to connect to brokers.
To use it you need…
You can keep track of tasks and their transitions through different states, and inspect their return values.
First you need to install a broker. RabbitMQ, Redis, or even an SQL database can be used. RabbitMQ is the default broker; you only need to configure it as…
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
You can configure the broker to…
The CELERY_ACCEPT_CONTENT setting will do it; by default JSON, YAML and pickle are enabled, and if you don't want some of them you should disable them. RabbitMQ must be accessible by a user:
$ sudo rabbitmqctl add_user myuser mypassword
$ sudo rabbitmqctl add_vhost myvhost
$ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
A basic application with Celery could be:
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
Arguments to create a Celery object are the name of the module (which is used as a prefix for task names) and the broker URL.
To run it you should execute the celery program with the worker argument:
$ celery -A tasks worker --loglevel=info
In production you can run it as a daemon. You can learn about more options using the help parameter:
$ celery worker --help
$ celery help
You should never stop a worker with the KILL signal (-9) unless you’ve tried TERM (Ctrl + C) a few times and waited a few minutes to give it a chance to shut down. If you do, tasks may be terminated mid-execution, and they will not be re-run unless you have the acks_late option set.
To call a task you can use the delay() method, which is a shortcut to apply_async():
>>> from tasks import add
>>> add.delay(4, 4)
To see the difference between delay() and apply_async():
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})
However apply_async() allows more options like…
- T.apply_async(countdown=10), executes 10 seconds from now.
- T.apply_async(countdown=60, expires=120), executes in one minute from now, but expires after 2 minutes.
- add.apply_async((2, 2)), calls the add task with arguments (2, 2).
- add.apply_async((2, 2), queue='lopri', countdown=10), to indicate a specific queue.
- Calling the task directly, like add(2, 2), uses neither apply_async nor delay, so no message is sent: the task runs in the current process.

apply_async() and delay() return an AsyncResult instance, which can be used to check the state of the task, wait for the task to finish, get its return value, or, if the task failed, get the exception and traceback. To do that you must store task states somewhere they can be retrieved later (SQLAlchemy/Django ORM, Memcached, RabbitMQ (amqp), MongoDB, or Redis). To define a RabbitMQ or Redis backend:
app = Celery('tasks', backend='amqp', broker='amqp://')
app = Celery('tasks', backend='redis://localhost', broker='amqp://')
Useful AsyncResult methods:
- ready() returns whether the task has finished processing or not.
- get(timeout=1) turns the asynchronous call into a synchronous one. If you don't want get() to re-raise the task's exception, pass the propagate=False argument.
- traceback gives the traceback from the exception.
- successful() and failed() return True if the task finished successfully or raised an error, respectively.
- id returns the task id.
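For instance, a short interactive session could look like this (a sketch that assumes a result backend is configured and that the add task finishes quickly):

>>> res = add.delay(4, 4)
>>> res.ready()              # has the task finished?
True
>>> res.get(timeout=1)       # wait at most one second for the return value
8
>>> res.successful()         # True because the task did not raise
True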
An AsyncResult object also contains the task state. Possible transitions are: PENDING → STARTED → RETRY → STARTED → RETRY → STARTED → SUCCESS. However, the STARTED state is only recorded if CELERY_TRACK_STARTED is enabled or if the track_started argument in the task decorator is True. The PENDING state is not recorded; it is simply the default for any unknown task id:
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'
You can configure the app using the conf.update() method.
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)
Alternatively, you can define the configuration inside a module. To do that you only need to put the configuration in a module (let's call it celeryconfig.py) and use the config_from_object() method to indicate it:
app.config_from_object('celeryconfig')
celeryconfig.py could have the following content:
BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Europe/Oslo'
CELERY_ENABLE_UTC = True
Storing the configuration in a module allows you to do things like:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}
The structure of a project could be…
proj/__init__.py
/celery.py
/tasks.py
Where proj/celery.py contains:
from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()
The include argument to the Celery class lists the task modules that the worker will import. And proj/tasks.py contains:
from __future__ import absolute_import

from proj.celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)
To execute the worker we will do:
$ celery -A proj worker -l info
The daemonization scripts use the celery multi command, which allows you to start and manage several workers running on the same system.
It accepts arguments like…
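For example, a typical celery multi session could look like this (the worker name w1 is arbitrary):

$ celery multi start w1 -A proj -l info
$ celery multi restart w1 -A proj -l info
$ celery multi stop w1
$ celery multi stopwait w1 -A proj -l info

stop terminates immediately, while stopwait waits for running tasks to finish first.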
- @task(track_started=True) enables recording of the STARTED state.
- @app.task(ignore_result=True) means that calls to this task will not store a result.

A subtask wraps a single task invocation (its arguments and execution options) in a serializable form, so it can be passed to other functions or sent over the wire and applied later. You can create a subtask for the add task using the arguments (2, 2), and a countdown of 10 seconds, like this:
>>> add.subtask((2, 2), countdown=10)
tasks.add(2, 2)
Or like this:
>>> add.s(2, 2)
tasks.add(2, 2)
Subtasks have the delay and apply_async methods.
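This means a subtask can be built first and sent later; a small sketch using the add task (and assuming a configured broker):

>>> s = add.s(2, 2)
>>> s.delay()                     # same as add.delay(2, 2)
>>> s.apply_async(countdown=10)   # same as add.apply_async((2, 2), countdown=10)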
Using subtasks you can define incomplete arguments for a task:
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
The s2 object then needs another argument, which can be resolved when calling the subtask:
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10
You can also add keywords to the subtask:
>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.
You can also cancel the execution of a task:
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
or if you only have the task id:
>>> from proj.celery import app
>>> app.control.revoke(task_id)
Once you understand how subtasks handle partial arguments, you can understand the canvas primitives used to call several tasks at once:
A group calls a list of tasks in parallel, and it returns a special result instance that lets you inspect the results as a group, and retrieve the return values in order.
>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
A partial group:
>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Tasks can be linked together so that after one task returns the other is called:
>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
A partial chain:
# (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
Chains can also be written like this:
>>> (add.s(4, 4) | mul.s(8))().get()
64
A chord is a group with a callback:
>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90
A group chained to another task will be automatically converted to a chord:
>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90
Since these primitives are all of the subtask type they can be combined almost however you want, e.g:
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
The --app argument indicates which application will be executed. With --app=proj, Celery searches the proj package for the app instance; if none of the expected attributes are found, it will try a submodule named proj.celery.
You can define a concrete route for a task:
app.conf.update(
    CELERY_ROUTES = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

# or....
>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')
You can also make a worker consume from this queue by specifying the -Q option:
$ celery -A proj worker -Q hipri
You may specify multiple queues by using a comma-separated list; for example, you can make the worker consume from both the hipri queue and the default queue (named celery):
$ celery -A proj worker -Q hipri,celery
If you’re using RabbitMQ then you should install the librabbitmq module, which is an AMQP client implemented in C:
$ pip install librabbitmq
Signals are events that trigger actions when something occurs in Celery. To connect an action to a signal you do the following (for example, with the after_task_publish signal):
from celery.signals import after_task_publish
@after_task_publish.connect
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))
Some signals identify the sender so you can filter by them. For example the after_task_publish signal uses the task name as a sender, so by providing the sender argument to connect you can connect your handler to be called every time a task with name “proj.tasks.add” is published:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))
There are several useful signals already defined.
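For example, the task_failure signal lets you react whenever a task raises an exception (a sketch; the handler name is arbitrary and unused arguments are collected in **kwargs):

from celery.signals import task_failure

@task_failure.connect
def on_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    # sender is the task, task_id identifies the failed execution
    print('Task {0} failed: {1!r}'.format(task_id, exception))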
Celery logging uses the Python logging module, so one easy way to assign a log configuration is to use a dictionary and the logging.config.dictConfig() function.
from logging.config import dictConfig

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'simple': {
            'format': '%(levelname)s %(message)s',
            'datefmt': '%y %b %d, %H:%M:%S',
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'simple',
        },
        'celery': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'celery.log',
            'formatter': 'simple',
            'maxBytes': 1024 * 1024 * 100,  # 100 MB
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery', 'console'],
            'level': 'DEBUG',
        },
    },
}

dictConfig(LOGGING_CONFIG)
Remember to set the CELERYD_HIJACK_ROOT_LOGGER option to False; this tells Celery not to hijack the root logger (there are other logging options as well).
There is another way to configure logging: using signals. The setup_logging signal will also disable the default configuration and let you assign your own. If you would like to augment the default logging configuration instead, you can use the after_setup_logger and after_setup_task_logger signals.
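A minimal sketch of the signal-based approach, reusing the LOGGING_CONFIG dictionary from above:

from logging.config import dictConfig
from celery.signals import setup_logging

@setup_logging.connect
def configure_logging(**kwargs):
    # With a handler connected here, Celery skips its own logging setup.
    dictConfig(LOGGING_CONFIG)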
A task message does not disappear from the queue until it has been acknowledged by a worker. If the worker is killed, the message will be redelivered to another worker. By default the worker acknowledges the message in advance, just before it is executed, so a task that has already been started is never executed again. However, you can set the acks_late option to have the worker acknowledge the message after the task returns instead.
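For example, late acknowledgement can be enabled per task (a sketch; the task name is arbitrary):

@app.task(acks_late=True)
def import_feed(feed_url):
    # The message is acknowledged only after this function returns,
    # so if the worker dies mid-task the message is redelivered.
    ...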
You create a task using the @task decorator. If you need to combine it with other decorators, the task decorator must be applied last (which in Python means it must come first in the list):
@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y
The name of a task is generated from the name of the function. If you want to assign another name, use the name parameter of the @task decorator. A best practice is to use the module name as a namespace; this way names won’t collide:
@app.task(name='tasks.add')
def add(x, y):
    return x + y
You can check the name of a task using the name property:
add.name
The bind parameter of @task gives the task access to information about its own invocation through the request attribute; the data is accessed via the self argument (self.request).
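A minimal sketch of a bound task inspecting its own request (the task name dump_context is arbitrary):

@app.task(bind=True)
def dump_context(self, x, y):
    # self.request carries the task id, arguments, delivery info, etc.
    print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
        self.request))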
You can also use the retry() method to re-send the same task. This method interrupts the execution of the current task because it raises an exception; if you don't want that behaviour, set the throw parameter of retry() to False. There is also a max_retries option, and a default_retry_delay for the task (30 seconds by default).
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)
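The retry behaviour can also be tuned on the decorator; a sketch, reusing the hypothetical Twitter task from above:

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        # throw=False: don't raise the Retry exception, just schedule the retry
        self.retry(exc=exc, throw=False)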
Rate limits can be disabled globally with the CELERY_DISABLE_RATE_LIMITS variable set to True.
The CELERY_CREATE_MISSING_QUEUES option, when set to True, allows Celery to create missing queues automatically (this is the default). The default queue is named celery. To change it you use:
from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
)
You can tell a worker to consume from specific queues with the -Q parameter:
$ celery -A proj worker -Q feeds
$ celery -A proj worker -Q feeds,celery
In AMQP, when you create a queue this way, an exchange and a routing_key with the same name will be created as well. The type of the exchange will be direct.
If you have several task servers and want the client to send tasks selectively, you can do something like the following to indicate that the task 'feed.tasks.import_feed' uses the queue 'feeds':
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
You can also use RabbitMQ routing keys:
from kombu import Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
To route a task to the feed_tasks queue, you can add an entry in the CELERY_ROUTES setting:
CELERY_ROUTES = {
    'feeds.tasks.import_feed': {
        'queue': 'feed_tasks',
        'routing_key': 'feed.import',
    },
}
CELERY_QUEUES is a list of Queue instances. If you don’t set the exchange or exchange type values for a key, these will be taken from the CELERY_DEFAULT_EXCHANGE and CELERY_DEFAULT_EXCHANGE_TYPE settings.
You can define exchanges for queues in the following way:
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos', Exchange('media'), routing_key='media.video'),
    Queue('images', Exchange('media'), routing_key='media.image'),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
The destination for a task is decided by the following (in order):
A router is a class that decides the routing options for a task. All you need to define a new router is to create a class with a route_for_task method:
class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}
        return None
The routing options returned by the router are applied to the matching task messages. To register the routers:
CELERY_ROUTES = (MyRouter(), )
Or by name…
CELERY_ROUTES = ('myapp.routers.MyRouter', )
If you simply want to map task names to routing options:
CELERY_ROUTES = ({'myapp.tasks.compress_video': {
                      'queue': 'video',
                      'routing_key': 'video.compress',
                  }}, )
To inspect active tasks:
$ celery -A proj inspect active
You can force workers to enable event messages (used for monitoring tasks and workers):
$ celery -A proj control enable_events
You can disable events:
$ celery -A proj control disable_events
When events are enabled you can then start the event dumper to see what the workers are doing:
$ celery -A proj events --dump
Or the curses interface:
$ celery -A proj events
The celery status command shows a list of online workers in the cluster:
$ celery -A proj status
With the CELERY_ALWAYS_EAGER setting assigned to True, tasks are executed locally and synchronously instead of being sent to a worker, which is useful for testing.
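A minimal sketch of using this in a test configuration, assuming the app and add task from the earlier examples:

app.conf.update(
    CELERY_ALWAYS_EAGER=True,                  # run tasks locally and block
    CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,   # re-raise task exceptions
)

result = add.delay(2, 2)   # executes immediately in the current process
assert result.get() == 4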
You can run Celery tasks at specific times or intervals: see the periodic tasks guide.
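As a sketch, a schedule can be declared with the CELERYBEAT_SCHEDULE setting (the entry names are arbitrary; tasks.add is the task defined earlier):

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # run tasks.add every 30 seconds with arguments (16, 16)
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
    },
    # and every Monday morning at 7:30
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

The schedule is executed by the beat service, started with $ celery -A proj beat.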
You can also call Celery tasks from another language, framework, or from outside your application by using HTTP callback tasks. They use GET/POST data to pass arguments and return the result as a JSON response. The scheme to call a task is:
GET http://example.com/mytask/?arg1=a&arg2=b&arg3=c
POST http://example.com/mytask
You should enable the HTTP dispatch task; to do that, add celery.task.http to CELERY_IMPORTS, or start the worker with -I celery.task.http.
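As a sketch, Celery 3.x exposes a URL helper in celery.task.http for dispatching such webhook tasks asynchronously (the URL shown is hypothetical):

from celery.task.http import URL

# dispatch GET http://example.com/multiply?x=10&y=10 as a webhook task
res = URL('http://example.com/multiply').get_async(x=10, y=10)
print(res.get())   # the value returned in the remote JSON response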
To call a task that is not registered in the current process, using only its name:
from celery import Celery

celery = Celery()
celery.config_from_object('celeryconfig')
celery.send_task('tasks.add', (2, 2))
You can also call a task by name from any language that has an AMQP client.
>>> from celery.execute import send_task
>>> send_task("tasks.add", args=[2, 2], kwargs={})
<AsyncResult: 373550e8-b9a0-4666-bc61-ace01fa4f91d>
To use the celery log:
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
logger.debug('Feed %s is already being imported by another worker', feed_url)
Celery supports Eventlet as an alternative execution pool implementation. To enable it:
$ celery -A proj worker -P eventlet -c 1000
You can use the celery purge command to purge all configured task queues:
$ celery -A proj purge
Or programmatically:
>>> from proj.celery import app
>>> app.control.purge()
1753
If you only want to purge messages from a specific queue you have to use the AMQP API or the celery amqp utility:
$ celery -A proj amqp queue.purge <queue name>
>>> result = my_task.AsyncResult(task_id)
>>> result.get()
For the Django integration you'll need to install the django-celery package:
$ pip install django-celery
Inside your Django project package you will add a celery.py module:
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
On the __init__.py you will add:
from __future__ import absolute_import

from .celery import app as celery_app
To INSTALLED_APPS you'll add: djcelery
On settings you will add:
CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend'
To start workers:
$ python manage.py celeryd -l info
To stop Celery from redirecting stdout/stderr to the logging system, set CELERY_REDIRECT_STDOUTS to False.