Herramientas de usuario

Herramientas del sitio


fw:zeromq

Diferencias

Muestra las diferencias entre dos versiones de la página.

Enlace a la vista de comparación

Ambos lados, revisión anterior Revisión previa
Próxima revisión
Revisión previa
fw:zeromq [2013/07/02 13:50]
alfred [Control de flujo]
fw:zeromq [2020/05/09 09:25] (actual)
Línea 2: Línea 2:
   * [[http://​www.zeromq.org/​|Página web del proyecto]]   * [[http://​www.zeromq.org/​|Página web del proyecto]]
   * [[http://​zguide.zeromq.org/​page:​all|Guía del framework]]   * [[http://​zguide.zeromq.org/​page:​all|Guía del framework]]
 +  * [[http://​zeromq.github.io/​pyzmq/​api/​index.html|Documentación pyzmq]]
 ===== Uso de la librería ===== ===== Uso de la librería =====
  
Línea 97: Línea 98:
   * Si un publisher no tiene subscribers los mensajes quedarán desechados.   * Si un publisher no tiene subscribers los mensajes quedarán desechados.
   * TCP con esta estructura es lento.   * TCP con esta estructura es lento.
 +  * Para realizar una subscripción a todos los mensajes haremos: 
 +<code python>​ 
 +socket.setsockopt(zmq.SUBSCRIBE,​ ""​) 
 +</​code>​
 ==== Pipeline ==== ==== Pipeline ====
 Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura: \\  Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura: \\ 
Línea 190: Línea 194:
 </​code>​ </​code>​
 ==== Devices ==== ==== Devices ====
-Podemos configurar un nodo como device. Esto permitirá mejorar el rendimiento del sistema. ​Por ejemplo una queue para un Request-Reply+Podemos configurar un nodo como device. Esto permitirá mejorar el rendimiento del sistema. ​ 
-{{:​fw:​zeromq:​queue.png?​direct&​200|}}+=== Queue === 
 +Se usa para mediar entre un REQ/REP\\  
 +{{:​fw:​zeromq:​queue.png?​direct&​200|}} ​\\  
 +Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets ''​zmq.ROUTER''​ y ''​zmq.DEALER''​ en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta. \\  
 +REQ: 
 +<code python>​ 
 +import sys 
 +import zmq 
 +context = zmq.Context() 
 +for x in xrange(10):​ 
 +    sock = context.socket(zmq.REQ) 
 +    sock.connect(sys.argv[1]) 
 +    print 'REQ is', x, 
 +    sock.send(str(x)) 
 +    print 'REP is', sock.recv() 
 +</​code>​ 
 +REP: 
 +<code python>​ 
 +import sys 
 +import zmq 
 +context = zmq.Context() 
 +while True: 
 +    sock = context.socket(zmq.REP) 
 +    sock.connect(sys.argv[1]) 
 +    x = sock.recv() 
 +    print 'REQ is', x, 
 +    reply = '​x-%s'​ % x 
 +    sock.send(reply) 
 +    print 'REP is', reply 
 +</​code>​ 
 +QUEUE: 
 +<code python>​ 
 +import sys 
 +import zmq 
 +context = zmq.Context() 
 +s1 = context.socket(zmq.ROUTER) 
 +s2 = context.socket(zmq.DEALER) 
 +s1.bind(sys.argv[1]) 
 +s2.bind(sys.argv[2]) 
 +zmq.device(zmq.QUEUE,​ s1, s2) 
 +</​code>​ 
 + 
 + 
 +=== Forwarding === 
 +Media entre un grupo de nodos push\sub. Como este conjunto no se bloquea no es necesario usar un tipo de socket especial: 
 +<code python>​ 
 +import sys 
 +import zmq 
 +context = zmq.Context() 
 +s1 = context.socket(zmq.SUB) 
 +s2 = context.socket(zmq.PUB) 
 +s1.bind(sys.argv[1]) 
 +s2.bind(sys.argv[2]) 
 +s1.setsockopt(zmq.SUBSCRIBE,​ ''​) 
 +zmq.device(zmq.FORWARDER,​ s1, s2) 
 +</​code>​ 
 +Ahora podemos conectarle tantos publishers y subscribers como queramos. 
 + 
 +=== Streaming === 
 +En este caso es útil para los nodos PUSH\PULL. 
 +<code python>​ 
 +context = zmq.Context() 
 +s1 = context.socket(zmq.PULL) 
 +s2 = context.socket(zmq.PUSH) 
 +s1.bind("​tcp://​*:​5550"​) 
 +s2.bind("​tcp://​*:​5551"​) 
 +zmq.device(zmq.STREAMER,​ s1, s2) 
 +</​code>​ 
 +<code python>​ 
 +import zmq 
 +import random 
 +import time 
 +context = zmq.Context() 
 +  
 +sender = context.socket(zmq.PUSH) 
 +sender.connect("​tcp://​localhost:​5550"​) 
 + 
 +total_msec = 0 
 +for task_nbr in range(100):​ 
 +    workload = random.randint(1,​ 100) 
 +    sender.send(str(workload)) 
 +</​code>​ 
 + 
 +<code python>​ 
 +import sys 
 +import time 
 +import zmq 
 +  
 +context = zmq.Context() 
 +  
 +receiver = context.socket(zmq.PULL) 
 +receiver.connect("​tcp://​localhost:​5551"​) 
 +  
 +while True: 
 +    s = receiver.recv() 
 +    sys.stdout.write('​.'​) 
 +    sys.stdout.flush() 
 +    time.sleep(int(s)*0.001) 
 +</​code>​ 
 +=== Crear tu propio Streamer & Forwarder === 
 +Como los dos son unidireccionales,​ lo único que tendremos que hacer es copiar los datos de un socket al otro: 
 +<code python>​ 
 +import sys 
 +import zmq 
 + 
 +def zmq_streaming_device(a,​ b): 
 +    while True: 
 +        msg = a.recv() 
 +        more = a.getsockopt(zmq.RCVMORE) 
 +        if more: 
 +            b.send(msg, zmq.SNDMORE) 
 +        else: 
 +            b.send(msg) 
 +        sys.stdout.write('​.'​) 
 +        sys.stdout.flush() 
 + 
 +context = zmq.Context() 
 +s1 = context.socket(zmq.PULL) 
 +s2 = context.socket(zmq.PUSH) 
 +s1.bind("​tcp://​*:​5550"​) 
 +s2.bind("​tcp://​*:​5551"​) 
 +zmq_streaming_device(s1,​ s2) 
 +</​code>​ 
 +===== Patrones Request/​Reply ===== 
 +===== Patrones Publish/​Subscribe =====
 ===== Avanzado ===== ===== Avanzado =====
 ==== Control de flujo ==== ==== Control de flujo ====
 Cuando enviamos un mensaje y no hay receptor, lo que hace ZeroMQ es guardarlo en memoria hasta que el receptor esté activo. \\  Cuando enviamos un mensaje y no hay receptor, lo que hace ZeroMQ es guardarlo en memoria hasta que el receptor esté activo. \\ 
-Para evitar problemas de memoria ZeroMQ provee de la opción de socket high water mark (''​zmq.HWM''​). Si +Para evitar problemas de memoria ZeroMQ provee de la opción de socket high water mark (''​zmq.HWM''​). Si la añadimos a ''​setsockopt''​ podremos indicar cuantos mensajes quedarán guardados en memoria. \\  
 +Con la opción ''​zmq.SWAP''​ podemos indicar un tamaño de disco donde guardar más mensajes una vez el HWM sea superado.
 <code python> <code python>
 sock = context.socket(zmq.PUSH) sock = context.socket(zmq.PUSH)
Línea 206: Línea 335:
 ===== Notas ===== ===== Notas =====
   * Saber la versión con ''​zmq.zmq_version()''​.   * Saber la versión con ''​zmq.zmq_version()''​.
 +  * Para realizar conexiones no bloqueantes los usaremos con [[fw:​gevent&#​zeromq|greenlets]].
 +  * Para instalar la librería en el sistema: ''​apt-get install libzmq-dev''​.
  
 +==== Instalar en Ubuntu ====
 +Instalación de la librería del sistema:
 +<​code>​
 +$ sudo apt-get install python-dev libzmq-dev
 +</​code>​
 +Instalación en Python:
 +<​code>​
 +$ sudo pip install pyzmq
 +</​code>​
 +Recuerda que para utilizar ''​pip''​ has de tener instalado el paquete ''​python-pip''​
 +==== Instalar en Windows ====
 +Necesitarás agregar el paquete ''​VS2010 C++ redistributable''​.
 ==== Como... ==== ==== Como... ====
 === Enviar\recibir varios paquetes === === Enviar\recibir varios paquetes ===
Línea 263: Línea 406:
     # Do the work     # Do the work
     time.sleep(int(s)*0.001)     time.sleep(int(s)*0.001)
 +</​code>​
 +
 +=== Enviar no bloqueante ===
 +  * Pero cuidado, el objeto sender no se elminará hasta que los datos hayan sido enviados.
 +<code python>
 +sender.send(jdata,​ zmq.NOBLOCK)
 </​code>​ </​code>​
fw/zeromq.1372773017.txt.gz · Última modificación: 2020/05/09 09:24 (editor externo)