Commit 2db7cdc6 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #999 from hurtonm/master

Include ZMTP properties in message metadata
parents 55c06924 8d82cc2a
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
*/ */
#include <string.h> #include <string.h>
#include <map>
#include "mechanism.hpp" #include "mechanism.hpp"
#include "options.hpp" #include "options.hpp"
...@@ -27,16 +26,12 @@ ...@@ -27,16 +26,12 @@
#include "wire.hpp" #include "wire.hpp"
zmq::mechanism_t::mechanism_t (const options_t &options_) : zmq::mechanism_t::mechanism_t (const options_t &options_) :
metadata (NULL),
options (options_) options (options_)
{ {
} }
zmq::mechanism_t::~mechanism_t () zmq::mechanism_t::~mechanism_t ()
{ {
if (metadata != NULL)
if (metadata->drop_ref ())
delete metadata;
} }
void zmq::mechanism_t::set_peer_identity (const void *id_ptr, size_t id_size) void zmq::mechanism_t::set_peer_identity (const void *id_ptr, size_t id_size)
...@@ -90,7 +85,6 @@ size_t zmq::mechanism_t::add_property (unsigned char *ptr, const char *name, ...@@ -90,7 +85,6 @@ size_t zmq::mechanism_t::add_property (unsigned char *ptr, const char *name,
int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_, int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
size_t length_, bool zap_flag) size_t length_, bool zap_flag)
{ {
std::map <const std::string, const std::string> dict;
size_t bytes_left = length_; size_t bytes_left = length_;
while (bytes_left > 1) { while (bytes_left > 1) {
...@@ -132,19 +126,19 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_, ...@@ -132,19 +126,19 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_,
return -1; return -1;
} }
dict.insert ( if (zap_flag)
std::map <const std::string, const std::string>::value_type ( zap_properties.insert (
name, std::string ((char *) value, value_length))); metadata_t::dict_t::value_type (
name, std::string ((char *) value, value_length)));
else
zmtp_properties.insert (
metadata_t::dict_t::value_type (
name, std::string ((char *) value, value_length)));
} }
if (bytes_left > 0) { if (bytes_left > 0) {
errno = EPROTO; errno = EPROTO;
return -1; return -1;
} }
if (zap_flag) {
assert (metadata == NULL);
if (!dict.empty ())
metadata = new (std::nothrow) metadata_t (dict);
}
return 0; return 0;
} }
......
...@@ -65,7 +65,13 @@ namespace zmq ...@@ -65,7 +65,13 @@ namespace zmq
blob_t get_user_id () const; blob_t get_user_id () const;
metadata_t *get_metadata () { return metadata; } const metadata_t::dict_t& get_zmtp_properties () {
return zmtp_properties;
}
const metadata_t::dict_t& get_zap_properties () {
return zap_properties;
}
protected: protected:
...@@ -93,9 +99,11 @@ namespace zmq ...@@ -93,9 +99,11 @@ namespace zmq
virtual int property (const std::string& name_, virtual int property (const std::string& name_,
const void *value_, size_t length_); const void *value_, size_t length_);
// Metadada as returned by ZAP protocol. // Properties received from ZMTP peer.
// NULL if no metadata received. metadata_t::dict_t zmtp_properties;
metadata_t *metadata;
// Properties received from ZAP server.
metadata_t::dict_t zap_properties;
options_t options; options_t options;
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "metadata.hpp" #include "metadata.hpp"
zmq::metadata_t::metadata_t (dict_t &dict) : zmq::metadata_t::metadata_t (const dict_t &dict) :
ref_cnt (1), ref_cnt (1),
dict (dict) dict (dict)
{ {
......
...@@ -31,7 +31,9 @@ namespace zmq ...@@ -31,7 +31,9 @@ namespace zmq
{ {
public: public:
metadata_t (std::map <const std::string, const std::string> &dict); typedef std::map <const std::string, const std::string> dict_t;
metadata_t (const dict_t &dict);
virtual ~metadata_t (); virtual ~metadata_t ();
// Returns pointer to property value or NULL if // Returns pointer to property value or NULL if
...@@ -50,7 +52,6 @@ namespace zmq ...@@ -50,7 +52,6 @@ namespace zmq
atomic_counter_t ref_cnt; atomic_counter_t ref_cnt;
// Dictionary holding metadata. // Dictionary holding metadata.
typedef std::map <const std::string, const std::string> dict_t;
const dict_t dict; const dict_t dict;
}; };
......
...@@ -64,6 +64,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, ...@@ -64,6 +64,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
outpos (NULL), outpos (NULL),
outsize (0), outsize (0),
encoder (NULL), encoder (NULL),
metadata (NULL),
handshaking (true), handshaking (true),
greeting_size (v2_greeting_size), greeting_size (v2_greeting_size),
greeting_bytes_read (0), greeting_bytes_read (0),
...@@ -144,6 +145,12 @@ zmq::stream_engine_t::~stream_engine_t () ...@@ -144,6 +145,12 @@ zmq::stream_engine_t::~stream_engine_t ()
int rc = tx_msg.close (); int rc = tx_msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
// Drop reference to metadata and destroy it if we are
// the only user.
if (metadata != NULL)
if (metadata->drop_ref ())
delete metadata;
delete encoder; delete encoder;
delete decoder; delete decoder;
delete mechanism; delete mechanism;
...@@ -728,6 +735,31 @@ void zmq::stream_engine_t::mechanism_ready () ...@@ -728,6 +735,31 @@ void zmq::stream_engine_t::mechanism_ready ()
read_msg = &stream_engine_t::pull_and_encode; read_msg = &stream_engine_t::pull_and_encode;
write_msg = &stream_engine_t::write_credential; write_msg = &stream_engine_t::write_credential;
// Compile metadata.
typedef metadata_t::dict_t properties_t;
properties_t properties;
properties_t::const_iterator it;
// Add ZAP properties.
const properties_t& zap_properties = mechanism->get_zap_properties ();
it = zap_properties.begin ();
while (it != zap_properties.end ()) {
properties.insert (properties_t::value_type (it->first, it->second));
it++;
}
// Add ZMTP properties.
const properties_t& zmtp_properties = mechanism->get_zmtp_properties ();
it = zmtp_properties.begin ();
while (it != zmtp_properties.end ()) {
properties.insert (properties_t::value_type (it->first, it->second));
it++;
}
zmq_assert (metadata == NULL);
if (!properties.empty ())
metadata = new (std::nothrow) metadata_t (properties);
} }
int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_) int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_)
...@@ -780,7 +812,6 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_) ...@@ -780,7 +812,6 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
if (mechanism->decode (msg_) == -1) if (mechanism->decode (msg_) == -1)
return -1; return -1;
metadata_t *metadata = mechanism->get_metadata ();
if (metadata) if (metadata)
msg_->set_properties (metadata); msg_->set_properties (metadata);
if (session->push_msg (msg_) == -1) { if (session->push_msg (msg_) == -1) {
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "options.hpp" #include "options.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "../include/zmq.h" #include "../include/zmq.h"
#include "metadata.hpp"
namespace zmq namespace zmq
{ {
...@@ -131,6 +132,9 @@ namespace zmq ...@@ -131,6 +132,9 @@ namespace zmq
size_t outsize; size_t outsize;
i_encoder *encoder; i_encoder *encoder;
// Metadata to be attached to received messages. May be NULL.
metadata_t *metadata;
// When true, we are still trying to determine whether // When true, we are still trying to determine whether
// the peer is using versioned protocol, and if so, which // the peer is using versioned protocol, and if so, which
// version. When false, normal message flow has started. // version. When false, normal message flow has started.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment