Commit daa7a802 authored by danielkr's avatar danielkr

Plug in dbuffer to serve the ZMQ_CONFLATE option

ZMQ_CONFLATE option is passed to pipepair() which creates a usual
ypipe_t or ypipe_conflate_t and plugs it into pipe_t under a common
abstract base.
parent 4c35b88a
...@@ -88,6 +88,7 @@ libzmq_la_SOURCES = \ ...@@ -88,6 +88,7 @@ libzmq_la_SOURCES = \
dealer.hpp \ dealer.hpp \
xsub.hpp \ xsub.hpp \
ypipe.hpp \ ypipe.hpp \
ypipe_flat.hpp \
yqueue.hpp \ yqueue.hpp \
z85_codec.hpp \ z85_codec.hpp \
address.cpp \ address.cpp \
...@@ -163,7 +164,9 @@ libzmq_la_SOURCES = \ ...@@ -163,7 +164,9 @@ libzmq_la_SOURCES = \
raw_decoder.hpp \ raw_decoder.hpp \
raw_decoder.cpp \ raw_decoder.cpp \
raw_encoder.hpp \ raw_encoder.hpp \
raw_encoder.cpp raw_encoder.cpp \
ypipe_conflate.hpp \
dbuffer.hpp
if ON_MINGW if ON_MINGW
libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@ libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@
......
...@@ -23,22 +23,37 @@ ...@@ -23,22 +23,37 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "ypipe.hpp"
#include "ypipe_conflate.hpp"
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
int hwms_ [2], bool delays_ [2]) int hwms_ [2], bool delays_ [2], bool conflate_ [2])
{ {
// Creates two pipe objects. These objects are connected by two ypipes, // Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction. // each to pass messages in one direction.
pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
typedef ypipe_conflate_t <msg_t, message_pipe_granularity> upipe_conflate_t;
pipe_t::upipe_t *upipe1;
if(conflate_ [0])
upipe1 = new (std::nothrow) upipe_conflate_t ();
else
upipe1 = new (std::nothrow) upipe_normal_t ();
alloc_assert (upipe1); alloc_assert (upipe1);
pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
pipe_t::upipe_t *upipe2;
if(conflate_ [1])
upipe2 = new (std::nothrow) upipe_conflate_t ();
else
upipe2 = new (std::nothrow) upipe_normal_t ();
alloc_assert (upipe2); alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
hwms_ [1], hwms_ [0], delays_ [0]); hwms_ [1], hwms_ [0], delays_ [0], conflate_ [0]);
alloc_assert (pipes_ [0]); alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
hwms_ [0], hwms_ [1], delays_ [1]); hwms_ [0], hwms_ [1], delays_ [1], conflate_ [1]);
alloc_assert (pipes_ [1]); alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]);
...@@ -48,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], ...@@ -48,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
} }
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool delay_) : int inhwm_, int outhwm_, bool delay_, bool conflate_) :
object_t (parent_), object_t (parent_),
inpipe (inpipe_), inpipe (inpipe_),
outpipe (outpipe_), outpipe (outpipe_),
...@@ -62,7 +77,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, ...@@ -62,7 +77,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL), peer (NULL),
sink (NULL), sink (NULL),
state (active), state (active),
delay (delay_) delay (delay_),
conflate (conflate_)
{ {
} }
...@@ -303,11 +319,16 @@ void zmq::pipe_t::process_pipe_term_ack () ...@@ -303,11 +319,16 @@ void zmq::pipe_t::process_pipe_term_ack ()
// First, delete all the unread messages in the pipe. We have to do it by // First, delete all the unread messages in the pipe. We have to do it by
// hand because msg_t doesn't have automatic destructor. Then deallocate // hand because msg_t doesn't have automatic destructor. Then deallocate
// the ypipe itself. // the ypipe itself.
msg_t msg;
while (inpipe->read (&msg)) { if (!conflate)
int rc = msg.close (); {
errno_assert (rc == 0); msg_t msg;
while (inpipe->read (&msg)) {
int rc = msg.close ();
errno_assert (rc == 0);
}
} }
delete inpipe; delete inpipe;
// Deallocate the pipe object // Deallocate the pipe object
...@@ -439,7 +460,13 @@ void zmq::pipe_t::hiccup () ...@@ -439,7 +460,13 @@ void zmq::pipe_t::hiccup ()
inpipe = NULL; inpipe = NULL;
// Create new inpipe. // Create new inpipe.
inpipe = new (std::nothrow) pipe_t::upipe_t (); if (conflate)
inpipe = new (std::nothrow)
ypipe_t <msg_t, message_pipe_granularity> ();
else
inpipe = new (std::nothrow)
ypipe_conflate_t <msg_t, message_pipe_granularity> ();
alloc_assert (inpipe); alloc_assert (inpipe);
in_active = true; in_active = true;
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#define __ZMQ_PIPE_HPP_INCLUDED__ #define __ZMQ_PIPE_HPP_INCLUDED__
#include "msg.hpp" #include "msg.hpp"
#include "ypipe.hpp" #include "ypipe_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "object.hpp" #include "object.hpp"
#include "stdint.hpp" #include "stdint.hpp"
...@@ -40,8 +40,10 @@ namespace zmq ...@@ -40,8 +40,10 @@ namespace zmq
// Delay specifies how the pipe behaves when the peer terminates. If true // Delay specifies how the pipe behaves when the peer terminates. If true
// pipe receives all the pending messages before terminating, otherwise it // pipe receives all the pending messages before terminating, otherwise it
// terminates straight away. // terminates straight away.
// If conflate is true, only the most recently arrived message could be
// read (older messages are discarded)
int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
int hwms_ [2], bool delays_ [2]); int hwms_ [2], bool delays_ [2], bool conflate_ [2]);
struct i_pipe_events struct i_pipe_events
{ {
...@@ -65,7 +67,8 @@ namespace zmq ...@@ -65,7 +67,8 @@ namespace zmq
{ {
// This allows pipepair to create pipe objects. // This allows pipepair to create pipe objects.
friend int pipepair (zmq::object_t *parents_ [2], friend int pipepair (zmq::object_t *parents_ [2],
zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
bool conflate_ [2]);
public: public:
...@@ -110,7 +113,7 @@ namespace zmq ...@@ -110,7 +113,7 @@ namespace zmq
private: private:
// Type of the underlying lock-free pipe. // Type of the underlying lock-free pipe.
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; typedef ypipe_base_t <msg_t, message_pipe_granularity> upipe_t;
// Command handlers. // Command handlers.
void process_activate_read (); void process_activate_read ();
...@@ -125,7 +128,7 @@ namespace zmq ...@@ -125,7 +128,7 @@ namespace zmq
// Constructor is private. Pipe can only be created using // Constructor is private. Pipe can only be created using
// pipepair function. // pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool delay_); int inhwm_, int outhwm_, bool delay_, bool conflate_);
// Pipepair uses this function to let us know about // Pipepair uses this function to let us know about
// the peer pipe object. // the peer pipe object.
...@@ -196,6 +199,8 @@ namespace zmq ...@@ -196,6 +199,8 @@ namespace zmq
// Computes appropriate low watermark from the given high watermark. // Computes appropriate low watermark from the given high watermark.
static int compute_lwm (int hwm_); static int compute_lwm (int hwm_);
bool conflate;
// Disable copying. // Disable copying.
pipe_t (const pipe_t&); pipe_t (const pipe_t&);
const pipe_t &operator = (const pipe_t&); const pipe_t &operator = (const pipe_t&);
......
...@@ -300,7 +300,8 @@ int zmq::session_base_t::zap_connect () ...@@ -300,7 +300,8 @@ int zmq::session_base_t::zap_connect ()
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {0, 0}; int hwms [2] = {0, 0};
bool delays [2] = {false, false}; bool delays [2] = {false, false};
int rc = pipepair (parents, new_pipes, hwms, delays); bool conflates [2] = {false, false};
int rc = pipepair (parents, new_pipes, hwms, delays, conflates);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to this socket object. // Attach local end of the pipe to this socket object.
...@@ -331,9 +332,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -331,9 +332,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
if (!pipe && !is_terminating ()) { if (!pipe && !is_terminating ()) {
object_t *parents [2] = {this, socket}; object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL}; pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : options.rcvhwm,
conflate? -1 : options.sndhwm};
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
int rc = pipepair (parents, pipes, hwms, delays); bool conflates [2] = {conflate, conflate};
int rc = pipepair (parents, pipes, hwms, delays, conflates);
errno_assert (rc == 0); errno_assert (rc == 0);
// Plug the local end of the pipe. // Plug the local end of the pipe.
......
...@@ -450,9 +450,18 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -450,9 +450,18 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create a bi-directional pipe to connect the peers. // Create a bi-directional pipe to connect the peers.
object_t *parents [2] = {this, peer.socket}; object_t *parents [2] = {this, peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm};
bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
int rc = pipepair (parents, new_pipes, hwms, delays); bool conflates [2] = {conflate, conflate};
int rc = pipepair (parents, new_pipes, hwms, delays, conflates);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to this socket object. // Attach local end of the pipe to this socket object.
...@@ -553,9 +562,19 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -553,9 +562,19 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create a bi-directional pipe. // Create a bi-directional pipe.
object_t *parents [2] = {this, session}; object_t *parents [2] = {this, session};
pipe_t *new_pipes [2] = {NULL, NULL}; pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool conflate = options.conflate &&
(options.type == ZMQ_DEALER ||
options.type == ZMQ_PULL ||
options.type == ZMQ_PUSH ||
options.type == ZMQ_PUB ||
options.type == ZMQ_SUB);
int hwms [2] = {conflate? -1 : options.sndhwm,
conflate? -1 : options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
rc = pipepair (parents, new_pipes, hwms, delays); bool conflates [2] = {conflate, conflate};
rc = pipepair (parents, new_pipes, hwms, delays, conflates);
errno_assert (rc == 0); errno_assert (rc == 0);
// Attach local end of the pipe to the socket object. // Attach local end of the pipe to the socket object.
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "atomic_ptr.hpp" #include "atomic_ptr.hpp"
#include "yqueue.hpp" #include "yqueue.hpp"
#include "platform.hpp" #include "platform.hpp"
#include "ypipe_base.hpp"
namespace zmq namespace zmq
{ {
...@@ -34,7 +35,7 @@ namespace zmq ...@@ -34,7 +35,7 @@ namespace zmq
// N is granularity of the pipe, i.e. how many items are needed to // N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation. // perform next memory allocation.
template <typename T, int N> class ypipe_t template <typename T, int N> class ypipe_t : public ypipe_base_t<T,N>
{ {
public: public:
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_YPIPE_BASE_HPP_INCLUDED__
#define __ZMQ_YPIPE_BASE_HPP_INCLUDED__
namespace zmq
{
// ypipe_base abstracts ypipe and ypipe_conflate specific
// classes, one is selected according to a the conflate
// socket option
template <typename T, int N> class ypipe_base_t
{
public:
virtual ~ypipe_base_t () {}
virtual void write (const T &value_, bool incomplete_) = 0;
virtual bool unwrite (T *value_) = 0;
virtual bool flush () = 0;
virtual bool check_read () = 0;
virtual bool read (T *value_) = 0;
virtual bool probe (bool (*fn)(T &)) = 0;
};
}
#endif
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
#define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
#include "platform.hpp"
#include "dbuffer.hpp"
#include "ypipe_base.hpp"
namespace zmq
{
// Adapter for dbuffer, to plug it in instead of a queue for the sake
// of implementing the conflate socket option, which, if set, makes
// the receiving side to discard all incoming messages but the last one.
//
// reader_awake flag is needed here to mimic ypipe delicate behaviour
// around the reader being asleep (see 'c' pointer being NULL in ypipe.hpp)
template <typename T, int N> class ypipe_conflate_t : public ypipe_base_t<T,N>
{
public:
// Initialises the pipe.
inline ypipe_conflate_t ()
: reader_awake(false)
{
}
// The destructor doesn't have to be virtual. It is mad virtual
// just to keep ICC and code checking tools from complaining.
inline virtual ~ypipe_conflate_t ()
{
}
// Following function (write) deliberately copies uninitialised data
// when used with zmq_msg. Initialising the VSM body for
// non-VSM messages won't be good for performance.
#ifdef ZMQ_HAVE_OPENVMS
#pragma message save
#pragma message disable(UNINIT)
#endif
inline void write (const T &value_, bool incomplete_)
{
(void)incomplete_;
dbuffer.write (value_);
}
#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif
// There are no incomplete items for conflate ypipe
inline bool unwrite (T *value_)
{
return false;
}
// Flush is no-op for conflate ypipe. Reader asleep behaviour
// is as of the usual ypipe.
// Returns false if the reader thread is sleeping. In that case,
// caller is obliged to wake the reader up before using the pipe again.
inline bool flush ()
{
return reader_awake;
}
// Check whether item is available for reading.
inline bool check_read ()
{
bool res = dbuffer.check_read ();
if (!res)
reader_awake = false;
return res;
}
// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read (T *value_)
{
if (!check_read ())
return false;
return dbuffer.read (value_);
}
// Applies the function fn to the first elemenent in the pipe
// and returns the value returned by the fn.
// The pipe mustn't be empty or the function crashes.
inline bool probe (bool (*fn)(T &))
{
return dbuffer.probe (fn);
}
protected:
dbuffer_t <T> dbuffer;
bool reader_awake;
// Disable copying of ypipe object.
ypipe_conflate_t (const ypipe_conflate_t&);
const ypipe_conflate_t &operator = (const ypipe_conflate_t&);
};
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment