====== RabbitMQ ====== ===== Configuration ===== ==== Install ==== $ echo "deb http://www.rabbitmq.com/debian/ testing main" >> /etc/apt/sources.list $ wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc $ sudo apt-key add rabbitmq-signing-key-public.asc $ sudo apt-get install rabbitmq-server ==== Commands ==== === Manage the server === $ sudo service rabbitmq-server status Where status can be ''start'', ''stop'', ''restart''... === See number of messages === $ sudo rabbitmqctl list_queues === Print not-acknowledge message === $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged === Listing exchanges === $ sudo rabbitmqctl list_exchanges === List bindings === $ sudo rabbitmqctl list_bindings ==== Concepts ==== === Exchanges === A producer can only send messages to an exchange. The exchange receives messages from producers and it pushes them to queues. * direct * topic * headers * fanout: it broadcasts all the messages it receives to all the queues it knows. Some amq.* exchanges and the default (unnamed) exchange are created by default, we identify the default exchange by the empty string. To declare an exchange it's used: channel.exchange_declare(exchange='logs', type='fanout') === Bindings === The relationship between exchange and a queue is called a binding. channel.queue_bind(exchange='logs', queue=result.method.queue) === Temporary queues === Whe we want to hear about all messages and not just a subset of them. We need to connect to Rabbit with an empty queue. To do it we could let the server choose a random queue name for us. We can do this by not supplying the queue parameter to queue_declare: result = channel.queue_declare() Also we need that once we disconnect the consumer the queue should be deleted. There's an exclusive flag for that: result = channel.queue_declare(exclusive=True) ===== Use ===== ==== Producer & consumer ==== === Producer === #!/usr/bin/env python # -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') for i in xrange(100): msg = 'Hello World! - ' + str(i) channel.basic_publish(exchange='', routing_key='hello', body=msg) print " [x] Sent", msg connection.close() === Consumer === #!/usr/bin/env python # -*- coding: utf-8 -*- import pika def callback(ch, method, properties, body): print " [x] Received %r" % (body,) connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming() * ''no_ack'' means //no acknowledgement// messages. * We can tell to Rabbit that do not deliver more than a message to a worker with ''prefetch_count=1'' into the worker channel. ==== Message acknowledgment and persistent queues ==== * Message acknoledgement is done after connection dies. * Message acknoledgement is active by default, it is defined by ''no_ack'' parameter. * Persistent messages and queues are those which remain in the computer even if the computer or rabbit crash. * They are activated marking the queue as durable and the delivery_mode property with a value 2. #!/usr/bin/env python # -*- coding: utf-8 -*- import pika import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(body.count('.')) print " [x] Done" ch.basic_ack(delivery_tag=method.delivery_tag) connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming() #!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=2,)) print " [x] Sent %r" % (message,) connection.close() ==== Publish\Subscriber ==== #!/usr/bin/env python # -*- coding: utf-8 -*- import pika def callback(ch, method, properties, body): print " [x] %r" % (body,) connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() #!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close() ===== Others ===== ==== Plugins ==== === Management === $ rabbitmq-plugins enable rabbitmq_management Will enable the web management server. You'll find at: [[http://server:15672]] (p.eg. [[http://127.0.0.1:15672/]]). Username ''guest'' and password ''guest'' is the authentication by default. Para aƱadir usuarios: - you need to create a user for any vhost on that system (here I use default vhost "/") $ rabbitmqctl add_user yourName yourPass - Set the permissions for that user for default vhost $ rabbitmqctl set_permissions -p / yourName ".*" ".*" ".*" - Set the administrator tag for this user (to enable him access the management pluggin) $ rabbitmqctl set_user_tags yourName administrator ==== Notes ==== === Reset RabbitMQ === ... Queues, config, everything! $ rabbitmqctl stop_app $ rabbitmqctl reset $ rabbitmqctl start_app