¡Esta es una revisión vieja del documento!
Implementar una capa de mensajes con ZeroMQ:
tcp://hostname:portinproc://nameipc:///tmp/filenamepgm://interface:address:port y epgm://interface:address:portPasos para desarrollar:
bind del socket. Si empezarán enviando (elemento dinámico) al connect.
El contexto es el contenedor de los sockets para un proceso. Para inicializar el contexto se usa zmq_ctx_new() y para eliminarlo zmq_ctx_destroy().
Propiedades de la librería:
Notas:
* como dirección nos estamos refiriendo a que tome todas las interfaces. Por ejemplo: http://*:5555
Esta arquitectura consiste en que el cliente envía un mensaje con zmq_send() y luego recibe con zmq_recv(), el servidor lo hace en el orden contrario. Estas acciones se lleban a cabo en loop y sin poder cambiarse de orden.
Servidor:
import zmq import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: message = socket.recv() # Do some 'work' socket.send("World")
Cliente:
import zmq context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect ("tcp://localhost:5555") for request in range (10): socket.send ("Hello") # Get the reply. message = socket.recv() print "Received reply ", request, "[", message, "]"
Es unidireccional. En este patrón un servidor publica sin importar quien recibe. Enviará a todos los clientes y serán estos quienes filtren.
Por ejemplo el siguiente código envía eventos de los distintos participantes…
import zmq from random import choice context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://127.0.0.1:5000") countries = ['netherlands','brazil','germany','portugal'] events = ['yellow card', 'red card', 'goal', 'corner', 'foul'] while True: msg = choice( countries ) +" "+ choice( events ) print "->",msg socket.send( msg )
… Y el siguiente recibe de Netherlands y Germany.
import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://127.0.0.1:5000") socket.setsockopt(zmq.SUBSCRIBE, "netherlands") socket.setsockopt(zmq.SUBSCRIBE, "germany") while True: print socket.recv()
Notas:
Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura:
En esta…
Ventilador:
import zmq import random import time context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sink = context.socket(zmq.PUSH) sink.connect("tcp://localhost:5558") # The first message is "0" and signals start of batch sink.send('0') # Send 100 tasks total_msec = 0 for task_nbr in range(100): workload = random.randint(1, 100) total_msec += workload sender.send(str(workload))
Worker:
import sys import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") while True: s = receiver.recv() sys.stdout.write('.') time.sleep(int(s)*0.001) sender.send('')
Sink:
import sys import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") # Wait for start of batch s = receiver.recv() tstart = time.time() # Process 100 confirmations total_msec = 0 for task_nbr in range(100): s = receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(':') else: sys.stdout.write('.') tend = time.time() print "Total elapsed time: %d msec" % ((tend-tstart)*1000)
Estos son una comunicación uno a uno:
import zmq context = zmq.Context() socket = context.socket(zmq.PAIR) socket.bind("tcp://127.0.0.1:5555")
import zmq context = zmq.Context() socket = context.socket(zmq.PAIR) socket.connect("tcp://127.0.0.1:5555")
Podemos configurar un nodo como device. Esto permitirá mejorar el rendimiento del sistema.
Se usa para mediar entre un REQ/REP:
Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets zmq.ROUTER y zmq.DEALER en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta.
REQ:
import sys import zmq context = zmq.Context() for x in xrange(10): sock = context.socket(zmq.REQ) sock.connect(sys.argv[1]) print 'REQ is', x, sock.send(str(x)) print 'REP is', sock.recv()
REP:
import sys import zmq context = zmq.Context() while True: sock = context.socket(zmq.REP) sock.connect(sys.argv[1]) x = sock.recv() print 'REQ is', x, reply = 'x-%s' % x sock.send(reply) print 'REP is', reply
QUEUE:
import sys import zmq context = zmq.Context() s1 = context.socket(zmq.ROUTER) s2 = context.socket(zmq.DEALER) s1.bind(sys.argv[1]) s2.bind(sys.argv[2]) zmq.device(zmq.QUEUE, s1, s2)
Cuando enviamos un mensaje y no hay receptor, lo que hace ZeroMQ es guardarlo en memoria hasta que el receptor esté activo.
Para evitar problemas de memoria ZeroMQ provee de la opción de socket high water mark (zmq.HWM). Si la añadimos a setsockopt podremos indicar cuantos mensajes quedarán guardados en memoria.
Con la opción zmq.SWAP podemos indicar un tamaño de disco donde guardar más mensajes una vez el HWM sea superado.
sock = context.socket(zmq.PUSH) sock.setsockopt(zmq.HWM, 1000) sock.setsockopt(zmq.SWAP, 200*2**10) sock.connect(sys.argv[1]) while True: sock.send(sys.argv[1] + ':' + time.ctime())
zmq.zmq_version().sock.send(part1, zmq.SNDMORE) sock.send(part2, zmq.SNDMORE) sock.send(part3, zmq.SNDMORE) sock.send(final)
more = True parts = [] while more: parts.append(sock.recv()) more = sock.getsockopt(zmq.RCVMORE)
Productor:
import zmq import random import time context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sender.bind("tcp://*:5558") print "Sending tasks to workers" random.seed() total_msec = 0 for task_nbr in range(100): workload = random.randint(1, 100) total_msec += workload sender.send(str(workload)) print "Total expected cost: %s msec" % total_msec time.sleep(1)
Consumidor:
import sys import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") while True: s = receiver.recv() sys.stdout.write('.'+s) sys.stdout.flush() # Do the work time.sleep(int(s)*0.001)