Unverified Commit 173b54a8 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2950 from zeromq/add-unittests-mtrie

Problem: no unit tests for mtrie
parents 31387f84 9cd01bb5
...@@ -611,6 +611,8 @@ set (cxx-sources ...@@ -611,6 +611,8 @@ set (cxx-sources
fd.hpp fd.hpp
fq.hpp fq.hpp
gather.hpp gather.hpp
generic_mtrie.hpp
generic_mtrie_impl.hpp
gssapi_client.hpp gssapi_client.hpp
gssapi_mechanism_base.hpp gssapi_mechanism_base.hpp
gssapi_server.hpp gssapi_server.hpp
......
...@@ -64,6 +64,8 @@ src_libzmq_la_SOURCES = \ ...@@ -64,6 +64,8 @@ src_libzmq_la_SOURCES = \
src/fq.hpp \ src/fq.hpp \
src/gather.cpp \ src/gather.cpp \
src/gather.hpp \ src/gather.hpp \
src/generic_mtrie.hpp \
src/generic_mtrie_impl.hpp \
src/gssapi_mechanism_base.cpp \ src/gssapi_mechanism_base.cpp \
src/gssapi_mechanism_base.hpp \ src/gssapi_mechanism_base.hpp \
src/gssapi_client.cpp \ src/gssapi_client.cpp \
...@@ -415,6 +417,7 @@ test_apps = \ ...@@ -415,6 +417,7 @@ test_apps = \
tests/test_xpub_nodrop \ tests/test_xpub_nodrop \
tests/test_xpub_manual \ tests/test_xpub_manual \
tests/test_xpub_welcome_msg \ tests/test_xpub_welcome_msg \
tests/test_xpub_verbose \
tests/test_atomics \ tests/test_atomics \
tests/test_sockopt_hwm \ tests/test_sockopt_hwm \
tests/test_heartbeats \ tests/test_heartbeats \
...@@ -621,6 +624,10 @@ tests_test_xpub_manual_LDADD = src/libzmq.la ...@@ -621,6 +624,10 @@ tests_test_xpub_manual_LDADD = src/libzmq.la
tests_test_xpub_welcome_msg_SOURCES = tests/test_xpub_welcome_msg.cpp tests_test_xpub_welcome_msg_SOURCES = tests/test_xpub_welcome_msg.cpp
tests_test_xpub_welcome_msg_LDADD = src/libzmq.la tests_test_xpub_welcome_msg_LDADD = src/libzmq.la
tests_test_xpub_verbose_SOURCES = tests/test_xpub_verbose.cpp
tests_test_xpub_verbose_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_xpub_verbose_CPPFLAGS = ${UNITY_CPPFLAGS}
tests_test_atomics_SOURCES = tests/test_atomics.cpp tests_test_atomics_SOURCES = tests/test_atomics.cpp
tests_test_atomics_LDADD = src/libzmq.la tests_test_atomics_LDADD = src/libzmq.la
...@@ -865,19 +872,32 @@ if ENABLE_STATIC ...@@ -865,19 +872,32 @@ if ENABLE_STATIC
# unit tests - these include individual source files and test the internal functions # unit tests - these include individual source files and test the internal functions
test_apps += \ test_apps += \
unittests/unittest_poller \ unittests/unittest_poller \
unittests/unittest_ypipe unittests/unittest_ypipe \
unittests/unittest_mtrie
unittests_unittest_poller_SOURCES = unittests/unittest_poller.cpp unittests_unittest_poller_SOURCES = unittests/unittest_poller.cpp
unittests_unittest_poller_CPPFLAGS = -I$(top_srcdir)/src ${UNITY_CPPFLAGS} unittests_unittest_poller_CPPFLAGS = -I$(top_srcdir)/src ${UNITY_CPPFLAGS} $(CODE_COVERAGE_CPPFLAGS)
unittests_unittest_poller_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS)
unittests_unittest_poller_LDADD = $(top_builddir)/src/.libs/libzmq.a \ unittests_unittest_poller_LDADD = $(top_builddir)/src/.libs/libzmq.a \
${src_libzmq_la_LIBADD} \ ${src_libzmq_la_LIBADD} \
${UNITY_LIBS} ${UNITY_LIBS} \
$(CODE_COVERAGE_LDFLAGS)
unittests_unittest_ypipe_SOURCES = unittests/unittest_ypipe.cpp unittests_unittest_ypipe_SOURCES = unittests/unittest_ypipe.cpp
unittests_unittest_ypipe_CPPFLAGS = -I$(top_srcdir)/src ${UNITY_CPPFLAGS} unittests_unittest_ypipe_CPPFLAGS = -I$(top_srcdir)/src ${UNITY_CPPFLAGS} $(CODE_COVERAGE_CPPFLAGS)
unittests_unittest_ypipe_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS)
unittests_unittest_ypipe_LDADD = $(top_builddir)/src/.libs/libzmq.a \ unittests_unittest_ypipe_LDADD = $(top_builddir)/src/.libs/libzmq.a \
${src_libzmq_la_LIBADD} \ ${src_libzmq_la_LIBADD} \
${UNITY_LIBS} ${UNITY_LIBS} \
$(CODE_COVERAGE_LDFLAGS)
unittests_unittest_mtrie_SOURCES = unittests/unittest_mtrie.cpp
unittests_unittest_mtrie_CPPFLAGS = -I$(top_srcdir)/src ${UNITY_CPPFLAGS} $(CODE_COVERAGE_CPPFLAGS)
unittests_unittest_mtrie_CXXFLAGS = $(CODE_COVERAGE_CXXFLAGS)
unittests_unittest_mtrie_LDADD = $(top_builddir)/src/.libs/libzmq.a \
${src_libzmq_la_LIBADD} \
${UNITY_LIBS} \
$(CODE_COVERAGE_LDFLAGS)
endif endif
check_PROGRAMS = ${test_apps} check_PROGRAMS = ${test_apps}
......
0MQ version 4.2.4 stable, released on 20xx/xx/xx 0MQ version 4.2.4 stable, released on 20xx/xx/xx
================================================ ================================================
* Fixed #2820 - further clarify ZMQ_XPUB_VERBOSE(R) documentation.
0MQ version 4.2.3 stable, released on 2017/12/13 0MQ version 4.2.3 stable, released on 2017/12/13
================================================ ================================================
......
...@@ -1003,11 +1003,12 @@ Default value:: N/A ...@@ -1003,11 +1003,12 @@ Default value:: N/A
Applicable socket types:: ZMQ_SUB Applicable socket types:: ZMQ_SUB
ZMQ_XPUB_VERBOSE: pass subscribe messages on XPUB socket ZMQ_XPUB_VERBOSE: pass duplicate subscribe messages on XPUB socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour on new subscriptions. If enabled, Sets the 'XPUB' socket behaviour on new duplicated subscriptions. If enabled,
the socket passes all subscribe messages to the caller. If disabled, the socket passes all subscribe messages to the caller. If disabled,
these are not visible to the caller. The default is 0 (disabled). only the first subscription to each filter will be passed. The default is 0
(disabled).
[horizontal] [horizontal]
Option value type:: int Option value type:: int
...@@ -1016,11 +1017,12 @@ Default value:: 0 ...@@ -1016,11 +1017,12 @@ Default value:: 0
Applicable socket types:: ZMQ_XPUB Applicable socket types:: ZMQ_XPUB
ZMQ_XPUB_VERBOSER: pass subscribe and unsubscribe messages on XPUB socket ZMQ_XPUB_VERBOSER: pass duplicate subscribe and unsubscribe messages on XPUB socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the 'XPUB' socket behaviour on new subscriptions and ubsubscriptions. Sets the 'XPUB' socket behaviour on new duplicated subscriptions and
If enabled, the socket passes all subscribe and unsubscribe messages to the unsubscriptions. If enabled, the socket passes all subscribe and unsubscribe
caller. If disabled, these are not visible to the caller. The default is 0 messages to the caller. If disabled, only the first subscription to each filter and
the last unsubscription from each filter will be passed. The default is 0
(disabled). (disabled).
[horizontal] [horizontal]
......
/*
Copyright (c) 2018 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_GENERIC_MTRIE_HPP_INCLUDED__
#define __ZMQ_GENERIC_MTRIE_HPP_INCLUDED__
#include <stddef.h>
#include <set>
#include "stdint.hpp"
namespace zmq
{
// Multi-trie (prefix tree). Each node in the trie is a set of pointers.
template <typename T> class generic_mtrie_t
{
public:
typedef T value_t;
typedef const unsigned char *prefix_t;
enum rm_result
{
not_found,
last_value_removed,
values_remain
};
generic_mtrie_t ();
~generic_mtrie_t ();
// Add key to the trie. Returns true iff no entry with the same prefix_
// and size_ existed before.
bool add (prefix_t prefix_, size_t size_, value_t *value_);
// Remove all entries with a specific value from the trie.
// The call_on_uniq_ flag controls if the callback is invoked
// when there are no entries left on a prefix only (true)
// or on every removal (false). The arg_ argument is passed
// through to the callback function.
template <typename Arg>
void rm (value_t *value_,
void (*func_) (const unsigned char *data_, size_t size_, Arg arg_),
Arg arg_,
bool call_on_uniq_);
// Removes a specific entry from the trie.
// Returns the result of the operation.
rm_result rm (prefix_t prefix_, size_t size_, value_t *value_);
// Calls a callback function for all matching entries, i.e. any node
// corresponding to data_ or a prefix of it. The arg_ argument
// is passed through to the callback function.
template <typename Arg>
void match (prefix_t data_,
size_t size_,
void (*func_) (value_t *value_, Arg arg_),
Arg arg_);
private:
bool add_helper (prefix_t prefix_, size_t size_, value_t *value_);
template <typename Arg>
void rm_helper (value_t *value_,
unsigned char **buff_,
size_t buffsize_,
size_t maxbuffsize_,
void (*func_) (prefix_t data_, size_t size_, Arg arg_),
Arg arg_,
bool call_on_uniq_);
rm_result rm_helper (prefix_t prefix_, size_t size_, value_t *value_);
bool is_redundant () const;
typedef std::set<value_t *> pipes_t;
pipes_t *pipes;
unsigned char min;
unsigned short count;
unsigned short live_nodes;
union
{
class generic_mtrie_t<value_t> *node;
class generic_mtrie_t<value_t> **table;
} next;
generic_mtrie_t (const generic_mtrie_t<value_t> &);
const generic_mtrie_t<value_t> &
operator= (const generic_mtrie_t<value_t> &);
};
}
#endif
This diff is collapsed.
This diff is collapsed.
...@@ -30,74 +30,23 @@ ...@@ -30,74 +30,23 @@
#ifndef __ZMQ_MTRIE_HPP_INCLUDED__ #ifndef __ZMQ_MTRIE_HPP_INCLUDED__
#define __ZMQ_MTRIE_HPP_INCLUDED__ #define __ZMQ_MTRIE_HPP_INCLUDED__
#include <stddef.h> #include "generic_mtrie.hpp"
#include <set>
#include "stdint.hpp" #if __cplusplus >= 201103L || defined(_MSC_VER)
#define ZMQ_HAS_EXTERN_TEMPLATE 1
#else
#define ZMQ_HAS_EXTERN_TEMPLATE 0
#endif
namespace zmq namespace zmq
{ {
class pipe_t; class pipe_t;
// Multi-trie. Each node in the trie is a set of pointers to pipes. #if ZMQ_HAS_EXTERN_TEMPLATE
extern template class generic_mtrie_t<pipe_t>;
class mtrie_t #endif
{
public:
mtrie_t ();
~mtrie_t ();
// Add key to the trie. Returns true if it's a new subscription
// rather than a duplicate.
bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);
// Remove all subscriptions for a specific peer from the trie.
// The call_on_uniq_ flag controls if the callback is invoked
// when there are no subscriptions left on some topics or on
// every removal.
void rm (zmq::pipe_t *pipe_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_,
bool call_on_uniq_);
// Remove specific subscription from the trie. Return true is it was
// actually removed rather than de-duplicated.
bool rm (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);
// Signal all the matching pipes.
void match (unsigned char *data_,
size_t size_,
void (*func_) (zmq::pipe_t *pipe_, void *arg_),
void *arg_);
private:
bool add_helper (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);
void
rm_helper (zmq::pipe_t *pipe_,
unsigned char **buff_,
size_t buffsize_,
size_t maxbuffsize_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_,
bool call_on_uniq_);
bool rm_helper (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);
bool is_redundant () const;
typedef std::set<zmq::pipe_t *> pipes_t;
pipes_t *pipes;
unsigned char min;
unsigned short count;
unsigned short live_nodes;
union
{
class mtrie_t *node;
class mtrie_t **table;
} next;
mtrie_t (const mtrie_t &); typedef generic_mtrie_t<pipe_t> mtrie_t;
const mtrie_t &operator= (const mtrie_t &);
};
} }
#endif #endif
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "macros.hpp" #include "macros.hpp"
#include "generic_mtrie_impl.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
...@@ -106,18 +107,23 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -106,18 +107,23 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
pending_metadata.push_back (metadata); pending_metadata.push_back (metadata);
pending_flags.push_back (0); pending_flags.push_back (0);
} else { } else {
bool unique; bool notify;
if (*data == 0) if (*data == 0) {
unique = subscriptions.rm (data + 1, size - 1, pipe_); mtrie_t::rm_result rm_result =
else subscriptions.rm (data + 1, size - 1, pipe_);
unique = subscriptions.add (data + 1, size - 1, pipe_); // TODO reconsider what to do if rm_result == mtrie_t::not_found
notify =
// If the (un)subscription is not a duplicate store it so that it can be rm_result != mtrie_t::values_remain || verbose_unsubs;
// passed to the user on next recv call unless verbose mode is enabled } else {
// which makes to pass always these messages. bool first_added =
if (options.type == ZMQ_XPUB subscriptions.add (data + 1, size - 1, pipe_);
&& (unique || (*data == 1 && verbose_subs) notify = first_added || verbose_subs;
|| (*data == 0 && verbose_unsubs && verbose_subs))) { }
// If the request was a new subscription, or the subscription
// was removed, or verbose mode is enabled, store it so that
// it can be passed to the user on next recv call.
if (options.type == ZMQ_XPUB && notify) {
pending_data.push_back (blob_t (data, size)); pending_data.push_back (blob_t (data, size));
if (metadata) if (metadata)
metadata->add_ref (); metadata->add_ref ();
...@@ -188,7 +194,7 @@ int zmq::xpub_t::xsetsockopt (int option_, ...@@ -188,7 +194,7 @@ int zmq::xpub_t::xsetsockopt (int option_,
return 0; return 0;
} }
static void stub (unsigned char *data_, size_t size_, void *arg_) static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
{ {
LIBZMQ_UNUSED (data_); LIBZMQ_UNUSED (data_);
LIBZMQ_UNUSED (size_); LIBZMQ_UNUSED (size_);
...@@ -204,7 +210,7 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) ...@@ -204,7 +210,7 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
// Remove pipe without actually sending the message as it was taken // Remove pipe without actually sending the message as it was taken
// care of by the manual call above. subscriptions is the real mtrie, // care of by the manual call above. subscriptions is the real mtrie,
// so the pipe must be removed from there or it will be left over. // so the pipe must be removed from there or it will be left over.
subscriptions.rm (pipe_, stub, NULL, false); subscriptions.rm (pipe_, stub, (void *) NULL, false);
} else { } else {
// Remove the pipe from the trie. If there are topics that nobody // Remove the pipe from the trie. If there are topics that nobody
// is interested in anymore, send corresponding unsubscriptions // is interested in anymore, send corresponding unsubscriptions
...@@ -215,10 +221,9 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) ...@@ -215,10 +221,9 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
dist.pipe_terminated (pipe_); dist.pipe_terminated (pipe_);
} }
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_)
{ {
xpub_t *self = (xpub_t *) arg_; self_->dist.match (pipe_);
self->dist.match (pipe_);
} }
int zmq::xpub_t::xsend (msg_t *msg_) int zmq::xpub_t::xsend (msg_t *msg_)
...@@ -295,26 +300,24 @@ bool zmq::xpub_t::xhas_in () ...@@ -295,26 +300,24 @@ bool zmq::xpub_t::xhas_in ()
return !pending_data.empty (); return !pending_data.empty ();
} }
void zmq::xpub_t::send_unsubscription (unsigned char *data_, void zmq::xpub_t::send_unsubscription (zmq::mtrie_t::prefix_t data_,
size_t size_, size_t size_,
void *arg_) xpub_t *self_)
{ {
xpub_t *self = (xpub_t *) arg_; if (self_->options.type != ZMQ_PUB) {
if (self->options.type != ZMQ_PUB) {
// Place the unsubscription to the queue of pending (un)subscriptions // Place the unsubscription to the queue of pending (un)subscriptions
// to be retrieved by the user later on. // to be retrieved by the user later on.
blob_t unsub (size_ + 1); blob_t unsub (size_ + 1);
*unsub.data () = 0; *unsub.data () = 0;
if (size_ > 0) if (size_ > 0)
memcpy (unsub.data () + 1, data_, size_); memcpy (unsub.data () + 1, data_, size_);
self->pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE (unsub)); self_->pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE (unsub));
self->pending_metadata.push_back (NULL); self_->pending_metadata.push_back (NULL);
self->pending_flags.push_back (0); self_->pending_flags.push_back (0);
if (self->manual) { if (self_->manual) {
self->last_pipe = NULL; self_->last_pipe = NULL;
self->pending_pipes.push_back (NULL); self_->pending_pipes.push_back (NULL);
} }
} }
} }
...@@ -66,11 +66,12 @@ class xpub_t : public socket_base_t ...@@ -66,11 +66,12 @@ class xpub_t : public socket_base_t
private: private:
// Function to be applied to the trie to send all the subscriptions // Function to be applied to the trie to send all the subscriptions
// upstream. // upstream.
static void static void send_unsubscription (zmq::mtrie_t::prefix_t data_,
send_unsubscription (unsigned char *data_, size_t size_, void *arg_); size_t size_,
xpub_t *self_);
// Function to be applied to each matching pipes. // Function to be applied to each matching pipes.
static void mark_as_matching (zmq::pipe_t *pipe_, void *arg_); static void mark_as_matching (zmq::pipe_t *pipe_, xpub_t *arg_);
// List of all subscriptions mapped to corresponding pipes. // List of all subscriptions mapped to corresponding pipes.
mtrie_t subscriptions; mtrie_t subscriptions;
......
...@@ -65,6 +65,7 @@ set(tests ...@@ -65,6 +65,7 @@ set(tests
test_stream_timeout test_stream_timeout
test_xpub_manual test_xpub_manual
test_xpub_welcome_msg test_xpub_welcome_msg
test_xpub_verbose
test_base85 test_base85
test_bind_after_connect_tcp test_bind_after_connect_tcp
test_sodium test_sodium
......
This diff is collapsed.
...@@ -4,6 +4,7 @@ cmake_minimum_required(VERSION "2.8.1") ...@@ -4,6 +4,7 @@ cmake_minimum_required(VERSION "2.8.1")
set(unittests set(unittests
unittest_ypipe unittest_ypipe
unittest_poller unittest_poller
unittest_mtrie
) )
#IF (ENABLE_DRAFTS) #IF (ENABLE_DRAFTS)
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment