Commit a57f7e38 authored by Pontus Sköldström's avatar Pontus Sköldström

Add support for ZMQ_XPUB_NODROP on ZMQ_RADIO sockets

Solves issue #2927
parent 9544dade
......@@ -37,7 +37,8 @@
#include "msg.hpp"
zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_, true)
socket_base_t (parent_, tid_, sid_, true),
lossy (true)
{
options.type = ZMQ_RADIO;
}
......@@ -99,6 +100,22 @@ void zmq::radio_t::xwrite_activated (pipe_t *pipe_)
{
dist.activated (pipe_);
}
int zmq::radio_t::xsetsockopt (int option_,
const void *optval_,
size_t optvallen_)
{
if (optvallen_ != sizeof (int) || *static_cast<const int *> (optval_) < 0) {
errno = EINVAL;
return -1;
}
if (option_ == ZMQ_XPUB_NODROP)
lossy = (*static_cast<const int *> (optval_) == 0);
else {
errno = EINVAL;
return -1;
}
return 0;
}
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
{
......@@ -141,7 +158,13 @@ int zmq::radio_t::xsend (msg_t *msg_)
++it)
dist.match (*it);
int rc = dist.send_to_matching (msg_);
int rc = -1;
if (lossy || dist.check_hwm ()) {
if (dist.send_to_matching (msg_) == 0) {
rc = 0; // Yay, sent successfully
}
} else
errno = EAGAIN;
return rc;
}
......
......@@ -61,6 +61,7 @@ class radio_t : public socket_base_t
bool xhas_in ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xpipe_terminated (zmq::pipe_t *pipe_);
private:
......@@ -75,6 +76,9 @@ class radio_t : public socket_base_t
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
// Drop messages if HWM reached, otherwise return with EAGAIN
bool lossy;
radio_t (const radio_t &);
const radio_t &operator= (const radio_t &);
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment