Commit fe30cc6d authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #619 from ckamm/req-send-resets

Add ZMQ_REQ_SEND_RESETS option.
parents 9f4526f5 f5c59556
...@@ -59,6 +59,7 @@ tests/test_spec_rep ...@@ -59,6 +59,7 @@ tests/test_spec_rep
tests/test_spec_req tests/test_spec_req
tests/test_spec_router tests/test_spec_router
tests/test_req_request_ids tests/test_req_request_ids
tests/test_req_send_resets
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
perf/local_lat perf/local_lat
......
...@@ -13,10 +13,10 @@ SYNOPSIS ...@@ -13,10 +13,10 @@ SYNOPSIS
*int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');* *int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');*
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE only ZMQ_LINGER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_XPUB_VERBOSE,
take effect for subsequent socket bind/connects. Specifically, security ZMQ_REQ_SEND_RESETS only take effect for subsequent socket bind/connects.
options take effect for subsequent binds/connects and can be changed at any Specifically, security options take effect for subsequent binds/connects and can be
time to affect subsequent binds and/or connects. changed at any time to affect subsequent binds and/or connects.
DESCRIPTION DESCRIPTION
----------- -----------
...@@ -476,6 +476,28 @@ Default value:: 0 ...@@ -476,6 +476,28 @@ Default value:: 0
Applicable socket types:: ZMQ_REQ Applicable socket types:: ZMQ_REQ
ZMQ_REQ_SEND_RESETS: reset request-reply sequence by sending another message
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When set to 0, a REQ socket does not allow initiating a new request with
_zmq_send(3)_ until the reply to the previous one has been received.
When set to 1, sending another message is allowed and has the effect of
disconnecting the underlying connection to the peer from which the reply was
expected, triggering a reconnection attempt on transports that support it.
The request-reply state machine is reset and a new request is sent to the
next available peer.
When this option is enabled, also enable ZMQ_REQ_REQUEST_IDS to ensure correct
matching of requests and replies. Otherwise a late reply to an aborted request
can be reported as the reply to the superseding request.
[horizontal]
Option value type:: int
Option value unit:: 0, 1
Default value:: 0
Applicable socket types:: ZMQ_REQ
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
...@@ -277,6 +277,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -277,6 +277,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_CURVE_SERVERKEY 50 #define ZMQ_CURVE_SERVERKEY 50
#define ZMQ_PROBE_ROUTER 51 #define ZMQ_PROBE_ROUTER 51
#define ZMQ_REQ_REQUEST_IDS 52 #define ZMQ_REQ_REQUEST_IDS 52
#define ZMQ_REQ_SEND_RESETS 53
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -30,7 +30,8 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -30,7 +30,8 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
message_begins (true), message_begins (true),
reply_pipe (NULL), reply_pipe (NULL),
request_id_frames_enabled (false), request_id_frames_enabled (false),
request_id (generate_random()) request_id (generate_random()),
send_resets (false)
{ {
options.type = ZMQ_REQ; options.type = ZMQ_REQ;
} }
...@@ -42,12 +43,19 @@ zmq::req_t::~req_t () ...@@ -42,12 +43,19 @@ zmq::req_t::~req_t ()
int zmq::req_t::xsend (msg_t *msg_) int zmq::req_t::xsend (msg_t *msg_)
{ {
// If we've sent a request and we still haven't got the reply, // If we've sent a request and we still haven't got the reply,
// we can't send another request. // we can't send another request unless the send_resets option is enabled.
if (receiving_reply) { if (receiving_reply) {
if (!send_resets) {
errno = EFSM; errno = EFSM;
return -1; return -1;
} }
if (reply_pipe)
reply_pipe->terminate (false);
receiving_reply = false;
message_begins = true;
}
// First part of the request is the request identity. // First part of the request is the request identity.
if (message_begins) { if (message_begins) {
reply_pipe = NULL; reply_pipe = NULL;
...@@ -185,7 +193,7 @@ bool zmq::req_t::xhas_out () ...@@ -185,7 +193,7 @@ bool zmq::req_t::xhas_out ()
return dealer_t::xhas_out (); return dealer_t::xhas_out ();
} }
int zmq::req_t::xsetsockopt(int option_, const void *optval_, size_t optvallen_) int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_)
{ {
bool is_int = (optvallen_ == sizeof (int)); bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0; int value = is_int? *((int *) optval_): 0;
...@@ -197,21 +205,35 @@ int zmq::req_t::xsetsockopt(int option_, const void *optval_, size_t optvallen_) ...@@ -197,21 +205,35 @@ int zmq::req_t::xsetsockopt(int option_, const void *optval_, size_t optvallen_)
} }
break; break;
case ZMQ_REQ_SEND_RESETS:
if (is_int && value >= 0) {
send_resets = value;
return 0;
}
break;
default: default:
break; break;
} }
return dealer_t::xsetsockopt(option_, optval_, optvallen_); return dealer_t::xsetsockopt (option_, optval_, optvallen_);
}
void zmq::req_t::xpipe_terminated (pipe_t *pipe_)
{
if (reply_pipe == pipe_)
reply_pipe = NULL;
dealer_t::xpipe_terminated (pipe_);
} }
int zmq::req_t::recv_reply_pipe (msg_t *msg_) int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{ {
while (true) { while (true) {
pipe_t *pipe = NULL; pipe_t *pipe = NULL;
int rc = dealer_t::recvpipe(msg_, &pipe); int rc = dealer_t::recvpipe (msg_, &pipe);
if (rc != 0) if (rc != 0)
return rc; return rc;
if (pipe == reply_pipe) if (!reply_pipe || pipe == reply_pipe)
return 0; return 0;
} }
} }
......
...@@ -43,7 +43,8 @@ namespace zmq ...@@ -43,7 +43,8 @@ namespace zmq
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
int xsetsockopt(int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xpipe_terminated (zmq::pipe_t *pipe_);
protected: protected:
...@@ -71,8 +72,9 @@ namespace zmq ...@@ -71,8 +72,9 @@ namespace zmq
// request is sent. // request is sent.
uint32_t request_id; uint32_t request_id;
// If true, send() will reset its internal state instead of failing if // If true, send() will reset its internal state and terminate the
// a previous request is still pending. // reply_pipe's connection instead of failing if a previous request is
// still pending.
bool send_resets; bool send_resets;
req_t (const req_t&); req_t (const req_t&);
......
...@@ -30,7 +30,8 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -30,7 +30,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_spec_dealer \ test_spec_dealer \
test_spec_router \ test_spec_router \
test_spec_pushpull \ test_spec_pushpull \
test_req_request_ids test_req_request_ids \
test_req_send_resets
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -67,6 +68,7 @@ test_spec_dealer_SOURCES = test_spec_dealer.cpp ...@@ -67,6 +68,7 @@ test_spec_dealer_SOURCES = test_spec_dealer.cpp
test_spec_router_SOURCES = test_spec_router.cpp test_spec_router_SOURCES = test_spec_router.cpp
test_spec_pushpull_SOURCES = test_spec_pushpull.cpp test_spec_pushpull_SOURCES = test_spec_pushpull.cpp
test_req_request_ids_SOURCES = test_req_request_ids.cpp test_req_request_ids_SOURCES = test_req_request_ids.cpp
test_req_send_resets_SOURCES = test_req_send_resets.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <unistd.h>
#include <time.h>
#include "testutil.hpp"
int main (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
int enabled = 1;
int rc = zmq_setsockopt (req, ZMQ_REQ_SEND_RESETS, &enabled, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (req, ZMQ_REQ_REQUEST_IDS, &enabled, sizeof (int));
assert (rc == 0);
rc = zmq_bind (req, "tcp://*:5555");
assert (rc == 0);
const size_t services = 5;
void *rep [services];
for (size_t peer = 0; peer < services; peer++) {
rep [peer] = zmq_socket (ctx, ZMQ_REP);
assert (rep [peer]);
int timeout = 100;
rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
rc = zmq_connect (rep [peer], "tcp://localhost:5555");
assert (rc == 0);
}
// We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets.
struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Case 1: Second send() before a reply arrives in a pipe.
// Send a request, ensure it arrives, don't send a reply
s_send_seq (req, "A", "B", SEQ_END);
s_recv_seq (rep [0], "A", "B", SEQ_END);
// Send another request on the REQ socket
s_send_seq (req, "C", "D", SEQ_END);
s_recv_seq (rep [1], "C", "D", SEQ_END);
// Send a reply to the first request - that should be discarded by the REQ
s_send_seq (rep [0], "WRONG", SEQ_END);
// Send the expected reply
s_send_seq (rep [1], "OK", SEQ_END);
s_recv_seq (req, "OK", SEQ_END);
// Another standard req-rep cycle, just to check
s_send_seq (req, "E", SEQ_END);
s_recv_seq (rep [2], "E", SEQ_END);
s_send_seq (rep [2], "F", "G", SEQ_END);
s_recv_seq (req, "F", "G", SEQ_END);
// Case 2: Second send() after a reply is already in a pipe on the REQ.
// Send a request, ensure it arrives, send a reply
s_send_seq (req, "H", SEQ_END);
s_recv_seq (rep [3], "H", SEQ_END);
s_send_seq (rep [3], "BAD", SEQ_END);
// Wait for message to be there.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
// Without receiving that reply, send another request on the REQ socket
s_send_seq (req, "I", SEQ_END);
s_recv_seq (rep [4], "I", SEQ_END);
// Send the expected reply
s_send_seq (rep [4], "GOOD", SEQ_END);
s_recv_seq (req, "GOOD", SEQ_END);
close_zero_linger (req);
for (size_t peer = 0; peer < services; peer++)
close_zero_linger (rep [peer]);
// Wait for disconnects.
rc = zmq_poll (0, 0, 100);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment