Commit 3bcaea53 authored by Simon Giesecke's avatar Simon Giesecke

Problem: zmq_fdpair function is very long

Solution: Extract TCP/IP loopback variant into zmq_fdpair_tcpip
parent 51bf2aff
...@@ -316,29 +316,9 @@ static void tune_socket (const SOCKET socket_) ...@@ -316,29 +316,9 @@ static void tune_socket (const SOCKET socket_)
zmq::tcp_tune_loopback_fast_path (socket_); zmq::tcp_tune_loopback_fast_path (socket_);
} }
#endif
int zmq::make_fdpair (fd_t *r_, fd_t *w_) static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
{ {
#if defined ZMQ_HAVE_EVENTFD
int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports, avoid
// leaks, etc.
flags |= EFD_CLOEXEC;
#endif
fd_t fd = eventfd (0, flags);
if (fd == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
} else {
*w_ = *r_ = fd;
return 0;
}
#elif defined ZMQ_HAVE_WINDOWS
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP #if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
// Windows CE does not manage security attributes // Windows CE does not manage security attributes
SECURITY_DESCRIPTOR sd; SECURITY_DESCRIPTOR sd;
...@@ -367,7 +347,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -367,7 +347,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
// Otherwise use Mutex implementation. // Otherwise use Mutex implementation.
int event_signaler_port = 5905; int event_signaler_port = 5905;
if (signaler_port == event_signaler_port) { if (zmq::signaler_port == event_signaler_port) {
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP #if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
sync = sync =
CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync"); CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
...@@ -380,14 +360,14 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -380,14 +360,14 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
L"Global\\zmq-signaler-port-sync"); L"Global\\zmq-signaler-port-sync");
win_assert (sync != NULL); win_assert (sync != NULL);
} else if (signaler_port != 0) { } else if (zmq::signaler_port != 0) {
wchar_t mutex_name[MAX_PATH]; wchar_t mutex_name[MAX_PATH];
#ifdef __MINGW32__ #ifdef __MINGW32__
_snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", _snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
signaler_port); signaler_port);
#else #else
swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d", swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
signaler_port); zmq::signaler_port);
#endif #endif
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP #if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
...@@ -408,7 +388,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -408,7 +388,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
// Create listening socket. // Create listening socket.
SOCKET listener; SOCKET listener;
listener = open_socket (AF_INET, SOCK_STREAM, 0); listener = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (listener != INVALID_SOCKET); wsa_assert (listener != INVALID_SOCKET);
// Set SO_REUSEADDR and TCP_NODELAY on listening socket. // Set SO_REUSEADDR and TCP_NODELAY on listening socket.
...@@ -425,10 +405,10 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -425,10 +405,10 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
memset (&addr, 0, sizeof addr); memset (&addr, 0, sizeof addr);
addr.sin_family = AF_INET; addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = htons (signaler_port); addr.sin_port = htons (zmq::signaler_port);
// Create the writer socket. // Create the writer socket.
*w_ = open_socket (AF_INET, SOCK_STREAM, 0); *w_ = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (*w_ != INVALID_SOCKET); wsa_assert (*w_ != INVALID_SOCKET);
if (sync != NULL) { if (sync != NULL) {
...@@ -441,7 +421,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -441,7 +421,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
rc = bind (listener, reinterpret_cast<const struct sockaddr *> (&addr), rc = bind (listener, reinterpret_cast<const struct sockaddr *> (&addr),
sizeof addr); sizeof addr);
if (rc != SOCKET_ERROR && signaler_port == 0) { if (rc != SOCKET_ERROR && zmq::signaler_port == 0) {
// Retrieve ephemeral port number // Retrieve ephemeral port number
int addrlen = sizeof addr; int addrlen = sizeof addr;
rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&addr), rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&addr),
...@@ -510,7 +490,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -510,7 +490,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
if (sync != NULL) { if (sync != NULL) {
// Exit the critical section. // Exit the critical section.
BOOL brc; BOOL brc;
if (signaler_port == event_signaler_port) if (zmq::signaler_port == event_signaler_port)
brc = SetEvent (sync); brc = SetEvent (sync);
else else
brc = ReleaseMutex (sync); brc = ReleaseMutex (sync);
...@@ -522,7 +502,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -522,7 +502,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
} }
if (*r_ != INVALID_SOCKET) { if (*r_ != INVALID_SOCKET) {
make_socket_noninheritable (*r_); zmq::make_socket_noninheritable (*r_);
return 0; return 0;
} }
// Cleanup writer if connection failed // Cleanup writer if connection failed
...@@ -532,10 +512,33 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -532,10 +512,33 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
*w_ = INVALID_SOCKET; *w_ = INVALID_SOCKET;
} }
// Set errno from saved value // Set errno from saved value
errno = wsa_error_to_errno (saved_errno); errno = zmq::wsa_error_to_errno (saved_errno);
return -1; return -1;
}
#endif
int zmq::make_fdpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_EVENTFD
int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports, avoid
// leaks, etc.
flags |= EFD_CLOEXEC;
#endif
fd_t fd = eventfd (0, flags);
if (fd == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
} else {
*w_ = *r_ = fd;
return 0;
}
#elif defined ZMQ_HAVE_WINDOWS
return make_fdpair_tcpip (r_, w_);
#elif defined ZMQ_HAVE_OPENVMS #elif defined ZMQ_HAVE_OPENVMS
// Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further, // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment