Commit 5637ed08 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #407 from methodmissing/LIBZMQ-399

Fix invalid address metadata for ZMQ_EVENT_DISCONNECTED
parents 1f229547 9dc248f6
......@@ -111,9 +111,8 @@ void zmq::ipc_connecter_t::out_event ()
add_reconnect_timer();
return;
}
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
......
......@@ -81,7 +81,7 @@ void zmq::ipc_listener_t::in_event ()
}
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
......@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (rc != 0)
goto error;
socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s);
socket->monitor_event (ZMQ_EVENT_LISTENING, endpoint.c_str(), s);
return 0;
error:
......
......@@ -279,13 +279,6 @@ void zmq::session_base_t::hiccuped (pipe_t *pipe_)
zmq_assert (false);
}
int zmq::session_base_t::get_address (std::string &addr_)
{
if (addr)
return addr->to_string (addr_);
return -1;
}
void zmq::session_base_t::monitor_event (int event_, ...)
{
va_list args;
......
......@@ -66,7 +66,6 @@ namespace zmq
void hiccuped (zmq::pipe_t *pipe_);
void terminated (zmq::pipe_t *pipe_);
int get_address (std::string &addr_);
void monitor_event (int event_, ...);
protected:
......
......@@ -42,7 +42,7 @@
#include "err.hpp"
#include "ip.hpp"
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) :
s (fd_),
inpos (NULL),
insize (0),
......@@ -53,11 +53,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
encoder (out_batch_size),
session (NULL),
options (options_),
plugged (false)
plugged (false),
endpoint (endpoint_)
{
// Put the socket into non-blocking mode.
unblock_socket (s);
// Set the socket buffer limits for the underlying socket.
if (options.sndbuf) {
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
......@@ -116,14 +116,11 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
decoder.set_session (session_);
session = session_;
session->get_address (endpoint);
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
handle = add_fd (s);
set_pollin (handle);
set_pollout (handle);
// Flush all the data that may have been already received downstream.
in_event ();
}
......@@ -143,7 +140,6 @@ void zmq::stream_engine_t::unplug ()
encoder.set_session (NULL);
decoder.set_session (NULL);
session = NULL;
endpoint.clear();
}
void zmq::stream_engine_t::terminate ()
......
......@@ -45,7 +45,7 @@ namespace zmq
{
public:
stream_engine_t (fd_t fd_, const options_t &options_);
stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint);
~stream_engine_t ();
// i_engine interface implementation.
......
......@@ -126,7 +126,7 @@ void zmq::tcp_connecter_t::out_event ()
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
......
......@@ -93,7 +93,7 @@ void zmq::tcp_listener_t::in_event ()
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
......
......@@ -21,6 +21,7 @@
#include <assert.h>
#include <string.h>
#include "testutil.hpp"
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
......@@ -98,6 +99,8 @@ int main (int argc, char *argv [])
rc = zmq_connect (req, "tcp://127.0.0.1:5560");
assert (rc == 0);
bounce (rep, req);
// Allow a window for socket events as connect can be async
zmq_sleep (1);
......@@ -120,6 +123,7 @@ int main (int argc, char *argv [])
assert (events & ZMQ_EVENT_ACCEPTED);
assert (events & ZMQ_EVENT_CONNECTED);
assert (events & ZMQ_EVENT_CLOSED);
assert (events & ZMQ_EVENT_DISCONNECTED);
return 0 ;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment