Commit 4ad2edff authored by Constantin Rack's avatar Constantin Rack

Merge pull request #1322 from hintjens/master

Problem: commit afb24b53 broke ZMQ_STREAM contract
parents 0d9852a2 6ced7027
...@@ -670,6 +670,19 @@ Default value:: -1 (infinite) ...@@ -670,6 +670,19 @@ Default value:: -1 (infinite)
Applicable socket types:: all Applicable socket types:: all
ZMQ_STREAM_NOTIFY: send connect notifications
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Enables connect notifications on a STREAM socket, when set to 1. By default a
STREAM socket does not notify new connections. When notifications are enabled,
it delivers a zero-length message to signal new client connections.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_STREAM
ZMQ_SUBSCRIBE: Establish message filter ZMQ_SUBSCRIBE: Establish message filter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SUBSCRIBE' option shall establish a new message filter on a 'ZMQ_SUB' The 'ZMQ_SUBSCRIBE' option shall establish a new message filter on a 'ZMQ_SUB'
......
...@@ -297,6 +297,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ...@@ -297,6 +297,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_BLOCKY 70 #define ZMQ_BLOCKY 70
#define ZMQ_XPUB_MANUAL 71 #define ZMQ_XPUB_MANUAL 71
#define ZMQ_XPUB_WELCOME_MSG 72 #define ZMQ_XPUB_WELCOME_MSG 72
#define ZMQ_STREAM_NOTIFY 73
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -46,7 +46,8 @@ zmq::options_t::options_t () : ...@@ -46,7 +46,8 @@ zmq::options_t::options_t () :
immediate (0), immediate (0),
filter (false), filter (false),
recv_identity (false), recv_identity (false),
raw_sock (false), raw_socket (false),
raw_notify (false),
tcp_keepalive (-1), tcp_keepalive (-1),
tcp_keepalive_cnt (-1), tcp_keepalive_cnt (-1),
tcp_keepalive_idle (-1), tcp_keepalive_idle (-1),
......
...@@ -112,7 +112,8 @@ namespace zmq ...@@ -112,7 +112,8 @@ namespace zmq
bool recv_identity; bool recv_identity;
// if true, router socket accepts non-zmq tcp connections // if true, router socket accepts non-zmq tcp connections
bool raw_sock; bool raw_socket;
bool raw_notify; // Provide connect notifications
// Addres of SOCKS proxy // Addres of SOCKS proxy
std::string socks_proxy_address; std::string socks_proxy_address;
......
...@@ -33,14 +33,14 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -33,14 +33,14 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
more_out (false), more_out (false),
next_rid (generate_random ()), next_rid (generate_random ()),
mandatory (false), mandatory (false),
// raw_sock functionality in ROUTER is deprecated // raw_socket functionality in ROUTER is deprecated
raw_sock (false), raw_socket (false),
probe_router (false), probe_router (false),
handover (false) handover (false)
{ {
options.type = ZMQ_ROUTER; options.type = ZMQ_ROUTER;
options.recv_identity = true; options.recv_identity = true;
options.raw_sock = false; options.raw_socket = false;
prefetched_id.init (); prefetched_id.init ();
prefetched_msg.init (); prefetched_msg.init ();
...@@ -96,10 +96,10 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -96,10 +96,10 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
break; break;
case ZMQ_ROUTER_RAW: case ZMQ_ROUTER_RAW:
if (is_int && value >= 0) { if (is_int && value >= 0) {
raw_sock = (value != 0); raw_socket = (value != 0);
if (raw_sock) { if (raw_socket) {
options.recv_identity = false; options.recv_identity = false;
options.raw_sock = true; options.raw_socket = true;
} }
return 0; return 0;
} }
...@@ -223,7 +223,7 @@ int zmq::router_t::xsend (msg_t *msg_) ...@@ -223,7 +223,7 @@ int zmq::router_t::xsend (msg_t *msg_)
} }
// Ignore the MORE flag for raw-sock or assert? // Ignore the MORE flag for raw-sock or assert?
if (options.raw_sock) if (options.raw_socket)
msg_->reset_flags (msg_t::more); msg_->reset_flags (msg_t::more);
// Check whether this is the last part of the message. // Check whether this is the last part of the message.
...@@ -235,7 +235,7 @@ int zmq::router_t::xsend (msg_t *msg_) ...@@ -235,7 +235,7 @@ int zmq::router_t::xsend (msg_t *msg_)
// Close the remote connection if user has asked to do so // Close the remote connection if user has asked to do so
// by sending zero length message. // by sending zero length message.
// Pending messages in the pipe will be dropped (on receiving term- ack) // Pending messages in the pipe will be dropped (on receiving term- ack)
if (raw_sock && msg_->size() == 0) { if (raw_socket && msg_->size() == 0) {
current_out->terminate (false); current_out->terminate (false);
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -401,14 +401,14 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -401,14 +401,14 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
zmq_assert(false); // Not allowed to duplicate an existing rid zmq_assert(false); // Not allowed to duplicate an existing rid
} }
else else
if (options.raw_sock) { // Always assign identity for raw-socket if (options.raw_socket) { // Always assign identity for raw-socket
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_rid++); put_uint32 (buf + 1, next_rid++);
identity = blob_t (buf, sizeof buf); identity = blob_t (buf, sizeof buf);
} }
else else
if (!options.raw_sock) { if (!options.raw_socket) {
// Pick up handshake cases and also case where next identity is set // Pick up handshake cases and also case where next identity is set
msg.init (); msg.init ();
ok = pipe_->read (&msg); ok = pipe_->read (&msg);
......
...@@ -111,7 +111,7 @@ namespace zmq ...@@ -111,7 +111,7 @@ namespace zmq
// If true, report EAGAIN to the caller instead of silently dropping // If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer. // the message targeting an unknown peer.
bool mandatory; bool mandatory;
bool raw_sock; bool raw_socket;
// if true, send an empty message to every connected router peer // if true, send an empty message to every connected router peer
bool probe_router; bool probe_router;
......
...@@ -213,14 +213,15 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) ...@@ -213,14 +213,15 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
cancel_timer (linger_timer_id); cancel_timer (linger_timer_id);
has_linger_timer = false; has_linger_timer = false;
} }
} else }
else
if (pipe_ == zap_pipe) if (pipe_ == zap_pipe)
zap_pipe = NULL; zap_pipe = NULL;
else else
// Remove the pipe from the detached pipes set // Remove the pipe from the detached pipes set
terminating_pipes.erase (pipe_); terminating_pipes.erase (pipe_);
if (!is_terminating () && options.raw_sock) { if (!is_terminating () && options.raw_socket) {
if (engine) { if (engine) {
engine->terminate (); engine->terminate ();
engine = NULL; engine = NULL;
......
...@@ -33,7 +33,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -33,7 +33,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_rid (generate_random ()) next_rid (generate_random ())
{ {
options.type = ZMQ_STREAM; options.type = ZMQ_STREAM;
options.raw_sock = true; options.raw_socket = true;
prefetched_id.init (); prefetched_id.init ();
prefetched_msg.init (); prefetched_msg.init ();
...@@ -167,6 +167,8 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -167,6 +167,8 @@ int zmq::stream_t::xsend (msg_t *msg_)
int zmq::stream_t::xsetsockopt (int option_, const void *optval_, int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0;
switch (option_) { switch (option_) {
case ZMQ_CONNECT_RID: case ZMQ_CONNECT_RID:
if (optval_ && optvallen_) { if (optval_ && optvallen_) {
...@@ -174,6 +176,14 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, ...@@ -174,6 +176,14 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
return 0; return 0;
} }
break; break;
case ZMQ_STREAM_NOTIFY:
if (is_int && (value == 0 || value == 1)) {
options.raw_notify = value;
return 0;
}
break;
default: default:
break; break;
} }
......
...@@ -180,7 +180,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -180,7 +180,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
handle = add_fd (s); handle = add_fd (s);
io_error = false; io_error = false;
if (options.raw_sock) { if (options.raw_socket) {
// no handshaking for raw sock, instantiate raw encoder and decoders // no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (out_batch_size); encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
...@@ -194,6 +194,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -194,6 +194,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
next_msg = &stream_engine_t::pull_msg_from_session; next_msg = &stream_engine_t::pull_msg_from_session;
process_msg = &stream_engine_t::push_msg_to_session; process_msg = &stream_engine_t::push_msg_to_session;
if (options.raw_notify) {
// For raw sockets, send an initial 0-length message to the // For raw sockets, send an initial 0-length message to the
// application so that it knows a peer has connected. // application so that it knows a peer has connected.
msg_t connector; msg_t connector;
...@@ -202,6 +203,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -202,6 +203,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
connector.close(); connector.close();
session->flush (); session->flush ();
} }
}
else { else {
// start optional timer, to prevent handshake hanging on no input // start optional timer, to prevent handshake hanging on no input
set_handshake_timer (); set_handshake_timer ();
...@@ -911,7 +913,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) ...@@ -911,7 +913,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
void zmq::stream_engine_t::error (error_reason_t reason) void zmq::stream_engine_t::error (error_reason_t reason)
{ {
if (options.raw_sock) { if (options.raw_socket) {
// For raw sockets, send a final 0-length message to the application // For raw sockets, send a final 0-length message to the application
// so that it knows the peer has been disconnected. // so that it knows the peer has been disconnected.
msg_t terminator; msg_t terminator;
...@@ -931,7 +933,7 @@ void zmq::stream_engine_t::set_handshake_timer () ...@@ -931,7 +933,7 @@ void zmq::stream_engine_t::set_handshake_timer ()
{ {
zmq_assert (!has_handshake_timer); zmq_assert (!has_handshake_timer);
if (!options.raw_sock && options.handshake_ivl > 0) { if (!options.raw_socket && options.handshake_ivl > 0) {
add_timer (options.handshake_ivl, handshake_timer_id); add_timer (options.handshake_ivl, handshake_timer_id);
has_handshake_timer = true; has_handshake_timer = true;
} }
......
...@@ -63,19 +63,13 @@ void test_stream_2_stream(){ ...@@ -63,19 +63,13 @@ void test_stream_2_stream(){
// Accept data on the bound stream. // Accept data on the bound stream.
ret = zmq_recv (rbind, buff, 256, 0); ret = zmq_recv (rbind, buff, 256, 0);
assert (ret && 0 == buff[0]);
assert (0 == buff[0]);
ret = zmq_recv (rbind, buff, 256, 0);
assert (0 == ret);
// Handle close of the socket.
ret = zmq_recv (rbind, buff, 256, 0);
assert (ret); assert (ret);
assert (0 == buff[0]); assert (0 == buff[0]);
ret = zmq_recv (rbind, buff+128, 128, 0); ret = zmq_recv (rbind, buff+128, 128, 0);
assert (5 == ret); assert (5 == ret);
assert ('h' == buff[128]); assert ('h' == buff[128]);
// Handle close of the socket.
ret = zmq_unbind (rbind, bindip); ret = zmq_unbind (rbind, bindip);
assert(0 == ret); assert(0 == ret);
ret = zmq_close (rbind); ret = zmq_close (rbind);
......
...@@ -54,6 +54,9 @@ test_stream_to_dealer (void) ...@@ -54,6 +54,9 @@ test_stream_to_dealer (void)
int zero = 0; int zero = 0;
rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)); rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0); assert (rc == 0);
int enabled = 1;
rc = zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (stream, "tcp://127.0.0.1:5556"); rc = zmq_bind (stream, "tcp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
...@@ -182,11 +185,16 @@ test_stream_to_stream (void) ...@@ -182,11 +185,16 @@ test_stream_to_stream (void)
void *server = zmq_socket (ctx, ZMQ_STREAM); void *server = zmq_socket (ctx, ZMQ_STREAM);
assert (server); assert (server);
int enabled = 1;
rc = zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9070"); rc = zmq_bind (server, "tcp://127.0.0.1:9070");
assert (rc == 0); assert (rc == 0);
void *client = zmq_socket (ctx, ZMQ_STREAM); void *client = zmq_socket (ctx, ZMQ_STREAM);
assert (client); assert (client);
rc = zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9070"); rc = zmq_connect (client, "tcp://localhost:9070");
assert (rc == 0); assert (rc == 0);
uint8_t id [256]; uint8_t id [256];
......
...@@ -55,15 +55,20 @@ int main(int, char**) ...@@ -55,15 +55,20 @@ int main(int, char**)
{ {
setup_test_environment(); setup_test_environment();
void* context = zmq_ctx_new (); void *context = zmq_ctx_new ();
void* sockets [2]; void *sockets [2];
int rc = 0; int rc = 0;
sockets [SERVER] = zmq_socket (context, ZMQ_STREAM); sockets [SERVER] = zmq_socket (context, ZMQ_STREAM);
int enabled = 1;
rc = zmq_setsockopt (sockets [SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666"); rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666");
assert (rc == 0); assert (rc == 0);
sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM); sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM);
rc = zmq_setsockopt (sockets [CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666"); rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666");
assert (rc == 0); assert (rc == 0);
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../src/stdint.hpp" #include "../src/stdint.hpp"
#include "platform.hpp" #include "../src/platform.hpp"
// This defines the settle time used in tests; raise this if we // This defines the settle time used in tests; raise this if we
// get test failures on slower systems due to binds/connects not // get test failures on slower systems due to binds/connects not
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment