ZIO
Python and C++ interface to ZeroMQ and Zyre
test_flow.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 test zio.flow
4 '''
5 
6 import time
7 import unittest
8 import zmq
9 from zio import Node, Message, CoordHeader
10 from zio.flow import Flow, stringify, objectify
11 
class TestFlow(unittest.TestCase):
    '''
    Exercise zio.flow.Flow over a SERVER port and a CLIENT port.

    Each test gets a fresh pair of nodes: a "server" node whose port
    is bound and a "client" node whose port connects to it.  Both
    ports are wrapped in a Flow.
    '''

    origin = 42

    # Timeout handed to recv_bot()/recv_eot().
    # NOTE(review): units are presumably milliseconds -- confirm
    # against the Flow API.
    recv_timeout = 1000

    def setUp(self):
        '''
        Bring a server node and a client node online and wrap one
        port of each in a Flow.
        '''
        self.snode = Node("server", self.origin)
        sport = self.snode.port("sport", zmq.SERVER)
        sport.bind()
        self.snode.online()
        self.sflow = Flow(sport)

        self.cnode = Node("client")
        cport = self.cnode.port("cport", zmq.CLIENT)
        cport.connect("server", "sport")
        self.cnode.online()
        self.cflow = Flow(cport)

    def test_conversation(self):
        '''
        Run a full exchange between the two flows: BOT in both
        directions, credit payment, a series of put/get transfers
        and finally EOT in both directions.
        '''

        # cflow is recver
        bot = Message(label='{"credit":2,"direction":"inject"}')
        self.cflow.send_bot(bot)
        bot = self.sflow.recv_bot(self.recv_timeout)
        self.assertTrue(bot)
        self.assertEqual(self.sflow.credit, 0)
        self.assertEqual(self.sflow.total_credit, 2)

        # sflow is sender
        bot = Message(label='{"credit":2,"direction":"extract"}')
        self.sflow.send_bot(bot)
        bot = self.cflow.recv_bot(self.recv_timeout)
        self.assertTrue(bot)
        self.assertEqual(self.cflow.credit, 2)
        self.assertEqual(self.cflow.total_credit, 2)

        # the client flushes its credit and the server slurps it up
        self.cflow.flush_pay()
        self.assertEqual(self.cflow.credit, 0)
        c = self.sflow.slurp_pay()
        self.assertEqual(c, 2)
        self.assertEqual(self.sflow.credit, 2)

        for count in range(10):
            # note, seqno normally should be sequential
            self.sflow.put(Message(coord=CoordHeader(seqno=100 + count)))
            self.sflow.put(Message(coord=CoordHeader(seqno=200 + count)))
            dat = self.cflow.get()
            self.assertEqual(dat.seqno, 100 + count)
            dat = self.cflow.get()
            self.assertEqual(dat.seqno, 200 + count)

        # normally, when a flow explicitly sends EOT the other end
        # will recv the EOT when it is trying to recv another message
        # (PAY or DAT).  In this test things are synchronous and so we
        # explicitly recv_eot().
        self.cflow.send_eot(Message())

        surprise = self.sflow.recv_eot(self.recv_timeout)
        self.assertTrue(surprise)
        self.sflow.send_eot(Message())

        expected = self.cflow.recv_eot(self.recv_timeout)
        self.assertTrue(expected)

    def test_flow_string(self):
        '''
        Check that stringify() followed by objectify() keeps an
        existing label attribute and records the flow type under the
        "flow" key.
        '''
        msg = Message(label='{"extra":42}')
        msg.label = stringify('DAT', **objectify(msg))
        fobj = objectify(msg)
        self.assertEqual(fobj["extra"], 42)
        self.assertEqual(fobj["flow"], "DAT")

    def tearDown(self):
        '''
        Take both nodes offline, client first.
        '''
        self.cnode.offline()
        self.snode.offline()
90 
91 
# Run this module's test cases when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
An identified vertex in a ported, directed graph.
Definition: node.hpp:30
def tearDown(self)
Definition: test_flow.py:85
ZIO data flow.
Definition: flow.hpp:17
def stringify(flowtype, params)
Definition: util.py:21
def test_flow_string(self)
Definition: test_flow.py:76
def objectify(morl)
Definition: util.py:5
def setUp(self)
Definition: test_flow.py:16
a ZIO message
Definition: message.hpp:59
def test_conversation(self)
Definition: test_flow.py:29