Shows the differences between two versions of the page.
| Both sides previous revision Previous revision Next revision | Previous revision | ||
|
fw:celery [2014/08/09 17:42] alfred [Signals] |
fw:celery [2020/05/09 09:25] (current) |
||
|---|---|---|---|
| Line 351: | Line 351: | ||
| ==== Logging ==== | ==== Logging ==== | ||
| + | Celery logging is built on the Python ''logging'' module, so an easy way to set up a log configuration is to define it as a dictionary and pass it to ''logging.config.dictConfig()''. | ||
| + | * [[https://docs.python.org/2/library/logging.config.html|dictConfig doc]] | ||
| + | * [[https://docs.python.org/2/library/logging.html|Python logging module doc]] | ||
| + | <code python> | ||
| + | from logging.config import dictConfig | ||
| + |||
| + | LOGGING_CONFIG = { | ||
| + |     'version': 1, | ||
| + |     'disable_existing_loggers': True, | ||
| + |     'formatters': { | ||
| + |         'simple': { | ||
| + |             'format': '%(levelname)s %(message)s', | ||
| + |             'datefmt': '%y %b %d, %H:%M:%S', | ||
| + |         }, | ||
| + |     }, | ||
| + |     'handlers': { | ||
| + |         'console': { | ||
| + |             'level': 'DEBUG', | ||
| + |             'class': 'logging.StreamHandler', | ||
| + |             'formatter': 'simple' | ||
| + |         }, | ||
| + |         'celery': { | ||
| + |             'level': 'DEBUG', | ||
| + |             'class': 'logging.handlers.RotatingFileHandler', | ||
| + |             'filename': 'celery.log', | ||
| + |             'formatter': 'simple', | ||
| + |             'maxBytes': 1024 * 1024 * 100,  # 100 MB | ||
| + |         }, | ||
| + |     }, | ||
| + |     'loggers': { | ||
| + |         'celery': { | ||
| + |             'handlers': ['celery', 'console'], | ||
| + |             'level': 'DEBUG' | ||
| + |         }, | ||
| + |     } | ||
| + | } | ||
| + |||
| + | dictConfig(LOGGING_CONFIG) | ||
| + | </code> | ||
| + | Remember to set the ''CELERYD_HIJACK_ROOT_LOGGER'' config option to ''False''; this tells Celery not to hijack the root logger with its own handlers (there are [[http://celery.readthedocs.org/en/latest/configuration.html?highlight=log#logging|other log options]]). | ||
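| + | A minimal sketch of that setting, assuming ''app'' is your Celery instance: | ||
| + | <code python> | ||
| + | # keep Celery away from the root logger so the dictConfig above stays in charge | ||
| + | app.conf.update(CELERYD_HIJACK_ROOT_LOGGER=False) | ||
| + | </code> | ||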
| + | There is another way to configure logging: [[http://celery.readthedocs.org/en/latest/userguide/signals.html#logging-signals|signals]]. Connecting to the ''setup_logging'' signal will also disable Celery's own configuration and let you assign yours. If you only want to augment the default logging setup, use the ''after_setup_logger'' and ''after_setup_task_logger'' signals. | ||
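| + | A sketch of the signal route, reusing the ''LOGGING_CONFIG'' dictionary from above: | ||
| + | <code python> | ||
| + | from logging.config import dictConfig | ||
| + | from celery.signals import setup_logging | ||
| + |||
| + | @setup_logging.connect | ||
| + | def configure_logging(**kwargs): | ||
| + |     # when this signal has a receiver, Celery skips its own logging setup | ||
| + |     dictConfig(LOGGING_CONFIG) | ||
| + | </code> | ||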
| + | ===== Advanced ===== | ||
| + | ==== Application ==== | ||
| + | ==== Tasks ==== | ||
| + | A task message does not disappear from the queue until it has been acknowledged by a worker. If the worker is killed, the message is redelivered to another worker. By default the worker acknowledges the message in advance, before it is executed (so a task that has already been started is never executed twice). However, you can set the ''acks_late'' option to have the worker acknowledge the message after the task returns instead. \\ | ||
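| + | A minimal sketch of late acknowledgement: | ||
| + | <code python> | ||
| + | # the message is acked only after the task returns, so a task lost | ||
| + | # to a worker crash mid-execution gets redelivered | ||
| + | @app.task(acks_late=True) | ||
| + | def add(x, y): | ||
| + |     return x + y | ||
| + | </code> | ||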
| + | You create a task using the ''@task'' decorator. If you combine it with other decorators, ''@task'' must come first (i.e. be the outermost): | ||
| + | <code python> | ||
| + | @app.task | ||
| + | @decorator2 | ||
| + | @decorator1 | ||
| + | def add(x, y): | ||
| + |     return x + y | ||
| + | </code> | ||
| + | The name of a task is generated from the name of the function. If you want to assign another name, use the ''name'' parameter of the ''@task'' decorator. A best practice is to use the module name as a namespace, so names won’t collide: | ||
| + | <code python> | ||
| + | @app.task(name='tasks.add') | ||
| + | def add(x, y): | ||
| + |     return x + y | ||
| + | </code> | ||
| + | You can check the name of a task through its ''name'' property: | ||
| + | <code> | ||
| + | add.name | ||
| + | </code> | ||
| + | The ''bind'' parameter of ''@task'' allows the task to receive information about its own invocation through the [[http://docs.celeryproject.org/en/latest/userguide/tasks.html#context|request variable]]; the data is accessed via ''self''. \\ | ||
| + | You can also use the ''.retry()'' method to send the same task again. This method interrupts the execution of the task because it raises an exception; if you don't want that behaviour, set the ''throw'' parameter of ''retry()'' to ''False''. There is also a ''max_retries'' property, and a ''default_retry_delay'' for the task (3 minutes by default). | ||
| + | <code python> | ||
| + | @app.task(bind=True) | ||
| + | def send_twitter_status(self, oauth, tweet): | ||
| + |     try: | ||
| + |         twitter = Twitter(oauth) | ||
| + |         twitter.update_status(tweet) | ||
| + |     except (Twitter.FailWhaleError, Twitter.LoginError) as exc: | ||
| + |         raise self.retry(exc=exc) | ||
| + | </code> | ||
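| + | As a sketch of the ''bind''/request side, a task that only inspects its own context (''id'', ''retries'' and ''args'' are fields documented in the request API linked above): | ||
| + | <code python> | ||
| + | @app.task(bind=True) | ||
| + | def dump_context(self, x, y): | ||
| + |     # self.request carries metadata about this concrete invocation | ||
| + |     print('id={0.id} retries={0.retries} args={0.args!r}'.format(self.request)) | ||
| + |     return x + y | ||
| + | </code> | ||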
| + | * More [[options for tasks]]. | ||
| + | * If you don’t care about the results of a task, be sure to set the ''ignore_result'' option, as storing results wastes time and resources (see the sketch after this list). | ||
| + | * Disabling rate limits altogether is recommended if you don’t have any tasks using them, because the rate-limit subsystem introduces quite a lot of complexity (set ''CELERY_DISABLE_RATE_LIMITS'' to ''True''). | ||
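| + | A sketch of both options; the ''record_click'' task is made up for illustration: | ||
| + | <code python> | ||
| + | # fire-and-forget: nothing is written to the result backend | ||
| + | @app.task(ignore_result=True) | ||
| + | def record_click(url): | ||
| + |     print(url) | ||
| + |||
| + | # no task uses rate limits, so skip the whole subsystem | ||
| + | app.conf.update(CELERY_DISABLE_RATE_LIMITS=True) | ||
| + | </code> | ||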
| + | ==== Routing ==== | ||
| + | Setting the ''CELERY_CREATE_MISSING_QUEUES'' option to ''True'' (the default) allows Celery to create missing queues automatically. The default queue is named ''celery''. To change it: | ||
| + | <code python> | ||
| + | from kombu import Exchange, Queue | ||
| + | CELERY_DEFAULT_QUEUE = 'default' | ||
| + | CELERY_QUEUES = ( | ||
| + |     Queue('default', Exchange('default'), routing_key='default'), | ||
| + | ) | ||
| + | </code> | ||
| + | You can tell a worker to consume from specific queues with the ''-Q'' parameter: | ||
| + | <code> | ||
| + | $ celery -A proj worker -Q feeds | ||
| + | $ celery -A proj worker -Q feeds,celery | ||
| + | </code> | ||
| + | In AMQP, when you create a queue, an exchange and a routing key with the same name are created along with it. The type of the exchange will be direct. \\ | ||
| + | If you have several workers and want your clients to send tasks selectively, you can do something like the following to say that the task ''feed.tasks.import_feed'' uses the queue ''feeds'': | ||
| + | <code python> | ||
| + | CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}} | ||
| + | </code> | ||
| + | You can also use RabbitMQ-style topic routing keys: | ||
| + | <code python> | ||
| + | from kombu import Queue | ||
| + | |||
| + | CELERY_DEFAULT_QUEUE = 'default' | ||
| + | CELERY_QUEUES = ( | ||
| + |     Queue('default', routing_key='task.#'), | ||
| + |     Queue('feed_tasks', routing_key='feed.#'), | ||
| + | ) | ||
| + | CELERY_DEFAULT_EXCHANGE = 'tasks' | ||
| + | CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' | ||
| + | CELERY_DEFAULT_ROUTING_KEY = 'task.default' | ||
| + | </code> | ||
| + | To route a task to the ''feed_tasks'' queue, you can add an entry to the ''CELERY_ROUTES'' setting: | ||
| + | <code python> | ||
| + | CELERY_ROUTES = { | ||
| + |     'feeds.tasks.import_feed': { | ||
| + |         'queue': 'feed_tasks', | ||
| + |         'routing_key': 'feed.import', | ||
| + |     }, | ||
| + | } | ||
| + | </code> | ||
| + | ''CELERY_QUEUES'' is a list of ''Queue'' instances. If you don’t set the exchange or exchange type for a queue, these will be taken from the ''CELERY_DEFAULT_EXCHANGE'' and ''CELERY_DEFAULT_EXCHANGE_TYPE'' settings. \\ | ||
| + | You can define exchanges for queues in the following way: | ||
| + | <code python> | ||
| + | from kombu import Exchange, Queue | ||
| + | |||
| + | CELERY_QUEUES = ( | ||
| + |     Queue('default', Exchange('default'), routing_key='default'), | ||
| + |     Queue('videos', Exchange('media'), routing_key='media.video'), | ||
| + |     Queue('images', Exchange('media'), routing_key='media.image'), | ||
| + | ) | ||
| + | CELERY_DEFAULT_QUEUE = 'default' | ||
| + | CELERY_DEFAULT_EXCHANGE_TYPE = 'direct' | ||
| + | CELERY_DEFAULT_ROUTING_KEY = 'default' | ||
| + | </code> | ||
| + | The destination for a task is decided by the following (in order; a sketch of option 2 follows the list): | ||
| + | - The routers defined in ''CELERY_ROUTES''. | ||
| + | - The routing arguments to ''Task.apply_async()''. | ||
| + | - Routing-related attributes defined on the task itself. | ||
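| + | A sketch of option 2, assuming the ''feeds.tasks.import_feed'' task from above (the feed URL is made up): | ||
| + | <code python> | ||
| + | from feeds.tasks import import_feed | ||
| + |||
| + | # routing arguments supplied at call time (option 2 in the list above) | ||
| + | import_feed.apply_async(args=['http://example.com/rss'], | ||
| + |                         queue='feed_tasks', | ||
| + |                         routing_key='feed.import') | ||
| + | </code> | ||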
| + | === Routers === | ||
| + | A router is a class that decides the routing options for a task. All you need to define a new router is to create a class with a ''route_for_task'' method: | ||
| + | <code python> | ||
| + | class MyRouter(object): | ||
| + |     def route_for_task(self, task, args=None, kwargs=None): | ||
| + |         if task == 'myapp.tasks.compress_video': | ||
| + |             return {'exchange': 'video', | ||
| + |                     'exchange_type': 'topic', | ||
| + |                     'routing_key': 'video.compress'} | ||
| + |         return None | ||
| + | </code> | ||
| + | The options returned by the router are applied to matching task messages. To register routers: | ||
| + | <code python> | ||
| + | CELERY_ROUTES = (MyRouter(), ) | ||
| + | </code> | ||
| + | Or by name: | ||
| + | <code python> | ||
| + | CELERY_ROUTES = ('myapp.routers.MyRouter', ) | ||
| + | </code> | ||
| + | If you simply want to map task names to routes: | ||
| + | <code python> | ||
| + | CELERY_ROUTES = ({'myapp.tasks.compress_video': { | ||
| + |     'queue': 'video', | ||
| + |     'routing_key': 'video.compress' | ||
| + | }}, ) | ||
| + | </code> | ||
| ===== Other tools ===== | ===== Other tools ===== | ||
| ==== Monitoring ==== | ==== Monitoring ==== | ||
| Line 381: | Line 540: | ||
| ==== Testing ==== | ==== Testing ==== | ||
| With ''CELERY_ALWAYS_EAGER'' set to ''True'', tasks are executed locally and synchronously instead of being sent to a worker. | With ''CELERY_ALWAYS_EAGER'' set to ''True'', tasks are executed locally and synchronously instead of being sent to a worker. | ||
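| + | A sketch of how that looks in a test, assuming the ''add'' task from earlier: | ||
| + | <code python> | ||
| + | app.conf.CELERY_ALWAYS_EAGER = True | ||
| + |||
| + | result = add.delay(2, 2) | ||
| + | # executed synchronously in-process; no broker or worker involved | ||
| + | assert result.get() == 4 | ||
| + | </code> | ||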
| + | ==== Periodic tasks ==== | ||
| + | You can run Celery tasks at specific moments: [[http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html|periodic tasks guide]]. To do so you need a beat process (only one) that dispatches the tasks to the workers when they are due. The next example consists of two files: | ||
| + | * **beat.py**, which is executed as: ''celery -A beat beat'' | ||
| + | <code python> | ||
| + | from datetime import timedelta | ||
| + |||
| + | # reuse the Celery app defined in tasks.py | ||
| + | from tasks import app | ||
| + |||
| + | app.conf.update( | ||
| + |     CELERYBEAT_SCHEDULE={ | ||
| + |         'add-every-10-seconds': { | ||
| + |             'task': 'tasks.add', | ||
| + |             'schedule': timedelta(seconds=10) | ||
| + |         }, | ||
| + |     }, | ||
| + |     CELERY_TIMEZONE='UTC' | ||
| + | ) | ||
| + | </code> | ||
| + | * **tasks.py**, which is executed as: ''celery -A tasks worker'' | ||
| + | <code python> | ||
| + | from celery import Celery | ||
| + | import datetime | ||
| + | |||
| + | app = Celery('tasks', broker='amqp://guest@192.168.0.100//') | ||
| + | |||
| + | @app.task | ||
| + | def add(): | ||
| + |     # append a timestamp so each beat-triggered run is visible | ||
| + |     with open('/home/alfred/a.txt', 'a') as f: | ||
| + |         f.write(str(datetime.datetime.now())) | ||
| + | </code> | ||
| + | |||
| ==== Little tools ==== | ==== Little tools ==== | ||
| - | === Periodic tasks === | + | |
| - | You can call Celery tasks in concrete moments: [[http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html| periodic tasks guide]]. | + | |
| === HTTP Callbacks === | === HTTP Callbacks === | ||
| Line 446: | Line 636: | ||
| >>> result = my_task.AsyncResult(task_id) | >>> result = my_task.AsyncResult(task_id) | ||
| >>> result.get() | >>> result.get() | ||
| + | </code> | ||
| + | |||
| + | ==== Celery & Django ==== | ||
| + | You'll need to install the package: | ||
| + | <code> | ||
| + | $ pip install django-celery | ||
| + | </code> | ||
| + | Inside your project package (''proj'' here) you will add a ''celery.py'' file: | ||
| + | <code python> | ||
| + | from __future__ import absolute_import | ||
| + | |||
| + | import os | ||
| + | from celery import Celery | ||
| + | from django.conf import settings | ||
| + | |||
| + | os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') | ||
| + | |||
| + | app = Celery('proj') | ||
| + | app.config_from_object('django.conf:settings') | ||
| + | app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) | ||
| + | |||
| + | |||
| + | @app.task(bind=True) | ||
| + | def debug_task(self): | ||
| + |     print('Request: {0!r}'.format(self.request)) | ||
| + | </code> | ||
| + | In the project's <nowiki>__init__.py</nowiki> you will add: | ||
| + | <code python> | ||
| + | from __future__ import absolute_import | ||
| + | from .celery import app as celery_app | ||
| + | </code> | ||
| + | To ''INSTALLED_APPS'' you'll add ''djcelery''. \\ | ||
| + | In the settings you will add: | ||
| + | <code python> | ||
| + | CELERY_RESULT_BACKEND='djcelery.backends.cache:CacheBackend' | ||
| + | </code> | ||
| + | To start workers: | ||
| + | <code> | ||
| + | $ python manage.py celeryd -l info | ||
| + | </code> | ||
| + | |||
| + | ==== Notes ==== | ||
| + | * If you want to use ''print'' you must set ''CELERY_REDIRECT_STDOUTS'' to ''False'' (see the sketch below). | ||
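| + | A minimal sketch of that note, assuming ''app'' is your Celery instance: | ||
| + | <code python> | ||
| + | # workers redirect stdout/stderr into the logging system by default; | ||
| + | # disable that so print() writes to the real stdout | ||
| + | app.conf.update(CELERY_REDIRECT_STDOUTS=False) | ||
| + | </code> | ||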
| + | |||
| + | === Execute a Celery task from C# === | ||
| + | <code csharp> | ||
| + | using (var channel = connection.CreateModel()) | ||
| + | { | ||
| + |     var guid = Guid.NewGuid().ToString(); | ||
| + |     string message = String.Format("{{\"id\": \"{0}\", \"task\": \"my.task\", \"args\": [1, 2]}}", guid); | ||
| + |     var body = Encoding.UTF8.GetBytes(message); | ||
| + |||
| + |     IBasicProperties props = channel.CreateBasicProperties(); | ||
| + |     props.ContentType = "application/json"; | ||
| + |     props.ContentEncoding = "UTF-8"; | ||
| + |     //channel.QueueDeclare("celery", true, true, false, null); | ||
| + |     channel.BasicPublish("celery", "celery", props, body); | ||
| + |||
| + |     Console.WriteLine(" [x] Sent {0}", message); | ||
| + | } | ||
| + | </code> | ||
| + | where the task on the Python side is: | ||
| + | <code python> | ||
| + | from celery import Celery | ||
| + | from celery.utils.log import get_task_logger | ||
| + |||
| + | app = Celery('tasks', broker='amqp://guest@192.168.0.100//') | ||
| + |||
| + | @app.task(name='my.task', ignore_result=True) | ||
| + | def add(a, b): | ||
| + |     logger = get_task_logger(__name__) | ||
| + |     logger.critical('tralara lalala ' + str(a + b)) | ||
| </code> | </code> | ||