Commit 95cbad38 authored by Ian Barber's avatar Ian Barber

Revert "After speaking with Ben Gray and the discussion on the mailing list,…

Revert "After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost."

This reverts commit fe3fb419.
parent a563d494
...@@ -39,7 +39,6 @@ tests/test_invalid_rep ...@@ -39,7 +39,6 @@ tests/test_invalid_rep
tests/test_msg_flags tests/test_msg_flags
tests/test_ts_context tests/test_ts_context
tests/test_connect_resolve tests/test_connect_resolve
tests/test_connect_delay
tests/test_term_endpoint tests/test_term_endpoint
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
......
...@@ -342,21 +342,6 @@ Default value:: 1 (true) ...@@ -342,21 +342,6 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_DELAY_ATTACH_ON_CONNECT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the state of the attach on connect value. If set to `1`, will delay the
attachment of a pipe on connect until the underlying connection has completed.
This will cause the socket to block if there are no other connections, but will
prevent queues from filling on pipes awaiting connection.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports.
ZMQ_FD: Retrieve file descriptor associated with the socket ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve the file descriptor associated with the The 'ZMQ_FD' option shall retrieve the file descriptor associated with the
......
...@@ -351,19 +351,6 @@ Option value unit:: boolean ...@@ -351,19 +351,6 @@ Option value unit:: boolean
Default value:: 1 (true) Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_DELAY_ATTACH_ON_CONNECT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set to `1`, will delay the attachment of a pipe on connect until the underlying
connection has completed. This will cause the socket to block if there are no other
connections, but will prevent queues from filling on pipes awaiting connection.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: all, primarily when using TCP/IPC transports.
ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
...@@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_DELAY_ATTACH_ON_CONNECT 39
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -41,7 +41,8 @@ zmq::lb_t::~lb_t () ...@@ -41,7 +41,8 @@ zmq::lb_t::~lb_t ()
void zmq::lb_t::attach (pipe_t *pipe_) void zmq::lb_t::attach (pipe_t *pipe_)
{ {
pipes.push_back (pipe_); pipes.push_back (pipe_);
activated (pipe_); pipes.swap (active, pipes.size () - 1);
active++;
} }
void zmq::lb_t::terminated (pipe_t *pipe_) void zmq::lb_t::terminated (pipe_t *pipe_)
......
...@@ -44,7 +44,6 @@ zmq::options_t::options_t () : ...@@ -44,7 +44,6 @@ zmq::options_t::options_t () :
rcvtimeo (-1), rcvtimeo (-1),
sndtimeo (-1), sndtimeo (-1),
ipv4only (1), ipv4only (1),
delay_attach_on_connect (0),
delay_on_close (true), delay_on_close (true),
delay_on_disconnect (true), delay_on_disconnect (true),
filter (false), filter (false),
...@@ -220,21 +219,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -220,21 +219,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0; return 0;
} }
case ZMQ_DELAY_ATTACH_ON_CONNECT:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
if (val != 0 && val != 1) {
errno = EINVAL;
return -1;
}
delay_attach_on_connect = val;
return 0;
}
case ZMQ_TCP_KEEPALIVE: case ZMQ_TCP_KEEPALIVE:
{ {
if (optvallen_ != sizeof (int)) { if (optvallen_ != sizeof (int)) {
...@@ -500,15 +484,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -500,15 +484,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
return 0; return 0;
case ZMQ_DELAY_ATTACH_ON_CONNECT:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = delay_attach_on_connect;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_TCP_KEEPALIVE: case ZMQ_TCP_KEEPALIVE:
if (*optvallen_ < sizeof (int)) { if (*optvallen_ < sizeof (int)) {
errno = EINVAL; errno = EINVAL;
......
...@@ -96,10 +96,6 @@ namespace zmq ...@@ -96,10 +96,6 @@ namespace zmq
// possible to communicate with IPv6-only hosts. If 0, the socket can // possible to communicate with IPv6-only hosts. If 0, the socket can
// connect to and accept connections from both IPv4 and IPv6 hosts. // connect to and accept connections from both IPv4 and IPv6 hosts.
int ipv4only; int ipv4only;
// If 1, connecting pipes are not attached immediately, meaning a send()
// on a socket with only connecting pipes would block
int delay_attach_on_connect;
// If true, session reads all the pending messages from the pipe and // If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed. // sends them to the network when socket is closed.
......
...@@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_object_t (io_thread_), io_object_t (io_thread_),
connect (connect_), connect (connect_),
pipe (NULL), pipe (NULL),
outpipe (NULL),
incomplete_in (false), incomplete_in (false),
pending (false), pending (false),
engine (NULL), engine (NULL),
...@@ -151,13 +150,6 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) ...@@ -151,13 +150,6 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
pipe->set_event_sink (this); pipe->set_event_sink (this);
} }
void zmq::session_base_t::onconnect_attach_pipe (pipe_t *pipe_)
{
zmq_assert (!is_terminating ());
zmq_assert (pipe_);
outpipe = pipe_;
}
int zmq::session_base_t::read (msg_t *msg_) int zmq::session_base_t::read (msg_t *msg_)
{ {
// First message to send is identity (if required). // First message to send is identity (if required).
...@@ -237,12 +229,6 @@ void zmq::session_base_t::clean_pipes () ...@@ -237,12 +229,6 @@ void zmq::session_base_t::clean_pipes ()
void zmq::session_base_t::terminated (pipe_t *pipe_) void zmq::session_base_t::terminated (pipe_t *pipe_)
{ {
// If we get a term signal from our held outpipe
// we can safely ignore it.
if (pipe_ == outpipe) {
return;
}
// Drop the reference to the deallocated pipe. // Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
pipe = NULL; pipe = NULL;
...@@ -324,12 +310,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -324,12 +310,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
send_bind (socket, pipes [1]); send_bind (socket, pipes [1]);
} }
if (outpipe && options.delay_attach_on_connect) {
send_bind (socket, outpipe);
// Forget the outpipe
outpipe = NULL;
}
// Plug in the engine. // Plug in the engine.
zmq_assert (!engine); zmq_assert (!engine);
engine = engine_; engine = engine_;
...@@ -378,12 +358,6 @@ void zmq::session_base_t::process_term (int linger_) ...@@ -378,12 +358,6 @@ void zmq::session_base_t::process_term (int linger_)
// Start pipe termination process. Delay the termination till all messages // Start pipe termination process. Delay the termination till all messages
// are processed in case the linger time is non-zero. // are processed in case the linger time is non-zero.
pipe->terminate (linger_ != 0); pipe->terminate (linger_ != 0);
// If we're storing to a to be connected, we can clear that as well
if (outpipe) {
outpipe->set_event_sink (this);
outpipe->terminate (linger_ != 0);
}
// TODO: Should this go into pipe_t::terminate ? // TODO: Should this go into pipe_t::terminate ?
// In case there's no engine and there's only delimiter in the // In case there's no engine and there's only delimiter in the
...@@ -411,9 +385,6 @@ void zmq::session_base_t::timer_event (int id_) ...@@ -411,9 +385,6 @@ void zmq::session_base_t::timer_event (int id_)
// Ask pipe to terminate even though there may be pending messages in it. // Ask pipe to terminate even though there may be pending messages in it.
zmq_assert (pipe); zmq_assert (pipe);
pipe->terminate (false); pipe->terminate (false);
if (outpipe)
outpipe->terminate (false);
} }
void zmq::session_base_t::detached () void zmq::session_base_t::detached ()
......
...@@ -52,9 +52,6 @@ namespace zmq ...@@ -52,9 +52,6 @@ namespace zmq
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (zmq::pipe_t *pipe_); void attach_pipe (zmq::pipe_t *pipe_);
// To be used once only, for delayed connection
void onconnect_attach_pipe (pipe_t *pipe_);
// Following functions are the interface exposed towards the engine. // Following functions are the interface exposed towards the engine.
virtual int read (msg_t *msg_); virtual int read (msg_t *msg_);
...@@ -107,9 +104,6 @@ namespace zmq ...@@ -107,9 +104,6 @@ namespace zmq
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
// Pipe connecting the socket to the client
zmq::pipe_t *outpipe;
// This flag is true if the remainder of the message being processed // This flag is true if the remainder of the message being processed
// is still in the in pipe. // is still in the in pipe.
bool incomplete_in; bool incomplete_in;
......
...@@ -545,13 +545,10 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -545,13 +545,10 @@ int zmq::socket_base_t::connect (const char *addr_)
icanhasall = true; icanhasall = true;
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
if (options.delay_attach_on_connect == 0) attach_pipe (pipes [0], icanhasall);
attach_pipe (pipes [0], icanhasall);
// Attach remote end of the pipe to the session object later on. // Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]); session->attach_pipe (pipes [1]);
if (options.delay_attach_on_connect == 1)
session->onconnect_attach_pipe (pipes [0]);
// Save last endpoint URI // Save last endpoint URI
paddr->to_string (options.last_endpoint); paddr->to_string (options.last_endpoint);
......
...@@ -13,7 +13,6 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -13,7 +13,6 @@ noinst_PROGRAMS = test_pair_inproc \
test_invalid_rep \ test_invalid_rep \
test_msg_flags \ test_msg_flags \
test_connect_resolve \ test_connect_resolve \
test_connect_delay \
test_last_endpoint \ test_last_endpoint \
test_term_endpoint \ test_term_endpoint \
test_monitor test_monitor
...@@ -35,7 +34,6 @@ test_sub_forward_SOURCES = test_sub_forward.cpp ...@@ -35,7 +34,6 @@ test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp
test_msg_flags_SOURCES = test_msg_flags.cpp test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_connect_delay_SOURCES = test_connect_delay.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp test_monitor_SOURCES = test_monitor.cpp
......
/*
Copyright (c) 2012 Ian Barber
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <errno.h>
#include "../include/zmq.h"
int main (int argc, char *argv [])
{
fprintf (stderr, "test_connect_delay running...\n");
int val;
int rc;
char buffer[16];
int seen = 0;
void *context = zmq_ctx_new();
void *to = zmq_socket(context, ZMQ_PULL);
val = 0;
zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
zmq_bind(to, "tcp://*:5555");
// Create a socket pushing to two endpoints - only 1 message should arrive.
void *from = zmq_socket (context, ZMQ_PUSH);
val = 0;
zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val));
rc = zmq_connect(from, "tcp://localhost:5556");
assert (rc == 0);
rc = zmq_connect(from, "tcp://localhost:5555");
assert (rc == 0);
for (int i = 0; i < 10; ++i)
{
std::string message("message ");
message += ('0' + i);
zmq_send(from, message.data(), message.size(), 0);
}
sleep(1);
seen = 0;
for (int i = 0; i < 10; ++i)
{
memset(&buffer, 0, sizeof(buffer));
rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
if( rc == -1)
break;
seen++;
}
assert (seen == 5);
rc = zmq_close (from);
assert (rc == 0);
rc = zmq_close (to);
assert (rc == 0);
rc = zmq_ctx_destroy(context);
assert (rc == 0);
context = zmq_ctx_new();
std::cout << " Rerunning with DELAY_ATTACH_ON_CONNECT\n";
to = zmq_socket(context, ZMQ_PULL);
zmq_bind(to, "tcp://*:5560");
val = 0;
zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
// Create a socket pushing to two endpoints - all messages should arrive.
from = zmq_socket (context, ZMQ_PUSH);
val = 0;
zmq_setsockopt(from, ZMQ_LINGER, &val, sizeof(val));
val = 1;
zmq_setsockopt(from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
rc = zmq_connect(from, "tcp://localhost:5561");
assert (rc == 0);
rc = zmq_connect(from, "tcp://localhost:5560");
assert (rc == 0);
for (int i = 0; i < 10; ++i)
{
std::string message("message ");
message += ('0' + i);
zmq_send(from, message.data(), message.size(), 0);
}
sleep(1);
seen = 0;
for (int i = 0; i < 10; ++i)
{
memset(&buffer, 0, sizeof(buffer));
rc = zmq_recv(to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
if( rc == -1) {
break;
}
seen++;
}
assert (seen == 10);
rc = zmq_close (from);
assert (rc == 0);
rc = zmq_close (to);
assert (rc == 0);
rc = zmq_ctx_destroy(context);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment