Commit 00cf3ceb authored by Martin Sustrik's avatar Martin Sustrik

multi-part message functionality available via ZMQ_SNDMORE and ZMQ_RCVMORE

parent 6fea4225
...@@ -179,8 +179,11 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -179,8 +179,11 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_MCAST_LOOP 10 #define ZMQ_MCAST_LOOP 10
#define ZMQ_SNDBUF 11 #define ZMQ_SNDBUF 11
#define ZMQ_RCVBUF 12 #define ZMQ_RCVBUF 12
#define ZMQ_RCVMORE 13
#define ZMQ_NOBLOCK 1 #define ZMQ_NOBLOCK 1
#define ZMQ_SNDMORE 2
// Obsolete:
#define ZMQ_MORE 2 #define ZMQ_MORE 2
ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT void *zmq_socket (void *context, int type);
......
...@@ -42,6 +42,7 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : ...@@ -42,6 +42,7 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_), object_t (parent_),
pending_term_acks (0), pending_term_acks (0),
ticks (0), ticks (0),
rcvmore (false),
app_thread (parent_), app_thread (parent_),
shutting_down (false), shutting_down (false),
sent_seqnum (0), sent_seqnum (0),
...@@ -70,7 +71,16 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, ...@@ -70,7 +71,16 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::getsockopt (int option_, void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_) size_t *optvallen_)
{ {
// At the moment there are no socket-type-specific overloads of getsockopt. if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
*((int64_t*) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int64_t);
return 0;
}
return options.getsockopt (option_, optval_, optvallen_); return options.getsockopt (option_, optval_, optvallen_);
} }
...@@ -318,9 +328,8 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -318,9 +328,8 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{ {
// ZMQ_MORE is actually a message flag, not a real send-flag // At this point we impose the MORE flag on the message.
// such as ZMQ_NOBLOCK. At this point we impose it on the message. if (flags_ & ZMQ_SNDMORE)
if (flags_ & ZMQ_MORE)
msg_->flags |= ZMQ_MSG_MORE; msg_->flags |= ZMQ_MSG_MORE;
// Process pending commands, if any. // Process pending commands, if any.
...@@ -367,8 +376,12 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -367,8 +376,12 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
} }
// If we have the message, return immediately. // If we have the message, return immediately.
if (rc == 0) if (rc == 0) {
rcvmore = msg_->flags & ZMQ_MSG_MORE;
if (rcvmore)
msg_->flags &= ~ZMQ_MSG_MORE;
return 0; return 0;
}
// If we don't have the message, restore the original cause of the problem. // If we don't have the message, restore the original cause of the problem.
errno = err; errno = err;
...@@ -393,6 +406,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -393,6 +406,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_); rc = xrecv (msg_, flags_);
ticks = 0; ticks = 0;
} }
rcvmore = msg_->flags & ZMQ_MSG_MORE;
if (rcvmore)
msg_->flags &= ~ZMQ_MSG_MORE;
return 0; return 0;
} }
......
...@@ -141,6 +141,9 @@ namespace zmq ...@@ -141,6 +141,9 @@ namespace zmq
// Number of messages received since last command processing. // Number of messages received since last command processing.
int ticks; int ticks;
// If true there's a half-read message in the socket.
bool rcvmore;
// Application thread the socket lives in. // Application thread the socket lives in.
class app_thread_t *app_thread; class app_thread_t *app_thread;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment