Commit fe3fb419 authored by Ian Barber's avatar Ian Barber

After speaking with Ben Gray and the discussion on the mailing list, this is an…

After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.

This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.

The documentation has been updated to reflect the change, but please do check over the code and test and review.
parent c28af41c
......@@ -39,6 +39,7 @@ tests/test_invalid_rep
tests/test_msg_flags
tests/test_ts_context
tests/test_connect_resolve
tests/test_connect_delay
tests/test_term_endpoint
src/platform.hpp*
src/stamp-h1
......
......@@ -342,6 +342,21 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports.
ZMQ_DELAY_ATTACH_ON_CONNECT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the state of the attach on connect value. If set to `1`, will delay the
attachment of a pipe on connect until the underlying connection has completed.
This will cause the socket to block if there are no other connections, but will
prevent queues from filling on pipes awaiting connection.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports.
ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve the file descriptor associated with the
......
......@@ -351,6 +351,19 @@ Option value unit:: boolean
Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports.
ZMQ_DELAY_ATTACH_ON_CONNECT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set to `1`, will delay the attachment of a pipe on connect until the underlying
connection has completed. This will cause the socket to block if there are no other
connections, but will prevent queues from filling on pipes awaiting connection.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports.
ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
......@@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39
/* Message options */
#define ZMQ_MORE 1
......
......@@ -41,8 +41,7 @@ zmq::lb_t::~lb_t ()
void zmq::lb_t::attach (pipe_t *pipe_)
{
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
activated (pipe_);
}
void zmq::lb_t::terminated (pipe_t *pipe_)
......
......@@ -44,6 +44,7 @@ zmq::options_t::options_t () :
rcvtimeo (-1),
sndtimeo (-1),
ipv4only (1),
delay_attach_on_connect (0),
delay_on_close (true),
delay_on_disconnect (true),
filter (false),
......@@ -219,6 +220,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
case ZMQ_DELAY_ATTACH_ON_CONNECT:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val != 0 && val != 1) {
errno = EINVAL;
return -1;
}
delay_attach_on_connect = val;
return 0;
}
case ZMQ_TCP_KEEPALIVE:
{
if (optvallen_ != sizeof (int)) {
......@@ -484,6 +500,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
case ZMQ_DELAY_ATTACH_ON_CONNECT:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = delay_attach_on_connect;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_TCP_KEEPALIVE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
......
......@@ -96,6 +96,10 @@ namespace zmq
// possible to communicate with IPv6-only hosts. If 0, the socket can
// connect to and accept connections from both IPv4 and IPv6 hosts.
int ipv4only;
// If 1, connecting pipes are not attached immediately, meaning a send()
// on a socket with only connecting pipes would block
int delay_attach_on_connect;
// If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed.
......
......@@ -111,6 +111,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_object_t (io_thread_),
connect (connect_),
pipe (NULL),
outpipe (NULL),
incomplete_in (false),
pending (false),
engine (NULL),
......@@ -150,6 +151,13 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
pipe->set_event_sink (this);
}
void zmq::session_base_t::onconnect_attach_pipe (pipe_t *pipe_)
{
zmq_assert (!is_terminating ());
zmq_assert (pipe_);
outpipe = pipe_;
}
int zmq::session_base_t::read (msg_t *msg_)
{
// First message to send is identity (if required).
......@@ -229,6 +237,12 @@ void zmq::session_base_t::clean_pipes ()
void zmq::session_base_t::terminated (pipe_t *pipe_)
{
// If we get a term signal from our held outpipe
// we can safely ignore it.
if (pipe_ == outpipe) {
return;
}
// Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_);
pipe = NULL;
......@@ -310,6 +324,12 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
send_bind (socket, pipes [1]);
}
if (outpipe && options.delay_attach_on_connect) {
send_bind (socket, outpipe);
// Forget the outpipe
outpipe = NULL;
}
// Plug in the engine.
zmq_assert (!engine);
engine = engine_;
......@@ -358,6 +378,12 @@ void zmq::session_base_t::process_term (int linger_)
// Start pipe termination process. Delay the termination till all messages
// are processed in case the linger time is non-zero.
pipe->terminate (linger_ != 0);
// If we're storing to a to be connected, we can clear that as well
if (outpipe) {
outpipe->set_event_sink (this);
outpipe->terminate (linger_ != 0);
}
// TODO: Should this go into pipe_t::terminate ?
// In case there's no engine and there's only delimiter in the
......@@ -385,6 +411,9 @@ void zmq::session_base_t::timer_event (int id_)
// Ask pipe to terminate even though there may be pending messages in it.
zmq_assert (pipe);
pipe->terminate (false);
if (outpipe)
outpipe->terminate (false);
}
void zmq::session_base_t::detached ()
......
......@@ -52,6 +52,9 @@ namespace zmq
// To be used once only, when creating the session.
void attach_pipe (zmq::pipe_t *pipe_);
// To be used once only, for delayed connection
void onconnect_attach_pipe (pipe_t *pipe_);
// Following functions are the interface exposed towards the engine.
virtual int read (msg_t *msg_);
......@@ -104,6 +107,9 @@ namespace zmq
// Pipe connecting the session to its socket.
zmq::pipe_t *pipe;
// Pipe connecting the socket to the client
zmq::pipe_t *outpipe;
// This flag is true if the remainder of the message being processed
// is still in the in pipe.
bool incomplete_in;
......
......@@ -545,10 +545,13 @@ int zmq::socket_base_t::connect (const char *addr_)
icanhasall = true;
// Attach local end of the pipe to the socket object.
attach_pipe (pipes [0], icanhasall);
if (options.delay_attach_on_connect == 0)
attach_pipe (pipes [0], icanhasall);
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);
if (options.delay_attach_on_connect == 1)
session->onconnect_attach_pipe (pipes [0]);
// Save last endpoint URI
paddr->to_string (options.last_endpoint);
......
......@@ -13,6 +13,7 @@ noinst_PROGRAMS = test_pair_inproc \
test_invalid_rep \
test_msg_flags \
test_connect_resolve \
test_connect_delay \
test_last_endpoint \
test_term_endpoint \
test_monitor
......@@ -34,6 +35,7 @@ test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp
test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_connect_delay_SOURCES = test_connect_delay.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp
......
/*
Copyright (c) 2012 Ian Barber
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <errno.h>
#include "../include/zmq.h"
int main (int argc, char *argv [])
{
fprintf (stderr, "test_connect_delay running...\n");
int val;
int rc;
char buffer[16];
int seen = 0;
void *context = zmq_ctx_new();
void *to = zmq_socket(context, ZMQ_PULL);
val = 0;
zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
zmq_bind(to, "tcp://*:5555");
// Create a socket pushing to two endpoints - only 1 message should arrive.
void *from = zmq_socket (context, ZMQ_PUSH);
val = 0;
zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val));
rc = zmq_connect(from, "tcp://localhost:5556");
assert (rc == 0);
rc = zmq_connect(from, "tcp://localhost:5555");
assert (rc == 0);
for (int i = 0; i < 10; ++i)
{
std::string message("message ");
message += ('0' + i);
zmq_send(from, message.data(), message.size(), 0);
}
sleep(1);
seen = 0;
for (int i = 0; i < 10; ++i)
{
memset(&buffer, 0, sizeof(buffer));
rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
if( rc == -1)
break;
seen++;
}
assert (seen == 5);
rc = zmq_close (from);
assert (rc == 0);
rc = zmq_close (to);
assert (rc == 0);
rc = zmq_ctx_destroy(context);
assert (rc == 0);
context = zmq_ctx_new();
std::cout << " Rerunning with DELAY_ATTACH_ON_CONNECT\n";
to = zmq_socket(context, ZMQ_PULL);
zmq_bind(to, "tcp://*:5560");
val = 0;
zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
// Create a socket pushing to two endpoints - all messages should arrive.
from = zmq_socket (context, ZMQ_PUSH);
val = 0;
zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val));
val = 1;
zmq_setsockopt(from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
rc = zmq_connect(from, "tcp://localhost:5561");
assert (rc == 0);
rc = zmq_connect(from, "tcp://localhost:5560");
assert (rc == 0);
for (int i = 0; i < 10; ++i)
{
std::string message("message ");
message += ('0' + i);
zmq_send(from, message.data(), message.size(), 0);
}
sleep(1);
seen = 0;
for (int i = 0; i < 10; ++i)
{
memset(&buffer, 0, sizeof(buffer));
rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
if( rc == -1) {
break;
}
seen++;
}
assert (seen == 10);
rc = zmq_close (from);
assert (rc == 0);
rc = zmq_close (to);
assert (rc == 0);
rc = zmq_ctx_destroy(context);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment