ZIO
Python and C++ interface to ZeroMQ and Zyre
domo_client.cpp
Go to the documentation of this file.
1 #include "zio/domo/client.hpp"
2 #include "zio/domo/protocol.hpp"
3 #include "zio/logging.hpp"
4 
5 using namespace zio::domo;
6 
7 Client::Client(zio::socket_t& sock, std::string broker_address)
8  : m_sock(sock)
9  , m_address(broker_address)
10 {
11  int stype = m_sock.getsockopt<int>(ZMQ_TYPE);
12  if (ZMQ_CLIENT == stype) {
13  really_recv = recv_client;
14  really_send = send_client;
15  }
16  else if (ZMQ_DEALER == stype) {
17  really_recv = recv_dealer;
18  really_send = send_dealer;
19  }
20  else {
21  throw std::runtime_error("client must be given DEALER or CLIENT socket");
22  }
23 
24  connect_to_broker(false);
25 }
26 
27 
29 
30 void Client::connect_to_broker(bool reconnect)
31 {
32  if (reconnect) {
33  m_sock.disconnect(m_address);
34  }
35 
36  int linger=0;
37  m_sock.setsockopt(ZMQ_LINGER, linger);
38  // set socket routing ID?
39  m_sock.connect(m_address);
40  zio::debug("zio::domo::Client connect to " + m_address);
41 }
42 
43 
44 void Client::send(std::string service, zio::multipart_t& request)
45 {
46  request.pushstr(service); // frame 2
47  request.pushstr(mdp::client::ident); // frame 1
48  zio::debug("zio::domo::Client send request for " + service);
49  really_send(m_sock, request);
50 }
51 
52 
53 
54 void Client::recv(zio::multipart_t& reply)
55 {
56  zio::poller_t<> poller;
57  poller.add(m_sock, zio::event_flags::pollin);
58 
59  std::vector< zio::poller_event<> > events(1);
60  int rc = poller.wait_all(events, m_timeout);
61  if (rc > 0) { // got one
62  zio::multipart_t mmsg;
63  really_recv(m_sock, mmsg);
64 
65  std::string header = mmsg.popstr();
66  assert(header == mdp::client::ident);
67 
68  std::string service = mmsg.popstr();
69  reply = std::move(mmsg);
70  return; // success
71  }
72  if ( interrupted() ) {
73  zio::error("zio::domo::Client interupted on recv");
74  }
75  else {
76  zio::error("zio::domo::Client timeout");
77  }
78  reply.clear();
79  return;
80 }
81 
82 
void send_client(socket_t &client_socket, multipart_t &mmsg)
const char * reply
Definition: protocol.hpp:23
void setsockopt(int option_, T const &optval)
Definition: zmq.hpp:1279
bool interrupted()
Definition: util.cpp:194
const char * request
Definition: protocol.hpp:22
Client(zio::socket_t &sock, std::string broker_address)
Definition: domo_client.cpp:7
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
Definition: zmq.hpp:1291
void send_dealer(socket_t &dealer_socket, multipart_t &mmsg)
void recv(zio::multipart_t &reply)
Definition: domo_client.cpp:54
void connect(std::string const &addr)
Definition: zmq.hpp:1324
void recv_client(socket_t &client_socket, multipart_t &mmsg)
void recv_dealer(socket_t &dealer_socket, multipart_t &mmsg)
void disconnect(std::string const &addr)
Definition: zmq.hpp:1333
const char * ident
Definition: protocol.hpp:14
void send(std::string service, zio::multipart_t &request)
Definition: domo_client.cpp:44