fw:celery

$ celery help
</code>
=== Stopping the worker ===
You should never stop a worker with the KILL signal (-9) unless you have tried TERM (Ctrl+C) a few times and waited a few minutes to give it a chance to shut down. If you do, tasks may be terminated mid-execution, and they will not be re-run unless you have the acks_late option set.
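For example, from the shell (the worker PID 12345 is illustrative):
<code>
# warm shutdown: lets running tasks finish
$ kill -TERM 12345
# last resort only, after TERM has had time to work:
$ kill -9 12345
</code>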
==== Calling the task and receiving the result ====
To call a task you can use the ''delay()'' method, which is a shortcut to ''apply_async()'':
<code>
$ pip install librabbitmq
</code>
==== Signals ====
Signals are events that trigger actions when something occurs in Celery. To connect an action to a signal you register a handler for it (for example, the ''after_task_publish'' signal):
<code python>
from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))
</code>
Some signals identify their sender, so you can filter by it. For example, the after_task_publish signal uses the task name as the sender, so by providing the sender argument to connect you can have your handler called every time a task with the name ''proj.tasks.add'' is published:
<code python>
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, body=None, **kwargs):
    print('after_task_publish for task id {body[id]}'.format(
        body=body,
    ))
</code>
There are several useful [[http://docs.celeryproject.org/en/latest/userguide/signals.html|signals already defined]].

==== Logging ====
The Celery log works using the Python ''logging'' module, so one easy way to assign a log config is to use a dictionary with ''logging.config.dictConfig()''.
  * [[https://docs.python.org/2/library/logging.config.html|dictConfig doc]]
  * [[https://docs.python.org/2/library/logging.html|Python logging module doc]]
<code python>
from logging.config import dictConfig

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'simple': {
            'format': '%(levelname)s %(message)s',
            'datefmt': '%y %b %d, %H:%M:%S',
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        },
        'celery': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'celery.log',
            'formatter': 'simple',
            'maxBytes': 1024 * 1024 * 100,  # 100 MB
        },
    },
    'loggers': {
        'celery': {
            'handlers': ['celery', 'console'],
            'level': 'DEBUG'
        },
    }
}
dictConfig(LOGGING_CONFIG)
</code>
Remember to set the ''CELERYD_HIJACK_ROOT_LOGGER'' config option to ''False''; this tells Celery not to hijack the root logger with its own configuration (there are [[http://celery.readthedocs.org/en/latest/configuration.html?highlight=log#logging|other log options]]).
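For example, in your Celery configuration module:
<code python>
CELERYD_HIJACK_ROOT_LOGGER = False
</code>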
There is another way to configure logging: using [[http://celery.readthedocs.org/en/latest/userguide/signals.html#logging-signals|signals]]. The ''setup_logging'' signal will also disable Celery's own logging setup and let you assign yours. If you would like to augment the default logging configuration instead, you can use the ''after_setup_logger'' and ''after_setup_task_logger'' signals.
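A minimal sketch using the ''setup_logging'' signal, reusing the ''LOGGING_CONFIG'' dict from above:
<code python>
from logging.config import dictConfig
from celery.signals import setup_logging

@setup_logging.connect
def configure_logging(**kwargs):
    # when a handler is connected to setup_logging, Celery skips its own
    # logging configuration and leaves it to this function
    dictConfig(LOGGING_CONFIG)
</code>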
===== Advanced =====
==== Application ====
==== Tasks ====
A task message does not disappear from the queue until it has been acknowledged by a worker, so if the worker is killed the message is redelivered to another worker. By default the worker acknowledges the message in advance, before it is executed (so a task that has already been started is never executed again). You can set the acks_late option to have the worker acknowledge the message after the task returns instead.
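A minimal sketch of the late-acknowledgement option (''handle'' is a hypothetical helper):
<code python>
@app.task(acks_late=True)
def process(item):
    # the message is acknowledged only after this function returns,
    # so it is redelivered if the worker dies mid-execution
    handle(item)
</code>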
You create a task using the ''@task'' decorator. If you need to apply more decorators, the task decorator must be the first (outermost) one:
<code python>
@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y
</code>
The name of a task is generated from the name of the function. If you want to assign another name, use the name parameter of the ''@task'' decorator. A best practice is to use the module name as a namespace, so names won't collide:
<code python>
@app.task(name='tasks.add')
def add(x, y):
    return x + y
</code>
You can get the name of a task with its ''name'' property:
<code>
add.name
</code>
The bind parameter of ''@task'' allows the task to receive information about its own invocation through the [[http://docs.celeryproject.org/en/latest/userguide/tasks.html#context|request variable]]; the data is accessed through ''self''.
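For example (a sketch; ''id'' and ''retries'' are fields of the request context):
<code python>
@app.task(bind=True)
def dump_context(self, x, y):
    print('task id: {0.id}, retries: {0.retries}'.format(self.request))
</code>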
You can also use the ''.retry()'' method to send the same task again. This method interrupts the execution of the task because it raises an exception; if you don't want this behaviour, set the ''throw'' parameter of ''retry()'' to ''False''. There is also a ''max_retries'' property, and ''default_retry_delay'' for the task (30 seconds by default).
<code python>
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)
</code>
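A sketch combining the retry options mentioned above (''download'' is a hypothetical helper):
<code python>
@app.task(bind=True, max_retries=5, default_retry_delay=60)
def fetch(self, url):
    try:
        return download(url)
    except IOError as exc:
        # throw=False: execution continues instead of raising the Retry exception
        self.retry(exc=exc, throw=False)
</code>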
  * More [[options for tasks]].
  * If you don't care about the results of a task, be sure to set the ignore_result option, as storing results wastes time and resources.
  * Disabling rate limits altogether is recommended if you don't have any tasks using them, because the rate limit subsystem introduces quite a lot of complexity (set the ''CELERY_DISABLE_RATE_LIMITS'' setting to ''True''). See the sketch below.
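A configuration sketch for the last two bullets (the task is hypothetical):
<code python>
CELERY_DISABLE_RATE_LIMITS = True  # skip the rate limit subsystem entirely

@app.task(ignore_result=True)
def log_visit(url):
    pass  # a task whose result nobody reads
</code>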
==== Routing ====
With the ''CELERY_CREATE_MISSING_QUEUES'' option set to ''True'', Celery will configure missing queues automatically. The default queue is named ''celery''. To change it:
<code python>
from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
)
</code>
You can tell a worker to consume from specific queues with the -Q parameter:
<code>
$ celery -A proj worker -Q feeds
$ celery -A proj worker -Q feeds,celery
</code>
In AMQP, when you create a queue, an exchange and a routing_key with the same name are created, and the exchange type is direct. \\
If you have several workers and you want your client to send tasks selectively, you can tell Celery that the task 'feed.tasks.import_feed' uses the queue 'feeds':
<code python>
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
</code>
You can also use RabbitMQ topic routing keys:
<code python>
from kombu import Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
</code>
To route a task to the feed_tasks queue, you can add an entry to the CELERY_ROUTES setting:
<code python>
CELERY_ROUTES = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}
</code>
CELERY_QUEUES is a list of Queue instances. If you don't set the exchange or exchange type for a queue, these will be taken from the ''CELERY_DEFAULT_EXCHANGE'' and ''CELERY_DEFAULT_EXCHANGE_TYPE'' settings. \\
You can define exchanges for queues in the following way:
<code python>
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
</code>
The destination for a task is decided by the following (in order):
  - The Routers defined in CELERY_ROUTES.
  - The routing arguments to Task.apply_async() (see the sketch below).
  - Routing related attributes defined on the Task itself.
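For example, routing arguments given at call time (item 2 above; a sketch using the ''add'' task from the Tasks section):
<code python>
add.apply_async(args=(2, 2), queue='feed_tasks', routing_key='feed.import')
</code>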
=== Routers ===
A router is a class that decides the routing options for a task. All you need to define a new router is to create a class with a route_for_task method:
<code python>
class MyRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}
        return None
</code>
The options returned by the router are applied to matching task messages. To register routers:
<code python>
CELERY_ROUTES = (MyRouter(), )
</code>
Or by name:
<code python>
CELERY_ROUTES = ('myapp.routers.MyRouter', )
</code>
If you simply want to map task names to routes:
<code python>
CELERY_ROUTES = ({'myapp.tasks.compress_video': {
                        'queue': 'video',
                        'routing_key': 'video.compress'
                 }}, )
</code>
===== Other tools =====
$ celery -A proj status
</code>
==== Testing ====
With the ''CELERY_ALWAYS_EAGER'' setting set to ''True'', tasks are executed locally and synchronously instead of being sent to a worker, so tests see no asynchronous behaviour.
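A minimal sketch, assuming the ''add(x, y)'' task from the Tasks section:
<code python>
app.conf.CELERY_ALWAYS_EAGER = True

result = add.delay(2, 2)  # executed locally and synchronously, not sent to a broker
print(result.get())       # the result is immediately available: 4
</code>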
==== Periodic tasks ====
You can call Celery tasks at specific moments: [[http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html|periodic tasks guide]]. To do so you need a beat process (only one) that sends the tasks to the workers when they are due. The next example consists of two files:
  * **beat.py**, which is executed as: ''celery -A beat beat''
<code python>
from datetime import timedelta
from tasks import app

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'add-every-10-seconds': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=10)
        },
    },
    CELERY_TIMEZONE='UTC'
)
</code>
  * **tasks.py**, which is executed as: ''celery -A tasks worker''
<code python>
from celery import Celery
import datetime

app = Celery('tasks', broker='amqp://guest@192.168.0.100//')

@app.task
def add():
    # append the current timestamp to a file so each beat run is visible
    with open('/home/alfred/a.txt', 'a') as f:
        f.write(str(datetime.datetime.now()) + '\n')
</code>
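Besides ''timedelta'', crontab schedules are supported; a sketch based on the periodic tasks guide:
<code python>
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add-every-monday-morning': {
        'task': 'tasks.add',
        # run every Monday at 7:30
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },
}
</code>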
 ==== Little tools ==== ==== Little tools ====
  
  
=== HTTP Callbacks ===
>>> result = my_task.AsyncResult(task_id)
>>> result.get()
</code>

==== Celery & Django ====
You'll need to install the package:
<code>
$ pip install django-celery
</code>
Inside your Django project package you will add a celery.py file (this matches the ''from .celery import ...'' line used below):
<code python>
from __future__ import absolute_import

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
</code>
In the package's <nowiki>__init__.py</nowiki> you will add:
<code python>
from __future__ import absolute_import
from .celery import app as celery_app
</code>
Add ''djcelery'' to your ''INSTALLED_APPS''. \\
In your settings you will add:
<code python>
CELERY_RESULT_BACKEND = 'djcelery.backends.cache:CacheBackend'
</code>
To start workers:
<code>
$ python manage.py celeryd -l info
</code>
==== Notes ====
  * If you want to use print you must set ''CELERY_REDIRECT_STDOUTS'' to ''False''.

=== Execute a Celery task from C# ===
<code csharp>
using (var channel = connection.CreateModel())
{
    var guid = Guid.NewGuid().ToString();
    string message = String.Format("{{\"id\": \"{0}\", \"task\": \"my.task\", \"args\": [1, 2]}}", guid);
    var body = Encoding.UTF8.GetBytes(message);

    IBasicProperties props = channel.CreateBasicProperties();
    props.ContentType = "application/json";
    props.ContentEncoding = "UTF-8";
    //channel.QueueDeclare("celery", true, true, false, null);
    channel.BasicPublish("celery", "celery", props, body);

    Console.WriteLine(" [x] Sent {0}", message);
}
</code>
When the task on the Python side is:
<code python>
from celery import Celery
from celery.utils.log import get_task_logger

app = Celery('tasks', broker='amqp://guest@192.168.0.100//')

@app.task(name='my.task', ignore_result=True)
def add(a, b):
    logger = get_task_logger(__name__)
    logger.critical('tralara lalala ' + str(a + b))
</code>
  