Commit b9fb9198 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #320 from shripchenko/master

ZMQ BUG FOUND + fixes for zmq_unbind() / zmq_disconnect() usage corner cases
parents 952127df 057fab09
...@@ -65,6 +65,7 @@ void zmq::ipc_listener_t::process_plug () ...@@ -65,6 +65,7 @@ void zmq::ipc_listener_t::process_plug ()
void zmq::ipc_listener_t::process_term (int linger_) void zmq::ipc_listener_t::process_term (int linger_)
{ {
rm_fd (handle); rm_fd (handle);
close ();
own_t::process_term (linger_); own_t::process_term (linger_);
} }
...@@ -182,4 +183,3 @@ zmq::fd_t zmq::ipc_listener_t::accept () ...@@ -182,4 +183,3 @@ zmq::fd_t zmq::ipc_listener_t::accept ()
} }
#endif #endif
...@@ -359,7 +359,7 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -359,7 +359,7 @@ void zmq::pipe_t::terminate (bool delay_)
// active state. // active state.
else if (state == delimited) { else if (state == delimited) {
send_pipe_term (peer); send_pipe_term (peer);
state = terminated; state = terminated;
} }
// There are no other states. // There are no other states.
......
...@@ -357,6 +357,7 @@ void zmq::session_base_t::proceed_with_term () ...@@ -357,6 +357,7 @@ void zmq::session_base_t::proceed_with_term ()
void zmq::session_base_t::timer_event (int id_) void zmq::session_base_t::timer_event (int id_)
{ {
// Linger period expired. We can proceed with termination even though // Linger period expired. We can proceed with termination even though
// there are still pending messages to be sent. // there are still pending messages to be sent.
zmq_assert (id_ == linger_timer_id); zmq_assert (id_ == linger_timer_id);
...@@ -376,13 +377,13 @@ void zmq::session_base_t::detached () ...@@ -376,13 +377,13 @@ void zmq::session_base_t::detached ()
} }
// Reconnect. // Reconnect.
if (options.reconnect_ivl != -1) if (options.reconnect_ivl != -1)
start_connecting (true); start_connecting (true);
// For subscriber sockets we hiccup the inbound pipe, which will cause // For subscriber sockets we hiccup the inbound pipe, which will cause
// the socket object to resend all the subscriptions. // the socket object to resend all the subscriptions.
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
pipe->hiccup (); pipe->hiccup ();
} }
void zmq::session_base_t::start_connecting (bool wait_) void zmq::session_base_t::start_connecting (bool wait_)
......
...@@ -552,6 +552,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -552,6 +552,12 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
return -1; return -1;
} }
// Process pending commands, if any, since there could be pending unprocessed process_own()'s
// (from launch_child() for example) we're asked to terminate now.
int rc = process_commands (0, false);
if (unlikely (rc != 0))
return -1;
// Find the endpoints range (if any) corresponding to the addr_ string. // Find the endpoints range (if any) corresponding to the addr_ string.
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_)); std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
if (range.first == range.second) if (range.first == range.second)
......
...@@ -73,6 +73,7 @@ void zmq::tcp_listener_t::process_plug () ...@@ -73,6 +73,7 @@ void zmq::tcp_listener_t::process_plug ()
void zmq::tcp_listener_t::process_term (int linger_) void zmq::tcp_listener_t::process_term (int linger_)
{ {
rm_fd (handle); rm_fd (handle);
close ();
own_t::process_term (linger_); own_t::process_term (linger_);
} }
......
...@@ -13,7 +13,8 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -13,7 +13,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_invalid_rep \ test_invalid_rep \
test_msg_flags \ test_msg_flags \
test_connect_resolve \ test_connect_resolve \
test_last_endpoint test_last_endpoint \
test_term_endpoint
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -33,6 +34,7 @@ test_invalid_rep_SOURCES = test_invalid_rep.cpp ...@@ -33,6 +34,7 @@ test_invalid_rep_SOURCES = test_invalid_rep.cpp
test_msg_flags_SOURCES = test_msg_flags.cpp test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
......
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
int main (int argc, char *argv [])
{
int rc;
char buf[32];
const char *ep = "tcp://127.0.0.1:5560";
fprintf (stderr, "unbind endpoint test running...\n");
// Create infrastructure.
void *ctx = zmq_init (1);
assert (ctx);
void *push = zmq_socket (ctx, ZMQ_PUSH);
assert (push);
rc = zmq_bind (push, ep);
assert (rc == 0);
void *pull = zmq_socket (ctx, ZMQ_PULL);
assert (pull);
rc = zmq_connect (pull, ep);
assert (rc == 0);
// Pass one message through to ensure the connection is established.
rc = zmq_send (push, "ABC", 3, 0);
assert (rc == 3);
rc = zmq_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
// Unbind the lisnening endpoint
rc = zmq_unbind (push, ep);
assert (rc == 0);
// Let events some time
zmq_sleep (1);
// Check that sending would block (there's no outbound connection).
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
assert (rc == -1 && zmq_errno () == EAGAIN);
// Clean up.
rc = zmq_close (pull);
assert (rc == 0);
rc = zmq_close (push);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
// Now the other way round.
fprintf (stderr, "disconnect endpoint test running...\n");
// Create infrastructure.
ctx = zmq_init (1);
assert (ctx);
push = zmq_socket (ctx, ZMQ_PUSH);
assert (push);
rc = zmq_connect (push, ep);
assert (rc == 0);
pull = zmq_socket (ctx, ZMQ_PULL);
assert (pull);
rc = zmq_bind (pull, ep);
assert (rc == 0);
// Pass one message through to ensure the connection is established.
rc = zmq_send (push, "ABC", 3, 0);
assert (rc == 3);
rc = zmq_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
// Disconnect the bound endpoint
rc = zmq_disconnect (push, ep);
assert (rc == 0);
// Let events some time
zmq_sleep (1);
// Check that sending would block (there's no inbound connections).
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
assert (rc == -1 && zmq_errno () == EAGAIN);
// Clean up.
rc = zmq_close (pull);
assert (rc == 0);
rc = zmq_close (push);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment