14 Broker::Service::~Service () {
22 if (ZMQ_SERVER == stype) {
25 zio::debug(
"zio::domo::Broker with SERVER starting");
28 if(ZMQ_ROUTER == stype) {
31 zio::debug(
"zio::domo::Broker with ROUTER starting");
34 throw std::runtime_error(
"zio::domo::Broker requires SERVER or ROUTER socket");
39 while (! m_services.empty()) {
40 delete m_services.begin()->second;
41 m_services.erase(m_services.begin());
43 while (! m_workers.empty()) {
44 delete m_workers.begin()->second;
45 m_workers.erase(m_workers.begin());
52 zio::multipart_t mmsg;
54 assert(mmsg.size() > 0);
55 std::string header = mmsg.popstr();
57 zio::debug(
"zio::domo::Broker process client");
58 client_process(sender, mmsg);
61 zio::debug(
"zio::domo::Broker process worker");
62 worker_process(sender, mmsg);
65 zio::warn(
"zio::domo::Broker invalid message from " + sender);
72 if (
now < heartbeat_at) {
76 for (
auto& wrk : m_waiting) {
77 zio::debug(
"zio::domo::Broker heartbeat to worker");
78 zio::multipart_t mmsg;
81 send(m_sock, mmsg, wrk->identity);
90 zio::poller_t<> poller;
91 poller.add(m_sock, zio::event_flags::pollin);
94 if (heartbeat_at > now ) {
98 std::vector< zio::poller_event<> > events(1);
99 int rc = poller.wait_all(events,
timeout);
105 heartbeat_at += m_hb_interval;
110 void Broker::purge_workers()
114 std::vector<Worker*> dead;
115 for (
auto wrk : m_waiting) {
116 if (wrk->expiry <=
now) {
120 for (
auto wrk : dead) {
121 zio::debug(
"zio::domo::Broker deleting expired worker: " + wrk->identity);
122 worker_delete(wrk,0);
126 Broker::Service* Broker::service_require(std::string
name)
128 Service* srv = m_services[
name];
130 srv =
new Service{name};
131 m_services[
name] = srv;
132 zio::debug(
"zio::domo::Broker registering new service: " + name);
137 void Broker::service_internal(
remote_identity_t rid, std::string service_name, zio::multipart_t& mmsg)
139 zio::multipart_t response;
141 if (service_name ==
"mmi.service") {
142 std::string sn = mmsg.popstr();
143 Service* srv = m_services[sn];
144 if (srv and srv->nworkers) {
145 response.pushstr(
"200");
148 response.pushstr(
"404");
152 response.pushstr(
"501");
155 send(m_sock, response, rid);
158 void Broker::service_dispatch(Service* srv)
161 while (srv->waiting.size() and srv->requests.size()) {
163 std::list<Worker*>::iterator wrk_it = srv->waiting.begin();
164 std::list<Worker*>::iterator next = wrk_it;
165 for (++next; next != srv->waiting.end(); ++next) {
166 if ((*next)->expiry > (*wrk_it)->expiry) {
171 zio::multipart_t& mmsg = srv->requests.front();
173 send(m_sock, mmsg, (*wrk_it)->identity);
174 srv->requests.pop_front();
175 m_waiting.erase(*wrk_it);
176 srv->waiting.erase(wrk_it);
183 Worker* wrk = m_workers[identity];
185 wrk =
new Worker{identity};
186 m_workers[identity] = wrk;
187 zio::debug(
"zio::domo::Broker registering new worker");
192 void Broker::worker_delete(Broker::Worker*& wrk,
int disconnect)
195 zio::multipart_t mmsg;
198 zio::debug(
"zio::domo::Broker disconnect worker");
199 send(m_sock, mmsg, wrk->identity);
202 for (std::list<Worker*>::iterator it = wrk->service->waiting.begin();
203 it != wrk->service->waiting.end();) {
205 it = wrk->service->waiting.erase(it);
211 --wrk->service->nworkers;
213 m_waiting.erase(wrk);
214 m_workers.erase(wrk->identity);
221 assert(mmsg.size() >= 1);
222 const std::string command = mmsg.popstr();
223 bool worker_ready = (m_workers.find(
sender) != m_workers.end());
224 Worker* wrk = worker_require(
sender);
228 zio::error(
"zio::domo::Broker protocol error (double ready) from: " +
sender);
229 worker_delete(wrk, 1);
232 if (
sender.size() >= 4 &&
sender.find_first_of(
"mmi.") == 0) {
233 zio::error(
"zio::domo::Broker protocol error (worker mmi) from: " +
sender);
234 worker_delete(wrk, 1);
238 std::string service_name = mmsg.popstr();
239 wrk->service = service_require(service_name);
240 wrk->service->nworkers++;
246 worker_delete(wrk, 1);
251 mmsg.pushstr(wrk->service->name);
253 zio::debug(
"zio::domo::Broker reply to client");
254 send(m_sock, mmsg, client_id);
260 worker_delete(wrk, 1);
263 wrk->expiry =
now_ms() + m_hb_expiry;
267 worker_delete(wrk, 0);
270 zio::error(
"zio::domo::Broker invalid input message " + command);
274 void Broker::worker_waiting(Broker::Worker* wrk)
276 m_waiting.insert(wrk);
277 wrk->service->waiting.push_back(wrk);
278 wrk->expiry =
now_ms() + m_hb_expiry;
280 service_dispatch(wrk->service);
285 std::string service_name = mmsg.popstr();
286 Service* srv = service_require(service_name);
287 if (service_name.size() >= 4 and service_name.find_first_of(
"mmi.") == 0) {
288 service_internal(client_id, service_name, mmsg);
291 mmsg.pushmem(NULL,0);
292 mmsg.pushstr(client_id);
295 srv->requests.emplace_back(std::move(mmsg));
296 service_dispatch(srv);
319 zio::poller_t<> poller;
320 poller.add(link, zio::event_flags::pollin);
321 poller.add(sock, zio::event_flags::pollin);
325 if (heartbeat_at > now ) {
330 std::vector< zio::poller_event<> > events(2);
331 int nevents = poller.wait_all(events,
timeout);
332 for (
int iev=0; iev < nevents; ++iev) {
334 if (events[iev].socket == sock) {
339 if (events[iev].socket == link) {
342 auto res = events[0].socket.recv(msg, zio::recv_flags::dontwait);
344 std::stringstream
ss;
345 ss <<
"msg: " << msg.
size();
355 heartbeat_at += hb_interval;
360 auto res = link.recv(die);
void send_router(socket_t &router_socket, multipart_t &mmsg, remote_identity_t rid)
size_t send(const void *buf_, size_t len_, int flags_=0)
def broker(verbose, socket, address)
void send_server(socket_t &server_socket, multipart_t &mmsg, remote_identity_t rid)
remote_identity_t recv_router(socket_t &router_socket, multipart_t &mmsg)
std::chrono::milliseconds now_ms()
const char * ready
Worker commands as strings.
void proc_heartbeat(time_unit_t heartbeat_at)
Do heartbeat processing given next heatbeat time.
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
void broker_actor(zio::socket_t &link, std::string address, int socktype)
size_t size() const ZMQ_NOTHROW
void bind(std::string const &addr)
void proc_one()
Process one input on socket.
const time_unit_t HEARTBEAT_INTERVAL
remote_identity_t recv_server(socket_t &server_socket, multipart_t &mmsg)
std::string remote_identity_t
Broker(zio::socket_t &sock)
std::chrono::milliseconds time_unit_t