This is an old revision of the document!


RabbitMQ

Configuration

Install

$ echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee -a /etc/apt/sources.list
$ wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
$ sudo apt-key add rabbitmq-signing-key-public.asc
$ sudo apt-get update
$ sudo apt-get install rabbitmq-server

Commands

Manage the server

$ sudo service rabbitmq-server status

Replace status with start, stop, or restart to control the server.

Show the number of messages per queue

$ sudo rabbitmqctl list_queues
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
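
When these counts are needed inside a script, the tab-separated output of the second command can be parsed. The following is only a minimal sketch: parse_list_queues is a hypothetical helper, and SAMPLE is an assumed approximation of what rabbitmqctl prints (banner lines differ between versions), so every line that is not three tab-separated fields ending in two integers is skipped.

```python
def parse_list_queues(output):
    """Parse `rabbitmqctl list_queues name messages_ready messages_unacknowledged`.

    Returns {queue_name: (ready, unacknowledged)}. Lines that do not look
    like a data row (banners, column headers, blank lines) are ignored.
    """
    queues = {}
    for line in output.splitlines():
        fields = line.split('\t')
        if len(fields) != 3:
            continue
        name, ready, unacked = fields
        if not (ready.isdigit() and unacked.isdigit()):
            continue
        queues[name] = (int(ready), int(unacked))
    return queues


# Assumed sample output; real banner lines vary between RabbitMQ versions.
SAMPLE = (
    "Listing queues ...\n"
    "name\tmessages_ready\tmessages_unacknowledged\n"
    "hello\t3\t0\n"
    "task_queue\t10\t2\n"
)

for name, (ready, unacked) in sorted(parse_list_queues(SAMPLE).items()):
    print(name, ready, unacked)
```

In practice the text would probably come from running the command with subprocess; that part is left out so the sketch runs without a broker.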

List exchanges

$ sudo rabbitmqctl list_exchanges

List bindings

$ sudo rabbitmqctl list_bindings

Exchanges

A producer never sends messages directly to a queue; it can only send them to an exchange. The exchange receives messages from producers and pushes them to queues. There are four exchange types:

  • direct
  • topic
  • headers
  • fanout: it broadcasts all the messages it receives to all the queues it knows.

RabbitMQ creates several amq.* exchanges and the default (unnamed) exchange automatically; you are unlikely to need the amq.* ones at this stage. The default exchange is identified by the empty string and routes each message to the queue whose name matches the routing key.
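
The routing rules can be illustrated with a toy in-memory model. This is a sketch for intuition only, not how RabbitMQ is implemented: route and topic_matches are hypothetical names, bindings are plain (queue, binding key) pairs, and the headers type is left out. The default exchange is modelled as a direct exchange in which every queue is bound under its own name.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP topic pattern matches a routing key.

    Words are separated by dots; '*' matches exactly one word and
    '#' matches zero or more words.
    """
    def match(p, k):
        if not p:
            return not k
        if p[0] == '#':
            # '#' absorbs zero words, or one word and then tries again.
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (p[0] == '*' or p[0] == k[0]) and match(p[1:], k[1:])

    return match(pattern.split('.'), routing_key.split('.'))


def route(exchange_type, bindings, routing_key):
    """Return the sorted names of the queues that would receive a message."""
    if exchange_type == 'fanout':
        # The routing key is ignored: every bound queue gets a copy.
        matched = {queue for queue, _ in bindings}
    elif exchange_type == 'direct':
        matched = {queue for queue, key in bindings if key == routing_key}
    elif exchange_type == 'topic':
        matched = {queue for queue, key in bindings
                   if topic_matches(key, routing_key)}
    else:
        raise ValueError('exchange type not modelled: %r' % exchange_type)
    return sorted(matched)


bindings = [('errors', 'log.error'), ('all_logs', 'log.#'), ('audit', '*.error')]
print(route('fanout', bindings, 'ignored'))
print(route('direct', bindings, 'log.error'))
print(route('topic', bindings, 'log.info'))

# Default exchange: each queue is bound with its own name as the binding key.
default_bindings = [('hello', 'hello'), ('task_queue', 'task_queue')]
print(route('direct', default_bindings, 'hello'))
```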

Use

Producer & consumer

Producer

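#!/usr/bin/env python3
# Producer: publish 100 messages to the 'hello' queue through the default exchange.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declaring a queue is idempotent: it is created only if it does not exist yet.
channel.queue_declare(queue='hello')
for i in range(100):
    msg = 'Hello World! - ' + str(i)
    # With the default exchange (''), the routing key is the queue name.
    channel.basic_publish(exchange='', routing_key='hello', body=msg)
    print(" [x] Sent", msg)
connection.close()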

Consumer

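#!/usr/bin/env python3
# Consumer: print every message that arrives on the 'hello' queue.
# Written for pika 1.x, where basic_consume takes on_message_callback and
# auto_ack (the latter was called no_ack before pika 1.0).

import pika


def callback(ch, method, properties, body):
    # body arrives as bytes.
    print(" [x] Received %r" % (body,))


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.start_consuming()

  • auto_ack=True (no_ack=True before pika 1.0) means the consumer sends no acknowledgements: RabbitMQ treats a message as handled as soon as it is delivered.
  • To tell RabbitMQ not to deliver more than one unacknowledged message at a time to a worker, call basic_qos(prefetch_count=1) on the worker's channel. The limit only has an effect when acknowledgements are enabled.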
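
The effect of prefetch_count=1 can be seen in a small simulation that needs no broker. It is a simplified sketch under stated assumptions: round_robin and fair_dispatch are hypothetical functions, each message is reduced to a processing time in seconds, and the default behaviour is modelled as strict round-robin delivery.

```python
import heapq


def round_robin(durations, workers):
    """Assign message i to worker i % workers, regardless of its load."""
    busy = [0] * workers
    for i, duration in enumerate(durations):
        busy[i % workers] += duration
    return busy


def fair_dispatch(durations, workers):
    """Give each message to the worker that becomes free first.

    This mimics prefetch_count=1: a worker receives a new message only
    after it has acknowledged the previous one.
    """
    # Heap of (time at which the worker is free, worker index).
    free_at = [(0, w) for w in range(workers)]
    heapq.heapify(free_at)
    busy = [0] * workers
    for duration in durations:
        free_time, worker = heapq.heappop(free_at)
        busy[worker] += duration
        heapq.heappush(free_at, (free_time + duration, worker))
    return busy


# Alternating slow (5 s) and fast (1 s) tasks, two workers.
tasks = [5, 1, 5, 1, 5, 1]
print('round robin  :', round_robin(tasks, 2))
print('prefetch = 1 :', fair_dispatch(tasks, 2))
```

With strict round-robin one worker receives every slow task; with the prefetch limit the load per worker is more even, so the batch finishes sooner.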

Message acknowledgment and persistent queues

  • If a consumer's connection dies before it has acknowledged a message, RabbitMQ re-queues that message and delivers it to another consumer.
    • Message acknowledgement is on by default; it is controlled by the no_ack parameter (renamed auto_ack in pika 1.0).
  • Durable queues and persistent messages survive a restart or crash of RabbitMQ or of the machine it runs on.
    • To enable them, declare the queue with durable=True and publish each message with the delivery_mode property set to 2.
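
Consumer (worker)

#!/usr/bin/env python3
# Worker: take tasks from the durable 'task_queue' and acknowledge each one
# after the work is done. Written for pika 1.x.

import pika
import time


def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    # Simulate work: sleep one second per dot in the message body (bytes).
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Acknowledge only after the work is finished.
    ch.basic_ack(delivery_tag=method.delivery_tag)


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
# Deliver at most one unacknowledged message to this worker at a time.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()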

Producer

#!/usr/bin/env python3
# Producer: publish one persistent task to the durable 'task_queue'.

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      # delivery_mode=2 marks the message as persistent.
                      properties=pika.BasicProperties(delivery_mode=2))
print(" [x] Sent %r" % (message,))
connection.close()
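
The acknowledgement behaviour used in this section can be modelled without a broker. The class below is a toy sketch, not RabbitMQ's implementation: ToyQueue and its method names are hypothetical, consumers are plain strings, and durability is not modelled at all. It only shows that a delivered message stays tracked until it is acknowledged and goes back to the queue if its consumer dies first.

```python
from collections import deque


class ToyQueue:
    """Very small in-memory stand-in for a queue with manual acknowledgements."""

    def __init__(self):
        self.ready = deque()   # messages waiting to be delivered
        self.unacked = {}      # delivery tag -> (consumer, message)
        self._next_tag = 1

    def publish(self, message):
        self.ready.append(message)

    def deliver(self, consumer):
        """Hand the next ready message to a consumer; return (tag, message)."""
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, message)
        return tag, message

    def ack(self, tag):
        """The consumer finished the work, so the message can be forgotten."""
        del self.unacked[tag]

    def consumer_died(self, consumer):
        """Put every message the dead consumer had not acknowledged back."""
        lost = sorted(tag for tag, (owner, _) in self.unacked.items()
                      if owner == consumer)
        # Re-insert at the front, newest first, so the original order is kept.
        for tag in reversed(lost):
            _, message = self.unacked.pop(tag)
            self.ready.appendleft(message)


queue = ToyQueue()
queue.publish('task 1')
queue.publish('task 2')

tag, _ = queue.deliver('worker A')
queue.ack(tag)                       # task 1 is done for good
queue.deliver('worker A')            # task 2 is delivered but never acknowledged
queue.consumer_died('worker A')      # so it returns to the queue

_, redelivered = queue.deliver('worker B')
print(redelivered)
```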

Publish/Subscribe

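Subscriber

#!/usr/bin/env python3
# Subscriber: receive every message published to the 'logs' fanout exchange.
# Written for pika 1.x (exchange_type, on_message_callback, auto_ack).

import pika


def callback(ch, method, properties, body):
    print(" [x] %r" % (body,))


connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# An empty name asks the broker to generate one; exclusive=True deletes the
# queue when this connection closes.
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()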

Publisher

#!/usr/bin/env python3
# Publisher: send one message to the 'logs' fanout exchange.

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# A fanout exchange ignores the routing key, so it is left empty.
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % (message,))
connection.close()
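
One consequence of giving each subscriber its own temporary queue is that a subscriber only sees messages published after its queue was bound. The sketch below illustrates this with a toy in-memory model; ToyFanoutExchange, its methods, and the 'toy.gen-N' queue names are all hypothetical and only stand in for the broker's behaviour.

```python
class ToyFanoutExchange:
    """In-memory stand-in for a fanout exchange with per-subscriber queues."""

    def __init__(self):
        self.queues = {}     # queue name -> list of messages received
        self._counter = 0

    def declare_exclusive_queue(self):
        """Create a fresh, uniquely named queue and bind it to the exchange."""
        self._counter += 1
        name = 'toy.gen-%d' % self._counter
        self.queues[name] = []
        return name

    def delete_queue(self, name):
        """Model a subscriber disconnecting: its exclusive queue disappears."""
        del self.queues[name]

    def publish(self, message):
        """Copy the message to every bound queue; drop it if there is none."""
        for messages in self.queues.values():
            messages.append(message)


logs = ToyFanoutExchange()
logs.publish('nobody is listening')      # dropped: no queue is bound yet

first = logs.declare_exclusive_queue()
logs.publish('info: one')

second = logs.declare_exclusive_queue()
logs.publish('info: two')

print(logs.queues[first])
print(logs.queues[second])
```
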
comb/rabbitmq.1405419962.txt.gz · Last modified: 2020/05/09 09:24 (external edit)