Commit 70bc7dd9 authored by Doron Somech's avatar Doron Somech

problem: zeromq doesn't has a thread-safe peer to peer socket

Solution: a new socket type, called PEER. Very similar to SERVER, but can only connect to other PEERs. Also a new zmq_connect_peer method, that connect and return a routing-id in thread-safe and atomic operation
parent 05194eb5
...@@ -156,12 +156,14 @@ test_ws_transport ...@@ -156,12 +156,14 @@ test_ws_transport
test_wss_transport test_wss_transport
test_socks test_socks
test_xpub_manual_last_value test_xpub_manual_last_value
test_peer
unittest_ip_resolver unittest_ip_resolver
unittest_mtrie unittest_mtrie
unittest_poller unittest_poller
unittest_radix_tree unittest_radix_tree
unittest_udp_address unittest_udp_address
unittest_ypipe unittest_ypipe
unittests/unittest_curve_encoding
tests/test*.log tests/test*.log
tests/test*.trs tests/test*.trs
unittests/unittest*.log unittests/unittest*.log
...@@ -205,4 +207,4 @@ core ...@@ -205,4 +207,4 @@ core
build build
test-suite.log test-suite.log
.idea/ .idea/
cmake-build-debug/ cmake-build-debug/
\ No newline at end of file
...@@ -822,6 +822,7 @@ set(cxx-sources ...@@ -822,6 +822,7 @@ set(cxx-sources
own.cpp own.cpp
null_mechanism.cpp null_mechanism.cpp
pair.cpp pair.cpp
peer.cpp
pgm_receiver.cpp pgm_receiver.cpp
pgm_sender.cpp pgm_sender.cpp
pgm_socket.cpp pgm_socket.cpp
...@@ -947,6 +948,7 @@ set(cxx-sources ...@@ -947,6 +948,7 @@ set(cxx-sources
options.hpp options.hpp
own.hpp own.hpp
pair.hpp pair.hpp
peer.hpp
pgm_receiver.hpp pgm_receiver.hpp
pgm_sender.hpp pgm_sender.hpp
pgm_socket.hpp pgm_socket.hpp
......
...@@ -126,6 +126,8 @@ src_libzmq_la_SOURCES = \ ...@@ -126,6 +126,8 @@ src_libzmq_la_SOURCES = \
src/own.hpp \ src/own.hpp \
src/pair.cpp \ src/pair.cpp \
src/pair.hpp \ src/pair.hpp \
src/peer.cpp \
src/peer.hpp \
src/pgm_receiver.cpp \ src/pgm_receiver.cpp \
src/pgm_receiver.hpp \ src/pgm_receiver.hpp \
src/pgm_sender.cpp \ src/pgm_sender.cpp \
...@@ -1034,7 +1036,8 @@ test_apps += tests/test_poller \ ...@@ -1034,7 +1036,8 @@ test_apps += tests/test_poller \
tests/test_dgram \ tests/test_dgram \
tests/test_app_meta \ tests/test_app_meta \
tests/test_xpub_manual_last_value \ tests/test_xpub_manual_last_value \
tests/test_router_notify tests/test_router_notify \
tests/test_peer
tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
...@@ -1075,6 +1078,10 @@ tests_test_app_meta_CPPFLAGS = ${TESTUTIL_CPPFLAGS} ...@@ -1075,6 +1078,10 @@ tests_test_app_meta_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_router_notify_SOURCES = tests/test_router_notify.cpp tests_test_router_notify_SOURCES = tests/test_router_notify.cpp
tests_test_router_notify_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_router_notify_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_router_notify_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_router_notify_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_peer_SOURCES = tests/test_peer.cpp
tests_test_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
endif endif
if ENABLE_STATIC if ENABLE_STATIC
......
# #
# documentation # documentation
# #
MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect.3 zmq_close.3 \
zmq_ctx_new.3 zmq_ctx_term.3 zmq_ctx_get.3 zmq_ctx_set.3 zmq_ctx_shutdown.3 \ zmq_ctx_new.3 zmq_ctx_term.3 zmq_ctx_get.3 zmq_ctx_set.3 zmq_ctx_shutdown.3 \
zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \ zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \
zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \ zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \
......
zmq_connect_peer(3)
===================
NAME
----
zmq_connect_peer - create outgoing connection from socket and return the connection routing id in thread-safe and atomic way.
SYNOPSIS
--------
*uint32_t zmq_connect_peer (void '*socket', const char '*endpoint');*
DESCRIPTION
-----------
The _zmq_connect_peer()_ function connects a 'ZMQ_PEER' socket to an 'endpoint' and then returns the endpoint 'routing_id'.
The 'endpoint' is a string consisting of a 'transport'`://` followed by an
'address'. The 'transport' specifies the underlying protocol to use. The
'address' specifies the transport-specific address to connect to.
The function is supported only on the 'ZMQ_PEER' socket type and would return `0` with 'errno' set to 'ENOTSUP' otherwise.
The _zmq_connect_peer()_ support the following transports:
'tcp':: unicast transport using TCP, see linkzmq:zmq_tcp[7]
'ipc':: local inter-process communication transport, see linkzmq:zmq_ipc[7]
'inproc':: local in-process (inter-thread) communication transport, see linkzmq:zmq_inproc[7]
'ws':: unicast transport using WebSockets, see linkzmq:zmq_ws[7]
'wss':: unicast transport using WebSockets over TLS, see linkzmq:zmq_wss[7]
RETURN VALUE
------------
The _zmq_connect_peer()_ function returns the peer 'routing_id' if successful. Otherwise it returns
`0` and sets 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
The endpoint supplied is invalid.
*EPROTONOSUPPORT*::
The requested 'transport' protocol is not supported with 'ZMQ_PEER'.
*ENOCOMPATPROTO*::
The requested 'transport' protocol is not compatible with the socket type.
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*ENOTSOCK*::
The provided 'socket' was invalid.
*EMTHREAD*::
No I/O thread is available to accomplish the task.
*ENOTSUP*::
The socket is not of type 'ZMQ_PEER'.
*EFAULT*::
The 'ZMQ_IMMEDIATE' option is set on the socket.
EXAMPLE
-------
.Connecting a peer socket to a TCP transport and sending a message
----
/* Create a ZMQ_SUB socket */
void *socket = zmq_socket (context, ZMQ_PEER);
assert (socket);
/* Connect it to the host server001, port 5555 using a TCP transport */
uint32_t routing_id = zmq_connect (socket, "tcp://server001:5555");
assert (routing_id == 0);
/* Sending a message to the peer */
zmq_msg_t msg;
int rc = zmq_msg_init_data (&msg, "HELLO", 5, NULL, NULL);
assert (rc == 0);
rc = zmq_msg_set_routing_id (&msg, routing_id);
assert (rc == 0);
rc = zmq_msg_send (&msg, socket, 0);
assert (rc == 5);
rc = zmq_msg_close (&msg);
assert (rc == 0);
----
SEE ALSO
--------
linkzmq:zmq_connect[3]
linkzmq:zmq_bind[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
...@@ -59,6 +59,7 @@ Following are the thread safe sockets: ...@@ -59,6 +59,7 @@ Following are the thread safe sockets:
* ZMQ_RADIO * ZMQ_RADIO
* ZMQ_SCATTER * ZMQ_SCATTER
* ZMQ_GATHER * ZMQ_GATHER
* ZMQ_PEER
.Socket types .Socket types
The following sections present the socket types defined by 0MQ, grouped by the The following sections present the socket types defined by 0MQ, grouped by the
...@@ -434,6 +435,48 @@ Outgoing routing strategy:: N/A ...@@ -434,6 +435,48 @@ Outgoing routing strategy:: N/A
Action in mute state:: Block Action in mute state:: Block
Peer-to-peer pattern
~~~~~~~~~~~~~~~~~~~~~~
The peer-to-peer pattern is used to connect a peer to multiple peers.
Peer can both connect and bind and mix both of them with the same socket.
The peer-to-peer pattern is useful to build peer-to-peer networks (e.g zyre, bitcoin, torrent)
where a peer can both accept connections from other peers or connect to them.
NOTE: Peer-to-peer is still in draft phase.
ZMQ_PEER
^^^^^^^^
A 'ZMQ_PEER' socket talks to a set of 'ZMQ_PEER' sockets.
To connect and fetch the 'routing_id' of the peer use linkzmq:zmq_connect_peer[3].
Each received message has a 'routing_id' that is a 32-bit unsigned integer.
The application can fetch this with linkzmq:zmq_msg_routing_id[3].
To send a message to a given 'ZMQ_PEER' peer the application must set the peer's
'routing_id' on the message, using linkzmq:zmq_msg_set_routing_id[3].
If the 'routing_id' is not specified, or does not refer to a connected client
peer, the send call will fail with EHOSTUNREACH. If the outgoing buffer for
the peer is full, the send call shall block, unless ZMQ_DONTWAIT is
used in the send, in which case it shall fail with EAGAIN. The 'ZMQ_PEER'
socket shall not drop messages in any case.
NOTE: 'ZMQ_PEER' sockets are threadsafe. They do not accept the ZMQ_SNDMORE
option on sends not ZMQ_RCVMORE on receives. This limits them to single part
data.
[horizontal]
.Summary of ZMQ_PEER characteristics
Compatible peer sockets:: 'ZMQ_PEER'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Outgoing routing strategy:: See text
Incoming routing strategy:: Fair-queued
Action in mute state:: Return EAGAIN
Native Pattern Native Pattern
~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~
The native pattern is used for communicating with TCP peers and allows The native pattern is used for communicating with TCP peers and allows
......
...@@ -263,7 +263,7 @@ typedef struct zmq_msg_t ...@@ -263,7 +263,7 @@ typedef struct zmq_msg_t
#endif #endif
} zmq_msg_t; } zmq_msg_t;
typedef void(zmq_free_fn) (void *data_, void *hint_); typedef void (zmq_free_fn) (void *data_, void *hint_);
ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg_); ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg_);
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_); ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_);
...@@ -597,7 +597,7 @@ ZMQ_EXPORT void zmq_atomic_counter_destroy (void **counter_p_); ...@@ -597,7 +597,7 @@ ZMQ_EXPORT void zmq_atomic_counter_destroy (void **counter_p_);
#define ZMQ_HAVE_TIMERS #define ZMQ_HAVE_TIMERS
typedef void(zmq_timer_fn) (int timer_id, void *arg); typedef void (zmq_timer_fn) (int timer_id, void *arg);
ZMQ_EXPORT void *zmq_timers_new (void); ZMQ_EXPORT void *zmq_timers_new (void);
ZMQ_EXPORT int zmq_timers_destroy (void **timers_p); ZMQ_EXPORT int zmq_timers_destroy (void **timers_p);
...@@ -634,7 +634,7 @@ ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_); ...@@ -634,7 +634,7 @@ ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_);
/* Sleeps for specified number of seconds. */ /* Sleeps for specified number of seconds. */
ZMQ_EXPORT void zmq_sleep (int seconds_); ZMQ_EXPORT void zmq_sleep (int seconds_);
typedef void(zmq_thread_fn) (void *); typedef void (zmq_thread_fn) (void *);
/* Start a thread. Returns a handle to the thread. */ /* Start a thread. Returns a handle to the thread. */
ZMQ_EXPORT void *zmq_threadstart (zmq_thread_fn *func_, void *arg_); ZMQ_EXPORT void *zmq_threadstart (zmq_thread_fn *func_, void *arg_);
...@@ -658,6 +658,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); ...@@ -658,6 +658,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_GATHER 16 #define ZMQ_GATHER 16
#define ZMQ_SCATTER 17 #define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18 #define ZMQ_DGRAM 18
#define ZMQ_PEER 19
/* DRAFT Socket options. */ /* DRAFT Socket options. */
#define ZMQ_ZAP_ENFORCE_DOMAIN 93 #define ZMQ_ZAP_ENFORCE_DOMAIN 93
...@@ -694,6 +695,7 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_, ...@@ -694,6 +695,7 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_,
/* DRAFT Socket methods. */ /* DRAFT Socket methods. */
ZMQ_EXPORT int zmq_join (void *s, const char *group); ZMQ_EXPORT int zmq_join (void *s, const char *group);
ZMQ_EXPORT int zmq_leave (void *s, const char *group); ZMQ_EXPORT int zmq_leave (void *s, const char *group);
ZMQ_EXPORT uint32_t zmq_connect_peer (void *s_, const char *addr_);
/* DRAFT Msg methods. */ /* DRAFT Msg methods. */
ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id); ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id);
......
...@@ -93,6 +93,7 @@ const char socket_type_dish[] = "DISH"; ...@@ -93,6 +93,7 @@ const char socket_type_dish[] = "DISH";
const char socket_type_gather[] = "GATHER"; const char socket_type_gather[] = "GATHER";
const char socket_type_scatter[] = "SCATTER"; const char socket_type_scatter[] = "SCATTER";
const char socket_type_dgram[] = "DGRAM"; const char socket_type_dgram[] = "DGRAM";
const char socket_type_peer[] = "PEER";
#endif #endif
const char *zmq::mechanism_t::socket_type_string (int socket_type_) const char *zmq::mechanism_t::socket_type_string (int socket_type_)
...@@ -106,7 +107,7 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type_) ...@@ -106,7 +107,7 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type_)
#ifdef ZMQ_BUILD_DRAFT_API #ifdef ZMQ_BUILD_DRAFT_API
socket_type_server, socket_type_client, socket_type_radio, socket_type_server, socket_type_client, socket_type_radio,
socket_type_dish, socket_type_gather, socket_type_scatter, socket_type_dish, socket_type_gather, socket_type_scatter,
socket_type_dgram socket_type_dgram, socket_type_peer
#endif #endif
}; };
static const size_t names_count = sizeof (names) / sizeof (names[0]); static const size_t names_count = sizeof (names) / sizeof (names[0]);
...@@ -353,6 +354,8 @@ bool zmq::mechanism_t::check_socket_type (const char *type_, ...@@ -353,6 +354,8 @@ bool zmq::mechanism_t::check_socket_type (const char *type_,
return strequals (type_, len_, socket_type_gather); return strequals (type_, len_, socket_type_gather);
case ZMQ_DGRAM: case ZMQ_DGRAM:
return strequals (type_, len_, socket_type_dgram); return strequals (type_, len_, socket_type_dgram);
case ZMQ_PEER:
return strequals (type_, len_, socket_type_peer);
#endif #endif
default: default:
break; break;
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "macros.hpp"
#include "peer.hpp"
#include "pipe.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
#include "err.hpp"
zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
server_t (parent_, tid_, sid_)
{
options.type = ZMQ_PEER;
}
uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_)
{
scoped_optional_lock_t sync_lock (&_sync);
// connect_peer cannot work with immediate enabled
if (options.immediate == 1) {
errno = EFAULT;
return 0;
}
int rc = socket_base_t::connect_internal (endpoint_uri_);
if (rc != 0)
return 0;
return _peer_last_routing_id;
}
void zmq::peer_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
server_t::xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
_peer_last_routing_id = pipe_->get_server_socket_routing_id ();
}
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PEER_HPP_INCLUDED__
#define __ZMQ_PEER_HPP_INCLUDED__
#include <map>
#include "socket_base.hpp"
#include "server.hpp"
#include "session_base.hpp"
#include "stdint.hpp"
#include "blob.hpp"
#include "fq.hpp"
namespace zmq
{
class ctx_t;
class msg_t;
class pipe_t;
class peer_t ZMQ_FINAL : public server_t
{
public:
peer_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
uint32_t connect_peer (const char *endpoint_uri_);
private:
uint32_t _peer_last_routing_id;
ZMQ_NON_COPYABLE_NOR_MOVABLE (peer_t)
};
}
#endif
...@@ -45,7 +45,7 @@ class msg_t; ...@@ -45,7 +45,7 @@ class msg_t;
class pipe_t; class pipe_t;
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class server_t ZMQ_FINAL : public socket_base_t class server_t : public socket_base_t
{ {
public: public:
server_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); server_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
......
...@@ -87,6 +87,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -87,6 +87,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_GATHER: case ZMQ_GATHER:
case ZMQ_SCATTER: case ZMQ_SCATTER:
case ZMQ_DGRAM: case ZMQ_DGRAM:
case ZMQ_PEER:
s = new (std::nothrow) s = new (std::nothrow)
session_base_t (io_thread_, active_, socket_, options_, addr_); session_base_t (io_thread_, active_, socket_, options_, addr_);
break; break;
......
...@@ -100,6 +100,7 @@ ...@@ -100,6 +100,7 @@
#include "gather.hpp" #include "gather.hpp"
#include "scatter.hpp" #include "scatter.hpp"
#include "dgram.hpp" #include "dgram.hpp"
#include "peer.hpp"
void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_, void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
pipe_t *pipe_) pipe_t *pipe_)
...@@ -207,6 +208,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, ...@@ -207,6 +208,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_,
case ZMQ_DGRAM: case ZMQ_DGRAM:
s = new (std::nothrow) dgram_t (parent_, tid_, sid_); s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
break; break;
case ZMQ_PEER:
s = new (std::nothrow) peer_t (parent_, tid_, sid_);
break;
default: default:
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
...@@ -228,6 +232,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, ...@@ -228,6 +232,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_,
int sid_, int sid_,
bool thread_safe_) : bool thread_safe_) :
own_t (parent_, tid_), own_t (parent_, tid_),
_sync (),
_tag (0xbaddecaf), _tag (0xbaddecaf),
_ctx_terminated (false), _ctx_terminated (false),
_destroyed (false), _destroyed (false),
...@@ -240,7 +245,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, ...@@ -240,7 +245,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_,
_monitor_events (0), _monitor_events (0),
_thread_safe (thread_safe_), _thread_safe (thread_safe_),
_reaper_signaler (NULL), _reaper_signaler (NULL),
_sync (),
_monitor_sync () _monitor_sync ()
{ {
options.socket_id = sid_; options.socket_id = sid_;
...@@ -740,7 +744,11 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) ...@@ -740,7 +744,11 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_)
int zmq::socket_base_t::connect (const char *endpoint_uri_) int zmq::socket_base_t::connect (const char *endpoint_uri_)
{ {
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL); scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
return connect_internal (endpoint_uri_);
}
int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
{
if (unlikely (_ctx_terminated)) { if (unlikely (_ctx_terminated)) {
errno = ETERM; errno = ETERM;
return -1; return -1;
......
...@@ -205,6 +205,11 @@ class socket_base_t : public own_t, ...@@ -205,6 +205,11 @@ class socket_base_t : public own_t,
// Delay actual destruction of the socket. // Delay actual destruction of the socket.
void process_destroy () ZMQ_FINAL; void process_destroy () ZMQ_FINAL;
int connect_internal (const char *endpoint_uri_);
// Mutex for synchronize access to the socket in thread safe mode
mutex_t _sync;
private: private:
// test if event should be sent and then dispatch it // test if event should be sent and then dispatch it
void event (const endpoint_uri_pair_t &endpoint_uri_pair_, void event (const endpoint_uri_pair_t &endpoint_uri_pair_,
...@@ -336,9 +341,6 @@ class socket_base_t : public own_t, ...@@ -336,9 +341,6 @@ class socket_base_t : public own_t,
// Signaler to be used in the reaping stage // Signaler to be used in the reaping stage
signaler_t *_reaper_signaler; signaler_t *_reaper_signaler;
// Mutex for synchronize access to the socket in thread safe mode
mutex_t _sync;
// Mutex to synchronize access to the monitor Pair socket // Mutex to synchronize access to the monitor Pair socket
mutex_t _monitor_sync; mutex_t _monitor_sync;
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "macros.hpp" #include "macros.hpp"
#include "poller.hpp" #include "poller.hpp"
#include "peer.hpp"
#if !defined ZMQ_HAVE_POLLER #if !defined ZMQ_HAVE_POLLER
// On AIX platform, poll.h has to be included first to get consistent // On AIX platform, poll.h has to be included first to get consistent
...@@ -336,6 +337,28 @@ int zmq_connect (void *s_, const char *addr_) ...@@ -336,6 +337,28 @@ int zmq_connect (void *s_, const char *addr_)
return s->connect (addr_); return s->connect (addr_);
} }
uint32_t zmq_connect_peer (void *s_, const char *addr_)
{
zmq::peer_t *s = static_cast<zmq::peer_t *> (s_);
if (!s_ || !s->check_tag ()) {
errno = ENOTSOCK;
return 0;
}
int socket_type;
size_t socket_type_size = sizeof (socket_type);
if (s->getsockopt (ZMQ_TYPE, &socket_type, &socket_type_size) != 0)
return 0;
if (socket_type != ZMQ_PEER) {
errno = ENOTSUP;
return 0;
}
return s->connect_peer (addr_);
}
int zmq_unbind (void *s_, const char *addr_) int zmq_unbind (void *s_, const char *addr_)
{ {
zmq::socket_base_t *s = as_socket_base_t (s_); zmq::socket_base_t *s = as_socket_base_t (s_);
......
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
#define ZMQ_GATHER 16 #define ZMQ_GATHER 16
#define ZMQ_SCATTER 17 #define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18 #define ZMQ_DGRAM 18
#define ZMQ_PEER 19
/* DRAFT Socket options. */ /* DRAFT Socket options. */
#define ZMQ_ZAP_ENFORCE_DOMAIN 93 #define ZMQ_ZAP_ENFORCE_DOMAIN 93
......
...@@ -162,6 +162,7 @@ if(ENABLE_DRAFTS) ...@@ -162,6 +162,7 @@ if(ENABLE_DRAFTS)
test_app_meta test_app_meta
test_router_notify test_router_notify
test_xpub_manual_last_value test_xpub_manual_last_value
test_peer
) )
endif() endif()
......
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
SETUP_TEARDOWN_TESTCONTEXT
void test_peer ()
{
size_t len = MAX_SOCKET_STRING;
char my_endpoint[MAX_SOCKET_STRING];
void *peer1 = test_context_socket (ZMQ_PEER);
bind_loopback (peer1, false, my_endpoint, len);
void *peer2 = test_context_socket (ZMQ_PEER);
uint32_t peer1_routing_id = zmq_connect_peer (peer2, my_endpoint);
TEST_ASSERT_NOT_EQUAL (0, peer1_routing_id);
{
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 1));
char *data = static_cast<char *> (zmq_msg_data (&msg));
data[0] = 1;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_set_routing_id (&msg, peer1_routing_id));
int rc = zmq_msg_send (&msg, peer2, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
}
uint32_t peer2_routing_id;
{
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
int rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, peer1, 0));
TEST_ASSERT_EQUAL_INT (1, rc);
peer2_routing_id = zmq_msg_routing_id (&msg);
TEST_ASSERT_NOT_EQUAL (0, peer2_routing_id);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
}
{
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 1));
char *data = static_cast<char *> (zmq_msg_data (&msg));
data[0] = 2;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_msg_set_routing_id (&msg, peer2_routing_id));
int rc = zmq_msg_send (&msg, peer1, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
}
{
zmq_msg_t msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
int rc = zmq_msg_recv (&msg, peer2, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
uint32_t routing_id = zmq_msg_routing_id (&msg);
TEST_ASSERT_EQUAL_UINT32 (peer1_routing_id, routing_id);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
}
test_context_socket_close (peer1);
test_context_socket_close (peer2);
}
int main (void)
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_peer);
return UNITY_END ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment