Commit 27e83cc5 authored by Mikko Koppanen's avatar Mikko Koppanen Committed by Martin Sustrik

Fixes assertion on pipe.cpp:237 when swap fills up.

Fixes swap::full () functionality
Signed-off-by: 's avatarMikko Koppanen <mkoppanen@php.net>
parent a46980ba
...@@ -128,8 +128,15 @@ bool zmq::lb_t::has_out () ...@@ -128,8 +128,15 @@ bool zmq::lb_t::has_out ()
return true; return true;
while (active > 0) { while (active > 0) {
if (pipes [current]->check_write ())
// Check whether zero-sized message can be written to the pipe.
zmq_msg_t msg;
zmq_msg_init (&msg);
if (pipes [current]->check_write (&msg)) {
zmq_msg_close (&msg);
return true; return true;
}
zmq_msg_close (&msg);
// Deactivate the pipe. // Deactivate the pipe.
active--; active--;
......
...@@ -170,7 +170,10 @@ bool zmq::pair_t::xhas_out () ...@@ -170,7 +170,10 @@ bool zmq::pair_t::xhas_out ()
if (!outpipe || !outpipe_alive) if (!outpipe || !outpipe_alive)
return false; return false;
outpipe_alive = outpipe->check_write (); zmq_msg_t msg;
zmq_msg_init (&msg);
outpipe_alive = outpipe->check_write (&msg);
zmq_msg_close (&msg);
return outpipe_alive; return outpipe_alive;
} }
...@@ -200,15 +200,15 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_) ...@@ -200,15 +200,15 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_)
sink = sink_; sink = sink_;
} }
bool zmq::writer_t::check_write () bool zmq::writer_t::check_write (zmq_msg_t *msg_)
{ {
// We've already checked and there's no space free for the new message. // We've already checked and there's no space free for the new message.
// There's no point in checking once again. // There's no point in checking once again.
if (unlikely (!active)) if (unlikely (!active))
return false; return false;
if (unlikely (swapping)) { if (unlikely (swapping)) {
if (unlikely (swap->full ())) { if (unlikely (!swap->fits (msg_))) {
active = false; active = false;
return false; return false;
} }
...@@ -229,7 +229,7 @@ bool zmq::writer_t::check_write () ...@@ -229,7 +229,7 @@ bool zmq::writer_t::check_write ()
bool zmq::writer_t::write (zmq_msg_t *msg_) bool zmq::writer_t::write (zmq_msg_t *msg_)
{ {
if (unlikely (!check_write ())) if (unlikely (!check_write (msg_)))
return false; return false;
if (unlikely (swapping)) { if (unlikely (swapping)) {
......
...@@ -134,10 +134,10 @@ namespace zmq ...@@ -134,10 +134,10 @@ namespace zmq
// Specifies the object to get events from the writer. // Specifies the object to get events from the writer.
void set_event_sink (i_writer_events *endpoint_); void set_event_sink (i_writer_events *endpoint_);
// Checks whether a message can be written to the pipe. // Checks whether messages can be written to the pipe.
// If writing the message would cause high watermark and (optionally) // If writing the message would cause high watermark and (optionally)
// swap to be exceeded, the function returns false. // if the swap is full, the function returns false.
bool check_write (); bool check_write (zmq_msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the // Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached. // message cannot be written because high watermark was reached.
......
...@@ -189,10 +189,23 @@ bool zmq::swap_t::empty () ...@@ -189,10 +189,23 @@ bool zmq::swap_t::empty ()
return read_pos == write_pos; return read_pos == write_pos;
} }
/*
bool zmq::swap_t::full () bool zmq::swap_t::full ()
{ {
return buffer_space () == 1; // Check that at least the message size can be written to the swap.
return buffer_space () < (int64_t) (sizeof (size_t) + 1);
} }
*/
bool zmq::swap_t::fits (zmq_msg_t *msg_)
{
// Check whether whole binary representation of the message
// fits into the swap.
size_t msg_size = zmq_msg_size (msg_);
if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
return false;
return true;
}
void zmq::swap_t::copy_from_file (void *buffer_, size_t count_) void zmq::swap_t::copy_from_file (void *buffer_, size_t count_)
{ {
......
...@@ -59,8 +59,12 @@ namespace zmq ...@@ -59,8 +59,12 @@ namespace zmq
// Returns true if the swap is empty; false otherwise. // Returns true if the swap is empty; false otherwise.
bool empty (); bool empty ();
// Returns true if and only if the swap is full.
bool full (); // // Returns true if and only if the swap is full.
// bool full ();
// Returns true if the message fits into swap.
bool fits (zmq_msg_t *msg_);
private: private:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment