Commit 88695aae authored by malosek's avatar malosek

link libzmq with glib when congifured --with-pgm

parent f824b8a0
...@@ -52,6 +52,8 @@ extern "C" { ...@@ -52,6 +52,8 @@ extern "C" {
#define ZMQ_IDENTITY 6 #define ZMQ_IDENTITY 6
#define ZMQ_SUBSCRIBE 7 #define ZMQ_SUBSCRIBE 7
#define ZMQ_UNSUBSCRIBE 8 #define ZMQ_UNSUBSCRIBE 8
#define ZMQ_RATE 9
#define ZMQ_RECOVERY_IVL 10
// The operation should be performed in non-blocking mode. I.e. if it cannot // The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN. // be processed immediately, error should be returned with errno set to EAGAIN.
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <assert.h> #include <assert.h>
#include <stddef.h> #include <stddef.h>
#include <stdint.h>
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
...@@ -34,9 +35,19 @@ int main (int argc, char *argv []) ...@@ -34,9 +35,19 @@ int main (int argc, char *argv [])
size_t message_size = (size_t) atoi (argv [2]); size_t message_size = (size_t) atoi (argv [2]);
int message_count = atoi (argv [3]); int message_count = atoi (argv [3]);
// appl threads, io threads
zmq::context_t ctx (1, 1); zmq::context_t ctx (1, 1);
zmq::socket_t s (ctx, ZMQ_P2P); zmq::socket_t s (ctx, ZMQ_PUB);
// 10Mb/s
uint32_t rate = 10000;
s.setsockopt (ZMQ_RATE, &rate, sizeof (rate));
// 10s
uint32_t recovery_ivl = 10;
s.setsockopt (ZMQ_RECOVERY_IVL, &recovery_ivl, sizeof (recovery_ivl));
s.connect (connect_to); s.connect (connect_to);
for (int i = 0; i != message_count; i++) { for (int i = 0; i != message_count; i++) {
......
...@@ -62,6 +62,8 @@ libzmq_la_SOURCES = $(pgm_sources) \ ...@@ -62,6 +62,8 @@ libzmq_la_SOURCES = $(pgm_sources) \
object.hpp \ object.hpp \
options.hpp \ options.hpp \
owned.hpp \ owned.hpp \
pgm_sender.hpp \
pgm_socket.hpp \
pipe.hpp \ pipe.hpp \
platform.hpp \ platform.hpp \
poll.hpp \ poll.hpp \
...@@ -101,6 +103,8 @@ libzmq_la_SOURCES = $(pgm_sources) \ ...@@ -101,6 +103,8 @@ libzmq_la_SOURCES = $(pgm_sources) \
object.cpp \ object.cpp \
options.cpp \ options.cpp \
owned.cpp \ owned.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
pipe.cpp \ pipe.cpp \
poll.cpp \ poll.cpp \
select.cpp \ select.cpp \
...@@ -122,7 +126,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ ...@@ -122,7 +126,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
zmq_listener.cpp \ zmq_listener.cpp \
zmq_listener_init.cpp zmq_listener_init.cpp
libzmq_la_LDFLAGS = -version-info @LTVER@ libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@
if BUILD_PGM if BUILD_PGM
libzmq_la_CXXFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ -Wall @LIBZMQ_EXTRA_CXXFLAGS@ libzmq_la_CXXFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ -Wall @LIBZMQ_EXTRA_CXXFLAGS@
......
...@@ -70,8 +70,10 @@ namespace zmq ...@@ -70,8 +70,10 @@ namespace zmq
// Maximal number of non-accepted connections that can be held by // Maximal number of non-accepted connections that can be held by
// TCP listener object. // TCP listener object.
tcp_connection_backlog = 10 tcp_connection_backlog = 10,
// Maximum transport data unit size for PGM (TPDU).
pgm_max_tpdu = 1500
}; };
} }
......
...@@ -24,6 +24,8 @@ zmq::options_t::options_t () : ...@@ -24,6 +24,8 @@ zmq::options_t::options_t () :
lwm (0), lwm (0),
swap (0), swap (0),
mask (0), mask (0),
affinity (0) affinity (0),
rate (0),
recovery_ivl (0)
{ {
} }
...@@ -37,6 +37,12 @@ namespace zmq ...@@ -37,6 +37,12 @@ namespace zmq
uint64_t mask; uint64_t mask;
uint64_t affinity; uint64_t affinity;
std::string identity; std::string identity;
// Maximum tranfer rate [kb/s].
uint32_t rate;
// Reliability time interval [s].
uint32_t recovery_ivl;
}; };
} }
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <iostream>
#include <string> #include <string>
#include <algorithm> #include <algorithm>
...@@ -35,6 +37,7 @@ ...@@ -35,6 +37,7 @@
#include "uuid.hpp" #include "uuid.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "platform.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_), object_t (parent_),
...@@ -145,6 +148,22 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, ...@@ -145,6 +148,22 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
case ZMQ_RATE:
if (optvallen_ != sizeof (uint32_t)) {
errno = EINVAL;
return -1;
}
options.rate = *((int32_t*) optval_);
return 0;
case ZMQ_RECOVERY_IVL:
if (optvallen_ != sizeof (uint32_t)) {
errno = EINVAL;
return -1;
}
options.recovery_ivl = *((int32_t*) optval_);
return 0;
default: default:
errno = EINVAL; errno = EINVAL;
return -1; return -1;
...@@ -170,6 +189,21 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -170,6 +189,21 @@ int zmq::socket_base_t::connect (const char *addr_)
std::string session_name ("#"); std::string session_name ("#");
session_name += uuid_t ().to_string (); session_name += uuid_t ().to_string ();
// Parse addr_ string.
std::string addr_type;
std::string addr_args;
std::string addr (addr_);
std::string::size_type pos = addr.find ("://");
if (pos == std::string::npos) {
errno = EINVAL;
return -1;
}
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
// Create the session. // Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (), session_t *session = new session_t (io_thread, this, session_name.c_str (),
...@@ -198,20 +232,36 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -198,20 +232,36 @@ int zmq::socket_base_t::connect (const char *addr_)
send_plug (session); send_plug (session);
send_own (this, session); send_own (this, session);
// Create the connecter object. Supply it with the session name so that if (addr_type == "tcp") {
// it can bind the new connection to the session once it is established.
zmq_connecter_t *connecter = new zmq_connecter_t ( // Create the connecter object. Supply it with the session name so that
choose_io_thread (options.affinity), this, options, // it can bind the new connection to the session once it is established.
session_name.c_str ()); zmq_connecter_t *connecter = new zmq_connecter_t (
int rc = connecter->set_address (addr_); choose_io_thread (options.affinity), this, options,
if (rc != 0) { session_name.c_str ());
delete connecter; int rc = connecter->set_address (addr_args.c_str ());
return -1; if (rc != 0) {
delete connecter;
return -1;
}
send_plug (connecter);
send_own (this, connecter);
return 0;
} }
send_plug (connecter);
send_own (this, connecter);
return 0; #if defined ZMQ_HAVE_OPENPGM
if (addr_type == "pgm") {
zmq_assert (false);
return 0;
}
#endif
// Unknown address type.
errno = ENOTSUP;
return -1;
} }
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment