Commit 240eff38 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #563 from shripchenko/master

Refactoring of 'ZMQ_ROUTER_ANNOUNCE_SELF'
parents 9d63ebf6 f805e4dd
......@@ -421,6 +421,23 @@ Default value:: 0
Applicable socket types:: ZMQ_ROUTER
ZMQ_PROBE: automatically send empty packet to every established connection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the compatible sockets behavior to automatically send an empty packet
to any new connection made (or accepted) by socket. It could help sockets to
auto discovery them-self. It especially important in 'ROUTER' <-> 'ROUTER' connections
where it solves 'who will write first' problems.
NOTE: Don't set this options for sockets working with ZMQ_REP, ZMQ_REQ sockets.
It will interfere with their strict synchronous logic and framing.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_REP, ZMQ_REQ
ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
......@@ -274,7 +274,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_CURVE_SERVER 47
#define ZMQ_CURVE_PUBLICKEY 48
#define ZMQ_CURVE_SERVERKEY 49
#define ZMQ_ROUTER_ANNOUNCE_SELF 50
#define ZMQ_PROBE 50
/* Message options */
#define ZMQ_MORE 1
......
......@@ -22,7 +22,8 @@
#include "msg.hpp"
zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
socket_base_t (parent_, tid_, sid_),
probe_new_peers(false)
{
options.type = ZMQ_DEALER;
}
......@@ -37,10 +38,48 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
(void) icanhasall_;
zmq_assert (pipe_);
if (probe_new_peers) {
int rc;
msg_t probe_msg_;
rc = probe_msg_.init ();
errno_assert (rc == 0);
rc = pipe_->write (&probe_msg_);
zmq_assert (rc);
pipe_->flush ();
rc = probe_msg_.close ();
errno_assert (rc == 0);
}
fq.attach (pipe_);
lb.attach (pipe_);
}
int zmq::dealer_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0;
switch (option_) {
case ZMQ_PROBE:
if (is_int && value >= 0) {
probe_new_peers = value;
return 0;
}
break;
default:
break;
}
errno = EINVAL;
return -1;
}
int zmq::dealer_t::xsend (msg_t *msg_)
{
return lb.send (msg_);
......
......@@ -46,6 +46,7 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......@@ -61,6 +62,9 @@ namespace zmq
fq_t fq;
lb_t lb;
// if true, send an empty message to every connected peer
bool probe_new_peers;
dealer_t (const dealer_t&);
const dealer_t &operator = (const dealer_t&);
};
......
......@@ -34,7 +34,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ()),
mandatory(false),
raw_sock(false),
announce_self(false)
probe_new_peers(false)
{
options.type = ZMQ_ROUTER;
options.recv_identity = true;
......@@ -83,21 +83,21 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
return 0;
}
break;
case ZMQ_ROUTER_MANDATORY:
if (is_int && value >= 0) {
mandatory = value;
return 0;
}
break;
case ZMQ_ROUTER_ANNOUNCE_SELF:
case ZMQ_PROBE:
if (is_int && value >= 0) {
announce_self = value;
probe_new_peers = value;
return 0;
}
break;
default:
break;
}
......@@ -391,14 +391,20 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
zmq_assert (ok);
if (announce_self) {
msg_t tmp_;
tmp_.init ();
ok = pipe_->write (&tmp_);
zmq_assert (ok);
if (probe_new_peers) {
int rc;
msg_t probe_msg_;
rc = probe_msg_.init ();
errno_assert (rc == 0);
rc = pipe_->write (&probe_msg_);
zmq_assert (rc);
pipe_->flush ();
tmp_.close ();
};
rc = probe_msg_.close ();
errno_assert (rc == 0);
}
return true;
}
......
......@@ -47,8 +47,8 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (msg_t *msg_);
int xrecv (msg_t *msg_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
......@@ -112,8 +112,8 @@ namespace zmq
bool mandatory;
bool raw_sock;
// if true, send an empty message to every connected peer to solve 'who will write first' race condition
bool announce_self;
// if true, send an empty message to every connected peer to solve 'who will write first?' auto discovery problem
bool probe_new_peers;
router_t (const router_t&);
const router_t &operator = (const router_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment