Muestra las diferencias entre dos versiones de la página.
| Ambos lados, revisión anterior Revisión previa Próxima revisión | Revisión previa | ||
|
fw:zeromq [2013/07/02 14:29] alfred [Avanzado] |
fw:zeromq [2020/05/09 09:25] (actual) |
||
|---|---|---|---|
| Línea 2: | Línea 2: | ||
| * [[http://www.zeromq.org/|Página web del proyecto]] | * [[http://www.zeromq.org/|Página web del proyecto]] | ||
| * [[http://zguide.zeromq.org/page:all|Guía del framework]] | * [[http://zguide.zeromq.org/page:all|Guía del framework]] | ||
| + | * [[http://zeromq.github.io/pyzmq/api/index.html|Documentación pyzmq]] | ||
| ===== Uso de la librería ===== | ===== Uso de la librería ===== | ||
| Línea 97: | Línea 98: | ||
| * Si un publisher no tiene subscribers los mensajes quedarán desechados. | * Si un publisher no tiene subscribers los mensajes quedarán desechados. | ||
| * TCP con esta estructura es lento. | * TCP con esta estructura es lento. | ||
| + | * Para realizar una subscripción a todos los mensajes haremos: | ||
| + | <code python> | ||
| + | socket.setsockopt(zmq.SUBSCRIBE, "") | ||
| + | </code> | ||
| ==== Pipeline ==== | ==== Pipeline ==== | ||
| Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura: \\ | Funciona como el Rep-Req sólo que en este no hay una respuesta desde el otro nodo. Es perfecto para los mensajes en paralelo. Imaginemos la siguiente estructura: \\ | ||
| Línea 194: | Línea 198: | ||
| Se usa para mediar entre un REQ/REP: \\ | Se usa para mediar entre un REQ/REP: \\ | ||
| {{:fw:zeromq:queue.png?direct&200|}} \\ | {{:fw:zeromq:queue.png?direct&200|}} \\ | ||
| - | Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets ''zmq.ROUTER'' y ''zmq.DEALER'' en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta. | + | Para poder llevarlo a cabo se utilizan dos nuevos tipos de sockets ''zmq.ROUTER'' y ''zmq.DEALER'' en el nodo queue. El socket ROUTER recibe un mensaje de una conexión REQ y añade un ID al inicio de dicho paquete, luego es enviado por el DEALER. El DEALER escoge una conexión REP y lo envía, cuando este es procesado es devuelto otra al ROUTER y enviado al socket REQ que inició la conexión. Todo esto es posible (y rompe el patrón REQ\REP) mediante el ID que el ROUTER inserta. \\ |
| REQ: | REQ: | ||
| <code python> | <code python> | ||
| Línea 233: | Línea 237: | ||
| </code> | </code> | ||
| - | ===== Patrones ===== | + | |
| - | ==== Patrones Request/Reply ==== | + | === Forwarding === |
| - | ==== Patrones Publish/Subscribe ==== | + | Media entre un grupo de nodos push\sub. Como este conjunto no se bloquea no es necesario usar un tipo de socket especial: |
| + | <code python> | ||
| + | import sys | ||
| + | import zmq | ||
| + | context = zmq.Context() | ||
| + | s1 = context.socket(zmq.SUB) | ||
| + | s2 = context.socket(zmq.PUB) | ||
| + | s1.bind(sys.argv[1]) | ||
| + | s2.bind(sys.argv[2]) | ||
| + | s1.setsockopt(zmq.SUBSCRIBE, '') | ||
| + | zmq.device(zmq.FORWARDER, s1, s2) | ||
| + | </code> | ||
| + | Ahora podemos conectarle tantos publishers y subscribers como queramos. | ||
| + | |||
| + | === Streaming === | ||
| + | En este caso es útil para los nodos PUSH\PULL. | ||
| + | <code python> | ||
| + | context = zmq.Context() | ||
| + | s1 = context.socket(zmq.PULL) | ||
| + | s2 = context.socket(zmq.PUSH) | ||
| + | s1.bind("tcp://*:5550") | ||
| + | s2.bind("tcp://*:5551") | ||
| + | zmq.device(zmq.STREAMER, s1, s2) | ||
| + | </code> | ||
| + | <code python> | ||
| + | import zmq | ||
| + | import random | ||
| + | import time | ||
| + | context = zmq.Context() | ||
| + | |||
| + | sender = context.socket(zmq.PUSH) | ||
| + | sender.connect("tcp://localhost:5550") | ||
| + | |||
| + | total_msec = 0 | ||
| + | for task_nbr in range(100): | ||
| + | workload = random.randint(1, 100) | ||
| + | sender.send(str(workload)) | ||
| + | </code> | ||
| + | |||
| + | <code python> | ||
| + | import sys | ||
| + | import time | ||
| + | import zmq | ||
| + | |||
| + | context = zmq.Context() | ||
| + | |||
| + | receiver = context.socket(zmq.PULL) | ||
| + | receiver.connect("tcp://localhost:5551") | ||
| + | |||
| + | while True: | ||
| + | s = receiver.recv() | ||
| + | sys.stdout.write('.') | ||
| + | sys.stdout.flush() | ||
| + | time.sleep(int(s)*0.001) | ||
| + | </code> | ||
| + | === Crear tu propio Streamer & Forwarder === | ||
| + | Como los dos son unidireccionales, lo único que tendremos que hacer es copiar los datos de un socket al otro: | ||
| + | <code python> | ||
| + | import sys | ||
| + | import zmq | ||
| + | |||
| + | def zmq_streaming_device(a, b): | ||
| + | while True: | ||
| + | msg = a.recv() | ||
| + | more = a.getsockopt(zmq.RCVMORE) | ||
| + | if more: | ||
| + | b.send(msg, zmq.SNDMORE) | ||
| + | else: | ||
| + | b.send(msg) | ||
| + | sys.stdout.write('.') | ||
| + | sys.stdout.flush() | ||
| + | |||
| + | context = zmq.Context() | ||
| + | s1 = context.socket(zmq.PULL) | ||
| + | s2 = context.socket(zmq.PUSH) | ||
| + | s1.bind("tcp://*:5550") | ||
| + | s2.bind("tcp://*:5551") | ||
| + | zmq_streaming_device(s1, s2) | ||
| + | </code> | ||
| + | ===== Patrones Request/Reply ===== | ||
| + | ===== Patrones Publish/Subscribe ===== | ||
| ===== Avanzado ===== | ===== Avanzado ===== | ||
| ==== Control de flujo ==== | ==== Control de flujo ==== | ||
| Línea 251: | Línea 335: | ||
| ===== Notas ===== | ===== Notas ===== | ||
| * Saber la versión con ''zmq.zmq_version()''. | * Saber la versión con ''zmq.zmq_version()''. | ||
| + | * Para realizar conexiones no bloqueantes los usaremos con [[fw:gevent&#zeromq|greenlets]]. | ||
| + | * Para instalar la librería en el sistema: ''apt-get install libzmq-dev''. | ||
| + | ==== Instalar en Ubuntu ==== | ||
| + | Instalación de la librería del sistema: | ||
| + | <code> | ||
| + | $ sudo apt-get install python-dev libzmq-dev | ||
| + | </code> | ||
| + | Instalación en Python: | ||
| + | <code> | ||
| + | $ sudo pip install pyzmq | ||
| + | </code> | ||
| + | Recuerda que para utilizar ''pip'' has de tener instalado el paquete ''python-pip'' | ||
| + | ==== Instalar en Windows ==== | ||
| + | Necesitarás agregar el paquete ''VS2010 C++ redistributable''. | ||
| ==== Como... ==== | ==== Como... ==== | ||
| === Enviar\recibir varios paquetes === | === Enviar\recibir varios paquetes === | ||
| Línea 308: | Línea 406: | ||
| # Do the work | # Do the work | ||
| time.sleep(int(s)*0.001) | time.sleep(int(s)*0.001) | ||
| + | </code> | ||
| + | |||
| + | === Enviar no bloqueante === | ||
| + | * Pero cuidado, el objeto sender no se elminará hasta que los datos hayan sido enviados. | ||
| + | <code python> | ||
| + | sender.send(jdata, zmq.NOBLOCK) | ||
| </code> | </code> | ||