Commit 8dbda15e authored by Constantin Rack's avatar Constantin Rack

Merge pull request #1745 from somdoron/master

parents 4f4e4753 389e853c
...@@ -49,6 +49,7 @@ ...@@ -49,6 +49,7 @@
zmq::udp_address_t::udp_address_t () zmq::udp_address_t::udp_address_t ()
{ {
memset (&bind_address, 0, sizeof bind_address); memset (&bind_address, 0, sizeof bind_address);
memset (&dest_address, 0, sizeof dest_address);
} }
zmq::udp_address_t::~udp_address_t () zmq::udp_address_t::~udp_address_t ()
...@@ -68,11 +69,6 @@ int zmq::udp_address_t::resolve (const char *name_) ...@@ -68,11 +69,6 @@ int zmq::udp_address_t::resolve (const char *name_)
std::string addr_str (name_, delimiter - name_); std::string addr_str (name_, delimiter - name_);
std::string port_str (delimiter + 1); std::string port_str (delimiter + 1);
// Remove square brackets around the address, if any, as used in IPv6
if (addr_str.size () >= 2 && addr_str [0] == '[' &&
addr_str [addr_str.size () - 1] == ']')
addr_str = addr_str.substr (1, addr_str.size () - 2);
// Parse the port number (0 is not a valid port). // Parse the port number (0 is not a valid port).
uint16_t port = (uint16_t) atoi (port_str.c_str ()); uint16_t port = (uint16_t) atoi (port_str.c_str ());
if (port == 0) { if (port == 0) {
......
...@@ -91,7 +91,7 @@ void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_) ...@@ -91,7 +91,7 @@ void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip (); mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip ();
mreq.imr_interface = address->resolved.udp_addr->interface_ip (); mreq.imr_interface = address->resolved.udp_addr->interface_ip ();
int rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof (mreq)); int rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char*) &mreq, sizeof (mreq));
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
...@@ -145,10 +145,17 @@ void zmq::udp_engine_t::out_event() ...@@ -145,10 +145,17 @@ void zmq::udp_engine_t::out_event()
body_msg.close (); body_msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
#ifdef ZMQ_HAVE_WINDOWS
rc = sendto(fd, (char*) out_buffer, size, 0,
address->resolved.udp_addr->dest_addr(),
address->resolved.udp_addr->dest_addrlen());
wsa_assert(rc != SOCKET_ERROR);
#else
rc = sendto (fd, out_buffer, size, 0, rc = sendto (fd, out_buffer, size, 0,
address->resolved.udp_addr->dest_addr (), address->resolved.udp_addr->dest_addr (),
address->resolved.udp_addr->dest_addrlen ()); address->resolved.udp_addr->dest_addrlen ());
errno_assert (rc != -1); errno_assert (rc != -1);
#endif
} }
else else
reset_pollout (handle); reset_pollout (handle);
...@@ -170,46 +177,63 @@ void zmq::udp_engine_t::restart_output() ...@@ -170,46 +177,63 @@ void zmq::udp_engine_t::restart_output()
void zmq::udp_engine_t::in_event() void zmq::udp_engine_t::in_event()
{ {
size_t read = recv (fd, in_buffer, MAX_UDP_MSG, 0); #ifdef ZMQ_HAVE_WINDOWS
int nbytes = recv(fd, (char*) in_buffer, MAX_UDP_MSG, 0);
if (read > 0) { const int last_error = WSAGetLastError();
size_t group_size = in_buffer[0]; if (nbytes == SOCKET_ERROR) {
wsa_assert(
// This doesn't fit, just ingore last_error == WSAENETDOWN ||
if (read - 1 < group_size) last_error == WSAENETRESET ||
return; last_error == WSAEWOULDBLOCK);
return;
}
#else
int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0);
if (nbytes == -1) {
errno_assert(errno != EBADF
&& errno != EFAULT
&& errno != ENOMEM
&& errno != ENOTSOCK);
return;
}
#endif
size_t body_size = read -1 - group_size; int group_size = in_buffer[0];
msg_t msg; // This doesn't fit, just ingore
int rc = msg.init_size (group_size); if (nbytes - 1 < group_size)
errno_assert (rc == 0); return;
msg.set_flags (msg_t::more);
memcpy (msg.data (), in_buffer + 1, group_size);
rc = session->push_msg (&msg); int body_size = nbytes - 1 - group_size;
errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
// Pipe is full msg_t msg;
if (rc != 0) { int rc = msg.init_size (group_size);
rc = msg.close (); errno_assert (rc == 0);
errno_assert (rc == 0); msg.set_flags (msg_t::more);
memcpy (msg.data (), in_buffer + 1, group_size);
reset_pollin (handle); rc = session->push_msg (&msg);
return; errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
}
// Pipe is full
if (rc != 0) {
rc = msg.close (); rc = msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg.init_size (body_size);
errno_assert (rc == 0); reset_pollin (handle);
memcpy (msg.data (), in_buffer + 1 + group_size, body_size); return;
rc = session->push_msg (&msg);
errno_assert (rc == 0);
rc = msg.close ();
errno_assert (rc == 0);
session->flush ();
} }
rc = msg.close ();
errno_assert (rc == 0);
rc = msg.init_size (body_size);
errno_assert (rc == 0);
memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
rc = session->push_msg (&msg);
errno_assert (rc == 0);
rc = msg.close ();
errno_assert (rc == 0);
session->flush ();
} }
void zmq::udp_engine_t::restart_input() void zmq::udp_engine_t::restart_input()
......
...@@ -77,6 +77,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod ...@@ -77,6 +77,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod
} }
zmq_msg_close (msg_); zmq_msg_close (msg_);
free(body);
return recv_rc; return recv_rc;
} }
......
...@@ -77,6 +77,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod ...@@ -77,6 +77,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod
} }
zmq_msg_close (msg_); zmq_msg_close (msg_);
free (body);
return recv_rc; return recv_rc;
} }
...@@ -91,10 +92,10 @@ int main (void) ...@@ -91,10 +92,10 @@ int main (void)
void *radio = zmq_socket (ctx, ZMQ_RADIO); void *radio = zmq_socket (ctx, ZMQ_RADIO);
void *dish = zmq_socket (ctx, ZMQ_DISH); void *dish = zmq_socket (ctx, ZMQ_DISH);
int rc = zmq_bind (radio, "udp://127.0.0.1:5556"); int rc = zmq_connect (radio, "udp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (dish, "udp://127.0.0.1:5556"); rc = zmq_bind (dish, "udp://127.0.0.1:5556");
assert (rc == 0); assert (rc == 0);
zmq_sleep (1); zmq_sleep (1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment