Commit 89783c37 authored by Martin Sustrik's avatar Martin Sustrik

incomplete messages can be stored in ypipe

parent f40ce4e5
...@@ -162,7 +162,7 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) ...@@ -162,7 +162,7 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
return false; return false;
} }
pipe->write (*msg_); pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
if (!(msg_->flags & ZMQ_MSG_MORE)) if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_written++; msgs_written++;
return true; return true;
...@@ -172,11 +172,9 @@ void zmq::writer_t::rollback () ...@@ -172,11 +172,9 @@ void zmq::writer_t::rollback ()
{ {
zmq_msg_t msg; zmq_msg_t msg;
// Remove all incomplete messages from the pipe.
while (pipe->unwrite (&msg)) { while (pipe->unwrite (&msg)) {
if (!(msg.flags & ZMQ_MSG_MORE)) { zmq_assert (msg.flags & ZMQ_MSG_MORE);
pipe->write (msg);
break;
}
zmq_msg_close (&msg); zmq_msg_close (&msg);
msgs_written--; msgs_written--;
} }
...@@ -206,7 +204,7 @@ void zmq::writer_t::term () ...@@ -206,7 +204,7 @@ void zmq::writer_t::term ()
const unsigned char *offset = 0; const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER); msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0; msg.flags = 0;
pipe->write (msg); pipe->write (msg, false);
pipe->flush (); pipe->flush ();
} }
......
...@@ -31,7 +31,7 @@ namespace zmq ...@@ -31,7 +31,7 @@ namespace zmq
// Only a single thread can read from the pipe at any specific moment. // Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment. // Only a single thread can write to the pipe at any specific moment.
// T is the type of the object in the queue. // T is the type of the object in the queue.
// N is granularity of the pipe, i.e. how many messages are needed to // N is granularity of the pipe, i.e. how many items are needed to
// perform next memory allocation. // perform next memory allocation.
template <typename T, int N> class ypipe_t template <typename T, int N> class ypipe_t
...@@ -46,7 +46,7 @@ namespace zmq ...@@ -46,7 +46,7 @@ namespace zmq
// Let all the pointers to point to the terminator. // Let all the pointers to point to the terminator.
// (unless pipe is dead, in which case c is set to NULL). // (unless pipe is dead, in which case c is set to NULL).
r = w = &queue.back (); r = w = f = &queue.back ();
c.set (&queue.back ()); c.set (&queue.back ());
} }
...@@ -59,54 +59,61 @@ namespace zmq ...@@ -59,54 +59,61 @@ namespace zmq
#pragma message disable(UNINIT) #pragma message disable(UNINIT)
#endif #endif
// Write an item to the pipe. Don't flush it yet. // Write an item to the pipe. Don't flush it yet. If incomplete is
inline void write (const T &value_) // set to true the item is assumed to be continued by items
// subsequently written to the pipe. Incomplete items are never
// flushed down the stream.
inline void write (const T &value_, bool incomplete_)
{ {
// Place the value to the queue, add new terminator element. // Place the value to the queue, add new terminator element.
queue.back () = value_; queue.back () = value_;
queue.push (); queue.push ();
// Move the "flush up to here" poiter.
if (!incomplete_)
f = &queue.back ();
} }
#ifdef ZMQ_HAVE_OPENVMS #ifdef ZMQ_HAVE_OPENVMS
#pragma message restore #pragma message restore
#endif #endif
// Pop an unflushed message from the pipe. Returns true is such // Pop an incomplete item from the pipe. Returns true is such
// message exists, false otherwise. // item exists, false otherwise.
inline bool unwrite (T *value_) inline bool unwrite (T *value_)
{ {
if (w == &queue.back ()) if (f == &queue.back ())
return false; return false;
queue.unpush (); queue.unpush ();
*value_ = queue.back (); *value_ = queue.back ();
return true; return true;
} }
// Flush the messages into the pipe. Returns false if the reader // Flush all the completed items into the pipe. Returns false if
// thread is sleeping. In that case, caller is obliged to wake the // the reader thread is sleeping. In that case, caller is obliged to
// reader up before using the pipe again. // wake the reader up before using the pipe again.
inline bool flush () inline bool flush ()
{ {
// If there are no un-flushed items, do nothing. // If there are no un-flushed items, do nothing.
if (w == &queue.back ()) if (w == f)
return true; return true;
// Try to set 'c' to 'back' // Try to set 'c' to 'f'.
if (c.cas (w, &queue.back ()) != w) { if (c.cas (w, f) != w) {
// Compare-and-swap was unseccessful because 'c' is NULL. // Compare-and-swap was unseccessful because 'c' is NULL.
// This means that the reader is asleep. Therefore we don't // This means that the reader is asleep. Therefore we don't
// care about thread-safeness and update c in non-atomic // care about thread-safeness and update c in non-atomic
// manner. We'll return false to let the caller know // manner. We'll return false to let the caller know
// that reader is sleeping. // that reader is sleeping.
c.set (&queue.back ()); c.set (f);
w = &queue.back (); w = f;
return false; return false;
} }
// Reader is alive. Nothing special to do now. Just move // Reader is alive. Nothing special to do now. Just move
// the 'first un-flushed item' pointer to the end of the queue. // the 'first un-flushed item' pointer to 'f'.
w = &queue.back (); w = f;
return true; return true;
} }
...@@ -125,7 +132,7 @@ namespace zmq ...@@ -125,7 +132,7 @@ namespace zmq
// If there are no elements prefetched, exit. // If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however, // During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when messages // it can happen during pipe shutdown when items
// are being deallocated. // are being deallocated.
if (&queue.front () == r || !r) if (&queue.front () == r || !r)
return false; return false;
...@@ -165,6 +172,9 @@ namespace zmq ...@@ -165,6 +172,9 @@ namespace zmq
// exclusively by reader thread. // exclusively by reader thread.
T *r; T *r;
// Points to the first item to be flushed in the future.
T *f;
// The single point of contention between writer and reader thread. // The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL, // Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using // reader is asleep. This pointer should be always accessed using
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment