Commit 4726f726 authored by Manuel Segura's avatar Manuel Segura Committed by Luca Boccassi

Pull request to merge porting to WindRiver VxWorks 6.x (#2966)

* Problem: Still need to port over more files to VxWorks 6.x

Solution: Port more files to VxWorks 6.x

* Problem: Need to port over remaining files to VxWorks 6.x. Also remove POSIX thread dependency for VxWorks (because of priority inversion problem in POSIX mutexes with VxWorks 6.x processes)

Solution: Port over remaining files to VxWorks 6.x. Also removed POSIX thread dependency for VxWorks

* Problem: Needed to modify TCP, UDP, TIPC classes with #ifdefs to be compatible with VxWorks 6.x.

Solution:  Modify TCP, UDP, TIPC classes with #ifdefs to be compatible with VxWorks 6.x
parent 0d23b5ca
/* src/platform.hpp. Generated from platform.hpp.in by configure. */
/* src/platform.hpp.in. Generated from configure.ac by autoheader. */
/* Define to 1 if you have the <alloca.h> header file. */
/* #undef HAVE_ALLOCA_H */
/* Define to 1 if you have the <arpa/inet.h> header file. */
#define HAVE_ARPA_INET_H 1
/* Define to 1 if you have the `clock_gettime' function. */
#define HAVE_CLOCK_GETTIME 1
/* Define to 1 if you have the declaration of `LOCAL_PEERCRED', and to 0 if
you don't. */
#define HAVE_DECL_LOCAL_PEERCRED 0
/* Define to 1 if you have the declaration of `SO_PEERCRED', and to 0 if you
don't. */
#define HAVE_DECL_SO_PEERCRED 0
/* Define to 1 if you have the <dlfcn.h> header file. */
#define HAVE_DLFCN_H 1
/* Define to 1 if you have the <errno.h> header file. */
#define HAVE_ERRNO_H 1
/* Define to 1 if you have the `fork' function. */
#define HAVE_FORK 1
/* Define to 1 if you have the `freeifaddrs' function. */
#define HAVE_FREEIFADDRS 1
/* Define to 1 if you have the `gethrtime' function. */
/* #undef HAVE_GETHRTIME */
/* Define to 1 if you have the `getifaddrs' function. */
#define HAVE_GETIFADDRS 1
/* Define to 1 if you have the `gettimeofday' function. */
#define HAVE_GETTIMEOFDAY 1
/* Define to 1 if you have the <ifaddrs.h> header file. */
//#define HAVE_IFADDRS_H 1
/* Define to 1 if you have the <inttypes.h> header file. */
#define HAVE_INTTYPES_H 1
/* Define to 1 if you have the `gssapi_krb5' library (-lgssapi_krb5). */
/* #undef HAVE_LIBGSSAPI_KRB5 */
/* Define to 1 if you have the `iphlpapi' library (-liphlpapi). */
/* #undef HAVE_LIBIPHLPAPI */
/* Define to 1 if you have the `nsl' library (-lnsl). */
/* #undef HAVE_LIBNSL */
/* Define to 1 if you have the `pthread' library (-lpthread). */
/* #undef HAVE_LIBPTHREAD */
/* Define to 1 if you have the `rpcrt4' library (-lrpcrt4). */
/* #undef HAVE_LIBRPCRT4 */
/* Define to 1 if you have the `rt' library (-lrt). */
/* #undef HAVE_LIBRT */
/* Define to 1 if you have the `socket' library (-lsocket). */
/* #undef HAVE_LIBSOCKET */
/* Define to 1 if you have the `sodium' library (-lsodium). */
/* #undef HAVE_LIBSODIUM */
/* Define to 1 if you have the `ws2_32' library (-lws2_32). */
/* #undef HAVE_LIBWS2_32 */
/* Define to 1 if you have the <limits.h> header file. */
#define HAVE_LIMITS_H 1
/* Define to 1 if you have the <memory.h> header file. */
#define HAVE_MEMORY_H 1
/* Define to 1 if you have the `memset' function. */
#define HAVE_MEMSET 1
/* Define to 1 if you have the <netinet/in.h> header file. */
#define HAVE_NETINET_IN_H 1
/* Define to 1 if you have the <netinet/tcp.h> header file. */
#define HAVE_NETINET_TCP_H 1
/* Define to 1 if you have the `perror' function. */
#define HAVE_PERROR 1
/* Define to 1 if you have the `socket' function. */
#define HAVE_SOCKET 1
/* Define to 1 if stdbool.h conforms to C99. */
#define HAVE_STDBOOL_H 1
/* Define to 1 if you have the <stddef.h> header file. */
#define HAVE_STDDEF_H 1
/* Define to 1 if you have the <stdint.h> header file. */
#define HAVE_STDINT_H 1
/* Define to 1 if you have the <stdlib.h> header file. */
#define HAVE_STDLIB_H 1
/* Define to 1 if you have the <strings.h> header file. */
#define HAVE_STRINGS_H 1
/* Define to 1 if you have the <string.h> header file. */
#define HAVE_STRING_H 1
/* Define to 1 if you have the <sys/eventfd.h> header file. */
/* #undef HAVE_SYS_EVENTFD_H */
/* Define to 1 if you have the <sys/socket.h> header file. */
#define HAVE_SYS_SOCKET_H 1
/* Define to 1 if you have the <sys/stat.h> header file. */
#define HAVE_SYS_STAT_H 1
/* Define to 1 if you have the <sys/time.h> header file. */
#define HAVE_SYS_TIME_H 1
/* Define to 1 if you have the <sys/types.h> header file. */
#define HAVE_SYS_TYPES_H 1
/* Define to 1 if you have the <sys/uio.h> header file. */
#define HAVE_SYS_UIO_H 1
/* Define to 1 if you have the <time.h> header file. */
#define HAVE_TIME_H 1
/* Define to 1 if you have the <unistd.h> header file. */
#define HAVE_UNISTD_H 1
/* Define to 1 if you have the <windows.h> header file. */
/* #undef HAVE_WINDOWS_H */
/* Define to 1 if the system has the type `_Bool'. */
/* #undef HAVE__BOOL */
/* Define to the sub-directory in which libtool stores uninstalled libraries.
*/
#define LT_OBJDIR ".libs/"
/* Name of package */
#define PACKAGE "zeromq"
/* Define to the address where bug reports for this package should be sent. */
#define PACKAGE_BUGREPORT "zeromq-dev@lists.zeromq.org"
/* Define to the full name of this package. */
#define PACKAGE_NAME "zeromq"
/* Define to the full name and version of this package. */
#define PACKAGE_STRING "zeromq 4.1.0"
/* Define to the one symbol short name of this package. */
#define PACKAGE_TARNAME "zeromq"
/* Define to the home page for this package. */
#define PACKAGE_URL ""
/* Define to the version of this package. */
#define PACKAGE_VERSION "4.1.0"
/* Define as the return type of signal handlers (`int' or `void'). */
#define RETSIGTYPE void
/* Define to 1 if you have the ANSI C header files. */
#define STDC_HEADERS 1
/* Define to 1 if you can safely include both <sys/time.h> and <time.h>. */
#define TIME_WITH_SYS_TIME 1
/* Version number of package */
#define VERSION "4.1.0"
/* Enable militant API assertions */
/* #undef ZMQ_ACT_MILITANT */
/* Force to use mutexes */
/* #undef ZMQ_FORCE_MUTEXES */
/* Have VxWorks OS */
#define ZMQ_HAVE_VXWORKS 1
/* Have AIX OS */
/* #undef ZMQ_HAVE_AIX */
/* Have Android OS */
/* #undef ZMQ_HAVE_ANDROID */
/* Have Cygwin */
/* #undef ZMQ_HAVE_CYGWIN */
/* Have eventfd extension. */
/* #undef ZMQ_HAVE_EVENTFD */
/* Have FreeBSD OS */
/* #undef ZMQ_HAVE_FREEBSD */
/* Have HPUX OS */
/* #undef ZMQ_HAVE_HPUX */
/* Have ifaddrs.h header. */
//#define ZMQ_HAVE_IFADDRS 1
/* Have Linux OS */
/* #undef ZMQ_HAVE_LINUX */
/* Have LOCAL_PEERCRED socket option */
/* #undef ZMQ_HAVE_LOCAL_PEERCRED */
/* Have MinGW32 */
/* #undef ZMQ_HAVE_MINGW32 */
/* Have NetBSD OS */
/* #undef ZMQ_HAVE_NETBSD */
/* Have NORM protocol extension */
/* #undef ZMQ_HAVE_NORM */
/* Have OpenBSD OS */
/* #undef ZMQ_HAVE_OPENBSD */
/* Have OpenPGM extension */
/* #undef ZMQ_HAVE_OPENPGM */
/* Have DarwinOSX OS */
/* #undef ZMQ_HAVE_OSX */
/* Have QNX Neutrino OS */
/* #undef ZMQ_HAVE_QNXNTO */
/* Whether SOCK_CLOEXEC is defined and functioning. */
/* #undef ZMQ_HAVE_SOCK_CLOEXEC */
/* Have Solaris OS */
/* #undef ZMQ_HAVE_SOLARIS */
/* Whether SO_KEEPALIVE is supported. */
#define ZMQ_HAVE_SO_KEEPALIVE 1
/* Have SO_PEERCRED socket option */
/* #undef ZMQ_HAVE_SO_PEERCRED */
/* Whether TCP_KEEPALIVE is supported. */
/* #undef ZMQ_HAVE_TCP_KEEPALIVE */
/* Whether TCP_KEEPCNT is supported. */
/* #undef ZMQ_HAVE_TCP_KEEPCNT */
/* Whether TCP_KEEPIDLE is supported. */
/* #undef ZMQ_HAVE_TCP_KEEPIDLE */
/* Whether TCP_KEEPINTVL is supported. */
/* #undef ZMQ_HAVE_TCP_KEEPINTVL */
/* Have TIPC support */
#define ZMQ_HAVE_TIPC 1
/* Have uio.h header. */
//#define ZMQ_HAVE_UIO 1
#define ZMQ_USE_SELECT 1
/* Have Windows OS */
/* #undef ZMQ_HAVE_WINDOWS */
/* Define for Solaris 2.5.1 so the uint32_t typedef from <sys/synch.h>,
<pthread.h>, or <semaphore.h> is not used. If the typedef were allowed, the
#define below would cause a syntax error. */
/* #undef _UINT32_T */
/* Define to empty if `const' does not conform to ANSI C. */
/* #undef const */
/* Define to `__inline__' or `__inline' if that's what the C compiler
calls it, or to nothing if 'inline' is not supported under any name. */
#ifndef __cplusplus
/* #undef inline */
#endif
/* Define to `unsigned int' if <sys/types.h> does not define. */
/* #undef size_t */
/* Define to `int' if <sys/types.h> does not define. */
/* #undef ssize_t */
/* Define to the type of an unsigned integer type of width exactly 32 bits if
such a type exists and the standard includes do not define it. */
/* #undef uint32_t */
/* Define to empty if the keyword `volatile' does not work. Warning: valid
code using `volatile' can become incorrect without. Disable with care. */
/* #undef volatile */
/* ---- Special case for z/OS Unix Services: openedition ---- */
#include <vxWorks.h>
#ifndef NI_MAXHOST
#define NI_MAXHOST 1025
#endif
......@@ -66,7 +66,8 @@ zmq::address_t::~address_t ()
LIBZMQ_DELETE (resolved.udp_addr);
}
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
else if (protocol == "ipc") {
if (resolved.ipc_addr) {
LIBZMQ_DELETE (resolved.ipc_addr);
......@@ -99,7 +100,8 @@ int zmq::address_t::to_string (std::string &addr_) const
if (resolved.udp_addr)
return resolved.udp_addr->to_string (addr_);
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
else if (protocol == "ipc") {
if (resolved.ipc_addr)
return resolved.ipc_addr->to_string (addr_);
......
......@@ -40,7 +40,7 @@ class udp_address_t;
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
class ipc_address_t;
#endif
#if defined ZMQ_HAVE_LINUX
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_VXWORKS
class tipc_address_t;
#endif
#if defined ZMQ_HAVE_VMCI
......@@ -63,10 +63,11 @@ struct address_t
{
tcp_address_t *tcp_addr;
udp_address_t *udp_addr;
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
ipc_address_t *ipc_addr;
#endif
#if defined ZMQ_HAVE_LINUX
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_VXWORKS
tipc_address_t *tipc_addr;
#endif
#if defined ZMQ_HAVE_VMCI
......
......@@ -280,7 +280,11 @@ struct atomic_value_t
#endif
#if defined ZMQ_ATOMIC_PTR_MUTEX
#if defined ZMQ_HAVE_VXWORKS
mutable mutex_t sync;
#else
mutex_t sync;
#endif
#endif
private:
......
......@@ -52,6 +52,10 @@
#include <time.h>
#endif
#if defined ZMQ_HAVE_VXWORKS
#include "timers.h"
#endif
#if defined ZMQ_HAVE_OSX
#include <mach/clock.h>
#include <mach/mach.h>
......@@ -154,7 +158,8 @@ uint64_t zmq::clock_t::now_us ()
double ticks_div = ticksPerSecond.QuadPart / 1000000.0;
return (uint64_t) (tick.QuadPart / ticks_div);
#elif defined HAVE_CLOCK_GETTIME && defined CLOCK_MONOTONIC
#elif defined HAVE_CLOCK_GETTIME \
&& (defined CLOCK_MONOTONIC || defined ZMQ_HAVE_VXWORKS)
// Use POSIX clock_gettime function to get precise monotonic time.
struct timespec tv;
......@@ -169,11 +174,13 @@ uint64_t zmq::clock_t::now_us ()
// This should be a configuration check, but I looked into it and writing an
// AC_FUNC_CLOCK_MONOTONIC seems beyond my powers.
if (rc != 0) {
#ifndef ZMQ_HAVE_VXWORKS
// Use POSIX gettimeofday function to get precise time.
struct timeval tv;
int rc = gettimeofday (&tv, NULL);
errno_assert (rc == 0);
return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
#endif
}
return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_nsec / 1000);
......
......@@ -170,6 +170,90 @@ class condition_variable_t
#endif
#elif defined ZMQ_HAVE_VXWORKS
#include <sysLib.h>
namespace zmq
{
class condition_variable_t
{
public:
inline condition_variable_t () {}
inline ~condition_variable_t ()
{
scoped_lock_t l (m_listenersMutex);
for (size_t i = 0; i < m_listeners.size (); i++) {
semDelete (m_listeners[i]);
}
}
inline int wait (mutex_t *mutex_, int timeout_)
{
//Atomically releases lock, blocks the current executing thread,
//and adds it to the list of threads waiting on *this. The thread
//will be unblocked when broadcast() is executed.
//It may also be unblocked spuriously. When unblocked, regardless
//of the reason, lock is reacquired and wait exits.
SEM_ID sem = semBCreate (SEM_Q_PRIORITY, SEM_EMPTY);
{
scoped_lock_t l (m_listenersMutex);
m_listeners.push_back (sem);
}
mutex_->unlock ();
int rc;
if (timeout_ < 0)
rc = semTake (sem, WAIT_FOREVER);
else {
int ticksPerSec = sysClkRateGet ();
int timeoutTicks = (timeout_ * ticksPerSec) / 1000 + 1;
rc = semTake (sem, timeoutTicks);
}
{
scoped_lock_t l (m_listenersMutex);
// remove sem from listeners
for (size_t i = 0; i < m_listeners.size (); i++) {
if (m_listeners[i] == sem) {
m_listeners.erase (m_listeners.begin () + i);
break;
}
}
semDelete (sem);
}
mutex_->lock ();
if (rc == 0)
return 0;
if (rc == S_objLib_OBJ_TIMEOUT) {
errno = EAGAIN;
return -1;
}
return -1;
}
inline void broadcast ()
{
scoped_lock_t l (m_listenersMutex);
for (size_t i = 0; i < m_listeners.size (); i++) {
semGive (m_listeners[i]);
}
}
private:
mutex_t m_listenersMutex;
std::vector<SEM_ID> m_listeners;
// Disable copy construction and assignment.
condition_variable_t (const condition_variable_t &);
const condition_variable_t &operator= (const condition_variable_t &);
};
}
#else
#include <pthread.h>
......
......@@ -44,10 +44,16 @@
#include "tcp.hpp"
#endif
#if defined ZMQ_HAVE_OPENVMS
#if defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
#include <ioctl.h>
#endif
#if defined ZMQ_HAVE_VXWORKS
#include <unistd.h>
#include <sockLib.h>
#include <ioLib.h>
#endif
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#endif
......@@ -106,7 +112,7 @@ void zmq::unblock_socket (fd_t s_)
u_long nonblock = 1;
int rc = ioctlsocket (s_, FIONBIO, &nonblock);
wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_OPENVMS
#elif defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
int nonblock = 1;
int rc = ioctl (s_, FIONBIO, &nonblock);
errno_assert (rc != -1);
......@@ -129,8 +135,8 @@ void zmq::enable_ipv4_mapping (fd_t s_)
#else
int flag = 0;
#endif
int rc = setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY, (const char *) &flag,
sizeof (flag));
int rc =
setsockopt (s_, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flag, sizeof (flag));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
......@@ -144,7 +150,8 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
int rc;
struct sockaddr_storage ss;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_WINDOWS
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_WINDOWS \
|| defined ZMQ_HAVE_VXWORKS
int addrlen = static_cast<int> (sizeof ss);
#else
socklen_t addrlen = sizeof ss;
......@@ -185,9 +192,8 @@ int zmq::get_peer_ip_address (fd_t sockfd_, std::string &ip_addr_)
void zmq::set_ip_type_of_service (fd_t s_, int iptos)
{
int rc =
setsockopt (s_, IPPROTO_IP, IP_TOS,
reinterpret_cast<const char *> (&iptos), sizeof (iptos));
int rc = setsockopt (s_, IPPROTO_IP, IP_TOS,
reinterpret_cast<char *> (&iptos), sizeof (iptos));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
......@@ -198,7 +204,7 @@ void zmq::set_ip_type_of_service (fd_t s_, int iptos)
// Windows and Hurd do not support IPV6_TCLASS
#if !defined(ZMQ_HAVE_WINDOWS) && defined(IPV6_TCLASS)
rc = setsockopt (s_, IPPROTO_IPV6, IPV6_TCLASS,
reinterpret_cast<const char *> (&iptos), sizeof (iptos));
reinterpret_cast<char *> (&iptos), sizeof (iptos));
// If IPv6 is not enabled ENOPROTOOPT will be returned on Linux and
// EINVAL on OSX
......@@ -577,7 +583,48 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
close (listener);
return 0;
#elif defined ZMQ_HAVE_VXWORKS
struct sockaddr_in lcladdr;
memset (&lcladdr, 0, sizeof lcladdr);
lcladdr.sin_family = AF_INET;
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
lcladdr.sin_port = 0;
int listener = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (listener != -1);
int on = 1;
int rc =
setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
errno_assert (rc != -1);
rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
errno_assert (rc != -1);
socklen_t lcladdr_len = sizeof lcladdr;
rc = getsockname (listener, (struct sockaddr *) &lcladdr,
(int *) &lcladdr_len);
errno_assert (rc != -1);
rc = listen (listener, 1);
errno_assert (rc != -1);
*w_ = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (*w_ != -1);
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof on);
errno_assert (rc != -1);
rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
errno_assert (rc != -1);
*r_ = accept (listener, NULL, NULL);
errno_assert (*r_ != -1);
close (listener);
return 0;
#else
// All other implementations support socketpair()
int sv[2];
......@@ -594,9 +641,9 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
*w_ = *r_ = -1;
return -1;
} else {
// If there's no SOCK_CLOEXEC, let's try the second best option. Note that
// race condition can cause socket not to be closed (if fork happens
// between socket creation and this point).
// If there's no SOCK_CLOEXEC, let's try the second best option. Note that
// race condition can cause socket not to be closed (if fork happens
// between socket creation and this point).
#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC
rc = fcntl (sv[0], F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
......
......@@ -30,7 +30,8 @@
#include "precompiled.hpp"
#include "ipc_address.hpp"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
#include "err.hpp"
......
......@@ -32,7 +32,8 @@
#include <string>
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
#include <sys/socket.h>
#include <sys/un.h>
......
......@@ -30,7 +30,8 @@
#include "precompiled.hpp"
#include "ipc_connecter.hpp"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
#include <new>
#include <string>
......@@ -49,6 +50,7 @@
#include <sys/socket.h>
#include <sys/un.h>
zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_,
const options_t &options_,
......
......@@ -30,7 +30,8 @@
#ifndef __IPC_CONNECTER_HPP_INCLUDED__
#define __IPC_CONNECTER_HPP_INCLUDED__
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
#include "fd.hpp"
#include "own.hpp"
......
......@@ -30,7 +30,8 @@
#include "precompiled.hpp"
#include "ipc_listener.hpp"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
#include <new>
......@@ -69,6 +70,7 @@ const char *zmq::ipc_listener_t::tmp_env_vars[] = {
0 // Sentinel
};
int zmq::ipc_listener_t::create_wildcard_address (std::string &path_,
std::string &file_)
{
......@@ -263,7 +265,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
}
// Bind the socket to the file path.
rc = bind (s, address.addr (), address.addrlen ());
rc = bind (s, (sockaddr *) address.addr (), address.addrlen ());
if (rc != 0)
goto error;
......
......@@ -30,7 +30,8 @@
#ifndef __ZMQ_IPC_LISTENER_HPP_INCLUDED__
#define __ZMQ_IPC_LISTENER_HPP_INCLUDED__
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
#include <string>
......
......@@ -67,6 +67,45 @@ class mutex_t
};
}
#elif defined ZMQ_HAVE_VXWORKS
#include <vxWorks.h>
#include <semLib.h>
namespace zmq
{
class mutex_t
{
public:
inline mutex_t ()
{
m_semId =
semMCreate (SEM_Q_PRIORITY | SEM_INVERSION_SAFE | SEM_DELETE_SAFE);
}
inline ~mutex_t () { semDelete (m_semId); }
inline void lock () { semTake (m_semId, WAIT_FOREVER); }
inline bool try_lock ()
{
if (semTake (m_semId, NO_WAIT) == OK) {
return true;
}
return false;
}
inline void unlock () { semGive (m_semId); }
private:
SEM_ID m_semId;
// Disable copy construction and assignment.
mutex_t (const mutex_t &);
const mutex_t &operator= (const mutex_t &);
};
}
#else
#include <pthread.h>
......
......@@ -717,10 +717,10 @@ int zmq::options_t::setsockopt (int option_,
break;
}
// TODO mechanism should either be set explicitly, or determined when
// connecting. currently, it depends on the order of setsockopt calls
// if there is some inconsistency, which is confusing. in addition,
// the assumed or set mechanism should be queryable (as a socket option)
// TODO mechanism should either be set explicitly, or determined when
// connecting. currently, it depends on the order of setsockopt calls
// if there is some inconsistency, which is confusing. in addition,
// the assumed or set mechanism should be queryable (as a socket option)
#if defined(ZMQ_ACT_MILITANT)
// There is no valid use case for passing an error back to the application
......
......@@ -39,6 +39,10 @@
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_VXWORKS
#include <sys/types.h>
#include <sys/time.h>
#include <strings.h>
#else
#include <sys/select.h>
#endif
......
......@@ -576,7 +576,8 @@ void zmq::session_base_t::start_connecting (bool wait_)
return;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
if (addr->protocol == "ipc") {
ipc_connecter_t *connecter = new (std::nothrow)
ipc_connecter_t (io_thread, this, options, addr, wait_);
......
......@@ -53,6 +53,11 @@
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
#elif defined ZMQ_HAVE_VXWORKS
#include <sys/types.h>
#include <sys/time.h>
#include <sockLib.h>
#include <strings.h>
#else
#include <sys/select.h>
#endif
......@@ -87,6 +92,11 @@ static int sleep_ms (unsigned int ms_)
#elif defined ZMQ_HAVE_ANDROID
usleep (ms_ * 1000);
return 0;
#elif defined ZMQ_HAVE_VXWORKS
struct timespec ns_;
ns_.tv_sec = ms_ / 1000;
ns_.tv_nsec = ms_ % 1000 * 1000000;
return nanosleep (&ns_, 0);
#else
return usleep (ms_ * 1000);
#endif
......@@ -194,6 +204,22 @@ void zmq::signaler_t::send ()
zmq_assert (nbytes == sizeof (dummy));
break;
}
#elif defined ZMQ_HAVE_VXWORKS
unsigned char dummy = 0;
while (true) {
ssize_t nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
if (unlikely (nbytes == -1 && errno == EINTR))
continue;
#if defined(HAVE_FORK)
if (unlikely (pid != getpid ())) {
//printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
errno = EINTR;
break;
}
#endif
zmq_assert (nbytes == sizeof dummy);
break;
}
#else
unsigned char dummy = 0;
while (true) {
......@@ -305,6 +331,9 @@ void zmq::signaler_t::recv ()
#if defined ZMQ_HAVE_WINDOWS
int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
wsa_assert (nbytes != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS
ssize_t nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
errno_assert (nbytes >= 0);
#else
ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
errno_assert (nbytes >= 0);
......@@ -349,6 +378,16 @@ int zmq::signaler_t::recv_failable ()
}
wsa_assert (last_error == WSAEWOULDBLOCK);
}
#elif defined ZMQ_HAVE_VXWORKS
ssize_t nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
if (nbytes == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
errno = EAGAIN;
return -1;
}
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
|| errno == EINTR);
}
#else
ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
if (nbytes == -1) {
......
......@@ -288,7 +288,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protocol is something we are aware of.
if (protocol_ != "inproc"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
&& protocol_ != "ipc"
#endif
&& protocol_ != "tcp"
......@@ -312,9 +313,9 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return -1;
}
// Check whether socket type and transport protocol match.
// Specifically, multicast protocols can't be combined with
// bi-directional messaging patterns (socket types).
// Check whether socket type and transport protocol match.
// Specifically, multicast protocols can't be combined with
// bi-directional messaging patterns (socket types).
#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm")
&& options.type != ZMQ_PUB && options.type != ZMQ_SUB
......@@ -591,7 +592,8 @@ int zmq::socket_base_t::bind (const char *addr_)
return 0;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
if (protocol == "ipc") {
ipc_listener_t *listener =
new (std::nothrow) ipc_listener_t (io_thread, this, options);
......@@ -847,7 +849,8 @@ int zmq::socket_base_t::connect (const char *addr_)
// Defer resolution until a socket is opened
paddr->resolved.tcp_addr = NULL;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
&& !defined ZMQ_HAVE_VXWORKS
else if (protocol == "ipc") {
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
alloc_assert (paddr->resolved.ipc_addr);
......@@ -875,7 +878,7 @@ int zmq::socket_base_t::connect (const char *addr_)
}
}
// TBD - Should we check address for ZMQ_HAVE_NORM???
// TBD - Should we check address for ZMQ_HAVE_NORM???
#ifdef ZMQ_HAVE_OPENPGM
if (protocol == "pgm" || protocol == "epgm") {
......
......@@ -541,6 +541,12 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
usleep (timeout_ * 1000);
errno = EAGAIN;
return -1;
#elif defined ZMQ_HAVE_VXWORKS
struct timespec ns_;
ns_.tv_sec = timeout_ / 1000;
ns_.tv_nsec = timeout_ % 1000 * 1000000;
nanosleep (&ns_, 0);
return -1;
#else
usleep (timeout_ * 1000);
return -1;
......
......@@ -38,6 +38,10 @@
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#elif defined ZMQ_HAVE_VXWORKS
#include <unistd.h>
#include <sys/time.h>
#include <strings.h>
#else
#include <unistd.h>
#endif
......
......@@ -47,6 +47,9 @@
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#if defined ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#endif
zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
......@@ -342,7 +345,12 @@ int zmq::socks_connecter_t::connect_to_proxy ()
// Set a source address for conversations
if (tcp_addr->has_src_addr ()) {
#if defined ZMQ_HAVE_VXWORKS
rc = ::bind (s, (sockaddr *) tcp_addr->src_addr (),
tcp_addr->src_addrlen ());
#else
rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
#endif
if (rc == -1) {
close ();
return -1;
......@@ -350,8 +358,11 @@ int zmq::socks_connecter_t::connect_to_proxy ()
}
// Connect to the remote peer.
#if defined ZMQ_HAVE_VXWORKS
rc = ::connect (s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
#else
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
#endif
// Connect was successful immediately.
if (rc == 0)
return 0;
......@@ -377,7 +388,7 @@ zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
{
// Async connect has finished. Check whether an error occurred
int err = 0;
#ifdef ZMQ_HAVE_HPUX
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof err;
......
......@@ -39,6 +39,9 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#endif
#if defined ZMQ_HAVE_OPENVMS
......@@ -224,7 +227,7 @@ int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
return nbytes;
#else
ssize_t nbytes = send (s_, data_, size_, 0);
ssize_t nbytes = send (s_, (char *) data_, size_, 0);
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
......@@ -273,7 +276,7 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
#else
const ssize_t rc = recv (s_, data_, size_, 0);
const ssize_t rc = recv (s_, (char *) data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not
// be able to read a single byte from the socket. Also, SIGSTOP issued
......@@ -297,7 +300,7 @@ void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_)
// Check whether an error occurred
int err = 0;
#ifdef ZMQ_HAVE_HPUX
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof err;
......
......@@ -113,8 +113,12 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_,
return 0;
}
#elif defined ZMQ_HAVE_AIX || defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_ANDROID
#elif defined ZMQ_HAVE_AIX || defined ZMQ_HAVE_HPUX \
|| defined ZMQ_HAVE_ANDROID || defined ZMQ_HAVE_VXWORKS
#include <sys/ioctl.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <ioLib.h>
#endif
int zmq::tcp_address_t::resolve_nic_name (const char *nic_,
bool ipv6_,
......@@ -413,7 +417,12 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_,
sockaddr_in6 ip6_addr;
memset (&ip6_addr, 0, sizeof (ip6_addr));
ip6_addr.sin6_family = AF_INET6;
#ifdef ZMQ_HAVE_VXWORKS
struct in6_addr newaddr = IN6ADDR_ANY_INIT;
memcpy (&ip6_addr.sin6_addr, &newaddr, sizeof (in6_addr));
#else
memcpy (&ip6_addr.sin6_addr, &in6addr_any, sizeof (in6addr_any));
#endif
out_addrlen = sizeof (ip6_addr);
memcpy (out_addr, &ip6_addr, out_addrlen);
} else {
......@@ -646,7 +655,8 @@ int zmq::tcp_address_t::resolve (const char *name_,
std::string if_str = addr_str.substr (pos + 1);
addr_str = addr_str.substr (0, pos);
if (isalpha (if_str.at (0)))
#if !defined ZMQ_HAVE_WINDOWS_TARGET_XP && !defined ZMQ_HAVE_WINDOWS_UWP
#if !defined ZMQ_HAVE_WINDOWS_TARGET_XP && !defined ZMQ_HAVE_WINDOWS_UWP \
&& !defined ZMQ_HAVE_VXWORKS
zone_id = if_nametoindex (if_str.c_str ());
#else
// The function 'if_nametoindex' is not supported on Windows XP.
......
......@@ -52,6 +52,9 @@
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
......@@ -325,26 +328,38 @@ int zmq::tcp_connecter_t::open ()
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (const char *) &flag,
sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag,
sizeof (int));
errno_assert (rc == 0);
#else
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
#endif
#if defined ZMQ_HAVE_VXWORKS
rc = ::bind (s, (sockaddr *) tcp_addr->src_addr (),
tcp_addr->src_addrlen ());
#else
rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
#endif
if (rc == -1)
return -1;
}
// Connect to the remote peer.
#if defined ZMQ_HAVE_VXWORKS
rc = ::connect (s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
#else
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
#endif
// Connect was successful immediately.
if (rc == 0) {
return 0;
}
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS
const int last_error = WSAGetLastError ();
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
......@@ -362,7 +377,7 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
{
// Async connect has finished. Check whether an error occurred
int err = 0;
#ifdef ZMQ_HAVE_HPUX
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof err;
......
......@@ -51,6 +51,9 @@
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#endif
#ifdef ZMQ_HAVE_OPENVMS
......@@ -149,7 +152,7 @@ int zmq::tcp_listener_t::get_address (std::string &addr_)
{
// Get the details of the TCP socket
struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_HPUX
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int sl = sizeof (ss);
#else
socklen_t sl = sizeof (ss);
......@@ -236,13 +239,20 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &flag,
sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof (int));
errno_assert (rc == 0);
#else
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
#endif
// Bind the socket to the network interface and port.
#if defined ZMQ_HAVE_VXWORKS
rc = bind (s, (sockaddr *) address.addr (), address.addrlen ());
#else
rc = bind (s, address.addr (), address.addrlen ());
#endif
#ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) {
errno = wsa_error_to_errno (WSAGetLastError ());
......@@ -284,7 +294,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
struct sockaddr_storage ss;
memset (&ss, 0, sizeof (ss));
#ifdef ZMQ_HAVE_HPUX
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int ss_len = sizeof (ss);
#else
socklen_t ss_len = sizeof (ss);
......
......@@ -97,6 +97,66 @@ void zmq::thread_t::setThreadName (const char *name_)
LIBZMQ_UNUSED (name_);
}
#elif defined ZMQ_HAVE_VXWORKS
extern "C" {
static void *thread_routine (void *arg_)
{
zmq::thread_t *self = (zmq::thread_t *) arg_;
self->applySchedulingParameters ();
self->tfn (self->arg);
return NULL;
}
}
void zmq::thread_t::start (thread_fn *tfn_, void *arg_)
{
tfn = tfn_;
arg = arg_;
descriptor = taskSpawn (NULL, DEFAULT_PRIORITY, DEFAULT_OPTIONS,
DEFAULT_STACK_SIZE, (FUNCPTR) thread_routine,
(int) this, 0, 0, 0, 0, 0, 0, 0, 0, 0);
if (descriptor != NULL || descriptor > 0)
started = true;
}
void zmq::thread_t::stop ()
{
if (started)
while ((descriptor != NULL || descriptor > 0)
&& taskIdVerify (descriptor) == 0) {
}
}
bool zmq::thread_t::is_current_thread () const
{
return taskIdSelf () == descriptor;
}
void zmq::thread_t::setSchedulingParameters (
int priority_, int schedulingPolicy_, const std::set<int> &affinity_cpus_)
{
thread_priority = priority_;
thread_sched_policy = schedulingPolicy_;
thread_affinity_cpus = affinity_cpus_;
}
void zmq::thread_t::
applySchedulingParameters () // to be called in secondary thread context
{
int priority = (thread_priority >= 0 ? thread_priority : DEFAULT_PRIORITY);
priority = (priority < 255 ? priority : DEFAULT_PRIORITY);
if (descriptor != NULL || descriptor > 0) {
taskPrioritySet (descriptor, priority);
}
}
void zmq::thread_t::setThreadName (const char *name_)
{
// not implemented
LIBZMQ_UNUSED (name_);
}
#else
#include <signal.h>
......@@ -206,6 +266,7 @@ void zmq::thread_t::
posix_assert (rc);
#if !defined ZMQ_HAVE_VXWORKS
if (use_nice_instead_priority
&& thread_priority != ZMQ_THREAD_PRIORITY_DFLT) {
// assume the user wants to decrease the thread's nice value
......@@ -217,6 +278,7 @@ void zmq::thread_t::
// IMPORTANT: EPERM is typically returned for unprivileged processes: that's because
// CAP_SYS_NICE capability is required or RLIMIT_NICE resource limit should be changed to avoid EPERM!
}
#endif
#ifdef ZMQ_HAVE_PTHREAD_SET_AFFINITY
if (!thread_affinity_cpus.empty ()) {
......
......@@ -30,7 +30,10 @@
#ifndef __ZMQ_THREAD_HPP_INCLUDED__
#define __ZMQ_THREAD_HPP_INCLUDED__
#ifndef ZMQ_HAVE_WINDOWS
#if defined ZMQ_HAVE_VXWORKS
#include <vxWorks.h>
#include <taskLib.h>
#elif !defined ZMQ_HAVE_WINDOWS
#include <pthread.h>
#endif
#include <set>
......@@ -58,6 +61,15 @@ class thread_t
{
}
#ifdef ZMQ_HAVE_VXWORKS
~thread_t ()
{
if (descriptor != NULL || descriptor > 0) {
taskDelete (descriptor);
}
}
#endif
// Creates OS thread. 'tfn' is main thread function. It'll be passed
// 'arg' as an argument.
void start (thread_fn *tfn_, void *arg_);
......@@ -93,6 +105,14 @@ class thread_t
#ifdef ZMQ_HAVE_WINDOWS
HANDLE descriptor;
#elif defined ZMQ_HAVE_VXWORKS
int descriptor;
enum
{
DEFAULT_PRIORITY = 100,
DEFAULT_OPTIONS = 0,
DEFAULT_STACK_SIZE = 4000
};
#else
pthread_t descriptor;
#endif
......
......@@ -37,7 +37,11 @@
#if defined ZMQ_HAVE_TIPC
#include <sys/socket.h>
#if defined ZMQ_HAVE_VXWORKS
#include <tipc/tipc.h>
#else
#include <linux/tipc.h>
#endif
namespace zmq
{
......
......@@ -49,6 +49,9 @@
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_,
......@@ -217,9 +220,13 @@ int zmq::tipc_connecter_t::open ()
// Set the non-blocking flag.
unblock_socket (s);
// Connect to the remote peer.
#ifdef ZMQ_HAVE_VXWORKS
int rc = ::connect (s, (sockaddr *) addr->resolved.tipc_addr->addr (),
addr->resolved.tipc_addr->addrlen ());
#else
int rc = ::connect (s, addr->resolved.tipc_addr->addr (),
addr->resolved.tipc_addr->addrlen ());
#endif
// Connect was successful immediately.
if (rc == 0)
return 0;
......@@ -248,8 +255,11 @@ zmq::fd_t zmq::tipc_connecter_t::connect ()
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
int err = 0;
#if ZMQ_HAVE_VXWORKS
int len = sizeof (err);
#else
socklen_t len = sizeof (err);
#endif
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
if (rc == -1)
err = errno;
......
......@@ -49,7 +49,12 @@
#include <unistd.h>
#include <sys/socket.h>
#include <fcntl.h>
#if defined ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#include <tipc/tipc.h>
#else
#include <linux/tipc.h>
#endif
zmq::tipc_listener_t::tipc_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_,
......@@ -116,7 +121,11 @@ int zmq::tipc_listener_t::get_address (std::string &addr_)
struct sockaddr_storage ss;
socklen_t sl = sizeof (ss);
#ifdef ZMQ_HAVE_VXWORKS
int rc = getsockname (s, (sockaddr *) &ss, (int *) &sl);
#else
int rc = getsockname (s, (sockaddr *) &ss, &sl);
#endif
if (rc != 0) {
addr_.clear ();
return rc;
......@@ -148,7 +157,11 @@ int zmq::tipc_listener_t::set_address (const char *addr_)
// If random Port Identity, update address object to reflect the assigned address
if (address.is_random ()) {
struct sockaddr_storage ss;
#ifdef ZMQ_HAVE_VXWORKS
int sl = sizeof (ss);
#else
socklen_t sl = sizeof (ss);
#endif
int rc = getsockname (s, (sockaddr *) &ss, &sl);
if (rc != 0)
goto error;
......@@ -161,7 +174,11 @@ int zmq::tipc_listener_t::set_address (const char *addr_)
// Bind the socket to tipc name
if (address.is_service ()) {
#ifdef ZMQ_HAVE_VXWORKS
rc = bind (s, (sockaddr *) address.addr (), address.addrlen ());
#else
rc = bind (s, address.addr (), address.addrlen ());
#endif
if (rc != 0)
goto error;
}
......@@ -199,7 +216,11 @@ zmq::fd_t zmq::tipc_listener_t::accept ()
socklen_t ss_len = sizeof (ss);
zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_VXWORKS
fd_t sock = ::accept (s, (struct sockaddr *) &ss, (int *) &ss_len);
#else
fd_t sock = ::accept (s, (struct sockaddr *) &ss, &ss_len);
#endif
if (sock == -1) {
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK
|| errno == ENOBUFS || errno == EINTR
......
......@@ -35,6 +35,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#endif
#include "udp_engine.hpp"
......@@ -128,8 +131,13 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
errno_assert (rc == 0);
#endif
#ifdef ZMQ_HAVE_VXWORKS
rc = bind (fd, (sockaddr *) address->resolved.udp_addr->bind_addr (),
address->resolved.udp_addr->bind_addrlen ());
#else
rc = bind (fd, address->resolved.udp_addr->bind_addr (),
address->resolved.udp_addr->bind_addrlen ());
#endif
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
......@@ -282,6 +290,10 @@ void zmq::udp_engine_t::out_event ()
rc = sendto (fd, (const char *) out_buffer, (int) size, 0, out_address,
(int) out_addrlen);
wsa_assert (rc != SOCKET_ERROR);
#elif defined ZMQ_HAVE_VXWORKS
rc = sendto (fd, (caddr_t) out_buffer, size, 0,
(sockaddr *) out_address, (int) out_addrlen);
errno_assert (rc != -1);
#else
rc = sendto (fd, out_buffer, size, 0, out_address, out_addrlen);
errno_assert (rc != -1);
......@@ -321,6 +333,14 @@ void zmq::udp_engine_t::in_event ()
|| last_error == WSAEWOULDBLOCK);
return;
}
#elif defined ZMQ_HAVE_VXWORKS
int nbytes = recvfrom (fd, (char *) in_buffer, MAX_UDP_MSG, 0,
(sockaddr *) &in_address, (int *) &in_addrlen);
if (nbytes == -1) {
errno_assert (errno != EBADF && errno != EFAULT && errno != ENOMEM
&& errno != ENOTSOCK);
return;
}
#else
int nbytes = recvfrom (fd, in_buffer, MAX_UDP_MSG, 0,
(sockaddr *) &in_address, &in_addrlen);
......
......@@ -62,9 +62,9 @@ template <typename T, int N> class ypipe_t : public ypipe_base_t<T>
// just to keep ICC and code checking tools from complaining.
inline virtual ~ypipe_t () {}
// Following function (write) deliberately copies uninitialised data
// when used with zmq_msg. Initialising the VSM body for
// non-VSM messages won't be good for performance.
// Following function (write) deliberately copies uninitialised data
// when used with zmq_msg. Initialising the VSM body for
// non-VSM messages won't be good for performance.
#ifdef ZMQ_HAVE_OPENVMS
#pragma message save
......
......@@ -53,9 +53,9 @@ template <typename T> class ypipe_conflate_t : public ypipe_base_t<T>
// just to keep ICC and code checking tools from complaining.
inline virtual ~ypipe_conflate_t () {}
// Following function (write) deliberately copies uninitialised data
// when used with zmq_msg. Initialising the VSM body for
// non-VSM messages won't be good for performance.
// Following function (write) deliberately copies uninitialised data
// when used with zmq_msg. Initialising the VSM body for
// non-VSM messages won't be good for performance.
#ifdef ZMQ_HAVE_OPENVMS
#pragma message save
......
......@@ -55,6 +55,9 @@
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <strings.h>
#endif
#endif
// XSI vector I/O
......@@ -682,7 +685,7 @@ const char *zmq_msg_gets (const zmq_msg_t *msg_, const char *property_)
}
}
// Polling.
// Polling.
#if defined ZMQ_HAVE_POLLER
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
......@@ -967,6 +970,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#elif defined ZMQ_HAVE_VXWORKS
struct timespec ns_;
ns_.tv_sec = timeout_ / 1000;
ns_.tv_nsec = timeout_ % 1000 * 1000000;
return nanosleep (&ns_, 0);
#else
return usleep (timeout_ * 1000);
#endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment