Commit b55288fd authored by Maximilian Schneider's avatar Maximilian Schneider

return values of socket functions are ssize_t (not int) in compliance with POSIX

deprecated API (zmq_msg_recv/zmq_msg_send) still returns int
parent 19712d3f
...@@ -87,6 +87,7 @@ typedef unsigned __int8 uint8_t; ...@@ -87,6 +87,7 @@ typedef unsigned __int8 uint8_t;
# include <stdint.h> # include <stdint.h>
#endif #endif
#include <sys/types.h>
/******************************************************************************/ /******************************************************************************/
/* 0MQ errors. */ /* 0MQ errors. */
...@@ -212,8 +213,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg); ...@@ -212,8 +213,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size); ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data,
size_t size, zmq_free_fn *ffn, void *hint); size_t size, zmq_free_fn *ffn, void *hint);
ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, void *s, int flags); ZMQ_EXPORT ssize_t zmq_msg_send (zmq_msg_t *msg, void *s, int flags);
ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, void *s, int flags); ZMQ_EXPORT ssize_t zmq_msg_recv (zmq_msg_t *msg, void *s, int flags);
ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
...@@ -360,9 +361,9 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr); ...@@ -360,9 +361,9 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
ZMQ_EXPORT int zmq_connect (void *s, const char *addr); ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
ZMQ_EXPORT int zmq_unbind (void *s, const char *addr); ZMQ_EXPORT int zmq_unbind (void *s, const char *addr);
ZMQ_EXPORT int zmq_disconnect (void *s, const char *addr); ZMQ_EXPORT int zmq_disconnect (void *s, const char *addr);
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT ssize_t zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT ssize_t zmq_send_const (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); ZMQ_EXPORT ssize_t zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
......
...@@ -329,37 +329,38 @@ int zmq_disconnect (void *s_, const char *addr_) ...@@ -329,37 +329,38 @@ int zmq_disconnect (void *s_, const char *addr_)
// Sending functions. // Sending functions.
static int static ssize_t
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{ {
int sz = (int) zmq_msg_size (msg_); size_t nbytes = zmq_msg_size (msg_);
int rc = s_->send ((zmq::msg_t*) msg_, flags_); int rc = s_->send ((zmq::msg_t*) msg_, flags_);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
return sz; return nbytes;
} }
/* To be deprecated once zmq_msg_send() is stable */ /* To be deprecated once zmq_msg_send() is stable */
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{ {
return zmq_msg_send (msg_, s_, flags_); // reproduce behaviour of truncated int in deprecated API
return (int) zmq_msg_send (msg_, s_, flags_);
} }
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) ssize_t zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK; errno = ENOTSOCK;
return -1; return -1;
} }
zmq_msg_t msg; zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg, len_); int rc1 = zmq_msg_init_size (&msg, len_);
if (rc != 0) if (rc1 != 0)
return -1; return -1;
memcpy (zmq_msg_data (&msg), buf_, len_); memcpy (zmq_msg_data (&msg), buf_, len_);
zmq::socket_base_t *s = (zmq::socket_base_t *) s_; zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
rc = s_sendmsg (s, &msg, flags_); ssize_t nbytes = s_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) { if (unlikely (nbytes < 0)) {
int err = errno; int err = errno;
int rc2 = zmq_msg_close (&msg); int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0); errno_assert (rc2 == 0);
...@@ -369,23 +370,23 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) ...@@ -369,23 +370,23 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
// Note the optimisation here. We don't close the msg object as it is // Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes. // empty anyway. This may change when implementation of zmq_msg_t changes.
return rc; return nbytes;
} }
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) ssize_t zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK; errno = ENOTSOCK;
return -1; return -1;
} }
zmq_msg_t msg; zmq_msg_t msg;
int rc = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL); int rc1 = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL);
if (rc != 0) if (rc1 != 0)
return -1; return -1;
zmq::socket_base_t *s = (zmq::socket_base_t *) s_; zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
rc = s_sendmsg (s, &msg, flags_); ssize_t nbytes = s_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) { if (unlikely (nbytes < 0)) {
int err = errno; int err = errno;
int rc2 = zmq_msg_close (&msg); int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0); errno_assert (rc2 == 0);
...@@ -395,7 +396,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) ...@@ -395,7 +396,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
// Note the optimisation here. We don't close the msg object as it is // Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes. // empty anyway. This may change when implementation of zmq_msg_t changes.
return rc; return nbytes;
} }
...@@ -425,8 +426,8 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) ...@@ -425,8 +426,8 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len); memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len);
if (i == count_ - 1) if (i == count_ - 1)
flags_ = flags_ & ~ZMQ_SNDMORE; flags_ = flags_ & ~ZMQ_SNDMORE;
rc = s_sendmsg (s, &msg, flags_); ssize_t nbytes = s_sendmsg (s, &msg, flags_);
if (unlikely (rc < 0)) { if (unlikely (nbytes < 0)) {
int err = errno; int err = errno;
int rc2 = zmq_msg_close (&msg); int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0); errno_assert (rc2 == 0);
...@@ -440,23 +441,24 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) ...@@ -440,23 +441,24 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_)
// Receiving functions. // Receiving functions.
static int static ssize_t
s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{ {
int rc = s_->recv ((zmq::msg_t*) msg_, flags_); int rc = s_->recv ((zmq::msg_t*) msg_, flags_);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
return (int) zmq_msg_size (msg_); return zmq_msg_size (msg_);
} }
/* To be deprecated once zmq_msg_recv() is stable */ /* To be deprecated once zmq_msg_recv() is stable */
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{ {
return zmq_msg_recv (msg_, s_, flags_); // reproduce behaviour of truncated int in deprecated API
return (int) zmq_msg_recv (msg_, s_, flags_);
} }
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) ssize_t zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK; errno = ENOTSOCK;
...@@ -467,7 +469,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) ...@@ -467,7 +469,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
errno_assert (rc == 0); errno_assert (rc == 0);
zmq::socket_base_t *s = (zmq::socket_base_t *) s_; zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int nbytes = s_recvmsg (s, &msg, flags_); ssize_t nbytes = s_recvmsg (s, &msg, flags_);
if (unlikely (nbytes < 0)) { if (unlikely (nbytes < 0)) {
int err = errno; int err = errno;
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);
...@@ -476,9 +478,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) ...@@ -476,9 +478,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
return -1; return -1;
} }
// At the moment an oversized message is silently truncated. ssize_t to_copy = (size_t) nbytes < len_ ? nbytes : len_;
// TODO: Build in a notification mechanism to report the overflows.
size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
memcpy (buf_, zmq_msg_data (&msg), to_copy); memcpy (buf_, zmq_msg_data (&msg), to_copy);
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);
...@@ -523,7 +523,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) ...@@ -523,7 +523,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
int rc = zmq_msg_init (&msg); int rc = zmq_msg_init (&msg);
errno_assert (rc == 0); errno_assert (rc == 0);
int nbytes = s_recvmsg (s, &msg, flags_); ssize_t nbytes = s_recvmsg (s, &msg, flags_);
if (unlikely (nbytes < 0)) { if (unlikely (nbytes < 0)) {
int err = errno; int err = errno;
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);
...@@ -569,26 +569,26 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, ...@@ -569,26 +569,26 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_); return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
} }
int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_) ssize_t zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK; errno = ENOTSOCK;
return -1; return -1;
} }
zmq::socket_base_t *s = (zmq::socket_base_t *) s_; zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s_sendmsg (s, msg_, flags_); ssize_t nbytes = s_sendmsg (s, msg_, flags_);
return result; return nbytes;
} }
int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_) ssize_t zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_)
{ {
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK; errno = ENOTSOCK;
return -1; return -1;
} }
zmq::socket_base_t *s = (zmq::socket_base_t *) s_; zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s_recvmsg (s, msg_, flags_); ssize_t nbytes = s_recvmsg (s, msg_, flags_);
return result; return nbytes;
} }
int zmq_msg_close (zmq_msg_t *msg_) int zmq_msg_close (zmq_msg_t *msg_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment