¡Esta es una revisión vieja del documento!
Implementar una capa de mensajes con ZeroMQ:
tcp://hostname:portinproc://nameipc:///tmp/filenamepgm://interface:address:port y epgm://interface:address:portPasos para desarrollar:
bind del socket. Si empezarán enviando (elemento dinámico) al connect.
El contexto es el contenedor de los sockets para un proceso. Para inicializar el contexto se usa zmq_ctx_new() y para eliminarlo zmq_ctx_destroy().
Propiedades de la librería:
Notas:
* como dirección nos estamos refiriendo a que tome todas las interfaces. Por ejemplo: http://*:5555
Esta arquitectura consiste en que el cliente envía un mensaje con zmq_send() y luego recibe con zmq_recv(), el servidor lo hace en el orden contrario. Estas acciones se lleban a cabo en loop y sin poder cambiarse de orden.
Servidor:
import zmq import time context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: message = socket.recv() # Do some 'work' socket.send("World")
Cliente:
import zmq context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect ("tcp://localhost:5555") for request in range (10): socket.send ("Hello") # Get the reply. message = socket.recv() print "Received reply ", request, "[", message, "]"
Es unidireccional. En este patrón un servidor publica sin importar quien recibe. Enviará a todos los clientes y serán estos quienes filtren.
Por ejemplo el siguiente código envía eventos de los distintos participantes…
import zmq from random import choice context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://127.0.0.1:5000") countries = ['netherlands','brazil','germany','portugal'] events = ['yellow card', 'red card', 'goal', 'corner', 'foul'] while True: msg = choice( countries ) +" "+ choice( events ) print "->",msg socket.send( msg )
… Y el siguiente recibe de Netherlands y Germany.
import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://127.0.0.1:5000") socket.setsockopt(zmq.SUBSCRIBE, "netherlands") socket.setsockopt(zmq.SUBSCRIBE, "germany") while True: print socket.recv()
Notas:
Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura:
En esta…
Ventilador:
import zmq import random import time context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sink = context.socket(zmq.PUSH) sink.connect("tcp://localhost:5558") # The first message is "0" and signals start of batch sink.send('0') # Send 100 tasks total_msec = 0 for task_nbr in range(100): workload = random.randint(1, 100) total_msec += workload sender.send(str(workload))
Worker:
import sys import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") while True: s = receiver.recv() sys.stdout.write('.') time.sleep(int(s)*0.001) sender.send('')
Sink:
import sys import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") # Wait for start of batch s = receiver.recv() tstart = time.time() # Process 100 confirmations total_msec = 0 for task_nbr in range(100): s = receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(':') else: sys.stdout.write('.') tend = time.time() print "Total elapsed time: %d msec" % ((tend-tstart)*1000)
Estos son una comunicación uno a uno:
import zmq context = zmq.Context() socket = context.socket(zmq.PAIR) socket.bind("tcp://127.0.0.1:5555")
import zmq context = zmq.Context() socket = context.socket(zmq.PAIR) socket.connect("tcp://127.0.0.1:5555")
Podemos configurar un nodo como device. Esto permitirá mejorar el rendimiento del sistema.
Se usa para mediar entre un REQ/REP:
Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets zmq.ROUTER y zmq.DEALER en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta.
REQ:
import sys import zmq context = zmq.Context() for x in xrange(10): sock = context.socket(zmq.REQ) sock.connect(sys.argv[1]) print 'REQ is', x, sock.send(str(x)) print 'REP is', sock.recv()
REP:
import sys import zmq context = zmq.Context() while True: sock = context.socket(zmq.REP) sock.connect(sys.argv[1]) x = sock.recv() print 'REQ is', x, reply = 'x-%s' % x sock.send(reply) print 'REP is', reply
QUEUE:
import sys import zmq context = zmq.Context() s1 = context.socket(zmq.ROUTER) s2 = context.socket(zmq.DEALER) s1.bind(sys.argv[1]) s2.bind(sys.argv[2]) zmq.device(zmq.QUEUE, s1, s2)
zmq.zmq_version().sock.send(part1, zmq.SNDMORE) sock.send(part2, zmq.SNDMORE) sock.send(part3, zmq.SNDMORE) sock.send(final)
more = True parts = [] while more: parts.append(sock.recv()) more = sock.getsockopt(zmq.RCVMORE)
Productor:
import zmq import random import time context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sender.bind("tcp://*:5558") print "Sending tasks to workers" random.seed() total_msec = 0 for task_nbr in range(100): workload = random.randint(1, 100) total_msec += workload sender.send(str(workload)) print "Total expected cost: %s msec" % total_msec time.sleep(1)
Consumidor:
import sys import time import zmq context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") while True: s = receiver.recv() sys.stdout.write('.'+s) sys.stdout.flush() # Do the work time.sleep(int(s)*0.001)