====== RabbitMQ ======
===== Configuration =====
==== Install ====
$ echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee -a /etc/apt/sources.list
$ wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
$ sudo apt-key add rabbitmq-signing-key-public.asc
$ sudo apt-get update
$ sudo apt-get install rabbitmq-server
==== Commands ====
=== Manage the server ===
$ sudo service rabbitmq-server status
''status'' can be replaced with ''start'', ''stop'', ''restart''...
=== See number of messages ===
$ sudo rabbitmqctl list_queues
=== Print unacknowledged messages ===
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
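To watch these counters from a script, the command output can be parsed. This is a minimal sketch in Python 3, assuming the columns are tab-separated and that any banner or header lines can be skipped; ''parse_list_queues'' and the sample text are made up for illustration, and real output differs between RabbitMQ versions.

```python
def parse_list_queues(output):
    """Parse `rabbitmqctl list_queues name messages_ready messages_unacknowledged`.

    Returns {queue_name: (ready, unacknowledged)}.
    """
    queues = {}
    for line in output.splitlines():
        fields = line.split("\t")
        if len(fields) != 3:
            continue  # banner lines such as "Listing queues ..."
        name, ready, unacked = fields
        if not (ready.isdigit() and unacked.isdigit()):
            continue  # column header row printed by some versions
        queues[name] = (int(ready), int(unacked))
    return queues


# Illustrative sample only, not captured from a real server.
sample = "Listing queues ...\nhello\t0\t0\ntask_queue\t3\t1\n...done.\n"
stats = parse_list_queues(sample)

# Total backlog per queue: messages waiting plus messages delivered but not acked.
backlog = {name: ready + unacked for name, (ready, unacked) in stats.items()}
print(backlog)
```

In practice the text would come from something like ''subprocess.check_output'' running the command above.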
=== Listing exchanges ===
$ sudo rabbitmqctl list_exchanges
=== List bindings ===
$ sudo rabbitmqctl list_bindings
==== Concepts ====
=== Exchanges ===
A producer can only send messages to an exchange. The exchange receives messages from producers and pushes them to queues. The exchange types are:
* direct
* topic
* headers
* fanout: it broadcasts all the messages it receives to all the queues it knows.
Some ''amq.*'' exchanges and the default (unnamed) exchange are created by default; the default exchange is identified by the empty string.
To declare an exchange:
channel.exchange_declare(exchange='logs', type='fanout')
=== Bindings ===
The relationship between an exchange and a queue is called a binding.
channel.queue_bind(exchange='logs', queue=result.method.queue)
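To see how exchanges and bindings fit together, here is a toy in-memory model in Python 3. It is not the pika API: ''ToyExchange'' and ''publish'' are hypothetical names, and only ''fanout'' and ''direct'' are modelled. The ''direct'' rule used here (binding key must equal the routing key) also describes the default exchange, where every queue is bound under its own name.

```python
from collections import defaultdict


class ToyExchange:
    """In-memory stand-in for an exchange (not the pika API)."""

    def __init__(self, exchange_type):
        if exchange_type not in ("fanout", "direct"):
            raise ValueError("this sketch only models fanout and direct")
        self.exchange_type = exchange_type
        self.bindings = []  # list of (queue_name, binding_key)

    def bind(self, queue_name, binding_key=""):
        self.bindings.append((queue_name, binding_key))

    def route(self, routing_key):
        """Return the names of the queues that would receive the message."""
        if self.exchange_type == "fanout":
            # fanout ignores the routing key and copies to every bound queue
            return [q for q, _ in self.bindings]
        # direct: the binding key must equal the routing key
        return [q for q, key in self.bindings if key == routing_key]


def publish(exchange, queues, routing_key, body):
    for name in exchange.route(routing_key):
        queues[name].append(body)


queues = defaultdict(list)

logs = ToyExchange("fanout")
logs.bind("screen")
logs.bind("disk")
publish(logs, queues, routing_key="", body="info: Hello World!")

default = ToyExchange("direct")
default.bind("hello", binding_key="hello")  # queue bound under its own name
publish(default, queues, routing_key="hello", body="Hello World!")
publish(default, queues, routing_key="missing", body="dropped")  # no binding matches

print(dict(queues))
```

Both ''screen'' and ''disk'' get the log line, ''hello'' gets its message, and the message with no matching binding is delivered nowhere.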
=== Temporary queues ===
When we want to hear about all messages and not just a subset of them, we need to connect to Rabbit with a fresh, empty queue. To get one we can let the server choose a random queue name for us, by not supplying the ''queue'' parameter to ''queue_declare'':
result = channel.queue_declare()
The queue should also be deleted once the consumer disconnects. The ''exclusive'' flag does that:
result = channel.queue_declare(exclusive=True)
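The lifetime of such a queue can be pictured with a small Python 3 model. This is only a sketch of the bookkeeping, not how the server is implemented: ''ToyBroker'' and the connection ids are hypothetical, and the generated name merely imitates the ''amq.gen-...'' style of server-chosen names.

```python
import uuid


class ToyBroker:
    """In-memory stand-in for the broker's queue bookkeeping (not the pika API)."""

    def __init__(self):
        self.queues = {}  # queue name -> owning connection id, or None

    def queue_declare(self, connection_id, queue="", exclusive=False):
        if not queue:
            # No name given: the broker picks one. The exact format is made up.
            queue = "amq.gen-" + uuid.uuid4().hex
        self.queues[queue] = connection_id if exclusive else None
        return queue

    def close_connection(self, connection_id):
        # Exclusive queues disappear with the connection that declared them.
        self.queues = {name: owner for name, owner in self.queues.items()
                       if owner != connection_id}


broker = ToyBroker()
shared = broker.queue_declare("conn-1", queue="hello")
temp = broker.queue_declare("conn-1", exclusive=True)
broker.close_connection("conn-1")

print(sorted(broker.queues))
```

After the connection closes only the named, non-exclusive queue is left.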
===== Use =====
==== Producer & consumer ====
=== Producer ===
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
for i in xrange(100):
    msg = 'Hello World! - ' + str(i)
    channel.basic_publish(exchange='', routing_key='hello', body=msg)
    print " [x] Sent", msg
connection.close()
=== Consumer ===
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
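* ''no_ack=True'' turns message acknowledgements off: Rabbit treats a message as handled as soon as it has been delivered to the consumer.
* We can tell Rabbit not to deliver more than one message at a time to a worker by setting ''prefetch_count=1'' on the worker's channel (''channel.basic_qos(prefetch_count=1)'').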
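The effect of ''prefetch_count=1'' can be estimated with a small simulation in Python 3. It is a simplified model, assuming all messages are already queued and that without a prefetch limit messages are handed out round-robin; ''round_robin'' and ''fair_dispatch'' are hypothetical helper names, and the task bodies use one dot per second of work.

```python
import heapq


def round_robin(durations, workers):
    """Every n-th message goes to the n-th worker, busy or not."""
    busy_until = [0] * workers
    for i, seconds in enumerate(durations):
        busy_until[i % workers] += seconds
    return busy_until


def fair_dispatch(durations, workers):
    """With prefetch_count=1 a worker gets its next message only after
    acknowledging the previous one, so each message goes to whichever
    worker becomes free first."""
    heap = [(0, w) for w in range(workers)]  # (time the worker is free, worker id)
    heapq.heapify(heap)
    busy_until = [0] * workers
    for seconds in durations:
        free_at, w = heapq.heappop(heap)
        busy_until[w] = free_at + seconds
        heapq.heappush(heap, (busy_until[w], w))
    return busy_until


# Each dot in the body stands for one second of work.
tasks = ["heavy.....", "light.", "heavy.....", "light.", "heavy.....", "light."]
durations = [body.count(".") for body in tasks]

print(round_robin(durations, 2))
print(fair_dispatch(durations, 2))
```

With two workers, round-robin leaves one worker busy for 15 seconds and the other for 3, while the prefetch model finishes after 11 and 7 seconds.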
==== Message acknowledgment and persistent queues ====
* If a consumer's connection dies before it acknowledges a message, Rabbit re-queues that message and delivers it again.
* Message acknowledgement is on by default; it is turned off with the ''no_ack=True'' parameter.
* Persistent messages and durable queues survive a crash or restart of Rabbit or of the machine.
* They are enabled by declaring the queue with ''durable=True'' and publishing messages with the ''delivery_mode'' property set to 2.
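The acknowledgement rules can be sketched as a toy queue in Python 3. This is an illustration only, not the broker's implementation: ''ToyQueue'', ''deliver'' and ''consumer_died'' are hypothetical names. The ''ready'' and ''unacked'' collections correspond to the ''messages_ready'' and ''messages_unacknowledged'' columns of ''rabbitmqctl list_queues''.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for one queue with manual acknowledgements."""

    def __init__(self):
        self.ready = deque()
        self.unacked = {}  # delivery_tag -> (consumer, body)
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self, consumer):
        """Hand the next ready message to a consumer; it stays tracked until acked."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, body)
        return tag, body

    def basic_ack(self, delivery_tag):
        # Only now is the message really gone from the queue.
        del self.unacked[delivery_tag]

    def consumer_died(self, consumer):
        """Put every message the dead consumer had not acked back in the queue."""
        lost = [tag for tag, (owner, _) in self.unacked.items() if owner == consumer]
        for tag in sorted(lost, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag)[1])


q = ToyQueue()
q.publish("task 1")
q.publish("task 2")

tag1, _ = q.deliver("worker-A")
q.basic_ack(tag1)             # worker-A finished task 1
tag2, _ = q.deliver("worker-A")
q.consumer_died("worker-A")   # worker-A crashes before acking task 2

_, body = q.deliver("worker-B")
print(body)
```

The second task is not lost: it goes back to the queue and ''worker-B'' receives it.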
=== Consumer (worker) ===
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import time
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(body.count('.'))
    print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
=== Producer ===
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(delivery_mode=2,))
print " [x] Sent %r" % (message,)
connection.close()
==== Publish/Subscribe ====
=== Subscriber ===
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
def callback(ch, method, properties, body):
    print " [x] %r" % (body,)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
=== Publisher ===
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print " [x] Sent %r" % (message,)
connection.close()
===== Others =====
==== Plugins ====
=== Management ===
$ rabbitmq-plugins enable rabbitmq_management
This enables the web management interface, available at [[http://server:15672]] (e.g. [[http://127.0.0.1:15672/]]). The default credentials are username ''guest'' and password ''guest''.
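The same plugin also serves an HTTP API under ''/api/'', which is handy for scripts. Below is a minimal Python 3 sketch that only builds the authenticated request; ''management_request'' is a hypothetical helper, and the host, port and credentials are the defaults mentioned above.

```python
import base64
import urllib.request


def management_request(path, host="127.0.0.1", port=15672,
                       user="guest", password="guest"):
    """Build (but do not send) an authenticated GET for the management HTTP API."""
    url = "http://%s:%d/api/%s" % (host, port, path.lstrip("/"))
    token = base64.b64encode(("%s:%s" % (user, password)).encode("utf-8"))
    request = urllib.request.Request(url)
    # HTTP basic authentication: base64 of "user:password"
    request.add_header("Authorization", "Basic " + token.decode("ascii"))
    return request


request = management_request("queues")
print(request.full_url)

# Sending it needs a running broker with the plugin enabled:
# import json
# with urllib.request.urlopen(request) as response:
#     for queue in json.load(response):
#         print(queue["name"], queue.get("messages"))
```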
To add users:
- Create a user for a vhost on that system (here I use the default vhost "/")
$ rabbitmqctl add_user yourName yourPass
- Set the permissions for that user for default vhost
$ rabbitmqctl set_permissions -p / yourName ".*" ".*" ".*"
- Set the administrator tag for this user (to give the user access to the management plugin)
$ rabbitmqctl set_user_tags yourName administrator
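The three patterns passed to ''set_permissions'' are regular expressions for the configure, write and read permissions, in that order, matched against queue and exchange names. The Python 3 sketch below shows the idea; it assumes the pattern is applied as an unanchored search and uses Python's regex engine rather than the broker's, and the ''read_only_logs'' user is hypothetical.

```python
import re


def allowed(pattern, resource_name):
    """True when the permission regex matches the queue or exchange name.

    Assumption: the pattern is applied as an unanchored search, so patterns
    that must cover the whole name carry their own ^ and $.
    """
    return re.search(pattern, resource_name) is not None


# (configure, write, read), in the order rabbitmqctl set_permissions takes them
full_access = (".*", ".*", ".*")
read_only_logs = ("^$", "^$", "^logs$")  # hypothetical restricted user

for name in ("logs", "task_queue"):
    print(name, [allowed(p, name) for p in read_only_logs])
```

''.*'' matches every name, so the user created above can configure, write to and read from everything in the vhost.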
==== Notes ====
=== Reset RabbitMQ ===
This wipes queues, config, everything!
$ rabbitmqctl stop_app
$ rabbitmqctl reset
$ rabbitmqctl start_app