1 """Round-trip demonstrator 3 While this example runs in a single process, that is just to make 4 it easier to start and stop the example. Client thread signals to 7 Originally from Zguide examples, generalized to use CLIENT/SERVER by 18 from .zhelpers
import zpipe
19 from .zhelpers
import serverish_recv, serverish_send
20 from .zhelpers
import clientish_recv, clientish_send
23 def client_task (ctx, pipe, stype, requests=10000, verbose=False):
24 client = ctx.socket(stype)
25 client.identity = b
'C' 26 client.connect(
"tcp://localhost:5555")
28 print (f
"Client with socket {stype}...")
31 print (
"Synchronous round-trip test...")
33 for r
in range(requests):
40 assert(len(parts) == 3)
41 assert(parts[0] ==
'hello')
42 assert(parts[1] ==
's')
43 assert(parts[2] == str(r))
44 print (
" %d calls/second" % (requests / (time.time()-start)))
46 print (
"Asynchronous round-trip test...")
48 for r
in range(requests):
50 for r
in range(requests):
54 assert(len(parts) == 3)
55 assert(parts[0] ==
'hello')
56 assert(parts[1] ==
'a')
57 assert(parts[2] == str(r))
58 print (
" %d calls/second" % (requests / (time.time()-start)))
65 worker = ctx.socket(stype)
66 worker.identity = b
'W' 67 worker.connect(
"tcp://localhost:5556")
69 print (f
"Worker with socket {stype}...")
79 frontend = ctx.socket(fes_type)
80 backend = ctx.socket(bes_type)
81 frontend.bind(
"tcp://*:5555")
82 backend.bind(
"tcp://*:5556")
83 print (f
"Broker with sockets {fes_type} <--> {bes_type}")
87 poller.register(backend, zmq.POLLIN)
88 poller.register(frontend, zmq.POLLIN)
94 assert(msg[0] == b
"greetings")
101 items = dict(poller.poll())
105 if frontend
in items:
107 be_waiting.append(msg)
111 fe_waiting.append(msg)
114 for msg
in fe_waiting:
120 for msg
in be_waiting:
125 def main(fes_type=zmq.ROUTER, bes_type=zmq.ROUTER,
126 requests = 10000, verbose=False):
127 otype = {zmq.ROUTER:zmq.DEALER, zmq.SERVER:zmq.CLIENT}
128 fec_type = otype[fes_type]
129 bec_type = otype[bes_type]
133 client,pipe =
zpipe(ctx)
135 client_thread = threading.Thread(target=client_task,
136 args=(ctx, pipe, fec_type, requests, verbose))
137 worker_thread = threading.Thread(target=worker_task,
138 args=(bec_type,verbose))
139 worker_thread.daemon=
True 140 broker_thread = threading.Thread(target=broker_task,
141 args=(fes_type, bes_type, verbose))
142 broker_thread.daemon=
True 144 worker_thread.start()
145 broker_thread.start()
146 client_thread.start()
151 if __name__ ==
'__main__':
def main(fes_type=zmq.ROUTER, bes_type=zmq.ROUTER, requests=10000, verbose=False)
def clientish_recv(sock, args, kwds)
def clientish_send(sock, msg, args, kwds)
def broker_task(fes_type, bes_type, verbose=False)
def serverish_send(sock, cid, msg, args, kwds)
def serverish_recv(sock, args, kwds)
def worker_task(stype, verbose=False)
def client_task(ctx, pipe, stype, requests=10000, verbose=False)