Commit 107f2441 authored by laplaceyang's avatar laplaceyang

Problem: Thread-safe solution for modify hwm of pipe

Solution: where change pipe hwm, send a command (new type pipe_hwm) to peer, so peer pipe can modify hwm thread-safely
parent 4fc313d1
...@@ -64,6 +64,7 @@ namespace zmq ...@@ -64,6 +64,7 @@ namespace zmq
hiccup, hiccup,
pipe_term, pipe_term,
pipe_term_ack, pipe_term_ack,
pipe_hwm,
term_req, term_req,
term, term,
term_ack, term_ack,
...@@ -129,6 +130,12 @@ namespace zmq ...@@ -129,6 +130,12 @@ namespace zmq
struct { struct {
} pipe_term_ack; } pipe_term_ack;
// Sent by one of pipe to another part for modify hwm
struct {
int inhwm;
int outhwm;
} pipe_hwm;
// Sent by I/O object ot the socket to request the shutdown of // Sent by I/O object ot the socket to request the shutdown of
// the I/O object. // the I/O object.
struct { struct {
......
...@@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_pipe_term_ack (); process_pipe_term_ack ();
break; break;
case command_t::pipe_hwm:
process_pipe_hwm (cmd_.args.pipe_hwm.inhwm, cmd_.args.pipe_hwm.outhwm);
break;
case command_t::term_req: case command_t::term_req:
process_term_req (cmd_.args.term_req.object); process_term_req (cmd_.args.term_req.object);
break; break;
...@@ -291,6 +295,16 @@ void zmq::object_t::send_pipe_term_ack (pipe_t *destination_) ...@@ -291,6 +295,16 @@ void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_pipe_hwm (pipe_t *destination_, int inhwm_, int outhwm_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::pipe_hwm;
cmd.args.pipe_hwm.inhwm = inhwm_;
cmd.args.pipe_hwm.outhwm = outhwm_;
send_command (cmd);
}
void zmq::object_t::send_term_req (own_t *destination_, void zmq::object_t::send_term_req (own_t *destination_,
own_t *object_) own_t *object_)
{ {
...@@ -401,6 +415,11 @@ void zmq::object_t::process_pipe_term_ack () ...@@ -401,6 +415,11 @@ void zmq::object_t::process_pipe_term_ack ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_pipe_hwm (int, int)
{
zmq_assert (false);
}
void zmq::object_t::process_term_req (own_t *) void zmq::object_t::process_term_req (own_t *)
{ {
zmq_assert (false); zmq_assert (false);
......
...@@ -102,6 +102,7 @@ namespace zmq ...@@ -102,6 +102,7 @@ namespace zmq
void send_hiccup (zmq::pipe_t *destination_, void *pipe_); void send_hiccup (zmq::pipe_t *destination_, void *pipe_);
void send_pipe_term (zmq::pipe_t *destination_); void send_pipe_term (zmq::pipe_t *destination_);
void send_pipe_term_ack (zmq::pipe_t *destination_); void send_pipe_term_ack (zmq::pipe_t *destination_);
void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_);
void send_term_req (zmq::own_t *destination_, void send_term_req (zmq::own_t *destination_,
zmq::own_t *object_); zmq::own_t *object_);
void send_term (zmq::own_t *destination_, int linger_); void send_term (zmq::own_t *destination_, int linger_);
...@@ -122,6 +123,7 @@ namespace zmq ...@@ -122,6 +123,7 @@ namespace zmq
virtual void process_hiccup (void *pipe_); virtual void process_hiccup (void *pipe_);
virtual void process_pipe_term (); virtual void process_pipe_term ();
virtual void process_pipe_term_ack (); virtual void process_pipe_term_ack ();
virtual void process_pipe_hwm (int inhwm_, int outhwm_);
virtual void process_term_req (zmq::own_t *object_); virtual void process_term_req (zmq::own_t *object_);
virtual void process_term (int linger_); virtual void process_term (int linger_);
virtual void process_term_ack (); virtual void process_term_ack ();
......
...@@ -377,6 +377,11 @@ void zmq::pipe_t::process_pipe_term_ack () ...@@ -377,6 +377,11 @@ void zmq::pipe_t::process_pipe_term_ack ()
delete this; delete this;
} }
void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_)
{
set_hwms(inhwm_, outhwm_);
}
void zmq::pipe_t::set_nodelay () void zmq::pipe_t::set_nodelay ()
{ {
this->delay = false; this->delay = false;
...@@ -533,3 +538,8 @@ bool zmq::pipe_t::check_hwm () const ...@@ -533,3 +538,8 @@ bool zmq::pipe_t::check_hwm () const
bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm); bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm);
return( !full ); return( !full );
} }
void zmq::pipe_t::send_hwms_to_peer(int inhwm_, int outhwm_)
{
send_pipe_hwm(peer, inhwm_, outhwm_);
}
...@@ -136,6 +136,9 @@ namespace zmq ...@@ -136,6 +136,9 @@ namespace zmq
// Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks // Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
void set_hwms_boost(int inhwmboost_, int outhwmboost_); void set_hwms_boost(int inhwmboost_, int outhwmboost_);
// send command to peer for notify the change of hwm
void send_hwms_to_peer(int inhwm_, int outhwm_);
// Returns true if HWM is not reached // Returns true if HWM is not reached
bool check_hwm () const; bool check_hwm () const;
private: private:
...@@ -149,6 +152,7 @@ namespace zmq ...@@ -149,6 +152,7 @@ namespace zmq
void process_hiccup (void *pipe_); void process_hiccup (void *pipe_);
void process_pipe_term (); void process_pipe_term ();
void process_pipe_term_ack (); void process_pipe_term_ack ();
void process_pipe_hwm (int inhwm_, int outhwm_);
// Handler for delimiter read from the pipe. // Handler for delimiter read from the pipe.
void process_delimiter (); void process_delimiter ();
......
...@@ -1414,6 +1414,7 @@ void zmq::socket_base_t::update_pipe_options(int option_) ...@@ -1414,6 +1414,7 @@ void zmq::socket_base_t::update_pipe_options(int option_)
for (pipes_t::size_type i = 0; i != pipes.size(); ++i) for (pipes_t::size_type i = 0; i != pipes.size(); ++i)
{ {
pipes[i]->set_hwms(options.rcvhwm, options.sndhwm); pipes[i]->set_hwms(options.rcvhwm, options.sndhwm);
pipes[i]->send_hwms_to_peer(options.sndhwm, options.rcvhwm);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment