Commit 62922188 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #810 from sradomski/master

ZMQ_SRCFD docs and tests
parents ecb97709 23ea1b2b
...@@ -23,6 +23,12 @@ The following properties can be retrieved with the _zmq_msg_get()_ function: ...@@ -23,6 +23,12 @@ The following properties can be retrieved with the _zmq_msg_get()_ function:
*ZMQ_MORE*:: *ZMQ_MORE*::
Indicates that there are more message frames to follow after the 'message'. Indicates that there are more message frames to follow after the 'message'.
*ZMQ_SRCFD*::
Returns the file descriptor of the socket the 'message' was read from. This
allows application to retrieve the remote endpoint via 'getpeername(2)'. Be
aware that the respective socket might be closed already, reused even.
Currently only implemented for TCP sockets.
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_msg_get()_ function shall return the value for the property if The _zmq_msg_get()_ function shall return the value for the property if
......
...@@ -199,7 +199,7 @@ ZMQ_EXPORT int zmq_ctx_destroy (void *context); ...@@ -199,7 +199,7 @@ ZMQ_EXPORT int zmq_ctx_destroy (void *context);
/* 0MQ message definition. */ /* 0MQ message definition. */
/******************************************************************************/ /******************************************************************************/
typedef struct zmq_msg_t {unsigned char _ [32];} zmq_msg_t; typedef struct zmq_msg_t {unsigned char _ [40];} zmq_msg_t;
typedef void (zmq_free_fn) (void *data, void *hint); typedef void (zmq_free_fn) (void *data, void *hint);
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
// Check whether the sizes of public representation of the message (zmq_msg_t) // Check whether the sizes of public representation of the message (zmq_msg_t)
// and private representation of the message (zmq::msg_t) match. // and private representation of the message (zmq::msg_t) match.
typedef char zmq_msg_size_check typedef char zmq_msg_size_check
[2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1]; [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1];
...@@ -43,11 +44,13 @@ int zmq::msg_t::init () ...@@ -43,11 +44,13 @@ int zmq::msg_t::init ()
u.vsm.type = type_vsm; u.vsm.type = type_vsm;
u.vsm.flags = 0; u.vsm.flags = 0;
u.vsm.size = 0; u.vsm.size = 0;
file_desc = -1;
return 0; return 0;
} }
int zmq::msg_t::init_size (size_t size_) int zmq::msg_t::init_size (size_t size_)
{ {
file_desc = -1;
if (size_ <= max_vsm_size) { if (size_ <= max_vsm_size) {
u.vsm.type = type_vsm; u.vsm.type = type_vsm;
u.vsm.flags = 0; u.vsm.flags = 0;
...@@ -67,7 +70,6 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -67,7 +70,6 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.content->size = size_; u.lmsg.content->size = size_;
u.lmsg.content->ffn = NULL; u.lmsg.content->ffn = NULL;
u.lmsg.content->hint = NULL; u.lmsg.content->hint = NULL;
u.lmsg.content->fd = -1;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
} }
return 0; return 0;
...@@ -80,6 +82,8 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, ...@@ -80,6 +82,8 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
// would occur once the data is accessed // would occur once the data is accessed
assert (data_ != NULL || size_ == 0); assert (data_ != NULL || size_ == 0);
file_desc = -1;
// Initialize constant message if there's no need to deallocate // Initialize constant message if there's no need to deallocate
if(ffn_ == NULL) { if(ffn_ == NULL) {
u.cmsg.type = type_cmsg; u.cmsg.type = type_cmsg;
...@@ -100,7 +104,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, ...@@ -100,7 +104,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
u.lmsg.content->size = size_; u.lmsg.content->size = size_;
u.lmsg.content->ffn = ffn_; u.lmsg.content->ffn = ffn_;
u.lmsg.content->hint = hint_; u.lmsg.content->hint = hint_;
u.lmsg.content->fd = -1;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
} }
return 0; return 0;
...@@ -249,17 +252,14 @@ void zmq::msg_t::reset_flags (unsigned char flags_) ...@@ -249,17 +252,14 @@ void zmq::msg_t::reset_flags (unsigned char flags_)
u.base.flags &= ~flags_; u.base.flags &= ~flags_;
} }
zmq::fd_t zmq::msg_t::fd () int64_t zmq::msg_t::fd ()
{ {
if (u.base.type == type_lmsg) return file_desc;
return u.lmsg.content->fd;
return -1;
} }
void zmq::msg_t::set_fd (fd_t fd_) void zmq::msg_t::set_fd (int64_t fd_)
{ {
if (u.base.type == type_lmsg) file_desc = fd_;
u.lmsg.content->fd = fd_;
} }
bool zmq::msg_t::is_identity () const bool zmq::msg_t::is_identity () const
......
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
#include "config.hpp" #include "config.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "fd.hpp"
// Signature for free function to deallocate the message content. // Signature for free function to deallocate the message content.
// Note that it has to be declared as "C" so that it is the same as // Note that it has to be declared as "C" so that it is the same as
...@@ -68,8 +67,8 @@ namespace zmq ...@@ -68,8 +67,8 @@ namespace zmq
unsigned char flags (); unsigned char flags ();
void set_flags (unsigned char flags_); void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_); void reset_flags (unsigned char flags_);
fd_t fd (); int64_t fd ();
void set_fd (fd_t fd_); void set_fd (int64_t fd_);
bool is_identity () const; bool is_identity () const;
bool is_delimiter (); bool is_delimiter ();
bool is_vsm (); bool is_vsm ();
...@@ -103,7 +102,6 @@ namespace zmq ...@@ -103,7 +102,6 @@ namespace zmq
msg_free_fn *ffn; msg_free_fn *ffn;
void *hint; void *hint;
zmq::atomic_counter_t refcnt; zmq::atomic_counter_t refcnt;
fd_t fd;
}; };
// Different message types. // Different message types.
...@@ -121,6 +119,9 @@ namespace zmq ...@@ -121,6 +119,9 @@ namespace zmq
type_max = 104 type_max = 104
}; };
// the file descriptor where this message originated, needs to be 64bit due to alignment
int64_t file_desc;
// Note that fields shared between different message types are not // Note that fields shared between different message types are not
// moved to tha parent class (msg_t). This way we ger tighter packing // moved to tha parent class (msg_t). This way we ger tighter packing
// of the data. Shared fields can be accessed via 'base' member of // of the data. Shared fields can be accessed via 'base' member of
......
...@@ -827,12 +827,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -827,12 +827,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
if (unlikely (rc != 0 && errno != EAGAIN)) if (unlikely (rc != 0 && errno != EAGAIN))
return -1; return -1;
// set file descriptor
if (file_desc >= 0)
msg_->set_fd(file_desc);
// If we have the message, return immediately. // If we have the message, return immediately.
if (rc == 0) { if (rc == 0) {
if (file_desc >= 0)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
return 0; return 0;
} }
...@@ -849,6 +847,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -849,6 +847,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
rc = xrecv (msg_); rc = xrecv (msg_);
if (rc < 0) if (rc < 0)
return rc; return rc;
if (file_desc >= 0)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
return 0; return 0;
} }
...@@ -881,6 +881,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -881,6 +881,8 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
} }
} }
if (file_desc >= 0)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
return 0; return 0;
} }
......
...@@ -17,6 +17,7 @@ noinst_PROGRAMS = test_system \ ...@@ -17,6 +17,7 @@ noinst_PROGRAMS = test_system \
test_immediate \ test_immediate \
test_last_endpoint \ test_last_endpoint \
test_term_endpoint \ test_term_endpoint \
test_srcfd \
test_monitor \ test_monitor \
test_resource \ test_resource \
test_router_mandatory \ test_router_mandatory \
...@@ -81,6 +82,7 @@ test_connect_resolve_SOURCES = test_connect_resolve.cpp ...@@ -81,6 +82,7 @@ test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_immediate_SOURCES = test_immediate.cpp test_immediate_SOURCES = test_immediate.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_srcfd_SOURCES = test_srcfd.cpp
test_monitor_SOURCES = test_monitor.cpp test_monitor_SOURCES = test_monitor.cpp
test_resource_SOURCES = test_resource.cpp test_resource_SOURCES = test_resource.cpp
test_router_mandatory_SOURCES = test_router_mandatory.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp
......
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#define MSG_SIZE 20
#ifdef _WIN32
#include <Winsock2.h>
#include <Ws2tcpip.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#endif
int main (void)
{
int rc;
setup_test_environment();
// Create the infrastructure
void *ctx = zmq_ctx_new ();
assert (ctx);
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_bind(rep, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect(req, "tcp://127.0.0.1:5560");
assert (rc == 0);
char tmp[MSG_SIZE];
zmq_send(req, tmp, MSG_SIZE, 0);
zmq_msg_t msg;
rc = zmq_msg_init(&msg);
assert (rc == 0);
zmq_recvmsg(rep, &msg, 0);
assert(zmq_msg_size(&msg) == MSG_SIZE);
// get the messages source file descriptor
int srcFd = zmq_msg_get(&msg, ZMQ_SRCFD);
assert(srcFd >= 0);
// get the remote endpoint
struct sockaddr_storage ss;
socklen_t addrlen = sizeof ss;
rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen);
assert (rc == 0);
char host [NI_MAXHOST];
rc = getnameinfo ((struct sockaddr*) &ss, addrlen, host, sizeof host, NULL, 0, NI_NUMERICHOST);
assert (rc == 0);
// assert it is localhost which connected
assert (strcmp(host, "127.0.0.1") == 0);
rc = zmq_close (rep);
assert (rc == 0);
rc = zmq_close (req);
assert (rc == 0);
// sleep a bit for the socket to be freed
usleep(30000);
// getting name from closed socket will fail
rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen);
assert (rc == -1);
assert (errno == EBADF);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment