Commit 6ced7027 authored by Pieter Hintjens's avatar Pieter Hintjens

Problem: commit afb24b53 broke ZMQ_STREAM contract

Symptom is that ZMQ_STREAM sockets in 4.1.0 and 4.1.1 generate zero
sized messages on each new connection, unlike 4.0.x which did not do
this.

Person who made this commit also changed test cases so that contract
breakage did not show. Same person was later banned for persistently
poor form in CZMQ contributions.

Solution: enable connect notifications on ZMQ_STREAM sockets using a
new ZMQ_STREAM_NOTIFY setting. By default, socket does not deliver
notifications, and behaves as in 4.0.x.

Fixes #1316
parent 94d9a4ff
...@@ -670,6 +670,19 @@ Default value:: -1 (infinite) ...@@ -670,6 +670,19 @@ Default value:: -1 (infinite)
Applicable socket types:: all Applicable socket types:: all
ZMQ_STREAM_NOTIFY: send connect notifications
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Enables connect notifications on a STREAM socket, when set to 1. By default a
STREAM socket does not notify new connections. When notifications are enabled,
it delivers a zero-length message to signal new client connections.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_STREAM
ZMQ_SUBSCRIBE: Establish message filter ZMQ_SUBSCRIBE: Establish message filter
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_SUBSCRIBE' option shall establish a new message filter on a 'ZMQ_SUB' The 'ZMQ_SUBSCRIBE' option shall establish a new message filter on a 'ZMQ_SUB'
......
...@@ -297,6 +297,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ...@@ -297,6 +297,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_BLOCKY 70 #define ZMQ_BLOCKY 70
#define ZMQ_XPUB_MANUAL 71 #define ZMQ_XPUB_MANUAL 71
#define ZMQ_XPUB_WELCOME_MSG 72 #define ZMQ_XPUB_WELCOME_MSG 72
#define ZMQ_STREAM_NOTIFY 73
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -46,7 +46,8 @@ zmq::options_t::options_t () : ...@@ -46,7 +46,8 @@ zmq::options_t::options_t () :
immediate (0), immediate (0),
filter (false), filter (false),
recv_identity (false), recv_identity (false),
raw_sock (false), raw_socket (false),
raw_notify (false),
tcp_keepalive (-1), tcp_keepalive (-1),
tcp_keepalive_cnt (-1), tcp_keepalive_cnt (-1),
tcp_keepalive_idle (-1), tcp_keepalive_idle (-1),
......
...@@ -112,7 +112,8 @@ namespace zmq ...@@ -112,7 +112,8 @@ namespace zmq
bool recv_identity; bool recv_identity;
// if true, router socket accepts non-zmq tcp connections // if true, router socket accepts non-zmq tcp connections
bool raw_sock; bool raw_socket;
bool raw_notify; // Provide connect notifications
// Addres of SOCKS proxy // Addres of SOCKS proxy
std::string socks_proxy_address; std::string socks_proxy_address;
......
...@@ -33,14 +33,14 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -33,14 +33,14 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
more_out (false), more_out (false),
next_rid (generate_random ()), next_rid (generate_random ()),
mandatory (false), mandatory (false),
// raw_sock functionality in ROUTER is deprecated // raw_socket functionality in ROUTER is deprecated
raw_sock (false), raw_socket (false),
probe_router (false), probe_router (false),
handover (false) handover (false)
{ {
options.type = ZMQ_ROUTER; options.type = ZMQ_ROUTER;
options.recv_identity = true; options.recv_identity = true;
options.raw_sock = false; options.raw_socket = false;
prefetched_id.init (); prefetched_id.init ();
prefetched_msg.init (); prefetched_msg.init ();
...@@ -96,10 +96,10 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -96,10 +96,10 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
break; break;
case ZMQ_ROUTER_RAW: case ZMQ_ROUTER_RAW:
if (is_int && value >= 0) { if (is_int && value >= 0) {
raw_sock = (value != 0); raw_socket = (value != 0);
if (raw_sock) { if (raw_socket) {
options.recv_identity = false; options.recv_identity = false;
options.raw_sock = true; options.raw_socket = true;
} }
return 0; return 0;
} }
...@@ -223,7 +223,7 @@ int zmq::router_t::xsend (msg_t *msg_) ...@@ -223,7 +223,7 @@ int zmq::router_t::xsend (msg_t *msg_)
} }
// Ignore the MORE flag for raw-sock or assert? // Ignore the MORE flag for raw-sock or assert?
if (options.raw_sock) if (options.raw_socket)
msg_->reset_flags (msg_t::more); msg_->reset_flags (msg_t::more);
// Check whether this is the last part of the message. // Check whether this is the last part of the message.
...@@ -235,7 +235,7 @@ int zmq::router_t::xsend (msg_t *msg_) ...@@ -235,7 +235,7 @@ int zmq::router_t::xsend (msg_t *msg_)
// Close the remote connection if user has asked to do so // Close the remote connection if user has asked to do so
// by sending zero length message. // by sending zero length message.
// Pending messages in the pipe will be dropped (on receiving term- ack) // Pending messages in the pipe will be dropped (on receiving term- ack)
if (raw_sock && msg_->size() == 0) { if (raw_socket && msg_->size() == 0) {
current_out->terminate (false); current_out->terminate (false);
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -397,14 +397,14 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -397,14 +397,14 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
zmq_assert(false); // Not allowed to duplicate an existing rid zmq_assert(false); // Not allowed to duplicate an existing rid
} }
else else
if (options.raw_sock) { // Always assign identity for raw-socket if (options.raw_socket) { // Always assign identity for raw-socket
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_rid++); put_uint32 (buf + 1, next_rid++);
identity = blob_t (buf, sizeof buf); identity = blob_t (buf, sizeof buf);
} }
else else
if (!options.raw_sock) { if (!options.raw_socket) {
// Pick up handshake cases and also case where next identity is set // Pick up handshake cases and also case where next identity is set
msg.init (); msg.init ();
ok = pipe_->read (&msg); ok = pipe_->read (&msg);
......
...@@ -111,7 +111,7 @@ namespace zmq ...@@ -111,7 +111,7 @@ namespace zmq
// If true, report EAGAIN to the caller instead of silently dropping // If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer. // the message targeting an unknown peer.
bool mandatory; bool mandatory;
bool raw_sock; bool raw_socket;
// if true, send an empty message to every connected router peer // if true, send an empty message to every connected router peer
bool probe_router; bool probe_router;
......
...@@ -213,14 +213,15 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) ...@@ -213,14 +213,15 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
cancel_timer (linger_timer_id); cancel_timer (linger_timer_id);
has_linger_timer = false; has_linger_timer = false;
} }
} else }
else
if (pipe_ == zap_pipe) if (pipe_ == zap_pipe)
zap_pipe = NULL; zap_pipe = NULL;
else else
// Remove the pipe from the detached pipes set // Remove the pipe from the detached pipes set
terminating_pipes.erase (pipe_); terminating_pipes.erase (pipe_);
if (!is_terminating () && options.raw_sock) { if (!is_terminating () && options.raw_socket) {
if (engine) { if (engine) {
engine->terminate (); engine->terminate ();
engine = NULL; engine = NULL;
......
...@@ -33,7 +33,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -33,7 +33,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_rid (generate_random ()) next_rid (generate_random ())
{ {
options.type = ZMQ_STREAM; options.type = ZMQ_STREAM;
options.raw_sock = true; options.raw_socket = true;
prefetched_id.init (); prefetched_id.init ();
prefetched_msg.init (); prefetched_msg.init ();
...@@ -167,6 +167,8 @@ int zmq::stream_t::xsend (msg_t *msg_) ...@@ -167,6 +167,8 @@ int zmq::stream_t::xsend (msg_t *msg_)
int zmq::stream_t::xsetsockopt (int option_, const void *optval_, int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0;
switch (option_) { switch (option_) {
case ZMQ_CONNECT_RID: case ZMQ_CONNECT_RID:
if (optval_ && optvallen_) { if (optval_ && optvallen_) {
...@@ -174,6 +176,14 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, ...@@ -174,6 +176,14 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
return 0; return 0;
} }
break; break;
case ZMQ_STREAM_NOTIFY:
if (is_int && (value == 0 || value == 1)) {
options.raw_notify = value;
return 0;
}
break;
default: default:
break; break;
} }
......
...@@ -180,7 +180,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -180,7 +180,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
handle = add_fd (s); handle = add_fd (s);
io_error = false; io_error = false;
if (options.raw_sock) { if (options.raw_socket) {
// no handshaking for raw sock, instantiate raw encoder and decoders // no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (out_batch_size); encoder = new (std::nothrow) raw_encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
...@@ -194,13 +194,15 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -194,13 +194,15 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
next_msg = &stream_engine_t::pull_msg_from_session; next_msg = &stream_engine_t::pull_msg_from_session;
process_msg = &stream_engine_t::push_msg_to_session; process_msg = &stream_engine_t::push_msg_to_session;
// For raw sockets, send an initial 0-length message to the if (options.raw_notify) {
// application so that it knows a peer has connected. // For raw sockets, send an initial 0-length message to the
msg_t connector; // application so that it knows a peer has connected.
connector.init(); msg_t connector;
push_msg_to_session (&connector); connector.init();
connector.close(); push_msg_to_session (&connector);
session->flush (); connector.close();
session->flush ();
}
} }
else { else {
// start optional timer, to prevent handshake hanging on no input // start optional timer, to prevent handshake hanging on no input
...@@ -914,7 +916,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) ...@@ -914,7 +916,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
void zmq::stream_engine_t::error (error_reason_t reason) void zmq::stream_engine_t::error (error_reason_t reason)
{ {
if (options.raw_sock) { if (options.raw_socket) {
// For raw sockets, send a final 0-length message to the application // For raw sockets, send a final 0-length message to the application
// so that it knows the peer has been disconnected. // so that it knows the peer has been disconnected.
msg_t terminator; msg_t terminator;
...@@ -934,7 +936,7 @@ void zmq::stream_engine_t::set_handshake_timer () ...@@ -934,7 +936,7 @@ void zmq::stream_engine_t::set_handshake_timer ()
{ {
zmq_assert (!has_handshake_timer); zmq_assert (!has_handshake_timer);
if (!options.raw_sock && options.handshake_ivl > 0) { if (!options.raw_socket && options.handshake_ivl > 0) {
add_timer (options.handshake_ivl, handshake_timer_id); add_timer (options.handshake_ivl, handshake_timer_id);
has_handshake_timer = true; has_handshake_timer = true;
} }
......
...@@ -63,19 +63,13 @@ void test_stream_2_stream(){ ...@@ -63,19 +63,13 @@ void test_stream_2_stream(){
// Accept data on the bound stream. // Accept data on the bound stream.
ret = zmq_recv (rbind, buff, 256, 0); ret = zmq_recv (rbind, buff, 256, 0);
assert (ret && 0 == buff[0]);
assert (0 == buff[0]);
ret = zmq_recv (rbind, buff, 256, 0);
assert (0 == ret);
// Handle close of the socket.
ret = zmq_recv (rbind, buff, 256, 0);
assert (ret); assert (ret);
assert (0 == buff[0]); assert (0 == buff[0]);
ret = zmq_recv (rbind, buff+128, 128, 0); ret = zmq_recv (rbind, buff+128, 128, 0);
assert (5 == ret); assert (5 == ret);
assert ('h' == buff[128]); assert ('h' == buff[128]);
// Handle close of the socket.
ret = zmq_unbind (rbind, bindip); ret = zmq_unbind (rbind, bindip);
assert(0 == ret); assert(0 == ret);
ret = zmq_close (rbind); ret = zmq_close (rbind);
......
...@@ -54,6 +54,9 @@ test_stream_to_dealer (void) ...@@ -54,6 +54,9 @@ test_stream_to_dealer (void)
int zero = 0; int zero = 0;
rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)); rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0); assert (rc == 0);
int enabled = 1;
rc = zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (stream, "tcp://127.0.0.1:5556"); rc = zmq_bind (stream, "tcp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
...@@ -182,11 +185,16 @@ test_stream_to_stream (void) ...@@ -182,11 +185,16 @@ test_stream_to_stream (void)
void *server = zmq_socket (ctx, ZMQ_STREAM); void *server = zmq_socket (ctx, ZMQ_STREAM);
assert (server); assert (server);
int enabled = 1;
rc = zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (server, "tcp://127.0.0.1:9070"); rc = zmq_bind (server, "tcp://127.0.0.1:9070");
assert (rc == 0); assert (rc == 0);
void *client = zmq_socket (ctx, ZMQ_STREAM); void *client = zmq_socket (ctx, ZMQ_STREAM);
assert (client); assert (client);
rc = zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9070"); rc = zmq_connect (client, "tcp://localhost:9070");
assert (rc == 0); assert (rc == 0);
uint8_t id [256]; uint8_t id [256];
......
...@@ -55,15 +55,20 @@ int main(int, char**) ...@@ -55,15 +55,20 @@ int main(int, char**)
{ {
setup_test_environment(); setup_test_environment();
void* context = zmq_ctx_new (); void *context = zmq_ctx_new ();
void* sockets [2]; void *sockets [2];
int rc = 0; int rc = 0;
sockets [SERVER] = zmq_socket (context, ZMQ_STREAM); sockets [SERVER] = zmq_socket (context, ZMQ_STREAM);
int enabled = 1;
rc = zmq_setsockopt (sockets [SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666"); rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666");
assert (rc == 0); assert (rc == 0);
sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM); sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM);
rc = zmq_setsockopt (sockets [CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
assert (rc == 0);
rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666"); rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666");
assert (rc == 0); assert (rc == 0);
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../src/stdint.hpp" #include "../src/stdint.hpp"
#include "platform.hpp" #include "../src/platform.hpp"
// This defines the settle time used in tests; raise this if we // This defines the settle time used in tests; raise this if we
// get test failures on slower systems due to binds/connects not // get test failures on slower systems due to binds/connects not
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment