Commit fe3e8c5c authored by Stoian Ivanov's avatar Stoian Ivanov

linking fd to pipe identity via socket option

parent e37c2062
...@@ -300,6 +300,7 @@ ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property); ...@@ -300,6 +300,7 @@ ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property);
#define ZMQ_GSSAPI_PRINCIPAL 63 #define ZMQ_GSSAPI_PRINCIPAL 63
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64 #define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
#define ZMQ_GSSAPI_PLAINTEXT 65 #define ZMQ_GSSAPI_PLAINTEXT 65
#define ZMQ_IDENTITY_FD 66
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__ #define __ZMQ_I_ENGINE_HPP_INCLUDED__
#include "fd.hpp"
namespace zmq namespace zmq
{ {
...@@ -47,7 +49,10 @@ namespace zmq ...@@ -47,7 +49,10 @@ namespace zmq
// are messages to send available. // are messages to send available.
virtual void restart_output () = 0; virtual void restart_output () = 0;
virtual void zap_msg_available () = 0; virtual void zap_msg_available () = 0;
// provide a way to link from engine to file descriptor
virtual fd_t get_assoc_fd () { return retired_fd;};
}; };
} }
......
...@@ -65,6 +65,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], ...@@ -65,6 +65,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool conflate_) : int inhwm_, int outhwm_, bool conflate_) :
object_t (parent_), object_t (parent_),
assoc_fd (retired_fd),
inpipe (inpipe_), inpipe (inpipe_),
outpipe (outpipe_), outpipe (outpipe_),
in_active (true), in_active (true),
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "array.hpp" #include "array.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "fd.hpp"
namespace zmq namespace zmq
{ {
...@@ -117,6 +118,8 @@ namespace zmq ...@@ -117,6 +118,8 @@ namespace zmq
// set the high water marks. // set the high water marks.
void set_hwms (int inhwm_, int outhwm_); void set_hwms (int inhwm_, int outhwm_);
// provide a way to link pipe to engine fd. Set on session initialization
fd_t assoc_fd; //=retired_fd
private: private:
// Type of the underlying lock-free pipe. // Type of the underlying lock-free pipe.
......
...@@ -133,6 +133,33 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -133,6 +133,33 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
return -1; return -1;
} }
int zmq::router_t::xgetsockopt (int option_, const void *optval_,
size_t *optvallen_)
{
switch (option_) {
case ZMQ_IDENTITY_FD:
if (optval_==NULL && optvallen_) {
*optvallen_=sizeof(fd_t);
return 0;
}
if (optval_ && optvallen_ && *optvallen_) {
blob_t identity= blob_t((unsigned char*)optval_,*optvallen_);
outpipes_t::iterator it = outpipes.find (identity);
if (it == outpipes.end() ){
return ENOTSOCK;
}
*((fd_t*)optval_)=it->second.pipe->assoc_fd;
*optvallen_=sizeof(fd_t);
return 0;
}
break;
default:
break;
}
errno = EINVAL;
return -1;
}
void zmq::router_t::xpipe_terminated (pipe_t *pipe_) void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
{ {
......
...@@ -47,6 +47,7 @@ namespace zmq ...@@ -47,6 +47,7 @@ namespace zmq
// Overrides of functions from socket_base_t. // Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xgetsockopt (int option_, const void *optval_, size_t *optvallen_);
int xsend (zmq::msg_t *msg_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
...@@ -54,7 +55,6 @@ namespace zmq ...@@ -54,7 +55,6 @@ namespace zmq
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
protected: protected:
// Rollback any message parts that were sent but not yet flushed. // Rollback any message parts that were sent but not yet flushed.
......
...@@ -353,7 +353,8 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -353,7 +353,8 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
// Remember the local end of the pipe. // Remember the local end of the pipe.
zmq_assert (!pipe); zmq_assert (!pipe);
pipe = pipes [0]; pipe = pipes [0];
// Store engine assoc_fd for lilnking pipe to fd
pipe->assoc_fd=engine_->get_assoc_fd();
// Ask socket to plug into the remote end of the pipe. // Ask socket to plug into the remote end of the pipe.
send_bind (socket, pipes [1]); send_bind (socket, pipes [1]);
} }
......
...@@ -284,6 +284,11 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, ...@@ -284,6 +284,11 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
// First, check whether specific socket type overloads the option.
int rc = xgetsockopt (option_, optval_, optvallen_);
if (rc == 0 || errno != EINVAL)
return rc;
if (option_ == ZMQ_RCVMORE) { if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) { if (*optvallen_ < sizeof (int)) {
...@@ -1037,6 +1042,11 @@ int zmq::socket_base_t::xsetsockopt (int, const void *, size_t) ...@@ -1037,6 +1042,11 @@ int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
int zmq::socket_base_t::xgetsockopt (int, const void *, size_t*)
{
errno = EINVAL;
return -1;
}
bool zmq::socket_base_t::xhas_out () bool zmq::socket_base_t::xhas_out ()
{ {
......
...@@ -133,10 +133,13 @@ namespace zmq ...@@ -133,10 +133,13 @@ namespace zmq
// The default implementation assumes there are no specific socket // The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, override this // options for the particular socket type. If not so, override this
// method. // methods.
virtual int xsetsockopt (int option_, const void *optval_, virtual int xsetsockopt (int option_, const void *optval_,
size_t optvallen_); size_t optvallen_);
virtual int xgetsockopt (int option_, const void *optval_,
size_t *optvallen_);
// The default implementation assumes that send is not supported. // The default implementation assumes that send is not supported.
virtual bool xhas_out (); virtual bool xhas_out ();
virtual int xsend (zmq::msg_t *msg_); virtual int xsend (zmq::msg_t *msg_);
......
...@@ -68,6 +68,8 @@ namespace zmq ...@@ -68,6 +68,8 @@ namespace zmq
void in_event (); void in_event ();
void out_event (); void out_event ();
// export s via i_engine so it is possible to link a pipe to fd
fd_t get_assoc_fd (){ return s; };
private: private:
// Unplug the engine from the session. // Unplug the engine from the session.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment