¡Esta es una revisión vieja del documento!
$ echo "deb http://www.rabbitmq.com/debian/ testing main" >> /etc/apt/sources.list $ wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc $ sudo apt-key add rabbitmq-signing-key-public.asc $ sudo apt-get install rabbitmq-server
$ sudo service rabbitmq-server status
Where status can be start, stop, restart…
$ sudo rabbitmqctl list_queues
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
$ sudo rabbitmqctl list_exchanges
$ sudo rabbitmqctl list_bindings
A producer can only send messages to an exchange. The exchange receives messages from producers and it pushes them to queues.
Some amq.* exchanges and the default (unnamed) exchange are created by default, we identify the default exchange by the empty string.
To declare an exchange it's used:
channel.exchange_declare(exchange='logs', type='fanout')
The relationship between exchange and a queue is called a binding.
channel.queue_bind(exchange='logs', queue=result.method.queue)
Whe we want to hear about all messages and not just a subset of them. We need to connect to Rabbit with an empty queue. To do it we could let the server choose a random queue name for us. We can do this by not supplying the queue parameter to queue_declare:
result = channel.queue_declare()
Also we need that once we disconnect the consumer the queue should be deleted. There's an exclusive flag for that:
result = channel.queue_declare(exclusive=True)
#!/usr/bin/env python # -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') for i in xrange(100): msg = 'Hello World! - ' + str(i) channel.basic_publish(exchange='', routing_key='hello', body=msg) print " [x] Sent", msg connection.close()
#!/usr/bin/env python # -*- coding: utf-8 -*- import pika def callback(ch, method, properties, body): print " [x] Received %r" % (body,) connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming()
no_ack means no acknowledgement messages.prefetch_count=1 into the worker channel.no_ack parameter.#!/usr/bin/env python # -*- coding: utf-8 -*- import pika import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(body.count('.')) print " [x] Done" ch.basic_ack(delivery_tag=method.delivery_tag) connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
#!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=2,)) print " [x] Sent %r" % (message,) connection.close()
#!/usr/bin/env python # -*- coding: utf-8 -*- import pika def callback(ch, method, properties, body): print " [x] %r" % (body,) connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
#!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()
$ rabbitmq-plugins enable rabbitmq_management
Will enable the web management server. You'll find at: http://server:15672 (p.eg. http://127.0.0.1:15672/). Username guest and password guest is the authentication by default.
Para añadir usuarios:
$ rabbitmqctl add_user yourName yourPass
$ rabbitmqctl set_permissions -p / yourName ".*" ".*" ".*"
$ rabbitmqctl set_user_tags yourName administrator
… Queues, config, everything!
$ rabbitmqctl stop_app $ rabbitmqctl reset $ rabbitmqctl start_app