Commit 4012538c authored by Doron Somech's avatar Doron Somech

problem: srcfd is broken

parent a192663e
...@@ -87,7 +87,6 @@ int zmq::msg_t::init () ...@@ -87,7 +87,6 @@ int zmq::msg_t::init ()
u.vsm.size = 0; u.vsm.size = 0;
u.vsm.group[0] = '\0'; u.vsm.group[0] = '\0';
u.vsm.routing_id = 0; u.vsm.routing_id = 0;
u.vsm.fd = retired_fd;
return 0; return 0;
} }
...@@ -100,7 +99,6 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -100,7 +99,6 @@ int zmq::msg_t::init_size (size_t size_)
u.vsm.size = (unsigned char) size_; u.vsm.size = (unsigned char) size_;
u.vsm.group[0] = '\0'; u.vsm.group[0] = '\0';
u.vsm.routing_id = 0; u.vsm.routing_id = 0;
u.vsm.fd = retired_fd;
} }
else { else {
u.lmsg.metadata = NULL; u.lmsg.metadata = NULL;
...@@ -108,7 +106,6 @@ int zmq::msg_t::init_size (size_t size_) ...@@ -108,7 +106,6 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.group[0] = '\0'; u.lmsg.group[0] = '\0';
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.fd = retired_fd;
u.lmsg.content = NULL; u.lmsg.content = NULL;
if (sizeof (content_t) + size_ > size_) if (sizeof (content_t) + size_ > size_)
u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_); u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
...@@ -137,7 +134,6 @@ int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t s ...@@ -137,7 +134,6 @@ int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t s
u.zclmsg.flags = 0; u.zclmsg.flags = 0;
u.zclmsg.group[0] = '\0'; u.zclmsg.group[0] = '\0';
u.zclmsg.routing_id = 0; u.zclmsg.routing_id = 0;
u.zclmsg.fd = retired_fd;
u.zclmsg.content = content_; u.zclmsg.content = content_;
u.zclmsg.content->data = data_; u.zclmsg.content->data = data_;
...@@ -165,7 +161,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_, ...@@ -165,7 +161,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_,
u.cmsg.size = size_; u.cmsg.size = size_;
u.cmsg.group[0] = '\0'; u.cmsg.group[0] = '\0';
u.cmsg.routing_id = 0; u.cmsg.routing_id = 0;
u.cmsg.fd = retired_fd;
} }
else { else {
u.lmsg.metadata = NULL; u.lmsg.metadata = NULL;
...@@ -173,7 +168,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_, ...@@ -173,7 +168,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_,
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.group[0] = '\0'; u.lmsg.group[0] = '\0';
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.fd = retired_fd;
u.lmsg.content = (content_t*) malloc (sizeof (content_t)); u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) { if (!u.lmsg.content) {
errno = ENOMEM; errno = ENOMEM;
...@@ -197,7 +191,6 @@ int zmq::msg_t::init_delimiter () ...@@ -197,7 +191,6 @@ int zmq::msg_t::init_delimiter ()
u.delimiter.flags = 0; u.delimiter.flags = 0;
u.delimiter.group[0] = '\0'; u.delimiter.group[0] = '\0';
u.delimiter.routing_id = 0; u.delimiter.routing_id = 0;
u.delimiter.fd = retired_fd;
return 0; return 0;
} }
...@@ -208,7 +201,6 @@ int zmq::msg_t::init_join () ...@@ -208,7 +201,6 @@ int zmq::msg_t::init_join ()
u.base.flags = 0; u.base.flags = 0;
u.base.group[0] = '\0'; u.base.group[0] = '\0';
u.base.routing_id = 0; u.base.routing_id = 0;
u.base.fd = retired_fd;
return 0; return 0;
} }
...@@ -219,7 +211,6 @@ int zmq::msg_t::init_leave () ...@@ -219,7 +211,6 @@ int zmq::msg_t::init_leave ()
u.base.flags = 0; u.base.flags = 0;
u.base.group[0] = '\0'; u.base.group[0] = '\0';
u.base.routing_id = 0; u.base.routing_id = 0;
u.base.fd = retired_fd;
return 0; return 0;
} }
...@@ -400,16 +391,6 @@ void zmq::msg_t::reset_flags (unsigned char flags_) ...@@ -400,16 +391,6 @@ void zmq::msg_t::reset_flags (unsigned char flags_)
u.base.flags &= ~flags_; u.base.flags &= ~flags_;
} }
zmq::fd_t zmq::msg_t::fd ()
{
return u.base.fd;
}
void zmq::msg_t::set_fd (fd_t fd_)
{
u.base.fd = fd_;
}
zmq::metadata_t *zmq::msg_t::metadata () const zmq::metadata_t *zmq::msg_t::metadata () const
{ {
return u.base.metadata; return u.base.metadata;
......
...@@ -105,8 +105,6 @@ namespace zmq ...@@ -105,8 +105,6 @@ namespace zmq
unsigned char flags (); unsigned char flags ();
void set_flags (unsigned char flags_); void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_); void reset_flags (unsigned char flags_);
fd_t fd ();
void set_fd (fd_t fd_);
metadata_t *metadata () const; metadata_t *metadata () const;
void set_metadata (metadata_t *metadata_); void set_metadata (metadata_t *metadata_);
void reset_metadata (); void reset_metadata ();
...@@ -139,8 +137,7 @@ namespace zmq ...@@ -139,8 +137,7 @@ namespace zmq
enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) + enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) +
3 + 3 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))};
sizeof (fd_t))};
private: private:
zmq::atomic_counter_t* refcnt(); zmq::atomic_counter_t* refcnt();
...@@ -179,13 +176,11 @@ namespace zmq ...@@ -179,13 +176,11 @@ namespace zmq
unsigned char unused [msg_t_size - (sizeof (metadata_t *) + unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} base; } base;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
...@@ -195,7 +190,6 @@ namespace zmq ...@@ -195,7 +190,6 @@ namespace zmq
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} vsm; } vsm;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
...@@ -204,13 +198,11 @@ namespace zmq ...@@ -204,13 +198,11 @@ namespace zmq
sizeof (content_t*) + sizeof (content_t*) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} lmsg; } lmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
...@@ -219,13 +211,11 @@ namespace zmq ...@@ -219,13 +211,11 @@ namespace zmq
sizeof (content_t*) + sizeof (content_t*) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} zclmsg; } zclmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
...@@ -236,26 +226,22 @@ namespace zmq ...@@ -236,26 +226,22 @@ namespace zmq
sizeof (size_t) + sizeof (size_t) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} cmsg; } cmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
unsigned char unused [msg_t_size - (sizeof (metadata_t *) + unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} delimiter; } delimiter;
} u; } u;
}; };
......
...@@ -197,7 +197,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool ...@@ -197,7 +197,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
last_tsc (0), last_tsc (0),
ticks (0), ticks (0),
rcvmore (false), rcvmore (false),
file_desc(-1),
monitor_socket (NULL), monitor_socket (NULL),
monitor_events (0), monitor_events (0),
thread_safe (thread_safe_), thread_safe (thread_safe_),
...@@ -1215,8 +1214,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1215,8 +1214,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately. // If we have the message, return immediately.
if (rc == 0) { if (rc == 0) {
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX (); EXIT_MUTEX ();
return 0; return 0;
...@@ -1238,8 +1235,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1238,8 +1235,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
EXIT_MUTEX (); EXIT_MUTEX ();
return rc; return rc;
} }
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX (); EXIT_MUTEX ();
...@@ -1279,8 +1274,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1279,8 +1274,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
} }
} }
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX (); EXIT_MUTEX ();
return 0; return 0;
...@@ -1646,16 +1639,6 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) ...@@ -1646,16 +1639,6 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
return rc; return rc;
} }
void zmq::socket_base_t::set_fd(zmq::fd_t fd_)
{
file_desc = fd_;
}
zmq::fd_t zmq::socket_base_t::fd()
{
return file_desc;
}
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_) void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECTED) if (monitor_events & ZMQ_EVENT_CONNECTED)
......
...@@ -123,9 +123,6 @@ namespace zmq ...@@ -123,9 +123,6 @@ namespace zmq
int monitor (const char *endpoint_, int events_); int monitor (const char *endpoint_, int events_);
void set_fd(fd_t fd_);
fd_t fd();
void event_connected (const std::string &addr_, int fd_); void event_connected (const std::string &addr_, int fd_);
void event_connect_delayed (const std::string &addr_, int err_); void event_connect_delayed (const std::string &addr_, int err_);
void event_connect_retried (const std::string &addr_, int interval_); void event_connect_retried (const std::string &addr_, int interval_);
...@@ -264,9 +261,6 @@ namespace zmq ...@@ -264,9 +261,6 @@ namespace zmq
// True if the last message received had MORE flag set. // True if the last message received had MORE flag set.
bool rcvmore; bool rcvmore;
// File descriptor if applicable
fd_t file_desc;
// Improves efficiency of time measurement. // Improves efficiency of time measurement.
clock_t clock; clock_t clock;
......
...@@ -152,9 +152,6 @@ void zmq::socks_connecter_t::in_event () ...@@ -152,9 +152,6 @@ void zmq::socks_connecter_t::in_event ()
if (rc == -1) if (rc == -1)
error (); error ();
else { else {
// Remember our fd for ZMQ_SRCFD in messages
socket->set_fd (s);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
stream_engine_t (s, options, endpoint); stream_engine_t (s, options, endpoint);
......
...@@ -985,6 +985,10 @@ void zmq::stream_engine_t::set_handshake_timer () ...@@ -985,6 +985,10 @@ void zmq::stream_engine_t::set_handshake_timer ()
bool zmq::stream_engine_t::init_properties (properties_t & properties) { bool zmq::stream_engine_t::init_properties (properties_t & properties) {
if (peer_address.empty()) return false; if (peer_address.empty()) return false;
properties.insert (std::make_pair("Peer-Address", peer_address)); properties.insert (std::make_pair("Peer-Address", peer_address));
// Private property to support deprecated SRCFD
std::string fd_string = static_cast<std::ostringstream*>(&(std::ostringstream() << (int)s))->str();
properties.insert (std::make_pair("__fd", fd_string));
return true; return true;
} }
......
...@@ -151,9 +151,6 @@ void zmq::tcp_connecter_t::out_event () ...@@ -151,9 +151,6 @@ void zmq::tcp_connecter_t::out_event ()
options.tcp_keepalive_idle, options.tcp_keepalive_intvl); options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_maxrt (fd, options.tcp_maxrt); tune_tcp_maxrt (fd, options.tcp_maxrt);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd (fd);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint); stream_engine_t (fd, options, endpoint);
......
...@@ -105,9 +105,6 @@ void zmq::tcp_listener_t::in_event () ...@@ -105,9 +105,6 @@ void zmq::tcp_listener_t::in_event ()
options.tcp_keepalive_idle, options.tcp_keepalive_intvl); options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_maxrt (fd, options.tcp_maxrt); tune_tcp_maxrt (fd, options.tcp_maxrt);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd(fd);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint); stream_engine_t (fd, options, endpoint);
......
...@@ -680,11 +680,17 @@ int zmq_msg_more (zmq_msg_t *msg_) ...@@ -680,11 +680,17 @@ int zmq_msg_more (zmq_msg_t *msg_)
int zmq_msg_get (zmq_msg_t *msg_, int property_) int zmq_msg_get (zmq_msg_t *msg_, int property_)
{ {
const char* fd_string;
switch (property_) { switch (property_) {
case ZMQ_MORE: case ZMQ_MORE:
return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0; return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0;
case ZMQ_SRCFD: case ZMQ_SRCFD:
return (int)((zmq::msg_t*) msg_)->fd (); fd_string = zmq_msg_gets(msg_, "__fd");
if (fd_string == NULL)
return (int)-1;
return atoi(fd_string);
case ZMQ_SHARED: case ZMQ_SHARED:
return (((zmq::msg_t*) msg_)->is_cmsg ()) || return (((zmq::msg_t*) msg_)->is_cmsg ()) ||
(((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0; (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment