Commit a9baa051 authored by Ian Barber's avatar Ian Barber

Merge in master changes

parents 531d3ebc d485404a
...@@ -308,19 +308,19 @@ set(cxx-sources ...@@ -308,19 +308,19 @@ set(cxx-sources
kqueue.cpp kqueue.cpp
lb.cpp lb.cpp
mailbox.cpp mailbox.cpp
mechanism.cpp mechanism.cpp
msg.cpp msg.cpp
mtrie.cpp mtrie.cpp
object.cpp object.cpp
options.cpp options.cpp
own.cpp own.cpp
null_mechanism.cpp null_mechanism.cpp
pair.cpp pair.cpp
pgm_receiver.cpp pgm_receiver.cpp
pgm_sender.cpp pgm_sender.cpp
pgm_socket.cpp pgm_socket.cpp
pipe.cpp pipe.cpp
plain_mechanism.cpp plain_mechanism.cpp
poll.cpp poll.cpp
poller_base.cpp poller_base.cpp
precompiled.cpp precompiled.cpp
...@@ -576,6 +576,56 @@ if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why? ...@@ -576,6 +576,56 @@ if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why?
endforeach() endforeach()
endif() endif()
enable_testing()
set(tests
test_connect_delay
test_connect_resolve
test_ctx_options
test_disconnect_inproc
test_hwm
test_invalid_rep
test_iov
test_last_endpoint
test_monitor
test_msg_flags
test_pair_inproc
test_pair_ipc
test_pair_tcp
test_probe_router
test_raw_sock
test_req_request_ids
test_req_strict
test_reqrep_device
test_reqrep_inproc
test_reqrep_ipc
test_reqrep_tcp
test_router_mandatory
test_security
test_security_curve
test_shutdown_stress
test_spec_dealer
test_spec_pushpull
test_spec_rep
test_spec_req
test_spec_router
test_stream
test_sub_forward
test_term_endpoint
test_timeo)
foreach(test ${tests})
add_executable(${test} tests/${test}.cpp)
target_link_libraries(${test} libzmq)
if(RT_LIBRARY)
target_link_libraries(${test} ${RT_LIBRARY})
endif()
if(WIN32)
add_test(NAME ${test} WORKING_DIRECTORY ${LIBRARY_OUTPUT_PATH}/Debug COMMAND ${test})
else()
add_test(NAME ${test} COMMAND ${test})
endif()
endforeach()
#----------------------------------------------------------------------------- #-----------------------------------------------------------------------------
# installer # installer
......
...@@ -682,6 +682,22 @@ Default value:: NULL ...@@ -682,6 +682,22 @@ Default value:: NULL
Applicable socket types:: all, when using TCP transport Applicable socket types:: all, when using TCP transport
ZMQ_CONFLATE: Keep only last message
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set, a socket shall keep only one message in its inbound/outbound
queue, this message being the last message received/the last message
to be sent.
Ignores 'ZMQ_RECVHWM' and 'ZMQ_SENDHWM' options.
Does not supports multi-part messages, in particular, only one part of it
is kept in the socket internal queue.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
......
...@@ -278,6 +278,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -278,6 +278,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_PROBE_ROUTER 51 #define ZMQ_PROBE_ROUTER 51
#define ZMQ_REQ_REQUEST_IDS 52 #define ZMQ_REQ_REQUEST_IDS 52
#define ZMQ_REQ_STRICT 53 #define ZMQ_REQ_STRICT 53
#define ZMQ_CONFLATE 54
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -43,6 +43,8 @@ extern "C" { ...@@ -43,6 +43,8 @@ extern "C" {
# endif # endif
#endif #endif
typedef void (zmq_thread_fn) (void*);
/* Helper functions are used by perf tests so that they don't have to care */ /* Helper functions are used by perf tests so that they don't have to care */
/* about minutiae of time-related functions on different OS platforms. */ /* about minutiae of time-related functions on different OS platforms. */
...@@ -56,6 +58,12 @@ ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_); ...@@ -56,6 +58,12 @@ ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_);
/* Sleeps for specified number of seconds. */ /* Sleeps for specified number of seconds. */
ZMQ_EXPORT void zmq_sleep (int seconds_); ZMQ_EXPORT void zmq_sleep (int seconds_);
/* Start a thread. Returns a handle to the thread. */
ZMQ_EXPORT void *zmq_threadstart(zmq_thread_fn* func, void* arg);
/* Wait for thread to complete then free up resources. */
ZMQ_EXPORT void zmq_threadclose(void* thread);
#undef ZMQ_EXPORT #undef ZMQ_EXPORT
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -88,6 +88,7 @@ libzmq_la_SOURCES = \ ...@@ -88,6 +88,7 @@ libzmq_la_SOURCES = \
dealer.hpp \ dealer.hpp \
xsub.hpp \ xsub.hpp \
ypipe.hpp \ ypipe.hpp \
ypipe_flat.hpp \
yqueue.hpp \ yqueue.hpp \
z85_codec.hpp \ z85_codec.hpp \
address.cpp \ address.cpp \
...@@ -163,7 +164,9 @@ libzmq_la_SOURCES = \ ...@@ -163,7 +164,9 @@ libzmq_la_SOURCES = \
raw_decoder.hpp \ raw_decoder.hpp \
raw_decoder.cpp \ raw_decoder.cpp \
raw_encoder.hpp \ raw_encoder.hpp \
raw_encoder.cpp raw_encoder.cpp \
ypipe_conflate.hpp \
dbuffer.hpp
if ON_MINGW if ON_MINGW
libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@ libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#define __ZMQ_BLOB_HPP_INCLUDED__ #define __ZMQ_BLOB_HPP_INCLUDED__
#include <string> #include <string>
#include <string.h>
// Borrowed from id3lib_strings.h: // Borrowed from id3lib_strings.h:
// They seem to be doing something for MSC, but since I only have gcc, I'll just do that // They seem to be doing something for MSC, but since I only have gcc, I'll just do that
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DBUFFER_HPP_INCLUDED__
#define __ZMQ_DBUFFER_HPP_INCLUDED__
#include <stdlib.h>
#include <stddef.h>
#include <algorithm>
#include "mutex.hpp"
#include "msg.hpp"
namespace zmq
{
// dbuffer is a single-producer single-consumer double-buffer
// implementation.
//
// The producer writes to a back buffer and then tries to swap
// pointers between the back and front buffers. If it fails,
// due to the consumer reading from the front buffer, it just
// gives up, which is ok since writes are many and redundant.
//
// The reader simply reads from the front buffer.
//
// has_msg keeps track of whether there has been a not yet read
// value written, it is used by ypipe_conflate to mimic ypipe
// functionality regarding a reader being asleep
template <typename T> class dbuffer_t;
template <> class dbuffer_t<msg_t>
{
public:
inline dbuffer_t ()
: back (&storage[0])
, front (&storage[1])
, has_msg (false)
{
back->init ();
front->init ();
}
inline ~dbuffer_t()
{
back->close ();
front->close ();
}
inline void write (const msg_t &value_)
{
msg_t& xvalue = const_cast<msg_t&>(value_);
zmq_assert (xvalue.check ());
back->move (xvalue); // cannot just overwrite, might leak
zmq_assert (back->check ());
if (sync.try_lock ())
{
std::swap (back, front);
has_msg = true;
sync.unlock ();
}
}
inline bool read (msg_t *value_)
{
if (!value_)
return false;
{
scoped_lock_t lock (sync);
if (!has_msg)
return false;
zmq_assert (front->check ());
*value_ = *front;
front->init (); // avoid double free
has_msg = false;
return true;
}
}
inline bool check_read ()
{
scoped_lock_t lock (sync);
return has_msg;
}
inline bool probe (bool (*fn)(msg_t &))
{
scoped_lock_t lock (sync);
return (*fn) (*front);
}
private:
msg_t storage[2];
msg_t *back, *front;
mutex_t sync;
bool has_msg;
// Disable copying of dbuffer.
dbuffer_t (const dbuffer_t&);
const dbuffer_t &operator = (const dbuffer_t&);
};
}
#endif
...@@ -58,7 +58,7 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -58,7 +58,7 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.content = u.lmsg.content =
(content_t*) malloc (sizeof (content_t) + size_); (content_t*) malloc (sizeof (content_t) + size_);
if (!u.lmsg.content) { if (unlikely (!u.lmsg.content)) {
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
...@@ -75,19 +75,32 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -75,19 +75,32 @@ int zmq::msg_t::init_size (size_t size_)
int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
void *hint_) void *hint_)
{ {
u.lmsg.type = type_lmsg; // If data is NULL and size is not 0, a segfault
u.lmsg.flags = 0; // would occur once the data is accessed
u.lmsg.content = (content_t*) malloc (sizeof (content_t)); assert (data_ != NULL || size_ == 0);
if (!u.lmsg.content) {
errno = ENOMEM; // Initialize constant message if there's no need to deallocate
return -1; if(ffn_ == NULL) {
u.cmsg.type = type_cmsg;
u.cmsg.flags = 0;
u.cmsg.data = data_;
u.cmsg.size = size_;
} }
else {
u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) {
errno = ENOMEM;
return -1;
}
u.lmsg.content->data = data_; u.lmsg.content->data = data_;
u.lmsg.content->size = size_; u.lmsg.content->size = size_;
u.lmsg.content->ffn = ffn_; u.lmsg.content->ffn = ffn_;
u.lmsg.content->hint = hint_; u.lmsg.content->hint = hint_;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
}
return 0; return 0;
} }
...@@ -193,6 +206,8 @@ void *zmq::msg_t::data () ...@@ -193,6 +206,8 @@ void *zmq::msg_t::data ()
return u.vsm.data; return u.vsm.data;
case type_lmsg: case type_lmsg:
return u.lmsg.content->data; return u.lmsg.content->data;
case type_cmsg:
return u.cmsg.data;
default: default:
zmq_assert (false); zmq_assert (false);
return NULL; return NULL;
...@@ -209,6 +224,8 @@ size_t zmq::msg_t::size () ...@@ -209,6 +224,8 @@ size_t zmq::msg_t::size ()
return u.vsm.size; return u.vsm.size;
case type_lmsg: case type_lmsg:
return u.lmsg.content->size; return u.lmsg.content->size;
case type_cmsg:
return u.cmsg.size;
default: default:
zmq_assert (false); zmq_assert (false);
return 0; return 0;
...@@ -245,6 +262,11 @@ bool zmq::msg_t::is_vsm () ...@@ -245,6 +262,11 @@ bool zmq::msg_t::is_vsm ()
return u.base.type == type_vsm; return u.base.type == type_vsm;
} }
bool zmq::msg_t::is_cmsg ()
{
return u.base.type == type_cmsg;
}
void zmq::msg_t::add_refs (int refs_) void zmq::msg_t::add_refs (int refs_)
{ {
zmq_assert (refs_ >= 0); zmq_assert (refs_ >= 0);
...@@ -253,8 +275,8 @@ void zmq::msg_t::add_refs (int refs_) ...@@ -253,8 +275,8 @@ void zmq::msg_t::add_refs (int refs_)
if (!refs_) if (!refs_)
return; return;
// VSMs and delimiters can be copied straight away. The only message type // VSMs, CMSGS and delimiters can be copied straight away. The only
// that needs special care are long messages. // message type that needs special care are long messages.
if (u.base.type == type_lmsg) { if (u.base.type == type_lmsg) {
if (u.lmsg.flags & msg_t::shared) if (u.lmsg.flags & msg_t::shared)
u.lmsg.content->refcnt.add (refs_); u.lmsg.content->refcnt.add (refs_);
......
...@@ -44,7 +44,7 @@ namespace zmq ...@@ -44,7 +44,7 @@ namespace zmq
{ {
public: public:
// Mesage flags. // Message flags.
enum enum
{ {
more = 1, more = 1,
...@@ -69,6 +69,7 @@ namespace zmq ...@@ -69,6 +69,7 @@ namespace zmq
bool is_identity () const; bool is_identity () const;
bool is_delimiter (); bool is_delimiter ();
bool is_vsm (); bool is_vsm ();
bool is_cmsg ();
// After calling this function you can copy the message in POD-style // After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy. // refs_ times. No need to call copy.
...@@ -104,10 +105,15 @@ namespace zmq ...@@ -104,10 +105,15 @@ namespace zmq
enum type_t enum type_t
{ {
type_min = 101, type_min = 101,
// VSM messages store the content in the message itself
type_vsm = 101, type_vsm = 101,
// LMSG messages store the content in malloc-ed memory
type_lmsg = 102, type_lmsg = 102,
// Delimiter messages are used in envelopes
type_delimiter = 103, type_delimiter = 103,
type_max = 103 // CMSG messages point to constant data
type_cmsg = 104,
type_max = 104
}; };
// Note that fields shared between different message types are not // Note that fields shared between different message types are not
...@@ -132,6 +138,14 @@ namespace zmq ...@@ -132,6 +138,14 @@ namespace zmq
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
} lmsg; } lmsg;
struct {
void* data;
size_t size;
unsigned char unused
[max_vsm_size + 1 - sizeof (void*) - sizeof (size_t)];
unsigned char type;
unsigned char flags;
} cmsg;
struct { struct {
unsigned char unused [max_vsm_size + 1]; unsigned char unused [max_vsm_size + 1];
unsigned char type; unsigned char type;
......
...@@ -50,6 +50,11 @@ namespace zmq ...@@ -50,6 +50,11 @@ namespace zmq
EnterCriticalSection (&cs); EnterCriticalSection (&cs);
} }
inline bool try_lock ()
{
return (bool) TryEnterCriticalSection (&cs);
}
inline void unlock () inline void unlock ()
{ {
LeaveCriticalSection (&cs); LeaveCriticalSection (&cs);
...@@ -94,6 +99,16 @@ namespace zmq ...@@ -94,6 +99,16 @@ namespace zmq
posix_assert (rc); posix_assert (rc);
} }
inline bool try_lock ()
{
int rc = pthread_mutex_trylock (&mutex);
if (rc == EBUSY)
return false;
posix_assert (rc);
return true;
}
inline void unlock () inline void unlock ()
{ {
int rc = pthread_mutex_unlock (&mutex); int rc = pthread_mutex_unlock (&mutex);
...@@ -113,4 +128,30 @@ namespace zmq ...@@ -113,4 +128,30 @@ namespace zmq
#endif #endif
namespace zmq
{
struct scoped_lock_t
{
scoped_lock_t (mutex_t& mutex_)
: mutex (mutex_)
{
mutex.lock ();
}
~scoped_lock_t ()
{
mutex.unlock ();
}
private:
mutex_t& mutex;
// Disable copy construction and assignment.
scoped_lock_t (const scoped_lock_t&);
const scoped_lock_t &operator = (const scoped_lock_t&);
};
}
#endif #endif
...@@ -52,7 +52,8 @@ zmq::options_t::options_t () : ...@@ -52,7 +52,8 @@ zmq::options_t::options_t () :
tcp_keepalive_intvl (-1), tcp_keepalive_intvl (-1),
mechanism (ZMQ_NULL), mechanism (ZMQ_NULL),
as_server (0), as_server (0),
socket_id (0) socket_id (0),
conflate (false)
{ {
} }
...@@ -338,6 +339,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -338,6 +339,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
} }
break; break;
# endif # endif
case ZMQ_CONFLATE:
if (is_int && (value == 0 || value == 1)) {
conflate = (value != 0);
return 0;
}
break;
default:
break;
} }
errno = EINVAL; errno = EINVAL;
return -1; return -1;
...@@ -594,6 +605,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -594,6 +605,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
} }
break; break;
# endif # endif
case ZMQ_CONFLATE:
if (is_int) {
*value = conflate;
return 0;
}
break;
} }
errno = EINVAL; errno = EINVAL;
return -1; return -1;
......
...@@ -134,6 +134,12 @@ namespace zmq ...@@ -134,6 +134,12 @@ namespace zmq
// ID of the socket. // ID of the socket.
int socket_id; int socket_id;
// If true, socket conflates outgoing/incoming messages.
// Applicable to dealer, push/pull, pub/sub socket types.
// Cannot receive multi-part messages.
// Ignores hwm
bool conflate;
}; };
} }
......
...@@ -23,22 +23,37 @@ ...@@ -23,22 +23,37 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "ypipe.hpp"
#include "ypipe_conflate.hpp"
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
int hwms_ [2]) int hwms_ [2], bool conflate_ [2])
{ {
// Creates two pipe objects. These objects are connected by two ypipes, // Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction. // each to pass messages in one direction.
pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
typedef ypipe_conflate_t <msg_t, message_pipe_granularity> upipe_conflate_t;
pipe_t::upipe_t *upipe1;
if(conflate_ [0])
upipe1 = new (std::nothrow) upipe_conflate_t ();
else
upipe1 = new (std::nothrow) upipe_normal_t ();
alloc_assert (upipe1); alloc_assert (upipe1);
pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
pipe_t::upipe_t *upipe2;
if(conflate_ [1])
upipe2 = new (std::nothrow) upipe_conflate_t ();
else
upipe2 = new (std::nothrow) upipe_normal_t ();
alloc_assert (upipe2); alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
hwms_ [1], hwms_ [0]); hwms_ [1], hwms_, conflate_ [0]);
alloc_assert (pipes_ [0]); alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
hwms_ [0], hwms_ [1]); hwms_ [0], hwms_ [1], conflate_ [1]);
alloc_assert (pipes_ [1]); alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]);
...@@ -48,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], ...@@ -48,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
} }
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_) : int inhwm_, int outhwm_, bool conflate_) :
object_t (parent_), object_t (parent_),
inpipe (inpipe_), inpipe (inpipe_),
outpipe (outpipe_), outpipe (outpipe_),
...@@ -62,7 +77,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, ...@@ -62,7 +77,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL), peer (NULL),
sink (NULL), sink (NULL),
state (active), state (active),
delay (true) delay (true),
conflate (conflate_)
{ {
} }
...@@ -303,11 +319,15 @@ void zmq::pipe_t::process_pipe_term_ack () ...@@ -303,11 +319,15 @@ void zmq::pipe_t::process_pipe_term_ack ()
// First, delete all the unread messages in the pipe. We have to do it by // First, delete all the unread messages in the pipe. We have to do it by
// hand because msg_t doesn't have automatic destructor. Then deallocate // hand because msg_t doesn't have automatic destructor. Then deallocate
// the ypipe itself. // the ypipe itself.
msg_t msg;
while (inpipe->read (&msg)) { if (!conflate) {
int rc = msg.close (); msg_t msg;
errno_assert (rc == 0); while (inpipe->read (&msg)) {
int rc = msg.close ();
errno_assert (rc == 0);
}
} }
delete inpipe; delete inpipe;
// Deallocate the pipe object // Deallocate the pipe object
...@@ -444,7 +464,13 @@ void zmq::pipe_t::hiccup () ...@@ -444,7 +464,13 @@ void zmq::pipe_t::hiccup ()
inpipe = NULL; inpipe = NULL;
// Create new inpipe. // Create new inpipe.
inpipe = new (std::nothrow) pipe_t::upipe_t (); if (conflate)
inpipe = new (std::nothrow)
ypipe_t <msg_t, message_pipe_granularity> ();
else
inpipe = new (std::nothrow)
ypipe_conflate_t <msg_t, message_pipe_granularity> ();
alloc_assert (inpipe); alloc_assert (inpipe);
in_active = true; in_active = true;
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#define __ZMQ_PIPE_HPP_INCLUDED__ #define __ZMQ_PIPE_HPP_INCLUDED__
#include "msg.hpp" #include "msg.hpp"
#include "ypipe.hpp" #include "ypipe_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "object.hpp" #include "object.hpp"
#include "stdint.hpp" #include "stdint.hpp"
...@@ -40,8 +40,10 @@ namespace zmq ...@@ -40,8 +40,10 @@ namespace zmq
// Delay specifies how the pipe behaves when the peer terminates. If true // Delay specifies how the pipe behaves when the peer terminates. If true
// pipe receives all the pending messages before terminating, otherwise it // pipe receives all the pending messages before terminating, otherwise it
// terminates straight away. // terminates straight away.
// If conflate is true, only the most recently arrived message could be
// read (older messages are discarded)
int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
int hwms_ [2]); int hwms_ [2], bool conflate_ [2]);
struct i_pipe_events struct i_pipe_events
{ {
...@@ -64,9 +66,9 @@ namespace zmq ...@@ -64,9 +66,9 @@ namespace zmq
public array_item_t <3> public array_item_t <3>
{ {
// This allows pipepair to create pipe objects. // This allows pipepair to create pipe objects.
friend int pipepair (zmq::object_t *parents_ [2], friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
zmq::pipe_t* pipes_ [2], int hwms_ [2]); int hwms_ [2], bool conflate_ [2]);
public: public:
// Specifies the object to send events to. // Specifies the object to send events to.
...@@ -113,7 +115,7 @@ namespace zmq ...@@ -113,7 +115,7 @@ namespace zmq
private: private:
// Type of the underlying lock-free pipe. // Type of the underlying lock-free pipe.
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; typedef ypipe_base_t <msg_t, message_pipe_granularity> upipe_t;
// Command handlers. // Command handlers.
void process_activate_read (); void process_activate_read ();
...@@ -128,7 +130,7 @@ namespace zmq ...@@ -128,7 +130,7 @@ namespace zmq
// Constructor is private. Pipe can only be created using // Constructor is private. Pipe can only be created using
// pipepair function. // pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_); int inhwm_, int outhwm_, bool conflate_);
// Pipepair uses this function to let us know about // Pipepair uses this function to let us know about
// the peer pipe object. // the peer pipe object.
...@@ -199,6 +201,8 @@ namespace zmq ...@@ -199,6 +201,8 @@ namespace zmq
// Computes appropriate low watermark from the given high watermark. // Computes appropriate low watermark from the given high watermark.
static int compute_lwm (int hwm_); static int compute_lwm (int hwm_);
bool conflate;
// Disable copying. // Disable copying.
pipe_t (const pipe_t&); pipe_t (const pipe_t&);
const pipe_t &operator = (const pipe_t&); const pipe_t &operator = (const pipe_t&);
......
...@@ -299,7 +299,8 @@ int zmq::session_base_t::zap_connect () ...@@ -299,7 +299,8 @@ int zmq::session_base_t::zap_connect ()
object_t *parents [2] = {this, peer.socket}; object_t *parents [2] = {this, peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {0, 0}; int hwms [2] = {0, 0};
int rc = pipepair (parents, new_pipes, hwms); bool conflates [2] = {false, false};
int rc = pipepair (parents, new_pipes, hwms, conflates);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to this socket object. // Attach local end of the pipe to this socket object.
...@@ -332,8 +333,18 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -332,8 +333,18 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
if (!pipe && !is_terminating ()) { if (!pipe && !is_terminating ()) {
object_t *parents [2] = {this, socket}; object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL}; pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
int rc = pipepair (parents, pipes, hwms); bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : options.rcvhwm,
conflate? -1 : options.sndhwm};
bool conflates [2] = {conflate, conflate};
int rc = pipepair (parents, pipes, hwms, conflates);
errno_assert (rc == 0); errno_assert (rc == 0);
// Plug the local end of the pipe. // Plug the local end of the pipe.
......
...@@ -450,8 +450,17 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -450,8 +450,17 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create a bi-directional pipe to connect the peers. // Create a bi-directional pipe to connect the peers.
object_t *parents [2] = {this, peer.socket}; object_t *parents [2] = {this, peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm};
int rc = pipepair (parents, new_pipes, hwms); bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
bool conflates [2] = {conflate, conflate};
int rc = pipepair (parents, new_pipes, hwms, conflates);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to this socket object. // Attach local end of the pipe to this socket object.
...@@ -553,8 +562,18 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -553,8 +562,18 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create a bi-directional pipe. // Create a bi-directional pipe.
object_t *parents [2] = {this, session}; object_t *parents [2] = {this, session};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
rc = pipepair (parents, new_pipes, hwms); bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : options.sndhwm,
conflate? -1 : options.rcvhwm};
bool conflates [2] = {conflate, conflate};
rc = pipepair (parents, new_pipes, hwms, conflates);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "atomic_ptr.hpp" #include "atomic_ptr.hpp"
#include "yqueue.hpp" #include "yqueue.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "ypipe_base.hpp"
namespace zmq namespace zmq
{ {
...@@ -34,7 +35,7 @@ namespace zmq ...@@ -34,7 +35,7 @@ namespace zmq
// N is granularity of the pipe, i.e. how many items are needed to // N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation. // perform next memory allocation.
template <typename T, int N> class ypipe_t template <typename T, int N> class ypipe_t : public ypipe_base_t<T,N>
{ {
public: public:
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_YPIPE_BASE_HPP_INCLUDED__
#define __ZMQ_YPIPE_BASE_HPP_INCLUDED__
namespace zmq
{
// ypipe_base abstracts ypipe and ypipe_conflate specific
// classes, one is selected according to a the conflate
// socket option
template <typename T, int N> class ypipe_base_t
{
public:
virtual ~ypipe_base_t () {}
virtual void write (const T &value_, bool incomplete_) = 0;
virtual bool unwrite (T *value_) = 0;
virtual bool flush () = 0;
virtual bool check_read () = 0;
virtual bool read (T *value_) = 0;
virtual bool probe (bool (*fn)(T &)) = 0;
};
}
#endif
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
#define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
#include "platform.hpp"
#include "dbuffer.hpp"
#include "ypipe_base.hpp"
namespace zmq
{
// Adapter for dbuffer, to plug it in instead of a queue for the sake
// of implementing the conflate socket option, which, if set, makes
// the receiving side to discard all incoming messages but the last one.
//
// reader_awake flag is needed here to mimic ypipe delicate behaviour
// around the reader being asleep (see 'c' pointer being NULL in ypipe.hpp)
template <typename T, int N> class ypipe_conflate_t : public ypipe_base_t<T,N>
{
public:
// Initialises the pipe.
inline ypipe_conflate_t ()
: reader_awake(false)
{
}
// The destructor doesn't have to be virtual. It is mad virtual
// just to keep ICC and code checking tools from complaining.
inline virtual ~ypipe_conflate_t ()
{
}
// Following function (write) deliberately copies uninitialised data
// when used with zmq_msg. Initialising the VSM body for
// non-VSM messages won't be good for performance.
#ifdef ZMQ_HAVE_OPENVMS
#pragma message save
#pragma message disable(UNINIT)
#endif
inline void write (const T &value_, bool incomplete_)
{
(void)incomplete_;
dbuffer.write (value_);
}
#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif
// There are no incomplete items for conflate ypipe
inline bool unwrite (T *value_)
{
return false;
}
// Flush is no-op for conflate ypipe. Reader asleep behaviour
// is as of the usual ypipe.
// Returns false if the reader thread is sleeping. In that case,
// caller is obliged to wake the reader up before using the pipe again.
inline bool flush ()
{
return reader_awake;
}
// Check whether item is available for reading.
inline bool check_read ()
{
bool res = dbuffer.check_read ();
if (!res)
reader_awake = false;
return res;
}
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read (T *value_)
{
if (!check_read ())
return false;
return dbuffer.read (value_);
}
// Applies the function fn to the first elemenent in the pipe
// and returns the value returned by the fn.
// The pipe mustn't be empty or the function crashes.
inline bool probe (bool (*fn)(T &))
{
return dbuffer.probe (fn);
}
protected:
dbuffer_t <T> dbuffer;
bool reader_awake;
// Disable copying of ypipe object.
ypipe_conflate_t (const ypipe_conflate_t&);
const ypipe_conflate_t &operator = (const ypipe_conflate_t&);
};
}
#endif
...@@ -539,7 +539,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) ...@@ -539,7 +539,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
a_[i].iov_len = zmq_msg_size (&msg); a_[i].iov_len = zmq_msg_size (&msg);
a_[i].iov_base = malloc(a_[i].iov_len); a_[i].iov_base = malloc(a_[i].iov_len);
if (!a_[i].iov_base) { if (unlikely (!a_[i].iov_base)) {
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "clock.hpp" #include "clock.hpp"
#include "err.hpp" #include "err.hpp"
#include "thread.hpp"
#if !defined ZMQ_HAVE_WINDOWS #if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h> #include <unistd.h>
...@@ -57,3 +58,17 @@ unsigned long zmq_stopwatch_stop (void *watch_) ...@@ -57,3 +58,17 @@ unsigned long zmq_stopwatch_stop (void *watch_)
free (watch_); free (watch_);
return (unsigned long) (end - start); return (unsigned long) (end - start);
} }
void *zmq_threadstart(zmq_thread_fn* func, void* arg)
{
zmq::thread_t* thread = new zmq::thread_t;
thread->start(func, arg);
return thread;
}
void zmq_threadclose(void* thread)
{
zmq::thread_t* pThread = static_cast<zmq::thread_t*>(thread);
pThread->stop();
delete pThread;
}
...@@ -31,7 +31,8 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -31,7 +31,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_spec_router \ test_spec_router \
test_spec_pushpull \ test_spec_pushpull \
test_req_request_ids \ test_req_request_ids \
test_req_strict test_req_strict \
test_conflate
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -69,6 +70,7 @@ test_spec_router_SOURCES = test_spec_router.cpp ...@@ -69,6 +70,7 @@ test_spec_router_SOURCES = test_spec_router.cpp
test_spec_pushpull_SOURCES = test_spec_pushpull.cpp test_spec_pushpull_SOURCES = test_spec_pushpull.cpp
test_req_request_ids_SOURCES = test_req_request_ids.cpp test_req_request_ids_SOURCES = test_req_request_ids.cpp
test_req_strict_SOURCES = test_req_strict.cpp test_req_strict_SOURCES = test_req_strict.cpp
test_conflate_SOURCES = test_conflate.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include "../include/zmq_utils.h"
#include "testutil.hpp"
int main (int argc, char *argv [])
{
const char *bind_to = "tcp://127.0.0.1:77008";
int rc;
void* ctx = zmq_init (1);
assert (ctx);
void* s_in = zmq_socket (ctx, ZMQ_PULL);
assert (s_in);
int conflate = 1;
rc = zmq_setsockopt (s_in, ZMQ_CONFLATE, &conflate, sizeof(conflate));
assert (rc == 0);
rc = zmq_bind (s_in, bind_to);
assert (rc == 0);
void* s_out = zmq_socket (ctx, ZMQ_PUSH);
assert (s_out);
rc = zmq_connect (s_out, bind_to);
assert (rc == 0);
int message_count = 20;
for (int j = 0; j < message_count; ++j) {
rc = zmq_send(s_out, (void*)&j, sizeof(int), 0);
if (rc < 0) {
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
}
zmq_sleep (1);
int payload_recved = 0;
rc = zmq_recv(s_in, (void*)&payload_recved, sizeof(int), 0);
assert (rc > 0);
assert (payload_recved == message_count - 1);
rc = zmq_close (s_in);
assert (rc == 0);
rc = zmq_close (s_out);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
return 0;
}
...@@ -18,17 +18,16 @@ ...@@ -18,17 +18,16 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h>
#include <string> #include <string>
#include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
int val; int val;
int rc; int rc;
char buffer[16]; char buffer[16];
...@@ -198,11 +197,10 @@ int main (void) ...@@ -198,11 +197,10 @@ int main (void)
rc = zmq_close (backend); rc = zmq_close (backend);
assert (rc == 0); assert (rc == 0);
// Give time to process disconnect // Give time to process disconnect
// There's no way to do this except with a sleep // There's no way to do this except with a sleep
struct timespec t = { 0, 250 * 1000000 }; zmq_sleep(1);
nanosleep (&t, NULL);
// Send a message, should fail // Send a message, should fail
rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
......
...@@ -20,12 +20,11 @@ ...@@ -20,12 +20,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <stdio.h> #include <stdio.h>
#include <errno.h> #include <errno.h>
#include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -19,12 +19,11 @@ ...@@ -19,12 +19,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <string.h> #include <string.h>
#include <stdbool.h> #include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
int rc; int rc;
// Set up our context and sockets // Set up our context and sockets
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include <zmq.h> #include <zmq.h>
#include <string.h> #include <string.h>
#include <assert.h> #include "testutil.hpp"
/// Initialize a zeromq message with a given null-terminated string /// Initialize a zeromq message with a given null-terminated string
#define ZMQ_PREPARE_STRING(msg, data, size) \ #define ZMQ_PREPARE_STRING(msg, data, size) \
...@@ -31,6 +31,7 @@ int publicationsReceived = 0; ...@@ -31,6 +31,7 @@ int publicationsReceived = 0;
bool isSubscribed = false; bool isSubscribed = false;
int main(int argc, char** argv) { int main(int argc, char** argv) {
setup_test_environment();
void* context = zmq_ctx_new(); void* context = zmq_ctx_new();
void* pubSocket; void* pubSocket;
void* subSocket; void* subSocket;
......
...@@ -20,11 +20,11 @@ ...@@ -20,11 +20,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#undef NDEBUG #include "testutil.hpp"
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -19,12 +19,11 @@ ...@@ -19,12 +19,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <stdio.h> #include <stdio.h>
#include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
// Create REQ/ROUTER wiring. // Create REQ/ROUTER wiring.
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -18,12 +18,11 @@ ...@@ -18,12 +18,11 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#undef NDEBUG #include "testutil.hpp"
#include <assert.h>
// XSI vector I/O // XSI vector I/O
#if defined ZMQ_HAVE_UIO #if defined ZMQ_HAVE_UIO
...@@ -37,6 +36,7 @@ struct iovec { ...@@ -37,6 +36,7 @@ struct iovec {
void do_check(void* sb, void* sc, unsigned int msgsz) void do_check(void* sb, void* sc, unsigned int msgsz)
{ {
setup_test_environment();
int rc; int rc;
int sum =0; int sum =0;
for (int i = 0; i < 10; i++) for (int i = 0; i < 10; i++)
...@@ -85,7 +85,7 @@ int main (void) ...@@ -85,7 +85,7 @@ int main (void)
rc = zmq_bind (sb, "inproc://a"); rc = zmq_bind (sb, "inproc://a");
assert (rc == 0); assert (rc == 0);
::sleep(1); zmq_sleep(1);
void *sc = zmq_socket (ctx, ZMQ_PUSH); void *sc = zmq_socket (ctx, ZMQ_PUSH);
rc = zmq_connect (sc, "inproc://a"); rc = zmq_connect (sc, "inproc://a");
...@@ -107,5 +107,5 @@ int main (void) ...@@ -107,5 +107,5 @@ int main (void)
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
return 0; return 0;
} }
...@@ -19,9 +19,7 @@ ...@@ -19,9 +19,7 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <string.h> #include <string.h>
#include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
static void do_bind_and_verify (void *s, const char *endpoint) static void do_bind_and_verify (void *s, const char *endpoint)
{ {
...@@ -35,6 +33,7 @@ static void do_bind_and_verify (void *s, const char *endpoint) ...@@ -35,6 +33,7 @@ static void do_bind_and_verify (void *s, const char *endpoint)
int main (void) int main (void)
{ {
setup_test_environment();
// Create the infrastructure // Create the infrastructure
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -19,9 +19,8 @@ ...@@ -19,9 +19,8 @@
#include <string> #include <string>
#include "../include/zmq.h" #include "../include/zmq.h"
#include <pthread.h> #include "../include/zmq_utils.h"
#include <string.h> #include <string.h>
#include <unistd.h>
#include "testutil.hpp" #include "testutil.hpp"
// REQ socket events handled // REQ socket events handled
...@@ -67,7 +66,7 @@ static bool read_msg(void* s, zmq_event_t& event, std::string& ep) ...@@ -67,7 +66,7 @@ static bool read_msg(void* s, zmq_event_t& event, std::string& ep)
// REQ socket monitor thread // REQ socket monitor thread
static void *req_socket_monitor (void *ctx) static void req_socket_monitor (void *ctx)
{ {
zmq_event_t event; zmq_event_t event;
std::string ep ; std::string ep ;
...@@ -105,11 +104,10 @@ static void *req_socket_monitor (void *ctx) ...@@ -105,11 +104,10 @@ static void *req_socket_monitor (void *ctx)
} }
} }
zmq_close (s); zmq_close (s);
return NULL;
} }
// 2nd REQ socket monitor thread // 2nd REQ socket monitor thread
static void *req2_socket_monitor (void *ctx) static void req2_socket_monitor (void *ctx)
{ {
zmq_event_t event; zmq_event_t event;
std::string ep ; std::string ep ;
...@@ -134,11 +132,10 @@ static void *req2_socket_monitor (void *ctx) ...@@ -134,11 +132,10 @@ static void *req2_socket_monitor (void *ctx)
} }
} }
zmq_close (s); zmq_close (s);
return NULL;
} }
// REP socket monitor thread // REP socket monitor thread
static void *rep_socket_monitor (void *ctx) static void rep_socket_monitor (void *ctx)
{ {
zmq_event_t event; zmq_event_t event;
std::string ep ; std::string ep ;
...@@ -175,16 +172,16 @@ static void *rep_socket_monitor (void *ctx) ...@@ -175,16 +172,16 @@ static void *rep_socket_monitor (void *ctx)
} }
} }
zmq_close (s); zmq_close (s);
return NULL;
} }
int main (void) int main (void)
{ {
setup_test_environment();
int rc; int rc;
void *req; void *req;
void *req2; void *req2;
void *rep; void *rep;
pthread_t threads [3]; void* threads [3];
addr = "tcp://127.0.0.1:5560"; addr = "tcp://127.0.0.1:5560";
...@@ -208,8 +205,7 @@ int main (void) ...@@ -208,8 +205,7 @@ int main (void)
// REP socket monitor, all events // REP socket monitor, all events
rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL);
assert (rc == 0); assert (rc == 0);
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx); threads [0] = zmq_threadstart(&rep_socket_monitor, ctx);
assert (rc == 0);
// REQ socket // REQ socket
req = zmq_socket (ctx, ZMQ_REQ); req = zmq_socket (ctx, ZMQ_REQ);
...@@ -218,9 +214,8 @@ int main (void) ...@@ -218,9 +214,8 @@ int main (void)
// REQ socket monitor, all events // REQ socket monitor, all events
rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL);
assert (rc == 0); assert (rc == 0);
rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx); threads [1] = zmq_threadstart(&req_socket_monitor, ctx);
assert (rc == 0); zmq_sleep(1);
sleep (1);
// Bind REQ and REP // Bind REQ and REP
rc = zmq_bind (rep, addr.c_str()); rc = zmq_bind (rep, addr.c_str());
...@@ -238,8 +233,7 @@ int main (void) ...@@ -238,8 +233,7 @@ int main (void)
// 2nd REQ socket monitor, connected event only // 2nd REQ socket monitor, connected event only
rc = zmq_socket_monitor (req2, "inproc://monitor.req2", ZMQ_EVENT_CONNECTED); rc = zmq_socket_monitor (req2, "inproc://monitor.req2", ZMQ_EVENT_CONNECTED);
assert (rc == 0); assert (rc == 0);
rc = pthread_create (&threads [2], NULL, req2_socket_monitor, ctx); threads [2] = zmq_threadstart(&req2_socket_monitor, ctx);
assert (rc == 0);
rc = zmq_connect (req2, addr.c_str()); rc = zmq_connect (req2, addr.c_str());
assert (rc == 0); assert (rc == 0);
...@@ -249,8 +243,7 @@ int main (void) ...@@ -249,8 +243,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Allow some time for detecting error states // Allow some time for detecting error states
struct timespec t = { 0, 250 * 1000000 }; zmq_sleep(1);
nanosleep (&t, NULL);
// Close the REQ socket // Close the REQ socket
rc = zmq_close (req); rc = zmq_close (req);
...@@ -276,7 +269,8 @@ int main (void) ...@@ -276,7 +269,8 @@ int main (void)
assert (req2_socket_events & ZMQ_EVENT_CONNECTED); assert (req2_socket_events & ZMQ_EVENT_CONNECTED);
assert (!(req2_socket_events & ZMQ_EVENT_CLOSED)); assert (!(req2_socket_events & ZMQ_EVENT_CLOSED));
pthread_exit (NULL); for (unsigned int i = 0; i < 3; ++i)
zmq_threadclose(threads [i]);
return 0 ; return 0 ;
} }
...@@ -19,12 +19,11 @@ ...@@ -19,12 +19,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <string.h> #include <string.h>
#include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
// Create the infrastructure // Create the infrastructure
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -20,11 +20,11 @@ ...@@ -20,11 +20,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#undef NDEBUG #include "testutil.hpp"
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -19,9 +19,7 @@ ...@@ -19,9 +19,7 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <string.h> #include <string.h>
#include <stdbool.h> #include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
// ZMTP protocol greeting structure // ZMTP protocol greeting structure
...@@ -45,6 +43,7 @@ static zmtp_greeting_t greeting ...@@ -45,6 +43,7 @@ static zmtp_greeting_t greeting
int main (void) int main (void)
{ {
setup_test_environment();
int rc; int rc;
// Set up our context and sockets // Set up our context and sockets
......
...@@ -18,12 +18,12 @@ ...@@ -18,12 +18,12 @@
*/ */
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#include <time.h> #include <time.h>
#include "testutil.hpp" #include "testutil.hpp"
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -17,13 +17,14 @@ ...@@ -17,13 +17,14 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq_utils.h"
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#include <time.h> #include <time.h>
#include "testutil.hpp" #include "testutil.hpp"
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -57,9 +58,7 @@ int main (void) ...@@ -57,9 +58,7 @@ int main (void)
// We have to give the connects time to finish otherwise the requests // We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the // will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets. // REQ sockets to the REP sockets.
struct timespec t = { 0, 250 * 1000000 }; zmq_sleep(1);
nanosleep (&t, NULL);
// Case 1: Second send() before a reply arrives in a pipe. // Case 1: Second send() before a reply arrives in a pipe.
......
...@@ -20,12 +20,11 @@ ...@@ -20,12 +20,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -20,11 +20,11 @@ ...@@ -20,11 +20,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#undef NDEBUG #include "testutil.hpp"
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
void *router = zmq_socket (ctx, ZMQ_ROUTER); void *router = zmq_socket (ctx, ZMQ_ROUTER);
......
...@@ -17,13 +17,12 @@ ...@@ -17,13 +17,12 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <pthread.h> #include "../include/zmq_utils.h"
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include "testutil.hpp" #include "testutil.hpp"
static void * static void zap_handler (void *zap)
zap_handler (void *zap)
{ {
char *version = s_recv (zap); char *version = s_recv (zap);
char *sequence = s_recv (zap); char *sequence = s_recv (zap);
...@@ -62,12 +61,11 @@ zap_handler (void *zap) ...@@ -62,12 +61,11 @@ zap_handler (void *zap)
int rc = zmq_close (zap); int rc = zmq_close (zap);
assert (rc == 0); assert (rc == 0);
return NULL;
} }
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -177,9 +175,7 @@ int main (void) ...@@ -177,9 +175,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Spawn ZAP handler // Spawn ZAP handler
pthread_t zap_thread; void* zap_thread = zmq_threadstart(&zap_handler, zap);
rc = pthread_create (&zap_thread, NULL, &zap_handler, zap);
assert (rc == 0);
rc = zmq_bind (server, "tcp://*:9998"); rc = zmq_bind (server, "tcp://*:9998");
assert (rc == 0); assert (rc == 0);
...@@ -194,7 +190,7 @@ int main (void) ...@@ -194,7 +190,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Wait until ZAP handler terminates. // Wait until ZAP handler terminates.
pthread_join (zap_thread, NULL); zmq_threadclose(zap_thread);
// Check PLAIN security -- two servers trying to talk to each other // Check PLAIN security -- two servers trying to talk to each other
server = zmq_socket (ctx, ZMQ_DEALER); server = zmq_socket (ctx, ZMQ_DEALER);
......
...@@ -18,13 +18,12 @@ ...@@ -18,13 +18,12 @@
*/ */
#include "platform.hpp" #include "platform.hpp"
#include <pthread.h> #include "../include/zmq_utils.h"
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
#include "testutil.hpp" #include "testutil.hpp"
static void * static void zap_handler (void *zap)
zap_handler (void *zap)
{ {
char *version = s_recv (zap); char *version = s_recv (zap);
char *sequence = s_recv (zap); char *sequence = s_recv (zap);
...@@ -52,8 +51,6 @@ zap_handler (void *zap) ...@@ -52,8 +51,6 @@ zap_handler (void *zap)
int rc = zmq_close (zap); int rc = zmq_close (zap);
assert (rc == 0); assert (rc == 0);
return NULL;
} }
int main (void) int main (void)
...@@ -62,6 +59,7 @@ int main (void) ...@@ -62,6 +59,7 @@ int main (void)
printf ("libsodium not installed, skipping CURVE test\n"); printf ("libsodium not installed, skipping CURVE test\n");
return 0; return 0;
#endif #endif
setup_test_environment();
int rc; int rc;
size_t optsize; size_t optsize;
int mechanism; int mechanism;
...@@ -122,9 +120,7 @@ int main (void) ...@@ -122,9 +120,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Spawn ZAP handler // Spawn ZAP handler
pthread_t zap_thread; void* zap_thread = zmq_threadstart(&zap_handler, zap);
rc = pthread_create (&zap_thread, NULL, &zap_handler, zap);
assert (rc == 0);
rc = zmq_bind (server, "tcp://*:9998"); rc = zmq_bind (server, "tcp://*:9998");
assert (rc == 0); assert (rc == 0);
...@@ -139,7 +135,7 @@ int main (void) ...@@ -139,7 +135,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Wait until ZAP handler terminates. // Wait until ZAP handler terminates.
pthread_join (zap_thread, NULL); zmq_threadclose(zap_thread);
// Shutdown // Shutdown
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
......
...@@ -18,18 +18,16 @@ ...@@ -18,18 +18,16 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include <pthread.h> #include "../include/zmq_utils.h"
#include <stddef.h> #include <stddef.h>
#include <stdio.h> #include <stdio.h>
#include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
#define THREAD_COUNT 100 #define THREAD_COUNT 100
extern "C" extern "C"
{ {
static void *worker (void *s) static void worker (void *s)
{ {
int rc; int rc;
...@@ -39,19 +37,18 @@ extern "C" ...@@ -39,19 +37,18 @@ extern "C"
// Start closing the socket while the connecting process is underway. // Start closing the socket while the connecting process is underway.
rc = zmq_close (s); rc = zmq_close (s);
assert (rc == 0); assert (rc == 0);
return NULL;
} }
} }
int main (void) int main (void)
{ {
setup_test_environment();
void *s1; void *s1;
void *s2; void *s2;
int i; int i;
int j; int j;
int rc; int rc;
pthread_t threads [THREAD_COUNT]; void* threads [THREAD_COUNT];
for (j = 0; j != 10; j++) { for (j = 0; j != 10; j++) {
...@@ -69,13 +66,11 @@ int main (void) ...@@ -69,13 +66,11 @@ int main (void)
for (i = 0; i != THREAD_COUNT; i++) { for (i = 0; i != THREAD_COUNT; i++) {
s2 = zmq_socket (ctx, ZMQ_SUB); s2 = zmq_socket (ctx, ZMQ_SUB);
assert (s2); assert (s2);
rc = pthread_create (&threads [i], NULL, worker, s2); threads [i] = zmq_threadstart(&worker, s2);
assert (rc == 0);
} }
for (i = 0; i != THREAD_COUNT; i++) { for (i = 0; i != THREAD_COUNT; i++) {
rc = pthread_join (threads [i], NULL); zmq_threadclose(threads [i]);
assert (rc == 0);
} }
rc = zmq_close (s1); rc = zmq_close (s1);
......
...@@ -220,6 +220,7 @@ void test_block_on_send_no_peers (void *ctx) ...@@ -220,6 +220,7 @@ void test_block_on_send_no_peers (void *ctx)
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -258,6 +258,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -258,6 +258,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -123,6 +123,7 @@ void test_envelope (void *ctx) ...@@ -123,6 +123,7 @@ void test_envelope (void *ctx)
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq_utils.h"
#include <stdio.h> #include <stdio.h>
#include <unistd.h>
#include <time.h> #include <time.h>
#include "testutil.hpp" #include "testutil.hpp"
...@@ -49,8 +49,7 @@ void test_round_robin_out (void *ctx) ...@@ -49,8 +49,7 @@ void test_round_robin_out (void *ctx)
// We have to give the connects time to finish otherwise the requests // We have to give the connects time to finish otherwise the requests
// will not properly round-robin. We could alternatively connect the // will not properly round-robin. We could alternatively connect the
// REQ sockets to the REP sockets. // REQ sockets to the REP sockets.
struct timespec t = { 0, 250 * 1000000 }; zmq_sleep(1);
nanosleep (&t, NULL);
// Send our peer-replies, and expect every REP it used once in order // Send our peer-replies, and expect every REP it used once in order
for (size_t peer = 0; peer < services; peer++) { for (size_t peer = 0; peer < services; peer++) {
...@@ -217,6 +216,7 @@ void test_block_on_send_no_peers (void *ctx) ...@@ -217,6 +216,7 @@ void test_block_on_send_no_peers (void *ctx)
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -177,6 +177,7 @@ void test_destroy_queue_on_disconnect (void *ctx) ...@@ -177,6 +177,7 @@ void test_destroy_queue_on_disconnect (void *ctx)
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -19,9 +19,7 @@ ...@@ -19,9 +19,7 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <string.h> #include <string.h>
#include <stdbool.h> #include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
// ZMTP protocol greeting structure // ZMTP protocol greeting structure
...@@ -222,6 +220,7 @@ test_stream_to_stream (void) ...@@ -222,6 +220,7 @@ test_stream_to_stream (void)
int main (void) int main (void)
{ {
setup_test_environment();
test_stream_to_dealer (); test_stream_to_dealer ();
test_stream_to_stream (); test_stream_to_stream ();
} }
...@@ -18,13 +18,14 @@ ...@@ -18,13 +18,14 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h> #include <stdio.h>
#include <time.h> #include <time.h>
#undef NDEBUG #include "testutil.hpp"
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -62,8 +63,7 @@ int main (void) ...@@ -62,8 +63,7 @@ int main (void)
assert (rc >= 0); assert (rc >= 0);
// Wait a bit till the subscription gets to the publisher // Wait a bit till the subscription gets to the publisher
struct timespec t = { 0, 250 * 1000000 }; zmq_sleep(1);
nanosleep (&t, NULL);
// Send an empty message // Send an empty message
rc = zmq_send (pub, NULL, 0, 0); rc = zmq_send (pub, NULL, 0, 0);
......
...@@ -18,15 +18,14 @@ ...@@ -18,15 +18,14 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <string.h> #include <string.h>
#include <unistd.h>
#include <time.h> #include <time.h>
#include "testutil.hpp"
#undef NDEBUG
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
int rc; int rc;
char buf[32]; char buf[32];
const char *ep = "tcp://127.0.0.1:5560"; const char *ep = "tcp://127.0.0.1:5560";
...@@ -54,8 +53,7 @@ int main (void) ...@@ -54,8 +53,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Allow unbind to settle // Allow unbind to settle
struct timespec t = { 0, 250 * 1000000 }; zmq_sleep(1);
nanosleep (&t, NULL);
// Check that sending would block (there's no outbound connection) // Check that sending would block (there's no outbound connection)
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
...@@ -92,7 +90,7 @@ int main (void) ...@@ -92,7 +90,7 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Allow disconnect to settle // Allow disconnect to settle
nanosleep (&t, NULL); zmq_sleep(1);
// Check that sending would block (there's no inbound connections). // Check that sending would block (there's no inbound connections).
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
......
...@@ -18,15 +18,15 @@ ...@@ -18,15 +18,15 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include <sys/time.h> #include "../include/zmq_utils.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#undef NDEBUG #include "testutil.hpp"
#include <assert.h>
int main (void) int main (void)
{ {
setup_test_environment();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -46,17 +46,11 @@ int main (void) ...@@ -46,17 +46,11 @@ int main (void)
rc = zmq_setsockopt (frontend, ZMQ_RCVTIMEO, &timeout, sizeof (int)); rc = zmq_setsockopt (frontend, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0); assert (rc == 0);
struct timeval before, after; void* stopwatch = zmq_stopwatch_start();
gettimeofday (&before, NULL);
rc = zmq_recv (frontend, buffer, 32, 0); rc = zmq_recv (frontend, buffer, 32, 0);
assert (rc == -1); assert (rc == -1);
assert (zmq_errno () == EAGAIN); assert (zmq_errno () == EAGAIN);
gettimeofday (&after, NULL); unsigned int elapsed = zmq_stopwatch_stop(stopwatch) / 1000;
long elapsed = (long)
((after.tv_sec * 1000 + after.tv_usec / 1000)
- (before.tv_sec * 1000 + before.tv_usec / 1000));
assert (elapsed > 200 && elapsed < 300); assert (elapsed > 200 && elapsed < 300);
// Check that normal message flow works as expected // Check that normal message flow works as expected
......
...@@ -22,10 +22,16 @@ ...@@ -22,10 +22,16 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <string.h> #include <string.h>
#undef NDEBUG #undef NDEBUG
#include <assert.h> #include <assert.h>
#include <stdarg.h> #include <stdarg.h>
#if defined _WIN32
#include <crtdbg.h>
#pragma warning(disable:4996)
#endif
// Bounce a message from client to server and back // Bounce a message from client to server and back
// For REQ/REP or DEALER/DEALER pairs only // For REQ/REP or DEALER/DEALER pairs only
...@@ -191,4 +197,13 @@ void close_zero_linger (void *socket) ...@@ -191,4 +197,13 @@ void close_zero_linger (void *socket)
assert (rc == 0); assert (rc == 0);
} }
void setup_test_environment()
{
#if defined _WIN32
_set_abort_behavior( 0, _WRITE_ABORT_MSG);
_CrtSetReportMode( _CRT_ASSERT, _CRTDBG_MODE_FILE );
_CrtSetReportFile( _CRT_ASSERT, _CRTDBG_FILE_STDERR );
#endif
}
#endif #endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment