Commit 68b13fbd authored by Ilya Kulakov's avatar Ilya Kulakov

Add the VMCI transport.

VMCI transport allows fast communication between the Host
and a virtual machine, between virtual machines on the same host,
and within a virtual machine (like IPC).

It requires VMware to be installed on the host and Guest Additions
to be installed on a guest.
parent 61f74e2d
......@@ -53,6 +53,7 @@ Gonzalo Diethelm
Guido Goldstein
Ian Barber
Ilja Golshtein
Ilya Kulakov
Ivo Danihelka
Jacob Rideout
Joe Thornber
......
......@@ -6,6 +6,7 @@ project(ZeroMQ)
list(INSERT CMAKE_MODULE_PATH 0 "${CMAKE_SOURCE_DIR}")
option(WITH_OPENPGM "Build with support for OpenPGM" OFF)
option(WITH_VMCI "Build with support for VMware VMCI socket" OFF)
if(APPLE)
option(ZMQ_BUILD_FRAMEWORK "Build as OS X framework" ON)
......@@ -502,6 +503,12 @@ if(WITH_OPENPGM)
set(OPTIONAL_LIBRARIES ${OPENPGM_LIBRARIES})
endif(WITH_OPENPGM)
if(WITH_VMCI)
add_definitions(-DZMQ_HAVE_VMCI)
include_directories(${VMCI_INCLUDE_DIRS})
list(APPEND cxx-sources vmci_address.cpp vmci_connecter.cpp vmci_listener.cpp vmci.cpp)
endif(WITH_VMCI)
#-----------------------------------------------------------------------------
# source generators
......
......@@ -196,6 +196,14 @@ src_libzmq_la_SOURCES = \
src/v2_encoder.cpp \
src/v2_encoder.hpp \
src/v2_protocol.hpp \
src/vmci.cpp \
src/vmci.hpp \
src/vmci_address.cpp \
src/vmci_address.hpp \
src/vmci_connecter.cpp \
src/vmci_connecter.hpp \
src/vmci_listener.cpp \
src/vmci_listener.hpp \
src/windows.hpp \
src/wire.hpp \
src/xpub.hpp \
......@@ -663,6 +671,21 @@ tests_test_abstract_ipc_LDADD = src/libzmq.la
endif
if HAVE_VMCI
test_apps += test_pair_vmci test_reqrep_vmci
test_pair_vmci_SOURCES = tests/test_pair_vmci.cpp
test_pair_vmci_LDADD = libzmq.la
test_pair_vmci_LDFLAGS = @LIBZMQ_VMCI_LDFLAGS@
test_pair_vmci_CXXFLAGS = @LIBZMQ_VMCI_CXXFLAGS@
test_reqrep_vmci_SOURCES = tests/test_reqrep_vmci.cpp
test_reqrep_vmci_LDADD = libzmq.la
test_reqrep_vmci_LDFLAGS = @LIBZMQ_VMCI_LDFLAGS@
test_reqrep_vmci_CXXFLAGS = @LIBZMQ_VMCI_CXXFLAGS@
endif
check_PROGRAMS = ${test_apps}
# Run the test cases
......
......@@ -488,6 +488,28 @@ if test "x$with_norm_ext" != "xno"; then
LIBS="-lnorm $LIBS"
fi
# build using vmci
have_vmci_library="no"
AC_ARG_WITH([vmci], [AS_HELP_STRING([--with-vmci],
[build libzmq with VMCI transport [default=no]])],
[have_vmci_ext=$withval],
[have_vmci_ext=yes])
if test "x$have_vmci_ext" != "xno"; then
AC_DEFINE(ZMQ_HAVE_VMCI, 1, [Have VMCI transport])
if test "x$have_vmci_ext" != "xyes"; then
vmci_path="${have_vmci_ext}"
LIBZMQ_VMCI_CXXFLAGS="-I${vmci_path}"
LIBZMQ_VMCI_LDFLAGS="-I${vmci_path}"
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_VMCI_CXXFLAGS} ${LIBZMQ_EXTRA_CXXFLAGS}"
LIBZMQ_EXTRA_LDFLAGS="${LIBZMQ_VMCI_LDFLAGS} ${LIBZMQ_EXTRA_LDFLAGS}"
fi
fi
AM_CONDITIONAL(HAVE_VMCI, test "x$have_vmci_ext" != "xno")
# Set -Wall, -Werror and -pedantic
AC_LANG_PUSH([C++])
......@@ -567,6 +589,9 @@ AC_SUBST(LIBZMQ_EXTRA_CFLAGS)
AC_SUBST(LIBZMQ_EXTRA_CXXFLAGS)
AC_SUBST(LIBZMQ_EXTRA_LDFLAGS)
AC_SUBST(LIBZMQ_VMCI_CXXFLAGS)
AC_SUBST(LIBZMQ_VMCI_LDFLAGS)
# set pkgconfigdir, allow override
AC_ARG_WITH([pkgconfigdir],
AS_HELP_STRING([--with-pkgconfigdir=PATH],
......
......@@ -21,7 +21,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
zmq_atomic_counter_value.3 zmq_atomic_counter_destroy.3
MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_inproc.7 zmq_ipc.7 \
zmq_null.7 zmq_plain.7 zmq_curve.7 zmq_tipc.7
zmq_null.7 zmq_plain.7 zmq_curve.7 zmq_tipc.7 zmq_vmci.7
MAN_DOC = $(MAN1) $(MAN3) $(MAN7)
......
......@@ -162,6 +162,9 @@ Local inter-process communication transport::
Local in-process (inter-thread) communication transport::
linkzmq:zmq_inproc[7]
Virtual Machine Communications Interface (VMC) transport::
linkzmq:zmq_vmci[7]
Proxies
~~~~~~~
......
......@@ -27,13 +27,14 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an
'ipc':: local inter-process communication transport, see linkzmq:zmq_ipc[7]
'inproc':: local in-process (inter-thread) communication transport, see linkzmq:zmq_inproc[7]
'pgm', 'epgm':: reliable multicast transport using PGM, see linkzmq:zmq_pgm[7]
'vmci':: virtual machine communications interface (VMCI), see linkzmq:zmq_vmci[7]
Every 0MQ socket type except 'ZMQ_PAIR' supports one-to-many and many-to-one
semantics. The precise semantics depend on the socket type and are defined in
linkzmq:zmq_socket[3].
The 'ipc' and 'tcp' transports accept wildcard addresses: see linkzmq:zmq_ipc[7]
and linkzmq:zmq_tcp[7] for details.
The 'ipc', 'tcp' and 'vmci' transports accept wildcard addresses: see linkzmq:zmq_ipc[7],
linkzmq:zmq_tcp[7] and linkzmq:zmq_vmci[7] for details.
NOTE: the address syntax may be different for _zmq_bind()_ and _zmq_connect()_
especially for the 'tcp', 'pgm' and 'epgm' transports.
......
......@@ -27,6 +27,7 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an
'ipc':: local inter-process communication transport, see linkzmq:zmq_ipc[7]
'inproc':: local in-process (inter-thread) communication transport, see linkzmq:zmq_inproc[7]
'pgm', 'epgm':: reliable multicast transport using PGM, see linkzmq:zmq_pgm[7]
'vmci':: virtual machine communications interface (VMCI), see linkzmq:zmq_vmci[7]
Every 0MQ socket type except 'ZMQ_PAIR' supports one-to-many and many-to-one
semantics. The precise semantics depend on the socket type and are defined in
......
......@@ -777,6 +777,54 @@ Option value unit:: >0
Default value:: 8192
Applicable socket types:: all, when using TCP transport
ZMQ_VMCI_BUFFER_SIZE: Retrieve buffer size of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The `ZMQ_VMCI_BUFFER_SIZE` option shall retrieve the size of the underlying
buffer for the socket. Used during negotiation before the connection is established.
[horizontal]
Option value type:: uint64_t
Option value unit:: bytes
Default value:: 65546
Applicable socket types:: all, when using VMCI transport
ZMQ_VMCI_BUFFER_MIN_SIZE: Retrieve min buffer size of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The `ZMQ_VMCI_BUFFER_MIN_SIZE` option shall retrieve the min size of the underlying
buffer for the socket. Used during negotiation before the connection is established.
[horizontal]
Option value type:: uint64_t
Option value unit:: bytes
Default value:: 128
Applicable socket types:: all, when using VMCI transport
ZMQ_VMCI_BUFFER_MAX_SIZE: Retrieve max buffer size of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The `ZMQ_VMCI_BUFFER_MAX_SIZE` option shall retrieve the max size of the underlying
buffer for the socket. Used during negotiation before the connection is established.
[horizontal]
Option value type:: uint64_t
Option value unit:: bytes
Default value:: 262144
Applicable socket types:: all, when using VMCI transport
ZMQ_VMCI_CONNECT_TIMEOUT: Retrieve connection timeout of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The `ZMQ_VMCI_CONNECT_TIMEOUT` option shall retrieve connection timeout
for the socket.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: -1
Applicable socket types:: all, when using VMCI transport
RETURN VALUE
------------
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it
......
......@@ -78,6 +78,7 @@ linkzmq:zmq_connect[3]
linkzmq:zmq_ipc[7]
linkzmq:zmq_tcp[7]
linkzmq:zmq_pgm[7]
linkzmq:zmq_vmci[7]
linkzmq:zmq[7]
......
......@@ -95,6 +95,7 @@ linkzmq:zmq_connect[3]
linkzmq:zmq_inproc[7]
linkzmq:zmq_tcp[7]
linkzmq:zmq_pgm[7]
linkzmq:zmq_vmci[7]
linkzmq:zmq_getsockopt[3]
linkzmq:zmq[7]
......
......@@ -154,6 +154,7 @@ linkzmq:zmq_setsockopt[3]
linkzmq:zmq_tcp[7]
linkzmq:zmq_ipc[7]
linkzmq:zmq_inproc[7]
linkzmq:zmq_vmci[7]
linkzmq:zmq[7]
......
......@@ -1112,6 +1112,54 @@ Option value unit:: >0
Default value:: 8192
Applicable socket types:: all, when using TCP transport
ZMQ_VMCI_BUFFER_SIZE: Set buffer size of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The `ZMQ_VMCI_BUFFER_SIZE` option shall set the size of the underlying
buffer for the socket. Used during negotiation before the connection is established.
[horizontal]
Option value type:: uint64_t
Option value unit:: bytes
Default value:: 65546
Applicable socket types:: all, when using VMCI transport
ZMQ_VMCI_BUFFER_MIN_SIZE: Set min buffer size of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The `ZMQ_VMCI_BUFFER_MIN_SIZE` option shall set the min size of the underlying
buffer for the socket. Used during negotiation before the connection is established.
[horizontal]
Option value type:: uint64_t
Option value unit:: bytes
Default value:: 128
Applicable socket types:: all, when using VMCI transport
ZMQ_VMCI_BUFFER_MAX_SIZE: Set max buffer size of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The `ZMQ_VMCI_BUFFER_MAX_SIZE` option shall set the max size of the underlying
buffer for the socket. Used during negotiation before the connection is established.
[horizontal]
Option value type:: uint64_t
Option value unit:: bytes
Default value:: 262144
Applicable socket types:: all, when using VMCI transport
ZMQ_VMCI_CONNECT_TIMEOUT: Set connection timeout of the VMCI socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The `ZMQ_VMCI_CONNECT_TIMEOUT` option shall set connection timeout
for the socket.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: -1
Applicable socket types:: all, when using VMCI transport
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
......
......@@ -108,6 +108,7 @@ linkzmq:zmq_connect[3]
linkzmq:zmq_pgm[7]
linkzmq:zmq_ipc[7]
linkzmq:zmq_inproc[7]
linkzmq:zmq_vmci[7]
linkzmq:zmq[7]
......
......@@ -73,6 +73,7 @@ linkzmq:zmq_tcp[7]
linkzmq:zmq_pgm[7]
linkzmq:zmq_ipc[7]
linkzmq:zmq_inproc[7]
linkzmq:zmq_vmci[7]
linkzmq:zmq[7]
......
......@@ -21,10 +21,10 @@ argument.
The 'endpoint' argument is as described in linkzmq:zmq_bind[3]
Unbinding wild-card address from a socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When wild-card `*` 'endpoint' (described in linkzmq:zmq_tcp[7] and
linkzmq:zmq_ipc[7]) was used in _zmq_bind()_, the caller should use
real 'endpoint' obtained from the ZMQ_LAST_ENDPOINT socket option
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When wild-card `*` 'endpoint' (described in linkzmq:zmq_tcp[7],
linkzmq:zmq_ipc[7] and linkzmq:zmq_vmci[7]) was used in _zmq_bind()_, the caller should use
real 'endpoint' obtained from the ZMQ_LAST_ENDPOINT socket option
to unbind this 'endpoint' from a socket.
RETURN VALUE
......
zmq_ipc(7)
==========
NAME
----
zmq_ipc - 0MQ transport over virtual machine communicatios interface (VMCI) sockets
SYNOPSIS
--------
The VMCI transport passes messages between VMware virtual machines running on the same host,
between virtual machine and the host and within virtual machines (inter-process transport like ipc).
NOTE: Communication between a virtual machine and the host is not supported on Mac OS X 10.9 and above.
ADDRESSING
----------
A 0MQ endpoint is a string consisting of a 'transport'`://` followed by an
'address'. The 'transport' specifies the underlying protocol to use. The
'address' specifies the transport-specific address to connect to.
For the VMCI transport, the transport is `vmci`, and the meaning of
the 'address' part is defined below.
Binding a socket
~~~~~~~~~~~~~~~~
When binding a 'socket' to a local address using _zmq_bind()_ with the 'vmci'
transport, the 'endpoint' shall be interpreted as an 'interface' followed by a
colon and the TCP port number to use.
An 'interface' may be specified by either of the following:
* The wild-card `*`, meaning all available interfaces.
* An integer returned by `VMCISock_GetLocalCID`
The port may be specified by:
* A numeric value, usually above 1024 on POSIX systems.
* The wild-card `*`, meaning a system-assigned ephemeral port.
Unbinding wild-card address from a socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When wild-card `*` 'endpoint' was used in _zmq_bind()_, the caller should use
real 'endpoint' obtained from the ZMQ_LAST_ENDPOINT socket option to unbind
this 'endpoint' from a socket using _zmq_unbind()_.
Connecting a socket
~~~~~~~~~~~~~~~~~~~
When connecting a socket to a peer address using _zmq_connect()_ with the 'vmci'
transport, the 'endpoint' shall be interpreted as a 'peer address' followed by
a colon and the port number to use.
A 'peer address' must be a CID of the peer.
EXAMPLES
--------
.Assigning a local address to a socket
----
// VMCI port 5555 on all available interfaces
rc = zmq_bind(socket, "vmci://*:5555");
assert (rc == 0);
// VMCI port 5555 on the local loop-back interface on all platforms
cid = VMCISock_GetLocalCID();
sprintf(endpoint, "vmci://%d:5555", cid);
rc = zmq_bind(socket, endpoint);
assert (rc == 0);
----
.Connecting a socket
----
// Connecting using a CID
sprintf(endpoint, "vmci://%d:5555", cid);
rc = zmq_connect(socket, endpoint);
assert (rc == 0);
----
SEE ALSO
--------
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_inproc[7]
linkzmq:zmq_tcp[7]
linkzmq:zmq_pgm[7]
linkzmq:zmq_vmci[7]
linkzmq:zmq_getsockopt[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
......@@ -326,6 +326,10 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
#define ZMQ_TCP_RECV_BUFFER 82
#define ZMQ_TCP_SEND_BUFFER 83
#define ZMQ_MULTICAST_MAXTPDU 84
#define ZMQ_VMCI_BUFFER_SIZE 85
#define ZMQ_VMCI_BUFFER_MIN_SIZE 86
#define ZMQ_VMCI_BUFFER_MAX_SIZE 87
#define ZMQ_VMCI_CONNECT_TIMEOUT 88
/* Message options */
#define ZMQ_MORE 1
......
......@@ -30,18 +30,24 @@
#include "macros.hpp"
#include "platform.hpp"
#include "address.hpp"
#include "ctx.hpp"
#include "err.hpp"
#include "tcp_address.hpp"
#include "ipc_address.hpp"
#include "tipc_address.hpp"
#if defined ZMQ_HAVE_VMCI
#include "vmci_address.hpp"
#endif
#include <string>
#include <sstream>
zmq::address_t::address_t (
const std::string &protocol_, const std::string &address_)
const std::string &protocol_, const std::string &address_, ctx_t *parent_)
: protocol (protocol_),
address (address_)
address (address_),
parent (parent_)
{
memset (&resolved, 0, sizeof resolved);
}
......@@ -69,6 +75,14 @@ zmq::address_t::~address_t ()
}
}
#endif
#if defined ZMQ_HAVE_VMCI
else
if (protocol == "vmci") {
if (resolved.vmci_addr) {
LIBZMQ_DELETE(resolved.vmci_addr);
}
}
#endif
}
int zmq::address_t::to_string (std::string &addr_) const
......@@ -91,6 +105,13 @@ int zmq::address_t::to_string (std::string &addr_) const
return resolved.tipc_addr->to_string (addr_);
}
#endif
#if defined ZMQ_HAVE_VMCI
else
if (protocol == "vmci") {
if (resolved.vmci_addr)
return resolved.vmci_addr->to_string (addr_);
}
#endif
if (!protocol.empty () && !address.empty ()) {
std::stringstream s;
......
......@@ -34,20 +34,25 @@
namespace zmq
{
class ctx_t;
class tcp_address_t;
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
class ipc_address_t;
#endif
#if defined ZMQ_HAVE_LINUX
class tipc_address_t;
#endif
#if defined ZMQ_HAVE_VMCI
class vmci_address_t;
#endif
struct address_t {
address_t (const std::string &protocol_, const std::string &address_);
address_t (const std::string &protocol_, const std::string &address_, ctx_t *parent_);
~address_t ();
const std::string protocol;
const std::string address;
ctx_t *parent;
// Protocol specific resolved address
union {
......@@ -57,6 +62,9 @@ namespace zmq
#endif
#if defined ZMQ_HAVE_LINUX
tipc_address_t *tipc_addr;
#endif
#if defined ZMQ_HAVE_VMCI
vmci_address_t *vmci_addr;
#endif
} resolved;
......
......@@ -55,6 +55,10 @@
#endif
#endif
#ifdef ZMQ_HAVE_VMCI
#include <vmci_sockets.h>
#endif
#define ZMQ_CTX_TAG_VALUE_GOOD 0xabadcafe
#define ZMQ_CTX_TAG_VALUE_BAD 0xdeadbeef
......@@ -84,6 +88,10 @@ zmq::ctx_t::ctx_t () :
#ifdef HAVE_FORK
pid = getpid();
#endif
#ifdef ZMQ_HAVE_VMCI
vmci_fd = -1;
vmci_family = -1;
#endif
}
bool zmq::ctx_t::check_tag ()
......@@ -183,6 +191,16 @@ int zmq::ctx_t::terminate ()
}
slot_sync.unlock ();
#ifdef ZMQ_HAVE_VMCI
vmci_sync.lock ();
VMCISock_ReleaseAFValueFd (vmci_fd);
vmci_family = -1;
vmci_fd = -1;
vmci_sync.unlock ();
#endif
// Deallocate the resources.
delete this;
......@@ -578,6 +596,30 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
}
}
#ifdef ZMQ_HAVE_VMCI
int zmq::ctx_t::get_vmci_socket_family ()
{
vmci_sync.lock ();
if (vmci_fd == -1) {
vmci_family = VMCISock_GetAFValueFd (&vmci_fd);
if (vmci_fd != -1) {
#ifdef FD_CLOEXEC
int rc = fcntl (vmci_fd, F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
#endif
}
}
vmci_sync.unlock ();
return vmci_family;
}
#endif
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs.
......
......@@ -121,6 +121,11 @@ namespace zmq
const endpoint_t &endpoint_, pipe_t **pipes_);
void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
#ifdef ZMQ_HAVE_VMCI
// Return family for the VMCI socket or -1 if it's not available.
int get_vmci_socket_family ();
#endif
enum {
term_tid = 0,
reaper_tid = 1
......@@ -219,6 +224,12 @@ namespace zmq
#endif
enum side { connect_side, bind_side };
void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, const pending_connection_t &pending_connection_, side side_);
#ifdef ZMQ_HAVE_VMCI
int vmci_fd;
int vmci_family;
mutex_t vmci_sync;
#endif
};
}
......
......@@ -79,6 +79,12 @@ zmq::options_t::options_t () :
heartbeat_interval (0),
heartbeat_timeout (-1)
{
#if defined ZMQ_HAVE_VMCI
vmci_buffer_size = 0;
vmci_buffer_min_size = 0;
vmci_buffer_max_size = 0;
vmci_connect_timeout = -1;
#endif
}
int zmq::options_t::setsockopt (int option_, const void *optval_,
......@@ -585,6 +591,36 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
break;
# ifdef ZMQ_HAVE_VMCI
case ZMQ_VMCI_BUFFER_SIZE:
if (optvallen_ == sizeof (uint64_t)) {
vmci_buffer_size = *((uint64_t*) optval_);
return 0;
}
break;
case ZMQ_VMCI_BUFFER_MIN_SIZE:
if (optvallen_ == sizeof (uint64_t)) {
vmci_buffer_min_size = *((uint64_t*) optval_);
return 0;
}
break;
case ZMQ_VMCI_BUFFER_MAX_SIZE:
if (optvallen_ == sizeof (uint64_t)) {
vmci_buffer_max_size = *((uint64_t*) optval_);
return 0;
}
break;
case ZMQ_VMCI_CONNECT_TIMEOUT:
if (optvallen_ == sizeof (int)) {
vmci_connect_timeout = *((int*) optval_);
return 0;
}
break;
# endif
default:
#if defined (ZMQ_ACT_MILITANT)
// There are valid scenarios for probing with unknown socket option
......
......@@ -225,6 +225,12 @@ namespace zmq
// Time in milliseconds to wait for a PING response before disconnecting
int heartbeat_timeout;
# if defined ZMQ_HAVE_VMCI
uint64_t vmci_buffer_size;
uint64_t vmci_buffer_min_size;
uint64_t vmci_buffer_max_size;
int vmci_connect_timeout;
# endif
};
}
......
......@@ -37,6 +37,7 @@
#include "ipc_connecter.hpp"
#include "tipc_connecter.hpp"
#include "socks_connecter.hpp"
#include "vmci_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "address.hpp"
......@@ -523,7 +524,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
if (addr->protocol == "tcp") {
if (!options.socks_proxy_address.empty()) {
address_t *proxy_address = new (std::nothrow)
address_t ("tcp", options.socks_proxy_address);
address_t ("tcp", options.socks_proxy_address, this->get_ctx ());
alloc_assert (proxy_address);
socks_connecter_t *connecter =
new (std::nothrow) socks_connecter_t (
......@@ -633,6 +634,16 @@ void zmq::session_base_t::start_connecting (bool wait_)
}
#endif // ZMQ_HAVE_NORM
#if defined ZMQ_HAVE_VMCI
if (addr->protocol == "vmci") {
vmci_connecter_t *connecter = new (std::nothrow) vmci_connecter_t (
io_thread, this, options, addr, wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
}
#endif
zmq_assert (false);
}
......@@ -52,6 +52,7 @@
#include "ipc_listener.hpp"
#include "tipc_listener.hpp"
#include "tcp_connecter.hpp"
#include "vmci_listener.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
......@@ -67,6 +68,12 @@
#include "tipc_address.hpp"
#include "mailbox.hpp"
#include "mailbox_safe.hpp"
#if defined ZMQ_HAVE_VMCI
#include "vmci_address.hpp"
#include "vmci_listener.hpp"
#endif
#ifdef ZMQ_HAVE_OPENPGM
#include "pgm_socket.hpp"
#endif
......@@ -244,7 +251,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
&& protocol_ != "pgm"
&& protocol_ != "epgm"
&& protocol_ != "tipc"
&& protocol_ != "norm") {
&& protocol_ != "norm"
&& protocol_ != "vmci") {
errno = EPROTONOSUPPORT;
return -1;
}
......@@ -281,6 +289,13 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
}
#endif
#if !defined ZMQ_HAVE_VMCI
if (protocol_ == "vmci") {
errno = EPROTONOSUPPORT;
return -1;
}
#endif
// Check whether socket type and transport protocol match.
// Specifically, multicast protocols can't be combined with
// bi-directional messaging patterns (socket types).
......@@ -595,6 +610,27 @@ int zmq::socket_base_t::bind (const char *addr_)
return 0;
}
#endif
#if defined ZMQ_HAVE_VMCI
if (protocol == "vmci") {
vmci_listener_t *listener = new (std::nothrow) vmci_listener_t (
io_thread, this, options);
alloc_assert (listener);
int rc = listener->set_address (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno ());
EXIT_MUTEX();
return -1;
}
listener->get_address (last_endpoint);
add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
options.connected = true;
EXIT_MUTEX();
return 0;
}
#endif
EXIT_MUTEX();
zmq_assert (false);
......@@ -753,7 +789,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
address_t *paddr = new (std::nothrow) address_t (protocol, address);
address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
alloc_assert (paddr);
// Resolve address (if needed by the protocol)
......@@ -838,6 +874,19 @@ int zmq::socket_base_t::connect (const char *addr_)
}
}
#endif
#if defined ZMQ_HAVE_VMCI
else
if (protocol == "vmci") {
paddr->resolved.vmci_addr = new (std::nothrow) vmci_address_t (this->get_ctx ());
alloc_assert (paddr->resolved.vmci_addr);
int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
if (rc != 0) {
LIBZMQ_DELETE(paddr);
EXIT_MUTEX();
return -1;
}
}
#endif
// Create session.
session_base_t *session = session_base_t::create (io_thread, true, this,
......
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vmci.hpp"
#if defined ZMQ_HAVE_VMCI
#include <cassert>
#include <vmci_sockets.h>
void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default_size_, uint64_t min_size_, uint64_t max_size_)
{
int family = context_->get_vmci_socket_family ();
assert (family != -1);
if (default_size_ != 0) {
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &default_size_, sizeof default_size_);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
if (min_size_ != 0) {
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &min_size_, sizeof min_size_);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
if (max_size_ != 0) {
int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &max_size_, sizeof max_size_);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
}
#if defined ZMQ_HAVE_WINDOWS
void zmq::tune_vmci_connect_timeout (ctx_t *context_, fd_t sockfd_, DWORD timeout_)
#else
void zmq::tune_vmci_connect_timeout (ctx_t *context_, fd_t sockfd_, struct timeval timeout_)
#endif
{
int family = context_->get_vmci_socket_family ();
assert (family != -1);
int rc = setsockopt (sockfd_, family, SO_VMCI_CONNECT_TIMEOUT, &timeout_, sizeof timeout_);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
}
#endif
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_VMCI_HPP_INCLUDED__
#define __ZMQ_VMCI_HPP_INCLUDED__
#include <stdint.h>
#include <string>
#include "platform.hpp"
#include "fd.hpp"
#include "ctx.hpp"
#if defined ZMQ_HAVE_VMCI
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <sys/time.h>
#endif
namespace zmq
{
void tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default_size_, uint64_t min_size_, uint64_t max_size_);
#if defined ZMQ_HAVE_WINDOWS
void tune_vmci_connect_timeout (ctx_t *context_, fd_t sockfd_, DWORD timeout_);
#else
void tune_vmci_connect_timeout (ctx_t *context_, fd_t sockfd_, struct timeval timeout_);
#endif
}
#endif
#endif
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vmci_address.hpp"
#if defined(ZMQ_HAVE_VMCI)
#include <climits>
#include <string>
#include <sstream>
#include "ctx.hpp"
#include "err.hpp"
zmq::vmci_address_t::vmci_address_t(ctx_t *parent_) :
parent(parent_)
{
memset (&address, 0, sizeof address);
}
zmq::vmci_address_t::vmci_address_t(const sockaddr *sa, socklen_t sa_len, ctx_t *parent_) :
parent(parent_)
{
zmq_assert (sa && sa_len > 0);
memset (&address, 0, sizeof address);
if (sa->sa_family == parent->get_vmci_socket_family())
memcpy(&address, sa, sa_len);
}
zmq::vmci_address_t::~vmci_address_t ()
{
}
int zmq::vmci_address_t::resolve(const char *path_)
{
// Find the ':' at end that separates address from the port number.
const char *delimiter = strrchr (path_, ':');
if (!delimiter) {
errno = EINVAL;
return -1;
}
// Separate the address/port.
std::string addr_str (path_, delimiter - path_);
std::string port_str (delimiter + 1);
unsigned int cid = VMADDR_CID_ANY;
unsigned int port = VMADDR_PORT_ANY;
if (!addr_str.length()) {
errno = EINVAL;
return -1;
}
else if (addr_str != "*" && addr_str != "-1") {
const char *begin = addr_str.c_str();
char *end = NULL;
unsigned long l = strtoul(begin, &end, 10);
if ((l == 0 && end == begin) || (l == ULONG_MAX && errno == ERANGE) || l > UINT_MAX)
{
errno = EINVAL;
return -1;
}
cid = static_cast<unsigned int> (l);
}
if (!port_str.length()) {
errno = EINVAL;
return -1;
}
else if (port_str != "*" && port_str != "-1") {
const char *begin = port_str.c_str();
char *end = NULL;
unsigned long l = strtoul(begin, &end, 10);
if ((l == 0 && end == begin) || (l == ULONG_MAX && errno == ERANGE) || l > UINT_MAX) {
errno = EINVAL;
return -1;
}
port = static_cast<unsigned int> (l);
}
address.svm_family = static_cast<sa_family_t> (parent->get_vmci_socket_family());
address.svm_cid = cid;
address.svm_port = port;
return 0;
}
int zmq::vmci_address_t::to_string (std::string &addr_)
{
if (address.svm_family != parent->get_vmci_socket_family()) {
addr_.clear ();
return -1;
}
std::stringstream s;
s << "vmci://";
if (address.svm_cid == VMADDR_CID_ANY) {
s << "*";
}
else
{
s << address.svm_cid;
}
s << ":";
if (address.svm_port == VMADDR_PORT_ANY) {
s << "*";
}
else {
s << address.svm_port;
}
addr_ = s.str ();
return 0;
}
const sockaddr *zmq::vmci_address_t::addr () const
{
return reinterpret_cast<const sockaddr*> (&address);
}
socklen_t zmq::vmci_address_t::addrlen () const
{
return static_cast<socklen_t> (sizeof address);
}
#endif
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_VMCI_ADDRESS_HPP_INCLUDED__
#define __ZMQ_VMCI_ADDRESS_HPP_INCLUDED__
#include <string>
#include "platform.hpp"
#include "ctx.hpp"
#if defined(ZMQ_HAVE_VMCI)
#include <vmci_sockets.h>
namespace zmq
{
// class ctx_t;
class vmci_address_t
{
public:
vmci_address_t (ctx_t *parent_);
vmci_address_t (const sockaddr *sa, socklen_t sa_len, ctx_t *parent_);
~vmci_address_t ();
// This function sets up the address for VMCI transport.
int resolve (const char *path_);
// The opposite to resolve()
int to_string (std::string &addr_);
const sockaddr *addr () const;
socklen_t addrlen () const;
private:
struct sockaddr_vm address;
ctx_t *parent;
vmci_address_t ();
vmci_address_t (const vmci_address_t&);
const vmci_address_t &operator = (const vmci_address_t&);
};
}
#endif
#endif
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vmci_connecter.hpp"
#if defined ZMQ_HAVE_VMCI
#include <new>
#include <string>
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "random.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "address.hpp"
#include "ipc_address.hpp"
#include "session_base.hpp"
#include "vmci.hpp"
zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_,
const address_t *addr_, bool delayed_start_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
addr (addr_),
s (retired_fd),
handle_valid (false),
delayed_start (delayed_start_),
timer_started (false),
session (session_),
current_reconnect_ivl(options.reconnect_ivl)
{
zmq_assert (addr);
zmq_assert (addr->protocol == "vmci");
addr->to_string (endpoint);
socket = session-> get_socket();
}
zmq::vmci_connecter_t::~vmci_connecter_t ()
{
zmq_assert (!timer_started);
zmq_assert (!handle_valid);
zmq_assert (s == retired_fd);
}
void zmq::vmci_connecter_t::process_plug ()
{
if (delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::vmci_connecter_t::process_term (int linger_)
{
if (timer_started) {
cancel_timer (reconnect_timer_id);
timer_started = false;
}
if (handle_valid) {
rm_fd (handle);
handle_valid = false;
}
if (s != retired_fd)
close ();
own_t::process_term (linger_);
}
void zmq::vmci_connecter_t::in_event ()
{
// We are not polling for incoming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event ();
}
void zmq::vmci_connecter_t::out_event ()
{
fd_t fd = connect ();
rm_fd (handle);
handle_valid = false;
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
close ();
add_reconnect_timer();
return;
}
tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size, options.vmci_buffer_min_size, options.vmci_buffer_max_size);
if (options.vmci_connect_timeout > 0)
{
#if defined ZMQ_HAVE_WINDOWS
tune_vmci_connect_timeout (this->get_ctx (), fd, options.vmci_connect_timeout);
#else
struct timeval timeout = {0, options.vmci_connect_timeout * 1000};
tune_vmci_connect_timeout (this->get_ctx (), fd, timeout);
#endif
}
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Attach the engine to the corresponding session object.
send_attach (session, engine);
// Shut the connecter down.
terminate ();
socket->event_connected (endpoint, fd);
}
void zmq::vmci_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id);
timer_started = false;
start_connecting ();
}
void zmq::vmci_connecter_t::start_connecting ()
{
// Open the connecting socket.
int rc = open ();
// Connect may succeed in synchronous manner.
if (rc == 0) {
handle = add_fd (s);
handle_valid = true;
out_event ();
}
// Connection establishment may be delayed. Poll for its completion.
else
if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (s);
handle_valid = true;
set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno());
}
// Handle any other error condition by eventual reconnect.
else {
if (s != retired_fd)
close ();
add_reconnect_timer ();
}
}
void zmq::vmci_connecter_t::add_reconnect_timer()
{
int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id);
socket->event_connect_retried (endpoint, rc_ivl);
timer_started = true;
}
int zmq::vmci_connecter_t::get_new_reconnect_ivl ()
{
// The new interval is the current interval + random value.
int this_interval = current_reconnect_ivl +
(generate_random () % options.reconnect_ivl);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0 &&
options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval
current_reconnect_ivl = current_reconnect_ivl * 2;
if(current_reconnect_ivl >= options.reconnect_ivl_max) {
current_reconnect_ivl = options.reconnect_ivl_max;
}
}
return this_interval;
}
int zmq::vmci_connecter_t::open ()
{
zmq_assert (s == retired_fd);
int family = this->get_ctx ()->get_vmci_socket_family ();
if (family == -1)
return -1;
// Create the socket.
s = open_socket (family, SOCK_STREAM, 0);
if (s == -1)
return -1;
// Set the non-blocking flag.
unblock_socket (s);
// Connect to the remote peer.
int rc = ::connect (
s, addr->resolved.ipc_addr->addr (),
addr->resolved.ipc_addr->addrlen ());
// Connect was successful immediately.
if (rc == 0)
return 0;
// Translate other error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
if (rc == -1 && errno == EINTR) {
errno = EINPROGRESS;
return -1;
}
// Forward the error.
return -1;
}
int zmq::vmci_connecter_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
errno_assert (rc == 0);
socket->event_closed (endpoint, s);
s = retired_fd;
return 0;
}
zmq::fd_t zmq::vmci_connecter_t::connect ()
{
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
int err = 0;
#if defined ZMQ_HAVE_HPUX
int len = sizeof (err);
#else
socklen_t len = sizeof (err);
#endif
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
if (rc == -1)
err = errno;
if (err != 0) {
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
errno == ETIMEDOUT || errno == EHOSTUNREACH ||
errno == ENETUNREACH || errno == ENETDOWN);
return retired_fd;
}
fd_t result = s;
s = retired_fd;
return result;
}
#endif
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __VMCI_CONNECTER_HPP_INCLUDED__
#define __VMCI_CONNECTER_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZMQ_HAVE_VMCI
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
namespace zmq
{
class io_thread_t;
class session_base_t;
struct address_t;
class vmci_connecter_t : public own_t, public io_object_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
// then starts connection process.
vmci_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, const options_t &options_,
const address_t *addr_, bool delayed_start_);
~vmci_connecter_t ();
private:
// ID of the timer used to delay the reconnection.
enum {reconnect_timer_id = 1};
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
void out_event ();
void timer_event (int id_);
// Internal function to start the actual connection establishment.
void start_connecting ();
// Internal function to add a reconnect timer
void add_reconnect_timer();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open VMCI connecting socket. Returns -1 in case of error,
// 0 if connect was successful immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
int open ();
// Close the connecting socket.
int close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful.
fd_t connect ();
// Address to connect to. Owned by session_base_t.
const address_t *addr;
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
// Socket
zmq::socket_base_t *socket;
vmci_connecter_t (const vmci_connecter_t&);
const vmci_connecter_t &operator = (const vmci_connecter_t&);
};
}
#endif
#endif
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vmci_listener.hpp"
#if defined ZMQ_HAVE_VMCI
#include <new>
#include <string.h>
#include "stream_engine.hpp"
#include "vmci_address.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "socket_base.hpp"
#include "vmci.hpp"
#include <unistd.h>
#include <fcntl.h>
zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
s (retired_fd),
socket (socket_)
{
}
zmq::vmci_listener_t::~vmci_listener_t ()
{
zmq_assert (s == retired_fd);
}
void zmq::vmci_listener_t::process_plug ()
{
// Start polling for incoming connections.
handle = add_fd (s);
set_pollin (handle);
}
void zmq::vmci_listener_t::process_term (int linger_)
{
rm_fd (handle);
close ();
own_t::process_term (linger_);
}
void zmq::vmci_listener_t::in_event ()
{
fd_t fd = accept ();
// If connection was reset by the peer in the meantime, just ignore it.
if (fd == retired_fd) {
socket->event_accept_failed (endpoint, zmq_errno());
return;
}
tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size, options.vmci_buffer_min_size, options.vmci_buffer_max_size);
if (options.vmci_connect_timeout > 0)
{
#if defined ZMQ_HAVE_WINDOWS
tune_vmci_connect_timeout (this->get_ctx (), fd, options.vmci_connect_timeout);
#else
struct timeval timeout = {0, options.vmci_connect_timeout * 1000};
tune_vmci_connect_timeout (this->get_ctx (), fd, timeout);
#endif
}
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch a session object.
session_base_t *session = session_base_t::create (io_thread, false, socket,
options, NULL);
errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
socket->event_accepted (endpoint, fd);
}
int zmq::vmci_listener_t::get_address (std::string &addr_)
{
struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_HPUX
int sl = sizeof (ss);
#else
socklen_t sl = sizeof (ss);
#endif
int rc = getsockname (s, (sockaddr *) &ss, &sl);
if (rc != 0) {
addr_.clear ();
return rc;
}
vmci_address_t addr ((struct sockaddr *) &ss, sl, this->get_ctx ());
return addr.to_string (addr_);
}
int zmq::vmci_listener_t::set_address (const char *addr_)
{
// Create addr on stack for auto-cleanup
std::string addr (addr_);
// Initialise the address structure.
vmci_address_t address(this->get_ctx ());
int rc = address.resolve (addr.c_str());
if (rc != 0)
return -1;
// Create a listening socket.
s = open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0);
if (s == -1)
return -1;
address.to_string (endpoint);
// Bind the socket.
rc = bind (s, address.addr (), address.addrlen ());
if (rc != 0)
goto error;
// Listen for incoming connections.
rc = listen (s, options.backlog);
if (rc != 0)
goto error;
socket->event_listening (endpoint, s);
return 0;
error:
int err = errno;
close ();
errno = err;
return -1;
}
int zmq::vmci_listener_t::close ()
{
zmq_assert (s != retired_fd);
int rc = ::close (s);
errno_assert (rc == 0);
s = retired_fd;
socket->event_closed (endpoint, s);
return 0;
}
zmq::fd_t zmq::vmci_listener_t::accept ()
{
// Accept one connection and deal with different failure modes.
// The situation where connection cannot be accepted due to insufficient
// resources is considered valid and treated by ignoring the connection.
zmq_assert (s != retired_fd);
fd_t sock = ::accept (s, NULL, NULL);
if (sock == -1) {
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
errno == ENFILE);
return retired_fd;
}
// Race condition can cause socket not to be closed (if fork happens
// between accept and this point).
#ifdef FD_CLOEXEC
int rc = fcntl (sock, F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
#endif
return sock;
}
#endif
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_VMCI_LISTENER_HPP_INCLUDED__
#define __ZMQ_VMCI_LISTENER_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZMQ_HAVE_VMCI
#include <string>
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
namespace zmq
{
class io_thread_t;
class socket_base_t;
class vmci_listener_t : public own_t, public io_object_t
{
public:
vmci_listener_t (zmq::io_thread_t *io_thread_,
zmq::socket_base_t *socket_, const options_t &options_);
~vmci_listener_t ();
// Set address to listen on.
int set_address (const char *addr_);
// Get the bound address for use with wildcards
int get_address (std::string &addr_);
private:
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Handlers for I/O events.
void in_event ();
// Close the listening socket.
int close ();
// Accept the new connection. Returns the file descriptor of the
// newly created connection. The function may return retired_fd
// if the connection was dropped while waiting in the listen backlog.
fd_t accept ();
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// Socket the listerner belongs to.
zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
vmci_listener_t (const vmci_listener_t&);
const vmci_listener_t &operator = (const vmci_listener_t&);
};
}
#endif
#endif
......@@ -1238,6 +1238,10 @@ int zmq_has (const char *capability)
#if defined (HAVE_LIBGSSAPI_KRB5)
if (strcmp (capability, "gssapi") == 0)
return true;
#endif
#if defined (ZMQ_HAVE_VMCI)
if (strcmp (capability, "vmci") == 0)
return true;
#endif
// Whatever the application asked for, we don't have
return false;
......
......@@ -92,6 +92,13 @@ if(NOT WIN32)
endif()
endif()
if(WITH_VMCI)
list(APPEND tests
test_pair_vmci
test_reqrep_vmci
)
endif()
foreach(test ${tests})
add_executable(${test} ${test}.cpp)
target_link_libraries(${test} libzmq)
......
......@@ -67,5 +67,11 @@ int main (void)
assert (!zmq_has ("gssapi"));
#endif
#if defined (ZMQ_HAVE_VMCI)
assert (zmq_has("vmci"));
#else
assert (!zmq_has("vmci"));
#endif
return 0;
}
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include <sstream>
#include <vmci_sockets.h>
#include "testutil.hpp"
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
std::stringstream s;
s << "vmci://" << VMCISock_GetLocalCID() << ":" << 5560;
std::string endpoint = s.str();
void *sb = zmq_socket (ctx, ZMQ_PAIR);
assert (sb);
int rc = zmq_bind (sb, endpoint.c_str());
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_PAIR);
assert (sc);
rc = zmq_connect (sc, endpoint.c_str());
assert (rc == 0);
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include <sstream>
#include <vmci_sockets.h>
#include "testutil.hpp"
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
std::stringstream s;
s << "vmci://" << VMCISock_GetLocalCID() << ":" << 5560;
std::string endpoint = s.str();
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
int rc = zmq_bind (sb, endpoint.c_str());
assert (rc == 0);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);
rc = zmq_connect (sc, endpoint.c_str());
assert (rc == 0);
bounce (sb, sc);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
......@@ -40,6 +40,9 @@ int main (void)
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
const char *ep_wc_ipc = "ipc://*";
#endif
#if defined ZMQ_HAVE_VMCI
const char *ep_wc_vmci = "vmci://*:*";
#endif
// Create infrastructure.
void *ctx = zmq_ctx_new ();
......@@ -128,6 +131,12 @@ int main (void)
rc = zmq_bind (pull, ep_wc_ipc);
assert (rc == 0);
#endif
#if defined ZMQ_HAVE_VMCI
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_bind (req, ep_wc_vmci);
assert (rc == 0);
#endif
// Unbind sockets binded by wild-card address
rc = zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, buf, (size_t *)&buf_size);
......@@ -140,6 +149,12 @@ int main (void)
rc = zmq_unbind (pull, buf);
assert (rc == 0);
#endif
#if defined ZMQ_HAVE_VMCI
rc = zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, buf, (size_t *)&buf_size);
assert (rc == 0);
rc = zmq_unbind(req, buf);
assert (rc == 0);
#endif
// Create infrastructure (wild-card binding)
ctx = zmq_ctx_new ();
......@@ -154,6 +169,12 @@ int main (void)
rc = zmq_bind (pull, ep_wc_ipc);
assert (rc == 0);
#endif
#if defined ZMQ_HAVE_VMCI
req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_bind (req, ep_wc_vmci);
assert (rc == 0);
#endif
// Sockets binded by wild-card address can't be unbinded by wild-card address
rc = zmq_unbind (push, ep_wc_tcp);
......@@ -162,6 +183,10 @@ int main (void)
rc = zmq_unbind (pull, ep_wc_ipc);
assert (rc == -1 && zmq_errno () == ENOENT);
#endif
#if defined ZMQ_HAVE_VMCI
rc = zmq_unbind (req, ep_wc_vmci);
assert (rc == -1 && zmq_errno () == ENOENT);
#endif
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment