ZIO
Python and C++ interface to ZeroMQ and Zyre
worker.py
Go to the documentation of this file.
1 """Majordomo Protocol Worker API, Python version
2 
3 Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.
4 
5 Author: Min RK <benjaminrk@gmail.com>
6 Based on Java example by Arkadiusz Orzechowski
7 
8 Originally from Zguide examples, generalized to use CLIENT
9 or DEALER following Generaldomo Protocol. brett.viren@gmail.com
10 """
11 
12 import logging
13 import time
14 import zmq
15 
16 from .zhelpers import dump, clientish_recv, clientish_send
17 # MajorDomo protocol constants:
18 from . import MDP
19 
20 class Worker(object):
21  """Generaldomo Protocol Worker API, Python version
22 
23  Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.
24  Extended as described in https://brettviren.github.io/generaldomo/
25  """
26 
27  HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
28  broker = None
29  ctx = None
30  service = None
31 
32  worker = None # Socket to broker
33  heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
34  liveness = 0 # How many attempts left
35  heartbeat = 2500 # Heartbeat delay, msecs
36  reconnect = 2500 # Reconnect delay, msecs
37 
38  # Internal state
39  expect_reply = False # False only at start
40 
41  timeout = 2500 # poller timeout
42  verbose = False # Print activity to stdout
43 
44  # Return address, if any
45  reply_to = None
46 
47 
48  def __init__(self, broker, service, socket_type=zmq.DEALER, verbose=False):
49  self.broker = broker
50  self.service = service
51  self.socket_type = socket_type
52  self.verbose = verbose
53  self.ctx = zmq.Context()
54  self.poller = zmq.Poller()
55  logging.basicConfig(format="%(asctime)s %(message)s",
56  datefmt="%Y-%m-%d %H:%M:%S",
57  level=logging.INFO)
58  self.reconnect_to_broker()
59 
60 
62  """Connect or reconnect to broker"""
63  if self.worker:
64  self.poller.unregister(self.worker)
65  self.worker.close()
66  self.worker = self.ctx.socket(self.socket_type)
67  self.worker.linger = 0
68  self.worker.connect(self.broker)
69  self.poller.register(self.worker, zmq.POLLIN)
70  if self.verbose:
71  logging.info("I: connecting to broker at %s...", self.broker)
72 
73  # Register service with broker
74  self.send_to_broker(MDP.W_READY, self.service, [])
75 
76  # If liveness hits zero, queue is considered disconnected
78  self.heartbeat_at = time.time() + 1e-3 * self.heartbeat
79 
80 
81  def send_to_broker(self, command, option=None, msg=None):
82  """Send message to broker.
83 
84  If no msg is provided, creates one internally
85  """
86  if msg is None:
87  msg = []
88  elif not isinstance(msg, list):
89  msg = [msg]
90 
91  if option:
92  msg = [option] + msg
93 
94  #msg = [b'', MDP.W_WORKER, command] + msg
95  msg = [MDP.W_WORKER, command] + msg
96  if self.verbose:
97  logging.info("I: sending %s to broker", command)
98  dump(msg)
99  #self.worker.send_multipart(msg)
100  clientish_send(self.worker, msg)
101 
102 
103  def recv(self, reply=None):
104  """Send reply, if any, to broker and wait for next request."""
105  # Format and send the reply if we were provided one
106  assert reply is not None or not self.expect_reply
107 
108  if reply is not None:
109  assert self.reply_to is not None
110  reply = [self.reply_to, b''] + reply
111  self.send_to_broker(MDP.W_REPLY, msg=reply)
112 
113  self.expect_reply = True
114 
115  while True:
116  # Poll socket for a reply, with timeout
117  try:
118  items = self.poller.poll(self.timeout)
119  except KeyboardInterrupt:
120  break # Interrupted
121 
122  if items:
123  #msg = self.worker.recv_multipart()
124  msg = clientish_recv(self.worker)
125  if self.verbose:
126  logging.info("I: received message from broker: ")
127  dump(msg)
128 
129  self.liveness = self.HEARTBEAT_LIVENESS
130  # Don't try to handle errors, just assert noisily
131  assert len(msg) >= 2
132 
133  header = msg.pop(0)
134  assert header == MDP.W_WORKER
135 
136  command = msg.pop(0)
137  if command == MDP.W_REQUEST:
138  # We should pop and save as many addresses as there are
139  # up to a null part, but for now, just save one...
140  self.reply_to = msg.pop(0)
141  # pop empty
142  empty = msg.pop(0)
143  assert empty == b''
144 
145  return msg # We have a request to process
146  elif command == MDP.W_HEARTBEAT:
147  # Do nothing for heartbeats
148  pass
149  elif command == MDP.W_DISCONNECT:
150  self.reconnect_to_broker()
151  else :
152  logging.error("E: invalid input message: ")
153  dump(msg)
154 
155  else:
156  self.liveness -= 1
157  if self.liveness == 0:
158  if self.verbose:
159  logging.warn("W: disconnected from broker - retrying...")
160  try:
161  time.sleep(1e-3*self.reconnect)
162  except KeyboardInterrupt:
163  break
164  self.reconnect_to_broker()
165 
166  # Send HEARTBEAT if it's time
167  if time.time() > self.heartbeat_at:
168  self.send_to_broker(MDP.W_HEARTBEAT)
169  self.heartbeat_at = time.time() + 1e-3*self.heartbeat
170 
171  logging.warn("W: interrupt received, killing worker...")
172  return None
173 
174 
175  def destroy(self):
176  # context.destroy depends on pyzmq >= 2.1.10
177  self.ctx.destroy(0)
def clientish_recv(sock, args, kwds)
Definition: zhelpers.py:175
def __init__(self, broker, service, socket_type=zmq.DEALER, verbose=False)
Definition: worker.py:48
def clientish_send(sock, msg, args, kwds)
Definition: zhelpers.py:191
def send_to_broker(self, command, option=None, msg=None)
Definition: worker.py:81
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:259
def reconnect_to_broker(self)
Definition: worker.py:61
void dump(const SM &) noexcept
Definition: _test_hsm.cpp:179
def recv(self, reply=None)
Definition: worker.py:103