==== Application ====
==== Tasks ====
A task message is not removed from the queue until it has been acknowledged by a worker. If the worker is killed, the message is redelivered to another worker. By default the worker acknowledges the message in advance, before it is executed, so a task that has already started is never executed again. However, you can set the ''acks_late'' option to have the worker acknowledge the message after the task returns instead. \\ 
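As a toy illustration (pure Python, not Celery internals), the difference between early acknowledgement and ''acks_late'' can be sketched like this:

```python
# Toy simulation of message acknowledgement (not Celery internals).
# With early ack (the default) the message is removed on receipt, so it is
# lost if the worker dies mid-task; with acks_late it stays unacknowledged
# until the task finishes, so the broker redelivers it.
def deliver(queue, acks_late, worker_dies):
    msg = queue.pop(0)          # broker hands the message to a worker
    if worker_dies:
        if acks_late:
            queue.append(msg)   # still unacknowledged: broker redelivers
        return queue            # early ack: message already gone
    # task ran to completion; with acks_late the ack happens here
    return queue

print(deliver(['task1'], acks_late=True, worker_dies=True))   # ['task1']
print(deliver(['task1'], acks_late=False, worker_dies=True))  # []
```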
You create a task using the ''@task'' decorator. If you need to combine it with other decorators, ''@task'' must come first (i.e. be the outermost):
<code python>
@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y
</code>
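The reason ''@task'' must be outermost can be illustrated with plain Python decorators (no Celery involved): decorators apply bottom-up, so the top one wraps the fully decorated function.

```python
import functools

def tag(label):
    # Each decorator appends its label, making the application order visible.
    def deco(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            return fn(*args, **kwargs) + [label]
        return wrapper
    return deco

@tag('outer')
@tag('inner')
def trace():
    return []

print(trace())  # ['inner', 'outer'] -- the bottom decorator applies first
```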
A task's name is generated from the name of the function. If you want to assign another name, use the ''name'' parameter of the ''@task'' decorator. A best practice is to use the module name as a namespace, so names won't collide:
<code python>
@app.task(name='tasks.add')
def add(x, y):
    return x + y
</code>
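How the default name is derived can be sketched in plain Python (a toy version; Celery's real logic also handles ''__main__'' and explicit overrides):

```python
# Toy sketch: the default task name is the module name plus the function name.
def default_task_name(module, funcname):
    return '%s.%s' % (module, funcname)

print(default_task_name('tasks', 'add'))  # tasks.add
```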
You can check the name of a task using the ''name'' property:
<code>
add.name
</code>
The ''bind'' parameter of ''@task'' gives the task access to information about its own invocation through the [[http://docs.celeryproject.org/en/latest/userguide/tasks.html#context|request variable]]; the data is accessed through ''self''. \\ 
You can also use the ''.retry()'' method to send the same task again. This method aborts the current execution of the task because it raises an exception; if you don't want this behaviour, set the ''throw'' parameter of ''retry()'' to ''False''. There is also a ''max_retries'' property, and a ''default_retry_delay'' for the task (30 seconds by default).
<code python>
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)
</code>
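Assuming each attempt fails immediately and a constant ''default_retry_delay'' of 30 seconds, the retries land at roughly these offsets from the first failure (toy arithmetic, not Celery code):

```python
# Constant retry delay: each retry waits default_retry_delay after the
# previous failure, so (with instant failures) the offsets from the first
# failure accumulate linearly.
default_retry_delay = 30  # seconds (Celery's default)
max_retries = 3           # Celery's default max_retries

offsets = [default_retry_delay * (n + 1) for n in range(max_retries)]
print(offsets)  # [30, 60, 90]
```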
  * More [[options for tasks]].
  * If you don't care about the result of a task, set the ''ignore_result'' option, as storing results wastes time and resources.
  * If no tasks use rate limits, disable them altogether by setting ''CELERY_DISABLE_RATE_LIMITS'' to ''True'', because the rate limit subsystem introduces quite a lot of complexity.
==== Routing ====
Setting ''CELERY_CREATE_MISSING_QUEUES'' to ''True'' allows Celery to create missing queues automatically. The default queue is named ''celery''; to change it, use:
<code python>
from kombu import Exchange, Queue
$ celery -A proj worker -Q feeds,celery
</code>
In AMQP, when you create a queue, an exchange and a routing_key with the same name are created as well; the exchange type will be direct. \\ 
If you have several servers running tasks and you want a client to send tasks selectively, you can declare, for example, that the task 'feed.tasks.import_feed' uses the queue 'feeds':
<code python>
CELERY_ROUTES = {'feed.tasks.import_feed': {'queue': 'feeds'}}
</code>
You can also use RabbitMQ routing keys:
<code python>
from kombu import Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
CELERY_DEFAULT_EXCHANGE = 'tasks'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'task.default'
</code>
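A topic exchange matches routing keys against patterns like these, where ''*'' matches exactly one dot-separated word and ''#'' matches zero or more. A small pure-Python matcher (a sketch mimicking RabbitMQ's rules, not kombu code):

```python
# Toy matcher for AMQP topic patterns: '*' = one word, '#' = zero or more.
def topic_match(pattern, routing_key):
    def match(p, k):
        if not p:
            return not k
        if p[0] == '#':  # '#' may swallow any number of words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (p[0] == '*' or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split('.'), routing_key.split('.'))

print(topic_match('feed.#', 'feed.import'))  # True
print(topic_match('task.#', 'feed.import'))  # False
```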
To route a task to the feed_tasks queue, you can add an entry in the ''CELERY_ROUTES'' setting:
<code python>
CELERY_ROUTES = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}
</code>
''CELERY_QUEUES'' is a list of ''Queue'' instances. If you don't set the exchange or exchange type values for a key, these will be taken from the ''CELERY_DEFAULT_EXCHANGE'' and ''CELERY_DEFAULT_EXCHANGE_TYPE'' settings. \\ 
You can define exchanges for queues as follows:
<code python>
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'
</code>
The destination for a task is decided by the following (in order):
  - The routers defined in ''CELERY_ROUTES''.
  - The routing arguments to ''Task.apply_async()''.
  - Routing-related attributes defined on the task itself.
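That precedence can be sketched as a toy resolver (pure Python, not Celery's actual implementation; the route entry reuses the example above):

```python
# Toy sketch of routing precedence: CELERY_ROUTES wins, then the arguments
# passed to apply_async(), then attributes on the task, then the default queue.
CELERY_ROUTES = {'feeds.tasks.import_feed': {'queue': 'feed_tasks'}}

def resolve_route(task_name, apply_async_opts=None, task_attrs=None):
    if task_name in CELERY_ROUTES:
        return CELERY_ROUTES[task_name]       # 1. routes/routers
    if apply_async_opts:
        return apply_async_opts               # 2. call-time arguments
    return task_attrs or {'queue': 'celery'}  # 3. task attributes / default

print(resolve_route('feeds.tasks.import_feed'))         # {'queue': 'feed_tasks'}
print(resolve_route('other.task', {'queue': 'video'}))  # {'queue': 'video'}
```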
=== Routers ===
A router is a class that decides the routing options for a task. All you need to define a new router is a class with a ''route_for_task'' method:
<code python>
class MyRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}
        return None
</code>
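Since a router is plain Python, its behaviour can be checked directly (a self-contained copy of the example above):

```python
# Self-contained copy of the MyRouter example for a quick check.
class MyRouter(object):
    # Route one specific task to the 'video' topic exchange.
    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}
        return None

router = MyRouter()
print(router.route_for_task('myapp.tasks.compress_video'))
print(router.route_for_task('myapp.tasks.other'))  # None
```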
The router sets the routing options for the messages it matches. To register it:
<code python>
CELERY_ROUTES = (MyRouter(), )
</code>
Or by name:
<code python>
CELERY_ROUTES = ('myapp.routers.MyRouter', )
</code>
If you simply want to map task names to queues:
<code python>
CELERY_ROUTES = ({'myapp.tasks.compress_video': {
                        'queue': 'video',
                        'routing_key': 'video.compress'
                 }}, )
</code>
===== Other tools =====
==== Monitoring ====
==== Testing ====
With ''CELERY_ALWAYS_EAGER'' set to ''True'', all tasks are executed locally and synchronously, without any asynchronous behavior.
==== Periodic tasks ====
You can run Celery tasks at specific moments: [[http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html|periodic tasks guide]]. To do so you need a beat process (only one) that sends the tasks to the workers when they are due. The next example consists of two files:
  * **beat.py**, which is executed as: ''celery -A beat beat''
<code python>
from datetime import timedelta
from tasks import app

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'add-every-10-seconds': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=10)
        },
    },
    CELERY_TIMEZONE='UTC'
)
</code>
  * **tasks.py**, which is executed as: ''celery -A tasks worker''
<code python>
import datetime
from celery import Celery

app = Celery('tasks', broker='amqp://guest@192.168.0.100//')

@app.task
def add():
    # Append the current timestamp so each periodic run is visible.
    with open('/home/alfred/a.txt', 'a') as f:
        f.write(str(datetime.datetime.now()))
</code>
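The ''timedelta'' schedule in beat.py simply means "run every 10 seconds": beat adds the interval to the last run time. Illustrated with the standard library only (the timestamp is an arbitrary example):

```python
from datetime import datetime, timedelta

# The schedule used in beat.py above: every 10 seconds.
schedule = timedelta(seconds=10)
last_run = datetime(2020, 5, 9, 9, 0, 0)   # example timestamp
next_run = last_run + schedule

print(next_run)  # 2020-05-09 09:00:10
```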
==== Little tools ====

  
=== HTTP Callbacks ===
 $ python manage.py celeryd -l info $ python manage.py celeryd -l info
</code>

==== Notes ====
  * If you want to use ''print'' you must set ''CELERY_REDIRECT_STDOUTS'' to ''False''.

=== Execute a Celery task from C# ===
<code csharp>
using (var channel = connection.CreateModel())
{
    var guid = Guid.NewGuid().ToString();
    string message = String.Format("{{\"id\": \"{0}\", \"task\": \"my.task\", \"args\": [1, 2]}}", guid);
    var body = Encoding.UTF8.GetBytes(message);

    IBasicProperties props = channel.CreateBasicProperties();
    props.ContentType = "application/json";
    props.ContentEncoding = "UTF-8";
    //channel.QueueDeclare("celery", true, true, false, null);
    channel.BasicPublish("celery", "celery", props, body);

    Console.WriteLine(" [x] Sent {0}", message);
}
</code>
Where the task is defined as:
<code python>
from celery import Celery
from celery.utils.log import get_task_logger

app = Celery('tasks', broker='amqp://guest@192.168.0.100//')

@app.task(name='my.task', ignore_result=True)
def add(a, b):
    logger = get_task_logger(__name__)
    logger.critical('tralara lalala ' + str(a + b))
</code>
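The JSON body the C# snippet publishes can be reproduced from Python for quick inspection. This builds only the minimal subset of fields used above (''id'', ''task'', ''args''); a real Celery client sends more fields:

```python
import json
import uuid

# Minimal message body matching the C# snippet (a subset of Celery's
# JSON message format; not a full client implementation).
message = {
    'id': str(uuid.uuid4()),
    'task': 'my.task',
    'args': [1, 2],
}
body = json.dumps(message).encode('utf-8')
print(body)
```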
fw/celery.1408286974.txt.gz · Last modified: 2020/05/09 09:24 (external editor)