Commit bd0ba6e8 authored by Martin Sustrik's avatar Martin Sustrik

Size of inproc hwm and swap is sum of peers' hwms and swaps

The meat of the patch was contributed by Douglas Creager.
Martin Sustrik implemented storing peer options in inproc
endpoint repository.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent babdf48a
...@@ -16,6 +16,7 @@ Conrad D. Steenberg <conrad.steenberg@caltech.edu> ...@@ -16,6 +16,7 @@ Conrad D. Steenberg <conrad.steenberg@caltech.edu>
Dhammika Pathirana <dhammika@gmail.com> Dhammika Pathirana <dhammika@gmail.com>
Dhruva Krishnamurthy <dhruva@ymail.com> Dhruva Krishnamurthy <dhruva@ymail.com>
Dirk O. Kaar <dok@dok-net.net> Dirk O. Kaar <dok@dok-net.net>
Douglas Creager <douglas.creager@redjack.com>
Erich Heine <sophacles@gmail.com> Erich Heine <sophacles@gmail.com>
Erik Rigtorp <erik@rigtorp.com> Erik Rigtorp <erik@rigtorp.com>
Frank Denis <zeromq@pureftpd.org> Frank Denis <zeromq@pureftpd.org>
......
...@@ -242,13 +242,12 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) ...@@ -242,13 +242,12 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
return io_threads [result]; return io_threads [result];
} }
int zmq::ctx_t::register_endpoint (const char *addr_, int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
socket_base_t *socket_)
{ {
endpoints_sync.lock (); endpoints_sync.lock ();
bool inserted = endpoints.insert (endpoints_t::value_type ( bool inserted = endpoints.insert (endpoints_t::value_type (
std::string (addr_), socket_)).second; std::string (addr_), endpoint_)).second;
if (!inserted) { if (!inserted) {
errno = EADDRINUSE; errno = EADDRINUSE;
endpoints_sync.unlock (); endpoints_sync.unlock ();
...@@ -265,7 +264,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) ...@@ -265,7 +264,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
endpoints_t::iterator it = endpoints.begin (); endpoints_t::iterator it = endpoints.begin ();
while (it != endpoints.end ()) { while (it != endpoints.end ()) {
if (it->second == socket_) { if (it->second.socket == socket_) {
endpoints_t::iterator to_erase = it; endpoints_t::iterator to_erase = it;
it++; it++;
endpoints.erase (to_erase); endpoints.erase (to_erase);
...@@ -277,7 +276,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) ...@@ -277,7 +276,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
endpoints_sync.unlock (); endpoints_sync.unlock ();
} }
zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{ {
endpoints_sync.lock (); endpoints_sync.lock ();
...@@ -285,18 +284,19 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -285,18 +284,19 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
if (it == endpoints.end ()) { if (it == endpoints.end ()) {
endpoints_sync.unlock (); endpoints_sync.unlock ();
errno = ECONNREFUSED; errno = ECONNREFUSED;
return NULL; endpoint_t empty = {NULL, options_t()};
return empty;
} }
socket_base_t *endpoint = it->second; endpoint_t *endpoint = &it->second;
// Increment the command sequence number of the peer so that it won't // Increment the command sequence number of the peer so that it won't
// get deallocated until "bind" command is issued by the caller. // get deallocated until "bind" command is issued by the caller.
// The subsequent 'bind' has to be called with inc_seqnum parameter // The subsequent 'bind' has to be called with inc_seqnum parameter
// set to false, so that the seqnum isn't incremented twice. // set to false, so that the seqnum isn't incremented twice.
endpoint->inc_seqnum (); endpoint->socket->inc_seqnum ();
endpoints_sync.unlock (); endpoints_sync.unlock ();
return endpoint; return *endpoint;
} }
void zmq::ctx_t::log (zmq_msg_t *msg_) void zmq::ctx_t::log (zmq_msg_t *msg_)
......
...@@ -34,9 +34,18 @@ ...@@ -34,9 +34,18 @@
#include "mutex.hpp" #include "mutex.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "thread.hpp" #include "thread.hpp"
#include "options.hpp"
namespace zmq namespace zmq
{ {
// Information associated with inproc endpoint. Note that endpoint options
// are registered as well so that the peer can access them without a need
// for synchronisation, handshaking or similar.
struct endpoint_t
{
class socket_base_t *socket;
options_t options;
};
// Context object encapsulates all the global state associated with // Context object encapsulates all the global state associated with
// the library. // the library.
...@@ -70,9 +79,9 @@ namespace zmq ...@@ -70,9 +79,9 @@ namespace zmq
class io_thread_t *choose_io_thread (uint64_t affinity_); class io_thread_t *choose_io_thread (uint64_t affinity_);
// Management of inproc endpoints. // Management of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_); int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_); void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_); endpoint_t find_endpoint (const char *addr_);
// Logging. // Logging.
void log (zmq_msg_t *msg_); void log (zmq_msg_t *msg_);
...@@ -122,7 +131,7 @@ namespace zmq ...@@ -122,7 +131,7 @@ namespace zmq
mailbox_t **slots; mailbox_t **slots;
// List of inproc endpoints within this context. // List of inproc endpoints within this context.
typedef std::map <std::string, class socket_base_t*> endpoints_t; typedef std::map <std::string, endpoint_t> endpoints_t;
endpoints_t endpoints; endpoints_t endpoints;
// Synchronisation of access to the list of inproc endpoints. // Synchronisation of access to the list of inproc endpoints.
......
...@@ -123,9 +123,9 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -123,9 +123,9 @@ void zmq::object_t::process_command (command_t &cmd_)
deallocate_command (&cmd_); deallocate_command (&cmd_);
} }
int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_) int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
{ {
return ctx->register_endpoint (addr_, socket_); return ctx->register_endpoint (addr_, endpoint_);
} }
void zmq::object_t::unregister_endpoints (socket_base_t *socket_) void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
...@@ -133,7 +133,7 @@ void zmq::object_t::unregister_endpoints (socket_base_t *socket_) ...@@ -133,7 +133,7 @@ void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
return ctx->unregister_endpoints (socket_); return ctx->unregister_endpoints (socket_);
} }
zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
{ {
return ctx->find_endpoint (addr_); return ctx->find_endpoint (addr_);
} }
......
...@@ -46,9 +46,9 @@ namespace zmq ...@@ -46,9 +46,9 @@ namespace zmq
// Using following function, socket is able to access global // Using following function, socket is able to access global
// repository of inproc endpoints. // repository of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_); int register_endpoint (const char *addr_, struct endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_); void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_); struct endpoint_t find_endpoint (const char *addr_);
// Logs an message. // Logs an message.
void log (zmq_msg_t *msg_); void log (zmq_msg_t *msg_);
......
...@@ -300,8 +300,10 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -300,8 +300,10 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) if (rc != 0)
return -1; return -1;
if (protocol == "inproc" || protocol == "sys") if (protocol == "inproc" || protocol == "sys") {
return register_endpoint (addr_, this); endpoint_t endpoint = {this, options};
return register_endpoint (addr_, endpoint);
}
if (protocol == "tcp" || protocol == "ipc") { if (protocol == "tcp" || protocol == "ipc") {
...@@ -361,9 +363,9 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -361,9 +363,9 @@ int zmq::socket_base_t::connect (const char *addr_)
// as there's no 'reconnect' functionality implemented. Once that // as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm. // is in place we should follow generic pipe creation algorithm.
// Find the peer socket. // Find the peer endpoint.
socket_base_t *peer = find_endpoint (addr_); endpoint_t peer = find_endpoint (addr_);
if (!peer) if (!peer.socket)
return -1; return -1;
reader_t *inpipe_reader = NULL; reader_t *inpipe_reader = NULL;
...@@ -371,14 +373,28 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -371,14 +373,28 @@ int zmq::socket_base_t::connect (const char *addr_)
reader_t *outpipe_reader = NULL; reader_t *outpipe_reader = NULL;
writer_t *outpipe_writer = NULL; writer_t *outpipe_writer = NULL;
// The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM. (Similarly for the
// SWAP.)
int64_t hwm;
if (options.hwm == 0 || peer.options.hwm == 0)
hwm = 0;
else
hwm = options.hwm + peer.options.hwm;
int64_t swap;
if (options.swap == 0 && peer.options.swap == 0)
swap = 0;
else
swap = options.swap + peer.options.swap;
// Create inbound pipe, if required. // Create inbound pipe, if required.
if (options.requires_in) if (options.requires_in)
create_pipe (this, peer, options.hwm, options.swap, create_pipe (this, peer.socket, hwm, swap,
&inpipe_reader, &inpipe_writer); &inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required. // Create outbound pipe, if required.
if (options.requires_out) if (options.requires_out)
create_pipe (peer, this, options.hwm, options.swap, create_pipe (peer.socket, this, hwm, swap,
&outpipe_reader, &outpipe_writer); &outpipe_reader, &outpipe_writer);
// Attach the pipes to this socket object. // Attach the pipes to this socket object.
...@@ -387,7 +403,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -387,7 +403,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to the peer socket. Note that peer's seqnum // Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. We don't need it // was incremented in find_endpoint function. We don't need it
// increased here. // increased here.
send_bind (peer, outpipe_reader, inpipe_writer, send_bind (peer.socket, outpipe_reader, inpipe_writer,
options.identity, false); options.identity, false);
return 0; return 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment