Commit c7e3efba authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #808 from sradomski/master

Allow clients to get remote endpoint per message for TCP connections
parents 08d897b1 823b7ebe
...@@ -296,6 +296,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); ...@@ -296,6 +296,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
#define ZMQ_SRCFD 2
/* Send/recv options. */ /* Send/recv options. */
#define ZMQ_DONTWAIT 1 #define ZMQ_DONTWAIT 1
......
...@@ -67,6 +67,7 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -67,6 +67,7 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.content->size = size_; u.lmsg.content->size = size_;
u.lmsg.content->ffn = NULL; u.lmsg.content->ffn = NULL;
u.lmsg.content->hint = NULL; u.lmsg.content->hint = NULL;
u.lmsg.content->fd = -1;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
} }
return 0; return 0;
...@@ -99,6 +100,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, ...@@ -99,6 +100,7 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
u.lmsg.content->size = size_; u.lmsg.content->size = size_;
u.lmsg.content->ffn = ffn_; u.lmsg.content->ffn = ffn_;
u.lmsg.content->hint = hint_; u.lmsg.content->hint = hint_;
u.lmsg.content->fd = -1;
new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); new (&u.lmsg.content->refcnt) zmq::atomic_counter_t ();
} }
return 0; return 0;
...@@ -247,6 +249,19 @@ void zmq::msg_t::reset_flags (unsigned char flags_) ...@@ -247,6 +249,19 @@ void zmq::msg_t::reset_flags (unsigned char flags_)
u.base.flags &= ~flags_; u.base.flags &= ~flags_;
} }
zmq::fd_t zmq::msg_t::fd ()
{
if (u.base.type == type_lmsg)
return u.lmsg.content->fd;
return -1;
}
void zmq::msg_t::set_fd (fd_t fd_)
{
if (u.base.type == type_lmsg)
u.lmsg.content->fd = fd_;
}
bool zmq::msg_t::is_identity () const bool zmq::msg_t::is_identity () const
{ {
return (u.base.flags & identity) == identity; return (u.base.flags & identity) == identity;
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "config.hpp" #include "config.hpp"
#include "atomic_counter.hpp" #include "atomic_counter.hpp"
#include "fd.hpp"
// Signature for free function to deallocate the message content. // Signature for free function to deallocate the message content.
// Note that it has to be declared as "C" so that it is the same as // Note that it has to be declared as "C" so that it is the same as
...@@ -67,6 +68,8 @@ namespace zmq ...@@ -67,6 +68,8 @@ namespace zmq
unsigned char flags (); unsigned char flags ();
void set_flags (unsigned char flags_); void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_); void reset_flags (unsigned char flags_);
fd_t fd ();
void set_fd (fd_t fd_);
bool is_identity () const; bool is_identity () const;
bool is_delimiter (); bool is_delimiter ();
bool is_vsm (); bool is_vsm ();
...@@ -100,6 +103,7 @@ namespace zmq ...@@ -100,6 +103,7 @@ namespace zmq
msg_free_fn *ffn; msg_free_fn *ffn;
void *hint; void *hint;
zmq::atomic_counter_t refcnt; zmq::atomic_counter_t refcnt;
fd_t fd;
}; };
// Different message types. // Different message types.
......
...@@ -36,6 +36,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -36,6 +36,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool active_, class socket_base_t *socket_, const options_t &options_, bool active_, class socket_base_t *socket_, const options_t &options_,
const address_t *addr_) const address_t *addr_)
{ {
session_base_t *s = NULL; session_base_t *s = NULL;
switch (options_.type) { switch (options_.type) {
case ZMQ_REQ: case ZMQ_REQ:
...@@ -115,6 +116,7 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) ...@@ -115,6 +116,7 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
incomplete_in = msg_->flags () & msg_t::more ? true : false; incomplete_in = msg_->flags () & msg_t::more ? true : false;
return 0; return 0;
......
...@@ -136,6 +136,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -136,6 +136,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
last_tsc (0), last_tsc (0),
ticks (0), ticks (0),
rcvmore (false), rcvmore (false),
file_desc(-1),
monitor_socket (NULL), monitor_socket (NULL),
monitor_events (0) monitor_events (0)
{ {
...@@ -826,6 +827,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -826,6 +827,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
if (unlikely (rc != 0 && errno != EAGAIN)) if (unlikely (rc != 0 && errno != EAGAIN))
return -1; return -1;
// set file descriptor
if (file_desc >= 0)
msg_->set_fd(file_desc);
// If we have the message, return immediately. // If we have the message, return immediately.
if (rc == 0) { if (rc == 0) {
extract_flags (msg_); extract_flags (msg_);
...@@ -1188,6 +1193,16 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) ...@@ -1188,6 +1193,16 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
return rc; return rc;
} }
void zmq::socket_base_t::set_fd(zmq::fd_t fd_)
{
file_desc = fd_;
}
zmq::fd_t zmq::socket_base_t::fd()
{
return file_desc;
}
void zmq::socket_base_t::event_connected (std::string &addr_, int fd_) void zmq::socket_base_t::event_connected (std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECTED) { if (monitor_events & ZMQ_EVENT_CONNECTED) {
......
...@@ -106,6 +106,9 @@ namespace zmq ...@@ -106,6 +106,9 @@ namespace zmq
int monitor (const char *endpoint_, int events_); int monitor (const char *endpoint_, int events_);
void set_fd(fd_t fd_);
fd_t fd();
void event_connected (std::string &addr_, int fd_); void event_connected (std::string &addr_, int fd_);
void event_connect_delayed (std::string &addr_, int err_); void event_connect_delayed (std::string &addr_, int err_);
void event_connect_retried (std::string &addr_, int interval_); void event_connect_retried (std::string &addr_, int interval_);
...@@ -230,6 +233,9 @@ namespace zmq ...@@ -230,6 +233,9 @@ namespace zmq
// True if the last message received had MORE flag set. // True if the last message received had MORE flag set.
bool rcvmore; bool rcvmore;
// File descriptor if applicable
fd_t file_desc;
// Improves efficiency of time measurement. // Improves efficiency of time measurement.
clock_t clock; clock_t clock;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <new> #include <new>
#include <string> #include <string>
#include <stdio.h>
#include "platform.hpp" #include "platform.hpp"
#include "tcp_listener.hpp" #include "tcp_listener.hpp"
...@@ -90,6 +91,9 @@ void zmq::tcp_listener_t::in_event () ...@@ -90,6 +91,9 @@ void zmq::tcp_listener_t::in_event ()
tune_tcp_socket (fd); tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd(fd);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint); stream_engine_t (fd, options, endpoint);
......
...@@ -640,6 +640,8 @@ int zmq_msg_get (zmq_msg_t *msg_, int option_) ...@@ -640,6 +640,8 @@ int zmq_msg_get (zmq_msg_t *msg_, int option_)
switch (option_) { switch (option_) {
case ZMQ_MORE: case ZMQ_MORE:
return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0; return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0;
case ZMQ_SRCFD:
return ((zmq::msg_t*) msg_)->fd ();
default: default:
errno = EINVAL; errno = EINVAL;
return -1; return -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment