Commit 0897b3e0 authored by Simon Giesecke's avatar Simon Giesecke Committed by Luca Boccassi

Problem: excessive memory allocations around blob_t (#2796)

* Problem: excessive memory allocations around blob_t

Solution: redefine blob_t as a custom type, and use reference/move
semantics where possible
parent cfef0403
...@@ -30,108 +30,155 @@ ...@@ -30,108 +30,155 @@
#ifndef __ZMQ_BLOB_HPP_INCLUDED__ #ifndef __ZMQ_BLOB_HPP_INCLUDED__
#define __ZMQ_BLOB_HPP_INCLUDED__ #define __ZMQ_BLOB_HPP_INCLUDED__
#include <string> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <algorithm>
#if __cplusplus >= 201103L || defined(_MSC_VER) && _MSC_VER >= 1700
#define ZMQ_HAS_MOVE_SEMANTICS
#define ZMQ_MAP_INSERT_OR_EMPLACE(k, v) emplace (k,v)
#define ZMQ_PUSH_OR_EMPLACE_BACK emplace_back
#define ZMQ_MOVE(x) std::move (x)
#else
#define ZMQ_MAP_INSERT_OR_EMPLACE(k, v) insert (std::make_pair (k, v))
#define ZMQ_PUSH_OR_EMPLACE_BACK push_back
#define ZMQ_MOVE(x) (x)
#endif
// Borrowed from id3lib_strings.h: namespace zmq
// They seem to be doing something for MSC, but since I only have gcc, I'll just do that
// Assuming this is uneccessary on GCC 4
// #if (defined(__GNUC__) && (__GNUC__ >= 3) || (defined(_MSC_VER) && _MSC_VER > 1000))
#if (defined(__GNUC__) && (__GNUC__ >= 3) && (__GNUC__ <= 4))
namespace std
{ {
template<> struct reference_tag_t {};
struct char_traits<unsigned char>
// Object to hold dynamically allocated opaque binary data.
// On modern compilers, it will be movable but not copyable. Copies
// must be explicitly created by set_deep_copy.
// On older compilers, it is copyable for syntactical reasons.
struct blob_t
{ {
typedef unsigned char char_type; // Creates an empty blob_t.
// Unsigned as wint_t in unsigned. blob_t () : data_ (0), size_ (0), owned_ (true) {}
typedef unsigned long int_type;
typedef streampos pos_type; // Creates a blob_t of a given size, with uninitialized content.
typedef streamoff off_type; blob_t (const size_t size)
typedef mbstate_t state_type; : data_ ((unsigned char*)malloc (size))
, size_ (size)
static void , owned_ (true)
assign(char_type& __c1, const char_type& __c2)
{ __c1 = __c2; }
static bool
eq(const char_type& __c1, const char_type& __c2)
{ return __c1 == __c2; }
static bool
lt(const char_type& __c1, const char_type& __c2)
{ return __c1 < __c2; }
static int
compare(const char_type* __s1, const char_type* __s2, size_t __n)
{ {
for (size_t __i = 0; __i < __n; ++__i)
if (!eq(__s1[__i], __s2[__i]))
return lt(__s1[__i], __s2[__i]) ? -1 : 1;
return 0;
} }
static size_t // Creates a blob_t of a given size, an initializes content by copying
length(const char_type* __s) // from another buffer.
blob_t(const unsigned char * const data, const size_t size)
: data_ ((unsigned char*)malloc (size))
, size_ (size)
, owned_ (true)
{ {
const char_type* __p = __s; memcpy(data_, data, size_);
while (__p)
++__p;
return (__p - __s);
} }
static const char_type* // Creates a blob_t for temporary use that only references a
find(const char_type* __s, size_t __n, const char_type& __a) // pre-allocated block of data.
// Use with caution and ensure that the blob_t will not outlive
// the referenced data.
blob_t (unsigned char * const data, const size_t size, reference_tag_t)
: data_ (data)
, size_ (size)
, owned_ (false)
{ {
for (const char_type* __p = __s; size_t(__p - __s) < __n; ++__p)
if (*__p == __a) return __p;
return 0;
} }
static char_type* // Returns the size of the blob_t.
move(char_type* __s1, const char_type* __s2, size_t __n) size_t size () const { return size_; }
{ return (char_type*) memmove(__s1, __s2, __n * sizeof(char_type)); }
static char_type* // Returns a pointer to the data of the blob_t.
copy(char_type* __s1, const char_type* __s2, size_t __n) const unsigned char *data() const {
{ return (char_type*) memcpy(__s1, __s2, __n * sizeof(char_type)); } return data_;
static char_type*
assign(char_type* __s, size_t __n, char_type __a)
{
for (char_type* __p = __s; __p < __s + __n; ++__p)
assign(*__p, __a);
return __s;
} }
static char_type // Returns a pointer to the data of the blob_t.
to_char_type(const int_type& __c) unsigned char *data() {
{ return char_type(__c); } return data_;
}
static int_type // Defines an order relationship on blob_t.
to_int_type(const char_type& __c) { return int_type(__c); } bool operator< (blob_t const &other) const {
int cmpres = memcmp (data_, other.data_, std::min (size_, other.size_));
return cmpres < 0 || (cmpres == 0 && size_ < other.size_);
}
static bool // Sets a blob_t to a deep copy of another blob_t.
eq_int_type(const int_type& __c1, const int_type& __c2) void set_deep_copy (blob_t const &other)
{ return __c1 == __c2; } {
clear ();
data_ = (unsigned char*)malloc (other.size_);
size_ = other.size_;
owned_ = true;
memcpy (data_, other.data_, size_);
}
static int_type // Sets a blob_t to a copy of a given buffer.
eof() { return static_cast<int_type>(-1); } void set (const unsigned char * const data, const size_t size)
{
clear ();
data_ = (unsigned char*)malloc (size);
size_ = size;
owned_ = true;
memcpy (data_, data, size_);
}
static int_type // Empties a blob_t.
not_eof(const int_type& __c) void clear () {
{ return eq_int_type(__c, eof()) ? int_type(0) : __c; } if (owned_) { free (data_); }
}; data_ = 0; size_ = 0;
}
} // namespace std ~blob_t () {
#endif // GCC version 3 if (owned_) { free (data_); }
}
#ifdef ZMQ_HAS_MOVE_SEMANTICS
blob_t (const blob_t &) = delete;
blob_t &operator= (const blob_t &) = delete;
namespace zmq blob_t (blob_t&& other)
{ : data_ (other.data_)
, size_ (other.size_)
, owned_ (other.owned_)
{
other.owned_ = false;
}
blob_t &operator= (blob_t&& other) {
if (this != &other)
{
clear ();
data_ = other.data_;
size_ = other.size_;
owned_ = other.owned_;
other.owned_ = false;
}
return *this;
}
#else
blob_t (const blob_t &other)
: owned_(false)
{
set_deep_copy (other);
}
blob_t &operator= (const blob_t &other) {
if (this != &other)
{
clear ();
set_deep_copy (other);
}
return *this;
}
#endif
// Object to hold dynamically allocated opaque binary data. private:
typedef std::basic_string <unsigned char> blob_t; unsigned char *data_;
size_t size_;
bool owned_;
};
} }
......
...@@ -94,7 +94,7 @@ bool zmq::client_t::xhas_out () ...@@ -94,7 +94,7 @@ bool zmq::client_t::xhas_out ()
return lb.has_out (); return lb.has_out ();
} }
zmq::blob_t zmq::client_t::get_credential () const const zmq::blob_t &zmq::client_t::get_credential () const
{ {
return fq.get_credential (); return fq.get_credential ();
} }
......
...@@ -60,7 +60,7 @@ namespace zmq ...@@ -60,7 +60,7 @@ namespace zmq
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
blob_t get_credential () const; const blob_t &get_credential () const;
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
......
...@@ -110,7 +110,7 @@ bool zmq::dealer_t::xhas_out () ...@@ -110,7 +110,7 @@ bool zmq::dealer_t::xhas_out ()
return lb.has_out (); return lb.has_out ();
} }
zmq::blob_t zmq::dealer_t::get_credential () const const zmq::blob_t &zmq::dealer_t::get_credential () const
{ {
return fq.get_credential (); return fq.get_credential ();
} }
......
...@@ -61,7 +61,7 @@ namespace zmq ...@@ -61,7 +61,7 @@ namespace zmq
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
blob_t get_credential () const; const blob_t &get_credential () const;
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
......
...@@ -69,7 +69,7 @@ void zmq::dgram_t::xpipe_terminated (pipe_t *pipe_) ...@@ -69,7 +69,7 @@ void zmq::dgram_t::xpipe_terminated (pipe_t *pipe_)
{ {
if (pipe_ == pipe) { if (pipe_ == pipe) {
if (last_in == pipe) { if (last_in == pipe) {
saved_credential = last_in->get_credential (); saved_credential.set_deep_copy (last_in->get_credential ());
last_in = NULL; last_in = NULL;
} }
pipe = NULL; pipe = NULL;
...@@ -171,7 +171,7 @@ bool zmq::dgram_t::xhas_out () ...@@ -171,7 +171,7 @@ bool zmq::dgram_t::xhas_out ()
return pipe->check_write (); return pipe->check_write ();
} }
zmq::blob_t zmq::dgram_t::get_credential () const const zmq::blob_t &zmq::dgram_t::get_credential () const
{ {
return last_in? last_in->get_credential (): saved_credential; return last_in? last_in->get_credential (): saved_credential;
} }
...@@ -56,7 +56,7 @@ namespace zmq ...@@ -56,7 +56,7 @@ namespace zmq
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
blob_t get_credential () const; const blob_t &get_credential () const;
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
......
...@@ -229,7 +229,7 @@ bool zmq::dish_t::xhas_in () ...@@ -229,7 +229,7 @@ bool zmq::dish_t::xhas_in ()
} }
} }
zmq::blob_t zmq::dish_t::get_credential () const const zmq::blob_t &zmq::dish_t::get_credential () const
{ {
return fq.get_credential (); return fq.get_credential ();
} }
......
...@@ -62,7 +62,7 @@ namespace zmq ...@@ -62,7 +62,7 @@ namespace zmq
bool xhas_out (); bool xhas_out ();
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
blob_t get_credential () const; const blob_t &get_credential () const;
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xhiccuped (pipe_t *pipe_); void xhiccuped (pipe_t *pipe_);
......
...@@ -68,7 +68,7 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_) ...@@ -68,7 +68,7 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
pipes.erase (pipe_); pipes.erase (pipe_);
if (last_in == pipe_) { if (last_in == pipe_) {
saved_credential = last_in->get_credential (); saved_credential.set_deep_copy (last_in->get_credential ());
last_in = NULL; last_in = NULL;
} }
} }
...@@ -155,7 +155,7 @@ bool zmq::fq_t::has_in () ...@@ -155,7 +155,7 @@ bool zmq::fq_t::has_in ()
return false; return false;
} }
zmq::blob_t zmq::fq_t::get_credential () const const zmq::blob_t &zmq::fq_t::get_credential () const
{ {
return last_in? return last_in?
last_in->get_credential (): saved_credential; last_in->get_credential (): saved_credential;
......
...@@ -56,7 +56,7 @@ namespace zmq ...@@ -56,7 +56,7 @@ namespace zmq
int recv (msg_t *msg_); int recv (msg_t *msg_);
int recvpipe (msg_t *msg_, pipe_t **pipe_); int recvpipe (msg_t *msg_, pipe_t **pipe_);
bool has_in (); bool has_in ();
blob_t get_credential () const; const blob_t &get_credential () const;
private: private:
......
...@@ -88,7 +88,7 @@ bool zmq::gather_t::xhas_in () ...@@ -88,7 +88,7 @@ bool zmq::gather_t::xhas_in ()
return fq.has_in (); return fq.has_in ();
} }
zmq::blob_t zmq::gather_t::get_credential () const const zmq::blob_t &zmq::gather_t::get_credential () const
{ {
return fq.get_credential (); return fq.get_credential ();
} }
...@@ -56,7 +56,7 @@ namespace zmq ...@@ -56,7 +56,7 @@ namespace zmq
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
blob_t get_credential () const; const blob_t &get_credential () const;
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
......
...@@ -48,7 +48,7 @@ zmq::mechanism_t::~mechanism_t () ...@@ -48,7 +48,7 @@ zmq::mechanism_t::~mechanism_t ()
void zmq::mechanism_t::set_peer_routing_id (const void *id_ptr, size_t id_size) void zmq::mechanism_t::set_peer_routing_id (const void *id_ptr, size_t id_size)
{ {
routing_id = blob_t (static_cast <const unsigned char*> (id_ptr), id_size); routing_id.set (static_cast <const unsigned char*> (id_ptr), id_size);
} }
void zmq::mechanism_t::peer_routing_id (msg_t *msg_) void zmq::mechanism_t::peer_routing_id (msg_t *msg_)
...@@ -61,12 +61,12 @@ void zmq::mechanism_t::peer_routing_id (msg_t *msg_) ...@@ -61,12 +61,12 @@ void zmq::mechanism_t::peer_routing_id (msg_t *msg_)
void zmq::mechanism_t::set_user_id (const void *data_, size_t size_) void zmq::mechanism_t::set_user_id (const void *data_, size_t size_)
{ {
user_id = blob_t (static_cast <const unsigned char*> (data_), size_); user_id.set (static_cast <const unsigned char*> (data_), size_);
zap_properties.insert (metadata_t::dict_t::value_type ( zap_properties.insert (metadata_t::dict_t::value_type (
ZMQ_MSG_PROPERTY_USER_ID, std::string ((char *) data_, size_))); ZMQ_MSG_PROPERTY_USER_ID, std::string ((char *) data_, size_)));
} }
zmq::blob_t zmq::mechanism_t::get_user_id () const const zmq::blob_t &zmq::mechanism_t::get_user_id () const
{ {
return user_id; return user_id;
} }
......
...@@ -80,7 +80,7 @@ namespace zmq ...@@ -80,7 +80,7 @@ namespace zmq
void set_user_id (const void *user_id, size_t size); void set_user_id (const void *user_id, size_t size);
blob_t get_user_id () const; const blob_t &get_user_id () const;
const metadata_t::dict_t& get_zmtp_properties () { const metadata_t::dict_t& get_zmtp_properties () {
return zmtp_properties; return zmtp_properties;
......
...@@ -65,7 +65,7 @@ void zmq::pair_t::xpipe_terminated (pipe_t *pipe_) ...@@ -65,7 +65,7 @@ void zmq::pair_t::xpipe_terminated (pipe_t *pipe_)
{ {
if (pipe_ == pipe) { if (pipe_ == pipe) {
if (last_in == pipe) { if (last_in == pipe) {
saved_credential = last_in->get_credential (); saved_credential.set_deep_copy (last_in->get_credential ());
last_in = NULL; last_in = NULL;
} }
pipe = NULL; pipe = NULL;
...@@ -136,7 +136,7 @@ bool zmq::pair_t::xhas_out () ...@@ -136,7 +136,7 @@ bool zmq::pair_t::xhas_out ()
return pipe->check_write (); return pipe->check_write ();
} }
zmq::blob_t zmq::pair_t::get_credential () const const zmq::blob_t &zmq::pair_t::get_credential () const
{ {
return last_in? last_in->get_credential (): saved_credential; return last_in? last_in->get_credential (): saved_credential;
} }
...@@ -56,7 +56,7 @@ namespace zmq ...@@ -56,7 +56,7 @@ namespace zmq
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
blob_t get_credential () const; const blob_t &get_credential () const;
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
......
...@@ -127,15 +127,15 @@ uint32_t zmq::pipe_t::get_server_socket_routing_id () ...@@ -127,15 +127,15 @@ uint32_t zmq::pipe_t::get_server_socket_routing_id ()
void zmq::pipe_t::set_router_socket_routing_id (const blob_t &router_socket_routing_id_) void zmq::pipe_t::set_router_socket_routing_id (const blob_t &router_socket_routing_id_)
{ {
router_socket_routing_id = router_socket_routing_id_; router_socket_routing_id.set_deep_copy (router_socket_routing_id_);
} }
zmq::blob_t zmq::pipe_t::get_routing_id () const zmq::blob_t &zmq::pipe_t::get_routing_id ()
{ {
return router_socket_routing_id; return router_socket_routing_id;
} }
zmq::blob_t zmq::pipe_t::get_credential () const const zmq::blob_t &zmq::pipe_t::get_credential () const
{ {
return credential; return credential;
} }
...@@ -182,7 +182,7 @@ read_message: ...@@ -182,7 +182,7 @@ read_message:
// If this is a credential, save a copy and receive next message. // If this is a credential, save a copy and receive next message.
if (unlikely (msg_->is_credential ())) { if (unlikely (msg_->is_credential ())) {
const unsigned char *data = static_cast <const unsigned char *> (msg_->data ()); const unsigned char *data = static_cast <const unsigned char *> (msg_->data ());
credential = blob_t (data, msg_->size ()); credential.set (data, msg_->size ());
const int rc = msg_->close (); const int rc = msg_->close ();
zmq_assert (rc == 0); zmq_assert (rc == 0);
goto read_message; goto read_message;
......
...@@ -90,9 +90,9 @@ namespace zmq ...@@ -90,9 +90,9 @@ namespace zmq
// Pipe endpoint can store an opaque ID to be used by its clients. // Pipe endpoint can store an opaque ID to be used by its clients.
void set_router_socket_routing_id (const blob_t &identity_); void set_router_socket_routing_id (const blob_t &identity_);
blob_t get_routing_id (); const blob_t &get_routing_id ();
blob_t get_credential () const; const blob_t &get_credential () const;
// Returns true if there is at least one message to read in the pipe. // Returns true if there is at least one message to read in the pipe.
bool check_read (); bool check_read ();
......
...@@ -72,7 +72,7 @@ bool zmq::pull_t::xhas_in () ...@@ -72,7 +72,7 @@ bool zmq::pull_t::xhas_in ()
return fq.has_in (); return fq.has_in ();
} }
zmq::blob_t zmq::pull_t::get_credential () const const zmq::blob_t &zmq::pull_t::get_credential () const
{ {
return fq.get_credential (); return fq.get_credential ();
} }
...@@ -56,7 +56,7 @@ namespace zmq ...@@ -56,7 +56,7 @@ namespace zmq
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
blob_t get_credential () const; const blob_t &get_credential () const;
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_);
......
...@@ -357,7 +357,7 @@ int zmq::router_t::xrecv (msg_t *msg_) ...@@ -357,7 +357,7 @@ int zmq::router_t::xrecv (msg_t *msg_)
prefetched = true; prefetched = true;
current_in = pipe; current_in = pipe;
blob_t routing_id = pipe->get_routing_id (); const blob_t &routing_id = pipe->get_routing_id ();
rc = msg_->init_size (routing_id.size ()); rc = msg_->init_size (routing_id.size ());
errno_assert (rc == 0); errno_assert (rc == 0);
memcpy (msg_->data (), routing_id.data (), routing_id.size ()); memcpy (msg_->data (), routing_id.data (), routing_id.size ());
...@@ -408,7 +408,7 @@ bool zmq::router_t::xhas_in () ...@@ -408,7 +408,7 @@ bool zmq::router_t::xhas_in ()
zmq_assert (pipe != NULL); zmq_assert (pipe != NULL);
blob_t routing_id = pipe->get_routing_id (); const blob_t &routing_id = pipe->get_routing_id ();
rc = prefetched_id.init_size (routing_id.size ()); rc = prefetched_id.init_size (routing_id.size ());
errno_assert (rc == 0); errno_assert (rc == 0);
memcpy (prefetched_id.data (), routing_id.data (), routing_id.size ()); memcpy (prefetched_id.data (), routing_id.data (), routing_id.size ());
...@@ -438,7 +438,7 @@ bool zmq::router_t::xhas_out () ...@@ -438,7 +438,7 @@ bool zmq::router_t::xhas_out ()
return has_out; return has_out;
} }
zmq::blob_t zmq::router_t::get_credential () const const zmq::blob_t &zmq::router_t::get_credential () const
{ {
return fq.get_credential (); return fq.get_credential ();
} }
...@@ -467,15 +467,15 @@ int zmq::router_t::get_peer_state (const void *routing_id_, ...@@ -467,15 +467,15 @@ int zmq::router_t::get_peer_state (const void *routing_id_,
bool zmq::router_t::identify_peer (pipe_t *pipe_) bool zmq::router_t::identify_peer (pipe_t *pipe_)
{ {
msg_t msg; msg_t msg;
blob_t routing_id;
bool ok; bool ok;
blob_t routing_id;
if (connect_routing_id.length()) { if (connect_routing_id.length()) {
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str (), routing_id.set ((unsigned char*)connect_routing_id.c_str(),
connect_routing_id.length()); connect_routing_id.length());
connect_routing_id.clear (); connect_routing_id.clear();
outpipes_t::iterator it = outpipes.find (routing_id); outpipes_t::iterator it = outpipes.find(routing_id);
if (it != outpipes.end ()) if (it != outpipes.end())
zmq_assert(false); // Not allowed to duplicate an existing rid zmq_assert(false); // Not allowed to duplicate an existing rid
} }
else else
...@@ -483,7 +483,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -483,7 +483,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_integral_routing_id++); put_uint32 (buf + 1, next_integral_routing_id++);
routing_id = blob_t (buf, sizeof buf); routing_id.set (buf, sizeof buf);
} }
else else
if (!options.raw_socket) { if (!options.raw_socket) {
...@@ -498,11 +498,11 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -498,11 +498,11 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_integral_routing_id++); put_uint32 (buf + 1, next_integral_routing_id++);
routing_id = blob_t (buf, sizeof buf); routing_id.set (buf, sizeof buf);
msg.close (); msg.close ();
} }
else { else {
routing_id = blob_t ((unsigned char*) msg.data (), msg.size ()); routing_id.set ((unsigned char*) msg.data (), msg.size ());
outpipes_t::iterator it = outpipes.find (routing_id); outpipes_t::iterator it = outpipes.find (routing_id);
msg.close (); msg.close ();
...@@ -517,14 +517,14 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -517,14 +517,14 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_integral_routing_id++); put_uint32 (buf + 1, next_integral_routing_id++);
blob_t new_routing_id = blob_t (buf, sizeof buf); blob_t new_routing_id (buf, sizeof buf);
it->second.pipe->set_router_socket_routing_id (new_routing_id); it->second.pipe->set_router_socket_routing_id (new_routing_id);
outpipe_t existing_outpipe = outpipe_t existing_outpipe =
{it->second.pipe, it->second.active}; {it->second.pipe, it->second.active};
ok = outpipes.insert (outpipes_t::value_type ( ok = outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (
new_routing_id, existing_outpipe)).second; ZMQ_MOVE(new_routing_id), existing_outpipe).second;
zmq_assert (ok); zmq_assert (ok);
// Remove the existing routing id entry to allow the new // Remove the existing routing id entry to allow the new
...@@ -543,7 +543,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -543,7 +543,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
pipe_->set_router_socket_routing_id (routing_id); pipe_->set_router_socket_routing_id (routing_id);
// Add the record into output pipes lookup table // Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true}; outpipe_t outpipe = {pipe_, true};
ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second; ok = outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE(routing_id), outpipe).second;
zmq_assert (ok); zmq_assert (ok);
return true; return true;
......
...@@ -70,7 +70,7 @@ namespace zmq ...@@ -70,7 +70,7 @@ namespace zmq
// Rollback any message parts that were sent but not yet flushed. // Rollback any message parts that were sent but not yet flushed.
int rollback (); int rollback ();
blob_t get_credential () const; const blob_t &get_credential () const;
private: private:
......
...@@ -178,7 +178,7 @@ bool zmq::server_t::xhas_out () ...@@ -178,7 +178,7 @@ bool zmq::server_t::xhas_out ()
return true; return true;
} }
zmq::blob_t zmq::server_t::get_credential () const const zmq::blob_t &zmq::server_t::get_credential () const
{ {
return fq.get_credential (); return fq.get_credential ();
} }
...@@ -66,7 +66,7 @@ namespace zmq ...@@ -66,7 +66,7 @@ namespace zmq
protected: protected:
blob_t get_credential () const; const blob_t &get_credential () const;
private: private:
......
...@@ -1491,9 +1491,11 @@ int zmq::socket_base_t::xrecv (msg_t *) ...@@ -1491,9 +1491,11 @@ int zmq::socket_base_t::xrecv (msg_t *)
return -1; return -1;
} }
zmq::blob_t zmq::socket_base_t::get_credential () const static const zmq::blob_t empty_blob;
const zmq::blob_t &zmq::socket_base_t::get_credential () const
{ {
return blob_t (); return empty_blob;
} }
void zmq::socket_base_t::xread_activated (pipe_t *) void zmq::socket_base_t::xread_activated (pipe_t *)
......
...@@ -170,7 +170,7 @@ namespace zmq ...@@ -170,7 +170,7 @@ namespace zmq
// Returns the credential for the peer from which we have received // Returns the credential for the peer from which we have received
// the last message. If no message has been received yet, // the last message. If no message has been received yet,
// the function returns empty credential. // the function returns empty credential.
virtual blob_t get_credential () const; virtual const blob_t &get_credential () const;
// i_pipe_events will be forwarded to these functions. // i_pipe_events will be forwarded to these functions.
virtual void xread_activated (pipe_t *pipe_); virtual void xread_activated (pipe_t *pipe_);
......
...@@ -231,7 +231,7 @@ int zmq::stream_t::xrecv (msg_t *msg_) ...@@ -231,7 +231,7 @@ int zmq::stream_t::xrecv (msg_t *msg_)
// We have received a frame with TCP data. // We have received a frame with TCP data.
// Rather than sending this frame, we keep it in prefetched // Rather than sending this frame, we keep it in prefetched
// buffer and send a frame with peer's ID. // buffer and send a frame with peer's ID.
blob_t routing_id = pipe->get_routing_id (); const blob_t &routing_id = pipe->get_routing_id ();
rc = msg_->close(); rc = msg_->close();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init_size (routing_id.size ()); rc = msg_->init_size (routing_id.size ());
...@@ -267,7 +267,7 @@ bool zmq::stream_t::xhas_in () ...@@ -267,7 +267,7 @@ bool zmq::stream_t::xhas_in ()
zmq_assert (pipe != NULL); zmq_assert (pipe != NULL);
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0); zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
blob_t routing_id = pipe->get_routing_id (); const blob_t &routing_id = pipe->get_routing_id ();
rc = prefetched_routing_id.init_size (routing_id.size ()); rc = prefetched_routing_id.init_size (routing_id.size ());
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -300,7 +300,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -300,7 +300,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
buffer [0] = 0; buffer [0] = 0;
blob_t routing_id; blob_t routing_id;
if (connect_routing_id.length ()) { if (connect_routing_id.length ()) {
routing_id = blob_t ((unsigned char*) connect_routing_id.c_str(), routing_id.set ((unsigned char*) connect_routing_id.c_str(),
connect_routing_id.length ()); connect_routing_id.length ());
connect_routing_id.clear (); connect_routing_id.clear ();
outpipes_t::iterator it = outpipes.find (routing_id); outpipes_t::iterator it = outpipes.find (routing_id);
...@@ -308,14 +308,14 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -308,14 +308,14 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
} }
else { else {
put_uint32 (buffer + 1, next_integral_routing_id++); put_uint32 (buffer + 1, next_integral_routing_id++);
routing_id = blob_t (buffer, sizeof buffer); routing_id.set (buffer, sizeof buffer);
memcpy (options.routing_id, routing_id.data (), routing_id.size ()); memcpy (options.routing_id, routing_id.data (), routing_id.size ());
options.routing_id_size = (unsigned char) routing_id.size (); options.routing_id_size = (unsigned char) routing_id.size ();
} }
pipe_->set_router_socket_routing_id (routing_id); pipe_->set_router_socket_routing_id (routing_id);
// Add the record into output pipes lookup table // Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true}; outpipe_t outpipe = {pipe_, true};
const bool ok = outpipes.insert ( const bool ok = outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (
outpipes_t::value_type (routing_id, outpipe)).second; ZMQ_MOVE(routing_id), outpipe).second;
zmq_assert (ok); zmq_assert (ok);
} }
...@@ -901,7 +901,7 @@ int zmq::stream_engine_t::write_credential (msg_t *msg_) ...@@ -901,7 +901,7 @@ int zmq::stream_engine_t::write_credential (msg_t *msg_)
zmq_assert (mechanism != NULL); zmq_assert (mechanism != NULL);
zmq_assert (session != NULL); zmq_assert (session != NULL);
const blob_t credential = mechanism->get_user_id (); const blob_t &credential = mechanism->get_user_id ();
if (credential.size () > 0) { if (credential.size () > 0) {
msg_t msg; msg_t msg;
int rc = msg.init_size (credential.size ()); int rc = msg.init_size (credential.size ());
......
...@@ -324,11 +324,11 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, ...@@ -324,11 +324,11 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
if (self->options.type != ZMQ_PUB) { if (self->options.type != ZMQ_PUB) {
// Place the unsubscription to the queue of pending (un)subscriptions // Place the unsubscription to the queue of pending (un)subscriptions
// to be retrieved by the user later on. // to be retrieved by the user later on.
blob_t unsub (size_ + 1, 0); blob_t unsub (size_ + 1);
unsub [0] = 0; *unsub.data() = 0;
if (size_ > 0) if (size_ > 0)
memcpy (&unsub [1], data_, size_); memcpy (unsub.data() + 1, data_, size_);
self->pending_data.push_back (unsub); self->pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE(unsub));
self->pending_metadata.push_back (NULL); self->pending_metadata.push_back (NULL);
self->pending_flags.push_back (0); self->pending_flags.push_back (0);
......
...@@ -113,7 +113,6 @@ namespace zmq ...@@ -113,7 +113,6 @@ namespace zmq
// List of pending (un)subscriptions, ie. those that were already // List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user. // applied to the trie, but not yet received by the user.
typedef std::basic_string <unsigned char> blob_t;
std::deque <blob_t> pending_data; std::deque <blob_t> pending_data;
std::deque <metadata_t*> pending_metadata; std::deque <metadata_t*> pending_metadata;
std::deque <unsigned char> pending_flags; std::deque <unsigned char> pending_flags;
......
...@@ -210,7 +210,7 @@ bool zmq::xsub_t::xhas_in () ...@@ -210,7 +210,7 @@ bool zmq::xsub_t::xhas_in ()
} }
} }
zmq::blob_t zmq::xsub_t::get_credential () const const zmq::blob_t &zmq::xsub_t::get_credential () const
{ {
return fq.get_credential (); return fq.get_credential ();
} }
......
...@@ -59,7 +59,7 @@ namespace zmq ...@@ -59,7 +59,7 @@ namespace zmq
bool xhas_out (); bool xhas_out ();
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
blob_t get_credential () const; const blob_t &get_credential () const;
void xread_activated (zmq::pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xhiccuped (pipe_t *pipe_); void xhiccuped (pipe_t *pipe_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment