ZIO
Python and C++ interface to ZeroMQ and Zyre
client.py
Go to the documentation of this file.
1 """Majordomo Protocol Client API, Python version.
2 
3 Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
4 
5 Author: Min RK <benjaminrk@gmail.com>
6 Based on Java example by Arkadiusz Orzechowski
7 
8 Originally from Zguide examples, generalized to use CLIENT
9 or DEALER by brett.viren@gmail.com
10 """
11 
12 import logging
13 
14 import zmq
15 
16 from . import MDP
17 from .zhelpers import dump, clientish_recv, clientish_send
18 
class Client(object):
    """Majordomo Protocol Client API, Python version.

    Implements the client side of the Majordomo Protocol (MDP) spec at
    http://rfc.zeromq.org/spec:7.

    A ``Client`` owns one ZeroMQ context, one socket connected to the
    broker, and a poller watching that socket.  Use :meth:`send` to issue a
    request to a named service and :meth:`recv` to wait (up to
    ``timeout``) for a reply.
    """

    # Class-level defaults; __init__ and reconnect_to_broker() replace
    # them with per-instance values.
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500  # handed to Poller.poll() in recv(); pyzmq takes milliseconds
    verbose = False

    # NOTE(review): the module docstring says this client was generalized
    # for CLIENT or DEALER sockets, yet the default here is zmq.ROUTER --
    # confirm the intended default.  Left unchanged to keep the interface.
    def __init__(self, broker, socket_type=zmq.ROUTER, verbose=False):
        """Create the context and poller, then connect to the broker.

        Args:
            broker: endpoint passed to the socket's ``connect()``.
            socket_type: ZeroMQ socket type used for the client socket.
            verbose: when true, log connection, request and reply details.
        """
        self.broker = broker
        self.socket_type = socket_type
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S",
                            level=logging.INFO)
        self.reconnect_to_broker()

    def reconnect_to_broker(self):
        """Connect or reconnect to broker.

        Any existing socket is unregistered from the poller and closed
        before a fresh one is created, connected and registered.
        """
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(self.socket_type)
        # Do not block on close waiting for unsent messages.
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker.

        Args:
            service: service name frame placed after the protocol header.
            request: a single frame or a list of frames; a non-list value
                is wrapped in a one-element list.
        """
        if not isinstance(request, list):
            request = [request]

        # Prefix request with protocol frames
        # Frame 0: "MDPCxy" (six bytes, MDP/Client x.y)
        # Frame 1: Service name (printable string)
        request = [MDP.C_CLIENT, service] + request
        if self.verbose:
            # logging.warning, not the deprecated logging.warn alias.
            logging.warning("I: send request to '%s' service: ", service)
            dump(request)
        # Sent through the clientish helper instead of send_multipart();
        # the helper is defined in zhelpers.
        clientish_send(self.client, request)

    def recv(self):
        """Returns the reply message or None if there was no reply.

        Polls the client socket for up to ``self.timeout``.  On a reply,
        the protocol header and service frames are stripped and the
        remaining frames are returned as a list.  Returns ``None`` when
        the poll is interrupted by ``KeyboardInterrupt`` or times out.

        Raises:
            AssertionError: if the reply is shorter than expected or does
                not start with the MDP client header.  (Asserts are
                skipped when Python runs with ``-O``.)
        """
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            return None  # interrupted

        if not items:
            logging.warning("W: permanent error, abandoning request")
            return None

        # We got a reply; read it through the clientish helper (the
        # counterpart of clientish_send) rather than recv_multipart().
        msg = clientish_recv(self.client)
        if self.verbose:
            logging.info("I: received reply:")
            dump(msg)

        # Don't try to handle errors, just assert noisily
        # NOTE(review): header + service are two frames, so ">= 4" demands
        # at least two payload frames -- confirm against the frames that
        # clientish_recv actually returns.
        assert len(msg) >= 4

        header = msg.pop(0)
        assert MDP.C_CLIENT == header

        msg.pop(0)  # service name frame; not needed by the caller
        return msg
def clientish_recv(sock, args, kwds)
Definition: zhelpers.py:175
def clientish_send(sock, msg, args, kwds)
Definition: zhelpers.py:191
def send(self, service, request)
Definition: client.py:55
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:259
void dump(const SM &) noexcept
Definition: _test_hsm.cpp:179
def reconnect_to_broker(self)
Definition: client.py:43
def __init__(self, broker, socket_type=zmq.ROUTER, verbose=False)
Definition: client.py:31