Commit e04e2cdb authored by Martin Sustrik's avatar Martin Sustrik

rollback functionality added to pipe

parent 9481c69b
...@@ -146,6 +146,19 @@ bool zmq::writer_t::write (zmq_msg_t *msg_) ...@@ -146,6 +146,19 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
// TODO: Adjust size of the pipe. // TODO: Adjust size of the pipe.
} }
void zmq::writer_t::rollback ()
{
while (true) {
zmq_msg_t msg;
if (!pipe->unwrite (&msg))
break;
zmq_msg_close (&msg);
}
// TODO: We don't have to inform the reader side of the pipe about
// the event. We'll simply adjust the pipe size and keep calm.
}
void zmq::writer_t::flush () void zmq::writer_t::flush ()
{ {
if (!pipe->flush ()) if (!pipe->flush ())
......
...@@ -100,6 +100,9 @@ namespace zmq ...@@ -100,6 +100,9 @@ namespace zmq
// message cannot be written because high watermark was reached. // message cannot be written because high watermark was reached.
bool write (zmq_msg_t *msg_); bool write (zmq_msg_t *msg_);
// Remove any unflushed messages from the pipe.
void rollback ();
// Flush the messages downsteam. // Flush the messages downsteam.
void flush (); void flush ();
......
...@@ -78,6 +78,17 @@ namespace zmq ...@@ -78,6 +78,17 @@ namespace zmq
#pragma message restore #pragma message restore
#endif #endif
// Pop an unflushed message from the pipe. Returns true is such
// message exists, false otherwise.
inline bool unwrite (T *value_)
{
if (w == &queue.back ())
return false;
*value_ = queue.back ();
queue.unpush ();
return true;
}
// Flush the messages into the pipe. Returns false if the reader // Flush the messages into the pipe. Returns false if the reader
// thread is sleeping. In that case, caller is obliged to wake the // thread is sleeping. In that case, caller is obliged to wake the
// reader up before using the pipe again. // reader up before using the pipe again.
......
...@@ -102,20 +102,54 @@ namespace zmq ...@@ -102,20 +102,54 @@ namespace zmq
chunk_t *sc = spare_chunk.xchg (NULL); chunk_t *sc = spare_chunk.xchg (NULL);
if (sc) { if (sc) {
end_chunk->next = sc; end_chunk->next = sc;
sc->prev = end_chunk;
} else { } else {
end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t)); end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
zmq_assert (end_chunk->next); zmq_assert (end_chunk->next);
end_chunk->next->prev = end_chunk;
} }
end_chunk = end_chunk->next; end_chunk = end_chunk->next;
end_pos = 0; end_pos = 0;
} }
// Removes element from the back end of the queue. In other words
// it rollbacks last push to the queue. Take care: Caller is
// responsible for destroying the object being unpushed.
// The caller must also guarantee that the queue isn't empty when
// unpush is called. It cannot be done automatically as the read
// side of the queue can be managed by different, completely
// unsynchronised thread.
inline void unpush ()
{
// First, move 'back' one position backwards.
if (back_pos)
--back_pos;
else {
back_pos = N - 1;
back_chunk = back_chunk->prev;
}
// Now, move 'end' position backwards. Note that obsolete end chunk
// is not used as a spare chunk. The analysis shows that doing so
// would require free and atomic operation per chunk deallocated
// instead of a simple free.
if (end_pos)
--end_pos;
else {
end_pos = N - 1;
end_chunk = end_chunk->prev;
free (end_chunk->next);
end_chunk->next = NULL;
}
}
// Removes an element from the front end of the queue. // Removes an element from the front end of the queue.
inline void pop () inline void pop ()
{ {
if (++ begin_pos == N) { if (++ begin_pos == N) {
chunk_t *o = begin_chunk; chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next; begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0; begin_pos = 0;
// 'o' has been more recently used than spare_chunk, // 'o' has been more recently used than spare_chunk,
...@@ -133,6 +167,7 @@ namespace zmq ...@@ -133,6 +167,7 @@ namespace zmq
struct chunk_t struct chunk_t
{ {
T values [N]; T values [N];
chunk_t *prev;
chunk_t *next; chunk_t *next;
}; };
...@@ -149,7 +184,7 @@ namespace zmq ...@@ -149,7 +184,7 @@ namespace zmq
// People are likely to produce and consume at similar rates. In // People are likely to produce and consume at similar rates. In
// this scenario holding onto the most recently freed chunk saves // this scenario holding onto the most recently freed chunk saves
// us from having to call new/delete. // us from having to call malloc/free.
atomic_ptr_t<chunk_t> spare_chunk; atomic_ptr_t<chunk_t> spare_chunk;
// Disable copying of yqueue. // Disable copying of yqueue.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment