ZIO
Python and C++ interface to ZeroMQ and Zyre
actor.hpp
Go to the documentation of this file.
1 
7 #ifndef ZIO_ACTOR_HPP_SEEN
8 #define ZIO_ACTOR_HPP_SEEN
9 
10 #include "zio/cppzmq.hpp"
11 
12 #include <thread>
13 
14 namespace zio {
15 
20  inline
21  std::pair<socket_t, socket_t>
23  {
24  std::pair<socket_t, socket_t> ret{socket_t(ctx, socket_type::pair),
25  socket_t(ctx, socket_type::pair)};
26 
27  std::stringstream ss;
28  ss << "inproc://link-"
29  << std::hex
30  << ret.first.handle()
31  << "-"
32  << ret.second.handle();
33  std::string addr = ss.str();
34  ret.first.bind(addr.c_str());
35  ret.second.connect(addr.c_str());
36  return ret;
37  }
38 
75  class zactor_t {
76  public:
77  // Template constructor and not class to perform type erasure
78  // of the function type.
79  template<typename Func, typename... Args>
80  zactor_t(context_t &ctx, Func fn, Args... args) {
81  socket_t asock;
82  std::tie(asock, _sock) = create_linked_pairs(ctx);
83 
84  _thread = std::thread(
85  [fn = std::forward<Func>(fn)](socket_t asock, Args... args) {
86  fn(asock, std::forward<Args>(args)...);
87  // send notice of exit down link
88  asock.send(message_t{}, send_flags::none);
89  },
90  std::move(asock), std::forward<Args>(args)...);
91  startup();
92  }
93 
95  shutdown();
96  _thread.join();
97  }
98 
99  socket_ref link() { return _sock; }
100 
101  private:
102 
103  void startup() {
104  // The default contract with the actor function is that it
105  // shall notify us with a "signal" message that we may
106  // continue.
107  message_t rmsg;
108  auto res = link().recv(rmsg);
109  }
110 
111  void shutdown() {
112  // The default contract with the actor function is we will
113  // notify it to terminate. If sending that message is
114  // successful we wait for the built-in confirmation
115  // message.
116  auto sres = link().send(message_t("$TERM",5), send_flags::dontwait);
117  if (sres) {
118  message_t rmsg;
119  auto res = link().recv(rmsg, recv_flags::none);
120  }
121  }
122 
123  private:
124  socket_t _sock;
125  std::thread _thread;
126 
127  };
128 }
129 
130 #endif
131 
const char * addr
inproc hangs. no messages ever get received by server. tcp/ipc okay.
Definition: test_tcs.cpp:16
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: zmq.hpp:1345
std::pair< socket_t, socket_t > create_linked_pairs(context_t &ctx)
Definition: actor.hpp:22
socket_ref link()
Definition: actor.hpp:99
zactor_t(context_t &ctx, Func fn, Args... args)
Definition: actor.hpp:80
implementation of ZIO data flow protocol endpoints
Definition: actor.hpp:14