ZIO
Python and C++ interface to ZeroMQ and Zyre
broker.hpp
Go to the documentation of this file.
1 #ifndef ZIO_DOMO_BROKER_HPP_SEEN
2 #define ZIO_DOMO_BROKER_HPP_SEEN
3 
4 #include "zio/util.hpp"
5 #include <unordered_map>
6 #include <unordered_set>
7 #include <deque>
8 #include <list>
9 #include <functional>
10 
11 namespace zio {
12 namespace domo {
13 
14 
// Broker state and entry points for the zio::domo namespace.
// It keeps per-service and per-worker records (m_services, m_workers,
// m_waiting) and talks through a single socket held by reference (m_sock).
// NOTE(review): the request queue below mentions 7/MDP, so this looks like a
// Majordomo-style broker -- confirm against domo_broker.cpp.
// NOTE(review): m_sock is a reference member, so a user-provided constructor
// is required.  The page's cross-references mention
// Broker(zio::socket_t &sock) at domo_broker.cpp:18, but no constructor
// declaration appears in this listing -- confirm it was not lost.
17  class Broker {
18  public:
19 
    // Destructor; only declared here, body is not in this header.
24  ~Broker();
25 
    // NOTE(review): presumably runs the broker's processing loop -- the body
    // is not visible here, confirm in the implementation file.
28  void start();
29 
    // Process one input on socket.
31  void proc_one();
32 
    // Do heartbeat processing given next heartbeat time.
34  void proc_heartbeat(time_unit_t heartbeat_at);
35 
36  private:
37 
    // Type-erased receive hook: called with the socket and a multipart
    // message, yields a remote_identity_t.
    // NOTE(review): presumably fills mmsg and returns the sender's identity,
    // and is chosen to match the socket type -- confirm where it is assigned.
38  std::function<remote_identity_t(zio::socket_t& server_socket,
39  zio::multipart_t& mmsg)> recv;
    // Type-erased send hook: called with the socket, a multipart message and
    // the remote identity (rid) passed by value.
    // NOTE(review): presumably routes mmsg to rid -- confirm.
40  std::function<void(zio::socket_t& server_socket,
41  zio::multipart_t& mmsg, remote_identity_t rid)> send;
42 
    // Forward declaration so Worker can hold a Service*.
43  struct Service;
44 
45  // This is a proxy for the remote worker
46  struct Worker {
47  // The identity of a worker.
48  remote_identity_t identity;
49 
50  // The owner, if known.
    // Raw pointer, null by default.
51  Service* service{nullptr};
52  // Expire the worker at this time, heartbeat refreshes.
53  time_unit_t expiry{0};
54  };
55 
56  // This collects workers for a given service
57  struct Service {
58 
59  // Service name, that is the "thing" that its workers know how to do.
60  std::string name;
61 
62  // List of client requests for this service. Each holds a
63  // full 7/MDP message starting with Frame 1.
64  std::deque<zio::multipart_t> requests;
65 
66  // List of waiting workers.
    // Stored as raw Worker pointers.
    // NOTE(review): ownership of the Worker objects is not visible in this
    // header -- confirm who deletes them.
67  std::list<Worker*> waiting;
68 
69  // How many workers the service has
70  size_t nworkers{0};
71 
    // Destructor; only declared here, body is not in this header.
72  ~Service ();
73  };
74 
    // Internal helpers.  Only the declarations are visible in this header;
    // the notes below are inferred from names and signatures and must be
    // confirmed against the implementation.
75  private:
    // NOTE(review): presumably removes workers whose expiry has passed.
76  void purge_workers();
    // NOTE(review): presumably finds or creates the Service for `name`.
77  Service* service_require(std::string name);
    // NOTE(review): presumably hands queued requests to waiting workers.
78  void service_dispatch(Service* srv);
    // NOTE(review): presumably handles requests aimed at the broker itself.
79  void service_internal(remote_identity_t rid, std::string service_name,
80  zio::multipart_t& mmsg);
81 
    // NOTE(review): presumably finds or creates the Worker for `identity`.
82  Worker* worker_require(remote_identity_t identity);
    // Takes the pointer by reference, so the callee can modify the caller's
    // pointer.
    // NOTE(review): presumably nulls it after deletion; `disconnect` is an
    // int flag whose meaning is not shown here -- confirm.
83  void worker_delete(Worker*& wrk, int disconnect);
84 
    // NOTE(review): presumably handles a message received from a worker.
85  void worker_process(remote_identity_t sender, zio::multipart_t& mmsg);
    // NOTE(review): presumably marks the worker as waiting for work.
86  void worker_waiting(Worker* wkr);
87 
    // NOTE(review): presumably handles a message received from a client.
88  void client_process(remote_identity_t client_id, zio::multipart_t& mmsg);
89 
90  private:
91 
    // Held by reference: the socket object must outlive this Broker.
92  zio::socket_t& m_sock;
93 
94  // fixme: make configurable
    // Defaults come from the HEARTBEAT_INTERVAL / HEARTBEAT_EXPIRY constants.
95  time_unit_t m_hb_interval{HEARTBEAT_INTERVAL};
96  time_unit_t m_hb_expiry{HEARTBEAT_EXPIRY};
97 
    // Service records by key.
    // NOTE(review): the key type is remote_identity_t while
    // service_require() takes a std::string name -- presumably the key is
    // the service name; confirm.
98  std::unordered_map<remote_identity_t, Service*> m_services;
    // Worker records by remote identity.
99  std::unordered_map<remote_identity_t, Worker*> m_workers;
    // NOTE(review): presumably every waiting worker regardless of service.
100  std::unordered_set<Worker*> m_waiting;
101  };
102 
103 
// Free function in zio::domo; only the declaration is visible here.
// NOTE(review): presumably an actor entry point that runs a Broker on a
// socket of type `socktype` bound to `address`, with `link` as the actor's
// control link -- confirm against the implementation.
117  void broker_actor(zio::socket_t& link, std::string address, int socktype);
118 
119 
120 }
121 }
122 #endif
const char * disconnect
Definition: protocol.hpp:25
name
Definition: setup.py:4
void proc_heartbeat(time_unit_t heartbeat_at)
Do heartbeat processing given next heartbeat time.
Definition: domo_broker.cpp:69
void broker_actor(zio::socket_t &link, std::string address, int socktype)
const time_unit_t HEARTBEAT_EXPIRY
Definition: util.hpp:16
void proc_one()
Process one input on socket.
Definition: domo_broker.cpp:50
const time_unit_t HEARTBEAT_INTERVAL
Definition: util.hpp:15
std::string remote_identity_t
Definition: util.hpp:24
implementation of ZIO data flow protocol endpoints
Definition: actor.hpp:14
Broker(zio::socket_t &sock)
Definition: domo_broker.cpp:18
std::chrono::milliseconds time_unit_t
Definition: util.hpp:11