Commit 1e6e5b1c authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1357 from rodgert/master

Support limited metadata for STREAM sockets
parents 8a526874 4b948b1f
...@@ -101,6 +101,10 @@ test_xpub_nodrop ...@@ -101,6 +101,10 @@ test_xpub_nodrop
test_xpub_manual test_xpub_manual
test_xpub_welcome_msg test_xpub_welcome_msg
test_atomics test_atomics
test_client_drop_more
test_client_server
test_server_drop_more
test_thread_safe
tests/test*.log tests/test*.log
tests/test*.trs tests/test*.trs
src/platform.hpp* src/platform.hpp*
......
...@@ -176,7 +176,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, ...@@ -176,7 +176,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
return 0; return 0;
} }
break; break;
case ZMQ_STREAM_NOTIFY: case ZMQ_STREAM_NOTIFY:
if (is_int && (value == 0 || value == 1)) { if (is_int && (value == 0 || value == 1)) {
options.raw_notify = (value != 0); options.raw_notify = (value != 0);
...@@ -221,6 +221,12 @@ int zmq::stream_t::xrecv (msg_t *msg_) ...@@ -221,6 +221,12 @@ int zmq::stream_t::xrecv (msg_t *msg_)
blob_t identity = pipe->get_identity (); blob_t identity = pipe->get_identity ();
rc = msg_->init_size (identity.size ()); rc = msg_->init_size (identity.size ());
errno_assert (rc == 0); errno_assert (rc == 0);
// forward metadata (if any)
metadata_t *metadata = prefetched_msg.metadata();
if (metadata)
msg_->set_metadata(metadata);
memcpy (msg_->data (), identity.data (), identity.size ()); memcpy (msg_->data (), identity.data (), identity.size ());
msg_->set_flags (msg_t::more); msg_->set_flags (msg_t::more);
...@@ -277,7 +283,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -277,7 +283,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
connect_rid.length ()); connect_rid.length ());
connect_rid.clear (); connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity); outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) if (it != outpipes.end ())
zmq_assert(false); zmq_assert(false);
} }
else { else {
......
...@@ -36,7 +36,6 @@ ...@@ -36,7 +36,6 @@
#include <string.h> #include <string.h>
#include <new> #include <new>
#include <sstream> #include <sstream>
#include <iostream>
#include "stream_engine.hpp" #include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
...@@ -192,14 +191,21 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -192,14 +191,21 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
handshaking = false; handshaking = false;
next_msg = &stream_engine_t::pull_msg_from_session; next_msg = &stream_engine_t::pull_msg_from_session;
process_msg = &stream_engine_t::push_msg_to_session; process_msg = &stream_engine_t::push_raw_msg_to_session;
properties_t properties;
if (init_properties(properties)) {
// Compile metadata.
zmq_assert (metadata == NULL);
metadata = new (std::nothrow) metadata_t (properties);
}
if (options.raw_notify) { if (options.raw_notify) {
// For raw sockets, send an initial 0-length message to the // For raw sockets, send an initial 0-length message to the
// application so that it knows a peer has connected. // application so that it knows a peer has connected.
msg_t connector; msg_t connector;
connector.init(); connector.init();
push_msg_to_session (&connector); push_raw_msg_to_session (&connector);
connector.close(); connector.close();
session->flush (); session->flush ();
} }
...@@ -804,13 +810,8 @@ void zmq::stream_engine_t::mechanism_ready () ...@@ -804,13 +810,8 @@ void zmq::stream_engine_t::mechanism_ready ()
process_msg = &stream_engine_t::write_credential; process_msg = &stream_engine_t::write_credential;
// Compile metadata. // Compile metadata.
typedef metadata_t::dict_t properties_t;
properties_t properties; properties_t properties;
init_properties(properties);
// If we have a peer_address, add it to metadata
if (!peer_address.empty()) {
properties.insert(std::make_pair("Peer-Address", peer_address));
}
// Add ZAP properties. // Add ZAP properties.
const properties_t& zap_properties = mechanism->get_zap_properties (); const properties_t& zap_properties = mechanism->get_zap_properties ();
...@@ -835,6 +836,12 @@ int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_) ...@@ -835,6 +836,12 @@ int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
return session->push_msg (msg_); return session->push_msg (msg_);
} }
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) {
if (metadata)
msg_->set_metadata(metadata);
return push_msg_to_session(msg_);
}
int zmq::stream_engine_t::write_credential (msg_t *msg_) int zmq::stream_engine_t::write_credential (msg_t *msg_)
{ {
zmq_assert (mechanism != NULL); zmq_assert (mechanism != NULL);
...@@ -938,6 +945,12 @@ void zmq::stream_engine_t::set_handshake_timer () ...@@ -938,6 +945,12 @@ void zmq::stream_engine_t::set_handshake_timer ()
} }
} }
bool zmq::stream_engine_t::init_properties (properties_t & properties) {
if (peer_address.empty()) return false;
properties.insert (std::make_pair("Peer-Address", peer_address));
return true;
}
void zmq::stream_engine_t::timer_event (int id_) void zmq::stream_engine_t::timer_event (int id_)
{ {
zmq_assert (id_ == handshake_timer_id); zmq_assert (id_ == handshake_timer_id);
......
...@@ -59,7 +59,7 @@ namespace zmq ...@@ -59,7 +59,7 @@ namespace zmq
timeout_error timeout_error
}; };
stream_engine_t (fd_t fd_, const options_t &options_, stream_engine_t (fd_t fd_, const options_t &options_,
const std::string &endpoint); const std::string &endpoint);
~stream_engine_t (); ~stream_engine_t ();
...@@ -77,7 +77,6 @@ namespace zmq ...@@ -77,7 +77,6 @@ namespace zmq
void timer_event (int id_); void timer_event (int id_);
private: private:
// Unplug the engine from the session. // Unplug the engine from the session.
void unplug (); void unplug ();
...@@ -99,6 +98,8 @@ namespace zmq ...@@ -99,6 +98,8 @@ namespace zmq
int pull_msg_from_session (msg_t *msg_); int pull_msg_from_session (msg_t *msg_);
int push_msg_to_session (msg_t *msg); int push_msg_to_session (msg_t *msg);
int push_raw_msg_to_session (msg_t *msg);
int write_credential (msg_t *msg_); int write_credential (msg_t *msg_);
int pull_and_encode (msg_t *msg_); int pull_and_encode (msg_t *msg_);
int decode_and_push (msg_t *msg_); int decode_and_push (msg_t *msg_);
...@@ -113,6 +114,9 @@ namespace zmq ...@@ -113,6 +114,9 @@ namespace zmq
void set_handshake_timer(); void set_handshake_timer();
typedef metadata_t::dict_t properties_t;
bool init_properties (properties_t & properties);
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
......
...@@ -80,17 +80,23 @@ test_stream_to_dealer (void) ...@@ -80,17 +80,23 @@ test_stream_to_dealer (void)
assert (rc > 0); assert (rc > 0);
assert (zmq_msg_more (&identity)); assert (zmq_msg_more (&identity));
// Verify the existence of Peer-Address metadata
assert (streq (zmq_msg_gets (&identity, "Peer-Address"), "127.0.0.1"));
// Second frame is zero // Second frame is zero
byte buffer [255]; byte buffer [255];
rc = zmq_recv (stream, buffer, 255, 0); rc = zmq_recv (stream, buffer, 255, 0);
assert (rc == 0); assert (rc == 0);
// Real data follows // Real data follows
// First frame is identity // First frame is identity
rc = zmq_msg_recv (&identity, stream, 0); rc = zmq_msg_recv (&identity, stream, 0);
assert (rc > 0); assert (rc > 0);
assert (zmq_msg_more (&identity)); assert (zmq_msg_more (&identity));
// Verify the existence of Peer-Address metadata
assert (streq (zmq_msg_gets (&identity, "Peer-Address"), "127.0.0.1"));
// Second frame is greeting signature // Second frame is greeting signature
rc = zmq_recv (stream, buffer, 255, 0); rc = zmq_recv (stream, buffer, 255, 0);
assert (rc == 10); assert (rc == 10);
...@@ -182,7 +188,7 @@ test_stream_to_stream (void) ...@@ -182,7 +188,7 @@ test_stream_to_stream (void)
// Set-up our context and sockets // Set-up our context and sockets
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
void *server = zmq_socket (ctx, ZMQ_STREAM); void *server = zmq_socket (ctx, ZMQ_STREAM);
assert (server); assert (server);
int enabled = 1; int enabled = 1;
...@@ -200,7 +206,7 @@ test_stream_to_stream (void) ...@@ -200,7 +206,7 @@ test_stream_to_stream (void)
uint8_t id [256]; uint8_t id [256];
size_t id_size = 256; size_t id_size = 256;
uint8_t buffer [256]; uint8_t buffer [256];
// Connecting sends a zero message // Connecting sends a zero message
// Server: First frame is identity, second frame is zero // Server: First frame is identity, second frame is zero
id_size = zmq_recv (server, id, 256, 0); id_size = zmq_recv (server, id, 256, 0);
...@@ -223,19 +229,19 @@ test_stream_to_stream (void) ...@@ -223,19 +229,19 @@ test_stream_to_stream (void)
// Second frame is HTTP GET request // Second frame is HTTP GET request
rc = zmq_send (client, "GET /\n\n", 7, 0); rc = zmq_send (client, "GET /\n\n", 7, 0);
assert (rc == 7); assert (rc == 7);
// Get HTTP request; ID frame and then request // Get HTTP request; ID frame and then request
id_size = zmq_recv (server, id, 256, 0); id_size = zmq_recv (server, id, 256, 0);
assert (id_size > 0); assert (id_size > 0);
rc = zmq_recv (server, buffer, 256, 0); rc = zmq_recv (server, buffer, 256, 0);
assert (rc != -1); assert (rc != -1);
assert (memcmp (buffer, "GET /\n\n", 7) == 0); assert (memcmp (buffer, "GET /\n\n", 7) == 0);
// Send reply back to client // Send reply back to client
char http_response [] = char http_response [] =
"HTTP/1.0 200 OK\r\n" "HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n" "Content-Type: text/plain\r\n"
"\r\n" "\r\n"
"Hello, World!"; "Hello, World!";
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE); rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
assert (rc != -1); assert (rc != -1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment