Commit 03a1b0e2 authored by Telford Berkey's avatar Telford Berkey

Merge remote-tracking branch 'upstream/master'

parents fa1f676c 94b0ad3e
......@@ -137,3 +137,4 @@ zeromq-*.tar.gz
zeromq-*.zip
core
build
test-suite.log
......@@ -334,7 +334,6 @@ test_apps = \
tests/test_connect_rid \
tests/test_bind_src_address \
tests/test_metadata \
tests/test_id2fd \
tests/test_capabilities \
tests/test_xpub_nodrop \
tests/test_xpub_manual \
......
......@@ -271,7 +271,7 @@ case "${host_os}" in
CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_CYGWIN, 1, [Have Cygwin])
libzmq_on_cygwin="yes"
libzmq_dso_visibility="no"
libzmq_dso_visibility="no"
if test "x$enable_static" = "xyes"; then
AC_MSG_ERROR([Building static libraries is not supported under Cygwin])
fi
......@@ -436,10 +436,10 @@ AM_CONDITIONAL(HAVE_PGM, test "x$have_pgm_library" = "xyes")
# This uses "--with-norm" to point to the "norm" directory
# for "norm/include" and "norm/lib"
#(if "--with-norm=yes" is given, then assume installed on system)
AC_ARG_WITH([norm],
[AS_HELP_STRING([--with-norm],
AC_ARG_WITH([norm],
[AS_HELP_STRING([--with-norm],
[build libzmq with NORM protocol extension, optionally specifying norm path [default=no]])],
[with_norm_ext=$withval],
[with_norm_ext=$withval],
[with_norm_ext=no])
......@@ -481,7 +481,7 @@ AM_CONDITIONAL(ON_LINUX, test "x$libzmq_on_linux" = "xyes")
# Checks for library functions.
AC_TYPE_SIGNAL
AC_CHECK_FUNCS(perror gettimeofday clock_gettime memset socket getifaddrs freeifaddrs fork)
AC_CHECK_FUNCS(perror gettimeofday clock_gettime memset socket getifaddrs freeifaddrs fork posix_memalign)
AC_CHECK_HEADERS([alloca.h])
LIBZMQ_CHECK_SOCK_CLOEXEC([
......
......@@ -676,23 +676,6 @@ Option value unit:: N/A
Default value:: not set
Applicable socket types:: all, when using TCP transport
ZMQ_IDENTITY_FD: Retrieve FD associated with given identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IDENTITY_FD' option shall retrieve the FD associated with given identity.
call _zmq_getsockopt()_ with _option_value_ / _option_len_ pointing to memory
holding the identity string. On return the start of _option_value_ buffer will be
filled with file descriptor of the pipe with given identity if found. If the identity
is not found ENOTSOCK is returned as _zmq_getsockopt()_ result. When the pipe is not
using FD as lower transport you might get -1 as FD. NB: _option_value_ must be always
big enough to hold sizeof(fd_t) bytes no matter how small the identity length is.
[horizontal]
Option value type:: character string/fd_t
Option value unit:: N/A
Default value:: not set
Applicable socket types:: ROUTER
RETURN VALUE
------------
......
......@@ -19,8 +19,22 @@ property specified by the 'property' argument for the message pointed to by
the 'message' argument. Both the 'property' argument and the 'value'
shall be NULL-terminated UTF8-encoded strings.
The following properties can be retrieved with the _zmq_msg_gets()_ function:
Metadata is defined on a per-connection basis during the ZeroMQ connection
handshake as specified in <rfc.zeromq.org/spec:37>.
The following ZMTP properties can be retrieved with the _zmq_msg_gets()_
function:
Socket-Type
Identity
Resource
Additionally, when available for the underlying transport, the *Peer-Address*
property will return the IP address of the remote endpoint as returned by
getnameinfo(2).
Other properties may be defined based on the underlying security mechanism,
see ZAP authenticated connection sample below.
RETURN VALUE
------------
......
......@@ -195,7 +195,7 @@ ZMQ_EXPORT int zmq_ctx_destroy (void *context);
/* 0MQ message definition. */
/******************************************************************************/
typedef struct zmq_msg_t {unsigned char _ [48];} zmq_msg_t;
typedef struct zmq_msg_t {unsigned char _ [64];} zmq_msg_t;
typedef void (zmq_free_fn) (void *data, void *hint);
......@@ -292,7 +292,6 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
#define ZMQ_GSSAPI_PLAINTEXT 65
#define ZMQ_HANDSHAKE_IVL 66
#define ZMQ_IDENTITY_FD 67
#define ZMQ_SOCKS_PROXY 68
#define ZMQ_XPUB_NODROP 69
#define ZMQ_BLOCKY 70
......
......@@ -33,7 +33,11 @@ namespace zmq
// This structure defines the commands that can be sent between threads.
#ifdef _MSC_VER
__declspec(align(64)) struct command_t
#else
struct command_t
#endif
{
// Object to process the command.
zmq::object_t *destination;
......@@ -59,7 +63,8 @@ namespace zmq
done
} type;
union {
union args_t
{
// Sent to I/O thread to let it know that it should
// terminate itself.
......@@ -146,8 +151,11 @@ namespace zmq
} done;
} args;
#ifdef _MSC_VER
};
}
#else
} __attribute__((aligned(64)));
#endif
}
#endif
......@@ -20,8 +20,6 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
#include "fd.hpp"
namespace zmq
{
......@@ -49,10 +47,7 @@ namespace zmq
// are messages to send available.
virtual void restart_output () = 0;
virtual void zap_msg_available () = 0;
// provide a way to link from engine to file descriptor
virtual fd_t get_assoc_fd () { return retired_fd;};
virtual void zap_msg_available () = 0;
};
}
......
......@@ -92,7 +92,7 @@ namespace zmq
// Size in bytes of the largest message that is still copied around
// rather than being reference-counted.
enum { msg_t_size = 48 };
enum { msg_t_size = 64 };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3) };
// Shared message buffer. Message data are either allocated in one
......
......@@ -65,7 +65,6 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool conflate_) :
object_t (parent_),
assoc_fd (retired_fd),
inpipe (inpipe_),
outpipe (outpipe_),
in_active (true),
......
......@@ -27,7 +27,6 @@
#include "stdint.hpp"
#include "array.hpp"
#include "blob.hpp"
#include "fd.hpp"
namespace zmq
{
......@@ -120,8 +119,6 @@ namespace zmq
// check HWM
bool check_hwm () const;
// provide a way to link pipe to engine fd. Set on session initialization
fd_t assoc_fd; //=retired_fd
private:
// Type of the underlying lock-free pipe.
......
......@@ -133,33 +133,6 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
return -1;
}
int zmq::router_t::xgetsockopt (int option_, const void *optval_,
size_t *optvallen_)
{
switch (option_) {
case ZMQ_IDENTITY_FD:
if (optval_==NULL && optvallen_) {
*optvallen_=sizeof(fd_t);
return 0;
}
if (optval_ && optvallen_ && *optvallen_) {
blob_t identity= blob_t((unsigned char*)optval_,*optvallen_);
outpipes_t::iterator it = outpipes.find (identity);
if (it == outpipes.end() ){
return ENOTSOCK;
}
*((fd_t*)optval_)=it->second.pipe->assoc_fd;
*optvallen_=sizeof(fd_t);
return 0;
}
break;
default:
break;
}
errno = EINVAL;
return -1;
}
void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
{
......
......@@ -47,7 +47,6 @@ namespace zmq
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xgetsockopt (int option_, const void *optval_, size_t *optvallen_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......
......@@ -367,9 +367,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
// Remember the local end of the pipe.
zmq_assert (!pipe);
pipe = pipes [0];
// Store engine assoc_fd for linking pipe to fd
pipe->assoc_fd = engine_->get_assoc_fd ();
pipes [1]->assoc_fd = pipe->assoc_fd;
// Ask socket to plug into the remote end of the pipe.
send_bind (socket, pipes [1]);
}
......
......@@ -289,11 +289,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = ETERM;
return -1;
}
// First, check whether specific socket type overloads the option.
int rc = xgetsockopt (option_, optval_, optvallen_);
if (rc == 0 || errno != EINVAL)
return rc;
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
......@@ -1067,11 +1062,6 @@ int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
errno = EINVAL;
return -1;
}
int zmq::socket_base_t::xgetsockopt (int, const void *, size_t*)
{
errno = EINVAL;
return -1;
}
bool zmq::socket_base_t::xhas_out ()
{
......
......@@ -133,13 +133,10 @@ namespace zmq
// The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, override this
// methods.
// method.
virtual int xsetsockopt (int option_, const void *optval_,
size_t optvallen_);
virtual int xgetsockopt (int option_, const void *optval_,
size_t *optvallen_);
// The default implementation assumes that send is not supported.
virtual bool xhas_out ();
virtual int xsend (zmq::msg_t *msg_);
......
......@@ -671,7 +671,7 @@ bool zmq::stream_engine_t::handshake ()
options.mechanism == ZMQ_GSSAPI? "GSSAPI":
"OTHER",
mechanism);
error (protocol_error);
return false;
}
......@@ -806,21 +806,18 @@ void zmq::stream_engine_t::mechanism_ready ()
properties_t properties;
properties_t::const_iterator it;
// If we have a peer_address, add it to metadata
if (!peer_address.empty()) {
properties.insert(std::make_pair("Peer-Address", peer_address));
}
// Add ZAP properties.
const properties_t& zap_properties = mechanism->get_zap_properties ();
it = zap_properties.begin ();
while (it != zap_properties.end ()) {
properties.insert (properties_t::value_type (it->first, it->second));
++it;
}
properties.insert(zap_properties.begin (), zap_properties.end ());
// Add ZMTP properties.
const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
it = zmtp_properties.begin ();
while (it != zmtp_properties.end ()) {
properties.insert (properties_t::value_type (it->first, it->second));
++it;
}
properties.insert(zmtp_properties.begin (), zmtp_properties.end ());
zmq_assert (metadata == NULL);
if (!properties.empty ())
......
......@@ -76,8 +76,6 @@ namespace zmq
void out_event ();
void timer_event (int id_);
// export s via i_engine so it is possible to link a pipe to fd
fd_t get_assoc_fd (){ return s; };
private:
// Unplug the engine from the session.
......
......@@ -33,7 +33,7 @@ namespace zmq
// to minimise number of allocations/deallocations needed. Thus yqueue
// allocates/deallocates elements in batches of N.
//
// yqueue allows one thread to use push/back function and another one
// yqueue allows one thread to use push/back function and another one
// to use pop/front functions. However, user must ensure that there's no
// pop on the empty queue and that both threads don't access the same
// element in unsynchronised manner.
......@@ -41,8 +41,16 @@ namespace zmq
// T is the type of the object in the queue.
// N is granularity of the queue (how many pushes have to be done till
// actual memory allocation is required).
#ifdef HAVE_POSIX_MEMALIGN
// ALIGN is the memory alignment size to use in the case where we have
// posix_memalign available. Default value is 64, this alignment will
// prevent two queue chunks from occupying the same CPU cache line on
// architectures where cache lines are <= 64 bytes (e.g. most things
// except POWER).
template <typename T, int N, size_t ALIGN = 64> class yqueue_t
#else
template <typename T, int N> class yqueue_t
#endif
{
public:
......@@ -65,7 +73,7 @@ namespace zmq
if (begin_chunk == end_chunk) {
free (begin_chunk);
break;
}
}
chunk_t *o = begin_chunk;
begin_chunk = begin_chunk->next;
free (o);
......@@ -103,7 +111,13 @@ namespace zmq
end_chunk->next = sc;
sc->prev = end_chunk;
} else {
#ifdef HAVE_POSIX_MEMALIGN
void *pv;
if (posix_memalign(&pv, ALIGN, sizeof (chunk_t)) == 0)
end_chunk->next = (chunk_t*) pv;
#else
end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
#endif
alloc_assert (end_chunk->next);
end_chunk->next->prev = end_chunk;
}
......
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *client = zmq_socket (ctx, ZMQ_REQ);
assert (client);
void *server = zmq_socket (ctx, ZMQ_ROUTER);
assert (server);
// Now do a basic ping test
int rc = zmq_bind (server, "tcp://127.0.0.1:9998");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9998");
assert (rc == 0);
rc=zmq_send(client,"1234567890",10,0);
assert (rc != -1);
int partnumber=1;
int recvfd=-1;
zmq_msg_t part;
do {
/* if not first free prev message part */
if (partnumber!=1) zmq_msg_close (&part);
/* Create an empty ØMQ message to hold the message part */
int rc = zmq_msg_init (&part);
assert (rc == 0);
/* Block until a message is available to be received from socket */
rc = zmq_msg_recv (&part,server, 0);
assert (rc != -1);
if (partnumber==1) {// this is the identity of the receiving pipe
//buffer for zmq_getsockopt / ZMQ_IDENTITY_FD
char idbuf[255];
size_t idbufsz=zmq_msg_size (&part);
assert (idbufsz<=255);
memcpy(idbuf,zmq_msg_data(&part),idbufsz);
rc = zmq_getsockopt (server, ZMQ_IDENTITY_FD, idbuf, &idbufsz);
assert (rc == 0);
memcpy(&recvfd,idbuf,sizeof(recvfd));
//depending on your system this should be around 14
assert (recvfd > 0);
}
partnumber++;
} while (zmq_msg_more(&part));
zmq_msg_close (&part);
close_zero_linger (client);
close_zero_linger (server);
zmq_ctx_term (ctx);
return 0 ;
}
......@@ -41,7 +41,7 @@ zap_handler (void *handler)
assert (streq (version, "1.0"));
assert (streq (mechanism, "NULL"));
s_sendmore (handler, version);
s_sendmore (handler, sequence);
if (streq (domain, "DOMAIN")) {
......@@ -100,6 +100,8 @@ int main (void)
assert (streq (zmq_msg_gets (&msg, "Hello"), "World"));
assert (streq (zmq_msg_gets (&msg, "Socket-Type"), "DEALER"));
assert (streq (zmq_msg_gets (&msg, "User-Id"), "anonymous"));
assert (streq (zmq_msg_gets (&msg, "Peer-Address"), "127.0.0.1"));
assert (zmq_msg_gets (&msg, "No Such") == NULL);
assert (zmq_errno () == EINVAL);
zmq_msg_close (&msg);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment