Commit bb6d18d5 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1156 from kreuzberger/master

Provide socket option for pub sockets to not silently drop message
parents 446e8efb f042ea9e
......@@ -307,6 +307,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_HANDSHAKE_IVL 66
#define ZMQ_IDENTITY_FD 67
#define ZMQ_SOCKS_PROXY 68
#define ZMQ_XPUB_NODROP 69
/* Message options */
#define ZMQ_MORE 1
......
......@@ -194,3 +194,23 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
return true;
}
bool zmq::dist_t::check_hwm ()
{
// If there are no matching pipes available, there is nothing to write.
bool pipes_hwm_ok = true;
if (matching == 0) {
return true;
}
for (pipes_t::size_type i = 0; i < matching; ++i) {
if( !pipes [i] -> check_hwm()) {
pipes_hwm_ok = false;
break;
}
}
return pipes_hwm_ok;
}
......@@ -64,6 +64,9 @@ namespace zmq
bool has_out ();
// check HWM of all pipes matching
bool check_hwm ();
private:
// Write the message to the pipe. Make the pipe inactive if writing
......
......@@ -458,6 +458,12 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
break;
case ZMQ_XPUB_NODROP:
{
pub_nodrop = true;
return 0;
}
break;
default:
#if defined (ZMQ_ACT_MILITANT)
......@@ -805,6 +811,12 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
break;
case ZMQ_XPUB_NODROP:
if( is_int) {
*value = pub_nodrop;
return 0;
}
break;
default:
#if defined (ZMQ_ACT_MILITANT)
malformed = false;
......
......@@ -179,6 +179,9 @@ namespace zmq
// close socket. Default is 30 secs. 0 means no handshake timeout.
int handshake_ivl;
// flag if PUB socket should not drop messages if reaching HWM
bool pub_nodrop;
};
}
......
......@@ -500,3 +500,9 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
lwm = compute_lwm (inhwm_);
hwm = outhwm_;
}
bool zmq::pipe_t::check_hwm ()
{
bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm - 1);
return( !full );
}
......@@ -118,6 +118,8 @@ namespace zmq
// set the high water marks.
void set_hwms (int inhwm_, int outhwm_);
// check HWM
bool check_hwm ();
// provide a way to link pipe to engine fd. Set on session initialization
fd_t assoc_fd; //=retired_fd
private:
......
......@@ -90,7 +90,7 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != ZMQ_XPUB_VERBOSE) {
if (option_ != ZMQ_XPUB_VERBOSE && option_ != ZMQ_XPUB_NODROP) {
errno = EINVAL;
return -1;
}
......@@ -98,7 +98,15 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
errno = EINVAL;
return -1;
}
if (option_ == ZMQ_XPUB_VERBOSE) {
verbose = (*static_cast <const int*> (optval_) != 0);
} else if (option_ == ZMQ_XPUB_NODROP) {
nodrop = (*static_cast <const int*> (optval_) != 0);
}
else {
return -1;
}
return 0;
}
......@@ -127,6 +135,11 @@ int zmq::xpub_t::xsend (msg_t *msg_)
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
mark_as_matching, this);
if (nodrop && !dist.check_hwm()) {
return EAGAIN;
}
// Send the message to all the pipes that were marked as matching
// in the previous step.
int rc = dist.send_to_matching (msg_);
......
......@@ -79,6 +79,9 @@ namespace zmq
// True if we are in the middle of sending a multi-part message.
bool more;
// dont drop messages if hwm reached, just return with EAGAIN
bool nodrop;
// List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user.
typedef std::basic_string <unsigned char> blob_t;
......
......@@ -7,6 +7,7 @@ set(tests
test_reqrep_inproc
test_reqrep_tcp
test_hwm
test_hwm_pubsub
test_reqrep_device
test_sub_forward
test_invalid_rep
......@@ -42,6 +43,7 @@ set(tests
test_many_sockets
test_diffserv
test_connect_rid
test_xpub_wait_inproc
)
if(NOT WIN32)
list(APPEND tests
......
......@@ -9,6 +9,7 @@ noinst_PROGRAMS = test_system \
test_reqrep_inproc \
test_reqrep_tcp \
test_hwm \
test_hwm_pubsub \
test_reqrep_device \
test_sub_forward \
test_invalid_rep \
......@@ -52,7 +53,8 @@ noinst_PROGRAMS = test_system \
test_bind_src_address \
test_metadata \
test_id2fd \
test_capabilities
test_capabilities \
test_xpub_wait_inproc
if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
......@@ -86,6 +88,7 @@ test_pair_tcp_SOURCES = test_pair_tcp.cpp testutil.hpp
test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp
test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
test_hwm_SOURCES = test_hwm.cpp
test_hwm_pubsub_SOURCES = test_hwm_pubsub.cpp
test_reqrep_device_SOURCES = test_reqrep_device.cpp
test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp
......@@ -131,6 +134,7 @@ test_bind_src_address_SOURCES = test_bind_src_address.cpp
test_metadata_SOURCES = test_metadata.cpp
test_id2fd_SOURCES = test_id2fd.cpp
test_capabilities_SOURCES = test_capabilities.cpp
test_xpub_wait_inproc_SOURCES = test_xpub_wait_inproc.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
......
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
const int MAX_SENDS = 10000;
int test_defaults (int send_hwm, int msgCnt)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
int rc;
// Set up bind socket
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
assert (pub_socket);
rc = zmq_bind (pub_socket, "inproc://a");
assert (rc == 0);
// Set up connect socket
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
assert (sub_socket);
rc = zmq_connect (sub_socket, "inproc://a");
assert (rc == 0);
//set a hwm on publisher
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
// Send until we block
int send_count = 0;
while (send_count < msgCnt && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++send_count;
// Now receive all sent messages
int recv_count = 0;
while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT))
{
++recv_count;
}
assert (send_hwm == recv_count);
// Clean up
rc = zmq_close (sub_socket);
assert (rc == 0);
rc = zmq_close (pub_socket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return recv_count;
}
int receive( void* socket)
{
int recv_count = 0;
// Now receive all sent messages
while (0 == zmq_recv (socket, NULL, 0, ZMQ_DONTWAIT))
{
++recv_count;
}
return recv_count;
}
int test_blocking (int send_hwm, int msgCnt)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
int rc;
// Set up bind socket
void *pub_socket = zmq_socket (ctx, ZMQ_PUB);
assert (pub_socket);
rc = zmq_bind (pub_socket, "inproc://a");
assert (rc == 0);
// Set up connect socket
void *sub_socket = zmq_socket (ctx, ZMQ_SUB);
assert (sub_socket);
rc = zmq_connect (sub_socket, "inproc://a");
assert (rc == 0);
//set a hwm on publisher
rc = zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm));
int wait = 1;
rc = zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof(wait));
rc = zmq_setsockopt( sub_socket, ZMQ_SUBSCRIBE, 0, 0);
// Send until we block
int send_count = 0;
int recv_count = 0;
while (send_count < msgCnt )
{
rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
if( rc == 0)
{
++send_count;
}
else if( -1 == rc)
{
assert(EAGAIN == errno);
recv_count += receive(sub_socket);
assert(recv_count == send_count);
}
}
recv_count += receive(sub_socket);
// Clean up
rc = zmq_close (sub_socket);
assert (rc == 0);
rc = zmq_close (pub_socket);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return recv_count;
}
int main (void)
{
setup_test_environment();
int count;
// send 1000 msg on hwm 1000, receive 1000
count = test_defaults (1000,1000);
assert (count == 1000);
// send 6000 msg on hwm 2000, drops above hwm, only receive hwm
count = test_blocking (2000,6000);
assert (count == 6000);
return 0;
}
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
// Create a publisher
void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (pub);
int rc = zmq_bind (pub, "inproc://soname");
assert (rc == 0);
// set pub socket options
int wait = 1;
rc = zmq_setsockopt (pub, ZMQ_XPUB_NODROP, &wait, 4);
assert (rc == 0);
int hwm = 2000;
rc = zmq_setsockopt (pub, ZMQ_SNDHWM, &hwm, 4);
assert (rc == 0);
// Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_SUB);
assert (sub);
rc = zmq_connect (sub, "inproc://soname");
assert (rc == 0);
// Subscribe for all messages.
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
int hwmlimit = hwm-1;
int send_count = 0;
// Send an empty message
for (int i = 0; i< hwmlimit; i++) {
rc = zmq_send (pub, NULL, 0, 0);
assert (rc == 0);
send_count++;
}
int recv_count = 0;
do {
// Receive the message in the subscriber
// rc = zmq_recv (sub, buff, sizeof (buff), ZMQ_DONTWAIT);
rc = zmq_recv (sub, NULL, 0, ZMQ_DONTWAIT);
if( -1 == rc ) {
assert(EAGAIN == errno);
}
else
{
assert( 0 == rc );
recv_count++;
}
} while( 0 == rc);
assert(send_count == recv_count);
// now test real blocking behavior
// set a timeout, default is infinite
int timeout = 0;
rc = zmq_setsockopt (pub, ZMQ_SNDTIMEO, &timeout, 4);
assert (rc == 0);
send_count = 0;
recv_count = 0;
hwmlimit = hwm;
// Send an empty message
while( 0 == zmq_send (pub, NULL, 0, 0) )
{
send_count++;
}
assert( EAGAIN == errno);
while( 0 == zmq_recv (sub, NULL, 0, ZMQ_DONTWAIT))
{
recv_count ++;
}
assert( send_count == recv_count);
// Clean up.
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment