Commit dd5eec35 authored by Pontus Sköldström's avatar Pontus Sköldström

Support application metadata through ZMQ_METADATA

Lets the application set per-connection metadata.
Metadata is specified as "X-key:value" and set using zmq_setsockopt, eg:
zmq_setsockopt (s, ZMQ_METADATA, "X-key:value", 11);

The peer can then obtain the metadata from a received message:
char *data = zmq_msg_gets(msg, "X-key");
parent c9437ab7
......@@ -846,7 +846,8 @@ test_apps += tests/test_poller \
tests/test_radio_dish \
tests/test_udp \
tests/test_scatter_gather \
tests/test_dgram
tests/test_dgram \
tests/test_app_meta
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = src/libzmq.la
......@@ -871,6 +872,10 @@ tests_test_scatter_gather_LDADD = src/libzmq.la
tests_test_dgram_SOURCES = tests/test_dgram.cpp
tests_test_dgram_LDADD = src/libzmq.la
tests_test_app_meta_SOURCES = tests/test_app_meta.cpp
tests_test_app_meta_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_app_meta_CPPFLAGS = ${UNITY_CPPFLAGS}
endif
if ENABLE_STATIC
......
......@@ -585,6 +585,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
#define ZMQ_BINDTODEVICE 92
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95
/* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */
......
......@@ -170,13 +170,30 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
options.routing_id, options.routing_id_size);
}
for (std::map<std::string, std::string>::const_iterator it =
options.app_metadata.begin ();
it != options.app_metadata.end (); ++it)
ptr +=
add_property (ptr, buf_capacity - (ptr - buf), it->first.c_str (),
it->second.c_str (), strlen (it->second.c_str ()));
return ptr - buf;
}
size_t zmq::mechanism_t::basic_properties_len () const
{
const char *socket_type = socket_type_string (options.type);
int meta_len = 0;
for (std::map<std::string, std::string>::const_iterator it =
options.app_metadata.begin ();
it != options.app_metadata.end (); ++it)
meta_len +=
property_len (it->first.c_str (), strlen (it->second.c_str ()));
return property_len (ZMTP_PROPERTY_SOCKET_TYPE, strlen (socket_type))
+ meta_len
+ ((options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|| options.type == ZMQ_ROUTER)
? property_len (ZMTP_PROPERTY_IDENTITY, options.routing_id_size)
......
......@@ -706,6 +706,26 @@ int zmq::options_t::setsockopt (int option_,
return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_,
&loopback_fastpath);
case ZMQ_METADATA:
if (optvallen_ > 0 && !is_int) {
std::string s ((char *) optval_);
size_t pos = 0;
std::string key, val, delimiter = ":";
pos = s.find (delimiter);
if (pos != std::string::npos && pos != 0
&& pos != s.length () - 1) {
key = s.substr (0, pos);
if (key.compare (0, 2, "X-") == 0 && key.length () < 256) {
val = s.substr (pos + 1, s.length ());
app_metadata.insert (
std::pair<std::string, std::string> (key, val));
return 0;
}
}
}
errno = EINVAL;
return -1;
break;
default:
#if defined(ZMQ_ACT_MILITANT)
// There are valid scenarios for probing with unknown socket option
......
......@@ -33,6 +33,7 @@
#include <string>
#include <vector>
#include <set>
#include <map>
#include "atomic_ptr.hpp"
#include "stddef.h"
......@@ -258,6 +259,9 @@ struct options_t
// Use zero copy strategy for storing message content when decoding.
bool zero_copy;
// Application metadata
std::map<std::string, std::string> app_metadata;
};
int do_getsockopt (void *const optval_,
......
......@@ -56,6 +56,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
#define ZMQ_BINDTODEVICE 92
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95
/* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */
......
......@@ -135,6 +135,7 @@ IF (ENABLE_DRAFTS)
test_udp
test_scatter_gather
test_dgram
test_app_meta
)
ENDIF (ENABLE_DRAFTS)
......
......@@ -30,20 +30,17 @@
#include "testutil.hpp"
#include <unity.h>
void *ctx;
void setUp ()
{
ctx = zmq_ctx_new ();
}
void tearDown ()
{
zmq_ctx_term (ctx);
ctx = NULL;
}
void test_tipc_port_name_and_domain ()
{
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
// test Port Name addressing
......@@ -64,6 +61,8 @@ void test_tipc_port_name_and_domain ()
rc = zmq_close (sb);
TEST_ASSERT_EQUAL_INT (0, rc);
zmq_ctx_term (ctx);
}
void test_tipc_port_identity ()
......@@ -72,6 +71,7 @@ void test_tipc_port_identity ()
size_t size = 256;
unsigned int z, c, n, ref;
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
......@@ -101,10 +101,13 @@ void test_tipc_port_identity ()
rc = zmq_close (sb);
TEST_ASSERT_EQUAL_INT (0, rc);
zmq_ctx_term (ctx);
}
void test_tipc_bad_addresses ()
{
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
// Test Port Name addressing
......@@ -124,6 +127,8 @@ void test_tipc_bad_addresses ()
// Clean up
rc = zmq_close (sb);
TEST_ASSERT_EQUAL_INT (0, rc);
zmq_ctx_term (ctx);
}
......
/*
Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include <unity.h>
void setUp ()
{
}
void tearDown ()
{
}
void test_app_meta_reqrep ()
{
void *ctx;
zmq_msg_t msg;
void *rep_sock, *req_sock;
const char *req_hello = "X-hello:hello";
const char *req_connection = "X-connection:primary";
const char *req_z85 = "X-bin:009c6";
const char *rep_hello = "X-hello:world";
const char *rep_connection = "X-connection:backup";
const char *bad_strings[] = {
":",
"key:",
":value",
"keyvalue",
"",
"X-"
"KeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKe"
"yTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyT"
"ooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyToo"
"LongKeyTooLongKeyTooLongKeyTooLongKeyTooLong:value"};
ctx = zmq_ctx_new ();
rep_sock = zmq_socket (ctx, ZMQ_REP);
TEST_ASSERT_NOT_NULL (rep_sock);
req_sock = zmq_socket (ctx, ZMQ_REQ);
TEST_ASSERT_NOT_NULL (req_sock);
int rc =
zmq_setsockopt (rep_sock, ZMQ_METADATA, rep_hello, strlen (rep_hello));
TEST_ASSERT_EQUAL_INT (0, rc);
int l = 0;
rc = zmq_setsockopt (rep_sock, ZMQ_LINGER, &l, sizeof (l));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (rep_sock, ZMQ_METADATA, rep_connection,
strlen (rep_connection));
TEST_ASSERT_EQUAL_INT (0, rc);
for (int i = 0; i < 6; i++) {
rc = zmq_setsockopt (rep_sock, ZMQ_METADATA, bad_strings[i],
strlen (bad_strings[i]));
TEST_ASSERT_EQUAL_INT (-1, rc);
}
rc = zmq_bind (rep_sock, "tcp://127.0.0.1:5555");
TEST_ASSERT_EQUAL_INT (0, rc);
l = 0;
rc = zmq_setsockopt (req_sock, ZMQ_LINGER, &l, sizeof (l));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_hello, strlen (req_hello));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_connection,
strlen (req_connection));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_z85, strlen (req_z85));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_connect (req_sock, "tcp://127.0.0.1:5555");
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_msg_init_size (&msg, 1);
TEST_ASSERT_EQUAL_INT (0, rc);
char *data = (char *) zmq_msg_data (&msg);
data[0] = 1;
rc = zmq_msg_send (&msg, req_sock, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
rc = zmq_msg_init (&msg);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_msg_recv (&msg, rep_sock, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
TEST_ASSERT_EQUAL_STRING ("hello", zmq_msg_gets (&msg, "X-hello"));
TEST_ASSERT_EQUAL_STRING ("primary", zmq_msg_gets (&msg, "X-connection"));
char *bindata = (char *) zmq_msg_gets (&msg, "X-bin");
TEST_ASSERT_NOT_NULL (bindata);
uint8_t rawdata[4];
void *ret = zmq_z85_decode (rawdata, bindata);
TEST_ASSERT_NOT_NULL (ret);
TEST_ASSERT_EQUAL_UINT8 (0, rawdata[0]);
TEST_ASSERT_EQUAL_UINT8 (1, rawdata[1]);
TEST_ASSERT_EQUAL_UINT8 (2, rawdata[2]);
TEST_ASSERT_EQUAL_UINT8 (3, rawdata[3]);
TEST_ASSERT_NULL (zmq_msg_gets (&msg, "X-foobar"));
TEST_ASSERT_NULL (zmq_msg_gets (&msg, "foobar"));
rc = zmq_msg_send (&msg, rep_sock, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
rc = zmq_msg_recv (&msg, req_sock, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
TEST_ASSERT_EQUAL_STRING ("world", zmq_msg_gets (&msg, "X-hello"));
TEST_ASSERT_EQUAL_STRING ("backup", zmq_msg_gets (&msg, "X-connection"));
rc = zmq_msg_close (&msg);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (req_sock);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (rep_sock);
TEST_ASSERT_EQUAL_INT (0, rc);
zmq_ctx_term (ctx);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_app_meta_reqrep);
return UNITY_END ();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment