Commit 96ccc1c5 authored by Martin Sustrik's avatar Martin Sustrik

'flags' fields added to the wire format

parent e04e2cdb
......@@ -60,25 +60,24 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
else {
// TODO: Handle over-sized message decently.
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
if (prefix.empty ()) {
int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
errno_assert (rc == 0);
next_step (zmq_msg_data (&in_progress), *tmpbuf,
&zmq_decoder_t::message_ready);
}
else {
int rc = zmq_msg_init_size (&in_progress,
*tmpbuf + 1 + prefix.size ());
1 + prefix.size () + *tmpbuf - 1);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
*data = (unsigned char) prefix.size ();
memcpy (data + 1, prefix.data (), *data);
next_step (data + *data + 1, *tmpbuf,
&zmq_decoder_t::message_ready);
}
next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready);
}
return true;
}
......@@ -90,22 +89,41 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
size_t size = (size_t) get_uint64 (tmpbuf);
// TODO: Handle over-sized message decently.
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
if (prefix.empty ()) {
int rc = zmq_msg_init_size (&in_progress, size);
int rc = zmq_msg_init_size (&in_progress, size - 1);
errno_assert (rc == 0);
next_step (zmq_msg_data (&in_progress), size,
&zmq_decoder_t::message_ready);
}
else {
int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ());
int rc = zmq_msg_init_size (&in_progress,
1 + prefix.size () + size - 1);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
*data = (unsigned char) prefix.size ();
memcpy (data + 1, prefix.data (), *data);
next_step (data + *data + 1, size, &zmq_decoder_t::message_ready);
}
next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready);
return true;
}
bool zmq::zmq_decoder_t::flags_ready ()
{
// No flags are accepted at the moment.
zmq_assert (tmpbuf [0] == 0);
if (prefix.empty ()) {
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&zmq_decoder_t::message_ready);
}
else {
next_step ((unsigned char*) zmq_msg_data (&in_progress) +
prefix.size () + 1,
zmq_msg_size (&in_progress) - prefix.size () - 1,
&zmq_decoder_t::message_ready);
}
return true;
......
......@@ -48,6 +48,7 @@ namespace zmq
bool one_byte_size_ready ();
bool eight_byte_size_ready ();
bool flags_ready ();
bool message_ready ();
struct i_inout *destination;
......
......@@ -89,17 +89,22 @@ bool zmq::zmq_encoder_t::message_ready ()
size -= prefix_size;
}
// Account for the 'flags' byte.
size++;
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
// message size.
// message size. In both cases empty 'flags' field follows.
if (size < 255) {
tmpbuf [0] = (unsigned char) size;
next_step (tmpbuf, 1, &zmq_encoder_t::size_ready, true);
tmpbuf [1] = 0;
next_step (tmpbuf, 2, &zmq_encoder_t::size_ready, true);
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
next_step (tmpbuf, 9, &zmq_encoder_t::size_ready, true);
tmpbuf [9] = 0;
next_step (tmpbuf, 10, &zmq_encoder_t::size_ready, true);
}
return true;
}
......@@ -48,7 +48,7 @@ namespace zmq
struct i_inout *source;
::zmq_msg_t in_progress;
unsigned char tmpbuf [9];
unsigned char tmpbuf [10];
bool trim;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment