Commit 531835bb authored by skaller's avatar skaller

Implement zmq_sendv.

Posix style send multiple messages using scatter/gather array.
parent f59fff7b
...@@ -50,6 +50,17 @@ ...@@ -50,6 +50,17 @@
#include <unistd.h> #include <unistd.h>
#endif #endif
// XSI vector I/O
#if ZMQ_HAVE_UIO
#include <sys/uio.h>
#else
struct iovec
{
void *iov_base;
size_t iov_len;
};
#endif
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -294,6 +305,10 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) ...@@ -294,6 +305,10 @@ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq_msg_t msg; zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg, len_); int rc = zmq_msg_init_size (&msg, len_);
if (rc != 0) if (rc != 0)
...@@ -317,6 +332,47 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) ...@@ -317,6 +332,47 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
return rc; return rc;
} }
// Send multiple messages.
//
// If flag bit ZMQ_SNDMORE is set the vector is treated as
// a single multi-part message, i.e. the last message has
// ZMQ_SENDMORE bit switched off.
//
int zmq_sendv (void *s_, iovec *a_, size_t count_, int flags_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
int rc = 0;
zmq_msg_t msg;
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
if(s->thread_safe()) s->lock();
for(size_t i = 0; i < count_; ++i)
{
rc = zmq_msg_init_size (&msg, a_[i].iov_len);
if (rc != 0)
{
rc = -1;
break;
}
memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
if (i == count_ - 1) flags_ = flags_ & ~ZMQ_SNDMORE;
rc = inner_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) {
int err = errno;
int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0);
errno = err;
rc = -1;
break;
}
}
if(s->thread_safe()) s->unlock();
return rc;
}
// Receiving functions. // Receiving functions.
static int inner_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) static int inner_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment