Commit 60e45f34 authored by Chuck Remes's avatar Chuck Remes

Merge pull request #279 from pieterh/master

Wrote new ctx API, added ZMQ_MAX_SOCKETS and ZMQ_IO_THREADS
parents 7d6d2f94 1e4c5b29
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 zmq_init.3 \ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \
zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \
zmq_init.3 zmq_term.3 \
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_msg_send.3 zmq_msg_recv.3 \ zmq_msg_send.3 zmq_msg_recv.3 \
zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ zmq_strerror.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \
zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_sendmsg.3 zmq_recvmsg.3 zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3
MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7
......
...@@ -30,9 +30,21 @@ provided by the 0MQ library. ...@@ -30,9 +30,21 @@ provided by the 0MQ library.
Context Context
~~~~~~~ ~~~~~~~
Before using any 0MQ library functions the caller must initialise a 0MQ Before using any 0MQ library functions you must create a 0MQ 'context'. When
'context' using _zmq_init()_. The following functions are provided to handle you exit your application you must destroy the 'context'. These functions let
initialisation and termination of a 'context': you work with 'contexts':
Create a new 0MQ context::
linkzmq:zmq_ctx_new[3]
Work with context properties::
linkzmq:zmq_ctx_set[3]
linkzmq:zmq_ctx_get[3]
Destroy a 0MQ context::
linkzmq:zmq_ctx_destroy[3]
These deprecated functions let you create and destroy 'contexts':
Initialise 0MQ context:: Initialise 0MQ context::
linkzmq:zmq_init[3] linkzmq:zmq_init[3]
......
zmq_ctx_destroy(3)
==================
NAME
----
zmq_ctx_destroy - destroy a 0MQ context
SYNOPSIS
--------
*int zmq_ctx_destroy (void '*context');*
DESCRIPTION
-----------
The _zmq_ctx_destroy()_ function shall destroy the 0MQ context 'context'.
Context termination is performed in the following steps:
1. Any blocking operations currently in progress on sockets open within
'context' shall return immediately with an error code of ETERM. With the
exception of _zmq_close()_, any further operations on sockets open within
'context' shall fail with an error code of ETERM.
2. After interrupting all blocking calls, _zmq_ctx_destroy()_ shall _block_ until the
following conditions are satisfied:
* All sockets open within 'context' have been closed with _zmq_close()_.
* For each socket within 'context', all messages sent by the application
with _zmq_send()_ have either been physically transferred to a network
peer, or the socket's linger period set with the _ZMQ_LINGER_ socket
option has expired.
For further details regarding socket linger behavior refer to the _ZMQ_LINGER_
option in linkzmq:zmq_setsockopt[3].
This function replaces the deprecated function linkzmq:zmq_term[3].
RETURN VALUE
------------
The _zmq_ctx_destroy()_ function shall return zero if successful. Otherwise
it shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*EFAULT*::
The provided 'context' was invalid.
*EINTR*::
Termination was interrupted by a signal. It can be restarted if needed.
SEE ALSO
--------
linkzmq:zmq[7]
linkzmq:zmq_init[3]
linkzmq:zmq_close[3]
linkzmq:zmq_setsockopt[3]
AUTHORS
-------
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
zmq_ctx_get(3)
==============
NAME
----
zmq_ctx_get - get context options
SYNOPSIS
--------
*int zmq_ctx_get (void '*context', int 'option_name');*
DESCRIPTION
-----------
The _zmq_ctx_get()_ function shall return the option specified by the
'option_name' argument.
The _zmq_ctx_get()_ function accepts the following option names:
ZMQ_IO_THREADS: Get number of I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IO_THREADS' argument returns the size of the 0MQ thread pool
for this context.
ZMQ_MAX_SOCKETS: Set maximum number of sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_MAX_SOCKETS' argument returns the maximum number of sockets
allowed for this context.
RETURN VALUE
------------
The _zmq_ctx_get()_ function returns a value of 0 or greater if successful.
Otherwise it returns `-1` and sets 'errno' to one of the values defined
below.
ERRORS
------
*EINVAL*::
The requested option _option_name_ is unknown.
EXAMPLE
-------
.Setting a limit on the number of sockets
----
void *context = zmq_ctx_new ();
zmq_ctx_get (context, ZMQ_MAX_SOCKETS, 256);
int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS);
assert (max_sockets == 256);
----
SEE ALSO
--------
linkzmq:zmq_ctx_set[3]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
zmq_ctx_new(3)
==============
NAME
----
zmq_ctx_new - create new 0MQ context
SYNOPSIS
--------
*void *zmq_ctx_new ();*
DESCRIPTION
-----------
The _zmq_ctx_new()_ function creates a new 0MQ 'context'.
This function replaces the deprecated function linkzmq:zmq_init[3].
.Thread safety
A 0MQ 'context' is thread safe and may be shared among as many application
threads as necessary, without any additional locking required on the part of
the caller.
RETURN VALUE
------------
The _zmq_ctx_new()_ function shall return an opaque handle to the newly created
'context' if successful. Otherwise it shall return NULL and set 'errno' to one
of the values defined below.
ERRORS
------
No error values are defined for this function.
SEE ALSO
--------
linkzmq:zmq[7]
linkzmq:zmq_ctx_put[3]
linkzmq:zmq_ctx_get[3]
linkzmq:zmq_ctx_destroy[3]
AUTHORS
-------
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
zmq_ctx_set(3)
==============
NAME
----
zmq_ctx_set - set context options
SYNOPSIS
--------
*int zmq_ctx_set (void '*context', int 'option_name', int 'option_value');*
DESCRIPTION
-----------
The _zmq_ctx_set()_ function shall set the option specified by the
'option_name' argument to the value of the 'option_value' argument.
The _zmq_ctx_set()_ function accepts the following options:
ZMQ_IO_THREADS: Set number of I/O threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IO_THREADS' argument specifies the size of the 0MQ thread pool to
handle I/O operations. If your application is using only the 'inproc'
transport for messaging you may set this to zero, otherwise set it to at
least one. This option only applies before creating any sockets on the
context.
[horizontal]
Default value:: 1
ZMQ_MAX_SOCKETS: Set maximum number of sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed
on the context.
[horizontal]
Default value:: 1024
RETURN VALUE
------------
The _zmq_ctx_set()_ function returns zero if successful. Otherwise it
returns `-1` and sets 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
The requested option _option_name_ is unknown.
EXAMPLE
-------
.Setting a limit on the number of sockets
----
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_MAX_SOCKETS, 256);
int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS);
assert (max_sockets == 256);
----
SEE ALSO
--------
linkzmq:zmq_ctx_get[3]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>
...@@ -25,6 +25,7 @@ A 0MQ 'context' is thread safe and may be shared among as many application ...@@ -25,6 +25,7 @@ A 0MQ 'context' is thread safe and may be shared among as many application
threads as necessary, without any additional locking required on the part of threads as necessary, without any additional locking required on the part of
the caller. the caller.
This function is deprecated by linkzmq:zmq_ctx_new[3].
RETURN VALUE RETURN VALUE
------------ ------------
......
...@@ -36,6 +36,7 @@ Context termination is performed in the following steps: ...@@ -36,6 +36,7 @@ Context termination is performed in the following steps:
For further details regarding socket linger behaviour refer to the _ZMQ_LINGER_ For further details regarding socket linger behaviour refer to the _ZMQ_LINGER_
option in linkzmq:zmq_setsockopt[3]. option in linkzmq:zmq_setsockopt[3].
This function is deprecated by linkzmq:zmq_ctx_destroy[3].
RETURN VALUE RETURN VALUE
------------ ------------
......
...@@ -67,33 +67,6 @@ extern "C" { ...@@ -67,33 +67,6 @@ extern "C" {
#define ZMQ_VERSION \ #define ZMQ_VERSION \
ZMQ_MAKE_VERSION(ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH) ZMQ_MAKE_VERSION(ZMQ_VERSION_MAJOR, ZMQ_VERSION_MINOR, ZMQ_VERSION_PATCH)
/* ensure one of ZMQ_TYPE_SAFE/UNSAFE is defined */
/* Choose default based on version */
/* Uncomment to test */
/* #define ZMQ_EMULATE_TYPE_SAFE */
#if !defined(ZMQ_TYPE_SAFE) && !defined(ZMQ_TYPE_UNSAFE)
# if ZMQ_VERSION_MAJOR <= 3
# if defined ZMQ_EMULATE_TYPE_SAFE
# else
# define ZMQ_TYPE_UNSAFE
# endif
# else
# define ZMQ_TYPE_SAFE
# endif
#elif defined(ZMQ_TYPE_SAFE) && defined(ZMQ_TYPE_UNSAFE)
# error "BOTH ZMQ_TYPE_SAFE and ZMQ_TYPE_UNSAFE are defined!"
#endif
#ifdef ZMQ_TYPE_UNSAFE
typedef void *zmq_socket_t;
typedef void *zmq_ctx_t;
#else
typedef struct zmq_socket_t { void *data; } zmq_socket_t;
typedef struct zmq_ctx_t { void *data; } zmq_ctx_t;
#endif
/* Run-time API version detection */ /* Run-time API version detection */
ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
...@@ -152,6 +125,29 @@ ZMQ_EXPORT int zmq_errno (void); ...@@ -152,6 +125,29 @@ ZMQ_EXPORT int zmq_errno (void);
/* Resolves system errors and 0MQ errors to human-readable string. */ /* Resolves system errors and 0MQ errors to human-readable string. */
ZMQ_EXPORT const char *zmq_strerror (int errnum); ZMQ_EXPORT const char *zmq_strerror (int errnum);
/******************************************************************************/
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
/******************************************************************************/
/* New API */
// Context options
#define ZMQ_IO_THREADS 1
#define ZMQ_MAX_SOCKETS 2
// Default for new contexts
#define ZMQ_IO_THREADS_DFLT 1
#define ZMQ_MAX_SOCKETS_DFLT 1024
ZMQ_EXPORT void *zmq_ctx_new (void);
ZMQ_EXPORT int zmq_ctx_destroy (void *context);
ZMQ_EXPORT int zmq_ctx_set (void *context, int option, int optval);
ZMQ_EXPORT int zmq_ctx_get (void *context, int option);
/* Old (legacy) API */
ZMQ_EXPORT void *zmq_init (int io_threads);
ZMQ_EXPORT int zmq_term (void *context);
/******************************************************************************/ /******************************************************************************/
/* 0MQ message definition. */ /* 0MQ message definition. */
/******************************************************************************/ /******************************************************************************/
...@@ -164,8 +160,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg); ...@@ -164,8 +160,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size); ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data,
size_t size, zmq_free_fn *ffn, void *hint); size_t size, zmq_free_fn *ffn, void *hint);
ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, zmq_socket_t s, int flags); ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, void *s, int flags);
ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, zmq_socket_t s, int flags); ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, void *s, int flags);
ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
...@@ -177,13 +173,6 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int option, void *optval, ...@@ -177,13 +173,6 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int option, void *optval,
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, const void *optval, ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, const void *optval,
size_t *optvallen); size_t *optvallen);
/******************************************************************************/
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
/******************************************************************************/
ZMQ_EXPORT zmq_ctx_t zmq_init (int io_threads);
ZMQ_EXPORT int zmq_term (zmq_ctx_t context);
/******************************************************************************/ /******************************************************************************/
/* 0MQ socket definition. */ /* 0MQ socket definition. */
/******************************************************************************/ /******************************************************************************/
...@@ -239,23 +228,23 @@ ZMQ_EXPORT int zmq_term (zmq_ctx_t context); ...@@ -239,23 +228,23 @@ ZMQ_EXPORT int zmq_term (zmq_ctx_t context);
#define ZMQ_DONTWAIT 1 #define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2 #define ZMQ_SNDMORE 2
ZMQ_EXPORT zmq_socket_t zmq_socket (zmq_ctx_t context, int type); ZMQ_EXPORT void *zmq_socket (void *, int type);
ZMQ_EXPORT int zmq_close (zmq_socket_t s); ZMQ_EXPORT int zmq_close (void *s);
ZMQ_EXPORT int zmq_setsockopt (zmq_socket_t s, int option, const void *optval, ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,
size_t optvallen); size_t optvallen);
ZMQ_EXPORT int zmq_getsockopt (zmq_socket_t s, int option, void *optval, ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
size_t *optvallen); size_t *optvallen);
ZMQ_EXPORT int zmq_bind (zmq_socket_t s, const char *addr); ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
ZMQ_EXPORT int zmq_connect (zmq_socket_t s, const char *addr); ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
ZMQ_EXPORT int zmq_send (zmq_socket_t s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (zmq_socket_t s, void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_sendmsg (zmq_socket_t s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags);
ZMQ_EXPORT int zmq_recvmsg (zmq_socket_t s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags);
/* Experimental */ /* Experimental */
ZMQ_EXPORT int zmq_sendiov (zmq_socket_t s, struct iovec *iov, size_t count, int flags); ZMQ_EXPORT int zmq_sendiov (void *s, struct iovec *iov, size_t count, int flags);
ZMQ_EXPORT int zmq_recviov (zmq_socket_t s, struct iovec *iov, size_t *count, int flags); ZMQ_EXPORT int zmq_recviov (void *s, struct iovec *iov, size_t *count, int flags);
/******************************************************************************/ /******************************************************************************/
/* I/O multiplexing. */ /* I/O multiplexing. */
...@@ -267,7 +256,7 @@ ZMQ_EXPORT int zmq_recviov (zmq_socket_t s, struct iovec *iov, size_t *count, in ...@@ -267,7 +256,7 @@ ZMQ_EXPORT int zmq_recviov (zmq_socket_t s, struct iovec *iov, size_t *count, in
typedef struct typedef struct
{ {
zmq_socket_t socket; void *socket;
#if defined _WIN32 #if defined _WIN32
SOCKET fd; SOCKET fd;
#else #else
...@@ -287,7 +276,7 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); ...@@ -287,7 +276,7 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
#define ZMQ_FORWARDER 2 #define ZMQ_FORWARDER 2
#define ZMQ_QUEUE 3 #define ZMQ_QUEUE 3
ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); ZMQ_EXPORT int zmq_device (int device, void *insocket, void* outsocket);
#undef ZMQ_EXPORT #undef ZMQ_EXPORT
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -37,45 +37,21 @@ ...@@ -37,45 +37,21 @@
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::ctx_t::ctx_t (uint32_t io_threads_) : zmq::ctx_t::ctx_t () :
tag (0xbadcafe0), tag (0xabadcafe),
terminating (false) starting (true),
terminating (false),
reaper (NULL),
slot_count (0),
slots (NULL),
max_sockets (ZMQ_MAX_SOCKETS_DFLT),
io_thread_count (ZMQ_IO_THREADS_DFLT)
{ {
// Initialise the array of mailboxes. Additional three slots are for
// internal log socket and the zmq_term thread the reaper thread.
slot_count = max_sockets + io_threads_ + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);
// Initialise the infrastructure for zmq_term thread.
slots [term_tid] = &term_mailbox;
// Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
alloc_assert (reaper);
slots [reaper_tid] = reaper->get_mailbox ();
reaper->start ();
// Create I/O thread objects and launch them.
for (uint32_t i = 2; i != io_threads_ + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
alloc_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) io_threads_ + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
} }
bool zmq::ctx_t::check_tag () bool zmq::ctx_t::check_tag ()
{ {
return tag == 0xbadcafe0; return tag == 0xabadcafe;
} }
zmq::ctx_t::~ctx_t () zmq::ctx_t::~ctx_t ()
...@@ -93,12 +69,14 @@ zmq::ctx_t::~ctx_t () ...@@ -93,12 +69,14 @@ zmq::ctx_t::~ctx_t ()
delete io_threads [i]; delete io_threads [i];
// Deallocate the reaper thread object. // Deallocate the reaper thread object.
delete reaper; if (reaper)
delete reaper;
// Deallocate the array of mailboxes. No special work is // Deallocate the array of mailboxes. No special work is
// needed as mailboxes themselves were deallocated with their // needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects. // corresponding io_thread/socket objects.
free (slots); if (slots)
free (slots);
// Remove the tag, so that the object is considered dead. // Remove the tag, so that the object is considered dead.
tag = 0xdeadbeef; tag = 0xdeadbeef;
...@@ -106,35 +84,40 @@ zmq::ctx_t::~ctx_t () ...@@ -106,35 +84,40 @@ zmq::ctx_t::~ctx_t ()
int zmq::ctx_t::terminate () int zmq::ctx_t::terminate ()
{ {
// Check whether termination was already underway, but interrupted and now if (!starting) {
// restarted.
slot_sync.lock ();
bool restarted = terminating;
slot_sync.unlock ();
// First attempt to terminate the context. // Check whether termination was already underway, but interrupted and now
if (!restarted) { // restarted.
// First send stop command to sockets so that any blocking calls can be
// interrupted. If there are no sockets we can ask reaper thread to stop.
slot_sync.lock (); slot_sync.lock ();
bool restarted = terminating;
terminating = true; terminating = true;
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop ();
if (sockets.empty ())
reaper->stop ();
slot_sync.unlock (); slot_sync.unlock ();
}
// Wait till reaper thread closes all the sockets. // First attempt to terminate the context.
command_t cmd; if (!restarted) {
int rc = term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR) // First send stop command to sockets so that any blocking calls
return -1; // can be interrupted. If there are no sockets we can ask reaper
zmq_assert (rc == 0); // thread to stop.
zmq_assert (cmd.type == command_t::done); slot_sync.lock ();
slot_sync.lock (); for (sockets_t::size_type i = 0; i != sockets.size (); i++)
zmq_assert (sockets.empty ()); sockets [i]->stop ();
slot_sync.unlock (); if (sockets.empty ())
reaper->stop ();
slot_sync.unlock ();
}
// Wait till reaper thread closes all the sockets.
command_t cmd;
int rc = term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR)
return -1;
zmq_assert (rc == 0);
zmq_assert (cmd.type == command_t::done);
slot_sync.lock ();
zmq_assert (sockets.empty ());
slot_sync.unlock ();
}
// Deallocate the resources. // Deallocate the resources.
delete this; delete this;
...@@ -142,8 +125,84 @@ int zmq::ctx_t::terminate () ...@@ -142,8 +125,84 @@ int zmq::ctx_t::terminate ()
return 0; return 0;
} }
int zmq::ctx_t::set (int option_, int optval_)
{
int rc = 0;
if (option_ == ZMQ_MAX_SOCKETS) {
opt_sync.lock ();
max_sockets = optval_;
opt_sync.unlock ();
}
else
if (option_ == ZMQ_IO_THREADS) {
opt_sync.lock ();
io_thread_count = optval_;
opt_sync.unlock ();
}
else {
errno = EINVAL;
rc = -1;
}
return rc;
}
int zmq::ctx_t::get (int option_)
{
int rc = 0;
if (option_ == ZMQ_MAX_SOCKETS)
rc = max_sockets;
else
if (option_ == ZMQ_IO_THREADS)
rc = io_thread_count;
else {
errno = EINVAL;
rc = -1;
}
return rc;
}
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{ {
if (unlikely (starting)) {
starting = false;
// Initialise the array of mailboxes. Additional three slots are for
// zmq_term thread and reaper thread.
opt_sync.lock ();
int mazmq = max_sockets;
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);
// Initialise the infrastructure for zmq_term thread.
slots [term_tid] = &term_mailbox;
// Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
alloc_assert (reaper);
slots [reaper_tid] = reaper->get_mailbox ();
reaper->start ();
// Create I/O thread objects and launch them.
for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
alloc_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
}
slot_sync.lock (); slot_sync.lock ();
// Once zmq_term() was called, we can't create new sockets. // Once zmq_term() was called, we can't create new sockets.
...@@ -164,8 +223,11 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) ...@@ -164,8 +223,11 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
uint32_t slot = empty_slots.back (); uint32_t slot = empty_slots.back ();
empty_slots.pop_back (); empty_slots.pop_back ();
// Generate new unique socket ID.
int sid = ((int) max_socket_id.add (1)) + 1;
// Create the socket and register its mailbox. // Create the socket and register its mailbox.
socket_base_t *s = socket_base_t::create (type_, this, slot); socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
if (!s) { if (!s) {
empty_slots.push_back (slot); empty_slots.push_back (slot);
slot_sync.unlock (); slot_sync.unlock ();
...@@ -286,3 +348,8 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -286,3 +348,8 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
endpoints_sync.unlock (); endpoints_sync.unlock ();
return *endpoint; return *endpoint;
} }
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs.
zmq::atomic_counter_t zmq::ctx_t::max_socket_id;
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include "mutex.hpp" #include "mutex.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "options.hpp" #include "options.hpp"
#include "atomic_counter.hpp"
namespace zmq namespace zmq
{ {
...@@ -58,9 +59,8 @@ namespace zmq ...@@ -58,9 +59,8 @@ namespace zmq
{ {
public: public:
// Create the context object. The argument specifies the size // Create the context object
// of I/O thread pool to create. ctx_t ();
ctx_t (uint32_t io_threads_);
// Returns false if object is not a context. // Returns false if object is not a context.
bool check_tag (); bool check_tag ();
...@@ -71,6 +71,10 @@ namespace zmq ...@@ -71,6 +71,10 @@ namespace zmq
// after the last one is closed. // after the last one is closed.
int terminate (); int terminate ();
// Set and set context properties
int set (int option_, int optval_);
int get (int option_);
// Create and destroy a socket. // Create and destroy a socket.
zmq::socket_base_t *create_socket (int type_); zmq::socket_base_t *create_socket (int type_);
void destroy_socket (zmq::socket_base_t *socket_); void destroy_socket (zmq::socket_base_t *socket_);
...@@ -113,6 +117,10 @@ namespace zmq ...@@ -113,6 +117,10 @@ namespace zmq
typedef std::vector <uint32_t> emtpy_slots_t; typedef std::vector <uint32_t> emtpy_slots_t;
emtpy_slots_t empty_slots; emtpy_slots_t empty_slots;
// If true, zmq_init has been called but no socket have been created
// yes. Launching of I/O threads is delayed.
bool starting;
// If true, zmq_term was already called. // If true, zmq_term was already called.
bool terminating; bool terminating;
...@@ -143,6 +151,18 @@ namespace zmq ...@@ -143,6 +151,18 @@ namespace zmq
// Synchronisation of access to the list of inproc endpoints. // Synchronisation of access to the list of inproc endpoints.
mutex_t endpoints_sync; mutex_t endpoints_sync;
// Maximum socket ID.
static atomic_counter_t max_socket_id;
// Maximum number of sockets that can be opened at the same time.
int max_sockets;
// Number of I/O threads to launch.
int io_thread_count;
// Synchronisation of access to context options.
mutex_t opt_sync;
ctx_t (const ctx_t&); ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&); const ctx_t &operator = (const ctx_t&);
}; };
...@@ -150,4 +170,3 @@ namespace zmq ...@@ -150,4 +170,3 @@ namespace zmq
} }
#endif #endif
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2011 VMware, Inc. Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
...@@ -48,7 +48,8 @@ zmq::options_t::options_t () : ...@@ -48,7 +48,8 @@ zmq::options_t::options_t () :
delay_on_disconnect (true), delay_on_disconnect (true),
filter (false), filter (false),
send_identity (false), send_identity (false),
recv_identity (false) recv_identity (false),
socket_id (0)
{ {
} }
......
...@@ -110,6 +110,9 @@ namespace zmq ...@@ -110,6 +110,9 @@ namespace zmq
// Receivers identity from all new connections. // Receivers identity from all new connections.
bool recv_identity; bool recv_identity;
// ID of the socket.
int socket_id;
}; };
} }
......
...@@ -24,8 +24,8 @@ ...@@ -24,8 +24,8 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_, sid_),
pipe (NULL) pipe (NULL)
{ {
options.type = ZMQ_PAIR; options.type = ZMQ_PAIR;
......
...@@ -38,7 +38,7 @@ namespace zmq ...@@ -38,7 +38,7 @@ namespace zmq
{ {
public: public:
pair_t (zmq::ctx_t *parent_, uint32_t tid_); pair_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~pair_t (); ~pair_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
#include "pub.hpp" #include "pub.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_) : zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xpub_t (parent_, tid_) xpub_t (parent_, tid_, sid_)
{ {
options.type = ZMQ_PUB; options.type = ZMQ_PUB;
} }
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -36,7 +36,7 @@ namespace zmq ...@@ -36,7 +36,7 @@ namespace zmq
{ {
public: public:
pub_t (zmq::ctx_t *parent_, uint32_t tid_); pub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~pub_t (); ~pub_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
......
...@@ -24,8 +24,8 @@ ...@@ -24,8 +24,8 @@
#include "msg.hpp" #include "msg.hpp"
#include "pipe.hpp" #include "pipe.hpp"
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) : zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_) socket_base_t (parent_, tid_, sid_)
{ {
options.type = ZMQ_PULL; options.type = ZMQ_PULL;
} }
......
...@@ -39,7 +39,7 @@ namespace zmq ...@@ -39,7 +39,7 @@ namespace zmq
{ {
public: public:
pull_t (zmq::ctx_t *parent_, uint32_t tid_); pull_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~pull_t (); ~pull_t ();
protected: protected:
......
...@@ -24,8 +24,8 @@ ...@@ -24,8 +24,8 @@
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) : zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_) socket_base_t (parent_, tid_, sid_)
{ {
options.type = ZMQ_PUSH; options.type = ZMQ_PUSH;
} }
......
...@@ -39,7 +39,7 @@ namespace zmq ...@@ -39,7 +39,7 @@ namespace zmq
{ {
public: public:
push_t (zmq::ctx_t *parent_, uint32_t tid_); push_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~push_t (); ~push_t ();
protected: protected:
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_) : zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xrep_t (parent_, tid_), xrep_t (parent_, tid_, sid_),
sending_reply (false), sending_reply (false),
request_begins (true) request_begins (true)
{ {
......
...@@ -36,7 +36,7 @@ namespace zmq ...@@ -36,7 +36,7 @@ namespace zmq
{ {
public: public:
rep_t (zmq::ctx_t *parent_, uint32_t tid_); rep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~rep_t (); ~rep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
......
...@@ -27,8 +27,8 @@ ...@@ -27,8 +27,8 @@
#include "random.hpp" #include "random.hpp"
#include "likely.hpp" #include "likely.hpp"
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xreq_t (parent_, tid_), xreq_t (parent_, tid_, sid_),
receiving_reply (false), receiving_reply (false),
message_begins (true) message_begins (true)
{ {
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2011 VMware, Inc. Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
...@@ -38,7 +38,7 @@ namespace zmq ...@@ -38,7 +38,7 @@ namespace zmq
{ {
public: public:
req_t (zmq::ctx_t *parent_, uint32_t tid_); req_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~req_t (); ~req_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
......
...@@ -75,43 +75,43 @@ bool zmq::socket_base_t::check_tag () ...@@ -75,43 +75,43 @@ bool zmq::socket_base_t::check_tag ()
} }
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
uint32_t tid_) uint32_t tid_, int sid_)
{ {
socket_base_t *s = NULL; socket_base_t *s = NULL;
switch (type_) { switch (type_) {
case ZMQ_PAIR: case ZMQ_PAIR:
s = new (std::nothrow) pair_t (parent_, tid_); s = new (std::nothrow) pair_t (parent_, tid_, sid_);
break; break;
case ZMQ_PUB: case ZMQ_PUB:
s = new (std::nothrow) pub_t (parent_, tid_); s = new (std::nothrow) pub_t (parent_, tid_, sid_);
break; break;
case ZMQ_SUB: case ZMQ_SUB:
s = new (std::nothrow) sub_t (parent_, tid_); s = new (std::nothrow) sub_t (parent_, tid_, sid_);
break; break;
case ZMQ_REQ: case ZMQ_REQ:
s = new (std::nothrow) req_t (parent_, tid_); s = new (std::nothrow) req_t (parent_, tid_, sid_);
break; break;
case ZMQ_REP: case ZMQ_REP:
s = new (std::nothrow) rep_t (parent_, tid_); s = new (std::nothrow) rep_t (parent_, tid_, sid_);
break; break;
case ZMQ_XREQ: case ZMQ_XREQ:
s = new (std::nothrow) xreq_t (parent_, tid_); s = new (std::nothrow) xreq_t (parent_, tid_, sid_);
break; break;
case ZMQ_XREP: case ZMQ_XREP:
s = new (std::nothrow) xrep_t (parent_, tid_); s = new (std::nothrow) xrep_t (parent_, tid_, sid_);
break; break;
case ZMQ_PULL: case ZMQ_PULL:
s = new (std::nothrow) pull_t (parent_, tid_); s = new (std::nothrow) pull_t (parent_, tid_, sid_);
break; break;
case ZMQ_PUSH: case ZMQ_PUSH:
s = new (std::nothrow) push_t (parent_, tid_); s = new (std::nothrow) push_t (parent_, tid_, sid_);
break; break;
case ZMQ_XPUB: case ZMQ_XPUB:
s = new (std::nothrow) xpub_t (parent_, tid_); s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
break; break;
case ZMQ_XSUB: case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_); s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
break; break;
default: default:
errno = EINVAL; errno = EINVAL;
...@@ -121,7 +121,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, ...@@ -121,7 +121,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
return s; return s;
} }
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
own_t (parent_, tid_), own_t (parent_, tid_),
tag (0xbaddecaf), tag (0xbaddecaf),
ctx_terminated (false), ctx_terminated (false),
...@@ -130,6 +130,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : ...@@ -130,6 +130,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
ticks (0), ticks (0),
rcvmore (false) rcvmore (false)
{ {
options.socket_id = sid_;
} }
zmq::socket_base_t::~socket_base_t () zmq::socket_base_t::~socket_base_t ()
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2011 VMware, Inc. Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
...@@ -57,7 +57,7 @@ namespace zmq ...@@ -57,7 +57,7 @@ namespace zmq
// Create a socket of a specified type. // Create a socket of a specified type.
static socket_base_t *create (int type_, zmq::ctx_t *parent_, static socket_base_t *create (int type_, zmq::ctx_t *parent_,
uint32_t tid_); uint32_t tid_, int sid_);
// Returns the mailbox associated with this socket. // Returns the mailbox associated with this socket.
mailbox_t *get_mailbox (); mailbox_t *get_mailbox ();
...@@ -99,7 +99,7 @@ namespace zmq ...@@ -99,7 +99,7 @@ namespace zmq
void unlock(); void unlock();
protected: protected:
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_); socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
virtual ~socket_base_t (); virtual ~socket_base_t ();
// Concrete algorithms for the x- methods are to be defined by // Concrete algorithms for the x- methods are to be defined by
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -22,8 +22,8 @@ ...@@ -22,8 +22,8 @@
#include "sub.hpp" #include "sub.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xsub_t (parent_, tid_) xsub_t (parent_, tid_, sid_)
{ {
options.type = ZMQ_SUB; options.type = ZMQ_SUB;
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -36,7 +36,7 @@ namespace zmq ...@@ -36,7 +36,7 @@ namespace zmq
{ {
public: public:
sub_t (zmq::ctx_t *parent_, uint32_t tid_); sub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~sub_t (); ~sub_t ();
protected: protected:
......
...@@ -26,8 +26,8 @@ ...@@ -26,8 +26,8 @@
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_, sid_),
more (false) more (false)
{ {
options.type = ZMQ_XPUB; options.type = ZMQ_XPUB;
......
...@@ -43,7 +43,7 @@ namespace zmq ...@@ -43,7 +43,7 @@ namespace zmq
{ {
public: public:
xpub_t (zmq::ctx_t *parent_, uint32_t tid_); xpub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~xpub_t (); ~xpub_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
......
...@@ -27,8 +27,8 @@ ...@@ -27,8 +27,8 @@
#include "likely.hpp" #include "likely.hpp"
#include "err.hpp" #include "err.hpp"
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_, sid_),
prefetched (0), prefetched (0),
more_in (false), more_in (false),
current_out (NULL), current_out (NULL),
......
...@@ -44,7 +44,7 @@ namespace zmq ...@@ -44,7 +44,7 @@ namespace zmq
{ {
public: public:
xrep_t (zmq::ctx_t *parent_, uint32_t tid_); xrep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~xrep_t (); ~xrep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
......
...@@ -23,8 +23,8 @@ ...@@ -23,8 +23,8 @@
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_, sid_),
prefetched (false) prefetched (false)
{ {
options.type = ZMQ_XREQ; options.type = ZMQ_XREQ;
......
...@@ -40,7 +40,7 @@ namespace zmq ...@@ -40,7 +40,7 @@ namespace zmq
{ {
public: public:
xreq_t (zmq::ctx_t *parent_, uint32_t tid_); xreq_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~xreq_t (); ~xreq_t ();
protected: protected:
......
...@@ -24,8 +24,8 @@ ...@@ -24,8 +24,8 @@
#include "xsub.hpp" #include "xsub.hpp"
#include "err.hpp" #include "err.hpp"
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_, sid_),
has_message (false), has_message (false),
more (false) more (false)
{ {
......
...@@ -39,7 +39,7 @@ namespace zmq ...@@ -39,7 +39,7 @@ namespace zmq
{ {
public: public:
xsub_t (zmq::ctx_t *parent_, uint32_t tid_); xsub_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~xsub_t (); ~xsub_t ();
protected: protected:
......
...@@ -92,7 +92,6 @@ struct iovec { ...@@ -92,7 +92,6 @@ struct iovec {
typedef char check_msg_t_size typedef char check_msg_t_size
[sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1]; [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1];
// Version.
void zmq_version (int *major_, int *minor_, int *patch_) void zmq_version (int *major_, int *minor_, int *patch_)
{ {
...@@ -101,7 +100,6 @@ void zmq_version (int *major_, int *minor_, int *patch_) ...@@ -101,7 +100,6 @@ void zmq_version (int *major_, int *minor_, int *patch_)
*patch_ = ZMQ_VERSION_PATCH; *patch_ = ZMQ_VERSION_PATCH;
} }
// Errors.
const char *zmq_strerror (int errnum_) const char *zmq_strerror (int errnum_)
{ {
...@@ -113,15 +111,11 @@ int zmq_errno () ...@@ -113,15 +111,11 @@ int zmq_errno ()
return errno; return errno;
} }
// Contexts.
static zmq::ctx_t *s_init (int io_threads_) // New context API
{
if (io_threads_ < 0) {
errno = EINVAL;
return NULL;
}
void *zmq_ctx_new (void)
{
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
// Init PGM transport. Ensure threading and timer are enabled. Find PGM // Init PGM transport. Ensure threading and timer are enabled. Find PGM
...@@ -162,22 +156,18 @@ static zmq::ctx_t *s_init (int io_threads_) ...@@ -162,22 +156,18 @@ static zmq::ctx_t *s_init (int io_threads_)
#endif #endif
// Create 0MQ context. // Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_); zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
alloc_assert (ctx); alloc_assert (ctx);
return ctx; return ctx;
} }
void *zmq_init (int io_threads_) int zmq_ctx_destroy (void *ctx_)
{
return (void *) s_init (io_threads_);
}
int zmq_term (void *ctx_)
{ {
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
int rc = ((zmq::ctx_t*) ctx_)->terminate (); int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int en = errno; int en = errno;
...@@ -197,7 +187,41 @@ int zmq_term (void *ctx_) ...@@ -197,7 +187,41 @@ int zmq_term (void *ctx_)
return rc; return rc;
} }
// Sockets. int zmq_ctx_set (void *ctx_, int option_, int optval_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->set (option_, optval_);
}
int zmq_ctx_get (void *ctx_, int option_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->get (option_);
}
// Stable/legacy context API
void *zmq_init (int io_threads_)
{
void *ctx = zmq_ctx_new ();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, io_threads_);
return ctx;
}
int zmq_term (void *ctx_)
{
return zmq_ctx_destroy (ctx_);
}
// Sockets
void *zmq_socket (void *ctx_, int type_) void *zmq_socket (void *ctx_, int type_)
{ {
......
#! /bin/bash
# test_last_endpoint - temporary wrapper script for .libs/test_last_endpoint
# Generated by libtool (GNU libtool) 2.4 Debian-2.4-2ubuntu1
#
# The test_last_endpoint program cannot be directly executed until all the libtool
# libraries that it depends on are installed.
#
# This wrapper script should never be moved out of the build directory.
# If it is, it will not operate correctly.
# Sed substitution that helps us do robust quoting. It backslashifies
# metacharacters that are still active within double-quoted strings.
sed_quote_subst='s/\([`"$\\]\)/\\\1/g'
# Be Bourne compatible
if test -n "${ZSH_VERSION+set}" && (emulate sh) >/dev/null 2>&1; then
emulate sh
NULLCMD=:
# Zsh 3.x and 4.x performs word splitting on ${1+"$@"}, which
# is contrary to our usage. Disable this feature.
alias -g '${1+"$@"}'='"$@"'
setopt NO_GLOB_SUBST
else
case `(set -o) 2>/dev/null` in *posix*) set -o posix;; esac
fi
BIN_SH=xpg4; export BIN_SH # for Tru64
DUALCASE=1; export DUALCASE # for MKS sh
# The HP-UX ksh and POSIX shell print the target directory to stdout
# if CDPATH is set.
(unset CDPATH) >/dev/null 2>&1 && unset CDPATH
relink_command="(cd /home/ph/work/libzmq_pieterh/tests; { test -z \"\${LIBRARY_PATH+set}\" || unset LIBRARY_PATH || { LIBRARY_PATH=; export LIBRARY_PATH; }; }; { test -z \"\${COMPILER_PATH+set}\" || unset COMPILER_PATH || { COMPILER_PATH=; export COMPILER_PATH; }; }; { test -z \"\${GCC_EXEC_PREFIX+set}\" || unset GCC_EXEC_PREFIX || { GCC_EXEC_PREFIX=; export GCC_EXEC_PREFIX; }; }; { test -z \"\${LD_RUN_PATH+set}\" || unset LD_RUN_PATH || { LD_RUN_PATH=; export LD_RUN_PATH; }; }; LD_LIBRARY_PATH=/usr/local/lib; export LD_LIBRARY_PATH; PATH=/opt/android-toolchain/bin:/usr/lib/lightdm/lightdm:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games; export PATH; g++ -g -O2 -o \$progdir/\$file test_last_endpoint.o ../src/.libs/libzmq.so -lrt -lpthread -Wl,-rpath -Wl,/home/ph/work/libzmq_pieterh/src/.libs)"
# This environment variable determines our operation mode.
if test "$libtool_install_magic" = "%%%MAGIC variable%%%"; then
# install mode needs the following variables:
generated_by_libtool_version='2.4'
notinst_deplibs=' ../src/libzmq.la'
else
# When we are sourced in execute mode, $file and $ECHO are already set.
if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then
file="$0"
# A function that is used when there is no print builtin or printf.
func_fallback_echo ()
{
eval 'cat <<_LTECHO_EOF
$1
_LTECHO_EOF'
}
ECHO="printf %s\\n"
fi
# Very basic option parsing. These options are (a) specific to
# the libtool wrapper, (b) are identical between the wrapper
# /script/ and the wrapper /executable/ which is used only on
# windows platforms, and (c) all begin with the string --lt-
# (application programs are unlikely to have options which match
# this pattern).
#
# There are only two supported options: --lt-debug and
# --lt-dump-script. There is, deliberately, no --lt-help.
#
# The first argument to this parsing function should be the
# script's ../libtool value, followed by no.
lt_option_debug=
func_parse_lt_options ()
{
lt_script_arg0=$0
shift
for lt_opt
do
case "$lt_opt" in
--lt-debug) lt_option_debug=1 ;;
--lt-dump-script)
lt_dump_D=`$ECHO "X$lt_script_arg0" | /bin/sed -e 's/^X//' -e 's%/[^/]*$%%'`
test "X$lt_dump_D" = "X$lt_script_arg0" && lt_dump_D=.
lt_dump_F=`$ECHO "X$lt_script_arg0" | /bin/sed -e 's/^X//' -e 's%^.*/%%'`
cat "$lt_dump_D/$lt_dump_F"
exit 0
;;
--lt-*)
$ECHO "Unrecognized --lt- option: '$lt_opt'" 1>&2
exit 1
;;
esac
done
# Print the debug banner immediately:
if test -n "$lt_option_debug"; then
echo "test_last_endpoint:test_last_endpoint:${LINENO}: libtool wrapper (GNU libtool) 2.4 Debian-2.4-2ubuntu1" 1>&2
fi
}
# Used when --lt-debug. Prints its arguments to stdout
# (redirection is the responsibility of the caller)
func_lt_dump_args ()
{
lt_dump_args_N=1;
for lt_arg
do
$ECHO "test_last_endpoint:test_last_endpoint:${LINENO}: newargv[$lt_dump_args_N]: $lt_arg"
lt_dump_args_N=`expr $lt_dump_args_N + 1`
done
}
# Core function for launching the target application
func_exec_program_core ()
{
if test -n "$lt_option_debug"; then
$ECHO "test_last_endpoint:test_last_endpoint:${LINENO}: newargv[0]: $progdir/$program" 1>&2
func_lt_dump_args ${1+"$@"} 1>&2
fi
exec "$progdir/$program" ${1+"$@"}
$ECHO "$0: cannot exec $program $*" 1>&2
exit 1
}
# A function to encapsulate launching the target application
# Strips options in the --lt-* namespace from $@ and
# launches target application with the remaining arguments.
func_exec_program ()
{
for lt_wr_arg
do
case $lt_wr_arg in
--lt-*) ;;
*) set x "$@" "$lt_wr_arg"; shift;;
esac
shift
done
func_exec_program_core ${1+"$@"}
}
# Parse options
func_parse_lt_options "$0" ${1+"$@"}
# Find the directory that this script lives in.
thisdir=`$ECHO "$file" | /bin/sed 's%/[^/]*$%%'`
test "x$thisdir" = "x$file" && thisdir=.
# Follow symbolic links until we get to the real thisdir.
file=`ls -ld "$file" | /bin/sed -n 's/.*-> //p'`
while test -n "$file"; do
destdir=`$ECHO "$file" | /bin/sed 's%/[^/]*$%%'`
# If there was a directory component, then change thisdir.
if test "x$destdir" != "x$file"; then
case "$destdir" in
[\\/]* | [A-Za-z]:[\\/]*) thisdir="$destdir" ;;
*) thisdir="$thisdir/$destdir" ;;
esac
fi
file=`$ECHO "$file" | /bin/sed 's%^.*/%%'`
file=`ls -ld "$thisdir/$file" | /bin/sed -n 's/.*-> //p'`
done
# Usually 'no', except on cygwin/mingw when embedded into
# the cwrapper.
WRAPPER_SCRIPT_BELONGS_IN_OBJDIR=no
if test "$WRAPPER_SCRIPT_BELONGS_IN_OBJDIR" = "yes"; then
# special case for '.'
if test "$thisdir" = "."; then
thisdir=`pwd`
fi
# remove .libs from thisdir
case "$thisdir" in
*[\\/].libs ) thisdir=`$ECHO "$thisdir" | /bin/sed 's%[\\/][^\\/]*$%%'` ;;
.libs ) thisdir=. ;;
esac
fi
# Try to get the absolute directory name.
absdir=`cd "$thisdir" && pwd`
test -n "$absdir" && thisdir="$absdir"
program=lt-'test_last_endpoint'
progdir="$thisdir/.libs"
if test ! -f "$progdir/$program" ||
{ file=`ls -1dt "$progdir/$program" "$progdir/../$program" 2>/dev/null | /bin/sed 1q`; \
test "X$file" != "X$progdir/$program"; }; then
file="$$-$program"
if test ! -d "$progdir"; then
mkdir "$progdir"
else
rm -f "$progdir/$file"
fi
# relink executable if necessary
if test -n "$relink_command"; then
if relink_command_output=`eval $relink_command 2>&1`; then :
else
printf %s\n "$relink_command_output" >&2
rm -f "$progdir/$file"
exit 1
fi
fi
mv -f "$progdir/$file" "$progdir/$program" 2>/dev/null ||
{ rm -f "$progdir/$program";
mv -f "$progdir/$file" "$progdir/$program"; }
rm -f "$progdir/$file"
fi
if test -f "$progdir/$program"; then
if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then
# Run the actual program with our arguments.
func_exec_program ${1+"$@"}
fi
else
# The program doesn't exist.
$ECHO "$0: error: \`$progdir/$program' does not exist" 1>&2
$ECHO "This script is just a wrapper for $program." 1>&2
$ECHO "See the libtool documentation for more information." 1>&2
exit 1
fi
fi
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment