// Include guard and <unordered_map> include, followed by the start of the
// recv_multipart_n helper template.
// NOTE(review): only some source lines of this helper are visible in this
// extract; comments below describe the visible fragments only.
24 #ifndef __ZMQ_ADDON_HPP_INCLUDED__ 25 #define __ZMQ_ADDON_HPP_INCLUDED__ 36 #include <unordered_map> 45 template<
bool CheckN,
class OutputIt>
// Receives message parts from socket `s` and writes them through `out`.
// `n` is presumably the part limit applied when CheckN is true (the guarding
// condition is not visible here) -- TODO confirm.
47 recv_multipart_n(socket_ref s, OutputIt out,
size_t n, recv_flags flags)
// Raised when more parts arrive than are allowed.
54 throw std::runtime_error(
55 "Too many message parts in recv_multipart_n");
// A failed recv is only expected before any part has been received.
57 if (!s.recv(msg, flags)) {
59 assert(msg_count == 0);
// Read the "more" flag before the message is moved into the output.
63 const bool more = msg.more();
64 *out++ = std::move(msg);
// Receives message parts into `out` by forwarding to
// detail::recv_multipart_n with CheckN = false and n = 0.
// `flags` defaults to recv_flags::none.
// NOTE(review): the line carrying the function name and first parameters is
// not visible in this extract.
84 template<
class OutputIt>
87 recv_flags flags = recv_flags::none)
89 return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
// Receives message parts into `out` by forwarding to
// detail::recv_multipart_n with CheckN = true and the caller-supplied `n`.
// `flags` defaults to recv_flags::none.
// NOTE(review): the line carrying the function name and first parameters is
// not visible in this extract.
106 template<
class OutputIt>
110 recv_flags flags = recv_flags::none)
112 return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
// send_multipart: sends the elements of the range `msgs` on socket `s`.
// The enable_if constraint accepts ranges (detail::is_range) whose value type
// is message_t or satisfies detail::is_buffer.
// NOTE(review): several source lines (return type, braces, return statement)
// are not visible in this extract.
128 #ifndef ZMQ_CPP11_PARTIAL 130 typename =
typename std::enable_if<
131 detail::is_range<Range>::value
132 && (std::is_same<detail::range_value_t<Range>, message_t>::value
133 || detail::is_buffer<detail::range_value_t<Range>>::value)>::
type 137 send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
141 auto it = begin(msgs);
142 const auto end_it = end(msgs);
143 size_t msg_count = 0;
144 while (it != end_it) {
// Look one element ahead so that every part except the last one is sent
// with send_flags::sndmore added to the caller's flags.
145 const auto next = std::next(it);
146 const auto msg_flags =
147 flags | (next == end_it ? send_flags::none : send_flags::sndmore);
148 if (!s.send(*it, msg_flags)) {
// A failed send is only expected for the first part.
150 assert(it == begin(msgs));
// encode: packs all parts of the range `parts` into a single message_t.
// Accepts the same kinds of ranges as the constraint below allows: value type
// message_t or a type satisfying detail::is_buffer.
// Throws std::range_error when a part is larger than a uint32_t can describe.
// NOTE(review): some source lines (braces, pointer advances, return) are not
// visible in this extract.
178 #ifndef ZMQ_CPP11_PARTIAL 180 typename =
typename std::enable_if<
181 detail::is_range<Range>::value
182 && (std::is_same<detail::range_value_t<Range>, message_t>::value
183 || detail::is_buffer<detail::range_value_t<Range>>::value)>::
type 186 message_t encode(
const Range &parts)
188 size_t mmsg_size = 0;
// First pass: validate part sizes and add up the size of the encoded message.
191 for (
const auto &part : parts) {
192 size_t part_size = part.size();
193 if (part_size > std::numeric_limits<std::uint32_t>::max()) {
195 throw std::range_error(
"Invalid size, message part too large");
// Length prefix defaults to 5 bytes, matching the long form written in the
// second pass (one marker byte plus four size bytes).
197 size_t count_size = 5;
// Parts shorter than 255 bytes take this branch; its body is not visible
// here (presumably it shrinks count_size to the 1-byte form -- TODO confirm).
198 if (part_size < std::numeric_limits<std::uint8_t>::max()) {
201 mmsg_size += part_size + count_size;
// Second pass: write each part's length prefix followed by its bytes.
204 message_t encoded(mmsg_size);
205 unsigned char *buf = encoded.data<
unsigned char>();
206 for (
const auto &part : parts) {
207 uint32_t part_size = part.size();
208 const unsigned char *part_data =
209 static_cast<const unsigned char *
>(part.data());
// Short form: a single length byte, then the payload.
212 if (part_size < std::numeric_limits<std::uint8_t>::max()) {
213 *buf++ = (
unsigned char) part_size;
214 memcpy(buf, part_data, part_size);
// Long form: marker byte 0xFF, then the 32-bit size most significant byte
// first, then the payload.
220 *buf++ = std::numeric_limits<uint8_t>::max();
221 *buf++ = (part_size >> 24) & std::numeric_limits<std::uint8_t>::max();
222 *buf++ = (part_size >> 16) & std::numeric_limits<std::uint8_t>::max();
223 *buf++ = (part_size >> 8) & std::numeric_limits<std::uint8_t>::max();
224 *buf++ = part_size & std::numeric_limits<std::uint8_t>::max();
225 memcpy(buf, part_data, part_size);
// decode: splits an encoded message back into its parts, writing one
// message_t per part through the output iterator `out`.
// Each part starts with a length byte; the value 0xFF announces that a 4-byte
// size (most significant byte first) follows instead.
// Throws std::out_of_range when the encoded data ends before a size field or
// a part is complete.
// NOTE(review): some source lines (braces, pointer advances, return) are not
// visible in this extract and are left as they were.
246 template<
class OutputIt> OutputIt decode(
const message_t &encoded, OutputIt out)
248 const unsigned char *source = encoded.data<
unsigned char>();
249 const unsigned char *
const limit = source + encoded.size();
251 while (source < limit) {
252 size_t part_size = *source++;
253 if (part_size == std::numeric_limits<std::uint8_t>::max()) {
// Compare the remaining byte count instead of computing `limit - 4`, which
// could form a pointer before the start of the buffer (undefined behaviour).
254 if (limit - source < 4) {
255 throw std::out_of_range(
256 "Malformed encoding, overflow in reading size");
258 part_size = ((uint32_t) source[0] << 24) + ((uint32_t) source[1] << 16)
259 + ((uint32_t) source[2] << 8) + (uint32_t) source[3];
// Same reasoning: `limit - part_size` may wrap for large, untrusted sizes and
// let an out-of-bounds read through, so check against the bytes remaining.
263 if (part_size > static_cast<size_t>(limit - source)) {
264 throw std::out_of_range(
"Malformed encoding, overflow in reading part");
266 *out = message_t(source, part_size);
// Storage for the message parts, followed by the container-style type aliases
// of the class; all of them are taken from std::deque<message_t>.
// NOTE(review): the class header and access specifiers are not visible in
// this extract.
276 #ifdef ZMQ_HAS_RVALUE_REFS 288 std::deque<message_t> m_parts;
291 typedef std::deque<message_t>::value_type value_type;
293 typedef std::deque<message_t>::iterator iterator;
294 typedef std::deque<message_t>::const_iterator const_iterator;
296 typedef std::deque<message_t>::reverse_iterator reverse_iterator;
297 typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
303 multipart_t(socket_t &socket) { recv(socket); }
306 multipart_t(
const void *src,
size_t size) { addmem(src, size); }
309 multipart_t(
const std::string &
string) { addstr(
string); }
312 multipart_t(message_t &&message) { add(std::move(message)); }
315 multipart_t(multipart_t &&other) { m_parts = std::move(other.m_parts); }
// Move assignment: takes over the parts held by `other`.
// NOTE(review): braces and the return statement are not visible in this
// extract.
318 multipart_t &operator=(multipart_t &&other)
320 m_parts = std::move(other.m_parts);
325 virtual ~multipart_t() { clear(); }
327 message_t &operator[](
size_t n) {
return m_parts[n]; }
329 const message_t &operator[](
size_t n)
const {
return m_parts[n]; }
331 message_t &
at(
size_t n) {
return m_parts.at(n); }
333 const message_t &
at(
size_t n)
const {
return m_parts.at(n); }
335 iterator begin() {
return m_parts.begin(); }
337 const_iterator begin()
const {
return m_parts.begin(); }
339 const_iterator cbegin()
const {
return m_parts.cbegin(); }
341 reverse_iterator rbegin() {
return m_parts.rbegin(); }
343 const_reverse_iterator rbegin()
const {
return m_parts.rbegin(); }
345 iterator end() {
return m_parts.end(); }
347 const_iterator end()
const {
return m_parts.end(); }
349 const_iterator cend()
const {
return m_parts.cend(); }
351 reverse_iterator rend() {
return m_parts.rend(); }
353 const_reverse_iterator rend()
const {
return m_parts.rend(); }
356 void clear() { m_parts.clear(); }
359 size_t size()
const {
return m_parts.
size(); }
362 bool empty()
const {
return m_parts.empty(); }
// Receives message parts from `socket` and appends each one with add().
// `flags` is a plain int (default 0).
// NOTE(review): the two socket.recv calls below are presumably alternatives
// selected by a preprocessor conditional that is not visible in this extract;
// the loop, braces and return statements are not visible either.
365 bool recv(socket_t &socket,
int flags = 0)
372 if (!socket.recv(message, static_cast<recv_flags>(flags)))
375 if (!socket.recv(&message, flags))
// Record whether further parts follow before the message is moved away.
378 more = message.more();
379 add(std::move(message));
// Sends the held parts on `socket`, taking each one out with pop().
// `flags` is a plain int (default 0).
// NOTE(review): the two socket.send calls below are presumably alternatives
// selected by a preprocessor conditional that is not visible in this extract;
// the loop, braces and return statements are not visible either.
385 bool send(socket_t &socket,
int flags = 0)
// Strip ZMQ_SNDMORE from the caller's flags; it is re-added per part below
// depending on `more`.
387 flags &= ~(ZMQ_SNDMORE);
388 bool more = size() > 0;
390 message_t message = pop();
393 if (!socket.send(message, static_cast<send_flags>(
394 (more ? ZMQ_SNDMORE : 0) | flags)))
397 if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
// Moves all parts of `other` to the front of this object: while `other` is
// not empty, the part returned by other.remove() is passed to push().
// NOTE(review): presumably remove() takes from the back so the original order
// is preserved -- confirm against remove().
406 void prepend(multipart_t &&other)
408 while (!other.empty())
409 push(other.remove());
// Drains `other` in a loop that runs until it is empty.
// NOTE(review): the loop body is not visible in this extract; presumably each
// part is moved to the back of this object -- TODO confirm.
413 void append(multipart_t &&other)
415 while (!other.empty())
// Builds a message_t from `size` bytes at `src` and inserts it at the front.
420 void pushmem(
const void *src,
size_t size)
422 m_parts.push_front(message_t(src, size));
// Builds a message_t from `size` bytes at `src` and inserts it at the back.
426 void addmem(
const void *src,
size_t size)
428 m_parts.push_back(message_t(src, size));
// Builds a message_t from the bytes of `string` (data() and size(), so no
// terminating null is added) and inserts it at the front.
432 void pushstr(
const std::string &
string)
434 m_parts.push_front(message_t(
string.data(),
string.size()));
// Builds a message_t from the bytes of `string` (data() and size(), so no
// terminating null is added) and inserts it at the back.
438 void addstr(
const std::string &
string)
440 m_parts.push_back(message_t(
string.data(),
string.size()));
// Inserts the raw object representation of `type` (sizeof(type) bytes) as a
// new part at the front.
// std::string is rejected at compile time; pushstr() is the replacement.
444 template<
typename T>
void pushtyp(
const T &
type)
446 static_assert(!std::is_same<T, std::string>::value,
447 "Use pushstr() instead of pushtyp<std::string>()");
448 m_parts.push_front(message_t(&type,
sizeof(type)));
// Inserts the raw object representation of `type` (sizeof(type) bytes) as a
// new part at the back.
// std::string is rejected at compile time; addstr() is the replacement.
452 template<
typename T>
void addtyp(
const T &type)
454 static_assert(!std::is_same<T, std::string>::value,
455 "Use addstr() instead of addtyp<std::string>()");
456 m_parts.push_back(message_t(&type,
sizeof(type)));
460 void push(message_t &&message) { m_parts.push_front(std::move(message)); }
463 void add(message_t &&message) { m_parts.push_back(std::move(message)); }
466 void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
// Copies the bytes of the front part into a std::string.
// NOTE(review): the enclosing signature (presumably popstr()) and the
// remaining lines are not visible in this extract. m_parts.front() requires a
// non-empty container.
471 std::string string(m_parts.front().data<
char>(), m_parts.front().size());
// Reads the front part as a value of type T.
// std::string is rejected at compile time; popstr() is the replacement.
// Throws std::runtime_error when the front part's size differs from sizeof(T).
// NOTE(review): the lines that remove the part and return the value are not
// visible in this extract. m_parts.front() requires a non-empty container.
477 template<
typename T> T poptyp()
479 static_assert(!std::is_same<T, std::string>::value,
480 "Use popstr() instead of poptyp<std::string>()");
481 if (
sizeof(T) != m_parts.front().size())
482 throw std::runtime_error(
483 "Invalid type, size does not match the message size");
// Copy the value out of the message buffer.
484 T type = *m_parts.front().data<T>();
// Moves the front part out of m_parts into a local message.
// NOTE(review): the enclosing signature (presumably pop()) and the remaining
// lines are not visible in this extract.
492 message_t message = std::move(m_parts.front());
// Moves the back part out of m_parts into a local message.
// NOTE(review): the enclosing signature (presumably remove()) and the
// remaining lines are not visible in this extract.
500 message_t message = std::move(m_parts.back());
506 const message_t &front() {
return m_parts.front(); }
509 const message_t &back() {
return m_parts.back(); }
512 const message_t *peek(
size_t index)
const {
return &m_parts[index]; }
// Copies the bytes of the part at `index` into a std::string without removing
// the part. No bounds check is performed on `index`.
// NOTE(review): braces and the return statement are not visible in this
// extract.
515 std::string peekstr(
size_t index)
const 517 std::string string(m_parts[index].data<char>(), m_parts[index].size());
// Reads the part at `index` as a value of type T without removing the part.
// std::string is rejected at compile time; peekstr() is the replacement.
// Throws std::runtime_error when the part's size differs from sizeof(T).
// No bounds check is performed on `index`.
// NOTE(review): braces and the return statement are not visible in this
// extract.
522 template<
typename T> T peektyp(
size_t index)
const 524 static_assert(!std::is_same<T, std::string>::value,
525 "Use peekstr() instead of peektyp<std::string>()");
526 if (
sizeof(T) != m_parts[index].size())
527 throw std::runtime_error(
528 "Invalid type, size does not match the message size");
// Copy the value out of the message buffer.
529 T type = *m_parts[index].data<T>();
// Static factory: builds a multipart_t and adds `type` to it with addtyp().
// NOTE(review): braces and the return statement are not visible in this
// extract.
534 template<
typename T>
static multipart_t create(
const T &type)
536 multipart_t multipart;
537 multipart.addtyp(type);
// Builds a new multipart_t whose parts are byte copies of this object's
// parts, added in the same order via addmem().
// NOTE(review): braces and the return statement are not visible in this
// extract.
542 multipart_t clone()
const 544 multipart_t multipart;
545 for (
size_t i = 0; i < size(); i++)
546 multipart.addmem(m_parts[i].data(), m_parts[i].size());
// Builds a printable dump of all parts in a std::stringstream.
// Each part is introduced by a newline and "[" followed by its size as a
// zero-padded, 3-wide decimal number; the bytes are then written either as
// characters or as 2-digit hex values.
// NOTE(review): many source lines (braces, the text/binary flag, the size
// cut-off, the return) are not visible in this extract.
551 std::string str()
const 553 std::stringstream
ss;
554 for (
size_t i = 0; i < m_parts.size(); i++) {
555 const unsigned char *data = m_parts[i].data<
unsigned char>();
556 size_t size = m_parts[i].size();
// Scan the part for bytes outside 32..127.
// NOTE(review): 127 (DEL) passes this test although it is not a printable
// character -- confirm whether `> 126` was intended.
560 for (
size_t j = 0; j < size; j++) {
561 if (data[j] < 32 || data[j] > 127) {
566 ss <<
"\n[" << std::dec << std::setw(3) << std::setfill(
'0') << size
// NOTE(review): "to big" in this runtime string is a typo for "too big";
// left unchanged here because it is program output.
569 ss <<
"... (to big to print)";
572 for (
size_t j = 0; j < size; j++) {
// Character form of the byte.
574 ss << static_cast<char>(data[j]);
// Hex form of the byte; the cast to short makes the stream print a number
// instead of a character.
576 ss << std::hex << std::setw(2) << std::setfill(
'0')
577 <<
static_cast<short>(data[j]);
// Compares this object with `*other`: first the number of parts, then each
// pair of parts at the same index using message_t's operator!=.
// `other` is dereferenced without a null check, so it must not be null.
// NOTE(review): braces and the return statements are not visible in this
// extract.
584 bool equal(
const multipart_t *other)
const 586 if (size() != other->size())
588 for (
size_t i = 0; i < size(); i++)
589 if (*peek(i) != *other->peek(i))
597 message_t encode()
const {
return zmq::encode(*
this); }
// Decodes `encoded` with zmq::decode and appends the resulting parts to this
// object through std::back_inserter (which calls push_back).
600 void decode_append(
const message_t &encoded)
602 zmq::decode(encoded, std::back_inserter(*
this));
// Static factory: decodes `encoded` with zmq::decode into a local object
// `tmp` through std::back_inserter.
// NOTE(review): the declaration of `tmp`, the braces and the return statement
// are not visible in this extract.
606 static multipart_t decode(
const message_t &encoded)
609 zmq::decode(encoded, std::back_inserter(tmp));
// Stream insertion for multipart_t: writes the text produced by msg.str()
// to `os` and returns the stream.
621 inline std::ostream &
operator<<(std::ostream &os,
const multipart_t &msg)
623 return os << msg.str();
// active_poller_t: wraps a poller_t<handler_type> and keeps one handler
// callback (std::function<void(event_flags)>) per socket. wait() polls and
// invokes the handler attached to each reported event.
// The class is move-only: copy operations are deleted, moves are defaulted.
// NOTE(review): several source lines (the signatures of the add/remove/modify
// members, braces, access specifiers, return statements) are not visible in
// this extract.
626 #endif // ZMQ_HAS_RVALUE_REFS 628 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) 629 class active_poller_t
632 active_poller_t() =
default;
633 ~active_poller_t() =
default;
635 active_poller_t(
const active_poller_t &) =
delete;
636 active_poller_t &operator=(
const active_poller_t &) =
delete;
638 active_poller_t(active_poller_t &&src) =
default;
639 active_poller_t &operator=(active_poller_t &&src) =
default;
// Callback type invoked with the event flags reported for a socket.
641 using handler_type = std::function<void(event_flags)>;
// Registration fragment: store the handler in `handlers` keyed by socket;
// emplace reports through `inserted` whether the socket was new.
645 auto it = decltype(handlers)::iterator{};
646 auto inserted =
bool{};
647 std::tie(it, inserted) = handlers.emplace(
648 socket, std::make_shared<handler_type>(std::move(handler)));
// The handler pointer is passed as user data only when the entry is new and
// the std::function is non-empty; otherwise nullptr is passed.
650 base_poller.add(socket, events,
651 inserted && *(it->second) ? it->second.get() :
nullptr);
652 need_rebuild |= inserted;
// NOTE(review): presumably part of an error path that undoes the emplace
// above -- the surrounding lines are not visible.
657 handlers.erase(socket);
// Removal fragment: drop the socket from the underlying poller, then from
// the handler map.
665 base_poller.remove(socket);
666 handlers.erase(socket);
// Modification fragment: forwards the new event mask to the underlying poller.
672 base_poller.modify(socket, events);
// Polls for up to `timeout` and calls the handler of each reported event.
675 size_t wait(std::chrono::milliseconds
timeout)
// Size the event buffer to the number of registered sockets and refresh the
// handler snapshot. NOTE(review): presumably guarded by `need_rebuild` (the
// condition is not visible here); the shared_ptr copies in poller_handlers
// presumably keep handlers alive during dispatch -- TODO confirm.
678 poller_events.resize(handlers.size());
679 poller_handlers.clear();
680 poller_handlers.reserve(handlers.size());
681 for (
const auto &handler : handlers) {
682 poller_handlers.push_back(handler.second);
684 need_rebuild =
false;
// Only the first `count` entries of poller_events are visited; events whose
// user_data is null are skipped.
686 const auto count = base_poller.wait_all(poller_events, timeout);
687 std::for_each(poller_events.begin(),
688 poller_events.begin() +
static_cast<ptrdiff_t
>(count),
689 [](decltype(base_poller)::event_type &event) {
690 if (event.user_data !=
nullptr)
691 (*
event.user_data)(event.events);
// True when no socket is registered.
696 ZMQ_NODISCARD bool empty() const noexcept {
return handlers.empty(); }
// Number of registered sockets.
698 size_t size() const noexcept {
return handlers.size(); }
// Set when a socket has been added; cleared in wait().
701 bool need_rebuild{
false};
// Underlying poller; its user data type is handler_type.
703 poller_t<handler_type> base_poller{};
// Handler for each registered socket.
704 std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
// Buffer that wait_all() fills with reported events.
705 std::vector<decltype(base_poller)::event_type> poller_events{};
// Copies of the handler pointers, refreshed in wait().
706 std::vector<std::shared_ptr<handler_type>> poller_handlers{};
708 #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) 713 #endif // __ZMQ_ADDON_HPP_INCLUDED__ #define ZMQ_DELETED_FUNCTION
const zio::message_t & at(const Message &msg, size_t index)
def handler(ctx, pipe, bot, rule_object, filename, broker_addr, rargs)
std::ostream & operator<<(std::ostream &os, const message_t &msg)
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
size_t size() const ZMQ_NOTHROW