Commit ce7a9a58 authored by Martin Sustrik's avatar Martin Sustrik

Setting TCP socket options moved to tcp_engine_t

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 588c7287
...@@ -205,12 +205,6 @@ int zmq::tcp_connecter_t::open () ...@@ -205,12 +205,6 @@ int zmq::tcp_connecter_t::open ()
int rc = ioctlsocket (s, FIONBIO, &argp); int rc = ioctlsocket (s, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Disable Nagle's algorithm.
int flag = 1;
rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
// Connect to the remote peer. // Connect to the remote peer.
rc = ::connect (s, (sockaddr*) &addr, addr_len); rc = ::connect (s, (sockaddr*) &addr, addr_len);
...@@ -301,20 +295,6 @@ int zmq::tcp_connecter_t::open () ...@@ -301,20 +295,6 @@ int zmq::tcp_connecter_t::open ()
errno_assert (rc != -1); errno_assert (rc != -1);
#endif #endif
// Disable Nagle's algorithm.
int flag = 1;
rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
sizeof (int));
errno_assert (rc == 0);
#ifdef ZMQ_HAVE_OPENVMS
// Disable delayed acknowledgements.
flag = 1;
rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
sizeof (int));
errno_assert (rc != SOCKET_ERROR);
#endif
// Connect to the remote peer. // Connect to the remote peer.
rc = ::connect (s, (struct sockaddr*) &addr, addr_len); rc = ::connect (s, (struct sockaddr*) &addr, addr_len);
......
...@@ -41,7 +41,7 @@ ...@@ -41,7 +41,7 @@
#include "err.hpp" #include "err.hpp"
zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
s (retired_fd), s (fd_),
inpos (NULL), inpos (NULL),
insize (0), insize (0),
decoder (in_batch_size, options_.maxmsgsize), decoder (in_batch_size, options_.maxmsgsize),
...@@ -53,17 +53,88 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : ...@@ -53,17 +53,88 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
options (options_), options (options_),
plugged (false) plugged (false)
{ {
// Initialise the underlying socket. int rc;
int rc = open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0); // Set the socket to the non-blocking mode.
#ifdef ZMQ_HAVE_WINDOWS
u_long nonblock = 1;
rc = ioctlsocket (s, FIONBIO, &nonblock);
wsa_assert (rc != SOCKET_ERROR);
#elif ZMQ_HAVE_OPENVMS
int nonblock = 1;
rc = ioctl (s, FIONBIO, &nonblock);
errno_assert (rc != -1);
#else
int flags = fcntl (s, F_GETFL, 0);
if (flags == -1)
flags = 0;
rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
#endif
// Set the socket buffer limits for the underlying socket.
if (options.sndbuf) {
rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
(char*) &options.sndbuf, sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
if (options.rcvbuf) {
rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
(char*) &options.rcvbuf, sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
// Make sure that SIGPIPE signal is not generated when writing to a
// connection that was already closed by the peer.
int set = 1;
rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
errno_assert (rc == 0);
#endif
// Disable Nagle's algorithm. We are doing data batching on 0MQ level,
// so using Nagle wouldn't improve throughput in anyway, but it would
// hurt latency.
int nodelay = 1;
rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
#ifdef ZMQ_HAVE_OPENVMS
// Disable delayed acknowledgements as they hurt latency is serious manner.
int nodelack = 1;
rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack,
sizeof (int));
errno_assert (rc != SOCKET_ERROR);
#endif
} }
zmq::tcp_engine_t::~tcp_engine_t () zmq::tcp_engine_t::~tcp_engine_t ()
{ {
zmq_assert (!plugged); zmq_assert (!plugged);
if (s != retired_fd) if (s != retired_fd) {
close (); #ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = close (s);
errno_assert (rc == 0);
#endif
s = retired_fd;
}
} }
void zmq::tcp_engine_t::plug (io_thread_t *io_thread_, session_t *session_) void zmq::tcp_engine_t::plug (io_thread_t *io_thread_, session_t *session_)
...@@ -234,39 +305,10 @@ void zmq::tcp_engine_t::error () ...@@ -234,39 +305,10 @@ void zmq::tcp_engine_t::error ()
delete this; delete this;
} }
#ifdef ZMQ_HAVE_WINDOWS
int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_)
{
zmq_assert (s == retired_fd);
s = fd_;
if (sndbuf_) {
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
(char*) &sndbuf_, sizeof (int));
errno_assert (rc == 0);
}
if (rcvbuf_) {
int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
(char*) &rcvbuf_, sizeof (int));
errno_assert (rc == 0);
}
return 0;
}
int zmq::tcp_engine_t::close ()
{
zmq_assert (s != retired_fd);
int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
s = retired_fd;
return 0;
}
int zmq::tcp_engine_t::write (const void *data_, size_t size_) int zmq::tcp_engine_t::write (const void *data_, size_t size_)
{ {
#ifdef ZMQ_HAVE_WINDOWS
int nbytes = send (s, (char*) data_, (int) size_, 0); int nbytes = send (s, (char*) data_, (int) size_, 0);
// If not a single byte can be written to the socket in non-blocking mode // If not a single byte can be written to the socket in non-blocking mode
...@@ -285,12 +327,33 @@ int zmq::tcp_engine_t::write (const void *data_, size_t size_) ...@@ -285,12 +327,33 @@ int zmq::tcp_engine_t::write (const void *data_, size_t size_)
return -1; return -1;
wsa_assert (nbytes != SOCKET_ERROR); wsa_assert (nbytes != SOCKET_ERROR);
return (size_t) nbytes;
#else
ssize_t nbytes = send (s, data_, size_, 0);
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
return -1;
errno_assert (nbytes != -1);
return (size_t) nbytes; return (size_t) nbytes;
#endif
} }
int zmq::tcp_engine_t::read (void *data_, size_t size_) int zmq::tcp_engine_t::read (void *data_, size_t size_)
{ {
#ifdef ZMQ_HAVE_WINDOWS
int nbytes = recv (s, (char*) data_, (int) size_, 0); int nbytes = recv (s, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode // If not a single byte can be read from the socket in non-blocking mode
...@@ -316,69 +379,13 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_) ...@@ -316,69 +379,13 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_)
return -1; return -1;
return (size_t) nbytes; return (size_t) nbytes;
}
#else #else
int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_)
{
assert (s == retired_fd);
s = fd_;
if (sndbuf_) {
int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sndbuf_, sizeof (int));
errno_assert (rc == 0);
}
if (rcvbuf_) {
int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof (int));
errno_assert (rc == 0);
}
#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
int set = 1;
int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
errno_assert (rc == 0);
#endif
return 0;
}
int zmq::tcp_engine_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
if (rc != 0)
return -1;
s = retired_fd;
return 0;
}
int zmq::tcp_engine_t::write (const void *data_, size_t size_)
{
ssize_t nbytes = send (s, data_, size_, 0);
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte to the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
return -1;
errno_assert (nbytes != -1);
return (size_t) nbytes;
}
int zmq::tcp_engine_t::read (void *data_, size_t size_)
{
ssize_t nbytes = recv (s, data_, size_, 0); ssize_t nbytes = recv (s, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not // Several errors are OK. When speculative read is being done we may not
// be able to read a single byte to the socket. Also, SIGSTOP issued // be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error. // by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR)) errno == EINTR))
...@@ -396,6 +403,7 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_) ...@@ -396,6 +403,7 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_)
return -1; return -1;
return (size_t) nbytes; return (size_t) nbytes;
}
#endif #endif
}
...@@ -56,12 +56,6 @@ namespace zmq ...@@ -56,12 +56,6 @@ namespace zmq
// Function to handle network disconnections. // Function to handle network disconnections.
void error (); void error ();
// Associates a socket with a native socket descriptor.
int open (fd_t fd_, int sndbuf_, int rcvbuf_);
// Closes the underlying socket.
int close ();
// Writes data to the socket. Returns the number of bytes actually // Writes data to the socket. Returns the number of bytes actually
// written (even zero is to be considered to be a success). In case // written (even zero is to be considered to be a success). In case
// of error or orderly shutdown by the other peer -1 is returned. // of error or orderly shutdown by the other peer -1 is returned.
......
...@@ -133,11 +133,6 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) ...@@ -133,11 +133,6 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
(const char*) &flag, sizeof (int)); (const char*) &flag, sizeof (int));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Set the non-blocking flag.
u_long uflag = 1;
rc = ioctlsocket (s, FIONBIO, &uflag);
wsa_assert (rc != SOCKET_ERROR);
// Bind the socket to the network interface and port. // Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len); rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc == SOCKET_ERROR) { if (rc == SOCKET_ERROR) {
...@@ -182,12 +177,6 @@ zmq::fd_t zmq::tcp_listener_t::accept () ...@@ -182,12 +177,6 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
int rc = ioctlsocket (sock, FIONBIO, &argp); int rc = ioctlsocket (sock, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Disable Nagle's algorithm.
int flag = 1;
rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
return sock; return sock;
} }
...@@ -212,19 +201,6 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) ...@@ -212,19 +201,6 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0); errno_assert (rc == 0);
// Set the non-blocking flag.
#ifdef ZMQ_HAVE_OPENVMS
flag = 1;
rc = ioctl (s, FIONBIO, &flag);
errno_assert (rc != -1);
#else
flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);
#endif
// Bind the socket to the network interface and port. // Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len); rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) { if (rc != 0) {
...@@ -369,24 +345,6 @@ zmq::fd_t zmq::tcp_listener_t::accept () ...@@ -369,24 +345,6 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
errno_assert (rc != -1); errno_assert (rc != -1);
#endif #endif
struct sockaddr *sa = (struct sockaddr*) &addr;
if (AF_UNIX != sa->sa_family) {
// Disable Nagle's algorithm.
int flag = 1;
rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
sizeof (int));
errno_assert (rc == 0);
#ifdef ZMQ_HAVE_OPENVMS
// Disable delayed acknowledgements.
flag = 1;
rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
sizeof (int));
errno_assert (rc != SOCKET_ERROR);
#endif
}
return sock; return sock;
} }
......
...@@ -225,33 +225,6 @@ zmq::fd_t zmq::vtcp_connecter_t::connect () ...@@ -225,33 +225,6 @@ zmq::fd_t zmq::vtcp_connecter_t::connect ()
return retired_fd; return retired_fd;
} }
// Set to non-blocking mode.
#ifdef ZMQ_HAVE_OPENVMS
int flags = 1;
rc = ioctl (s, FIONBIO, &flags);
errno_assert (rc != -1);
#else
int flags = fcntl (s, F_GETFL, 0);
if (flags == -1)
flags = 0;
rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
#endif
// Disable Nagle's algorithm.
int flag = 1;
rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
sizeof (int));
errno_assert (rc == 0);
#ifdef ZMQ_HAVE_OPENVMS
// Disable delayed acknowledgements.
flag = 1;
rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
sizeof (int));
errno_assert (rc != SOCKET_ERROR);
#endif
fd_t result = s; fd_t result = s;
s = retired_fd; s = retired_fd;
return result; return result;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment