Commit c7effea9 authored by Luca Boccassi's avatar Luca Boccassi

Merge pull request #1995 from somdoron/udp_raw_sockets

problem: udp doesn't enforce correct usage of bind/connect
parents 443176d5 c4d0146f
...@@ -556,9 +556,9 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -556,9 +556,9 @@ int zmq::socket_base_t::bind (const char *addr_)
return rc; return rc;
} }
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") { if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
// For convenience's sake, bind can be used interchangeable with // For convenience's sake, bind can be used interchangeable with
// connect for PGM, EPGM, NORM and UDP transports. // connect for PGM, EPGM, NORM transports.
EXIT_MUTEX (); EXIT_MUTEX ();
rc = connect (addr_); rc = connect (addr_);
if (rc != -1) if (rc != -1)
...@@ -566,6 +566,64 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -566,6 +566,64 @@ int zmq::socket_base_t::bind (const char *addr_)
return rc; return rc;
} }
if (protocol == "udp") {
if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
errno = ENOCOMPATPROTO;
EXIT_MUTEX ();
return -1;
}
// Choose the I/O thread to run the session in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
EXIT_MUTEX ();
return -1;
}
address_t *paddr = new (std::nothrow) address_t (protocol, address, this->get_ctx ());
alloc_assert (paddr);
paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
alloc_assert (paddr->resolved.udp_addr);
rc = paddr->resolved.udp_addr->resolve (address.c_str(), true);
if (rc != 0) {
LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1;
}
session_base_t *session = session_base_t::create (io_thread, true, this,
options, paddr);
errno_assert (session);
pipe_t *newpipe = NULL;
// Create a bi-directional pipe.
object_t *parents [2] = {this, session};
pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool conflates [2] = {false, false};
rc = pipepair (parents, new_pipes, hwms, conflates);
errno_assert (rc == 0);
// Attach local end of the pipe to the socket object.
attach_pipe (new_pipes [0], true);
newpipe = new_pipes [0];
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (new_pipes [1]);
// Save last endpoint URI
paddr->to_string (last_endpoint);
add_endpoint (addr_, (own_t *) session, newpipe);
EXIT_MUTEX ();
return 0;
}
// Remaining transports require to be run in an I/O thread, so at this // Remaining transports require to be run in an I/O thread, so at this
// point we'll choose one. // point we'll choose one.
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
...@@ -881,9 +939,15 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -881,9 +939,15 @@ int zmq::socket_base_t::connect (const char *addr_)
#endif #endif
if (protocol == "udp") { if (protocol == "udp") {
if (options.type != ZMQ_RADIO) {
errno = ENOCOMPATPROTO;
EXIT_MUTEX ();
return -1;
}
paddr->resolved.udp_addr = new (std::nothrow) udp_address_t (); paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
alloc_assert (paddr->resolved.udp_addr); alloc_assert (paddr->resolved.udp_addr);
rc = paddr->resolved.udp_addr->resolve (address.c_str(), (options.type == ZMQ_DISH || options.type == ZMQ_DGRAM)); rc = paddr->resolved.udp_addr->resolve (address.c_str(), false);
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX (); EXIT_MUTEX ();
...@@ -1284,7 +1348,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1284,7 +1348,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
int zmq::socket_base_t::close () int zmq::socket_base_t::close ()
{ {
ENTER_MUTEX (); ENTER_MUTEX ();
// Remove all existing signalers for thread safe sockets // Remove all existing signalers for thread safe sockets
if (thread_safe) if (thread_safe)
((mailbox_safe_t*)mailbox)->clear_signalers(); ((mailbox_safe_t*)mailbox)->clear_signalers();
......
...@@ -55,7 +55,7 @@ zmq::udp_address_t::~udp_address_t () ...@@ -55,7 +55,7 @@ zmq::udp_address_t::~udp_address_t ()
{ {
} }
int zmq::udp_address_t::resolve (const char *name_, bool receiver_) int zmq::udp_address_t::resolve (const char *name_, bool bind_)
{ {
// Find the ':' at end that separates address from the port number. // Find the ':' at end that separates address from the port number.
const char *delimiter = strrchr (name_, ':'); const char *delimiter = strrchr (name_, ':');
...@@ -78,8 +78,8 @@ int zmq::udp_address_t::resolve (const char *name_, bool receiver_) ...@@ -78,8 +78,8 @@ int zmq::udp_address_t::resolve (const char *name_, bool receiver_)
dest_address.sin_family = AF_INET; dest_address.sin_family = AF_INET;
dest_address.sin_port = htons (port); dest_address.sin_port = htons (port);
// Only when the udp is receiver we allow * as the address // Only when the udp should bind we allow * as the address
if (addr_str == "*" && receiver_) if (addr_str == "*" && bind_)
dest_address.sin_addr.s_addr = htons (INADDR_ANY); dest_address.sin_addr.s_addr = htons (INADDR_ANY);
else else
dest_address.sin_addr.s_addr = inet_addr (addr_str.c_str ()); dest_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());
...@@ -106,9 +106,9 @@ int zmq::udp_address_t::resolve (const char *name_, bool receiver_) ...@@ -106,9 +106,9 @@ int zmq::udp_address_t::resolve (const char *name_, bool receiver_)
return -1; return -1;
} }
// If a receiver and not a multicast, the dest address // If a should bind and not a multicast, the dest address
// is actually the bind address // is actually the bind address
if (receiver_ && !is_mutlicast) if (bind_ && !is_mutlicast)
bind_address = dest_address; bind_address = dest_address;
else { else {
bind_address.sin_family = AF_INET; bind_address.sin_family = AF_INET;
......
...@@ -60,7 +60,11 @@ int main (void) ...@@ -60,7 +60,11 @@ int main (void)
void *sender = zmq_socket (ctx, ZMQ_DGRAM); void *sender = zmq_socket (ctx, ZMQ_DGRAM);
void *listener = zmq_socket (ctx, ZMQ_DGRAM); void *listener = zmq_socket (ctx, ZMQ_DGRAM);
int rc = zmq_bind (listener, "udp://*:5556"); // Connecting dgram shoudl fail
int rc = zmq_connect (listener, "udp://127.0.0.1:5556");
assert (rc == -1);
rc = zmq_bind (listener, "udp://*:5556");
assert (rc == 0); assert (rc == 0);
rc = zmq_bind (sender, "udp://*:5557"); rc = zmq_bind (sender, "udp://*:5557");
...@@ -68,7 +72,7 @@ int main (void) ...@@ -68,7 +72,7 @@ int main (void)
str_send_to (sender, "Is someone there ?", "127.0.0.1:5556"); str_send_to (sender, "Is someone there ?", "127.0.0.1:5556");
str_recv_from (listener, &message_string, &address); str_recv_from (listener, &message_string, &address);
assert (strcmp(message_string, "Is someone there ?") == 0); assert (strcmp(message_string, "Is someone there ?") == 0);
assert (strcmp(address, "127.0.0.1:5557") == 0); assert (strcmp(address, "127.0.0.1:5557") == 0);
free (message_string); free (message_string);
......
...@@ -95,9 +95,17 @@ int main (void) ...@@ -95,9 +95,17 @@ int main (void)
void *radio = zmq_socket (ctx, ZMQ_RADIO); void *radio = zmq_socket (ctx, ZMQ_RADIO);
void *dish = zmq_socket (ctx, ZMQ_DISH); void *dish = zmq_socket (ctx, ZMQ_DISH);
int rc = zmq_bind (dish, "udp://*:5556"); // Connecting dish should fail
int rc = zmq_connect (dish, "udp://127.0.0.1:5556");
assert (rc == -1);
rc = zmq_bind (dish, "udp://*:5556");
assert (rc == 0); assert (rc == 0);
// Bind radio should fail
rc = zmq_bind (radio, "udp://*:5556");
assert (rc == -1);
rc = zmq_connect (radio, "udp://127.0.0.1:5556"); rc = zmq_connect (radio, "udp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment