Commit d1c7280d authored by Jens Auer's avatar Jens Auer

Add mutex for monitor socket

parent 23be1dc0
...@@ -195,7 +195,9 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool ...@@ -195,7 +195,9 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
monitor_socket (NULL), monitor_socket (NULL),
monitor_events (0), monitor_events (0),
thread_safe (thread_safe_), thread_safe (thread_safe_),
reaper_signaler (NULL) reaper_signaler (NULL),
sync(),
monitor_sync()
{ {
options.socket_id = sid_; options.socket_id = sid_;
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
...@@ -222,7 +224,9 @@ zmq::socket_base_t::~socket_base_t () ...@@ -222,7 +224,9 @@ zmq::socket_base_t::~socket_base_t ()
if (reaper_signaler) if (reaper_signaler)
LIBZMQ_DELETE(reaper_signaler); LIBZMQ_DELETE(reaper_signaler);
scoped_lock_t lock(monitor_sync);
stop_monitor (); stop_monitor ();
zmq_assert (destroyed); zmq_assert (destroyed);
} }
...@@ -1376,7 +1380,10 @@ void zmq::socket_base_t::process_stop () ...@@ -1376,7 +1380,10 @@ void zmq::socket_base_t::process_stop ()
// We'll remember the fact so that any blocking call is interrupted and any // We'll remember the fact so that any blocking call is interrupted and any
// further attempt to use the socket will return ETERM. The user is still // further attempt to use the socket will return ETERM. The user is still
// responsible for calling zmq_close on the socket though! // responsible for calling zmq_close on the socket though!
stop_monitor (); {
scoped_lock_t lock(monitor_sync);
stop_monitor ();
}
ctx_terminated = true; ctx_terminated = true;
} }
...@@ -1580,6 +1587,9 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) ...@@ -1580,6 +1587,9 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
scoped_lock_t lock(monitor_sync);
// Support deregistering monitoring endpoints as well // Support deregistering monitoring endpoints as well
if (addr_ == NULL) { if (addr_ == NULL) {
stop_monitor (); stop_monitor ();
...@@ -1619,69 +1629,71 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) ...@@ -1619,69 +1629,71 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
return rc; return rc;
} }
void zmq::socket_base_t::event_connected (const std::string &addr_, zmq::fd_t fd_) void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECTED) event(addr_, fd_, ZMQ_EVENT_CONNECTED);
monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_);
} }
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_) void zmq::socket_base_t::event_connect_delayed (const std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) event(addr_, err_, ZMQ_EVENT_CONNECT_DELAYED);
monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_);
} }
void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_) void zmq::socket_base_t::event_connect_retried (const std::string &addr_, int interval_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) event(addr_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_);
} }
void zmq::socket_base_t::event_listening (const std::string &addr_, zmq::fd_t fd_) void zmq::socket_base_t::event_listening (const std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_LISTENING) event(addr_, fd_, ZMQ_EVENT_LISTENING);
monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_);
} }
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_) void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_BIND_FAILED) event(addr_, err_, ZMQ_EVENT_BIND_FAILED);
monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_);
} }
void zmq::socket_base_t::event_accepted (const std::string &addr_, zmq::fd_t fd_) void zmq::socket_base_t::event_accepted (const std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_ACCEPTED) event(addr_, fd_, ZMQ_EVENT_ACCEPTED);
monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_);
} }
void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_) void zmq::socket_base_t::event_accept_failed (const std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) event(addr_, err_, ZMQ_EVENT_ACCEPT_FAILED);
monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_);
} }
void zmq::socket_base_t::event_closed (const std::string &addr_, zmq::fd_t fd_) void zmq::socket_base_t::event_closed (const std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CLOSED) event(addr_, fd_, ZMQ_EVENT_CLOSED);
monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_);
} }
void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_) void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) event(addr_, err_, ZMQ_EVENT_CLOSE_FAILED);
monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_);
} }
void zmq::socket_base_t::event_disconnected (const std::string &addr_, zmq::fd_t fd_) void zmq::socket_base_t::event_disconnected (const std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_DISCONNECTED) event(addr_, fd_, ZMQ_EVENT_DISCONNECTED);
monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_); }
void zmq::socket_base_t::event(const std::string &addr_, int fd_, int type_)
{
scoped_lock_t lock(monitor_sync);
if (monitor_events & type_)
{
monitor_event (type_, fd_, addr_);
}
} }
// Send a monitor event // Send a monitor event
void zmq::socket_base_t::monitor_event (int event_, intptr_t value_, const std::string &addr_) void zmq::socket_base_t::monitor_event (int event_, intptr_t value_, const std::string &addr_)
{ {
// this is a private method which is only called from
// contexts where the mutex has been locked before
if (monitor_socket) { if (monitor_socket) {
// Send event in first frame // Send event in first frame
zmq_msg_t msg; zmq_msg_t msg;
...@@ -1703,6 +1715,9 @@ void zmq::socket_base_t::monitor_event (int event_, intptr_t value_, const std:: ...@@ -1703,6 +1715,9 @@ void zmq::socket_base_t::monitor_event (int event_, intptr_t value_, const std::
void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_) void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
{ {
// this is a private method which is only called from
// contexts where the mutex has been locked before
if (monitor_socket) { if (monitor_socket) {
if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_) if ((monitor_events & ZMQ_EVENT_MONITOR_STOPPED) && send_monitor_stopped_event_)
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, ""); monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, "");
......
...@@ -176,16 +176,20 @@ namespace zmq ...@@ -176,16 +176,20 @@ namespace zmq
// Delay actual destruction of the socket. // Delay actual destruction of the socket.
void process_destroy (); void process_destroy ();
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std::string connect_rid;
private:
// test if event should be sent and then dispatch it
void event(const std::string &addr_, int fd_, int type_);
// Socket event data dispatch // Socket event data dispatch
void monitor_event (int event_, intptr_t value_, const std::string& addr_); void monitor_event (int event_, intptr_t value_, const std::string& addr_);
// Monitor socket cleanup // Monitor socket cleanup
void stop_monitor (bool send_monitor_stopped_event_ = true); void stop_monitor (bool send_monitor_stopped_event_ = true);
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
std::string connect_rid;
private:
// Creates new endpoint ID and adds the endpoint to the map. // Creates new endpoint ID and adds the endpoint to the map.
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe); void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
...@@ -282,6 +286,9 @@ namespace zmq ...@@ -282,6 +286,9 @@ namespace zmq
// Mutex for synchronize access to the socket in thread safe mode // Mutex for synchronize access to the socket in thread safe mode
mutex_t sync; mutex_t sync;
// Mutex to synchronize access to the monitor Pair socket
mutex_t monitor_sync;
socket_base_t (const socket_base_t&); socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&); const socket_base_t &operator = (const socket_base_t&);
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment