11 , m_address(broker_address)
14 zio::debug(
"zio::domo::Worker constructing on " + m_address);
16 if (ZMQ_CLIENT == stype) {
20 else if (ZMQ_DEALER == stype) {
25 throw std::runtime_error(
"worker must be given DEALER or CLIENT socket");
28 connect_to_broker(
false);
37 void Worker::connect_to_broker(
bool reconnect)
40 zio::debug(
"zio::domo::Worker disconnect from " + m_address);
48 zio::debug(
"zio::domo::Worker connect to " + m_address);
50 zio::multipart_t mmsg;
51 mmsg.pushstr(m_service);
54 really_send(m_sock, mmsg);
57 m_heartbeat_at =
now_ms() + m_heartbeat;
65 reply.pushmem(NULL,0);
66 reply.pushstr(m_reply_to);
69 really_send(m_sock, reply);
74 zio::poller_t<> poller;
75 poller.add(m_sock, zio::event_flags::pollin);
77 std::vector< zio::poller_event<> > events(1);
78 int rc = poller.wait_all(events, m_heartbeat);
80 zio::multipart_t mmsg;
81 really_recv(m_sock, mmsg);
83 std::string header = mmsg.popstr();
85 std::string command = mmsg.popstr();
87 m_reply_to = mmsg.popstr();
89 request = std::move(mmsg);
99 zio::warn(
"zio::domo::Worker invalid command: " + command);
104 if (m_liveness == 0) {
105 zio::debug(
"zio::domo::Worker disconnect from broker - retrying...");
110 if (
now_ms() >= m_heartbeat_at) {
111 zio::multipart_t mmsg;
114 really_send(m_sock, mmsg);
115 m_heartbeat_at += m_heartbeat;
128 if (request.empty()) {
134 zio::info(
"zio::domo::Worker interupt received, killing worker");
137 return zio::multipart_t{};
146 Worker worker(sock, address,
"echo");
147 zio::debug(
"worker echo created on " + address);
149 zio::poller_t<> poller;
150 poller.add(link, zio::event_flags::pollin);
151 poller.add(sock, zio::event_flags::pollin);
161 zio::multipart_t
reply;
165 std::vector< zio::poller_event<> > events(2);
166 int nevents = poller.wait_all(events, poll_resolution);
167 for (
int iev=0; iev < nevents; ++iev) {
169 if (events[iev].socket == link) {
174 if (events[iev].socket == sock) {
177 worker.
recv(request);
178 if (request.empty()) {
179 zio::warn(
"worker echo got null request");
182 reply = std::move(request);
190 auto res = link.recv(die, zio::recv_flags::none);
void sleep_ms(std::chrono::milliseconds zzz)
void recv(zio::multipart_t &request)
void send_client(socket_t &client_socket, multipart_t &mmsg)
void setsockopt(int option_, T const &optval)
size_t send(const void *buf_, size_t len_, int flags_=0)
void echo_worker(zio::socket_t &link, std::string address, int socktype)
std::chrono::milliseconds now_ms()
const char * ready
Worker commands as strings.
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
void send_dealer(socket_t &dealer_socket, multipart_t &mmsg)
void connect(std::string const &addr)
void recv_client(socket_t &client_socket, multipart_t &mmsg)
Worker(zio::socket_t &sock, std::string broker_address, std::string service)
zio::multipart_t work(zio::multipart_t &reply)
void recv_dealer(socket_t &dealer_socket, multipart_t &mmsg)
void disconnect(std::string const &addr)
const int HEARTBEAT_LIVENESS
void send(zio::multipart_t &reply)
std::chrono::milliseconds time_unit_t