Commit 3cddcbaa authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #761 from claws/add_dscp_sockopt

Add ability to set Differential Services Code Point socket option
parents b91ef997 a4385e61
...@@ -82,6 +82,7 @@ tests/test_shutdown_stress_tipc ...@@ -82,6 +82,7 @@ tests/test_shutdown_stress_tipc
tests/test_sub_forward_tipc tests/test_sub_forward_tipc
tests/test_term_endpoint_tipc tests/test_term_endpoint_tipc
tests/test_many_sockets tests/test_many_sockets
tests/test_diffserv
tests/test*.log tests/test*.log
tests/test*.trs tests/test*.trs
src/platform.hpp* src/platform.hpp*
......
...@@ -352,6 +352,17 @@ Default value:: 1 (true) ...@@ -352,6 +352,17 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_TOS: Retrieve the Type-of-Service socket override status
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the IP_TOS option for the socket.
[horizontal]
Option value type:: int
Option value unit:: >0
Default value:: 0
Applicable socket types:: all, only for connection-oriented transports
ZMQ_IMMEDIATE: Retrieve attach-on-connect value ZMQ_IMMEDIATE: Retrieve attach-on-connect value
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the state of the attach on connect value. If set to `1`, will delay the Retrieve the state of the attach on connect value. If set to `1`, will delay the
......
...@@ -13,8 +13,8 @@ SYNOPSIS ...@@ -13,8 +13,8 @@ SYNOPSIS
*int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');* *int zmq_setsockopt (void '*socket', int 'option_name', const void '*option_value', size_t 'option_len');*
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER,
ZMQ_XPUB_VERBOSE, ZMQ_REQ_CORRELATE, and ZMQ_REQ_RELAXED, only take effect for ZMQ_XPUB_VERBOSE, ZMQ_REQ_CORRELATE, and ZMQ_REQ_RELAXED, only take effect for
subsequent socket bind/connects. subsequent socket bind/connects.
Specifically, security options take effect for subsequent bind/connect calls, Specifically, security options take effect for subsequent bind/connect calls,
...@@ -377,6 +377,21 @@ Default value:: 1 (true) ...@@ -377,6 +377,21 @@ Default value:: 1 (true)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_TOS: Set the Type-of-Service on socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the ToS fields (Differentiated services (DS) and Explicit Congestion Notification
(ECN) field of the IP header. The ToS field is typically used to specify a packets
priority. The availability of this option is dependent on intermediate network
equipment that inspect the ToS field andprovide a path for low-delay, high-throughput,
highly-reliable service, etc.
[horizontal]
Option value type:: int
Option value unit:: >0
Default value:: 0
Applicable socket types:: all, only for connection-oriented transports
ZMQ_IMMEDIATE: Queue messages only to completed connections ZMQ_IMMEDIATE: Queue messages only to completed connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
...@@ -397,11 +412,11 @@ ZMQ_ROUTER_HANDOVER: handle peer identity name collisions on ROUTER sockets ...@@ -397,11 +412,11 @@ ZMQ_ROUTER_HANDOVER: handle peer identity name collisions on ROUTER sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the ROUTER socket behavior when it encounters peers with the same identity. Sets the ROUTER socket behavior when it encounters peers with the same identity.
By default, if two peers with the same identity connect to the same ROUTER By default, if two peers with the same identity connect to the same ROUTER
socket the results will be undefined. A value of `1` will cause the ROUTER socket the results will be undefined. A value of `1` will cause the ROUTER
socket to reassign the identity upon encountering an identity name collision. socket to reassign the identity upon encountering an identity name collision.
Specifically, the first peer to connect with that identity will be terminated Specifically, the first peer to connect with that identity will be terminated
and the second peer will receive any subsequent messages routed to that and the second peer will receive any subsequent messages routed to that
identity. identity.
Option value type:: int Option value type:: int
...@@ -480,7 +495,7 @@ Applicable socket types:: ZMQ_XPUB ...@@ -480,7 +495,7 @@ Applicable socket types:: ZMQ_XPUB
ZMQ_REQ_CORRELATE: match replies with requests ZMQ_REQ_CORRELATE: match replies with requests
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The default behavior of REQ sockets is to rely on the ordering of messages to The default behavior of REQ sockets is to rely on the ordering of messages to
match requests and responses and that is usually sufficient. When this option match requests and responses and that is usually sufficient. When this option
is set to 1, the REQ socket will prefix outgoing messages with an extra frame is set to 1, the REQ socket will prefix outgoing messages with an extra frame
containing a request id. That means the full message is (request id, 0, containing a request id. That means the full message is (request id, 0,
...@@ -506,7 +521,7 @@ The request-reply state machine is reset and a new request is sent to the ...@@ -506,7 +521,7 @@ The request-reply state machine is reset and a new request is sent to the
next available peer. next available peer.
If set to 1, also enable ZMQ_REQ_CORRELATE to ensure correct matching of If set to 1, also enable ZMQ_REQ_CORRELATE to ensure correct matching of
requests and replies. Otherwise a late reply to an aborted request can be requests and replies. Otherwise a late reply to an aborted request can be
reported as the reply to the superseding request. reported as the reply to the superseding request.
[horizontal] [horizontal]
......
...@@ -289,6 +289,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -289,6 +289,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_CONFLATE 54 #define ZMQ_CONFLATE 54
#define ZMQ_ZAP_DOMAIN 55 #define ZMQ_ZAP_DOMAIN 55
#define ZMQ_ROUTER_HANDOVER 56 #define ZMQ_ROUTER_HANDOVER 56
#define ZMQ_TOS 57
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -148,3 +148,18 @@ bool zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_) ...@@ -148,3 +148,18 @@ bool zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
ip_addr_ = host; ip_addr_ = host;
return true; return true;
} }
void zmq::set_ip_type_of_service (fd_t s_, int iptos)
{
(void) s_;
int rc = setsockopt(s_, IPPROTO_IP, IP_TOS, &iptos, sizeof(iptos));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
...@@ -39,6 +39,9 @@ namespace zmq ...@@ -39,6 +39,9 @@ namespace zmq
// Socket sockfd_ must be connected. Returns true iff successful. // Socket sockfd_ must be connected. Returns true iff successful.
bool get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_); bool get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_);
// Sets the IP Type-Of-Service for the underlying socket
void set_ip_type_of_service (fd_t s_, int iptos);
} }
#endif #endif
...@@ -33,6 +33,7 @@ zmq::options_t::options_t () : ...@@ -33,6 +33,7 @@ zmq::options_t::options_t () :
multicast_hops (1), multicast_hops (1),
sndbuf (0), sndbuf (0),
rcvbuf (0), rcvbuf (0),
tos (0),
type (-1), type (-1),
linger (-1), linger (-1),
reconnect_ivl (100), reconnect_ivl (100),
...@@ -125,6 +126,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -125,6 +126,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
} }
break; break;
case ZMQ_TOS:
if (is_int && value >= 0) {
tos = value;
return 0;
}
break;
case ZMQ_LINGER: case ZMQ_LINGER:
if (is_int && value >= -1) { if (is_int && value >= -1) {
linger = value; linger = value;
...@@ -424,6 +432,12 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -424,6 +432,12 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
} }
break; break;
case ZMQ_TOS:
if (is_int) {
*value = tos;
return 0;
}
break;
case ZMQ_TYPE: case ZMQ_TYPE:
if (is_int) { if (is_int) {
*value = type; *value = type;
......
...@@ -66,6 +66,9 @@ namespace zmq ...@@ -66,6 +66,9 @@ namespace zmq
int sndbuf; int sndbuf;
int rcvbuf; int rcvbuf;
// Type of service (containing DSCP and ECN socket options)
int tos;
// Socket type. // Socket type.
int type; int type;
......
...@@ -190,14 +190,14 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl () ...@@ -190,14 +190,14 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
// Only change the current reconnect interval if the maximum reconnect // Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval. // interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0 && if (options.reconnect_ivl_max > 0 &&
options.reconnect_ivl_max > options.reconnect_ivl) { options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval // Calculate the next interval
current_reconnect_ivl = current_reconnect_ivl * 2; current_reconnect_ivl = current_reconnect_ivl * 2;
if(current_reconnect_ivl >= options.reconnect_ivl_max) { if(current_reconnect_ivl >= options.reconnect_ivl_max) {
current_reconnect_ivl = options.reconnect_ivl_max; current_reconnect_ivl = options.reconnect_ivl_max;
} }
} }
return this_interval; return this_interval;
} }
...@@ -223,6 +223,10 @@ int zmq::tcp_connecter_t::open () ...@@ -223,6 +223,10 @@ int zmq::tcp_connecter_t::open ()
if (addr->resolved.tcp_addr->family () == AF_INET6) if (addr->resolved.tcp_addr->family () == AF_INET6)
enable_ipv4_mapping (s); enable_ipv4_mapping (s);
// Set the IP Type-Of-Service priority for this socket
if (options.tos != 0)
set_ip_type_of_service (s, options.tos);
// Set the socket to non-blocking mode so that we get async connect(). // Set the socket to non-blocking mode so that we get async connect().
unblock_socket (s); unblock_socket (s);
...@@ -232,6 +236,10 @@ int zmq::tcp_connecter_t::open () ...@@ -232,6 +236,10 @@ int zmq::tcp_connecter_t::open ()
if (options.rcvbuf != 0) if (options.rcvbuf != 0)
set_tcp_receive_buffer (s, options.rcvbuf); set_tcp_receive_buffer (s, options.rcvbuf);
// Set the IP Type-Of-Service for the underlying socket
if (options.tos != 0)
set_ip_type_of_service (s, options.tos);
// Connect to the remote peer. // Connect to the remote peer.
int rc = ::connect ( int rc = ::connect (
s, addr->resolved.tcp_addr->addr (), s, addr->resolved.tcp_addr->addr (),
......
...@@ -100,7 +100,7 @@ void zmq::tcp_listener_t::in_event () ...@@ -100,7 +100,7 @@ void zmq::tcp_listener_t::in_event ()
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread); zmq_assert (io_thread);
// Create and launch a session object. // Create and launch a session object.
session_base_t *session = session_base_t::create (io_thread, false, socket, session_base_t *session = session_base_t::create (io_thread, false, socket,
options, NULL); options, NULL);
errno_assert (session); errno_assert (session);
...@@ -188,6 +188,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -188,6 +188,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
if (address.family () == AF_INET6) if (address.family () == AF_INET6)
enable_ipv4_mapping (s); enable_ipv4_mapping (s);
// Set the IP Type-Of-Service for the underlying socket
if (options.tos != 0)
set_ip_type_of_service (s, options.tos);
// Set the socket buffer limits for the underlying socket. // Set the socket buffer limits for the underlying socket.
if (options.sndbuf != 0) if (options.sndbuf != 0)
set_tcp_send_buffer (s, options.sndbuf); set_tcp_send_buffer (s, options.sndbuf);
...@@ -300,5 +304,9 @@ zmq::fd_t zmq::tcp_listener_t::accept () ...@@ -300,5 +304,9 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
} }
} }
// Set the IP Type-Of-Service priority for this client socket
if (options.tos != 0)
set_ip_type_of_service (sock, options.tos);
return sock; return sock;
} }
...@@ -42,7 +42,8 @@ noinst_PROGRAMS = test_system \ ...@@ -42,7 +42,8 @@ noinst_PROGRAMS = test_system \
test_issue_566 \ test_issue_566 \
test_proxy \ test_proxy \
test_abstract_ipc \ test_abstract_ipc \
test_many_sockets test_many_sockets \
test_diffserv
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -103,6 +104,7 @@ test_issue_566_SOURCES = test_issue_566.cpp ...@@ -103,6 +104,7 @@ test_issue_566_SOURCES = test_issue_566.cpp
test_proxy_SOURCES = test_proxy.cpp test_proxy_SOURCES = test_proxy.cpp
test_abstract_ipc_SOURCES = test_abstract_ipc.cpp test_abstract_ipc_SOURCES = test_abstract_ipc.cpp
test_many_sockets_SOURCES = test_many_sockets.cpp test_many_sockets_SOURCES = test_many_sockets.cpp
test_diffserv_SOURCES = test_diffserv.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int main (void)
{
int rc;
int tos = 0x28;
int o_tos;
size_t tos_size = sizeof(tos);
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
assert (sb);
rc = zmq_setsockopt (sb, ZMQ_TOS, &tos, tos_size);
assert (rc == 0);
rc = zmq_bind (sb, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_getsockopt (sb, ZMQ_TOS, &o_tos, &tos_size);
assert (rc == 0);
assert (o_tos == tos);
void *sc = zmq_socket (ctx, ZMQ_PAIR);
assert (sc);
tos = 0x58;
rc = zmq_setsockopt (sc, ZMQ_TOS, &tos, tos_size);
assert (rc == 0);
rc = zmq_connect (sc, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_getsockopt (sc, ZMQ_TOS, &o_tos, &tos_size);
assert (rc == 0);
assert (o_tos == tos);
// Wireshark can be used to verify that the server socket is
// using DSCP 0x28 in packets to the client while the client
// is using 0x58 in packets to the server.
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment