Herramientas de usuario

Herramientas del sitio


fw:zeromq

Diferencias

Muestra las diferencias entre dos versiones de la página.

Enlace a la vista de comparación

Ambos lados, revisión anterior Revisión previa
Próxima revisión
Revisión previa
fw:zeromq [2013/07/02 14:29]
alfred [Control de flujo]
fw:zeromq [2020/05/09 09:25] (actual)
Línea 2: Línea 2:
   * [[http://​www.zeromq.org/​|Página web del proyecto]]   * [[http://​www.zeromq.org/​|Página web del proyecto]]
   * [[http://​zguide.zeromq.org/​page:​all|Guía del framework]]   * [[http://​zguide.zeromq.org/​page:​all|Guía del framework]]
 +  * [[http://​zeromq.github.io/​pyzmq/​api/​index.html|Documentación pyzmq]]
 ===== Uso de la librería ===== ===== Uso de la librería =====
  
Línea 97: Línea 98:
   * Si un publisher no tiene subscribers los mensajes quedarán desechados.   * Si un publisher no tiene subscribers los mensajes quedarán desechados.
   * TCP con esta estructura es lento.   * TCP con esta estructura es lento.
 +  * Para realizar una subscripción a todos los mensajes haremos: 
 +<code python>​ 
 +socket.setsockopt(zmq.SUBSCRIBE,​ ""​) 
 +</​code>​
 ==== Pipeline ==== ==== Pipeline ====
 Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura: \\  Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura: \\ 
Línea 194: Línea 198:
 Se usa para mediar entre un REQ/REP: \\  Se usa para mediar entre un REQ/REP: \\ 
 {{:​fw:​zeromq:​queue.png?​direct&​200|}} \\  {{:​fw:​zeromq:​queue.png?​direct&​200|}} \\ 
-Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets ''​zmq.ROUTER''​ y ''​zmq.DEALER''​ en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta.+Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets ''​zmq.ROUTER''​ y ''​zmq.DEALER''​ en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta. ​\\ 
 REQ: REQ:
 <code python> <code python>
Línea 232: Línea 236:
 zmq.device(zmq.QUEUE,​ s1, s2) zmq.device(zmq.QUEUE,​ s1, s2)
 </​code>​ </​code>​
-===== Avanzado ===== 
  
-===== Patrones ===== 
-==== Patrones Request/​Reply ==== 
-==== Patrones Publish/​Subscribe ==== 
  
 +=== Forwarding ===
 +Media entre un grupo de nodos push\sub. Como este conjunto no se bloquea no es necesario usar un tipo de socket especial:
 +<code python>
 +import sys
 +import zmq
 +context = zmq.Context()
 +s1 = context.socket(zmq.SUB)
 +s2 = context.socket(zmq.PUB)
 +s1.bind(sys.argv[1])
 +s2.bind(sys.argv[2])
 +s1.setsockopt(zmq.SUBSCRIBE,​ ''​)
 +zmq.device(zmq.FORWARDER,​ s1, s2)
 +</​code>​
 +Ahora podemos conectarle tantos publishers y subscribers como queramos.
 +
 +=== Streaming ===
 +En este caso es útil para los nodos PUSH\PULL.
 +<code python>
 +context = zmq.Context()
 +s1 = context.socket(zmq.PULL)
 +s2 = context.socket(zmq.PUSH)
 +s1.bind("​tcp://​*:​5550"​)
 +s2.bind("​tcp://​*:​5551"​)
 +zmq.device(zmq.STREAMER,​ s1, s2)
 +</​code>​
 +<code python>
 +import zmq
 +import random
 +import time
 +context = zmq.Context()
 + 
 +sender = context.socket(zmq.PUSH)
 +sender.connect("​tcp://​localhost:​5550"​)
 +
 +total_msec = 0
 +for task_nbr in range(100):
 +    workload = random.randint(1,​ 100)
 +    sender.send(str(workload))
 +</​code>​
 +
 +<code python>
 +import sys
 +import time
 +import zmq
 + 
 +context = zmq.Context()
 + 
 +receiver = context.socket(zmq.PULL)
 +receiver.connect("​tcp://​localhost:​5551"​)
 + 
 +while True:
 +    s = receiver.recv()
 +    sys.stdout.write('​.'​)
 +    sys.stdout.flush()
 +    time.sleep(int(s)*0.001)
 +</​code>​
 +=== Crear tu propio Streamer & Forwarder ===
 +Como los dos son unidireccionales,​ lo único que tendremos que hacer es copiar los datos de un socket al otro:
 +<code python>
 +import sys
 +import zmq
 +
 +def zmq_streaming_device(a,​ b):
 +    while True:
 +        msg = a.recv()
 +        more = a.getsockopt(zmq.RCVMORE)
 +        if more:
 +            b.send(msg, zmq.SNDMORE)
 +        else:
 +            b.send(msg)
 +        sys.stdout.write('​.'​)
 +        sys.stdout.flush()
 +
 +context = zmq.Context()
 +s1 = context.socket(zmq.PULL)
 +s2 = context.socket(zmq.PUSH)
 +s1.bind("​tcp://​*:​5550"​)
 +s2.bind("​tcp://​*:​5551"​)
 +zmq_streaming_device(s1,​ s2)
 +</​code>​
 +===== Patrones Request/​Reply =====
 +===== Patrones Publish/​Subscribe =====
 +===== Avanzado =====
 +==== Control de flujo ====
 +Cuando enviamos un mensaje y no hay receptor, lo que hace ZeroMQ es guardarlo en memoria hasta que el receptor esté activo. \\ 
 +Para evitar problemas de memoria ZeroMQ provee de la opción de socket high water mark (''​zmq.HWM''​). Si la añadimos a ''​setsockopt''​ podremos indicar cuantos mensajes quedarán guardados en memoria. \\ 
 +Con la opción ''​zmq.SWAP''​ podemos indicar un tamaño de disco donde guardar más mensajes una vez el HWM sea superado.
 +<code python>
 +sock = context.socket(zmq.PUSH)
 +sock.setsockopt(zmq.HWM,​ 1000)
 +sock.setsockopt(zmq.SWAP,​ 200*2**10)
 +sock.connect(sys.argv[1])
 +while True:
 +    sock.send(sys.argv[1] + ':'​ + time.ctime())
 +</​code>​
 ===== Notas ===== ===== Notas =====
   * Saber la versión con ''​zmq.zmq_version()''​.   * Saber la versión con ''​zmq.zmq_version()''​.
 +  * Para realizar conexiones no bloqueantes los usaremos con [[fw:​gevent&#​zeromq|greenlets]].
 +  * Para instalar la librería en el sistema: ''​apt-get install libzmq-dev''​.
  
 +==== Instalar en Ubuntu ====
 +Instalación de la librería del sistema:
 +<​code>​
 +$ sudo apt-get install python-dev libzmq-dev
 +</​code>​
 +Instalación en Python:
 +<​code>​
 +$ sudo pip install pyzmq
 +</​code>​
 +Recuerda que para utilizar ''​pip''​ has de tener instalado el paquete ''​python-pip''​
 +==== Instalar en Windows ====
 +Necesitarás agregar el paquete ''​VS2010 C++ redistributable''​.
 ==== Como... ==== ==== Como... ====
 === Enviar\recibir varios paquetes === === Enviar\recibir varios paquetes ===
Línea 297: Línea 406:
     # Do the work     # Do the work
     time.sleep(int(s)*0.001)     time.sleep(int(s)*0.001)
 +</​code>​
 +
 +=== Enviar no bloqueante ===
 +  * Pero cuidado, el objeto sender no se elminará hasta que los datos hayan sido enviados.
 +<code python>
 +sender.send(jdata,​ zmq.NOBLOCK)
 </​code>​ </​code>​
fw/zeromq.1372775377.txt.gz · Última modificación: 2020/05/09 09:24 (editor externo)