ZIO
Python and C++ interface to ZeroMQ and Zyre
test_ugly.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 import zmq
4 import pyre
5 import zio.message as zm
6 import json
7 import time
8 import random
9 
def now():
    """Return the current wall-clock time as whole microseconds since the epoch."""
    seconds = time.time()
    return int(seconds * 1000000)
12 
13 
14 # fixme/warning: this example mashes two ZIO layers together. The
15 # "real" ZIO has a port layer between application and socket. When
16 # you see "port" here, it's really a low level socket and the port
17 # function is hard-wired in the server methods.
18 
20  def __init__(self, port, origin=42):
21  self.port = port
22  self.origin = origin
23  self.credit = 0
24  self.direction = ""
25  self.count = 0
26  self.client_id = 0;
27  self.poller = zmq.Poller()
28  self.poller.register(port, zmq.POLLIN)
29 
30  def do_bot(self):
31  frame = self.port.recv(copy=False);
32  print (type(frame))
33  print (frame.bytes)
34  parts = zm.decode_message(frame.bytes);
35  ph = zm.PrefixHeader(parts[0])
36  print(ph)
37  ch = zm.CoordHeader(parts[1])
38  print(ch)
39 
40  fobj = json.loads(ph.label)
41 
42  credit = fobj["credit"]
43  direction = fobj["direction"]
44 
45  if direction == "extract":
46  self.direction = "inject"
47  self.credit = credit
48  self.is_sender = False
49  elif direction == "inject":
50  self.direction = "extract"
51  self.credit = 0
52  self.is_sender = True
53  else:
54  print ("unknown direction %s" % direction)
55  return
56  fobj["direction"] = self.direction
57 
58  ph.label = json.dumps(fobj)
59  ch.origin = self.origin
60  ch.granule = now()
61 
62  ret = zm.encode_message([bytes(ph), bytes(ch)])
63  print (ret)
64  self.client_id = frame.routing_id
65  self.port.send(ret, routing_id = frame.routing_id)
66 
67  def send_pay(self):
68  if self.credit == 0:
69  print ("send_pay(broke)")
70  return
71  ph = zm.PrefixHeader(form="FLOW",
72  label=json.dumps(dict(flow="PAY",
73  credit=self.credit)))
74  ch = zm.CoordHeader(self.origin, now(), self.count)
75  self.count += 1
76  self.credit = 0
77  pay = zm.encode_message([bytes(ph), bytes(ch)])
78  print ("send_pay",ph)
79  self.port.send(pay, routing_id = self.client_id)
80 
81  def recv_pay(self, timeout=0):
82  print("recv_pay(to=%d)", timeout)
83  which = dict(self.poller.poll(timeout))
84  if not self.port in which:
85  return False
86  frame = self.port.recv(copy=False, timeout=timeout);
87  if frame.routing_id != self.client_id:
88  print ("unknown client")
89  return
90  parts = zm.decode_message(frame.bytes);
91  ph = zm.PrefixHeader(parts[0])
92  print("recv_pay",ph)
93  fobj = json.loads(ph.label)
94  if fobj["flow"] == "PAY":
95  self.credit += fobj["credit"]
96  return True
97  return False
98 
99  def send_dat(self):
100  self.recv_pay(0)
101  if self.credit == 0:
102  self.recv_pay(-1)
103  if self.credit == 0:
104  raise RuntimeError("no credit and failed to get payed")
105  ph = zm.PrefixHeader(form="FLOW", label=json.dumps({'flow':'DAT'}))
106  ch = zm.CoordHeader(self.origin, now(), self.count)
107  self.count += 1
108  self.credit -= 1
109  dat = zm.encode_message([bytes(ph), bytes(ch)])
110  print ("send_dat",ph)
111  self.port.send(day, routing_id = self.client_id)
112 
113  def recv_dat(self, timeout=-1):
114  self.send_pay();
115  print("recv_dat()")
116  which = dict(self.poller.poll(timeout))
117  if not self.port in which:
118  return False
119  frame = self.port.recv(copy=False);
120  if frame.routing_id != self.client_id:
121  print ("unknown client")
122  return
123  parts = zm.decode_message(frame.bytes);
124  ph = zm.PrefixHeader(parts[0])
125  print("recv_dat",ph)
126  ch = zm.CoordHeader(parts[1])
127  fobj = json.loads(ph.label)
128  if fobj["flow"] == "DAT":
129  self.credit += 1
130  return True
131  return False
132 
133  def eot(self, timeout=0):
134  ph = zm.PrefixHeader(form="FLOW",
135  label=json.dumps(dict(flow="EOT")))
136  ch = zm.CoordHeader(self.origin, now(), self.count)
137  eot = zm.encode_message([bytes(ph), bytes(ch)])
138  print ("send_eot", ph)
139  self.port.send(eot, routing_id = self.client_id);
140  while True:
141  which = dict(self.poller.poll(timeout))
142  if not self.port in which:
143  return False
144  frame = self.port.recv(copy=False)
145  parts = zm.decode_message(frame.bytes);
146  ph = zm.PrefixHeader(parts[0])
147  print("wait for eot, got",ph)
148  fobj = json.loads(ph.label)
149  if fobj["flow"] == "EOT":
150  print ("got eot")
151  return
152  if timeout == 0:
153  return
154 
155 
156 
157 
# Bind a SERVER socket on a fixed local address.
ctx = zmq.Context()
sock = ctx.socket(zmq.SERVER)
addr = "tcp://127.0.0.1:5678"
sock.bind(addr)

# Start a Pyre node whose headers carry the port's address and socket type.
node = pyre.Pyre("testflows")
portname = "recver"
node.set_header("zio.port.%s.address"%portname, addr)
node.set_header("zio.port.%s.socket"%portname, "SERVER")
node.start()

try:
    ss = SimpleServer(sock)
    ss.do_bot()
    I_quit = False
    while True:
        print ("loop")
        if ss.is_sender:
            ss.send_dat()
        else:
            ok = ss.recv_dat()
            if not ok:
                break
        # Randomly decide to end the flow from this side.
        if random.uniform(0,1) > 0.8:
            I_quit = True
            break
    if I_quit:
        print("sflow send EOT")
        ss.eot(-1)
    else:
        print("sflow recv EOT")
        ss.eot(0)
    print ("done")
finally:
    # Always stop the node, even after an exception; otherwise the
    # program continues running.
    node.stop()
def recv_pay(self, timeout=0)
Definition: test_ugly.py:81
def now()
Definition: test_ugly.py:10
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:259
def recv_dat(self, timeout=-1)
Definition: test_ugly.py:113
def __init__(self, port, origin=42)
Definition: test_ugly.py:20
def eot(self, timeout=0)
Definition: test_ugly.py:133