Commit 443176d5 authored by Luca Boccassi's avatar Luca Boccassi

Merge pull request #1994 from somdoron/udp_raw_sockets

problem: no way to send raw udp messages with zeromq.
parents 72f19648 0db70e24
......@@ -134,6 +134,7 @@ test_socketopt_hwm
test_use_fd_ipc
test_use_fd_tcp
test_pub_invert_matching
test_dgram
tests/test*.log
tests/test*.trs
src/platform.hpp*
......
......@@ -436,6 +436,7 @@ set (cxx-sources
curve_server.cpp
dealer.cpp
devpoll.cpp
dgram.cpp
dist.cpp
epoll.cpp
err.cpp
......
......@@ -45,6 +45,8 @@ src_libzmq_la_SOURCES = \
src/decoder.hpp \
src/devpoll.cpp \
src/devpoll.hpp \
src/dgram.cpp \
src/dgram.hpp \
src/dish.cpp \
src/dish.hpp \
src/dist.cpp \
......@@ -727,7 +729,8 @@ test_apps += tests/test_poller \
tests/test_timers \
tests/test_radio_dish \
tests/test_udp \
tests/test_scatter_gather
tests/test_scatter_gather \
tests/test_dgram
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = src/libzmq.la
......@@ -749,6 +752,9 @@ tests_test_udp_LDADD = src/libzmq.la
tests_test_scatter_gather_SOURCES = tests/test_scatter_gather.cpp
tests_test_scatter_gather_LDADD = src/libzmq.la
tests_test_dgram_SOURCES = tests/test_dgram.cpp
tests_test_dgram_LDADD = src/libzmq.la
endif
check_PROGRAMS = ${test_apps}
......
......@@ -523,6 +523,7 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
#define ZMQ_DISH 15
#define ZMQ_GATHER 16
#define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18
/* DRAFT Socket methods. */
ZMQ_EXPORT int zmq_join (void *s, const char *group);
......
/*
Copyright (c) 2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "macros.hpp"
#include "dgram.hpp"
#include "pipe.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
#include "err.hpp"
zmq::dgram_t::dgram_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
pipe (NULL),
last_in (NULL),
more_out (false)
{
options.type = ZMQ_DGRAM;
options.raw_socket = true;
}
zmq::dgram_t::~dgram_t ()
{
zmq_assert (!pipe);
}
void zmq::dgram_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
LIBZMQ_UNUSED(subscribe_to_all_);
zmq_assert (pipe_);
// ZMQ_DGRAM socket can only be connected to a single peer.
// The socket rejects any further connection requests.
if (pipe == NULL)
pipe = pipe_;
else
pipe_->terminate (false);
}
void zmq::dgram_t::xpipe_terminated (pipe_t *pipe_)
{
if (pipe_ == pipe) {
if (last_in == pipe) {
saved_credential = last_in->get_credential ();
last_in = NULL;
}
pipe = NULL;
}
}
void zmq::dgram_t::xread_activated (pipe_t *)
{
// There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here.
}
void zmq::dgram_t::xwrite_activated (pipe_t *)
{
// There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here.
}
int zmq::dgram_t::xsend (msg_t *msg_)
{
// If there's no out pipe, just drop it.
if (!pipe) {
int rc = msg_->close ();
errno_assert (rc == 0);
return -1;
}
// If this is the first part of the message it's the ID of the
// peer to send the message to.
if (!more_out) {
if (!(msg_->flags () & msg_t::more)) {
errno = EINVAL;
return -1;
}
// Expect one more message frame.
more_out = true;
}
else {
// dgram messages are two part only, reject part if more is set
if (msg_->flags () & msg_t::more) {
errno = EINVAL;
return -1;
}
// This is the last part of the message.
more_out = false;
}
// Push the message into the pipe.
if (!pipe->write (msg_)) {
errno = EAGAIN;
return -1;
}
if (!(msg_->flags () & msg_t::more))
pipe->flush ();
// Detach the message from the data buffer.
int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
int zmq::dgram_t::xrecv (msg_t *msg_)
{
// Deallocate old content of the message.
int rc = msg_->close ();
errno_assert (rc == 0);
if (!pipe || !pipe->read (msg_)) {
// Initialise the output parameter to be a 0-byte message.
rc = msg_->init ();
errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
last_in = pipe;
return 0;
}
bool zmq::dgram_t::xhas_in ()
{
if (!pipe)
return false;
return pipe->check_read ();
}
bool zmq::dgram_t::xhas_out ()
{
if (!pipe)
return false;
return pipe->check_write ();
}
zmq::blob_t zmq::dgram_t::get_credential () const
{
return last_in? last_in->get_credential (): saved_credential;
}
/*
Copyright (c) 2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DGRAM_HPP_INCLUDED__
#define __ZMQ_DGRAM_HPP_INCLUDED__
#include "blob.hpp"
#include "socket_base.hpp"
#include "session_base.hpp"
namespace zmq
{
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;
class dgram_t :
public socket_base_t
{
public:
dgram_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~dgram_t ();
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
blob_t get_credential () const;
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
private:
zmq::pipe_t *pipe;
zmq::pipe_t *last_in;
blob_t saved_credential;
// If true, more outgoing message parts are expected.
bool more_out;
dgram_t (const dgram_t&);
const dgram_t &operator = (const dgram_t&);
};
}
#endif
......@@ -78,8 +78,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
"XPUB", "XSUB", "STREAM",
"SERVER", "CLIENT",
"RADIO", "DISH",
"GATHER", "SCATTER"};
zmq_assert (socket_type >= 0 && socket_type <= 17);
"GATHER", "SCATTER", "DGRAM"};
zmq_assert (socket_type >= 0 && socket_type <= 18);
return names [socket_type];
}
......@@ -203,6 +203,8 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
return type_ == "SCATTER";
case ZMQ_SCATTER:
return type_ == "GATHER";
case ZMQ_DGRAM:
return type_ == "DGRAM";
default:
break;
}
......
......@@ -83,6 +83,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_CLIENT:
case ZMQ_GATHER:
case ZMQ_SCATTER:
case ZMQ_DGRAM:
s = new (std::nothrow) session_base_t (io_thread_, active_,
socket_, options_, addr_);
break;
......@@ -572,9 +573,9 @@ void zmq::session_base_t::start_connecting (bool wait_)
#endif
if (addr->protocol == "udp") {
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO);
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO || options.type == ZMQ_DGRAM);
udp_engine_t* engine = new (std::nothrow) udp_engine_t ();
udp_engine_t* engine = new (std::nothrow) udp_engine_t (options);
alloc_assert (engine);
bool recv = false;
......@@ -588,6 +589,10 @@ void zmq::session_base_t::start_connecting (bool wait_)
send = false;
recv = true;
}
else if (options.type == ZMQ_DGRAM) {
send = true;
recv = true;
}
int rc = engine->init (addr, send, recv);
errno_assert (rc == 0);
......
......@@ -95,6 +95,7 @@
#include "dish.hpp"
#include "gather.hpp"
#include "scatter.hpp"
#include "dgram.hpp"
#define ENTER_MUTEX() \
if (thread_safe) \
......@@ -168,6 +169,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_SCATTER:
s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
break;
case ZMQ_DGRAM:
s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
break;
default:
errno = EINVAL;
return NULL;
......@@ -304,7 +308,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
#endif
if (protocol_ == "udp" && (options.type != ZMQ_DISH &&
options.type != ZMQ_RADIO)) {
options.type != ZMQ_RADIO &&
options.type != ZMQ_DGRAM)) {
errno = ENOCOMPATPROTO;
return -1;
}
......@@ -878,7 +883,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if (protocol == "udp") {
paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
alloc_assert (paddr->resolved.udp_addr);
rc = paddr->resolved.udp_addr->resolve (address.c_str(), options.type == ZMQ_DISH);
rc = paddr->resolved.udp_addr->resolve (address.c_str(), (options.type == ZMQ_DISH || options.type == ZMQ_DGRAM));
if (rc != 0) {
LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
......
......@@ -43,12 +43,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "err.hpp"
#include "ip.hpp"
zmq::udp_engine_t::udp_engine_t() :
zmq::udp_engine_t::udp_engine_t(const options_t &options_) :
plugged (false),
fd(-1),
session(NULL),
handle(NULL),
address(NULL),
options(options_),
send_enabled(false),
recv_enabled(false)
{
......@@ -100,8 +101,18 @@ void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
io_object_t::plug (io_thread_);
handle = add_fd (fd);
if (send_enabled)
if (send_enabled) {
if (!options.raw_socket) {
out_address = address->resolved.udp_addr->dest_addr ();
out_addrlen = address->resolved.udp_addr->dest_addrlen ();
}
else {
out_address = (sockaddr *) &raw_address;
out_addrlen = sizeof (sockaddr_in);
}
set_pollout (handle);
}
if (recv_enabled) {
int on = 1;
......@@ -151,6 +162,54 @@ void zmq::udp_engine_t::terminate()
delete this;
}
void zmq::udp_engine_t::sockaddr_to_msg (zmq::msg_t *msg, sockaddr_in* addr)
{
char* name = inet_ntoa(addr->sin_addr);
char port[6];
sprintf (port, "%d", (int)ntohs (addr->sin_port));
int size = strlen (name) + strlen (port) + 1 + 1; // Colon + NULL
int rc = msg->init_size (size);
errno_assert (rc == 0);
msg->set_flags (msg_t::more);
char *address = (char*)msg->data ();
strcpy (address, name);
strcat (address, ":");
strcat (address, port);
}
int zmq::udp_engine_t::resolve_raw_address (char *name_, int length_)
{
const char *delimiter = strrchr (name_, ':');
if (!delimiter) {
errno = EINVAL;
return -1;
}
std::string addr_str (name_, delimiter - name_);
std::string port_str (delimiter + 1);
// Parse the port number (0 is not a valid port).
uint16_t port = (uint16_t) atoi (port_str.c_str ());
if (port == 0) {
errno = EINVAL;
return -1;
}
raw_address.sin_family = AF_INET;
raw_address.sin_port = htons (port);
raw_address.sin_addr.s_addr = inet_addr (addr_str.c_str ());
if (raw_address.sin_addr.s_addr == INADDR_NONE) {
errno = EINVAL;
return -1;
}
return 0;
}
void zmq::udp_engine_t::out_event()
{
msg_t group_msg;
......@@ -163,12 +222,34 @@ void zmq::udp_engine_t::out_event()
size_t group_size = group_msg.size ();
size_t body_size = body_msg.size ();
size_t size = group_size + body_size + 1;
size_t size;
if (options.raw_socket) {
rc = resolve_raw_address ((char*) group_msg.data(), group_size);
// We discard the message if address is not valid
if (rc != 0) {
rc = group_msg.close ();
errno_assert (rc == 0);
// TODO: check if larger than maximum size
out_buffer[0] = (unsigned char) group_size;
memcpy (out_buffer + 1, group_msg.data (), group_size);
memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);
body_msg.close ();
errno_assert (rc == 0);
return;
}
size = body_size;
memcpy (out_buffer, body_msg.data (), body_size);
}
else {
size = group_size + body_size + 1;
// TODO: check if larger than maximum size
out_buffer[0] = (unsigned char) group_size;
memcpy (out_buffer + 1, group_msg.data (), group_size);
memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size);
}
rc = group_msg.close ();
errno_assert (rc == 0);
......@@ -178,13 +259,10 @@ void zmq::udp_engine_t::out_event()
#ifdef ZMQ_HAVE_WINDOWS
rc = sendto (fd, (const char *) out_buffer, (int) size, 0,
address->resolved.udp_addr->dest_addr (),
(int) address->resolved.udp_addr->dest_addrlen ());
out_address, (int) out_addrlen);
wsa_assert (rc != SOCKET_ERROR);
#else
rc = sendto (fd, out_buffer, size, 0,
address->resolved.udp_addr->dest_addr (),
address->resolved.udp_addr->dest_addrlen ());
rc = sendto (fd, out_buffer, size, 0, out_address, out_addrlen);
errno_assert (rc != -1);
#endif
}
......@@ -208,8 +286,10 @@ void zmq::udp_engine_t::restart_output()
void zmq::udp_engine_t::in_event()
{
struct sockaddr_in in_address;
socklen_t in_addrlen = sizeof(sockaddr_in);
#ifdef ZMQ_HAVE_WINDOWS
int nbytes = recv(fd, (char*) in_buffer, MAX_UDP_MSG, 0);
int nbytes = recvfrom(fd, (char*) in_buffer, MAX_UDP_MSG, 0, (sockaddr*) &in_address, &in_addrlen);
const int last_error = WSAGetLastError();
if (nbytes == SOCKET_ERROR) {
wsa_assert(
......@@ -219,7 +299,7 @@ void zmq::udp_engine_t::in_event()
return;
}
#else
int nbytes = recv(fd, in_buffer, MAX_UDP_MSG, 0);
int nbytes = recvfrom(fd, in_buffer, MAX_UDP_MSG, 0, (sockaddr*) &in_address, &in_addrlen);
if (nbytes == -1) {
errno_assert(errno != EBADF
&& errno != EFAULT
......@@ -228,20 +308,33 @@ void zmq::udp_engine_t::in_event()
return;
}
#endif
int rc;
int body_size;
int body_offset;
msg_t msg;
int group_size = in_buffer[0];
if (options.raw_socket) {
sockaddr_to_msg (&msg, &in_address);
// This doesn't fit, just ingore
if (nbytes - 1 < group_size)
return;
body_size = nbytes;
body_offset = 0;
}
else {
char* group_buffer = (char *)in_buffer + 1;
int group_size = in_buffer[0];
int body_size = nbytes - 1 - group_size;
rc = msg.init_size (group_size);
errno_assert (rc == 0);
msg.set_flags (msg_t::more);
memcpy (msg.data (), group_buffer, group_size);
msg_t msg;
int rc = msg.init_size (group_size);
errno_assert (rc == 0);
msg.set_flags (msg_t::more);
memcpy (msg.data (), in_buffer + 1, group_size);
// This doesn't fit, just ingore
if (nbytes - 1 < group_size)
return;
body_size = nbytes - 1 - group_size;
body_offset = 1 + group_size;
}
rc = session->push_msg (&msg);
errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN));
......@@ -259,7 +352,7 @@ void zmq::udp_engine_t::in_event()
errno_assert (rc == 0);
rc = msg.init_size (body_size);
errno_assert (rc == 0);
memcpy (msg.data (), in_buffer + 1 + group_size, body_size);
memcpy (msg.data (), in_buffer + body_offset, body_size);
rc = session->push_msg (&msg);
errno_assert (rc == 0);
rc = msg.close ();
......
......@@ -6,6 +6,7 @@
#include "i_engine.hpp"
#include "address.hpp"
#include "udp_address.hpp"
#include "msg.hpp"
#define MAX_UDP_MSG 8192
......@@ -17,7 +18,7 @@ namespace zmq
class udp_engine_t : public io_object_t, public i_engine
{
public:
udp_engine_t ();
udp_engine_t (const options_t &options_);
~udp_engine_t ();
int init (address_t *address_, bool send_, bool recv_);
......@@ -45,6 +46,9 @@ namespace zmq
private:
int resolve_raw_address (char *addr_, int length_);
void sockaddr_to_msg (zmq::msg_t *msg, sockaddr_in* addr);
bool plugged;
fd_t fd;
......@@ -52,6 +56,12 @@ namespace zmq
handle_t handle;
address_t *address;
options_t options;
sockaddr_in raw_address;
const struct sockaddr* out_address;
socklen_t out_addrlen;
unsigned char out_buffer[MAX_UDP_MSG];
unsigned char in_buffer[MAX_UDP_MSG];
bool send_enabled;
......
......@@ -44,6 +44,7 @@
#define ZMQ_DISH 15
#define ZMQ_GATHER 16
#define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18
/* DRAFT Socket methods. */
int zmq_join (void *s, const char *group);
......
......@@ -121,6 +121,7 @@ IF (ENABLE_DRAFTS)
test_radio_dish
test_udp
test_scatter_gather
test_dgram
)
ENDIF (ENABLE_DRAFTS)
......
/*
Copyright (c) 2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
void str_send_to (void *s_, const char *content_, const char *address_)
{
// Send the address part
int rc = s_sendmore (s_, address_);
assert (rc > 0);
rc = s_send (s_, content_);
assert (rc > 0);
}
void str_recv_from (void *s_, char **ptr_content_, char **ptr_address_)
{
*ptr_address_ = s_recv (s_);
assert (ptr_address_);
*ptr_content_ = s_recv (s_);
assert (ptr_content_);
}
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
char* message_string;
char* address;
void *sender = zmq_socket (ctx, ZMQ_DGRAM);
void *listener = zmq_socket (ctx, ZMQ_DGRAM);
int rc = zmq_bind (listener, "udp://*:5556");
assert (rc == 0);
rc = zmq_bind (sender, "udp://*:5557");
assert (rc == 0);
str_send_to (sender, "Is someone there ?", "127.0.0.1:5556");
str_recv_from (listener, &message_string, &address);
assert (strcmp(message_string, "Is someone there ?") == 0);
assert (strcmp(address, "127.0.0.1:5557") == 0);
free (message_string);
str_send_to (listener, "Yes, there is !", address);
free (address);
str_recv_from (sender, &message_string, &address);
assert (strcmp(message_string, "Yes, there is !") == 0);
assert (strcmp(address, "127.0.0.1:5556") == 0);
free (message_string);
free (address);
rc = zmq_close (sender);
assert (rc == 0);
rc = zmq_close (listener);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment