Commit 51750a7d authored by shripchenko's avatar shripchenko

refactored ZMQ_ROUTER_ANNOUNCE_SELF code. renamed it to ZMQ_PROBE_NEW_PEERS.

implement it for DEALER tocket.
+documentation
parent 910b4692
......@@ -421,6 +421,21 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER
ZMQ_PROBE_NEW_PEERS: automatically send empty packet to every established connection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'ROUTER' & 'DEALER' sockets behavior to automatically send an empty packet
to any new connection made (or accepted) by socket. It could help sockets to
auto discovery themself. It especially important in 'ROUTER' <-> 'ROUTER' connections
where it solves 'who will write first' problems.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER
ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
......@@ -274,7 +274,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_CURVE_SERVER 47
#define ZMQ_CURVE_PUBLICKEY 48
#define ZMQ_CURVE_SERVERKEY 49
#define ZMQ_ROUTER_ANNOUNCE_SELF 50
#define ZMQ_PROBE_NEW_PEERS 50
/* Message options */
#define ZMQ_MORE 1
......
......@@ -22,7 +22,8 @@
#include "msg.hpp"
zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
socket_base_t (parent_, tid_, sid_),
probe_new_peers(false)
{
options.type = ZMQ_DEALER;
}
......@@ -37,10 +38,48 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
(void) icanhasall_;
zmq_assert (pipe_);
if (probe_new_peers) {
int rc, ok;
msg_t probe_msg_;
rc = probe_msg_.init ();
errno_assert (rc == 0);
ok = pipe_->write (&probe_msg_);
zmq_assert (ok);
pipe_->flush ();
rc = probe_msg_.close ();
errno_assert (rc == 0);
}
fq.attach (pipe_);
lb.attach (pipe_);
}
int zmq::dealer_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0;
switch (option_) {
case ZMQ_PROBE_NEW_PEERS:
if (is_int && value >= 0) {
probe_new_peers = value;
return 0;
}
break;
default:
break;
}
errno = EINVAL;
return -1;
}
int zmq::dealer_t::xsend (msg_t *msg_)
{
return lb.send (msg_);
......
......@@ -46,6 +46,7 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......@@ -61,6 +62,9 @@ namespace zmq
fq_t fq;
lb_t lb;
// if true, send an empty message to every connected peer
bool probe_new_peers;
dealer_t (const dealer_t&);
const dealer_t &operator = (const dealer_t&);
};
......
......@@ -34,7 +34,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ()),
mandatory(false),
raw_sock(false),
announce_self(false)
probe_new_peers(false)
{
options.type = ZMQ_ROUTER;
options.recv_identity = true;
......@@ -85,7 +85,7 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
// DEBUGGING PROBLEM WITH TRAVIS CI
printf ("E: invalid option value (int=%d value=%d)\n", is_int, value);
break;
case ZMQ_ROUTER_MANDATORY:
if (is_int && value >= 0) {
mandatory = value;
......@@ -94,14 +94,14 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
// DEBUGGING PROBLEM WITH TRAVIS CI
printf ("E: invalid option value (int=%d value=%d)\n", is_int, value);
break;
case ZMQ_ROUTER_ANNOUNCE_SELF:
case ZMQ_PROBE_NEW_PEERS:
if (is_int && value >= 0) {
announce_self = value;
probe_new_peers = value;
return 0;
}
break;
default:
break;
}
......@@ -397,14 +397,23 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
zmq_assert (ok);
if (announce_self) {
msg_t tmp_;
tmp_.init ();
ok = pipe_->write (&tmp_);
zmq_assert (ok);
if (probe_new_peers) {
int rc;
msg_t probe_msg_;
rc = probe_msg_.init ();
errno_assert (rc == 0);
ok = pipe_->write (&probe_msg_);
pipe_->flush ();
tmp_.close ();
};
rc = probe_msg_.close ();
errno_assert (rc == 0);
// Ignore not probed peers
if (!ok)
return false;
}
return true;
}
......
......@@ -47,8 +47,8 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (msg_t *msg_);
int xrecv (msg_t *msg_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
......@@ -112,8 +112,8 @@ namespace zmq
bool mandatory;
bool raw_sock;
// if true, send an empty message to every connected peer to solve 'who will write first' race condition
bool announce_self;
// if true, send an empty message to every connected peer to solve 'who will write first?' auto discovery problem
bool probe_new_peers;
router_t (const router_t&);
const router_t &operator = (const router_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment