Commit a6381890 authored by somdoron's avatar somdoron

add methods to handle poller

parent 34968771
...@@ -383,7 +383,8 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); ...@@ -383,7 +383,8 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
ZMQ_EXPORT int zmq_add_poller (void *s, void *p);
ZMQ_EXPORT int zmq_remove_poller (void *s, void *p);
/******************************************************************************/ /******************************************************************************/
/* I/O multiplexing. */ /* I/O multiplexing. */
...@@ -397,6 +398,7 @@ ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); ...@@ -397,6 +398,7 @@ ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
typedef struct zmq_pollitem_t typedef struct zmq_pollitem_t
{ {
void *socket; void *socket;
void *poller;
#if defined _WIN32 #if defined _WIN32
SOCKET fd; SOCKET fd;
#else #else
...@@ -408,7 +410,9 @@ typedef struct zmq_pollitem_t ...@@ -408,7 +410,9 @@ typedef struct zmq_pollitem_t
#define ZMQ_POLLITEMS_DFLT 16 #define ZMQ_POLLITEMS_DFLT 16
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
ZMQ_EXPORT void *zmq_poller_new ();
ZMQ_EXPORT int zmq_poller_close (void *p);
/******************************************************************************/ /******************************************************************************/
/* Message proxying */ /* Message proxying */
......
...@@ -441,6 +441,38 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, ...@@ -441,6 +441,38 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return rc; return rc;
} }
int zmq::socket_base_t::add_signaler(signaler_t *s_)
{
ENTER_MUTEX();
if (!thread_safe) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
((mailbox_safe_t*)mailbox)->add_signaler(s_);
EXIT_MUTEX();
return 0;
}
int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{
ENTER_MUTEX();
if (!thread_safe) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
((mailbox_safe_t*)mailbox)->remove_signaler(s_);
EXIT_MUTEX();
return 0;
}
int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::bind (const char *addr_)
{ {
ENTER_MUTEX(); ENTER_MUTEX();
......
...@@ -90,6 +90,8 @@ namespace zmq ...@@ -90,6 +90,8 @@ namespace zmq
int term_endpoint (const char *addr_); int term_endpoint (const char *addr_);
int send (zmq::msg_t *msg_, int flags_); int send (zmq::msg_t *msg_, int flags_);
int recv (zmq::msg_t *msg_, int flags_); int recv (zmq::msg_t *msg_, int flags_);
int add_signaler (signaler_t *s);
int remove_signaler (signaler_t *s);
int close (); int close ();
// These functions are used by the polling mechanism to determine // These functions are used by the polling mechanism to determine
......
...@@ -74,6 +74,7 @@ struct iovec { ...@@ -74,6 +74,7 @@ struct iovec {
#include "msg.hpp" #include "msg.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "metadata.hpp" #include "metadata.hpp"
#include "signaler.hpp"
#if !defined ZMQ_HAVE_WINDOWS #if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h> #include <unistd.h>
...@@ -561,6 +562,34 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) ...@@ -561,6 +562,34 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
return nread; return nread;
} }
// Add/remove poller from a socket
int zmq_add_poller (void *s_, void *p_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
zmq::signaler_t *p = (zmq::signaler_t *) p_;
return s->add_signaler(p);
}
int zmq_remove_poller (void *s_, void *p_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
zmq::signaler_t *p = (zmq::signaler_t *) p_;
return s->remove_signaler(p);
}
// Message manipulators. // Message manipulators.
int zmq_msg_init (zmq_msg_t *msg_) int zmq_msg_init (zmq_msg_t *msg_)
...@@ -680,6 +709,31 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) ...@@ -680,6 +709,31 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
} }
} }
// Create poller
void *zmq_poller_new ()
{
return new zmq::signaler_t ();
}
// Close poller
int zmq_poller_close (void* p)
{
zmq::signaler_t *s = (zmq::signaler_t*)p;
delete s;
return 0;
}
// Get poller fd
zmq::fd_t zmq_poller_get_fd (void *p)
{
zmq::signaler_t *s = (zmq::signaler_t*)p;
return s->get_fd ();
}
// Polling. // Polling.
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment