Unverified Commit ada6f0c0 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3871 from somdoron/DISCONNECT_MSG

problem: router doesn't know when peer disconnected 
parents 7ce7b8b8 81444136
......@@ -1042,7 +1042,8 @@ test_apps += tests/test_poller \
tests/test_peer \
tests/test_reconnect_options \
tests/test_msg_init \
tests/test_hello_msg
tests/test_hello_msg \
tests/test_disconnect_msg
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
......@@ -1099,6 +1100,10 @@ tests_test_msg_init_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_hello_msg_SOURCES = tests/test_hello_msg.cpp
tests_test_hello_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_hello_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_disconnect_msg_SOURCES = tests/test_disconnect_msg.cpp
tests_test_disconnect_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_disconnect_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
endif
if ENABLE_STATIC
......
......@@ -228,6 +228,18 @@ Option value size:: 32 or 41
Default value:: NULL
Applicable socket types:: all, when using TCP transport
ZMQ_DISCONNECT_MSG: set a disconnect message that the socket will generate when accepted peer disconnect
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When set, the socket will generate a disconnect message when accepted peer has been disconnected.
You may set this on ROUTER, SERVER and PEER sockets.
The combination with ZMQ_HEARTBEAT_IVL is powerful and simplify protocols, when heartbeat recognize a connection drop it
will generate a disconnect message that can match the protocol of the application.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_ROUTER, ZMQ_SERVER and ZMQ_PEER
ZMQ_GSSAPI_PLAINTEXT: Disable GSSAPI encryption
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
......@@ -679,6 +679,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108
#define ZMQ_RECONNECT_STOP 109
#define ZMQ_HELLO_MSG 110
#define ZMQ_DISCONNECT_MSG 111
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
......
......@@ -828,6 +828,13 @@ void zmq::ctx_t::connect_inproc_sockets (
pending_connection_.bind_pipe->set_hwms (-1, -1);
}
#ifdef ZMQ_BUILD_DRAFT_API
if (bind_options_.can_recv_disconnect_msg
&& !bind_options_.disconnect_msg.empty ())
pending_connection_.connect_pipe->set_disconnect_msg (
bind_options_.disconnect_msg);
#endif
if (side_ == bind_side) {
command_t cmd;
cmd.type = command_t::bind;
......
......@@ -251,7 +251,9 @@ zmq::options_t::options_t () :
monitor_event_version (1),
wss_trust_system (false),
hello_msg (),
can_send_hello_msg (false)
can_send_hello_msg (false),
disconnect_msg (),
can_recv_disconnect_msg (false)
{
memset (curve_public_key, 0, CURVE_KEYSIZE);
memset (curve_secret_key, 0, CURVE_KEYSIZE);
......@@ -825,6 +827,16 @@ int zmq::options_t::setsockopt (int option_,
hello_msg = std::vector<unsigned char> ();
}
return 0;
case ZMQ_DISCONNECT_MSG:
if (optvallen_ > 0) {
unsigned char *bytes = (unsigned char *) optval_;
disconnect_msg =
std::vector<unsigned char> (bytes, bytes + optvallen_);
} else {
disconnect_msg = std::vector<unsigned char> ();
}
return 0;
......
......@@ -301,6 +301,10 @@ struct options_t
// Hello msg
std::vector<unsigned char> hello_msg;
bool can_send_hello_msg;
// Disconnect msg
std::vector<unsigned char> disconnect_msg;
bool can_recv_disconnect_msg;
};
inline bool get_effective_conflate_option (const options_t &options)
......
......@@ -41,6 +41,7 @@ zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
{
options.type = ZMQ_PEER;
options.can_send_hello_msg = true;
options.can_recv_disconnect_msg = true;
}
uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_)
......
......@@ -124,10 +124,12 @@ zmq::pipe_t::pipe_t (object_t *parent_,
_server_socket_routing_id (0),
_conflate (conflate_)
{
_disconnect_msg.init ();
}
zmq::pipe_t::~pipe_t ()
{
_disconnect_msg.close ();
}
void zmq::pipe_t::set_peer (pipe_t *peer_)
......@@ -591,3 +593,24 @@ void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
send_pipe_stats_publish (socket_base_, queue_count_,
_msgs_written - _peers_msgs_read, endpoint_pair_);
}
void zmq::pipe_t::send_disconnect_msg ()
{
if (_disconnect_msg.size () > 0) {
// Rollback any incomplete message in the pipe, and push the disconnect message.
rollback ();
_out_pipe->write (_disconnect_msg, false);
flush ();
_disconnect_msg.init ();
}
}
void zmq::pipe_t::set_disconnect_msg (
const std::vector<unsigned char> &disconnect_)
{
_disconnect_msg.close ();
const int rc =
_disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
errno_assert (rc == 0);
}
......@@ -38,10 +38,10 @@
#include "blob.hpp"
#include "options.hpp"
#include "endpoint.hpp"
#include "msg.hpp"
namespace zmq
{
class msg_t;
class pipe_t;
// Create a pipepair for bi-directional transfer of messages.
......@@ -147,6 +147,9 @@ class pipe_t ZMQ_FINAL : public object_t,
void send_stats_to_peer (own_t *socket_base_);
void send_disconnect_msg ();
void set_disconnect_msg (const std::vector<unsigned char> &disconnect_);
private:
// Type of the underlying lock-free pipe.
typedef ypipe_base_t<msg_t> upipe_t;
......@@ -257,6 +260,9 @@ class pipe_t ZMQ_FINAL : public object_t,
// The endpoints of this pipe.
endpoint_uri_pair_t _endpoint_pair;
// Disconnect msg
msg_t _disconnect_msg;
ZMQ_NON_COPYABLE_NOR_MOVABLE (pipe_t)
};
......
......@@ -56,6 +56,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
options.recv_routing_id = true;
options.raw_socket = false;
options.can_send_hello_msg = true;
options.can_recv_disconnect_msg = true;
_prefetched_id.init ();
_prefetched_msg.init ();
......
......@@ -42,6 +42,7 @@ zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
{
options.type = ZMQ_SERVER;
options.can_send_hello_msg = true;
options.can_recv_disconnect_msg = true;
}
zmq::server_t::~server_t ()
......
......@@ -442,15 +442,26 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
_engine->plug (_io_thread, this);
}
void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_)
void zmq::session_base_t::engine_error (bool handshaked_,
zmq::i_engine::error_reason_t reason_)
{
// Engine is dead. Let's forget about it.
_engine = NULL;
// Remove any half-done messages from the pipes.
if (_pipe)
if (_pipe) {
clean_pipes ();
#ifdef ZMQ_BUILD_DRAFT_API
// Only send disconnect message if socket was accepted and handshake was completed
if (!_active && handshaked_ && options.can_recv_disconnect_msg
&& !options.disconnect_msg.empty ()) {
_pipe->set_disconnect_msg (options.disconnect_msg);
_pipe->send_disconnect_msg ();
}
#endif
}
zmq_assert (reason_ == i_engine::connection_error
|| reason_ == i_engine::timeout_error
|| reason_ == i_engine::protocol_error);
......
......@@ -62,7 +62,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
virtual void reset ();
void flush ();
void rollback ();
void engine_error (zmq::i_engine::error_reason_t reason_);
void engine_error (bool handshaked_, zmq::i_engine::error_reason_t reason_);
// i_pipe_events interface implementation.
void read_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
......
......@@ -122,8 +122,10 @@ int zmq::socket_base_t::inprocs_t::erase_pipes (
return -1;
}
for (map_t::iterator it = range.first; it != range.second; ++it)
for (map_t::iterator it = range.first; it != range.second; ++it) {
it->second->send_disconnect_msg ();
it->second->terminate (true);
}
_inprocs.erase (range.first, range.second);
return 0;
}
......@@ -864,6 +866,10 @@ int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
&& peer.options.hello_msg.size () > 0) {
send_hello_msg (new_pipes[1], peer.options);
}
if (peer.options.can_recv_disconnect_msg
&& peer.options.disconnect_msg.size () > 0)
new_pipes[0]->set_disconnect_msg (peer.options.disconnect_msg);
#endif
// Attach remote end of the pipe to the peer socket. Note that peer's
......@@ -1530,6 +1536,8 @@ void zmq::socket_base_t::process_term (int linger_)
// Ask all attached pipes to terminate.
for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
// Only inprocs might have a disconnect message set
_pipes[i]->send_disconnect_msg ();
_pipes[i]->terminate (false);
}
register_term_acks (static_cast<int> (_pipes.size ()));
......
......@@ -685,7 +685,11 @@ void zmq::stream_engine_base_t::error (error_reason_t reason_)
_socket->event_disconnected (_endpoint_uri_pair, _s);
_session->flush ();
_session->engine_error (reason_);
_session->engine_error (
!_handshaking
&& (_mechanism == NULL
|| _mechanism->status () != mechanism_t::handshaking),
reason_);
unplug ();
delete this;
}
......
......@@ -344,7 +344,7 @@ int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
void zmq::udp_engine_t::error (error_reason_t reason_)
{
zmq_assert (_session);
_session->engine_error (reason_);
_session->engine_error (false, reason_);
terminate ();
}
......
......@@ -66,6 +66,7 @@
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108
#define ZMQ_RECONNECT_STOP 109
#define ZMQ_HELLO_MSG 110
#define ZMQ_DISCONNECT_MSG 111
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
SETUP_TEARDOWN_TESTCONTEXT
void test (const char *address)
{
// Create a server
void *server = test_context_socket (ZMQ_SERVER);
// set server socket options
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (server, ZMQ_DISCONNECT_MSG, "D", 1));
// bind server
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (server, address));
// Create a client
void *client = test_context_socket (ZMQ_CLIENT);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (client, ZMQ_HELLO_MSG, "H", 1));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, address));
// Receive the hello message from client
recv_string_expect_success (server, "H", 0);
// Kill the client
test_context_socket_close (client);
// Receive the disconnect message
recv_string_expect_success (server, "D", 0);
// Clean up.
test_context_socket_close (server);
}
void test_tcp ()
{
test ("tcp://127.0.0.1:5569");
}
void test_inproc ()
{
test ("inproc://disconnect-msg");
}
void test_inproc_disconnect ()
{
const char *address = "inproc://disconnect-msg";
// Create a server
void *server = test_context_socket (ZMQ_SERVER);
// set server socket options
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (server, ZMQ_DISCONNECT_MSG, "D", 1));
// bind server
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (server, address));
// Create a client
void *client = test_context_socket (ZMQ_CLIENT);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (client, ZMQ_HELLO_MSG, "H", 1));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, address));
// Receive the hello message from client
recv_string_expect_success (server, "H", 0);
// disconnect the client
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (client, address));
// Receive the disconnect message
recv_string_expect_success (server, "D", 0);
// Clean up.
test_context_socket_close (client);
test_context_socket_close (server);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_tcp);
RUN_TEST (test_inproc);
RUN_TEST (test_inproc_disconnect);
return UNITY_END ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment