11 return int(1000000*time.time())
28 self.
poller.register(port, zmq.POLLIN)
31 frame = self.
port.recv(copy=
False);
34 parts = zm.decode_message(frame.bytes);
35 ph = zm.PrefixHeader(parts[0])
37 ch = zm.CoordHeader(parts[1])
40 fobj = json.loads(ph.label)
42 credit = fobj[
"credit"]
43 direction = fobj[
"direction"]
45 if direction ==
"extract":
49 elif direction ==
"inject":
54 print (
"unknown direction %s" % direction)
58 ph.label = json.dumps(fobj)
62 ret = zm.encode_message([bytes(ph), bytes(ch)])
65 self.
port.send(ret, routing_id = frame.routing_id)
69 print (
"send_pay(broke)")
71 ph = zm.PrefixHeader(form=
"FLOW",
72 label=json.dumps(dict(flow=
"PAY",
77 pay = zm.encode_message([bytes(ph), bytes(ch)])
82 print(
"recv_pay(to=%d)", timeout)
84 if not self.
port in which:
86 frame = self.
port.recv(copy=
False, timeout=timeout);
88 print (
"unknown client")
90 parts = zm.decode_message(frame.bytes);
91 ph = zm.PrefixHeader(parts[0])
93 fobj = json.loads(ph.label)
94 if fobj[
"flow"] ==
"PAY":
95 self.
credit += fobj[
"credit"]
104 raise RuntimeError(
"no credit and failed to get payed")
105 ph = zm.PrefixHeader(form=
"FLOW", label=json.dumps({
'flow':
'DAT'}))
109 dat = zm.encode_message([bytes(ph), bytes(ch)])
110 print (
"send_dat",ph)
117 if not self.
port in which:
119 frame = self.
port.recv(copy=
False);
121 print (
"unknown client")
123 parts = zm.decode_message(frame.bytes);
124 ph = zm.PrefixHeader(parts[0])
126 ch = zm.CoordHeader(parts[1])
127 fobj = json.loads(ph.label)
128 if fobj[
"flow"] ==
"DAT":
134 ph = zm.PrefixHeader(form=
"FLOW",
135 label=json.dumps(dict(flow=
"EOT")))
137 eot = zm.encode_message([bytes(ph), bytes(ch)])
138 print (
"send_eot", ph)
142 if not self.
port in which:
144 frame = self.
port.recv(copy=
False)
145 parts = zm.decode_message(frame.bytes);
146 ph = zm.PrefixHeader(parts[0])
147 print(
"wait for eot, got",ph)
148 fobj = json.loads(ph.label)
149 if fobj[
"flow"] ==
"EOT":
159 sock = ctx.socket(zmq.SERVER)
160 addr =
"tcp://127.0.0.1:5678" 163 node = pyre.Pyre(
"testflows")
165 node.set_header(
"zio.port.%s.address"%portname, addr)
166 node.set_header(
"zio.port.%s.socket"%portname,
"SERVER")
182 if random.uniform(0,1) > 0.8:
186 print(
"sflow send EOT")
189 print(
"sflow recv EOT")
def recv_pay(self, timeout=0)
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
def recv_dat(self, timeout=-1)
def __init__(self, port, origin=42)