ZIO
Python and C++ interface to ZeroMQ and Zyre
test_broker.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 Test flow broker with example handler
4 '''
5 import zmq
6 import zio
7 import json
8 from zio.flow import objectify, Broker
9 from zio.flow.example_handler import Factory
10 from zio.flow.backend import spawner
11 from pyre.zactor import ZActor
12 
# Endpoint shared by the whole test: the server port binds it, the client
# port connects to it, and it is handed to the example handler Factory.
server_address = "tcp://127.0.0.1:5679"
14 
def client_actor(ctx, pipe, *args):
    '''
    Actor function playing the client side of one flow conversation.

    It connects a CLIENT port directly to ``server_address``, sends a
    BOT requesting direction "extract" with credit 2, waits up to 1000
    (timeout units as defined by ``recv_bot``) for the BOT reply, puts
    ten empty messages into the flow, sends EOT, waits for the EOT reply
    and finally blocks on ``pipe`` until the creator sends something.

    ``ctx`` and ``*args`` are accepted but not used in this function.
    '''
    pipe.signal()

    client_port = zio.Port("client", zmq.CLIENT, '')
    client_port.connect(server_address)
    client_port.online(None)  # peer not needed if port only direct connects
    cflow = zio.flow.Flow(client_port)

    bot_label = json.dumps(dict(direction='extract', credit=2))
    msg = zio.Message(label=bot_label)
    cflow.send_bot(msg)
    print("client sent BOT:\n%s\n" % (msg,))
    msg = cflow.recv_bot(1000)
    assert msg
    print("client got BOT:\n%s\n" % (msg,))

    # Only the put() call belongs to the loop; the "sent DAT" report
    # is printed once, after all ten messages have been put.
    for _ in range(10):
        cflow.put(zio.Message())

    print("client sent DAT")
    cflow.send_eot()
    print("client sent EOT")
    msg = cflow.recv_eot()
    assert msg
    print("client done")
    pipe.recv()  # wait for signal to exit
40 
41 
def test_broker():
    '''
    Drive a flow Broker between one client actor and the example handler.

    A SERVER port is bound to ``server_address`` and brought online, a
    client actor running ``client_actor`` is started, and a backend
    actor running ``spawner`` with the example handler ``Factory`` is
    attached to the broker.  The broker is then polled up to 30 times
    with a timeout argument of 1000, stopping early on the first poll
    that returns a false value.  Finally the broker is stopped and the
    client actor's pipe is signalled so it can leave its final recv().
    '''
    ctx = zmq.Context()

    # note: normally, app goes through zio.Node
    sport = zio.Port("server", zmq.SERVER)
    sport.bind(server_address)
    sport.do_binds()
    sport.online()

    client = ZActor(ctx, client_actor)

    factory = Factory(server_address)
    backend = ZActor(ctx, spawner, factory)
    broker = Broker(sport, backend.pipe)

    for count in range(30):
        print(f"main: poll [{count}]")
        ok = broker.poll(1000)  # client->handler
        if not ok:
            break

    print("main: stopping")
    broker.stop()
    client.pipe.signal()
67 
# Run the broker test when this file is executed as a script.
if __name__ == '__main__':
    test_broker()
70 
ZIO data flow.
Definition: flow.hpp:17
def test_broker()
Definition: test_broker.py:42
A port holds a socket in the context of a node.
Definition: port.hpp:27
A ZIO message.
Definition: message.hpp:59
def client_actor(ctx, pipe, *args)
Definition: test_broker.py:15