ZIO
Python and C++ interface to ZeroMQ and Zyre
tripping.py
Go to the documentation of this file.
1 """Round-trip demonstrator
2 
3 While this example runs in a single process, that is just to make
4 it easier to start and stop the example. Client thread signals to
5 main when it's ready.
6 
7 Originally from Zguide examples, generalized to use CLIENT/SERVER by
8 brett.viren@gmail.com
9 
10 """
11 
12 import sys
13 import threading
14 import time
15 
16 import zmq
17 
18 from .zhelpers import zpipe
19 from .zhelpers import serverish_recv, serverish_send
20 from .zhelpers import clientish_recv, clientish_send
21 
22 
23 def client_task (ctx, pipe, stype, requests=10000, verbose=False):
24  client = ctx.socket(stype)
25  client.identity = b'C'
26  client.connect("tcp://localhost:5555")
27 
28  print (f"Client with socket {stype}...")
29  time.sleep(0.5)
30 
31  print ("Synchronous round-trip test...")
32  start = time.time()
33  for r in range(requests):
34  clientish_send(client, f'hello s {r}'.encode('utf-8'))
35  #client.send(f'hello s {r}'.encode('utf-8'))
36  msg = clientish_recv(client)[0].decode('utf-8')
37  #msg = client.recv().decode('utf-8')
38  #print (f'->sc {msg}')
39  parts = msg.split()
40  assert(len(parts) == 3)
41  assert(parts[0] == 'hello')
42  assert(parts[1] == 's')
43  assert(parts[2] == str(r))
44  print (" %d calls/second" % (requests / (time.time()-start)))
45 
46  print ("Asynchronous round-trip test...")
47  start = time.time()
48  for r in range(requests):
49  clientish_send(client, f'hello a {r}'.encode('utf-8'))
50  for r in range(requests):
51  msg = clientish_recv(client)[0].decode('utf-8')
52  #print (f'->ac {msg}')
53  parts = msg.split()
54  assert(len(parts) == 3)
55  assert(parts[0] == 'hello')
56  assert(parts[1] == 'a')
57  assert(parts[2] == str(r))
58  print (" %d calls/second" % (requests / (time.time()-start)))
59 
60  # signal done:
61  pipe.send(b"done")
62 
63 def worker_task(stype, verbose=False):
64  ctx = zmq.Context()
65  worker = ctx.socket(stype)
66  worker.identity = b'W'
67  worker.connect("tcp://localhost:5556")
68  time.sleep(0.5)
69  print (f"Worker with socket {stype}...")
70  clientish_send(worker, b"greetings")
71  while True:
72  msg = clientish_recv(worker)
73  clientish_send(worker, msg)
74  ctx.destroy(0)
75 
76 def broker_task(fes_type, bes_type, verbose=False):
77  # Prepare our context and sockets
78  ctx = zmq.Context()
79  frontend = ctx.socket(fes_type)
80  backend = ctx.socket(bes_type)
81  frontend.bind("tcp://*:5555")
82  backend.bind("tcp://*:5556")
83  print (f"Broker with sockets {fes_type} <--> {bes_type}")
84 
85  # Initialize poll set
86  poller = zmq.Poller()
87  poller.register(backend, zmq.POLLIN)
88  poller.register(frontend, zmq.POLLIN)
89 
90  feid = beid = None
91 
92  beid, msg = serverish_recv(backend)
93  print (msg)
94  assert(msg[0] == b"greetings")
95 
96  fe_waiting = list()
97  be_waiting = list()
98 
99  while True:
100  try:
101  items = dict(poller.poll())
102  except:
103  break # Interrupted
104 
105  if frontend in items:
106  feid,msg = serverish_recv(frontend)
107  be_waiting.append(msg)
108 
109  if backend in items:
110  beid, msg = serverish_recv(backend)
111  fe_waiting.append(msg)
112 
113  if feid:
114  for msg in fe_waiting:
115  #print(f'b->f {msg}')
116  serverish_send(frontend, feid, msg)
117  fe_waiting.clear()
118 
119  if beid:
120  for msg in be_waiting:
121  #print(f'f->b {msg}')
122  serverish_send(backend, beid, msg)
123  be_waiting.clear()
124 
125 def main(fes_type=zmq.ROUTER, bes_type=zmq.ROUTER,
126  requests = 10000, verbose=False):
127  otype = {zmq.ROUTER:zmq.DEALER, zmq.SERVER:zmq.CLIENT}
128  fec_type = otype[fes_type]
129  bec_type = otype[bes_type]
130 
131  # Create threads
132  ctx = zmq.Context()
133  client,pipe = zpipe(ctx)
134 
135  client_thread = threading.Thread(target=client_task,
136  args=(ctx, pipe, fec_type, requests, verbose))
137  worker_thread = threading.Thread(target=worker_task,
138  args=(bec_type,verbose))
139  worker_thread.daemon=True
140  broker_thread = threading.Thread(target=broker_task,
141  args=(fes_type, bes_type, verbose))
142  broker_thread.daemon=True
143 
144  worker_thread.start()
145  broker_thread.start()
146  client_thread.start()
147 
148  # Wait for signal on client pipe
149  client.recv()
150 
151 if __name__ == '__main__':
152  main()
def main(fes_type=zmq.ROUTER, bes_type=zmq.ROUTER, requests=10000, verbose=False)
Definition: tripping.py:126
def clientish_recv(sock, args, kwds)
Definition: zhelpers.py:175
def zpipe(ctx)
Definition: zhelpers.py:50
def clientish_send(sock, msg, args, kwds)
Definition: zhelpers.py:191
def broker_task(fes_type, bes_type, verbose=False)
Definition: tripping.py:76
def serverish_send(sock, cid, msg, args, kwds)
Definition: zhelpers.py:156
def serverish_recv(sock, args, kwds)
Definition: zhelpers.py:134
def worker_task(stype, verbose=False)
Definition: tripping.py:63
def client_task(ctx, pipe, stype, requests=10000, verbose=False)
Definition: tripping.py:23