Commit 41b2f83b authored by Richard Newton's avatar Richard Newton

Merge pull request #714 from hintjens/master

Cleaned up the code
parents 08c91c0f 406e6051
...@@ -76,8 +76,15 @@ int zmq::proxy ( ...@@ -76,8 +76,15 @@ int zmq::proxy (
{ control_, 0, ZMQ_POLLIN, 0 } { control_, 0, ZMQ_POLLIN, 0 }
}; };
int qt_poll_items = (control_ ? 3 : 2); int qt_poll_items = (control_ ? 3 : 2);
enum {suspend, resume, terminate} state = resume;
while (state != terminate) { // Proxy can be in these three states
enum {
active,
paused,
terminated
} state = active;
while (state != terminated) {
// Wait while there are either requests or replies to process. // Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], qt_poll_items, -1); rc = zmq_poll (&items [0], qt_poll_items, -1);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
...@@ -85,46 +92,45 @@ int zmq::proxy ( ...@@ -85,46 +92,45 @@ int zmq::proxy (
// Process a control command if any // Process a control command if any
if (control_ && items [2].revents & ZMQ_POLLIN) { if (control_ && items [2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0); rc = control_->recv (&msg, 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
moresz = sizeof more; moresz = sizeof more;
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0) || more) if (unlikely (rc < 0) || more)
return -1; return -1;
// Copy message to capture socket if any // Copy message to capture socket if any
if (capture_) { if (capture_) {
msg_t ctrl; msg_t ctrl;
rc = ctrl.init (); rc = ctrl.init ();
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
rc = ctrl.copy (msg); rc = ctrl.copy (msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
rc = capture_->send (&ctrl, 0); rc = capture_->send (&ctrl, 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
} }
if (msg.size () == 6 && memcmp (msg.data (), "PAUSE", 6) == 0)
// process control command state = paused;
int size = msg.size(); else
char* message = (char*) malloc(size + 1); if (msg.size () == 7 && memcmp (msg.data (), "RESUME", 7) == 0)
memcpy(message, msg.data(), size); state = active;
message[size] = '\0'; else
if (size == 8 && !memcmp(message, "SUSPEND", 8)) if (msg.size () == 10 && memcmp (msg.data (), "TERMINATE", 10) == 0)
state = suspend; state = terminated;
else if (size == 7 && !memcmp(message, "RESUME", 7)) else {
state = resume; // This is an API error, we should assert
else if (size == 10 && !memcmp(message, "TERMINATE", 10)) puts ("E: invalid command sent to proxy");
state = terminate; assert (false);
else }
fprintf(stderr, "Warning : \"%s\" bad command received by proxy\n", message); // prefered compared to "return -1"
free (message);
} }
// Process a request // Process a request
if (state == resume && items [0].revents & ZMQ_POLLIN) { if (state == active
&& items [0].revents & ZMQ_POLLIN) {
while (true) { while (true) {
rc = frontend_->recv (&msg, 0); rc = frontend_->recv (&msg, 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
...@@ -156,38 +162,38 @@ int zmq::proxy ( ...@@ -156,38 +162,38 @@ int zmq::proxy (
} }
} }
// Process a reply // Process a reply
if (state == resume && items [1].revents & ZMQ_POLLIN) { if (state == active
&& items [1].revents & ZMQ_POLLIN) {
while (true) { while (true) {
rc = backend_->recv (&msg, 0); rc = backend_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
moresz = sizeof more;
rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0))
return -1;
// Copy message to capture socket if any
if (capture_) {
msg_t ctrl;
rc = ctrl.init ();
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
rc = ctrl.copy (msg);
moresz = sizeof more;
rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
// Copy message to capture socket if any
if (capture_) {
msg_t ctrl;
rc = ctrl.init ();
if (unlikely (rc < 0))
return -1;
rc = ctrl.copy (msg);
if (unlikely (rc < 0))
return -1;
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0))
return -1;
}
rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
if (more == 0) }
break; rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0))
return -1;
if (more == 0)
break;
} }
} }
} }
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment