Commit 538e5d47 authored by Ilya Kulakov's avatar Ilya Kulakov

Make VMCI to work on Windows.

select was improved to support multiple service providers on Windows.
it should be slightly faster because of optimized iteration
over selected sockets.
parent d7e59d61
This diff is collapsed.
......@@ -38,9 +38,10 @@
#include <stddef.h>
#include <vector>
#include <map>
#ifdef ZMQ_HAVE_WINDOWS
#include <winsock2.h>
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
......@@ -90,36 +91,68 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
// Reference to ZMQ context.
const ctx_t &ctx;
// Internal state.
struct fds_set_t
{
fds_set_t ();
fds_set_t (const fds_set_t& other_);
fds_set_t& operator=(const fds_set_t& other_);
// Convinient method to descriptor from all sets.
void remove_fd (const fd_t& fd_);
fd_set read;
fd_set write;
fd_set error;
};
struct fd_entry_t
{
fd_t fd;
zmq::i_poll_events *events;
zmq::i_poll_events* events;
};
typedef std::vector<fd_entry_t> fd_entries_t;
// Checks if an fd_entry_t is retired.
static bool is_retired_fd (const fd_entry_t &entry);
#if defined ZMQ_HAVE_WINDOWS
struct family_entry_t
{
family_entry_t ();
// Set of file descriptors that are used to retrieve
// information for fd_set.
typedef std::vector <fd_entry_t> fd_set_t;
fd_set_t fds;
fd_entries_t fd_entries;
fds_set_t fds_set;
bool retired;
};
typedef std::map<u_short, family_entry_t> family_entries_t;
fd_set source_set_in;
fd_set source_set_out;
fd_set source_set_err;
struct wsa_events_t
{
wsa_events_t ();
~wsa_events_t ();
fd_set readfds;
fd_set writefds;
fd_set exceptfds;
// read, write, error and readwrite
WSAEVENT events [4];
};
#endif
// Maximum file descriptor.
#if defined ZMQ_HAVE_WINDOWS
family_entries_t family_entries;
// See loop for details.
family_entries_t::iterator current_family_entry_it;
#else
fd_entries_t fd_entries;
fds_set_t fds_set;
fd_t maxfd;
// If true, at least one file descriptor has retired.
bool retired;
#endif
#if defined ZMQ_HAVE_WINDOWS
// Socket's family or AF_UNSPEC on error.
static u_short get_fd_family (fd_t fd_);
#endif
// Checks if an fd_entry_t is retired.
static bool is_retired_fd (const fd_entry_t &entry);
// If true, thread is shutting down.
bool stopping;
......
......@@ -40,7 +40,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default
assert (family != -1);
if (default_size_ != 0) {
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &default_size_, sizeof default_size_);
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, (char*) &default_size_, sizeof default_size_);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
......@@ -49,7 +49,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default
}
if (min_size_ != 0) {
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &min_size_, sizeof min_size_);
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, (char*) &min_size_, sizeof min_size_);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
......@@ -58,7 +58,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default
}
if (max_size_ != 0) {
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &max_size_, sizeof max_size_);
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, (char*) &max_size_, sizeof max_size_);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
......@@ -76,7 +76,7 @@ void zmq::tune_vmci_connect_timeout (ctx_t *context_, fd_t sockfd_, struct timev
int family = context_->get_vmci_socket_family ();
assert (family != -1);
int rc = setsockopt (sockfd_, family, SO_VMCI_CONNECT_TIMEOUT, &timeout_, sizeof timeout_);
int rc = setsockopt (sockfd_, family, SO_VMCI_CONNECT_TIMEOUT, (char*) &timeout_, sizeof timeout_);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
......
......@@ -32,7 +32,6 @@
#if defined ZMQ_HAVE_VMCI
#include <new>
#include <string>
#include "stream_engine.hpp"
#include "io_thread.hpp"
......@@ -118,7 +117,8 @@ void zmq::vmci_connecter_t::out_event ()
return;
}
tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size, options.vmci_buffer_min_size, options.vmci_buffer_max_size);
tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size,
options.vmci_buffer_min_size, options.vmci_buffer_max_size);
if (options.vmci_connect_timeout > 0)
{
......@@ -218,8 +218,15 @@ int zmq::vmci_connecter_t::open ()
// Create the socket.
s = open_socket (family, SOCK_STREAM, 0);
#ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) {
errno = wsa_error_to_errno(WSAGetLastError());
return -1;
}
#else
if (s == -1)
return -1;
#endif
// Set the non-blocking flag.
unblock_socket (s);
......@@ -233,12 +240,18 @@ int zmq::vmci_connecter_t::open ()
if (rc == 0)
return 0;
// Translate other error codes indicating asynchronous connect has been
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
if (rc == -1 && errno == EINTR) {
#ifdef ZMQ_HAVE_WINDOWS
const int error_code = WSAGetLastError();
if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK)
errno = EINPROGRESS;
return -1;
}
else
errno = wsa_error_to_errno(error_code);
#else
if (errno == EINTR)
errno = EINPROGRESS;
#endif
// Forward the error.
return -1;
......@@ -269,19 +282,45 @@ zmq::fd_t zmq::vmci_connecter_t::connect ()
socklen_t len = sizeof (err);
#endif
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert(rc == 0);
if (err != 0) {
if (err != WSAECONNREFUSED
&& err != WSAETIMEDOUT
&& err != WSAECONNABORTED
&& err != WSAEHOSTUNREACH
&& err != WSAENETUNREACH
&& err != WSAENETDOWN
&& err != WSAEACCES
&& err != WSAEINVAL
&& err != WSAEADDRINUSE
&& err != WSAECONNRESET)
{
wsa_assert_no(err);
}
return retired_fd;
}
#else
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
if (rc == -1)
err = errno;
if (err != 0) {
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
errno == ETIMEDOUT || errno == EHOSTUNREACH ||
errno == ENETUNREACH || errno == ENETDOWN);
errno_assert(
errno == ECONNREFUSED ||
errno == ECONNRESET ||
errno == ETIMEDOUT ||
errno == EHOSTUNREACH ||
errno == ENETUNREACH ||
errno == ENETDOWN ||
errno == EINVAL);
return retired_fd;
}
#endif
fd_t result = s;
s = retired_fd;
......
......@@ -27,8 +27,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __VMCI_CONNECTER_HPP_INCLUDED__
#define __VMCI_CONNECTER_HPP_INCLUDED__
#ifndef __ZMQ_VMCI_CONNECTER_HPP_INCLUDED__
#define __ZMQ_VMCI_CONNECTER_HPP_INCLUDED__
#include "platform.hpp"
......
......@@ -33,8 +33,6 @@
#include <new>
#include <string.h>
#include "stream_engine.hpp"
#include "vmci_address.hpp"
#include "io_thread.hpp"
......@@ -153,20 +151,46 @@ int zmq::vmci_listener_t::set_address (const char *addr_)
// Create a listening socket.
s = open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0);
#ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) {
errno = wsa_error_to_errno(WSAGetLastError());
return -1;
}
#if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation((HANDLE)s, HANDLE_FLAG_INHERIT, 0);
win_assert(brc);
#endif
#else
if (s == -1)
return -1;
#endif
address.to_string (endpoint);
// Bind the socket.
rc = bind (s, address.addr (), address.addrlen ());
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno(WSAGetLastError());
goto error;
}
#else
if (rc != 0)
goto error;
#endif
// Listen for incoming connections.
rc = listen (s, options.backlog);
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno(WSAGetLastError());
goto error;
}
#else
if (rc != 0)
goto error;
#endif
socket->event_listening (endpoint, s);
return 0;
......@@ -199,12 +223,29 @@ zmq::fd_t zmq::vmci_listener_t::accept ()
// resources is considered valid and treated by ignoring the connection.
zmq_assert (s != retired_fd);
fd_t sock = ::accept (s, NULL, NULL);
#ifdef ZMQ_HAVE_WINDOWS
if (sock == INVALID_SOCKET) {
wsa_assert(WSAGetLastError() == WSAEWOULDBLOCK ||
WSAGetLastError() == WSAECONNRESET ||
WSAGetLastError() == WSAEMFILE ||
WSAGetLastError() == WSAENOBUFS);
return retired_fd;
}
#if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation((HANDLE)sock, HANDLE_FLAG_INHERIT, 0);
win_assert(brc);
#endif
#else
if (sock == -1) {
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
errno_assert(errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
errno == ENOBUFS || errno == ENOMEM || errno == EMFILE ||
errno == ENFILE);
return retired_fd;
}
#endif
// Race condition can cause socket not to be closed (if fork happens
// between accept and this point).
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment