Commit 3c614074 authored by Richard Newton's avatar Richard Newton

Merge branch 'master' of https://github.com/zeromq/libzmq

parents fe2532e8 e71ebbb7
......@@ -613,6 +613,11 @@ else()
OUTPUT_NAME "zmq"
PUBLIC_HEADER "${public_headers}")
endif()
add_library(libzmq-static STATIC ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig})
set_target_properties(libzmq-static PROPERTIES
PUBLIC_HEADER "${public_headers}"
COMPILE_FLAGS "-DZMQ_STATIC"
OUTPUT_NAME "zmq-static")
endif()
target_link_libraries(libzmq ${SODIUM_LIBRARY} ${CMAKE_THREAD_LIBS_INIT})
......@@ -777,7 +782,7 @@ if(MSVC)
COMPONENT Runtime)
endif()
else()
install(TARGETS libzmq
install(TARGETS libzmq libzmq-static
RUNTIME DESTINATION bin
ARCHIVE DESTINATION lib
LIBRARY DESTINATION lib
......
......@@ -113,6 +113,8 @@
<ClInclude Include="..\..\..\..\src\pgm_sender.hpp" />
<ClInclude Include="..\..\..\..\src\pgm_socket.hpp" />
<ClInclude Include="..\..\..\..\src\pipe.hpp" />
<ClInclude Include="..\..\..\..\src\socks.hpp" />
<ClInclude Include="..\..\..\..\src\socks_connecter.hpp" />
<ClInclude Include="..\..\platform.hpp" />
<ClInclude Include="..\..\..\..\src\poll.hpp" />
<ClInclude Include="..\..\..\..\src\poller.hpp" />
......@@ -211,6 +213,8 @@
<ClCompile Include="..\..\..\..\src\session_base.cpp" />
<ClCompile Include="..\..\..\..\src\signaler.cpp" />
<ClCompile Include="..\..\..\..\src\socket_base.cpp" />
<ClCompile Include="..\..\..\..\src\socks.cpp" />
<ClCompile Include="..\..\..\..\src\socks_connecter.cpp" />
<ClCompile Include="..\..\..\..\src\stream.cpp" />
<ClCompile Include="..\..\..\..\src\stream_engine.cpp" />
<ClCompile Include="..\..\..\..\src\sub.cpp" />
......
......@@ -113,6 +113,8 @@
<ClInclude Include="..\..\..\..\src\pgm_sender.hpp" />
<ClInclude Include="..\..\..\..\src\pgm_socket.hpp" />
<ClInclude Include="..\..\..\..\src\pipe.hpp" />
<ClInclude Include="..\..\..\..\src\socks.hpp" />
<ClInclude Include="..\..\..\..\src\socks_connecter.hpp" />
<ClInclude Include="..\..\platform.hpp" />
<ClInclude Include="..\..\..\..\src\poll.hpp" />
<ClInclude Include="..\..\..\..\src\poller.hpp" />
......@@ -211,6 +213,8 @@
<ClCompile Include="..\..\..\..\src\session_base.cpp" />
<ClCompile Include="..\..\..\..\src\signaler.cpp" />
<ClCompile Include="..\..\..\..\src\socket_base.cpp" />
<ClCompile Include="..\..\..\..\src\socks.cpp" />
<ClCompile Include="..\..\..\..\src\socks_connecter.cpp" />
<ClCompile Include="..\..\..\..\src\stream.cpp" />
<ClCompile Include="..\..\..\..\src\stream_engine.cpp" />
<ClCompile Include="..\..\..\..\src\sub.cpp" />
......
......@@ -11,15 +11,12 @@ Buildroot: %{_tmppath}/%{name}-%{version}-%{release}-root
BuildRequires: gcc, make, gcc-c++, libstdc++-devel
Requires: libstdc++
%if 0%{?rhel}
%if 0%{?rhel} == 6
if %{?rhel}%{!?rhel:0} >= 6
BuildRequires: libuuid-devel
Requires: libuuid
%endif
%if 0%{?rhel} == 5
%elseif %{?rhel}%{!?rhel:0} >= 5
BuildRequires: e2fsprogs-devel
Requires: e2fsprogs
%endif
%else
BuildRequires: uuid-devel
Requires: uuid
......@@ -87,11 +84,14 @@ This package contains ZeroMQ related development libraries and header files.
%defattr(-,root,root,-)
# docs in the main package
%doc AUTHORS ChangeLog COPYING COPYING.LESSER NEWS README
%doc AUTHORS ChangeLog COPYING COPYING.LESSER NEWS
# binaries
%{_bindir}/curve_keygen
# libraries
%{_libdir}/libzmq.so.3
%{_libdir}/libzmq.so.3.0.0
%{_libdir}/libzmq.so.3.1.0
%{_mandir}/man7/zmq.7.gz
......@@ -128,7 +128,6 @@ This package contains ZeroMQ related development libraries and header files.
%{_mandir}/man3/zmq_msg_size.3.gz
%{_mandir}/man3/zmq_msg_get.3.gz
%{_mandir}/man3/zmq_msg_more.3.gz
%{_mandir}/man3/zmq_msg_recv.3.gz
%{_mandir}/man3/zmq_msg_send.3.gz
%{_mandir}/man3/zmq_msg_set.3.gz
%{_mandir}/man3/zmq_poll.3.gz
......@@ -149,8 +148,18 @@ This package contains ZeroMQ related development libraries and header files.
%{_mandir}/man7/zmq_ipc.7.gz
%{_mandir}/man7/zmq_pgm.7.gz
%{_mandir}/man7/zmq_tcp.7.gz
%{_mandir}/man3/zmq_curve_keypair.3.gz
%{_mandir}/man3/zmq_send_const.3.gz
%{_mandir}/man3/zmq_z85_decode.3.gz
%{_mandir}/man3/zmq_z85_encode.3.gz
%{_mandir}/man7/zmq_curve.7.gz
%{_mandir}/man7/zmq_null.7.gz
%{_mandir}/man7/zmq_plain.7.gz
%changelog
* Tue Jun 10 2014 Tristian Celestin <tristian.celestin@outlook.com> 4.0.4
- Updated packaged files
* Mon Nov 26 2012 Justin Cook <jhcook@gmail.com> 3.2.2
- Update packaged files
......
......@@ -69,7 +69,7 @@ EXAMPLES
.Assigning a local address to a socket
----
// TCP port 5555 on all available interfaces
rc = zmq_bind(socket, "tcp:/// :5555");
rc = zmq_bind(socket, "tcp://*:5555");
assert (rc == 0);
// TCP port 5555 on the local loop-back interface on all platforms
rc = zmq_bind(socket, "tcp://127.0.0.1:5555");
......
......@@ -179,10 +179,14 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#define ZMQ_IO_THREADS 1
#define ZMQ_MAX_SOCKETS 2
#define ZMQ_SOCKET_LIMIT 3
#define ZMQ_THREAD_PRIORITY 3
#define ZMQ_THREAD_SCHED_POLICY 4
/* Default for new contexts */
#define ZMQ_IO_THREADS_DFLT 1
#define ZMQ_MAX_SOCKETS_DFLT 1023
#define ZMQ_THREAD_PRIORITY_DFLT -1
#define ZMQ_THREAD_SCHED_POLICY_DFLT -1
ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_term (void *context);
......@@ -359,11 +363,6 @@ ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
/* Experimental */
struct iovec;
ZMQ_EXPORT int zmq_sendiov (void *s, struct iovec *iov, size_t count, int flags);
ZMQ_EXPORT int zmq_recviov (void *s, struct iovec *iov, size_t *count, int flags);
/******************************************************************************/
/* I/O multiplexing. */
......@@ -389,12 +388,16 @@ typedef struct zmq_pollitem_t
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/* Built-in message proxy (3-way) */
/******************************************************************************/
/* Message proxying */
/******************************************************************************/
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control);
/* Probe library capabilities */
/******************************************************************************/
/* Probe library capabilities */
/******************************************************************************/
#define ZMQ_HAS_CAPABILITIES 1
ZMQ_EXPORT int zmq_has (const char *capability);
......@@ -410,6 +413,54 @@ ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags);
ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags);
/******************************************************************************/
/* Encryption functions */
/******************************************************************************/
/* Encode data with Z85 encoding. Returns encoded data */
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
/* Decode data with Z85 encoding. Returns decoded data */
ZMQ_EXPORT uint8_t *zmq_z85_decode (uint8_t *dest, char *string);
/* Generate z85-encoded public and private keypair with libsodium. */
/* Returns 0 on success. */
ZMQ_EXPORT int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key);
/******************************************************************************/
/* These functions are not documented by man pages -- use at your own risk. */
/* If you need these to be part of the formal ZMQ API, then (a) write a man */
/* page, and (b) write a test case in tests. */
/******************************************************************************/
struct iovec;
ZMQ_EXPORT int zmq_sendiov (void *s, struct iovec *iov, size_t count, int flags);
ZMQ_EXPORT int zmq_recviov (void *s, struct iovec *iov, size_t *count, int flags);
/* Helper functions are used by perf tests so that they don't have to care */
/* about minutiae of time-related functions on different OS platforms. */
/* Starts the stopwatch. Returns the handle to the watch. */
ZMQ_EXPORT void *zmq_stopwatch_start (void);
/* Stops the stopwatch. Returns the number of microseconds elapsed since */
/* the stopwatch was started. */
ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_);
/* Sleeps for specified number of seconds. */
ZMQ_EXPORT void zmq_sleep (int seconds_);
typedef void (zmq_thread_fn) (void*);
/* Start a thread. Returns a handle to the thread. */
ZMQ_EXPORT void *zmq_threadstart (zmq_thread_fn* func, void* arg);
/* Wait for thread to complete then free up resources. */
ZMQ_EXPORT void zmq_threadclose (void* thread);
#undef ZMQ_EXPORT
#ifdef __cplusplus
......
......@@ -17,89 +17,4 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_UTILS_H_INCLUDED__
#define __ZMQ_UTILS_H_INCLUDED__
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
/* Define integer types needed for event interface */
#if defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_OPENVMS
# include <inttypes.h>
#elif defined _MSC_VER && _MSC_VER < 1600
# ifndef int32_t
typedef __int32 int32_t;
# endif
# ifndef uint16_t
typedef unsigned __int16 uint16_t;
# endif
#else
# include <stdint.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
/* Handle DSO symbol visibility */
#if defined _WIN32
# if defined ZMQ_STATIC
# define ZMQ_EXPORT
# elif defined DLL_EXPORT
# define ZMQ_EXPORT __declspec(dllexport)
# else
# define ZMQ_EXPORT __declspec(dllimport)
# endif
#else
# if defined __SUNPRO_C || defined __SUNPRO_CC
# define ZMQ_EXPORT __global
# elif (defined __GNUC__ && __GNUC__ >= 4) || defined __INTEL_COMPILER
# define ZMQ_EXPORT __attribute__ ((visibility("default")))
# else
# define ZMQ_EXPORT
# endif
#endif
/* These functions are documented by man pages */
/* Encode data with Z85 encoding. Returns encoded data */
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);
/* Decode data with Z85 encoding. Returns decoded data */
ZMQ_EXPORT uint8_t *zmq_z85_decode (uint8_t *dest, char *string);
/* Generate z85-encoded public and private keypair with libsodium. */
/* Returns 0 on success. */
ZMQ_EXPORT int zmq_curve_keypair (char *z85_public_key, char *z85_secret_key);
typedef void (zmq_thread_fn) (void*);
/* These functions are not documented by man pages */
/* Helper functions are used by perf tests so that they don't have to care */
/* about minutiae of time-related functions on different OS platforms. */
/* Starts the stopwatch. Returns the handle to the watch. */
ZMQ_EXPORT void *zmq_stopwatch_start (void);
/* Stops the stopwatch. Returns the number of microseconds elapsed since */
/* the stopwatch was started. */
ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_);
/* Sleeps for specified number of seconds. */
ZMQ_EXPORT void zmq_sleep (int seconds_);
/* Start a thread. Returns a handle to the thread. */
ZMQ_EXPORT void *zmq_threadstart (zmq_thread_fn* func, void* arg);
/* Wait for thread to complete then free up resources. */
ZMQ_EXPORT void zmq_threadclose (void* thread);
#undef ZMQ_EXPORT
#ifdef __cplusplus
}
#endif
#endif
/* This file is deprecated, and all its functionality provided by zmq.h */
\ No newline at end of file
......@@ -32,7 +32,7 @@ zmq::address_t::address_t (
: protocol (protocol_),
address (address_)
{
memset (&resolved, 0, sizeof (resolved));
memset (&resolved, 0, sizeof resolved);
}
zmq::address_t::~address_t ()
......@@ -53,7 +53,8 @@ zmq::address_t::~address_t ()
}
#endif
#if defined ZMQ_HAVE_TIPC
else if (protocol == "tipc") {
else
if (protocol == "tipc") {
if (resolved.tipc_addr) {
delete resolved.tipc_addr;
resolved.tipc_addr = 0;
......@@ -66,20 +67,20 @@ int zmq::address_t::to_string (std::string &addr_) const
{
if (protocol == "tcp") {
if (resolved.tcp_addr)
return resolved.tcp_addr->to_string(addr_);
return resolved.tcp_addr->to_string (addr_);
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else
else
if (protocol == "ipc") {
if (resolved.ipc_addr)
return resolved.ipc_addr->to_string(addr_);
return resolved.ipc_addr->to_string (addr_);
}
#endif
#if defined ZMQ_HAVE_TIPC
else if (protocol == "tipc") {
if (resolved.tipc_addr) {
return resolved.tipc_addr->to_string(addr_);
}
else
if (protocol == "tipc") {
if (resolved.tipc_addr)
return resolved.tipc_addr->to_string (addr_);
}
#endif
......
......@@ -57,7 +57,9 @@ zmq::ctx_t::ctx_t () :
slots (NULL),
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
io_thread_count (ZMQ_IO_THREADS_DFLT),
ipv6 (false)
ipv6 (false),
thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
{
#ifdef HAVE_FORK
pid = getpid();
......@@ -194,6 +196,18 @@ int zmq::ctx_t::set (int option_, int optval_)
ipv6 = (optval_ != 0);
opt_sync.unlock ();
}
else
if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
opt_sync.lock();
thread_priority = optval_;
opt_sync.unlock();
}
else
if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
opt_sync.lock();
thread_sched_policy = optval_;
opt_sync.unlock();
}
else {
errno = EINVAL;
rc = -1;
......@@ -324,6 +338,12 @@ zmq::object_t *zmq::ctx_t::get_reaper ()
return reaper;
}
void zmq::ctx_t::start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const
{
thread_.start(tfn_, arg_);
thread_.setSchedulingParameters(thread_priority, thread_sched_policy);
}
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);
......@@ -366,6 +386,26 @@ int zmq::ctx_t::register_endpoint (const char *addr_,
return 0;
}
int zmq::ctx_t::unregister_endpoint (
const std::string &addr_, socket_base_t *socket_)
{
endpoints_sync.lock ();
const endpoints_t::iterator it = endpoints.find (addr_);
if (it == endpoints.end () || it->second.socket != socket_) {
endpoints_sync.unlock ();
errno = ENOENT;
return -1;
}
// Remove endpoint.
endpoints.erase (it);
endpoints_sync.unlock ();
return 0;
}
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
endpoints_sync.lock ();
......
......@@ -32,6 +32,7 @@
#include "stdint.hpp"
#include "options.hpp"
#include "atomic_counter.hpp"
#include "thread.hpp"
namespace zmq
{
......@@ -87,6 +88,9 @@ namespace zmq
zmq::socket_base_t *create_socket (int type_);
void destroy_socket (zmq::socket_base_t *socket_);
// Start a new thread with proper scheduling parameters.
void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const;
// Send command to the destination thread.
void send_command (uint32_t tid_, const command_t &command_);
......@@ -100,6 +104,7 @@ namespace zmq
// Management of inproc endpoints.
int register_endpoint (const char *addr_, const endpoint_t &endpoint_);
int unregister_endpoint (const std::string &addr_, socket_base_t *socket_);
void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_);
void pend_connection (const std::string &addr_,
......@@ -185,6 +190,10 @@ namespace zmq
// Is IPv6 enabled on this context?
bool ipv6;
// Thread scheduling parameters.
int thread_priority;
int thread_sched_policy;
// Synchronisation of access to context options.
mutex_t opt_sync;
......
......@@ -35,7 +35,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::devpoll_t::devpoll_t () :
zmq::devpoll_t::devpoll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false)
{
devpoll_fd = open ("/dev/poll", O_RDWR);
......@@ -125,7 +126,7 @@ void zmq::devpoll_t::reset_pollout (handle_t handle_)
void zmq::devpoll_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::devpoll_t::stop ()
......
......@@ -26,6 +26,7 @@
#include <vector>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -43,7 +44,7 @@ namespace zmq
typedef fd_t handle_t;
devpoll_t ();
devpoll_t (const ctx_t &ctx_);
~devpoll_t ();
// "poller" concept.
......@@ -66,6 +67,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// File descriptor referring to "/dev/poll" pseudo-device.
fd_t devpoll_fd;
......
......@@ -32,7 +32,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::epoll_t::epoll_t () :
zmq::epoll_t::epoll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false)
{
epoll_fd = epoll_create (1);
......@@ -118,7 +119,7 @@ void zmq::epoll_t::reset_pollout (handle_t handle_)
void zmq::epoll_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::epoll_t::stop ()
......
......@@ -27,6 +27,7 @@
#include <vector>
#include <sys/epoll.h>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -45,7 +46,7 @@ namespace zmq
typedef void* handle_t;
epoll_t ();
epoll_t (const ctx_t &ctx_);
~epoll_t ();
// "poller" concept.
......@@ -68,6 +69,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// Main epoll file descriptor
fd_t epoll_fd;
......
......@@ -27,7 +27,7 @@
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
{
poller = new (std::nothrow) poller_t;
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
......
......@@ -28,17 +28,16 @@
zmq::ipc_address_t::ipc_address_t ()
{
memset (&address, 0, sizeof (address));
memset (&address, 0, sizeof address);
}
zmq::ipc_address_t::ipc_address_t (const sockaddr *sa, socklen_t sa_len)
{
zmq_assert(sa && sa_len > 0);
zmq_assert (sa && sa_len > 0);
memset (&address, 0, sizeof (address));
if (sa->sa_family == AF_UNIX) {
memset (&address, 0, sizeof address);
if (sa->sa_family == AF_UNIX)
memcpy(&address, sa, sa_len);
}
}
zmq::ipc_address_t::~ipc_address_t ()
......@@ -47,11 +46,11 @@ zmq::ipc_address_t::~ipc_address_t ()
int zmq::ipc_address_t::resolve (const char *path_)
{
if (strlen (path_) >= sizeof (address.sun_path)) {
if (strlen (path_) >= sizeof address.sun_path) {
errno = ENAMETOOLONG;
return -1;
}
if (path_[0] == '@' && !path_[1]) {
if (path_ [0] == '@' && !path_ [1]) {
errno = EINVAL;
return -1;
}
......@@ -59,7 +58,7 @@ int zmq::ipc_address_t::resolve (const char *path_)
address.sun_family = AF_UNIX;
strcpy (address.sun_path, path_);
/* Abstract sockets start with '\0' */
if (path_[0] == '@')
if (path_ [0] == '@')
*address.sun_path = '\0';
return 0;
}
......@@ -73,7 +72,7 @@ int zmq::ipc_address_t::to_string (std::string &addr_)
std::stringstream s;
s << "ipc://";
if (!address.sun_path[0] && address.sun_path[1])
if (!address.sun_path [0] && address.sun_path [1])
s << "@" << address.sun_path + 1;
else
s << address.sun_path;
......@@ -88,9 +87,9 @@ const sockaddr *zmq::ipc_address_t::addr () const
socklen_t zmq::ipc_address_t::addrlen () const
{
if (!address.sun_path[0] && address.sun_path[1])
return (socklen_t) strlen(address.sun_path + 1) + sizeof (sa_family_t) + 1;
return (socklen_t) sizeof (address);
if (!address.sun_path [0] && address.sun_path [1])
return (socklen_t) strlen (address.sun_path + 1) + sizeof (sa_family_t) + 1;
return (socklen_t) sizeof address;
}
#endif
......@@ -41,7 +41,7 @@ namespace zmq
~ipc_address_t ();
// This function sets up the address for UNIX domain transport.
int resolve (const char* path_);
int resolve (const char *path_);
// The opposite to resolve()
int to_string (std::string &addr_);
......@@ -56,7 +56,7 @@ namespace zmq
ipc_address_t (const ipc_address_t&);
const ipc_address_t &operator = (const ipc_address_t&);
};
}
#endif
......
......@@ -45,6 +45,9 @@
#ifdef ZMQ_HAVE_SO_PEERCRED
# include <pwd.h>
# include <grp.h>
# if defined ZMQ_HAVE_OPENBSD
# define ucred sockpeercred
# endif
#endif
zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
......
......@@ -42,7 +42,8 @@
#define kevent_udata_t void *
#endif
zmq::kqueue_t::kqueue_t () :
zmq::kqueue_t::kqueue_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
stopping (false)
{
// Create event queue
......@@ -144,7 +145,7 @@ void zmq::kqueue_t::reset_pollout (handle_t handle_)
void zmq::kqueue_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::kqueue_t::stop ()
......
......@@ -27,6 +27,7 @@
#include <vector>
#include <unistd.h>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -45,7 +46,7 @@ namespace zmq
typedef void* handle_t;
kqueue_t ();
kqueue_t (const ctx_t &ctx_);
~kqueue_t ();
// "poller" concept.
......@@ -68,6 +69,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
// File descriptor referring to the kernel event queue.
fd_t kqueue_fd;
......
......@@ -59,26 +59,28 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
{
// Try to get the command straight away.
if (active) {
bool ok = cpipe.read (cmd_);
if (ok)
if (cpipe.read (cmd_))
return 0;
// If there are no more commands available, switch into passive state.
active = false;
signaler.recv ();
}
// Wait for signal from the command sender.
int rc = signaler.wait (timeout_);
if (rc != 0 && (errno == EAGAIN || errno == EINTR))
const int rc = signaler.wait (timeout_);
if (rc == -1) {
errno_assert (errno == EAGAIN || errno == EINTR);
return -1;
}
// Receive the signal.
signaler.recv ();
// We've got the signal. Now we can switch into active state.
// Switch into active state.
active = true;
// Get a command.
errno_assert (rc == 0);
bool ok = cpipe.read (cmd_);
const bool ok = cpipe.read (cmd_);
zmq_assert (ok);
return 0;
}
......@@ -143,6 +143,12 @@ int zmq::object_t::register_endpoint (const char *addr_,
return ctx->register_endpoint (addr_, endpoint_);
}
int zmq::object_t::unregister_endpoint (
const std::string &addr_, socket_base_t *socket_)
{
return ctx->unregister_endpoint (addr_, socket_);
}
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
return ctx->unregister_endpoints (socket_);
......
......@@ -61,6 +61,8 @@ namespace zmq
// repository of inproc endpoints.
int register_endpoint (const char *addr_,
const zmq::endpoint_t &endpoint_);
int unregister_endpoint (
const std::string &addr_, socket_base_t *socket_);
void unregister_endpoints (zmq::socket_base_t *socket_);
zmq::endpoint_t find_endpoint (const char *addr_);
void pend_connection (const std::string &addr_,
......
......@@ -30,7 +30,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::poll_t::poll_t () :
zmq::poll_t::poll_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
retired (false),
stopping (false)
{
......@@ -106,7 +107,7 @@ void zmq::poll_t::reset_pollout (handle_t handle_)
void zmq::poll_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::poll_t::stop ()
......
......@@ -28,6 +28,7 @@
#include <stddef.h>
#include <vector>
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -46,7 +47,7 @@ namespace zmq
typedef fd_t handle_t;
poll_t ();
poll_t (const ctx_t &ctx_);
~poll_t ();
// "poller" concept.
......@@ -69,6 +70,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
struct fd_entry_t
{
fd_t index;
......
......@@ -26,7 +26,7 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
sockets (0),
terminating (false)
{
poller = new (std::nothrow) poller_t;
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
......
......@@ -41,7 +41,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
zmq::select_t::select_t () :
zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
ctx(ctx_),
maxfd (retired_fd),
retired (false),
stopping (false)
......@@ -136,7 +137,7 @@ void zmq::select_t::reset_pollout (handle_t handle_)
void zmq::select_t::start ()
{
worker.start (worker_routine, this);
ctx.start_thread (worker, worker_routine, this);
}
void zmq::select_t::stop ()
......
......@@ -38,6 +38,7 @@
#include <sys/select.h>
#endif
#include "ctx.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "poller_base.hpp"
......@@ -56,7 +57,7 @@ namespace zmq
typedef fd_t handle_t;
select_t ();
select_t (const ctx_t &ctx_);
~select_t ();
// "poller" concept.
......@@ -79,6 +80,9 @@ namespace zmq
// Main event loop.
void loop ();
// Reference to ZMQ context.
const ctx_t &ctx;
struct fd_entry_t
{
fd_t fd;
......
......@@ -189,9 +189,13 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc" &&
protocol_ != "norm") {
if (protocol_ != "inproc"
&& protocol_ != "ipc"
&& protocol_ != "tcp"
&& protocol_ != "pgm"
&& protocol_ != "epgm"
&& protocol_ != "tipc"
&& protocol_ != "norm") {
errno = EPROTONOSUPPORT;
return -1;
}
......@@ -356,12 +360,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// Parse addr_ string.
std::string protocol;
std::string address;
rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
rc = check_protocol (protocol);
if (rc != 0)
if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
return -1;
if (protocol == "inproc") {
......@@ -464,12 +463,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Parse addr_ string.
std::string protocol;
std::string address;
rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
rc = check_protocol (protocol);
if (rc != 0)
if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
return -1;
if (protocol == "inproc") {
......@@ -596,7 +590,40 @@ int zmq::socket_base_t::connect (const char *addr_)
// Resolve address (if needed by the protocol)
if (protocol == "tcp") {
// Defer resolution until a socket is opened
// Do some basic sanity checks on tcp:// address syntax
// - hostname starts with digit or letter, with embedded '-' or '.'
// - IPv6 address may contain hex chars and colons.
// - IPv4 address may contain decimal digits and dots.
// - Address must end in ":port" where port is *, or numeric
// - Address may contain two parts separated by ':'
// Following code is quick and dirty check to catch obvious errors,
// without trying to be fully accurate.
const char *check = address.c_str ();
if (isalnum (*check) || isxdigit (*check)) {
check++;
while (isalnum (*check)
|| isxdigit (*check)
|| *check == '.' || *check == '-' || *check == ':'|| *check == ';')
check++;
}
// Assume the worst, now look for success
rc = -1;
// Did we reach the end of the address safely?
if (*check == 0) {
// Do we have a valid port string? (cannot be '*' in connect
check = strrchr (address.c_str (), ':');
if (check) {
check++;
if (*check && (isdigit (*check)))
rc = 0; // Valid
}
}
if (rc == -1) {
errno = EINVAL;
delete paddr;
return -1;
}
// Defer resolution until a socket is opened
paddr->resolved.tcp_addr = NULL;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
......@@ -711,16 +738,13 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
// Parse addr_ string.
std::string protocol;
std::string address;
rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
rc = check_protocol (protocol);
if (rc != 0)
if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
return -1;
// Disconnect an inproc socket
if (protocol == "inproc") {
if (unregister_endpoint (std::string (addr_), this) == 0)
return 0;
std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
if (range.first == range.second) {
errno = ENOENT;
......@@ -1190,12 +1214,7 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
// Parse addr_ string.
std::string protocol;
std::string address;
int rc = parse_uri (addr_, protocol, address);
if (rc != 0)
return -1;
rc = check_protocol (protocol);
if (rc != 0)
if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
return -1;
// Event notification only supported over inproc://
......@@ -1211,9 +1230,9 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
// Never block context termination on pending event messages
int linger = 0;
rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
int rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
if (rc == -1)
stop_monitor ();
stop_monitor ();
// Spawn the monitor socket endpoint
rc = zmq_bind (monitor_socket, addr_);
......
......@@ -26,6 +26,7 @@
#ifndef ZMQ_HAVE_WINDOWS
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#endif
......
......@@ -28,6 +28,9 @@
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#if defined ZMQ_HAVE_OPENBSD
#define ucred sockpeercred
#endif
#endif
#include <string.h>
......
......@@ -116,7 +116,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_
(void) ipv6_;
// Create a socket.
int sd = open_socket (AF_INET, SOCK_DGRAM, 0);
const int sd = open_socket (AF_INET, SOCK_DGRAM, 0);
errno_assert (sd != -1);
struct ifreq ifr;
......@@ -125,7 +125,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_
strncpy (ifr.ifr_name, nic_, sizeof ifr.ifr_name);
// Fetch interface address.
int rc = ioctl (sd, SIOCGIFADDR, (caddr_t) &ifr, sizeof (struct ifreq));
const int rc = ioctl (sd, SIOCGIFADDR, (caddr_t) &ifr, sizeof ifr);
// Clean up.
close (sd);
......@@ -413,7 +413,7 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_, boo
std::string addr_str (name_, delimiter - name_);
std::string port_str (delimiter + 1);
// Remove square brackets around the address, if any.
// Remove square brackets around the address, if any, as used in IPv6
if (addr_str.size () >= 2 && addr_str [0] == '[' &&
addr_str [addr_str.size () - 1] == ']')
addr_str = addr_str.substr (1, addr_str.size () - 2);
......@@ -466,7 +466,8 @@ int zmq::tcp_address_t::to_string (std::string &addr_)
return -1;
}
// not using service resolv because of https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
// Not using service resolv because of
// https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
char hbuf [NI_MAXHOST];
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof hbuf, NULL, 0, NI_NUMERICHOST);
if (rc != 0) {
......
......@@ -303,15 +303,18 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0);
if (err != 0) {
wsa_assert (err == WSAECONNREFUSED
|| err == WSAETIMEDOUT
|| err == WSAECONNABORTED
|| err == WSAEHOSTUNREACH
|| err == WSAENETUNREACH
|| err == WSAENETDOWN
|| err == WSAEACCES
|| err == WSAEINVAL
|| err == WSAEADDRINUSE);
if (err != WSAECONNREFUSED
&& err != WSAETIMEDOUT
&& err != WSAECONNABORTED
&& err != WSAEHOSTUNREACH
&& err != WSAENETUNREACH
&& err != WSAENETDOWN
&& err != WSAEACCES
&& err != WSAEINVAL
&& err != WSAEADDRINUSE)
{
wsa_assert_no (err);
}
return retired_fd;
}
#else
......
......@@ -59,6 +59,11 @@ void zmq::thread_t::stop ()
win_assert (rc2 != 0);
}
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_)
{
// not implemented
}
#else
#include <signal.h>
......@@ -97,6 +102,28 @@ void zmq::thread_t::stop ()
posix_assert (rc);
}
void zmq::thread_t::setSchedulingParameters(int priority_, int schedulingPolicy_)
{
int policy = 0;
struct sched_param param;
int rc = pthread_getschedparam(descriptor, &policy, &param);
posix_assert (rc);
if(priority_ != -1)
{
param.sched_priority = priority_;
}
if(schedulingPolicy_ != -1)
{
policy = schedulingPolicy_;
}
rc = pthread_setschedparam(descriptor, policy, &param);
posix_assert (rc);
}
#endif
......
......@@ -55,6 +55,10 @@ namespace zmq
// Waits for thread termination.
void stop ();
// Sets the thread scheduling parameters. Only implemented for
// pthread. Has no effect on other platforms.
void setSchedulingParameters(int priority_, int schedulingPolicy_);
// These are internal members. They should be private, however then
// they would not be accessible from the main C routine of the thread.
thread_fn *tfn;
......
......@@ -28,17 +28,16 @@
zmq::tipc_address_t::tipc_address_t ()
{
memset (&address, 0, sizeof (address));
memset (&address, 0, sizeof address);
}
zmq::tipc_address_t::tipc_address_t (const sockaddr *sa, socklen_t sa_len)
{
zmq_assert(sa && sa_len > 0);
zmq_assert (sa && sa_len > 0);
memset (&address, 0, sizeof (address));
if (sa->sa_family == AF_TIPC) {
memcpy(&address, sa, sa_len);
}
memset (&address, 0, sizeof address);
if (sa->sa_family == AF_TIPC)
memcpy (&address, sa, sa_len);
}
zmq::tipc_address_t::~tipc_address_t ()
......@@ -47,15 +46,15 @@ zmq::tipc_address_t::~tipc_address_t ()
int zmq::tipc_address_t::resolve (const char *name)
{
int res;
unsigned int type = 0;
unsigned int lower = 0;
unsigned int upper = 0;
res = sscanf(name, "{%u,%u,%u}", &type, &lower, &upper);
const int res = sscanf (name, "{%u,%u,%u}", &type, &lower, &upper);
if (res == 3)
goto nameseq;
else if (res == 2 && type > TIPC_RESERVED_TYPES) {
else
if (res == 2 && type > TIPC_RESERVED_TYPES) {
address.family = AF_TIPC;
address.addrtype = TIPC_ADDR_NAME;
address.addr.name.name.type = type;
......@@ -63,7 +62,7 @@ int zmq::tipc_address_t::resolve (const char *name)
/* Since we can't specify lookup domain when connecting
* (and we're not sure that we want it to be configurable)
* Change from 'closest first' approach, to search entire zone */
address.addr.name.domain = tipc_addr(1, 0, 0);
address.addr.name.domain = tipc_addr (1, 0, 0);
address.scope = 0;
return 0;
}
......@@ -71,7 +70,7 @@ int zmq::tipc_address_t::resolve (const char *name)
return EINVAL;
nameseq:
if (type < TIPC_RESERVED_TYPES || upper < lower)
return EINVAL;
return EINVAL;
address.family = AF_TIPC;
address.addrtype = TIPC_ADDR_NAMESEQ;
address.addr.nameseq.type = type;
......@@ -102,8 +101,7 @@ const sockaddr *zmq::tipc_address_t::addr () const
socklen_t zmq::tipc_address_t::addrlen () const
{
return (socklen_t) sizeof (address);
return (socklen_t) sizeof address;
}
#endif
......@@ -41,7 +41,7 @@ namespace zmq
~tipc_address_t ();
// This function sets up the address "{type, lower, upper}" for TIPC transport
int resolve (const char* name);
int resolve (const char *name);
// The opposite to resolve()
int to_string (std::string &addr_);
......
......@@ -168,10 +168,10 @@ namespace zmq
// The pipe mustn't be empty or the function crashes.
inline bool probe (bool (*fn)(const T &))
{
bool rc = check_read ();
zmq_assert (rc);
bool rc = check_read ();
zmq_assert (rc);
return (*fn) (queue.front ());
return (*fn) (queue.front ());
}
protected:
......
......@@ -648,10 +648,15 @@ int zmq_msg_set (zmq_msg_t *, int, int)
const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
{
zmq::metadata_t *metadata = ((zmq::msg_t*) msg_)->metadata ();
const char *value = NULL;
if (metadata)
return metadata->get (std::string (property_));
else
value = metadata->get (std::string (property_));
if (value)
return value;
else {
errno = EINVAL;
return NULL;
}
}
// Polling.
......
......@@ -31,14 +31,18 @@ int main (void)
int rc = zmq_connect (sock, "tcp://localhost:1234");
assert (rc == 0);
// Because of lazy resolution of TCP names, this will succeed
rc = zmq_connect (sock, "tcp://localhost:invalid");
assert (rc == 0);
assert (rc == -1);
// Because of lazy resolution of TCP names, this will succeed
rc = zmq_connect (sock, "tcp://in val id:1234");
assert (rc == 0);
assert (rc == -1);
rc = zmq_connect (sock, "tcp://");
assert (rc == -1);
rc = zmq_connect (sock, "tcp://192.168.0.200:*");
assert (rc == -1);
rc = zmq_connect (sock, "invalid://localhost:1234");
assert (rc == -1);
assert (errno == EPROTONOSUPPORT);
......
......@@ -100,6 +100,8 @@ int main (void)
assert (streq (zmq_msg_gets (&msg, "Hello"), "World"));
assert (streq (zmq_msg_gets (&msg, "Socket-Type"), "DEALER"));
assert (streq (zmq_msg_gets (&msg, "User-Id"), "anonymous"));
assert (zmq_msg_gets (&msg, "No Such") == NULL);
assert (zmq_errno () == EINVAL);
zmq_msg_close (&msg);
close_zero_linger (client);
......
......@@ -21,7 +21,6 @@
#define __TESTUTIL_HPP_INCLUDED__
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include "../src/stdint.hpp"
#include "platform.hpp"
......@@ -35,6 +34,7 @@
#include <assert.h>
#include <stdarg.h>
#include <string>
#include <string.h>
#if defined _WIN32
# if defined _MSC_VER
......
......@@ -24,10 +24,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <assert.h>
#include <platform.hpp>
#include <zmq.h>
#include <zmq_utils.h>
int main (void)
{
......@@ -40,11 +39,9 @@ int main (void)
char public_key [41];
char secret_key [41];
int rc = zmq_curve_keypair (public_key, secret_key);
if (rc != 0) {
if (zmq_errno () == ENOTSUP) {
if (zmq_curve_keypair (public_key, secret_key)) {
if (zmq_errno () == ENOTSUP)
puts ("To use curve_keygen, please install libsodium and then rebuild libzmq.");
}
exit (1);
}
......
......@@ -9,11 +9,13 @@
#define FOR(i,n) for (i = 0;i < n;++i)
#define sv static void
#ifndef TWEETNACL_BASE_H
typedef unsigned char u8;
typedef unsigned long u32;
typedef unsigned long long u64;
typedef long long i64;
typedef i64 gf[16];
#endif
extern void randombytes(u8 *,u64);
static const u8
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment