ZIO
Python and C++ interface to ZeroMQ and Zyre
test_flowbroker.py
Go to the documentation of this file.
#!/usr/bin/env python3
import logging

# Configure root logging once for this test script: DEBUG level, with
# millisecond-resolution timestamps and a tab between level name and message.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s.%(msecs)03d %(levelname)s\t%(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Logger shared by every actor and by the main test body in this file.
log = logging.getLogger("zio")
7 
8 
9 import json
10 import zmq
11 import zio
12 from zio.flow import objectify, Broker
13 
14 from pyre.zactor import ZActor
15 
16 
17 
def client_actor(ctx, pipe, address):
    '''
    An actor function talking to a broker on given address.

    The actor performs one short "extract" flow: it sends BOT, waits for
    the BOT reply, puts a single message, exchanges EOT, and then blocks
    on the pipe until the parent sends something.

    Parameters
    ----------
    ctx : object
        Context handed in by the actor launcher; not used here.
    pipe : object
        Actor pipe to the parent.  Signalled once at start-up and read
        once at the end to wait for the exit request.
    address : string
        Address the CLIENT port connects to.
    '''
    pipe.signal()

    port = zio.Port("client", zmq.CLIENT, '')
    port.connect(address)
    port.online(None)  # peer not needed if port only direct connects
    try:
        log.debug("made flow")
        cflow = zio.flow.Flow(port)

        # Handshake: announce ourselves as an extracting endpoint with an
        # initial credit of 2.
        msg = zio.Message(seqno=0,
                          label=json.dumps(dict(direction='extract',
                                                credit=2)))
        cflow.send_bot(msg)
        log.debug(f'client sent BOT: {msg}')
        msg = cflow.recv_bot(1000)  # timeout, presumably in milliseconds
        assert(msg)
        log.debug(f'client got BOT: {msg}')

        # One payload message, then close the flow from our side.
        msg = zio.Message(seqno=1)
        cflow.put(msg)
        log.debug(f'client sent {msg}')
        eot = zio.Message(seqno=2)
        cflow.send_eot(eot)
        log.debug(f'client sent {eot}')
        eot = cflow.recv_eot()
        assert(eot)
        log.debug(f'client done with {eot}')
        pipe.recv()  # wait for signal to exit
    finally:
        # Take the port offline on every exit path, as dumper() does.
        port.offline()
45 
46 
def dumper(ctx, pipe, bot, address):
    '''
    A dump handler which may be used as an actor talking to a broker's botport.

    The handler replays the initiating BOT to the broker, then drains
    flow messages (logging each one) until either the flow yields a null
    message, in which case EOT is sent, or anything arrives on the actor
    pipe, which is treated as a request to stop.  In both cases the port
    is taken offline before returning.

    Parameters
    ----------
    ctx : object
        Context handed in by the actor launcher; not used here.
    pipe : object
        Actor pipe to the parent.  Signalled once at start-up and polled
        for a stop request.
    bot : zio.Message
        Our initiating BOT message
    address : string
        A ZeroMQ address string for a bound broker SERVER socket
    '''
    poller = zmq.Poller()
    poller.register(pipe, zmq.POLLIN)
    pipe.signal()  # ready

    port = zio.Port("client", zmq.CLIENT, '')
    port.connect(address)
    port.online(None)  # peer not needed if port only direct connects

    interrupted = False
    try:
        flow = zio.flow.Flow(port)
        poller.register(flow.port.sock, zmq.POLLIN)

        log.debug(f'dumper: send {bot}')

        flow.send_bot(bot)
        bot = flow.recv_bot(1000)  # timeout, presumably in milliseconds
        assert(bot)

        # must explicitly break the deadlock of us waiting for DAT before
        # triggering the poll so that get() will implicitly send PAY so
        # that the other end can send DAT.  3rd base.
        flow.flush_pay()

        keep_going = True
        while keep_going:

            for sock, _ in poller.poll():

                if sock == pipe:
                    log.debug("dumper: pipe hit")
                    data = pipe.recv()
                    if data == b'STOP':
                        log.debug("dumper: got STOP")
                    if len(data) == 0:
                        log.debug("dumper: got signal")
                    # Any pipe traffic means the parent wants us gone.
                    # Leave both loops and fall through to the shared
                    # cleanup instead of returning with the port online.
                    interrupted = True
                    keep_going = False
                    break

                # got flow messages
                msg = flow.get()
                log.debug(f'dumper: sock hit: {msg}')
                if msg is None:
                    log.debug("dumper: null message from get, sending EOT")
                    flow.send_eot()
                    poller.unregister(sock)
                    keep_going = False
                    break
    finally:
        log.debug("dumper: taking port offline")
        port.offline()

    if not interrupted:
        # The flow ended on its own; the parent has not yet told us to
        # quit, so wait for that before the actor function returns.
        log.debug("dumper: waiting for quit")
        pipe.recv()
    log.debug("dumper: done")
112 
class Factory:
    '''
    Handler factory given to the broker.

    Calling the factory with a BOT message either declines it (the BOT's
    label says ``direction == 'extract'``) or starts a ``dumper`` actor
    for it and remembers that actor so it can be signalled in ``stop()``.
    '''

    def __init__(self, address):
        '''
        Parameters
        ----------
        address : string
            Broker address passed on to every handler actor.
        '''
        self.handlers = list()
        self.address = address  # broker
        self.ctx = zmq.Context()

    def __call__(self, bot):
        '''
        Start a dumper actor for the given BOT unless it is an extract flow.

        Parameters
        ----------
        bot : zio.Message
            BOT message whose ``label`` is a JSON object with a
            ``direction`` key (a missing key raises KeyError).

        Returns
        -------
        True if a handler was started, None if the BOT was declined.
        '''
        fobj = json.loads(bot.label)
        if fobj['direction'] == 'extract':  # my direction.
            return None  # only handle inject
        actor = ZActor(self.ctx, dumper, bot, self.address)
        self.handlers.append(actor)
        return True

    def stop(self):
        '''
        Signal every started handler and forget them all.
        '''
        for actor in self.handlers:
            log.debug("factory stopping handler")
            actor.pipe.signal()
        # Dropping the list's references is what actually releases the
        # actors; it also keeps a repeated stop() from re-signalling them.
        self.handlers.clear()
133 
def test_dumper():
    '''
    Run a broker with a dumper factory against one client actor.

    A SERVER port is bound on a fixed local TCP address, a client actor is
    started against it, and the broker is polled up to ten times (stopping
    early on a poll timeout) before everything is shut down.
    '''
    # NOTE(review): this ``def`` header is absent from the listing this
    # block was recovered from; it is restored because the module's
    # ``__main__`` guard calls ``test_dumper()`` -- confirm against upstream.
    ctx = zmq.Context()

    node = zio.Node("dumper")
    server = node.port("server", zmq.SERVER)
    server_address = "tcp://127.0.0.1:5678"
    server.bind(server_address)
    node.online()

    client = ZActor(ctx, client_actor, server_address)

    factory = Factory(server_address)
    broker = Broker(server, factory)

    for count in range(10):
        log.debug(f"main: poll [{count}]")
        try:
            broker.poll(1000)
        except TimeoutError as te:
            log.warning(te)
            # not the most elegant nor robust way to shutdown
            break

    log.debug("main: stop broker")
    broker.stop()
    log.debug("main: node offline")
    node.offline()
    log.debug("main: stop client")
    client.pipe.signal()
162 
163 
# Allow running this test file directly as a script.
if __name__ == '__main__':
    test_dumper()
An identified vertex in a ported, directed graph.
Definition: node.hpp:30
ZIO data flow.
Definition: flow.hpp:17
def __init__(self, address)
A port holds a socket in the context of a node.
Definition: port.hpp:27
def client_actor(ctx, pipe, address)
def dumper(ctx, pipe, bot, address)
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
Definition: tens.cpp:34
a ZIO message
Definition: message.hpp:59