Commit 7b8e728e authored by Sergey KHripchenko's avatar Sergey KHripchenko

implement zmq_unbind(),zmq_disconnect(), zmq->sock->getsockopt(ZMQ_LAST_ENDPOINT_ID)

parent b89a53ee
......@@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_LAST_ENDPOINT_ID 39
/* Message options */
......@@ -244,6 +245,8 @@ ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
size_t *optvallen);
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
ZMQ_EXPORT int zmq_unbind (void *s, void *ep);
ZMQ_EXPORT int zmq_disconnect (void *s, void *ep);
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
......
......@@ -23,7 +23,8 @@
#include "tcp_address.hpp"
#include "ipc_address.hpp"
#include <string.h>
#include <string>
#include <sstream>
zmq::address_t::address_t (
const std::string &protocol_, const std::string &address_)
......@@ -50,3 +51,28 @@ zmq::address_t::~address_t ()
}
#endif
}
int zmq::address_t::to_string (std::string &addr_)
{
if (protocol == "tcp") {
if (resolved.tcp_addr) {
return resolved.tcp_addr->to_string(addr_);
}
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else if (protocol == "ipc") {
if (resolved.ipc_addr) {
return resolved.tcp_addr->to_string(addr_);
}
}
#endif
if (!protocol.empty () && !address.empty ()) {
std::stringstream s;
s << protocol << "://" << address;
addr_ = s.str ();
return 0;
}
addr_.clear ();
return -1;
}
......@@ -44,6 +44,8 @@ namespace zmq
ipc_address_t *ipc_addr;
#endif
} resolved;
int to_string (std::string &addr_);
};
}
......
......@@ -24,13 +24,24 @@
#include "err.hpp"
#include <string.h>
#include <string>
#include <sstream>
zmq::ipc_address_t::ipc_address_t ()
{
memset (&address, 0, sizeof (address));
}
zmq::ipc_address_t::ipc_address_t (const sockaddr *sa, socklen_t sa_len)
{
zmq_assert(sa && sa_len > 0);
memset (&address, 0, sizeof (address));
if (sa->sa_family == AF_UNIX) {
memcpy(&address, sa, sa_len);
}
}
zmq::ipc_address_t::~ipc_address_t ()
{
}
......@@ -47,6 +58,19 @@ int zmq::ipc_address_t::resolve (const char *path_)
return 0;
}
int zmq::ipc_address_t::to_string (std::string &addr_)
{
if (address.sun_family != AF_UNIX) {
addr_.clear ();
return -1;
}
std::stringstream s;
s << "ipc://" << address.sun_path;
addr_ = s.str ();
return 0;
}
const sockaddr *zmq::ipc_address_t::addr () const
{
return (sockaddr*) &address;
......
......@@ -21,6 +21,8 @@
#ifndef __ZMQ_IPC_ADDRESS_HPP_INCLUDED__
#define __ZMQ_IPC_ADDRESS_HPP_INCLUDED__
#include <string>
#include "platform.hpp"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
......@@ -36,11 +38,15 @@ namespace zmq
public:
ipc_address_t ();
ipc_address_t (const sockaddr *sa, socklen_t sa_len);
~ipc_address_t ();
// This function sets up the address for UNIX domain transport.
int resolve (const char* path_);
// The opposite to resolve()
int to_string (std::string &addr_);
const sockaddr *addr () const;
socklen_t addrlen () const;
......
......@@ -98,22 +98,15 @@ void zmq::ipc_listener_t::in_event ()
int zmq::ipc_listener_t::get_address (std::string &addr_)
{
struct sockaddr_storage ss;
int rc;
// Get the details of the IPC socket
socklen_t sl = sizeof (ss);
rc = getsockname (s, (sockaddr *) &ss, &sl);
int rc = getsockname (s, (sockaddr *) &ss, &sl);
if (rc != 0) {
addr_.clear ();
return rc;
}
// Store the address for retrieval by users using wildcards
addr_ = std::string ("ipc://");
struct sockaddr_un saddr;
memcpy (&saddr, &ss, sizeof (saddr));
addr_.append (saddr.sun_path);
return 0;
ipc_address_t addr ((struct sockaddr *) &ss, sl);
return addr.to_string (addr_);
}
int zmq::ipc_listener_t::set_address (const char *addr_)
......
......@@ -30,6 +30,7 @@ zmq::options_t::options_t () :
rcvhwm (1000),
affinity (0),
identity_size (0),
last_endpoint_id(NULL),
rate (100),
recovery_ivl (10000),
multicast_hops (1),
......@@ -529,6 +530,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
memcpy (optval_, last_endpoint.c_str(), last_endpoint.size()+1);
*optvallen_ = last_endpoint.size()+1;
return 0;
case ZMQ_LAST_ENDPOINT_ID:
if (*optvallen_ < sizeof (void *)) {
errno = EINVAL;
return -1;
}
*((void **) optval_) = last_endpoint_id;
*optvallen_ = sizeof (void *);
return 0;
}
errno = EINVAL;
......
......@@ -53,6 +53,8 @@ namespace zmq
// Last socket endpoint URI
std::string last_endpoint;
// Last socket endpoint ID
void *last_endpoint_id;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
int rate;
......
......@@ -80,6 +80,11 @@ void zmq::own_t::launch_child (own_t *object_)
send_own (this, object_);
}
void zmq::own_t::term_child (own_t *object_)
{
process_term_req (object_);
}
void zmq::own_t::process_term_req (own_t *object_)
{
// When shutting down we can ignore termination requests from owned
......
......@@ -70,6 +70,9 @@ namespace zmq
// Launch the supplied object and become its owner.
void launch_child (own_t *object_);
// Terminate owned object
void term_child (own_t *object_);
// Ask owner object to terminate this object. It may take a while
// while actual termination is started. This function should not be
// called more than once.
......
......@@ -318,11 +318,16 @@ int zmq::socket_base_t::bind (const char *addr_)
if (protocol == "inproc") {
endpoint_t endpoint = {this, options};
return register_endpoint (addr_, endpoint);
int rc = register_endpoint (addr_, endpoint);
if (rc == 0) {
// Save last endpoint info
options.last_endpoint.clear ();
options.last_endpoint_id = NULL;
}
return rc;
}
if (protocol == "pgm" || protocol == "epgm") {
// For convenience's sake, bind can be used interchageable with
// connect for PGM and EPGM transports.
return connect (addr_);
......@@ -346,7 +351,10 @@ int zmq::socket_base_t::bind (const char *addr_)
return -1;
}
rc = listener->get_address (options.last_endpoint);
// Save last endpoint info
options.last_endpoint_id = (void *) ((own_t *) listener);
listener->get_address (options.last_endpoint);
launch_child (listener);
return 0;
}
......@@ -362,7 +370,10 @@ int zmq::socket_base_t::bind (const char *addr_)
return -1;
}
rc = listener->get_address (options.last_endpoint);
// Save last endpoint info
options.last_endpoint_id = (void *) ((own_t *) listener);
listener->get_address (options.last_endpoint);
launch_child (listener);
return 0;
}
......@@ -454,6 +465,10 @@ int zmq::socket_base_t::connect (const char *addr_)
// increased here.
send_bind (peer.socket, pipes [1], false);
// Save last endpoint info
options.last_endpoint.clear ();
options.last_endpoint_id = NULL;
return 0;
}
......@@ -514,6 +529,10 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);
// Save last endpoint info
paddr->to_string (options.last_endpoint);
options.last_endpoint_id = (void *) ((own_t *) session);
// Activate the session. Make it a child of this socket.
launch_child (session);
......@@ -586,6 +605,16 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return 0;
}
int zmq::socket_base_t::term_endpoint (void *ep_)
{
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
term_child ((own_t *) ep_);
return 0;
}
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
// Check whether the library haven't been shut down yet.
......
......@@ -71,6 +71,7 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_);
int bind (const char *addr_);
int connect (const char *addr_);
int term_endpoint (void *ep_);
int send (zmq::msg_t *msg_, int flags_);
int recv (zmq::msg_t *msg_, int flags_);
int close ();
......
......@@ -19,8 +19,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <string>
#include <sstream>
#include "tcp_address.hpp"
#include "platform.hpp"
......@@ -251,9 +251,8 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_,
int rc = resolve_nic_name (interface_, ipv4only_);
if (rc != 0 && errno != ENODEV)
return rc;
if (rc == 0) {
if (rc == 0)
return 0;
}
// There's no such interface name. Assume literal address.
#if defined ZMQ_HAVE_OPENVMS && defined __ia64
......@@ -367,6 +366,19 @@ zmq::tcp_address_t::tcp_address_t ()
memset (&address, 0, sizeof (address));
}
zmq::tcp_address_t::tcp_address_t (const sockaddr *sa, socklen_t sa_len)
{
zmq_assert(sa && sa_len > 0);
memset (&address, 0, sizeof (address));
if (sa->sa_family == AF_INET && sa_len >= sizeof (address.ipv4)) {
memcpy(&address.ipv4, sa, sizeof (address.ipv4));
}
else if (sa->sa_family == AF_INET6 && sa_len >= sizeof (address.ipv6)) {
memcpy(&address.ipv6, sa, sizeof (address.ipv6));
}
}
zmq::tcp_address_t::~tcp_address_t ()
{
}
......@@ -421,6 +433,34 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
return 0;
}
int zmq::tcp_address_t::to_string (std::string &addr_)
{
if (address.generic.sa_family != AF_INET && address.generic.sa_family != AF_INET6) {
addr_.clear ();
return -1;
}
// not using service resolv because of https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
char hbuf[NI_MAXHOST];
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL, 0, NI_NUMERICHOST);
if (rc != 0) {
addr_.clear ();
return rc;
}
if (address.generic.sa_family == AF_INET6) {
std::stringstream s;
s << "tcp://[" << hbuf << "]:" << ntohs (address.ipv6.sin6_port);
addr_ = s.str ();
}
else {
std::stringstream s;
s << "tcp://" << hbuf << ":" << ntohs (address.ipv4.sin_port);
addr_ = s.str ();
};
return 0;
}
const sockaddr *zmq::tcp_address_t::addr () const
{
return &address.generic;
......@@ -504,6 +544,37 @@ int zmq::tcp_address_mask_t::resolve (const char *name_, bool ipv4only_)
return 0;
}
int zmq::tcp_address_mask_t::to_string (std::string &addr_)
{
if (address.generic.sa_family != AF_INET && address.generic.sa_family != AF_INET6) {
addr_.clear ();
return -1;
}
if (address_mask == -1) {
addr_.clear ();
return -1;
}
char hbuf[NI_MAXHOST];
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL, 0, NI_NUMERICHOST);
if (rc != 0) {
addr_.clear ();
return rc;
}
if (address.generic.sa_family == AF_INET6) {
std::stringstream s;
s << "[" << hbuf << "]/" << address_mask;
addr_ = s.str ();
}
else {
std::stringstream s;
s << hbuf << "/" << address_mask;
addr_ = s.str ();
};
return 0;
}
const bool zmq::tcp_address_mask_t::match_address (const struct sockaddr *ss, const socklen_t ss_len) const
{
zmq_assert (address_mask != -1 && ss != NULL && ss_len >= sizeof(struct sockaddr));
......
......@@ -39,6 +39,7 @@ namespace zmq
public:
tcp_address_t ();
tcp_address_t (const sockaddr *sa, socklen_t sa_len);
~tcp_address_t ();
// This function translates textual TCP address into an address
......@@ -47,6 +48,9 @@ namespace zmq
// If 'ipv4only' is true, the name will never resolve to IPv6 address.
int resolve (const char* name_, bool local_, bool ipv4only_);
// The opposite to resolve()
virtual int to_string (std::string &addr_);
#if defined ZMQ_HAVE_WINDOWS
unsigned short family () const;
#else
......@@ -79,6 +83,9 @@ namespace zmq
// Works only with remote hostnames.
int resolve (const char* name_, bool ipv4only_);
// The opposite to resolve()
int to_string (std::string &addr_);
const int mask () const;
const bool match_address (const struct sockaddr *ss, const socklen_t ss_len) const;
......
......@@ -21,8 +21,7 @@
#include <new>
#include <string.h>
#include <sstream>
#include <string>
#include "platform.hpp"
#include "tcp_listener.hpp"
......@@ -122,36 +121,18 @@ void zmq::tcp_listener_t::close ()
int zmq::tcp_listener_t::get_address (std::string &addr_)
{
struct sockaddr_storage ss;
char host [NI_MAXHOST];
int rc;
std::stringstream address;
// Get the details of the TCP socket
struct sockaddr_storage ss;
socklen_t sl = sizeof (ss);
rc = getsockname (s, (struct sockaddr *) &ss, &sl);
if (rc != 0) {
return rc;
}
int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
rc = getnameinfo ((struct sockaddr *) &ss, sl, host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
if (rc != 0) {
addr_.clear ();
return rc;
}
if (ss.ss_family == AF_INET) {
struct sockaddr_in sa = {0};
memcpy (&sa, &ss, sizeof (sa));
address << "tcp://" << host << ":" << ntohs (sa.sin_port);
} else {
struct sockaddr_in6 sa = {0};
memcpy (&sa, &ss, sizeof (sa));
address << "tcp://[" << host << "]:" << ntohs (sa.sin6_port);
}
addr_ = address.str ();
return 0;
tcp_address_t addr ((struct sockaddr *) &ss, sl);
return addr.to_string (addr_);
}
int zmq::tcp_listener_t::set_address (const char *addr_)
......@@ -203,7 +184,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
errno_assert (rc == 0);
#endif
// Bind the socket to the network interface and port.
rc = bind (s, address.addr (), address.addrlen ());
#ifdef ZMQ_HAVE_WINDOWS
......
......@@ -293,6 +293,26 @@ int zmq_connect (void *s_, const char *addr_)
return result;
}
int zmq_unbind (void *s_, void *ep_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
return s->term_endpoint (ep_);
}
int zmq_disconnect (void *s_, void *ep_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
return s->term_endpoint (ep_);
}
// Sending functions.
static int
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment