Herramientas de usuario

Herramientas del sitio


fw:zeromq

Diferencias

Muestra las diferencias entre dos versiones de la página.

Enlace a la vista de comparación

Ambos lados, revisión anterior Revisión previa
Próxima revisión
Revisión previa
fw:zeromq [2013/07/02 14:29]
alfred [Patrones Publish/Subscribe]
fw:zeromq [2020/05/09 09:25] (actual)
Línea 2: Línea 2:
   * [[http://​www.zeromq.org/​|Página web del proyecto]]   * [[http://​www.zeromq.org/​|Página web del proyecto]]
   * [[http://​zguide.zeromq.org/​page:​all|Guía del framework]]   * [[http://​zguide.zeromq.org/​page:​all|Guía del framework]]
 +  * [[http://​zeromq.github.io/​pyzmq/​api/​index.html|Documentación pyzmq]]
 ===== Uso de la librería ===== ===== Uso de la librería =====
  
Línea 97: Línea 98:
   * Si un publisher no tiene subscribers los mensajes quedarán desechados.   * Si un publisher no tiene subscribers los mensajes quedarán desechados.
   * TCP con esta estructura es lento.   * TCP con esta estructura es lento.
 +  * Para realizar una subscripción a todos los mensajes haremos: 
 +<code python>​ 
 +socket.setsockopt(zmq.SUBSCRIBE,​ ""​) 
 +</​code>​
 ==== Pipeline ==== ==== Pipeline ====
 Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura: \\  Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura: \\ 
Línea 194: Línea 198:
 Se usa para mediar entre un REQ/REP: \\  Se usa para mediar entre un REQ/REP: \\ 
 {{:​fw:​zeromq:​queue.png?​direct&​200|}} \\  {{:​fw:​zeromq:​queue.png?​direct&​200|}} \\ 
-Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets ''​zmq.ROUTER''​ y ''​zmq.DEALER''​ en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta.+Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets ''​zmq.ROUTER''​ y ''​zmq.DEALER''​ en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta. ​\\ 
 REQ: REQ:
 <code python> <code python>
Línea 232: Línea 236:
 zmq.device(zmq.QUEUE,​ s1, s2) zmq.device(zmq.QUEUE,​ s1, s2)
 </​code>​ </​code>​
-===== Avanzado ===== 
  
-===== Patrones ​===== + 
-==== Patrones Request/​Reply ==== +=== Forwarding ​=== 
-==== Patrones Publish/​Subscribe ====+Media entre un grupo de nodos push\sub. Como este conjunto no se bloquea no es necesario usar un tipo de socket especial: 
 +<code python>​ 
 +import sys 
 +import zmq 
 +context ​zmq.Context() 
 +s1 context.socket(zmq.SUB) 
 +s2 context.socket(zmq.PUB) 
 +s1.bind(sys.argv[1]) 
 +s2.bind(sys.argv[2]) 
 +s1.setsockopt(zmq.SUBSCRIBE,​ ''​) 
 +zmq.device(zmq.FORWARDER,​ s1, s2) 
 +</​code>​ 
 +Ahora podemos conectarle tantos publishers y subscribers como queramos. 
 + 
 +=== Streaming === 
 +En este caso es útil para los nodos PUSH\PULL. 
 +<code python> 
 +context = zmq.Context() 
 +s1 = context.socket(zmq.PULL) 
 +s2 = context.socket(zmq.PUSH) 
 +s1.bind("​tcp://​*:​5550"​) 
 +s2.bind("​tcp://​*:​5551"​) 
 +zmq.device(zmq.STREAMER,​ s1, s2) 
 +</​code>​ 
 +<code python>​ 
 +import zmq 
 +import random 
 +import time 
 +context = zmq.Context() 
 +  
 +sender = context.socket(zmq.PUSH) 
 +sender.connect("​tcp://​localhost:​5550"​) 
 + 
 +total_msec = 0 
 +for task_nbr in range(100):​ 
 +    workload = random.randint(1,​ 100) 
 +    sender.send(str(workload)) 
 +</​code>​ 
 + 
 +<code python>​ 
 +import sys 
 +import time 
 +import zmq 
 +  
 +context = zmq.Context() 
 +  
 +receiver = context.socket(zmq.PULL) 
 +receiver.connect("​tcp://​localhost:​5551"​) 
 +  
 +while True: 
 +    s = receiver.recv() 
 +    sys.stdout.write('​.'​) 
 +    sys.stdout.flush() 
 +    time.sleep(int(s)*0.001) 
 +</​code>​ 
 +=== Crear tu propio Streamer & Forwarder === 
 +Como los dos son unidireccionales,​ lo único que tendremos que hacer es copiar los datos de un socket al otro: 
 +<code python>​ 
 +import sys 
 +import zmq 
 + 
 +def zmq_streaming_device(a,​ b): 
 +    while True: 
 +        msg = a.recv() 
 +        more = a.getsockopt(zmq.RCVMORE) 
 +        if more: 
 +            b.send(msg, zmq.SNDMORE) 
 +        else: 
 +            b.send(msg) 
 +        sys.stdout.write('​.'​) 
 +        sys.stdout.flush() 
 + 
 +context = zmq.Context() 
 +s1 = context.socket(zmq.PULL) 
 +s2 = context.socket(zmq.PUSH) 
 +s1.bind("​tcp://​*:​5550"​) 
 +s2.bind("​tcp://​*:​5551"​) 
 +zmq_streaming_device(s1,​ s2) 
 +</​code>​ 
 +===== Patrones Request/​Reply ​===== 
 +===== Patrones Publish/​Subscribe ​=====
 ===== Avanzado ===== ===== Avanzado =====
 ==== Control de flujo ==== ==== Control de flujo ====
Línea 252: Línea 335:
 ===== Notas ===== ===== Notas =====
   * Saber la versión con ''​zmq.zmq_version()''​.   * Saber la versión con ''​zmq.zmq_version()''​.
 +  * Para realizar conexiones no bloqueantes los usaremos con [[fw:​gevent&#​zeromq|greenlets]].
 +  * Para instalar la librería en el sistema: ''​apt-get install libzmq-dev''​.
  
 +==== Instalar en Ubuntu ====
 +Instalación de la librería del sistema:
 +<​code>​
 +$ sudo apt-get install python-dev libzmq-dev
 +</​code>​
 +Instalación en Python:
 +<​code>​
 +$ sudo pip install pyzmq
 +</​code>​
 +Recuerda que para utilizar ''​pip''​ has de tener instalado el paquete ''​python-pip''​
 +==== Instalar en Windows ====
 +Necesitarás agregar el paquete ''​VS2010 C++ redistributable''​.
 ==== Como... ==== ==== Como... ====
 === Enviar\recibir varios paquetes === === Enviar\recibir varios paquetes ===
Línea 309: Línea 406:
     # Do the work     # Do the work
     time.sleep(int(s)*0.001)     time.sleep(int(s)*0.001)
 +</​code>​
 +
 +=== Enviar no bloqueante ===
 +  * Pero cuidado, el objeto sender no se elminará hasta que los datos hayan sido enviados.
 +<code python>
 +sender.send(jdata,​ zmq.NOBLOCK)
 </​code>​ </​code>​
fw/zeromq.1372775390.txt.gz · Última modificación: 2020/05/09 09:24 (editor externo)