Commit 4fc313d1 authored by Constantin Rack's avatar Constantin Rack Committed by GitHub

Merge pull request #2301 from bluca/set_peer_unsafe

Problems: modifying pipe from different thread is not safe and HWM always boosted by 1
parents 598befc1 edc770d6
...@@ -83,8 +83,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, ...@@ -83,8 +83,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
out_active (true), out_active (true),
hwm (outhwm_), hwm (outhwm_),
lwm (compute_lwm (inhwm_)), lwm (compute_lwm (inhwm_)),
inhwmboost(1), inhwmboost(-1),
outhwmboost(1), outhwmboost(-1),
msgs_read (0), msgs_read (0),
msgs_written (0), msgs_written (0),
peers_msgs_read (0), peers_msgs_read (0),
...@@ -508,25 +508,20 @@ void zmq::pipe_t::hiccup () ...@@ -508,25 +508,20 @@ void zmq::pipe_t::hiccup ()
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_) void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
{ {
int in = inhwm_ + inhwmboost; int in = inhwm_ + (inhwmboost > 0 ? inhwmboost : 0);
int out = outhwm_ + outhwmboost; int out = outhwm_ + (outhwmboost > 0 ? outhwmboost : 0);
// if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
if (inhwm_ <= 0 || inhwmboost <= 0) if (inhwm_ <= 0 || inhwmboost == 0)
in = 0; in = 0;
if (outhwm_ <= 0 || outhwmboost <= 0) if (outhwm_ <= 0 || outhwmboost == 0)
out = 0; out = 0;
lwm = compute_lwm(in); lwm = compute_lwm(in);
hwm = out; hwm = out;
} }
void zmq::pipe_t::set_peer_hwms (int inhwm_, int outhwm_)
{
peer->set_hwms(inhwm_, outhwm_);
}
void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_) void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_)
{ {
inhwmboost = inhwmboost_; inhwmboost = inhwmboost_;
......
...@@ -133,9 +133,6 @@ namespace zmq ...@@ -133,9 +133,6 @@ namespace zmq
// Set the high water marks. // Set the high water marks.
void set_hwms (int inhwm_, int outhwm_); void set_hwms (int inhwm_, int outhwm_);
// Set the high water marks for peer.
void set_peer_hwms (int inhwm_, int outhwm_);
// Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks // Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
void set_hwms_boost(int inhwmboost_, int outhwmboost_); void set_hwms_boost(int inhwmboost_, int outhwmboost_);
......
...@@ -1414,7 +1414,6 @@ void zmq::socket_base_t::update_pipe_options(int option_) ...@@ -1414,7 +1414,6 @@ void zmq::socket_base_t::update_pipe_options(int option_)
for (pipes_t::size_type i = 0; i != pipes.size(); ++i) for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
{ {
pipes[i]->set_hwms(options.rcvhwm, options.sndhwm); pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
pipes[i]->set_peer_hwms(options.sndhwm, options.rcvhwm);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment