Commit 7d475add authored by Ian Barber's avatar Ian Barber

Merge pull request #495 from hintjens/master

Whitespace and comment cleanups
parents c1f76e43 d997d880
...@@ -45,7 +45,7 @@ while (true) { ...@@ -45,7 +45,7 @@ while (true) {
int rc = zmq_msg_init (&frame); int rc = zmq_msg_init (&frame);
assert (rc == 0); assert (rc == 0);
// Block until a message is available to be received from socket // Block until a message is available to be received from socket
rc = zmq_recvmsg (socket, &frame, 0); rc = zmq_msg_recv (socket, &frame, 0);
assert (rc != -1); assert (rc != -1);
if (zmq_msg_get (&frame, ZMQ_MORE)) if (zmq_msg_get (&frame, ZMQ_MORE))
fprintf (stderr, "more\n"); fprintf (stderr, "more\n");
......
...@@ -15,7 +15,9 @@ SYNOPSIS ...@@ -15,7 +15,9 @@ SYNOPSIS
DESCRIPTION DESCRIPTION
----------- -----------
The _zmq_msg_more()_ function indicates whether this is part of a multi-part The _zmq_msg_more()_ function indicates whether this is part of a multi-part
message, and there are further parts to receive. message, and there are further parts to receive. This method can safely be
called after _zmq_msg_close()_. This method is identical to _zmq_msg_get()_
with an argument of ZMQ_MORE.
RETURN VALUE RETURN VALUE
...@@ -35,7 +37,7 @@ while (true) { ...@@ -35,7 +37,7 @@ while (true) {
int rc = zmq_msg_init (&part); int rc = zmq_msg_init (&part);
assert (rc == 0); assert (rc == 0);
// Block until a message is available to be received from socket // Block until a message is available to be received from socket
rc = zmq_recvmsg (socket, &part, 0); rc = zmq_msg_recv (socket, &part, 0);
assert (rc != -1); assert (rc != -1);
if (zmq_msg_more (&part)) if (zmq_msg_more (&part))
fprintf (stderr, "more\n"); fprintf (stderr, "more\n");
......
...@@ -78,9 +78,7 @@ assert (nbytes != -1); ...@@ -78,9 +78,7 @@ assert (nbytes != -1);
SEE ALSO SEE ALSO
-------- --------
linkzmq:zmq_recvmsg[3]
linkzmq:zmq_send[3] linkzmq:zmq_send[3]
linkzmq:zmq_sendmsg[3]
linkzmq:zmq_getsockopt[3] linkzmq:zmq_getsockopt[3]
linkzmq:zmq_socket[7] linkzmq:zmq_socket[7]
linkzmq:zmq[7] linkzmq:zmq[7]
......
...@@ -110,7 +110,6 @@ SEE ALSO ...@@ -110,7 +110,6 @@ SEE ALSO
-------- --------
linkzmq:zmq_recv[3] linkzmq:zmq_recv[3]
linkzmq:zmq_send[3] linkzmq:zmq_send[3]
linkzmq:zmq_sendmsg[3]
linkzmq:zmq_getsockopt[3] linkzmq:zmq_getsockopt[3]
linkzmq:zmq_socket[7] linkzmq:zmq_socket[7]
linkzmq:zmq[7] linkzmq:zmq[7]
......
...@@ -87,9 +87,7 @@ assert (rc == 2); ...@@ -87,9 +87,7 @@ assert (rc == 2);
SEE ALSO SEE ALSO
-------- --------
linkzmq:zmq_sendmsg[3]
linkzmq:zmq_recv[3] linkzmq:zmq_recv[3]
linkzmq:zmq_recvmsg[3]
linkzmq:zmq_socket[7] linkzmq:zmq_socket[7]
linkzmq:zmq[7] linkzmq:zmq[7]
......
...@@ -106,8 +106,6 @@ rc = zmq_sendmsg (socket, &part3, 0); ...@@ -106,8 +106,6 @@ rc = zmq_sendmsg (socket, &part3, 0);
SEE ALSO SEE ALSO
-------- --------
linkzmq:zmq_recv[3] linkzmq:zmq_recv[3]
linkzmq:zmq_recv[3]
linkzmq:zmq_recvmsg[3]
linkzmq:zmq_socket[7] linkzmq:zmq_socket[7]
linkzmq:zmq[7] linkzmq:zmq[7]
......
...@@ -37,7 +37,7 @@ static void *req_socket_monitor (void *ctx) ...@@ -37,7 +37,7 @@ static void *req_socket_monitor (void *ctx)
while (true) { while (true) {
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0); rc = zmq_msg_recv (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break; if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1); assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event)); memcpy (&event, zmq_msg_data (&msg), sizeof (event));
...@@ -217,7 +217,7 @@ static void *rep_socket_monitor (void *ctx) ...@@ -217,7 +217,7 @@ static void *rep_socket_monitor (void *ctx)
while (true) { while (true) {
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0); rc = zmq_msg_recv (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break; if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1); assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event)); memcpy (&event, zmq_msg_data (&msg), sizeof (event));
......
...@@ -194,7 +194,7 @@ void zmq::stream_engine_t::terminate () ...@@ -194,7 +194,7 @@ void zmq::stream_engine_t::terminate ()
void zmq::stream_engine_t::in_event () void zmq::stream_engine_t::in_event ()
{ {
// If still handshaking, receive and prcess the greeting message. // If still handshaking, receive and process the greeting message.
if (unlikely (handshaking)) if (unlikely (handshaking))
if (!handshake ()) if (!handshake ())
return; return;
...@@ -390,9 +390,8 @@ bool zmq::stream_engine_t::handshake () ...@@ -390,9 +390,8 @@ bool zmq::stream_engine_t::handshake ()
// Position of the version field in the greeting. // Position of the version field in the greeting.
const size_t version_pos = 10; const size_t version_pos = 10;
// Is the peer using the unversioned protocol? // Is the peer using ZMTP/1.0 with no version number?
// If so, we send and receive rests of identity // If so, we send and receive rests of identity messages
// messages.
if (greeting [0] != 0xff || !(greeting [9] & 0x01)) { if (greeting [0] != 0xff || !(greeting [9] & 0x01)) {
encoder = new (std::nothrow) encoder_t (out_batch_size); encoder = new (std::nothrow) encoder_t (out_batch_size);
alloc_assert (encoder); alloc_assert (encoder);
...@@ -417,8 +416,8 @@ bool zmq::stream_engine_t::handshake () ...@@ -417,8 +416,8 @@ bool zmq::stream_engine_t::handshake ()
insize = greeting_bytes_read; insize = greeting_bytes_read;
// To allow for interoperability with peers that do not forward // To allow for interoperability with peers that do not forward
// their subscriptions, we inject a phony subsription // their subscriptions, we inject a phony subscription
// message into the incomming message stream. To put this // message into the incoming message stream. To put this
// message right after the identity message, we temporarily // message right after the identity message, we temporarily
// divert the message stream from session to ourselves. // divert the message stream from session to ourselves.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
......
...@@ -74,9 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -74,9 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) if (options.type == ZMQ_XPUB && (unique || (*data && verbose)))
pending.push_back (blob_t (data, size)); pending.push_back (blob_t (data, size));
} }
else /*process message unrelated to sub/unsub*/ { else
// Process user message coming upstream from xsub socket
pending.push_back (blob_t (data, size)); pending.push_back (blob_t (data, size));
}
sub.close (); sub.close ();
} }
...@@ -177,7 +177,6 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, ...@@ -177,7 +177,6 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
xpub_t *self = (xpub_t*) arg_; xpub_t *self = (xpub_t*) arg_;
if (self->options.type != ZMQ_PUB) { if (self->options.type != ZMQ_PUB) {
// Place the unsubscription to the queue of pending (un)sunscriptions // Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on. // to be retrived by the user later on.
blob_t unsub (size_ + 1, 0); blob_t unsub (size_ + 1, 0);
......
...@@ -48,7 +48,7 @@ zmq::xsub_t::~xsub_t () ...@@ -48,7 +48,7 @@ zmq::xsub_t::~xsub_t ()
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{ {
// icanhasall_ is unused // icanhasall_ is unused
(void)icanhasall_; (void) icanhasall_;
zmq_assert (pipe_); zmq_assert (pipe_);
fq.attach (pipe_); fq.attach (pipe_);
...@@ -87,22 +87,24 @@ int zmq::xsub_t::xsend (msg_t *msg_) ...@@ -87,22 +87,24 @@ int zmq::xsub_t::xsend (msg_t *msg_)
size_t size = msg_->size (); size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data (); unsigned char *data = (unsigned char*) msg_->data ();
// Process the subscription.
if (*data == 1) { if (*data == 1) {
// this used to filter out duplicate subscriptions, // Process subscribe message
// This used to filter out duplicate subscriptions,
// however this is alread done on the XPUB side and // however this is alread done on the XPUB side and
// doing it here as well breaks ZMQ_XPUB_VERBOSE // doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved // when there are forwarding devices involved.
subscriptions.add (data + 1, size - 1); subscriptions.add (data + 1, size - 1);
return dist.send_to_all (msg_); return dist.send_to_all (msg_);
} }
else if (*data == 0) { else
if (*data == 0) {
// Process unsubscribe message
if (subscriptions.rm (data + 1, size - 1)) if (subscriptions.rm (data + 1, size - 1))
return dist.send_to_all (msg_); return dist.send_to_all (msg_);
} }
else /*upstream message unrelated to sub/unsub*/ { else
// User message sent upstream to XPUB socket
return dist.send_to_all (msg_); return dist.send_to_all (msg_);
}
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
......
...@@ -53,7 +53,7 @@ int main (void) ...@@ -53,7 +53,7 @@ int main (void)
val = 0; val = 0;
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind(to, "tcp://*:5555"); rc = zmq_bind(to, "tcp://*:6555");
assert (rc == 0); assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive. // Create a socket pushing to two endpoints - only 1 message should arrive.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment