Commit 2566c02a authored by Pieter Hintjens's avatar Pieter Hintjens

Problem: CLIENT and SERVER don't check SNDMORE

These sockets don't handle multipart data, so if callers send it,
they drop frames, and things break silently.

Solution: if the caller tries to use ZMQ_SNDMORE, return -1 and
set errno to EINVAL.
parent ae3b2734
...@@ -67,6 +67,8 @@ ERRORS ...@@ -67,6 +67,8 @@ ERRORS
Non-blocking mode was requested and the message cannot be sent at the moment. Non-blocking mode was requested and the message cannot be sent at the moment.
*ENOTSUP*:: *ENOTSUP*::
The _zmq_msg_send()_ operation is not supported by this socket type. The _zmq_msg_send()_ operation is not supported by this socket type.
*EINVAL*::
The sender tried to send multipart data, which the socket type does not allow.
*EFSM*:: *EFSM*::
The _zmq_msg_send()_ operation cannot be performed on this socket at the moment The _zmq_msg_send()_ operation cannot be performed on this socket at the moment
due to the socket not being in the appropriate state. This error may occur with due to the socket not being in the appropriate state. This error may occur with
......
...@@ -58,6 +58,8 @@ ERRORS ...@@ -58,6 +58,8 @@ ERRORS
Non-blocking mode was requested and the message cannot be sent at the moment. Non-blocking mode was requested and the message cannot be sent at the moment.
*ENOTSUP*:: *ENOTSUP*::
The _zmq_send()_ operation is not supported by this socket type. The _zmq_send()_ operation is not supported by this socket type.
*EINVAL*::
The sender tried to send multipart data, which the socket type does not allow.
*EFSM*:: *EFSM*::
The _zmq_send()_ operation cannot be performed on this socket at the moment The _zmq_send()_ operation cannot be performed on this socket at the moment
due to the socket not being in the appropriate state. This error may occur with due to the socket not being in the appropriate state. This error may occur with
......
...@@ -63,6 +63,8 @@ ERRORS ...@@ -63,6 +63,8 @@ ERRORS
Non-blocking mode was requested and the message cannot be sent at the moment. Non-blocking mode was requested and the message cannot be sent at the moment.
*ENOTSUP*:: *ENOTSUP*::
The _zmq_sendmsg()_ operation is not supported by this socket type. The _zmq_sendmsg()_ operation is not supported by this socket type.
*EINVAL*::
The sender tried to send multipart data, which the socket type does not allow.
*EFSM*:: *EFSM*::
The _zmq_sendmsg()_ operation cannot be performed on this socket at the moment The _zmq_sendmsg()_ operation cannot be performed on this socket at the moment
due to the socket not being in the appropriate state. This error may occur with due to the socket not being in the appropriate state. This error may occur with
......
...@@ -54,6 +54,11 @@ void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -54,6 +54,11 @@ void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
int zmq::client_t::xsend (msg_t *msg_) int zmq::client_t::xsend (msg_t *msg_)
{ {
// CLIENT sockets do not allow multipart data (ZMQ_SNDMORE)
if (msg_->flags () & msg_t::more) {
errno = EINVAL;
return -1;
}
return lb.sendpipe (msg_, NULL); return lb.sendpipe (msg_, NULL);
} }
......
...@@ -93,6 +93,11 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_) ...@@ -93,6 +93,11 @@ void zmq::server_t::xwrite_activated (pipe_t *pipe_)
int zmq::server_t::xsend (msg_t *msg_) int zmq::server_t::xsend (msg_t *msg_)
{ {
// SERVER sockets do not allow multipart data (ZMQ_SNDMORE)
if (msg_->flags () & msg_t::more) {
errno = EINVAL;
return -1;
}
// Find the pipe associated with the routing stored in the message. // Find the pipe associated with the routing stored in the message.
uint32_t routing_id = msg_->get_routing_id (); uint32_t routing_id = msg_->get_routing_id ();
outpipes_t::iterator it = outpipes.find (routing_id); outpipes_t::iterator it = outpipes.find (routing_id);
...@@ -108,7 +113,6 @@ int zmq::server_t::xsend (msg_t *msg_) ...@@ -108,7 +113,6 @@ int zmq::server_t::xsend (msg_t *msg_)
errno = EHOSTUNREACH; errno = EHOSTUNREACH;
return -1; return -1;
} }
bool ok = it->second.pipe->write (msg_); bool ok = it->second.pipe->write (msg_);
if (unlikely (!ok)) { if (unlikely (!ok)) {
// Message failed to send - we must close it ourselves. // Message failed to send - we must close it ourselves.
...@@ -118,7 +122,6 @@ int zmq::server_t::xsend (msg_t *msg_) ...@@ -118,7 +122,6 @@ int zmq::server_t::xsend (msg_t *msg_)
else else
it->second.pipe->flush (); it->second.pipe->flush ();
// Detach the message from the data buffer. // Detach the message from the data buffer.
int rc = msg_->init (); int rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
......
...@@ -96,7 +96,7 @@ ...@@ -96,7 +96,7 @@
if (thread_safe) \ if (thread_safe) \
sync.lock(); sync.lock();
#define EXIT_MUTEX() \ #define EXIT_MUTEX(); \
if (thread_safe) \ if (thread_safe) \
sync.unlock(); sync.unlock();
...@@ -329,24 +329,24 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -329,24 +329,24 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
if (!options.is_valid(option_)) { if (!options.is_valid(option_)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// First, check whether specific socket type overloads the option. // First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_); int rc = xsetsockopt (option_, optval_, optvallen_);
if (rc == 0 || errno != EINVAL) { if (rc == 0 || errno != EINVAL) {
EXIT_MUTEX(); EXIT_MUTEX ();
return rc; return rc;
} }
...@@ -355,64 +355,64 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, ...@@ -355,64 +355,64 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
rc = options.setsockopt (option_, optval_, optvallen_); rc = options.setsockopt (option_, optval_, optvallen_);
update_pipe_options(option_); update_pipe_options(option_);
EXIT_MUTEX(); EXIT_MUTEX ();
return rc; return rc;
} }
int zmq::socket_base_t::getsockopt (int option_, void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_) size_t *optvallen_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
if (option_ == ZMQ_RCVMORE) { if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) { if (*optvallen_ < sizeof (int)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
memset(optval_, 0, *optvallen_); memset(optval_, 0, *optvallen_);
*((int*) optval_) = rcvmore ? 1 : 0; *((int*) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
if (option_ == ZMQ_FD) { if (option_ == ZMQ_FD) {
if (*optvallen_ < sizeof (fd_t)) { if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
if (thread_safe) { if (thread_safe) {
// thread safe socket doesn't provide file descriptor // thread safe socket doesn't provide file descriptor
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
*((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd(); *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
*optvallen_ = sizeof(fd_t); *optvallen_ = sizeof(fd_t);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
if (option_ == ZMQ_EVENTS) { if (option_ == ZMQ_EVENTS) {
if (*optvallen_ < sizeof (int)) { if (*optvallen_ < sizeof (int)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
int rc = process_commands (0, false); int rc = process_commands (0, false);
if (rc != 0 && (errno == EINTR || errno == ETERM)) { if (rc != 0 && (errno == EINTR || errno == ETERM)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -422,86 +422,86 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, ...@@ -422,86 +422,86 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
if (has_in ()) if (has_in ())
*((int*) optval_) |= ZMQ_POLLIN; *((int*) optval_) |= ZMQ_POLLIN;
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
if (option_ == ZMQ_LAST_ENDPOINT) { if (option_ == ZMQ_LAST_ENDPOINT) {
if (*optvallen_ < last_endpoint.size () + 1) { if (*optvallen_ < last_endpoint.size () + 1) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
strcpy (static_cast <char *> (optval_), last_endpoint.c_str ()); strcpy (static_cast <char *> (optval_), last_endpoint.c_str ());
*optvallen_ = last_endpoint.size () + 1; *optvallen_ = last_endpoint.size () + 1;
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
if (option_ == ZMQ_THREAD_SAFE) { if (option_ == ZMQ_THREAD_SAFE) {
if (*optvallen_ < sizeof (int)) { if (*optvallen_ < sizeof (int)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
memset(optval_, 0, *optvallen_); memset(optval_, 0, *optvallen_);
*((int*) optval_) = thread_safe ? 1 : 0; *((int*) optval_) = thread_safe ? 1 : 0;
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
int rc = options.getsockopt (option_, optval_, optvallen_); int rc = options.getsockopt (option_, optval_, optvallen_);
EXIT_MUTEX(); EXIT_MUTEX ();
return rc; return rc;
} }
int zmq::socket_base_t::add_signaler(signaler_t *s_) int zmq::socket_base_t::add_signaler(signaler_t *s_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
if (!thread_safe) { if (!thread_safe) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
((mailbox_safe_t*)mailbox)->add_signaler(s_); ((mailbox_safe_t*)mailbox)->add_signaler(s_);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::remove_signaler(signaler_t *s_) int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
if (!thread_safe) { if (!thread_safe) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
((mailbox_safe_t*)mailbox)->remove_signaler(s_); ((mailbox_safe_t*)mailbox)->remove_signaler(s_);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::bind (const char *addr_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// Process pending commands, if any. // Process pending commands, if any.
int rc = process_commands (0, false); int rc = process_commands (0, false);
if (unlikely (rc != 0)) { if (unlikely (rc != 0)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -509,7 +509,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -509,7 +509,7 @@ int zmq::socket_base_t::bind (const char *addr_)
std::string protocol; std::string protocol;
std::string address; std::string address;
if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -521,14 +521,14 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -521,14 +521,14 @@ int zmq::socket_base_t::bind (const char *addr_)
last_endpoint.assign (addr_); last_endpoint.assign (addr_);
options.connected = true; options.connected = true;
} }
EXIT_MUTEX(); EXIT_MUTEX ();
return rc; return rc;
} }
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
// For convenience's sake, bind can be used interchangeable with // For convenience's sake, bind can be used interchangeable with
// connect for PGM, EPGM and NORM transports. // connect for PGM, EPGM and NORM transports.
EXIT_MUTEX(); EXIT_MUTEX ();
rc = connect (addr_); rc = connect (addr_);
if (rc != -1) if (rc != -1)
options.connected = true; options.connected = true;
...@@ -540,7 +540,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -540,7 +540,7 @@ int zmq::socket_base_t::bind (const char *addr_)
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) { if (!io_thread) {
errno = EMTHREAD; errno = EMTHREAD;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -552,7 +552,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -552,7 +552,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(listener); LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno()); event_bind_failed (address, zmq_errno());
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -561,7 +561,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -561,7 +561,7 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true; options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -574,7 +574,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -574,7 +574,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(listener); LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno()); event_bind_failed (address, zmq_errno());
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -583,7 +583,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -583,7 +583,7 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true; options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
#endif #endif
...@@ -596,7 +596,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -596,7 +596,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(listener); LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno()); event_bind_failed (address, zmq_errno());
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -605,7 +605,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -605,7 +605,7 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (addr_, (own_t *) listener, NULL); add_endpoint (addr_, (own_t *) listener, NULL);
options.connected = true; options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
#endif #endif
...@@ -618,7 +618,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -618,7 +618,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(listener); LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno ()); event_bind_failed (address, zmq_errno ());
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -626,30 +626,30 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -626,30 +626,30 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL); add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
options.connected = true; options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
#endif #endif
EXIT_MUTEX(); EXIT_MUTEX ();
zmq_assert (false); zmq_assert (false);
return -1; return -1;
} }
int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::connect (const char *addr_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// Process pending commands, if any. // Process pending commands, if any.
int rc = process_commands (0, false); int rc = process_commands (0, false);
if (unlikely (rc != 0)) { if (unlikely (rc != 0)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -657,7 +657,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -657,7 +657,7 @@ int zmq::socket_base_t::connect (const char *addr_)
std::string protocol; std::string protocol;
std::string address; std::string address;
if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -763,7 +763,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -763,7 +763,7 @@ int zmq::socket_base_t::connect (const char *addr_)
inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0])); inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
options.connected = true; options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
bool is_single_connect = (options.type == ZMQ_DEALER || bool is_single_connect = (options.type == ZMQ_DEALER ||
...@@ -775,7 +775,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -775,7 +775,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// There is no valid use for multiple connects for SUB-PUB nor // There is no valid use for multiple connects for SUB-PUB nor
// DEALER-ROUTER nor REQ-REP. Multiple connects produces // DEALER-ROUTER nor REQ-REP. Multiple connects produces
// nonsensical results. // nonsensical results.
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
} }
...@@ -784,7 +784,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -784,7 +784,7 @@ int zmq::socket_base_t::connect (const char *addr_)
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) { if (!io_thread) {
errno = EMTHREAD; errno = EMTHREAD;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -827,7 +827,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -827,7 +827,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if (rc == -1) { if (rc == -1) {
errno = EINVAL; errno = EINVAL;
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// Defer resolution until a socket is opened // Defer resolution until a socket is opened
...@@ -841,7 +841,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -841,7 +841,7 @@ int zmq::socket_base_t::connect (const char *addr_)
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ()); int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -857,7 +857,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -857,7 +857,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if (res != NULL) if (res != NULL)
pgm_freeaddrinfo (res); pgm_freeaddrinfo (res);
if (rc != 0 || port_number == 0) { if (rc != 0 || port_number == 0) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -870,7 +870,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -870,7 +870,7 @@ int zmq::socket_base_t::connect (const char *addr_)
int rc = paddr->resolved.tipc_addr->resolve (address.c_str()); int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -883,7 +883,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -883,7 +883,7 @@ int zmq::socket_base_t::connect (const char *addr_)
int rc = paddr->resolved.vmci_addr->resolve (address.c_str ()); int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -929,7 +929,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -929,7 +929,7 @@ int zmq::socket_base_t::connect (const char *addr_)
paddr->to_string (last_endpoint); paddr->to_string (last_endpoint);
add_endpoint (addr_, (own_t *) session, newpipe); add_endpoint (addr_, (own_t *) session, newpipe);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -942,19 +942,19 @@ void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe ...@@ -942,19 +942,19 @@ void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe
int zmq::socket_base_t::term_endpoint (const char *addr_) int zmq::socket_base_t::term_endpoint (const char *addr_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
// Check whether the library haven't been shut down yet. // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// Check whether endpoint address passed to the function is valid. // Check whether endpoint address passed to the function is valid.
if (unlikely (!addr_)) { if (unlikely (!addr_)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -962,7 +962,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -962,7 +962,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
// (from launch_child() for example) we're asked to terminate now. // (from launch_child() for example) we're asked to terminate now.
int rc = process_commands (0, false); int rc = process_commands (0, false);
if (unlikely(rc != 0)) { if (unlikely(rc != 0)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -970,27 +970,27 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -970,27 +970,27 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
std::string protocol; std::string protocol;
std::string address; std::string address;
if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) { if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// Disconnect an inproc socket // Disconnect an inproc socket
if (protocol == "inproc") { if (protocol == "inproc") {
if (unregister_endpoint (std::string(addr_), this) == 0) { if (unregister_endpoint (std::string(addr_), this) == 0) {
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_)); std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
if (range.first == range.second) { if (range.first == range.second) {
errno = ENOENT; errno = ENOENT;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
for (inprocs_t::iterator it = range.first; it != range.second; ++it) for (inprocs_t::iterator it = range.first; it != range.second; ++it)
it->second->terminate (true); it->second->terminate (true);
inprocs.erase (range.first, range.second); inprocs.erase (range.first, range.second);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -998,7 +998,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -998,7 +998,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_)); std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
if (range.first == range.second) { if (range.first == range.second) {
errno = ENOENT; errno = ENOENT;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1009,32 +1009,32 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -1009,32 +1009,32 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
term_child (it->second.first); term_child (it->second.first);
} }
endpoints.erase (range.first, range.second); endpoints.erase (range.first, range.second);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::send (msg_t *msg_, int flags_) int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
// Check whether the library haven't been shut down yet. // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// Check whether message passed to the function is valid. // Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) { if (unlikely (!msg_ || !msg_->check ())) {
errno = EFAULT; errno = EFAULT;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// Process pending commands, if any. // Process pending commands, if any.
int rc = process_commands (0, true); int rc = process_commands (0, true);
if (unlikely (rc != 0)) { if (unlikely (rc != 0)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1047,21 +1047,21 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -1047,21 +1047,21 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
msg_->reset_metadata (); msg_->reset_metadata ();
// Try to send the message. // Try to send the message using method in each socket class
rc = xsend (msg_); rc = xsend (msg_);
if (rc == 0) { if (rc == 0) {
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
if (unlikely (errno != EAGAIN)) { if (unlikely (errno != EAGAIN)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// In case of non-blocking send we'll simply propagate // In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - up the stack. // the error - including EAGAIN - up the stack.
if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) { if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1075,45 +1075,45 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -1075,45 +1075,45 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// If timeout is reached in the meantime, return EAGAIN. // If timeout is reached in the meantime, return EAGAIN.
while (true) { while (true) {
if (unlikely (process_commands (timeout, false) != 0)) { if (unlikely (process_commands (timeout, false) != 0)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
rc = xsend (msg_); rc = xsend (msg_);
if (rc == 0) if (rc == 0)
break; break;
if (unlikely (errno != EAGAIN)) { if (unlikely (errno != EAGAIN)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
if (timeout > 0) { if (timeout > 0) {
timeout = (int) (end - clock.now_ms ()); timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) { if (timeout <= 0) {
errno = EAGAIN; errno = EAGAIN;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
} }
} }
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::recv (msg_t *msg_, int flags_) int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{ {
ENTER_MUTEX(); ENTER_MUTEX ();
// Check whether the library haven't been shut down yet. // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
// Check whether message passed to the function is valid. // Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) { if (unlikely (!msg_ || !msg_->check ())) {
errno = EFAULT; errno = EFAULT;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1127,7 +1127,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1127,7 +1127,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// ticks is more efficient than doing RDTSC all the time. // ticks is more efficient than doing RDTSC all the time.
if (++ticks == inbound_poll_rate) { if (++ticks == inbound_poll_rate) {
if (unlikely (process_commands (0, false) != 0)) { if (unlikely (process_commands (0, false) != 0)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
ticks = 0; ticks = 0;
...@@ -1136,7 +1136,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1136,7 +1136,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// Get the message. // Get the message.
int rc = xrecv (msg_); int rc = xrecv (msg_);
if (unlikely (rc != 0 && errno != EAGAIN)) { if (unlikely (rc != 0 && errno != EAGAIN)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1145,7 +1145,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1145,7 +1145,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
if (file_desc != retired_fd) if (file_desc != retired_fd)
msg_->set_fd(file_desc); msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -1155,21 +1155,21 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1155,21 +1155,21 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If it's not, return EAGAIN. // If it's not, return EAGAIN.
if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) { if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
if (unlikely (process_commands (0, false) != 0)) { if (unlikely (process_commands (0, false) != 0)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
ticks = 0; ticks = 0;
rc = xrecv (msg_); rc = xrecv (msg_);
if (rc < 0) { if (rc < 0) {
EXIT_MUTEX(); EXIT_MUTEX ();
return rc; return rc;
} }
if (file_desc != retired_fd) if (file_desc != retired_fd)
msg_->set_fd(file_desc); msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -1183,7 +1183,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1183,7 +1183,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
bool block = (ticks != 0); bool block = (ticks != 0);
while (true) { while (true) {
if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
rc = xrecv (msg_); rc = xrecv (msg_);
...@@ -1192,7 +1192,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1192,7 +1192,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
break; break;
} }
if (unlikely (errno != EAGAIN)) { if (unlikely (errno != EAGAIN)) {
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
block = true; block = true;
...@@ -1200,7 +1200,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1200,7 +1200,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
timeout = (int) (end - clock.now_ms ()); timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) { if (timeout <= 0) {
errno = EAGAIN; errno = EAGAIN;
EXIT_MUTEX(); EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -1209,7 +1209,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1209,7 +1209,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
if (file_desc != retired_fd) if (file_desc != retired_fd)
msg_->set_fd(file_desc); msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX(); EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -1246,7 +1246,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) ...@@ -1246,7 +1246,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
if (!thread_safe) if (!thread_safe)
fd = ((mailbox_t*)mailbox)->get_fd(); fd = ((mailbox_t*)mailbox)->get_fd();
else { else {
ENTER_MUTEX(); ENTER_MUTEX ();
reaper_signaler = new signaler_t(); reaper_signaler = new signaler_t();
...@@ -1257,7 +1257,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) ...@@ -1257,7 +1257,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
// Send a signal to make sure reaper handle existing commands // Send a signal to make sure reaper handle existing commands
reaper_signaler->send(); reaper_signaler->send();
EXIT_MUTEX(); EXIT_MUTEX ();
} }
handle = poller->add_fd (fd, this); handle = poller->add_fd (fd, this);
...@@ -1426,15 +1426,14 @@ void zmq::socket_base_t::in_event () ...@@ -1426,15 +1426,14 @@ void zmq::socket_base_t::in_event ()
// of the reaper thread. Process any commands from other threads/sockets // of the reaper thread. Process any commands from other threads/sockets
// that may be available at the moment. Ultimately, the socket will // that may be available at the moment. Ultimately, the socket will
// be destroyed. // be destroyed.
ENTER_MUTEX ();
ENTER_MUTEX();
// If the socket is thread safe we need to unsignal the reaper signaler // If the socket is thread safe we need to unsignal the reaper signaler
if (thread_safe) if (thread_safe)
reaper_signaler->recv(); reaper_signaler->recv();
process_commands (0, false); process_commands (0, false);
EXIT_MUTEX(); EXIT_MUTEX ();
check_destroy (); check_destroy ();
} }
......
...@@ -347,7 +347,7 @@ static int ...@@ -347,7 +347,7 @@ static int
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
{ {
int sz = (int) zmq_msg_size (msg_); int sz = (int) zmq_msg_size (msg_);
int rc = s_->send ((zmq::msg_t*) msg_, flags_); int rc = s_->send ((zmq::msg_t *) msg_, flags_);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
return sz; return sz;
...@@ -393,7 +393,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) ...@@ -393,7 +393,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
return -1; return -1;
} }
zmq_msg_t msg; zmq_msg_t msg;
int rc = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL); int rc = zmq_msg_init_data (&msg, (void *)buf_, len_, NULL, NULL);
if (rc != 0) if (rc != 0)
return -1; return -1;
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
int main (void) int main (void)
{ {
setup_test_environment(); setup_test_environment ();
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -51,36 +51,43 @@ int main (void) ...@@ -51,36 +51,43 @@ int main (void)
char *data = (char *) zmq_msg_data (&msg); char *data = (char *) zmq_msg_data (&msg);
data [0] = 1; data [0] = 1;
rc = zmq_msg_send(&msg, client, 0); rc = zmq_msg_send (&msg, client, ZMQ_SNDMORE);
assert (rc == -1);
rc = zmq_msg_send (&msg, client, 0);
assert (rc == 1); assert (rc == 1);
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_recv (&msg, server, 0); rc = zmq_msg_recv (&msg, server, 0);
assert (rc == 1); assert (rc == 1);
uint32_t routing_id = zmq_msg_routing_id (&msg); uint32_t routing_id = zmq_msg_routing_id (&msg);
assert (routing_id != 0); assert (routing_id != 0);
rc = zmq_msg_close(&msg); rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_init_size (&msg, 1); rc = zmq_msg_init_size (&msg, 1);
assert (rc == 0); assert (rc == 0);
data = (char *)zmq_msg_data(&msg); data = (char *)zmq_msg_data (&msg);
data[0] = 2; data[0] = 2;
rc = zmq_msg_set_routing_id(&msg, routing_id); rc = zmq_msg_set_routing_id (&msg, routing_id);
assert (rc == 0); assert (rc == 0);
rc = zmq_msg_send (&msg, server, ZMQ_SNDMORE);
assert (rc == -1);
rc = zmq_msg_send(&msg, server, 0); rc = zmq_msg_send (&msg, server, 0);
assert (rc == 1); assert (rc == 1);
rc = zmq_msg_recv(&msg, client, 0); rc = zmq_msg_recv (&msg, client, 0);
assert (rc == 1); assert (rc == 1);
rc = zmq_msg_close(&msg); rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (server); rc = zmq_close (server);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment