Commit 6d245eb6 authored by Lourens Naudé's avatar Lourens Naudé

Copy monitor specific event endpoints to event messages as the engine etc. can…

Copy monitor specific event endpoints to event messages as the engine etc. can be released at anytime
parent ce4d3216
...@@ -122,7 +122,7 @@ void zmq::ipc_connecter_t::out_event () ...@@ -122,7 +122,7 @@ void zmq::ipc_connecter_t::out_event ()
// Shut the connecter down. // Shut the connecter down.
terminate (); terminate ();
socket->event_connected (endpoint.c_str(), fd); socket->event_connected (endpoint, fd);
} }
void zmq::ipc_connecter_t::timer_event (int id_) void zmq::ipc_connecter_t::timer_event (int id_)
...@@ -150,7 +150,7 @@ void zmq::ipc_connecter_t::start_connecting () ...@@ -150,7 +150,7 @@ void zmq::ipc_connecter_t::start_connecting ()
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
socket->event_connect_delayed (endpoint.c_str(), zmq_errno()); socket->event_connect_delayed (endpoint, zmq_errno());
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
...@@ -165,7 +165,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer() ...@@ -165,7 +165,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer()
{ {
int rc_ivl = get_new_reconnect_ivl(); int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id); add_timer (rc_ivl, reconnect_timer_id);
socket->event_connect_retried (endpoint.c_str(), rc_ivl); socket->event_connect_retried (endpoint, rc_ivl);
timer_started = true; timer_started = true;
} }
...@@ -226,7 +226,7 @@ int zmq::ipc_connecter_t::close () ...@@ -226,7 +226,7 @@ int zmq::ipc_connecter_t::close ()
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
int rc = ::close (s); int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
socket->event_closed (endpoint.c_str(), s); socket->event_closed (endpoint, s);
s = retired_fd; s = retired_fd;
return 0; return 0;
} }
......
...@@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event () ...@@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it. // If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc. // TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) { if (fd == retired_fd) {
socket->event_accept_failed (endpoint.c_str(), zmq_errno()); socket->event_accept_failed (endpoint, zmq_errno());
return; return;
} }
...@@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event () ...@@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event ()
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
socket->event_accepted (endpoint.c_str(), fd); socket->event_accepted (endpoint, fd);
} }
int zmq::ipc_listener_t::get_address (std::string &addr_) int zmq::ipc_listener_t::get_address (std::string &addr_)
...@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_) ...@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (rc != 0) if (rc != 0)
goto error; goto error;
socket->event_listening (endpoint.c_str(), s); socket->event_listening (endpoint, s);
return 0; return 0;
error: error:
...@@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close () ...@@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close ()
if (has_file && !filename.empty ()) { if (has_file && !filename.empty ()) {
rc = ::unlink(filename.c_str ()); rc = ::unlink(filename.c_str ());
if (rc != 0) { if (rc != 0) {
socket->event_close_failed (endpoint.c_str(), zmq_errno()); socket->event_close_failed (endpoint, zmq_errno());
return -1; return -1;
} }
} }
socket->event_closed (endpoint.c_str(), s); socket->event_closed (endpoint, s);
return 0; return 0;
} }
......
...@@ -357,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -357,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ()); int rc = listener->set_address (address.c_str ());
if (rc != 0) { if (rc != 0) {
delete listener; delete listener;
event_bind_failed (addr_, zmq_errno()); event_bind_failed (address, zmq_errno());
return -1; return -1;
} }
...@@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ()); int rc = listener->set_address (address.c_str ());
if (rc != 0) { if (rc != 0) {
delete listener; delete listener;
event_bind_failed (addr_, zmq_errno()); event_bind_failed (address, zmq_errno());
return -1; return -1;
} }
...@@ -1047,116 +1047,133 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) ...@@ -1047,116 +1047,133 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
return rc; return rc;
} }
void zmq::socket_base_t::event_connected (const char *addr_, int fd_) void zmq::socket_base_t::event_connected (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECTED) { if (monitor_events & ZMQ_EVENT_CONNECTED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_CONNECTED; event.event = ZMQ_EVENT_CONNECTED;
event.data.connected.addr = const_cast <char *> (addr_); event.data.connected.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.connected.addr, addr_);
event.data.connected.fd = fd_; event.data.connected.fd = fd_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_connect_delayed (const char *addr_, int err_) void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) { if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_CONNECT_DELAYED; event.event = ZMQ_EVENT_CONNECT_DELAYED;
event.data.connect_delayed.addr = const_cast <char *> (addr_); event.data.connect_delayed.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.connect_delayed.addr, addr_);
event.data.connect_delayed.err = err_; event.data.connect_delayed.err = err_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_connect_retried (const char *addr_, int interval_) void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) { if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_CONNECT_RETRIED; event.event = ZMQ_EVENT_CONNECT_RETRIED;
event.data.connect_retried.addr = const_cast <char *> (addr_); event.data.connect_retried.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.connect_retried.addr, addr_);
event.data.connect_retried.interval = interval_; event.data.connect_retried.interval = interval_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_listening (const char *addr_, int fd_) void zmq::socket_base_t::event_listening (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_LISTENING) { if (monitor_events & ZMQ_EVENT_LISTENING) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_LISTENING; event.event = ZMQ_EVENT_LISTENING;
event.data.listening.addr = const_cast <char *> (addr_); event.data.listening.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.listening.addr, addr_);
event.data.listening.fd = fd_; event.data.listening.fd = fd_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_bind_failed (const char *addr_, int err_) void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_BIND_FAILED) { if (monitor_events & ZMQ_EVENT_BIND_FAILED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_BIND_FAILED; event.event = ZMQ_EVENT_BIND_FAILED;
event.data.bind_failed.addr = const_cast <char *> (addr_); event.data.bind_failed.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.bind_failed.addr, addr_);
event.data.bind_failed.err = err_; event.data.bind_failed.err = err_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_accepted (const char *addr_, int fd_) void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_ACCEPTED) { if (monitor_events & ZMQ_EVENT_ACCEPTED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_ACCEPTED; event.event = ZMQ_EVENT_ACCEPTED;
event.data.accepted.addr = const_cast <char *> (addr_); event.data.accepted.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.accepted.addr, addr_);
event.data.accepted.fd = fd_; event.data.accepted.fd = fd_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_accept_failed (const char *addr_, int err_) void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) { if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_ACCEPT_FAILED; event.event = ZMQ_EVENT_ACCEPT_FAILED;
event.data.accept_failed.addr = const_cast <char *> (addr_); event.data.accept_failed.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.accept_failed.addr, addr_);
event.data.accept_failed.err= err_; event.data.accept_failed.err= err_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_closed (const char *addr_, int fd_) void zmq::socket_base_t::event_closed (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CLOSED) { if (monitor_events & ZMQ_EVENT_CLOSED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_CLOSED; event.event = ZMQ_EVENT_CLOSED;
event.data.closed.addr = const_cast <char *> (addr_); event.data.closed.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.closed.addr, addr_);
event.data.closed.fd = fd_; event.data.closed.fd = fd_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_close_failed (const char *addr_, int err_) void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_)
{ {
if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) { if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_CLOSE_FAILED; event.event = ZMQ_EVENT_CLOSE_FAILED;
event.data.close_failed.addr = const_cast <char *> (addr_); event.data.close_failed.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.close_failed.addr, addr_);
event.data.close_failed.err = err_; event.data.close_failed.err = err_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::event_disconnected (const char *addr_, int fd_) void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_DISCONNECTED) { if (monitor_events & ZMQ_EVENT_DISCONNECTED) {
zmq_event_t event; zmq_event_t event;
event.event = ZMQ_EVENT_DISCONNECTED; event.event = ZMQ_EVENT_DISCONNECTED;
event.data.disconnected.addr = const_cast <char *> (addr_); event.data.disconnected.addr = new char[addr_.size () + 1];
copy_monitor_address (event.data.disconnected.addr, addr_);
event.data.disconnected.fd = fd_; event.data.disconnected.fd = fd_;
monitor_event (event); monitor_event (event);
} }
} }
void zmq::socket_base_t::copy_monitor_address (char *dest_, std::string &src_)
{
alloc_assert (dest_);
dest_[src_.size ()] = 0;
memcpy (dest_, src_.c_str (), src_.size ());
}
void zmq::socket_base_t::monitor_event (zmq_event_t event_) void zmq::socket_base_t::monitor_event (zmq_event_t event_)
{ {
if (monitor_socket) { if (monitor_socket) {
......
...@@ -102,18 +102,18 @@ namespace zmq ...@@ -102,18 +102,18 @@ namespace zmq
void lock(); void lock();
void unlock(); void unlock();
int monitor(const char *endpoint_, int events_); int monitor (const char *endpoint_, int events_);
void event_connected(const char *addr_, int fd_); void event_connected (std::string &addr_, int fd_);
void event_connect_delayed(const char *addr_, int err_); void event_connect_delayed (std::string &addr_, int err_);
void event_connect_retried(const char *addr_, int interval_); void event_connect_retried (std::string &addr_, int interval_);
void event_listening(const char *addr_, int fd_); void event_listening (std::string &addr_, int fd_);
void event_bind_failed(const char *addr_, int err_); void event_bind_failed (std::string &addr_, int err_);
void event_accepted(const char *addr_, int fd_); void event_accepted (std::string &addr_, int fd_);
void event_accept_failed(const char *addr_, int err_); void event_accept_failed (std::string &addr_, int err_);
void event_closed(const char *addr_, int fd_); void event_closed (std::string &addr_, int fd_);
void event_close_failed(const char *addr_, int fd_); void event_close_failed (std::string &addr_, int fd_);
void event_disconnected(const char *addr_, int fd_); void event_disconnected (std::string &addr_, int fd_);
protected: protected:
...@@ -151,6 +151,9 @@ namespace zmq ...@@ -151,6 +151,9 @@ namespace zmq
// Socket event data dispath // Socket event data dispath
void monitor_event (zmq_event_t data_); void monitor_event (zmq_event_t data_);
// Copy monitor specific event endpoints to event messages
void copy_monitor_address (char *dest_, std::string &src_);
// Monitor socket cleanup // Monitor socket cleanup
void stop_monitor (); void stop_monitor ();
......
...@@ -484,7 +484,7 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_) ...@@ -484,7 +484,7 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_)
void zmq::stream_engine_t::error () void zmq::stream_engine_t::error ()
{ {
zmq_assert (session); zmq_assert (session);
socket->event_disconnected (endpoint.c_str(), s); socket->event_disconnected (endpoint, s);
session->detach (); session->detach ();
unplug (); unplug ();
delete this; delete this;
......
...@@ -136,7 +136,7 @@ void zmq::tcp_connecter_t::out_event () ...@@ -136,7 +136,7 @@ void zmq::tcp_connecter_t::out_event ()
// Shut the connecter down. // Shut the connecter down.
terminate (); terminate ();
socket->event_connected (endpoint.c_str(), fd); socket->event_connected (endpoint, fd);
} }
void zmq::tcp_connecter_t::timer_event (int id_) void zmq::tcp_connecter_t::timer_event (int id_)
...@@ -164,7 +164,7 @@ void zmq::tcp_connecter_t::start_connecting () ...@@ -164,7 +164,7 @@ void zmq::tcp_connecter_t::start_connecting ()
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
socket->event_connect_delayed (endpoint.c_str(), zmq_errno()); socket->event_connect_delayed (endpoint, zmq_errno());
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
...@@ -179,7 +179,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer() ...@@ -179,7 +179,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer()
{ {
int rc_ivl = get_new_reconnect_ivl(); int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id); add_timer (rc_ivl, reconnect_timer_id);
socket->event_connect_retried (endpoint.c_str(), rc_ivl); socket->event_connect_retried (endpoint, rc_ivl);
timer_started = true; timer_started = true;
} }
...@@ -305,6 +305,6 @@ void zmq::tcp_connecter_t::close () ...@@ -305,6 +305,6 @@ void zmq::tcp_connecter_t::close ()
int rc = ::close (s); int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
socket->event_closed (endpoint.c_str(), s); socket->event_closed (endpoint, s);
s = retired_fd; s = retired_fd;
} }
...@@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event () ...@@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it. // If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc. // TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) { if (fd == retired_fd) {
socket->event_accept_failed (endpoint.c_str(), zmq_errno()); socket->event_accept_failed (endpoint, zmq_errno());
return; return;
} }
...@@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event () ...@@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event ()
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
socket->event_accepted (endpoint.c_str(), fd); socket->event_accepted (endpoint, fd);
} }
void zmq::tcp_listener_t::close () void zmq::tcp_listener_t::close ()
...@@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close () ...@@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close ()
int rc = ::close (s); int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
socket->event_closed (endpoint.c_str(), s); socket->event_closed (endpoint, s);
s = retired_fd; s = retired_fd;
} }
...@@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
goto error; goto error;
#endif #endif
socket->event_listening (endpoint.c_str(), s); socket->event_listening (endpoint, s);
return 0; return 0;
error: error:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment