Commit 7773fddd authored by Martin Sustrik's avatar Martin Sustrik

Merge branch 'master' of git@github.com:sustrik/zeromq2

parents 091e92a1 89783c37
......@@ -162,7 +162,7 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
return false;
}
pipe->write (*msg_);
pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_written++;
return true;
......@@ -172,11 +172,9 @@ void zmq::writer_t::rollback ()
{
zmq_msg_t msg;
// Remove all incomplete messages from the pipe.
while (pipe->unwrite (&msg)) {
if (!(msg.flags & ZMQ_MSG_MORE)) {
pipe->write (msg);
break;
}
zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
msgs_written--;
}
......@@ -206,7 +204,7 @@ void zmq::writer_t::term ()
const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
pipe->write (msg);
pipe->write (msg, false);
pipe->flush ();
}
......
......@@ -31,7 +31,7 @@ namespace zmq
// Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment.
// T is the type of the object in the queue.
// N is granularity of the pipe, i.e. how many messages are needed to
// N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation.
template <typename T, int N> class ypipe_t
......@@ -46,7 +46,7 @@ namespace zmq
// Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL).
r = w = &queue.back ();
r = w = f = &queue.back ();
c.set (&queue.back ());
}
......@@ -59,54 +59,61 @@ namespace zmq
#pragma message disable(UNINIT)
#endif
// Write an item to the pipe. Don't flush it yet.
inline void write (const T &value_)
// Write an item to the pipe. Don't flush it yet. If incomplete is
// set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never
// flushed down the stream.
inline void write (const T &value_, bool incomplete_)
{
// Place the value to the queue, add new terminator element.
queue.back () = value_;
queue.push ();
// Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back ();
}
#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif
// Pop an unflushed message from the pipe. Returns true is such
// message exists, false otherwise.
// Pop an incomplete item from the pipe. Returns true is such
// item exists, false otherwise.
inline bool unwrite (T *value_)
{
if (w == &queue.back ())
if (f == &queue.back ())
return false;
queue.unpush ();
*value_ = queue.back ();
return true;
}
// Flush the messages into the pipe. Returns false if the reader
// thread is sleeping. In that case, caller is obliged to wake the
// reader up before using the pipe again.
// Flush all the completed items into the pipe. Returns false if
// the reader thread is sleeping. In that case, caller is obliged to
// wake the reader up before using the pipe again.
inline bool flush ()
{
// If there are no un-flushed items, do nothing.
if (w == &queue.back ())
if (w == f)
return true;
// Try to set 'c' to 'back'
if (c.cas (w, &queue.back ()) != w) {
// Try to set 'c' to 'f'.
if (c.cas (w, f) != w) {
// Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know
// that reader is sleeping.
c.set (&queue.back ());
w = &queue.back ();
c.set (f);
w = f;
return false;
}
// Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to the end of the queue.
w = &queue.back ();
// the 'first un-flushed item' pointer to 'f'.
w = f;
return true;
}
......@@ -125,7 +132,7 @@ namespace zmq
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when messages
// it can happen during pipe shutdown when items
// are being deallocated.
if (&queue.front () == r || !r)
return false;
......@@ -165,6 +172,9 @@ namespace zmq
// exclusively by reader thread.
T *r;
// Points to the first item to be flushed in the future.
T *f;
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment