2 Majordomo Protocol broker 3 A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8 5 Author: Min RK <benjaminrk@gmail.com> 6 Based on Java example by Arkadiusz Orzechowski 8 Originally from Zguide examples, generalized to use SERVER 9 or ROUTER by brett.viren@gmail.com 15 from binascii
import hexlify
21 from .zhelpers
import dump, serverish_recv, serverish_send
24 if isinstance(sender, int):
25 return sender.to_bytes(4, sys.byteorder)
26 return hexlify(sender)
30 """a single Service""" 41 """a Worker, idle or active""" 47 def __init__(self, identity, address, lifetime):
50 self.
expiry = time.time() + 1e-3*lifetime
54 Majordomo Protocol broker 55 A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8 59 INTERNAL_SERVICE_PREFIX = b
"mmi." 60 HEARTBEAT_LIVENESS = 3
61 HEARTBEAT_INTERVAL = 2500
62 HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
80 def __init__(self, socket_type = zmq.ROUTER, verbose=False):
81 """Initialize broker state.""" 87 self.
ctx = zmq.Context()
92 logging.basicConfig(format=
"%(asctime)s %(message)s",
93 datefmt=
"%Y-%m-%d %H:%M:%S",
101 """Main broker work happens here""" 105 except KeyboardInterrupt:
110 logging.info(
"I: received message:")
115 if (MDP.C_CLIENT == header):
117 elif (MDP.W_WORKER == header):
120 logging.error(
"E: invalid message:")
127 """Disconnect all workers, destroy context.""" 134 """Process a request coming from a client.""" 146 """Process message sent to us by a worker.""" 155 if (MDP.W_READY == command):
166 elif (MDP.W_REPLY == command):
173 msg = [MDP.C_CLIENT, worker.service.name] + msg
180 elif (MDP.W_HEARTBEAT == command):
186 elif (MDP.W_DISCONNECT == command):
189 logging.error(
"E: invalid message:")
193 """Deletes worker from all data structures, and deletes worker.""" 194 assert worker
is not None 198 if worker.service
is not None:
199 worker.service.waiting.remove(worker)
200 self.
workers.pop(worker.identity)
203 """Finds the worker (creates if necessary).""" 204 assert (address
is not None)
206 worker = self.
workers.get(identity)
209 self.
workers[identity] = worker
211 logging.info(
"I: registering new worker: %s", identity)
216 """Locates the service (creates if necessary).""" 217 assert (name
is not None)
219 if (service
is None):
226 """Bind broker to endpoint, can call this multiple times. 228 We use a single socket for both clients and workers. 231 logging.info(
"I: MDP broker/0.1.1 is active at %s", endpoint)
234 """Handle internal service according to 8/MMI specification""" 236 if b
"mmi.service" == service:
238 returncode = b
"200" if name
in self.
services else b
"404" 244 msg = [MDP.C_CLIENT, service] + msg[2:]
249 """Send heartbeats to idle workers if it's time""" 257 """Look for & kill expired workers. 259 Workers are oldest to most recent, so we stop at the first alive worker. 263 if w.expiry < time.time():
264 logging.info(
"I: deleting expired worker: %s", w.identity)
271 """This worker is now waiting for work.""" 274 worker.service.waiting.append(worker)
279 """Dispatch requests to waiting workers as possible""" 280 assert (service
is not None)
282 service.requests.append(msg)
284 while service.waiting
and service.requests:
285 msg = service.requests.pop(0)
286 worker = service.waiting.pop(0)
291 """Send message to worker. 293 If message is provided, sends that message. 298 elif not isinstance(msg, list):
303 if option
is not None:
306 msg = [MDP.W_WORKER, command] + msg
309 logging.info(
"I: sending %r to worker", command)
317 """create and start new broker""" 318 verbose =
'-v' in sys.argv
319 broker = MajorDomoBroker(verbose)
320 broker.bind(
"tcp://*:5555")
323 if __name__ ==
'__main__':
def process_worker(self, sender, msg)
def delete_worker(self, worker, disconnect)
def require_worker(self, address)
def process_client(self, sender, msg)
def send_to_worker(self, worker, command, option, msg=None)
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
def send_heartbeats(self)
def serverish_send(sock, cid, msg, args, kwds)
def dispatch(self, service, msg)
def service_internal(self, service, msg)
def serverish_recv(sock, args, kwds)
def __init__(self, socket_type=zmq.ROUTER, verbose=False)
string INTERNAL_SERVICE_PREFIX
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
def worker_waiting(self, worker)
void dump(const SM &) noexcept
def __init__(self, identity, address, lifetime)
def require_service(self, name)