Commit 2f219d7c authored by Martin Sustrik's avatar Martin Sustrik

ZMQ_TBC renamed to ZMQ_MORE

parent 842b4dd2
......@@ -105,7 +105,7 @@ ZMQ_EXPORT const char *zmq_strerror (int errnum);
// Message flags. ZMQ_MSG_SHARED is strictly speaking not a message flag
// (it has no equivalent in the wire format), however, making it a flag
// allows us to pack the stucture tigher and thus improve performance.
#define ZMQ_MSG_TBC 1
#define ZMQ_MSG_MORE 1
#define ZMQ_MSG_SHARED 128
// A message. Note that 'content' is not a pointer to the raw data.
......@@ -181,7 +181,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_RCVBUF 12
#define ZMQ_NOBLOCK 1
#define ZMQ_TBC 2
#define ZMQ_MORE 2
ZMQ_EXPORT void *zmq_socket (void *context, int type);
ZMQ_EXPORT int zmq_close (void *s);
......
......@@ -26,7 +26,7 @@
zmq::fq_t::fq_t () :
active (0),
current (0),
tbc (false)
more (false)
{
}
......@@ -45,7 +45,7 @@ void zmq::fq_t::attach (reader_t *pipe_)
void zmq::fq_t::detach (reader_t *pipe_)
{
zmq_assert (!tbc || pipes [current] != pipe_);
zmq_assert (!more || pipes [current] != pipe_);
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
......@@ -84,14 +84,14 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// Try to fetch new message. If we've already read part of the message
// subsequent part should be immediately available.
bool fetched = pipes [current]->read (msg_);
zmq_assert (!(tbc && !fetched));
zmq_assert (!(more && !fetched));
// Note that when message is not fetched, current pipe is killed and
// replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer.
if (fetched) {
tbc = msg_->flags & ZMQ_MSG_TBC;
if (!tbc) {
more = msg_->flags & ZMQ_MSG_MORE;
if (!more) {
current++;
if (current >= active)
current = 0;
......@@ -110,7 +110,7 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
bool zmq::fq_t::has_in ()
{
// There are subsequent parts of the partly-read message available.
if (tbc)
if (more)
return true;
// Note that messing with current doesn't break the fairness of fair
......
......@@ -57,7 +57,7 @@ namespace zmq
// If true, part of a multipart message was already received, but
// there are following parts still waiting in the current pipe.
bool tbc;
bool more;
fq_t (const fq_t&);
void operator = (const fq_t&);
......
......@@ -26,7 +26,7 @@
zmq::lb_t::lb_t () :
active (0),
current (0),
tbc (false)
more (false)
{
}
......@@ -45,7 +45,7 @@ void zmq::lb_t::attach (writer_t *pipe_)
void zmq::lb_t::detach (writer_t *pipe_)
{
zmq_assert (!tbc || pipes [current] != pipe_);
zmq_assert (!more || pipes [current] != pipe_);
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
......@@ -68,11 +68,11 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
{
while (active > 0) {
if (pipes [current]->write (msg_)) {
tbc = msg_->flags & ZMQ_MSG_TBC;
more = msg_->flags & ZMQ_MSG_MORE;
break;
}
zmq_assert (!tbc);
zmq_assert (!more);
active--;
if (current < active)
pipes.swap (current, active);
......@@ -88,7 +88,7 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
// If it's final part of the message we can fluch it downstream and
// continue round-robinning (load balance).
if (!tbc) {
if (!more) {
pipes [current]->flush ();
current = (current + 1) % active;
}
......@@ -104,7 +104,7 @@ bool zmq::lb_t::has_out ()
{
// If one part of the message was already written we can definitely
// write the rest of the message.
if (tbc)
if (more)
return true;
while (active > 0) {
......
......@@ -54,7 +54,7 @@ namespace zmq
pipes_t::size_type current;
// True if last we are in the middle of a multipart message.
bool tbc;
bool more;
lb_t (const lb_t&);
void operator = (const lb_t&);
......
......@@ -77,7 +77,7 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
return false;
}
if (!(msg_->flags & ZMQ_MSG_TBC))
if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
......@@ -163,7 +163,7 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
}
pipe->write (*msg_);
if (!(msg_->flags & ZMQ_MSG_TBC))
if (!(msg_->flags & ZMQ_MSG_MORE))
msgs_written++;
return true;
}
......@@ -173,7 +173,7 @@ void zmq::writer_t::rollback ()
zmq_msg_t msg;
while (pipe->unwrite (&msg)) {
if (!(msg.flags & ZMQ_MSG_TBC)) {
if (!(msg.flags & ZMQ_MSG_MORE)) {
pipe->write (msg);
break;
}
......
......@@ -170,7 +170,7 @@ bool zmq::pub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
pipes.swap (pipes.index (pipe_), active);
return false;
}
if (!(msg_->flags & ZMQ_MSG_TBC))
if (!(msg_->flags & ZMQ_MSG_MORE))
pipe_->flush ();
return true;
}
......
......@@ -28,7 +28,7 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
active (0),
current (0),
sending_reply (false),
tbc (false),
more (false),
reply_pipe (NULL)
{
options.requires_in = true;
......@@ -59,7 +59,7 @@ void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (sending_reply || !tbc || in_pipes [current] != pipe_);
zmq_assert (sending_reply || !more || in_pipes [current] != pipe_);
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
......@@ -93,7 +93,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (!sending_reply || !tbc || reply_pipe != pipe_);
zmq_assert (!sending_reply || !more || reply_pipe != pipe_);
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
......@@ -168,13 +168,13 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
}
// Check whether it's last part of the reply.
tbc = msg_->flags & ZMQ_MSG_TBC;
more = msg_->flags & ZMQ_MSG_MORE;
if (reply_pipe) {
// Push message to the reply pipe.
bool written = reply_pipe->write (msg_);
zmq_assert (!tbc || written);
zmq_assert (!more || written);
// The pipe is full...
// TODO: Tear down the underlying connection (?)
......@@ -187,7 +187,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
}
// Flush the reply to the requester.
if (!tbc) {
if (!more) {
reply_pipe->flush ();
sending_reply = false;
reply_pipe = NULL;
......@@ -213,11 +213,11 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_);
zmq_assert (!(tbc && !fetched));
zmq_assert (!(more && !fetched));
if (fetched) {
tbc = msg_->flags & ZMQ_MSG_TBC;
if (!tbc) {
more = msg_->flags & ZMQ_MSG_MORE;
if (!more) {
reply_pipe = out_pipes [current];
sending_reply = true;
current++;
......@@ -237,7 +237,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::rep_t::xhas_in ()
{
if (!sending_reply && tbc)
if (!sending_reply && more)
return true;
for (int count = active; count != 0; count--) {
......@@ -253,7 +253,7 @@ bool zmq::rep_t::xhas_in ()
bool zmq::rep_t::xhas_out ()
{
if (sending_reply && tbc)
if (sending_reply && more)
return true;
// TODO: No check for write here...
......
......@@ -70,7 +70,7 @@ namespace zmq
// True, if message processed at the moment (either sent or received)
// is processed only partially.
bool tbc;
bool more;
// Pipe we are going to send reply to.
class writer_t *reply_pipe;
......
......@@ -29,7 +29,7 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
current (0),
receiving_reply (false),
reply_pipe_active (false),
tbc (false),
more (false),
reply_pipe (NULL)
{
options.requires_in = true;
......@@ -57,7 +57,7 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (!receiving_reply || !tbc || reply_pipe != pipe_);
zmq_assert (!receiving_reply || !more || reply_pipe != pipe_);
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
......@@ -96,7 +96,7 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (receiving_reply || !tbc || out_pipes [current] != pipe_);
zmq_assert (receiving_reply || !more || out_pipes [current] != pipe_);
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
......@@ -175,7 +175,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
if (out_pipes [current]->check_write ())
break;
zmq_assert (!tbc);
zmq_assert (!more);
active--;
if (current < active) {
in_pipes.swap (current, active);
......@@ -193,8 +193,8 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
// Push message to the selected pipe.
bool written = out_pipes [current]->write (msg_);
zmq_assert (written);
tbc = msg_->flags & ZMQ_MSG_TBC;
if (!tbc) {
more = msg_->flags & ZMQ_MSG_MORE;
if (!more) {
out_pipes [current]->flush ();
receiving_reply = true;
reply_pipe = in_pipes [current];
......@@ -235,8 +235,8 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
}
// If this was last part of the reply, switch to request phase.
tbc = msg_->flags & ZMQ_MSG_TBC;
if (!tbc) {
more = msg_->flags & ZMQ_MSG_MORE;
if (!more) {
receiving_reply = false;
reply_pipe = NULL;
}
......@@ -246,7 +246,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::req_t::xhas_in ()
{
if (receiving_reply && tbc)
if (receiving_reply && more)
return true;
if (!receiving_reply || !reply_pipe_active)
......@@ -263,7 +263,7 @@ bool zmq::req_t::xhas_in ()
bool zmq::req_t::xhas_out ()
{
if (!receiving_reply && tbc)
if (!receiving_reply && more)
return true;
if (receiving_reply)
......
......@@ -79,7 +79,7 @@ namespace zmq
// True, if message processed at the moment (either sent or received)
// is processed only partially.
bool tbc;
bool more;
// Pipe we are awaiting the reply from.
class reader_t *reply_pipe;
......
......@@ -76,7 +76,7 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
if (!in_pipe->read (msg_))
return false;
incomplete_in = msg_->flags & ZMQ_MSG_TBC;
incomplete_in = msg_->flags & ZMQ_MSG_MORE;
return true;
}
......
......@@ -311,10 +311,10 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
// ZMQ_TBC is actually a message flag, not a real send-flag
// ZMQ_MORE is actually a message flag, not a real send-flag
// such as ZMQ_NOBLOCK. At this point we impose it on the message.
if (flags_ & ZMQ_TBC)
msg_->flags |= ZMQ_MSG_TBC;
if (flags_ & ZMQ_MORE)
msg_->flags |= ZMQ_MSG_MORE;
// Process pending commands, if any.
app_thread->process_commands (false, true);
......
......@@ -27,7 +27,7 @@
zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
has_message (false),
tbc (false)
more (false)
{
options.requires_in = true;
options.requires_out = false;
......@@ -106,7 +106,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
if (has_message) {
zmq_msg_move (msg_, &message);
has_message = false;
tbc = msg_->flags & ZMQ_MSG_TBC;
more = msg_->flags & ZMQ_MSG_MORE;
return 0;
}
......@@ -125,14 +125,14 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (tbc || match (msg_)) {
tbc = msg_->flags & ZMQ_MSG_TBC;
if (more || match (msg_)) {
more = msg_->flags & ZMQ_MSG_MORE;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (msg_->flags & ZMQ_MSG_TBC) {
while (msg_->flags & ZMQ_MSG_MORE) {
rc = fq.recv (msg_, ZMQ_NOBLOCK);
zmq_assert (rc == 0);
}
......@@ -142,7 +142,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::sub_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
if (tbc)
if (more)
return true;
// If there's already a message prepared by a previous call to zmq_poll,
......@@ -172,7 +172,7 @@ bool zmq::sub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while (message.flags & ZMQ_MSG_TBC) {
while (message.flags & ZMQ_MSG_MORE) {
rc = fq.recv (&message, ZMQ_NOBLOCK);
zmq_assert (rc == 0);
}
......
......@@ -70,7 +70,7 @@ namespace zmq
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool tbc;
bool more;
sub_t (const sub_t&);
void operator = (const sub_t&);
......
......@@ -71,19 +71,19 @@ bool zmq::zmq_encoder_t::message_ready ()
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
// message size. In both cases empty 'flags' field follows.
// message size. In both cases 'flags' field follows.
if (size < 255) {
tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED);
next_step (tmpbuf, 2, &zmq_encoder_t::size_ready,
!(in_progress.flags & ZMQ_MSG_TBC));
!(in_progress.flags & ZMQ_MSG_MORE));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED);
next_step (tmpbuf, 10, &zmq_encoder_t::size_ready,
!(in_progress.flags & ZMQ_MSG_TBC));
!(in_progress.flags & ZMQ_MSG_MORE));
}
return true;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment