Commit f63db009 authored by Martin Sustrik's avatar Martin Sustrik

Different listener implementations simplified

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent b45fec33
...@@ -111,33 +111,17 @@ int zmq::ipc_listener_t::set_address (const char *addr_) ...@@ -111,33 +111,17 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (s == -1) if (s == -1)
return -1; return -1;
// Set the non-blocking flag.
int flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);
// Bind the socket to the file path. // Bind the socket to the file path.
rc = bind (s, (struct sockaddr*) &addr, addr_len); rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) { if (rc != 0)
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1; return -1;
}
has_file = true; has_file = true;
// Listen for incomming connections. // Listen for incomming connections.
rc = listen (s, options.backlog); rc = listen (s, options.backlog);
if (rc != 0) { if (rc != 0)
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1; return -1;
}
return 0; return 0;
} }
...@@ -164,44 +148,15 @@ int zmq::ipc_listener_t::close () ...@@ -164,44 +148,15 @@ int zmq::ipc_listener_t::close ()
zmq::fd_t zmq::ipc_listener_t::accept () zmq::fd_t zmq::ipc_listener_t::accept ()
{ {
// Accept one connection and deal with different failure modes.
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
// Accept one incoming connection.
fd_t sock = ::accept (s, NULL, NULL); fd_t sock = ::accept (s, NULL, NULL);
if (sock == -1) {
#if (defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD || \ errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX || \ errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD || \ errno == ENOBUFS);
defined ZMQ_HAVE_CYGWIN)
if (sock == -1 &&
(errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR || errno == ECONNABORTED))
return retired_fd;
#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_AIX)
if (sock == -1 &&
(errno == EWOULDBLOCK || errno == EINTR ||
errno == ECONNABORTED || errno == EPROTO))
return retired_fd;
#elif defined ZMQ_HAVE_HPUX
if (sock == -1 &&
(errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR || errno == ECONNABORTED || errno == ENOBUFS))
return retired_fd; return retired_fd;
#elif defined ZMQ_HAVE_QNXNTO }
if (sock == -1 &&
(errno == EWOULDBLOCK || errno == EINTR || errno == ECONNABORTED))
return retired_fd;
#endif
errno_assert (sock != -1);
// Set to non-blocking mode.
int flags = fcntl (s, F_GETFL, 0);
if (flags == -1)
flags = 0;
int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
return sock; return sock;
} }
......
...@@ -107,134 +107,71 @@ void zmq::tcp_listener_t::in_event () ...@@ -107,134 +107,71 @@ void zmq::tcp_listener_t::in_event ()
send_attach (session, engine, false); send_attach (session, engine, false);
} }
void zmq::tcp_listener_t::close ()
{
zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
#else
int rc = ::close (s);
errno_assert (rc == 0);
#endif
s = retired_fd;
}
int zmq::tcp_listener_t::set_address (const char *addr_) int zmq::tcp_listener_t::set_address (const char *addr_)
{ {
// Convert the interface into sockaddr_in structure. // Convert the interface into sockaddr_in structure.
int rc = resolve_ip_interface (&addr, &addr_len, addr_); int rc = resolve_ip_interface (&addr, &addr_len, addr_);
if (rc != 0) if (rc != 0)
return rc; return -1;
// Create a listening socket. // Create a listening socket.
s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
#ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
wsa_error_to_errno (); wsa_error_to_errno ();
return -1; return -1;
} }
#else
if (s == -1)
return -1;
#endif
// Allow reusing of the address. // Allow reusing of the address.
int flag = 1; int flag = 1;
#ifdef ZMQ_HAVE_WINDOWS
rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
(const char*) &flag, sizeof (int)); (const char*) &flag, sizeof (int));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
#endif
// Bind the socket to the network interface and port. // Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len); rc = bind (s, (struct sockaddr*) &addr, addr_len);
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) { if (rc == SOCKET_ERROR) {
wsa_error_to_errno (); wsa_error_to_errno ();
return -1; return -1;
} }
// Listen for incomming connections.
rc = listen (s, options.backlog);
if (rc == SOCKET_ERROR) {
wsa_error_to_errno ();
return -1;
}
return 0;
}
int zmq::tcp_listener_t::close ()
{
zmq_assert (s != retired_fd);
int rc = closesocket (s);
wsa_assert (rc != SOCKET_ERROR);
s = retired_fd;
return 0;
}
zmq::fd_t zmq::tcp_listener_t::accept ()
{
zmq_assert (s != retired_fd);
// Accept one incoming connection.
fd_t sock = ::accept (s, NULL, NULL);
if (sock == INVALID_SOCKET &&
(WSAGetLastError () == WSAEWOULDBLOCK ||
WSAGetLastError () == WSAECONNRESET))
return retired_fd;
zmq_assert (sock != INVALID_SOCKET);
// Set to non-blocking mode.
unsigned long argp = 1;
int rc = ioctlsocket (sock, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
return sock;
}
#else #else
int zmq::tcp_listener_t::set_address (const char *addr_)
{
// Resolve the sockaddr to bind to.
int rc = resolve_ip_interface (&addr, &addr_len, addr_);
if (rc != 0) if (rc != 0)
return -1; return -1;
#endif
// Create a listening socket.
s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == -1)
return -1;
// Allow reusing of the address.
int flag = 1;
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) {
int err = errno;
if (close () != 0)
return -1;
errno = err;
return -1;
}
// Listen for incomming connections. // Listen for incomming connections.
rc = listen (s, options.backlog); rc = listen (s, options.backlog);
if (rc != 0) { #ifdef ZMQ_HAVE_WINDOWS
int err = errno; if (rc == SOCKET_ERROR) {
if (close () != 0) wsa_error_to_errno ();
return -1;
errno = err;
return -1; return -1;
} }
#else
return 0;
}
int zmq::tcp_listener_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
if (rc != 0) if (rc != 0)
return -1; return -1;
s = retired_fd;
#ifndef ZMQ_HAVE_OPENVMS
// If there's an underlying UNIX domain socket, get rid of the file it
// is associated with.
struct sockaddr_un *su = (struct sockaddr_un*) &addr;
if (AF_UNIX == su->sun_family && has_file) {
rc = ::unlink(su->sun_path);
if (rc != 0)
return -1;
}
#endif #endif
return 0; return 0;
...@@ -242,51 +179,23 @@ int zmq::tcp_listener_t::close () ...@@ -242,51 +179,23 @@ int zmq::tcp_listener_t::close ()
zmq::fd_t zmq::tcp_listener_t::accept () zmq::fd_t zmq::tcp_listener_t::accept ()
{ {
// Accept one connection and deal with different failure modes.
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
// Accept one incoming connection.
fd_t sock = ::accept (s, NULL, NULL); fd_t sock = ::accept (s, NULL, NULL);
#ifdef ZMQ_HAVE_WINDOWS
#if (defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD || \ if (sock == INVALID_SOCKET)
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX || \ wsa_assert (WSAGetLastError () == WSAEWOULDBLOCK ||
defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD || \ WSAGetLastError () == WSAECONNRESET);
defined ZMQ_HAVE_CYGWIN)
if (sock == -1 &&
(errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR || errno == ECONNABORTED))
return retired_fd;
#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_AIX)
if (sock == -1 &&
(errno == EWOULDBLOCK || errno == EINTR ||
errno == ECONNABORTED || errno == EPROTO))
return retired_fd;
#elif defined ZMQ_HAVE_HPUX
if (sock == -1 &&
(errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR || errno == ECONNABORTED || errno == ENOBUFS))
return retired_fd;
#elif defined ZMQ_HAVE_QNXNTO
if (sock == -1 &&
(errno == EWOULDBLOCK || errno == EINTR || errno == ECONNABORTED))
return retired_fd; return retired_fd;
#endif }
errno_assert (sock != -1);
// Set to non-blocking mode.
#ifdef ZMQ_HAVE_OPENVMS
int flags = 1;
int rc = ioctl (sock, FIONBIO, &flags);
errno_assert (rc != -1);
#else #else
int flags = fcntl (s, F_GETFL, 0); if (sock == -1) {
if (flags == -1) errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
flags = 0; errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK); errno == ENOBUFS);
errno_assert (rc != -1); return retired_fd;
}
#endif #endif
return sock; return sock;
} }
#endif
...@@ -51,7 +51,7 @@ namespace zmq ...@@ -51,7 +51,7 @@ namespace zmq
void in_event (); void in_event ();
// Close the listening socket. // Close the listening socket.
int close (); void close ();
// Accept the new connection. Returns the file descriptor of the // Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd // newly created connection. The function may return retired_fd
......
...@@ -69,7 +69,7 @@ int zmq::vtcp_listener_t::set_address (const char *addr_) ...@@ -69,7 +69,7 @@ int zmq::vtcp_listener_t::set_address (const char *addr_)
uint16_t port = (uint16_t) atoi (port_str.c_str ()); uint16_t port = (uint16_t) atoi (port_str.c_str ());
uint32_t subport = (uint32_t) atoi (subport_str.c_str ()); uint32_t subport = (uint32_t) atoi (subport_str.c_str ());
// Srart listening. // Start listening.
s = vtcp_bind (port, subport); s = vtcp_bind (port, subport);
if (s == retired_fd) if (s == retired_fd)
return -1; return -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment