Commit 318d55fd authored by Pieter Hintjens's avatar Pieter Hintjens

Fixed issue LIBZMQ-358

parent 36e9c4ac
...@@ -32,89 +32,63 @@ int zmq::device (class socket_base_t *insocket_, ...@@ -32,89 +32,63 @@ int zmq::device (class socket_base_t *insocket_,
{ {
msg_t msg; msg_t msg;
int rc = msg.init (); int rc = msg.init ();
if (rc != 0)
if (rc != 0) {
return -1; return -1;
}
int64_t more; // The algorithm below assumes ratio of request and replies processed
// under full load to be 1:1.
int more;
size_t moresz; size_t moresz;
zmq_pollitem_t items [] = {
zmq_pollitem_t items [2]; { insocket_, 0, ZMQ_POLLIN, 0 },
items [0].socket = insocket_; { outsocket_, 0, ZMQ_POLLIN, 0 }
items [0].fd = 0; };
items [0].events = ZMQ_POLLIN;
items [0].revents = 0;
items [1].socket = outsocket_;
items [1].fd = 0;
items [1].events = ZMQ_POLLIN;
items [1].revents = 0;
while (true) { while (true) {
// Wait while there are either requests or replies to process. // Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], 2, -1); rc = zmq_poll (&items [0], 2, -1);
if (unlikely (rc < 0)) { if (unlikely (rc < 0))
return -1; return -1;
}
// The algorithm below asumes ratio of request and replies processed
// under full load to be 1:1. Although processing requests replies
// first is tempting it is suspectible to DoS attacks (overloading
// the system with unsolicited replies).
// Process a request. // Process a request.
if (items [0].revents & ZMQ_POLLIN) { if (items [0].revents & ZMQ_POLLIN) {
while (true) { while (true) {
rc = insocket_->recv (&msg, 0); rc = insocket_->recv (&msg, 0);
if (unlikely (rc < 0)) { if (unlikely (rc < 0))
return -1; return -1;
}
moresz = sizeof (more); moresz = sizeof (more);
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) { if (unlikely (rc < 0))
return -1; return -1;
}
rc = outsocket_->send (&msg, more? ZMQ_SNDMORE: 0);
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); if (unlikely (rc < 0))
if (unlikely (rc < 0)) {
return -1; return -1;
} if (more == 0)
if (!more)
break; break;
} }
} }
// Process a reply. // Process a reply.
if (items [1].revents & ZMQ_POLLIN) { if (items [1].revents & ZMQ_POLLIN) {
while (true) { while (true) {
rc = outsocket_->recv (&msg, 0); rc = outsocket_->recv (&msg, 0);
if (unlikely (rc < 0)) { if (unlikely (rc < 0))
return -1; return -1;
}
moresz = sizeof (more); moresz = sizeof (more);
rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) { if (unlikely (rc < 0))
return -1; return -1;
}
rc = insocket_->send (&msg, more? ZMQ_SNDMORE: 0);
rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); if (unlikely (rc < 0))
if (unlikely (rc < 0)) {
return -1; return -1;
} if (more == 0)
if (!more)
break; break;
} }
} }
} }
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment