Shows the differences between two versions of the page.
| Both sides, previous revision Previous revision Next revision | Previous revision | ||
|
comb:rabbitmq [2014/07/15 09:25] alfred [Commands] |
comb:rabbitmq [2020/05/09 09:25] (current) |
||
|---|---|---|---|
| Line 23: | Line 23: | ||
| <code> | <code> | ||
| $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged | $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged | ||
| + | </code> | ||
| + | |||
| + | === Listing exchanges === | ||
| + | <code> | ||
| + | $ sudo rabbitmqctl list_exchanges | ||
| + | </code> | ||
| + | |||
| + | === List bindings === | ||
| + | <code> | ||
| + | $ sudo rabbitmqctl list_bindings | ||
| + | </code> | ||
| + | ==== Concepts ==== | ||
| + | === Exchanges === | ||
| + | A producer can only send messages to an exchange. The exchange receives messages from producers and pushes them to queues. The exchange types are: | ||
| + | * direct | ||
| + | * topic | ||
| + | * headers | ||
| + | * fanout: it broadcasts all the messages it receives to all the queues it knows. | ||
| + | |||
| + | Some amq.* exchanges and the default (unnamed) exchange are created by default; the default exchange is identified by the empty string. | ||
| + | |||
| + | To declare an exchange, use: | ||
| + | <code python> | ||
| + | channel.exchange_declare(exchange='logs', type='fanout') | ||
| + | </code> | ||
| + | |||
| + | === Bindings === | ||
| + | The relationship between an exchange and a queue is called a binding. | ||
| + | <code python> | ||
| + | channel.queue_bind(exchange='logs', queue=result.method.queue) | ||
| + | </code> | ||
| + | |||
| + | === Temporary queues === | ||
| + | When we want to hear about all messages and not just a subset of them, we need to connect to Rabbit with a fresh, empty queue. To do this we can let the server choose a random queue name for us by not supplying the ''queue'' parameter to ''queue_declare'': | ||
| + | <code python> | ||
| + | result = channel.queue_declare() | ||
| + | </code> | ||
| + | We also need the queue to be deleted once the consumer disconnects. There's an ''exclusive'' flag for that: | ||
| + | <code python> | ||
| + | result = channel.queue_declare(exclusive=True) | ||
| </code> | </code> | ||
| ===== Use ===== | ===== Use ===== | ||
| Line 61: | Line 101: | ||
| </code> | </code> | ||
| * ''no_ack'' means //no acknowledgement// messages. | * ''no_ack'' means //no acknowledgement// messages. | ||
| - | + | * We can tell Rabbit not to deliver more than one message at a time to a worker by setting ''prefetch_count=1'' on the worker channel. | |
| - | ==== Message acknowledgment ==== | + | ==== Message acknowledgment and persistent queues ==== |
| * Message acknowledgement is done after connection dies. | * Message acknowledgement is done after connection dies. | ||
| + | * Message acknowledgement is active by default; it is controlled by the ''no_ack'' parameter. | ||
| + | * Persistent messages and queues are those that survive even if the computer or Rabbit crashes. | ||
| + | * They are enabled by declaring the queue as durable and setting the ''delivery_mode'' property to 2. | ||
| + | <code python> | ||
| + | #!/usr/bin/env python | ||
| + | # -*- coding: utf-8 -*- | ||
| + | import pika | ||
| + | import time | ||
| + | |||
| + | |||
| + | def callback(ch, method, properties, body): | ||
| + | print " [x] Received %r" % (body,) | ||
| + | time.sleep(body.count('.')) | ||
| + | print " [x] Done" | ||
| + | ch.basic_ack(delivery_tag=method.delivery_tag) | ||
| + | |||
| + | connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) | ||
| + | channel = connection.channel() | ||
| + | channel.queue_declare(queue='task_queue', durable=True) | ||
| + | print ' [*] Waiting for messages. To exit press CTRL+C' | ||
| + | channel.basic_qos(prefetch_count=1) | ||
| + | channel.basic_consume(callback, queue='task_queue') | ||
| + | channel.start_consuming() | ||
| + | </code> | ||
| + | |||
| + | <code python> | ||
| + | #!/usr/bin/env python | ||
| + | # -*- coding: utf-8 -*- | ||
| + | |||
| + | import pika | ||
| + | import sys | ||
| + | |||
| + | connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) | ||
| + | channel = connection.channel() | ||
| + | channel.queue_declare(queue='task_queue', durable=True) | ||
| + | message = ' '.join(sys.argv[1:]) or "Hello World!" | ||
| + | channel.basic_publish(exchange='', | ||
| + | routing_key='task_queue', | ||
| + | body=message, | ||
| + | properties=pika.BasicProperties(delivery_mode=2,)) | ||
| + | print " [x] Sent %r" % (message,) | ||
| + | connection.close() | ||
| + | </code> | ||
| + | |||
| + | ==== Publish/Subscribe ==== | ||
| + | |||
| + | <code python> | ||
| + | #!/usr/bin/env python | ||
| + | # -*- coding: utf-8 -*- | ||
| + | |||
| + | import pika | ||
| + | |||
| + | |||
| + | def callback(ch, method, properties, body): | ||
| + | print " [x] %r" % (body,) | ||
| + | |||
| + | connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) | ||
| + | channel = connection.channel() | ||
| + | channel.exchange_declare(exchange='logs', type='fanout') | ||
| + | result = channel.queue_declare(exclusive=True) | ||
| + | queue_name = result.method.queue | ||
| + | channel.queue_bind(exchange='logs', queue=queue_name) | ||
| + | print ' [*] Waiting for logs. To exit press CTRL+C' | ||
| + | channel.basic_consume(callback, queue=queue_name, no_ack=True) | ||
| + | channel.start_consuming() | ||
| + | </code> | ||
| + | |||
| + | <code python> | ||
| + | #!/usr/bin/env python | ||
| + | # -*- coding: utf-8 -*- | ||
| + | |||
| + | import pika | ||
| + | import sys | ||
| + | |||
| + | connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) | ||
| + | channel = connection.channel() | ||
| + | channel.exchange_declare(exchange='logs', type='fanout') | ||
| + | message = ' '.join(sys.argv[1:]) or "info: Hello World!" | ||
| + | channel.basic_publish(exchange='logs', routing_key='', body=message) | ||
| + | print " [x] Sent %r" % (message,) | ||
| + | connection.close() | ||
| + | </code> | ||
| + | ===== Others ===== | ||
| + | ==== Plugins ==== | ||
| + | === Management === | ||
| + | <code> | ||
| + | $ rabbitmq-plugins enable rabbitmq_management | ||
| + | </code> | ||
| + | This enables the web management server. You'll find it at [[http://server:15672]] (e.g. [[http://127.0.0.1:15672/]]). The default credentials are username ''guest'' and password ''guest''. | ||
| + | |||
| + | To add users: | ||
| + | |||
| + | - Create a user for any vhost on that system (here I use the default vhost "/") | ||
| + | <code> | ||
| + | $ rabbitmqctl add_user yourName yourPass | ||
| + | </code> | ||
| + | - Set the permissions for that user for default vhost | ||
| + | <code> | ||
| + | $ rabbitmqctl set_permissions -p / yourName ".*" ".*" ".*" | ||
| + | </code> | ||
| + | - Set the administrator tag for this user (to give them access to the management plugin) | ||
| + | <code> | ||
| + | $ rabbitmqctl set_user_tags yourName administrator | ||
| + | </code> | ||
| + | ==== Notes ==== | ||
| + | === Reset RabbitMQ === | ||
| + | This resets everything: queues, config, the lot! | ||
| + | <code> | ||
| + | $ rabbitmqctl stop_app | ||
| + | $ rabbitmqctl reset | ||
| + | $ rabbitmqctl start_app | ||
| + | </code> | ||
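
As a recap of the Exchanges and Bindings notes above, the following is a minimal in-memory sketch of how a fanout exchange and a direct exchange pick their target queues. It does not talk to a broker and it is not the pika API: the ''Exchange'' class, its ''bind'' and ''publish'' methods and the use of ''deque'' objects as queues are all hypothetical names chosen for illustration. It assumes the usual semantics, where fanout ignores the routing key and direct requires the binding key to equal the routing key.

```python
from collections import deque


class Exchange:
    """Toy model of an exchange (illustrative only, not the pika API)."""

    def __init__(self, name, exchange_type):
        if exchange_type not in ("fanout", "direct"):
            raise ValueError("this sketch only models fanout and direct")
        self.name = name
        self.exchange_type = exchange_type
        self.bindings = []  # list of (binding_key, queue) pairs

    def bind(self, queue, routing_key=""):
        # A binding is the relationship between this exchange and a queue.
        self.bindings.append((routing_key, queue))

    def publish(self, body, routing_key=""):
        # Collect each matching queue once, even if it is bound several times.
        targets = []
        for key, queue in self.bindings:
            matches = self.exchange_type == "fanout" or key == routing_key
            if matches and not any(queue is t for t in targets):
                targets.append(queue)
        for queue in targets:
            queue.append(body)
        return len(targets)  # number of queues that received the message


# Fanout: every bound queue receives the message, the routing key is ignored.
logs = Exchange("logs", "fanout")
q1, q2 = deque(), deque()
logs.bind(q1)
logs.bind(q2)
fanout_delivered = logs.publish("info: Hello World!")

# Direct: only queues whose binding key equals the routing key receive it.
tasks = Exchange("tasks", "direct")
errors, warnings = deque(), deque()
tasks.bind(errors, routing_key="error")
tasks.bind(warnings, routing_key="warning")
direct_delivered = tasks.publish("disk full", routing_key="error")
```

With a real broker the same decision is made by RabbitMQ itself; the client only declares the exchange, binds the queue and publishes, as in the listings above.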