Commit 15eecf4c authored by Richard Newton's avatar Richard Newton

Update high water marking to allow changing high water mark after connection established.

parent a3b8f80f
...@@ -529,15 +529,6 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_, ...@@ -529,15 +529,6 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
errno_assert (rc == 0); errno_assert (rc == 0);
} }
int sndhwm = 0;
if (pending_connection_.endpoint.options.sndhwm != 0 && bind_options.rcvhwm != 0)
sndhwm = pending_connection_.endpoint.options.sndhwm + bind_options.rcvhwm;
int rcvhwm = 0;
if (pending_connection_.endpoint.options.rcvhwm != 0 && bind_options.sndhwm != 0)
rcvhwm = pending_connection_.endpoint.options.rcvhwm + bind_options.sndhwm;
bool conflate = pending_connection_.endpoint.options.conflate && bool conflate = pending_connection_.endpoint.options.conflate &&
(pending_connection_.endpoint.options.type == ZMQ_DEALER || (pending_connection_.endpoint.options.type == ZMQ_DEALER ||
pending_connection_.endpoint.options.type == ZMQ_PULL || pending_connection_.endpoint.options.type == ZMQ_PULL ||
...@@ -545,9 +536,17 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_, ...@@ -545,9 +536,17 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
pending_connection_.endpoint.options.type == ZMQ_PUB || pending_connection_.endpoint.options.type == ZMQ_PUB ||
pending_connection_.endpoint.options.type == ZMQ_SUB); pending_connection_.endpoint.options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; if (!conflate) {
pending_connection_.connect_pipe->set_hwms(hwms [1], hwms [0]); pending_connection_.connect_pipe->set_hwms_boost(bind_options.sndhwm, bind_options.rcvhwm);
pending_connection_.bind_pipe->set_hwms(hwms [0], hwms [1]); pending_connection_.bind_pipe->set_hwms_boost(pending_connection_.endpoint.options.sndhwm, pending_connection_.endpoint.options.rcvhwm);
pending_connection_.connect_pipe->set_hwms(pending_connection_.endpoint.options.rcvhwm, pending_connection_.endpoint.options.sndhwm);
pending_connection_.bind_pipe->set_hwms(bind_options.rcvhwm, bind_options.sndhwm);
}
else {
pending_connection_.connect_pipe->set_hwms(-1, -1);
pending_connection_.bind_pipe->set_hwms(-1, -1);
}
if (side_ == bind_side) { if (side_ == bind_side) {
command_t cmd; command_t cmd;
......
...@@ -888,18 +888,5 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -888,18 +888,5 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
bool zmq::options_t::is_valid (int option_) const bool zmq::options_t::is_valid (int option_) const
{ {
bool valid = true; return true;
if (connected) {
switch (option_) {
case ZMQ_SNDHWM:
case ZMQ_RCVHWM:
valid = false;
break;
default:
break;
}
}
return valid;
} }
...@@ -81,6 +81,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, ...@@ -81,6 +81,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
out_active (true), out_active (true),
hwm (outhwm_), hwm (outhwm_),
lwm (compute_lwm (inhwm_)), lwm (compute_lwm (inhwm_)),
inhwmboost(0),
outhwmboost(0),
msgs_read (0), msgs_read (0),
msgs_written (0), msgs_written (0),
peers_msgs_read (0), peers_msgs_read (0),
...@@ -518,8 +520,14 @@ void zmq::pipe_t::hiccup () ...@@ -518,8 +520,14 @@ void zmq::pipe_t::hiccup ()
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_) void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{ {
lwm = compute_lwm (inhwm_); lwm = compute_lwm(inhwm_ + inhwmboost);
hwm = outhwm_; hwm = outhwm_ + outhwmboost;
}
void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
{
inhwmboost = inhwmboost_;
outhwmboost = outhwmboost_;
} }
bool zmq::pipe_t::check_hwm () const bool zmq::pipe_t::check_hwm () const
......
...@@ -133,6 +133,9 @@ namespace zmq ...@@ -133,6 +133,9 @@ namespace zmq
// set the high water marks. // set the high water marks.
void set_hwms (int inhwm_, int outhwm_); void set_hwms (int inhwm_, int outhwm_);
// set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
void set_hwms_boost(int inhwmboost_, int outhwmboost_);
// check HWM // check HWM
bool check_hwm () const; bool check_hwm () const;
private: private:
...@@ -176,6 +179,10 @@ namespace zmq ...@@ -176,6 +179,10 @@ namespace zmq
// Low watermark for the inbound pipe. // Low watermark for the inbound pipe.
int lwm; int lwm;
// boosts for high and low watermarks, used with inproc sockets so hwm are sum of send and recv hmws on each side of pipe
int inhwmboost;
int outhwmboost;
// Number of messages read and written so far. // Number of messages read and written so far.
uint64_t msgs_read; uint64_t msgs_read;
uint64_t msgs_written; uint64_t msgs_written;
......
...@@ -335,6 +335,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, ...@@ -335,6 +335,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
// If the socket type doesn't support the option, pass it to // If the socket type doesn't support the option, pass it to
// the generic option parser. // the generic option parser.
rc = options.setsockopt (option_, optval_, optvallen_); rc = options.setsockopt (option_, optval_, optvallen_);
update_pipe_options(option_);
EXIT_MUTEX(); EXIT_MUTEX();
return rc; return rc;
...@@ -612,6 +613,11 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -612,6 +613,11 @@ int zmq::socket_base_t::connect (const char *addr_)
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
bool conflates [2] = {conflate, conflate}; bool conflates [2] = {conflate, conflate};
int rc = pipepair (parents, new_pipes, hwms, conflates); int rc = pipepair (parents, new_pipes, hwms, conflates);
if (!conflate) {
new_pipes[0]->set_hwms_boost(peer.options.sndhwm, peer.options.rcvhwm);
new_pipes[1]->set_hwms_boost(options.sndhwm, options.rcvhwm);
}
errno_assert (rc == 0); errno_assert (rc == 0);
if (!peer.socket) { if (!peer.socket) {
...@@ -1249,6 +1255,18 @@ void zmq::socket_base_t::process_term (int linger_) ...@@ -1249,6 +1255,18 @@ void zmq::socket_base_t::process_term (int linger_)
own_t::process_term (linger_); own_t::process_term (linger_);
} }
void zmq::socket_base_t::update_pipe_options(int option_)
{
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM)
{
for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
{
pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
}
}
}
void zmq::socket_base_t::process_destroy () void zmq::socket_base_t::process_destroy ()
{ {
destroyed = true; destroyed = true;
......
...@@ -232,6 +232,8 @@ namespace zmq ...@@ -232,6 +232,8 @@ namespace zmq
void process_bind (zmq::pipe_t *pipe_); void process_bind (zmq::pipe_t *pipe_);
void process_term (int linger_); void process_term (int linger_);
void update_pipe_options(int option_);
// Socket's mailbox object. // Socket's mailbox object.
i_mailbox* mailbox; i_mailbox* mailbox;
......
#include "testutil.hpp" #include "testutil.hpp"
void test_valid_hwm_change() const int MAX_SENDS = 10000;
void test_change_before_connected()
{ {
void *ctx = zmq_ctx_new (); int rc;
assert (ctx); void *ctx = zmq_ctx_new();
int rc;
void *bind_socket = zmq_socket (ctx, ZMQ_SUB); void *bind_socket = zmq_socket(ctx, ZMQ_PUSH);
assert (bind_socket); void *connect_socket = zmq_socket(ctx, ZMQ_PULL);
int val = 500; int val = 2;
rc = zmq_setsockopt(bind_socket, ZMQ_RCVHWM, &val, sizeof(val)); rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val));
assert (rc == 0); assert(rc == 0);
rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
assert(rc == 0);
rc = zmq_bind (bind_socket, "inproc://a"); zmq_connect(connect_socket, "inproc://a");
assert (rc == 0); zmq_bind(bind_socket, "inproc://a");
size_t placeholder = sizeof(val); size_t placeholder = sizeof(val);
val = 0; val = 0;
rc = zmq_getsockopt(bind_socket, ZMQ_RCVHWM, &val, &placeholder); rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder);
assert (rc == 0); assert(rc == 0);
assert(val == 500); assert(val == 2);
}
int send_count = 0;
while (send_count < MAX_SENDS && zmq_send(bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++send_count;
assert(send_count == 4);
zmq_close(bind_socket);
zmq_close(connect_socket);
zmq_ctx_term(ctx);
}
/** void test_change_after_connected()
* Test that zmq_setsockopt() fails to change the RCVHWM when called
* after a call to zmq_bind().
*/
void test_invalid_hwm_change_bind()
{ {
void *ctx = zmq_ctx_new (); int rc;
assert (ctx); void *ctx = zmq_ctx_new();
int rc;
void *bind_socket = zmq_socket (ctx, ZMQ_SUB); void *bind_socket = zmq_socket(ctx, ZMQ_PUSH);
assert (bind_socket); void *connect_socket = zmq_socket(ctx, ZMQ_PULL);
rc = zmq_bind (bind_socket, "inproc://a"); int val = 1;
assert (rc == 0); rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val));
assert(rc == 0);
rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
assert(rc == 0);
int val = 500; zmq_connect(connect_socket, "inproc://a");
rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &val, sizeof(val)); zmq_bind(bind_socket, "inproc://a");
assert (rc == -1);
zmq_close (bind_socket); val = 5;
zmq_ctx_term (ctx); rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
} assert(rc == 0);
void test_invalid_hwm_change_connect() size_t placeholder = sizeof(val);
{ val = 0;
void *ctx = zmq_ctx_new(); rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder);
assert(ctx); assert(rc == 0);
int rc; assert(val == 5);
void *connect_socket = zmq_socket (ctx, ZMQ_SUB); int send_count = 0;
assert(connect_socket); while (send_count < MAX_SENDS && zmq_send(bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
++send_count;
rc = zmq_connect (connect_socket, "inproc://a"); assert(send_count == 6);
assert(rc == 0);
int val = 500; zmq_close(bind_socket);
rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof(val)); zmq_close(connect_socket);
assert(rc == -1); zmq_ctx_term(ctx);
}
zmq_close (connect_socket); void test_decrease_when_full()
zmq_ctx_term (ctx); {
int rc;
void *ctx = zmq_ctx_new();
void *bind_socket = zmq_socket(ctx, ZMQ_PUSH);
void *connect_socket = zmq_socket(ctx, ZMQ_PULL);
int val = 1;
rc = zmq_setsockopt(connect_socket, ZMQ_RCVHWM, &val, sizeof(val));
assert(rc == 0);
val = 100;
rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
assert(rc == 0);
zmq_bind(bind_socket, "inproc://a");
zmq_connect(connect_socket, "inproc://a");
// Fill up to hwm
int send_count = 0;
while (send_count < MAX_SENDS && zmq_send(bind_socket, &send_count, sizeof(send_count), ZMQ_DONTWAIT) == sizeof(send_count))
++send_count;
assert(send_count == 101);
// Descrease snd hwm
val = 70;
rc = zmq_setsockopt(bind_socket, ZMQ_SNDHWM, &val, sizeof(val));
assert(rc == 0);
size_t placeholder = sizeof(val);
val = 0;
rc = zmq_getsockopt(bind_socket, ZMQ_SNDHWM, &val, &placeholder);
assert(rc == 0);
assert(val == 70);
// Read out all data (should get up to previous hwm worth so none were dropped)
int read_count = 0;
int read_data = 0;
while (read_count < MAX_SENDS && zmq_recv(connect_socket, &read_data, sizeof(read_data), ZMQ_DONTWAIT) == sizeof(read_data)) {
assert(read_count == read_data);
++read_count;
}
assert(read_count == 101);
// Give io thread some time to catch up
msleep(10);
// Fill up to new hwm
send_count = 0;
while (send_count < MAX_SENDS && zmq_send(bind_socket, &send_count, sizeof(send_count), ZMQ_DONTWAIT) == sizeof(send_count))
++send_count;
// Really this should be 71, but the lwm stuff kicks in doesn't seem quite right
assert(send_count > 0);
zmq_close(bind_socket);
zmq_close(connect_socket);
zmq_ctx_term(ctx);
} }
int main() int main()
{ {
test_valid_hwm_change(); test_change_before_connected();
test_invalid_hwm_change_bind(); test_change_after_connected();
test_invalid_hwm_change_connect(); test_decrease_when_full();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment