
This is an old revision of the document!


RabbitMQ

Configuration

Install

$ echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee -a /etc/apt/sources.list
$ wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
$ sudo apt-key add rabbitmq-signing-key-public.asc
$ sudo apt-get update
$ sudo apt-get install rabbitmq-server

Commands

Manage the server

$ sudo service rabbitmq-server status

Replace status with start, stop, or restart to control the server.

See number of messages

$ sudo rabbitmqctl list_queues
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
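
The counts printed by the second command are easy to post-process. The sketch below is a minimal parser, assuming rabbitmqctl prints one tab-separated row per queue; the function name parse_list_queues and the sample text are made up for illustration, and the banner lines around the rows differ between RabbitMQ versions.

```python
def parse_list_queues(output):
    """Parse `rabbitmqctl list_queues name messages_ready messages_unacknowledged`
    output into {queue_name: (ready, unacknowledged)}."""
    queues = {}
    for line in output.splitlines():
        fields = line.split("\t")
        # Skip banner/footer lines and any column-header row.
        if len(fields) != 3 or not (fields[1].isdigit() and fields[2].isdigit()):
            continue
        name, ready, unacked = fields
        queues[name] = (int(ready), int(unacked))
    return queues


# Hypothetical sample output, not captured from a real server.
sample = (
    "Listing queues ...\n"
    "hello\t3\t0\n"
    "task_queue\t10\t2\n"
    "...done.\n"
)

for name, (ready, unacked) in sorted(parse_list_queues(sample).items()):
    print(name, "ready:", ready, "unacked:", unacked)
```

On a real machine the text could come from subprocess.run(["sudo", "rabbitmqctl", "list_queues", "name", "messages_ready", "messages_unacknowledged"], capture_output=True, text=True).stdout.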

List exchanges

$ sudo rabbitmqctl list_exchanges

List bindings

$ sudo rabbitmqctl list_bindings

Concepts

Exchanges

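A producer can only send messages to an exchange. The exchange receives messages from producers and pushes them to queues, following rules set by its type:

  • direct: it delivers a message to the queues whose binding key exactly matches the message's routing key.
  • topic: it matches the routing key against binding patterns, where * stands for one word and # for zero or more words.
  • headers: it routes on message header attributes instead of the routing key.
  • fanout: it broadcasts all the messages it receives to all the queues it knows.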

Some amq.* exchanges and the default (unnamed) exchange are created automatically. The default exchange is identified by the empty string.

To declare an exchange:

# pika 1.0 and later; older pika releases named this parameter type.
channel.exchange_declare(exchange='logs', exchange_type='fanout')

Bindings

The relationship between an exchange and a queue is called a binding.

channel.queue_bind(exchange='logs', queue=result.method.queue)
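
To make the exchange and binding relationship concrete, here is a toy in-memory model. It is not how RabbitMQ is implemented: the ToyBroker class, the levels exchange and the queue names q1, q2 and errors are all made up, and only the fanout and direct types are modelled.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for exchanges, queues and bindings (illustration only)."""

    def __init__(self):
        self.exchange_types = {}           # exchange name -> 'fanout' or 'direct'
        self.bindings = defaultdict(list)  # exchange name -> [(queue, binding_key)]
        self.queues = defaultdict(list)    # queue name -> delivered message bodies

    def exchange_declare(self, exchange, exchange_type):
        self.exchange_types[exchange] = exchange_type

    def queue_bind(self, exchange, queue, routing_key=""):
        self.bindings[exchange].append((queue, routing_key))

    def basic_publish(self, exchange, routing_key, body):
        kind = self.exchange_types[exchange]
        for queue, binding_key in self.bindings[exchange]:
            # fanout ignores the routing key; direct needs an exact match.
            if kind == "fanout" or binding_key == routing_key:
                self.queues[queue].append(body)


broker = ToyBroker()

# A fanout exchange copies every message to every bound queue.
broker.exchange_declare("logs", "fanout")
broker.queue_bind("logs", "q1")
broker.queue_bind("logs", "q2")
broker.basic_publish("logs", routing_key="", body="info: started")

# A direct exchange only delivers when the keys match exactly.
broker.exchange_declare("levels", "direct")
broker.queue_bind("levels", "errors", routing_key="error")
broker.basic_publish("levels", routing_key="error", body="disk full")
broker.basic_publish("levels", routing_key="info", body="all good")

print(dict(broker.queues))
```

The message published with routing key info matches no binding in this model, so it reaches no queue.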

Temporary queues

When we want to hear about all messages, not just a subset of them, we need to connect to RabbitMQ with a fresh, empty queue. We can let the server choose a random queue name for us by passing an empty queue name to queue_declare (older pika releases also accept leaving the queue parameter out):

result = channel.queue_declare(queue='')

We also need the queue to be deleted once the consumer disconnects. The exclusive flag does that:

result = channel.queue_declare(queue='', exclusive=True)
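
As a sketch of how the temporary queue and the binding fit together, the hypothetical helper below declares a server-named exclusive queue and binds it to an exchange. It only assumes that channel behaves like a pika channel (queue_declare and queue_bind with the keyword arguments shown). The StubChannel class and the queue name it returns are made up so the snippet runs without a broker.

```python
from types import SimpleNamespace


def bind_temporary_queue(channel, exchange):
    """Declare a server-named exclusive queue, bind it, and return its name."""
    # queue='' lets the broker pick the name; exclusive=True means the queue
    # is deleted when the connection that declared it closes.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange, queue=queue_name)
    return queue_name


class StubChannel:
    """Stand-in for a pika channel that records calls (illustration only)."""

    def __init__(self):
        self.calls = []

    def queue_declare(self, queue, exclusive=False):
        self.calls.append(("queue_declare", queue, exclusive))
        # Made-up name; a real broker generates its own.
        return SimpleNamespace(method=SimpleNamespace(queue="amq.gen-example"))

    def queue_bind(self, exchange, queue):
        self.calls.append(("queue_bind", exchange, queue))


stub = StubChannel()
print(bind_temporary_queue(stub, "logs"))
```

With a real connection, the same helper would be called with the channel returned by connection.channel().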

Use

Producer & consumer

Producer

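#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pika

# Connect to the broker on this machine and open a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Make sure the queue exists; declaring it again is harmless.
channel.queue_declare(queue='hello')
for i in range(100):
    msg = 'Hello World! - ' + str(i)
    # The default exchange ('') routes to the queue named by routing_key.
    channel.basic_publish(exchange='', routing_key='hello', body=msg)
    print(" [x] Sent", msg)
connection.close()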

Consumer

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pika


def callback(ch, method, properties, body):
    # On Python 3 the body arrives as bytes.
    print(" [x] Received %r" % (body,))

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print(' [*] Waiting for messages. To exit press CTRL+C')
# pika 1.0 and later; older releases used basic_consume(callback, queue='hello', no_ack=True).
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
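
  • no_ack=True (auto_ack=True in pika 1.0 and later) turns off message acknowledgements: a message counts as handled as soon as it is delivered.
  • We can tell RabbitMQ not to deliver more than one message at a time to a worker by calling basic_qos(prefetch_count=1) on the worker's channel.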
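
To see why prefetch_count=1 helps, here is a toy simulation, not RabbitMQ code. It assumes that without a prefetch limit the broker hands messages out round-robin, that all messages are already queued, and that each message takes the given number of seconds to process. The function names and the durations are made up for illustration.

```python
import heapq


def round_robin_finish_time(durations, workers):
    """Message i goes to worker i % workers, however busy that worker is."""
    busy_until = [0] * workers
    for i, duration in enumerate(durations):
        busy_until[i % workers] += duration
    return max(busy_until)


def prefetch_one_finish_time(durations, workers):
    """Each message goes to whichever worker becomes idle first."""
    busy_until = [0] * workers
    heapq.heapify(busy_until)
    for duration in durations:
        free_at = heapq.heappop(busy_until)  # earliest idle worker
        heapq.heappush(busy_until, free_at + duration)
    return max(busy_until)


# Alternating slow (5 s) and fast (1 s) tasks, two workers.
durations = [5, 1, 5, 1, 5, 1]
print(round_robin_finish_time(durations, 2))   # 15: one worker gets every slow task
print(prefetch_one_finish_time(durations, 2))  # 11: work is spread by availability
```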

Message acknowledgment and persistent queues

  • Message acknowledgement: if a consumer dies (its channel or connection closes) before acknowledging a message, RabbitMQ re-queues the message and delivers it to another consumer.
    • Acknowledgements are on by default; the no_ack parameter (auto_ack in pika 1.0 and later) turns them off.
  • Durable queues and persistent messages survive a restart or crash of RabbitMQ or of the machine.
    • Enable them by declaring the queue with durable=True and publishing each message with the delivery_mode property set to 2.
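
Consumer

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pika
import time


def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    # Simulate work: one second per dot in the message (body is bytes).
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Acknowledge only after the work is finished.
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# durable=True: the queue definition survives a broker restart.
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
# Do not send this worker a new message until it has acknowledged the previous one.
channel.basic_qos(prefetch_count=1)
# pika 1.0 and later; older releases used basic_consume(callback, queue='task_queue').
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()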

Producer

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Must match the consumer's declaration, including durable=True.
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      # delivery_mode=2 marks the message as persistent.
                      properties=pika.BasicProperties(delivery_mode=2,))
print(" [x] Sent %r" % (message,))
connection.close()
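
The acknowledgement rule can be illustrated with a toy model of a single queue. This is only a sketch of the bookkeeping, not RabbitMQ's implementation: the ToyQueue class, its consumer_died method and the worker name are made up for illustration.

```python
from collections import deque


class ToyQueue:
    """Tracks ready and unacknowledged messages for one queue (illustration only)."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}   # delivery_tag -> (consumer, body)
        self.next_tag = 1

    def deliver(self, consumer):
        """Hand the next ready message to `consumer`; return (tag, body)."""
        body = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = (consumer, body)
        return tag, body

    def basic_ack(self, delivery_tag):
        """The consumer finished the message, so the broker may forget it."""
        del self.unacked[delivery_tag]

    def consumer_died(self, consumer):
        """Put the dead consumer's unacknowledged messages back in the queue."""
        lost = [tag for tag, (owner, _) in self.unacked.items() if owner == consumer]
        # Re-insert newest first so the original order is restored at the front.
        for tag in sorted(lost, reverse=True):
            _, body = self.unacked.pop(tag)
            self.ready.appendleft(body)


queue = ToyQueue(["task 1", "task 2", "task 3"])
tag1, _ = queue.deliver("worker-a")
tag2, _ = queue.deliver("worker-a")
queue.basic_ack(tag1)            # task 1 is done
queue.consumer_died("worker-a")  # task 2 was never acknowledged
print(list(queue.ready))         # ['task 2', 'task 3']
```

With no_ack=True there would be no unacked table at all, so task 2 would simply be lost when the worker died.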

Publish/Subscribe

Subscriber

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pika


def callback(ch, method, properties, body):
    print(" [x] %r" % (body,))

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# pika 1.0 and later; older releases named this parameter type.
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Server-named queue that is deleted when this connection closes.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
# pika 1.0 and later; older releases used basic_consume(callback, queue=queue_name, no_ack=True).
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Publisher

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# pika 1.0 and later; older releases named this parameter type.
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# A fanout exchange ignores the routing key, so it is left empty.
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % (message,))
connection.close()

Others

Plugins

Management

$ sudo rabbitmq-plugins enable rabbitmq_management

This enables the web management interface, available at http://server:15672 (e.g. http://127.0.0.1:15672/). The default credentials are username guest and password guest.
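
The same port also serves the plugin's HTTP API under /api/, which can be handy for scripts. The sketch below only builds an authenticated request. The helper name management_request is made up, the host, port and credentials are the defaults mentioned above, and nothing is sent until urlopen is called.

```python
import base64
import urllib.request


def management_request(path, host="127.0.0.1", port=15672,
                       username="guest", password="guest"):
    """Build an HTTP Basic-authenticated GET request for the management API."""
    url = "http://%s:%d/api/%s" % (host, port, path.lstrip("/"))
    token = base64.b64encode(("%s:%s" % (username, password)).encode("utf-8"))
    request = urllib.request.Request(url)
    request.add_header("Authorization", "Basic " + token.decode("ascii"))
    return request


request = management_request("queues")
print(request.full_url)

# To actually query a running server (needs the plugin enabled):
#   import json
#   with urllib.request.urlopen(request) as response:
#       queues = json.load(response)
```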

Notes

Reset RabbitMQ

… Queues, config, everything!

$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl start_app

comb/rabbitmq.1407350640.txt.gz · Last modified: 2020/05/09 09:24 (external editor)