Commit d16b3bc0 authored by Martin Sustrik's avatar Martin Sustrik

Merge branch 'master' of git@github.com:sustrik/zeromq2

parents 35c58dc7 10dd8c53
...@@ -61,12 +61,12 @@ zmq::pgm_receiver_t::~pgm_receiver_t () ...@@ -61,12 +61,12 @@ zmq::pgm_receiver_t::~pgm_receiver_t ()
delete decoder; delete decoder;
} }
int zmq::pgm_receiver_t::init (const char *network_) int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
{ {
decoder = new zmq_decoder_t; decoder = new zmq_decoder_t;
zmq_assert (decoder); zmq_assert (decoder);
return pgm_socket.init (network_); return pgm_socket.init (udp_encapsulation_, network_);
} }
void zmq::pgm_receiver_t::plug (i_inout *inout_) void zmq::pgm_receiver_t::plug (i_inout *inout_)
......
...@@ -44,7 +44,7 @@ namespace zmq ...@@ -44,7 +44,7 @@ namespace zmq
const char *session_name_); const char *session_name_);
~pgm_receiver_t (); ~pgm_receiver_t ();
int init (const char *network_); int init (bool udp_encapsulation_, const char *network_);
void reconnect (); void reconnect ();
// i_engine interface implementation. // i_engine interface implementation.
......
...@@ -59,9 +59,9 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, ...@@ -59,9 +59,9 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
} }
int zmq::pgm_sender_t::init (const char *network_) int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
{ {
return pgm_socket.init (network_); return pgm_socket.init (udp_encapsulation_, network_);
} }
void zmq::pgm_sender_t::plug (i_inout *inout_) void zmq::pgm_sender_t::plug (i_inout *inout_)
...@@ -157,7 +157,7 @@ void zmq::pgm_sender_t::out_event () ...@@ -157,7 +157,7 @@ void zmq::pgm_sender_t::out_event ()
// We can write all data or 0 which means rate limit reached. // We can write all data or 0 which means rate limit reached.
if (write_size - write_pos != nbytes && nbytes != 0) { if (write_size - write_pos != nbytes && nbytes != 0) {
zmq_log (1, "write_size - write_pos %i, nbytes %i, %s(%i)", zmq_log (2, "write_size - write_pos %i, nbytes %i, %s(%i)",
(int)(write_size - write_pos), (int)nbytes, __FILE__, __LINE__); (int)(write_size - write_pos), (int)nbytes, __FILE__, __LINE__);
assert (false); assert (false);
} }
...@@ -180,11 +180,9 @@ void zmq::pgm_sender_t::out_event () ...@@ -180,11 +180,9 @@ void zmq::pgm_sender_t::out_event ()
size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_, size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,
size_t size_, uint16_t offset_) size_t size_, uint16_t offset_)
{ {
zmq_log (1, "data_size %i, first message offset %i, %s(%i)\n", zmq_log (4, "data_size %i, first message offset %i, %s(%i)\n",
(int) size_, offset_, __FILE__, __LINE__); (int) size_, offset_, __FILE__, __LINE__);
std::cout << std::flush;
// Put offset information in the buffer. // Put offset information in the buffer.
put_uint16 (data_, offset_); put_uint16 (data_, offset_);
......
...@@ -42,7 +42,7 @@ namespace zmq ...@@ -42,7 +42,7 @@ namespace zmq
const char *session_name_); const char *session_name_);
~pgm_sender_t (); ~pgm_sender_t ();
int init (const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (struct i_inout *inout_); void plug (struct i_inout *inout_);
......
...@@ -68,24 +68,12 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : ...@@ -68,24 +68,12 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
} }
int zmq::pgm_socket_t::init (const char *network_) int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
{ {
// Check if we are encapsulating into UDP, natwork string has to udp_encapsulation = udp_encapsulation_;
// start with udp:.
const char *network_ptr = network_;
if (strlen (network_) >= 4 && network_ [0] == 'u' &&
network_ [1] == 'd' && network_ [2] == 'p' &&
network_ [3] == ':') {
// Shift interface_ptr after ':'.
network_ptr += 4;
udp_encapsulation = true;
}
// Parse port number. // Parse port number.
const char *port_delim = strchr (network_ptr, ':'); const char *port_delim = strchr (network_, ':');
if (!port_delim) { if (!port_delim) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
...@@ -93,20 +81,13 @@ int zmq::pgm_socket_t::init (const char *network_) ...@@ -93,20 +81,13 @@ int zmq::pgm_socket_t::init (const char *network_)
port_number = atoi (port_delim + 1); port_number = atoi (port_delim + 1);
// Store interface string. if (port_delim - network_ >= (int) sizeof (network) - 1) {
if (port_delim <= network_ptr) {
errno = EINVAL;
return -1;
}
if (port_delim - network_ptr >= (int) sizeof (network) - 1) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
memset (network, '\0', sizeof (network)); memset (network, '\0', sizeof (network));
memcpy (network, network_ptr, port_delim - network_ptr); memcpy (network, network_, port_delim - network_);
zmq_log (1, "parsed: network %s, port %i, udp encaps. %s, %s(%i)\n", zmq_log (1, "parsed: network %s, port %i, udp encaps. %s, %s(%i)\n",
network, port_number, udp_encapsulation ? "yes" : "no", network, port_number, udp_encapsulation ? "yes" : "no",
...@@ -364,7 +345,7 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -364,7 +345,7 @@ int zmq::pgm_socket_t::open_transport (void)
return -1; return -1;
} }
zmq_log (1, "Preallocated %i slices in TX window. %s(%i)\n", zmq_log (2, "Preallocated %i slices in TX window. %s(%i)\n",
to_preallocate, __FILE__, __LINE__); to_preallocate, __FILE__, __LINE__);
// Set interval of background SPM packets [us]. // Set interval of background SPM packets [us].
...@@ -611,7 +592,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) ...@@ -611,7 +592,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
// Catch the rest of the errors. // Catch the rest of the errors.
if (nbytes_rec <= 0) { if (nbytes_rec <= 0) {
zmq_log (1, "received %i B, errno %i, %s(%i)", (int)nbytes_rec, zmq_log (2, "received %i B, errno %i, %s(%i)", (int)nbytes_rec,
errno, __FILE__, __LINE__); errno, __FILE__, __LINE__);
errno_assert (nbytes_rec > 0); errno_assert (nbytes_rec > 0);
} }
......
...@@ -52,7 +52,7 @@ namespace zmq ...@@ -52,7 +52,7 @@ namespace zmq
~pgm_socket_t (); ~pgm_socket_t ();
// Initialize PGM network structures (GSI, GSRs). // Initialize PGM network structures (GSI, GSRs).
int init (const char *network_); int init (bool udp_encapsulation_, const char *network_);
// Open PGM transport. Parameters are the same as in constructor. // Open PGM transport. Parameters are the same as in constructor.
int open_transport (void); int open_transport (void);
......
...@@ -208,7 +208,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -208,7 +208,7 @@ int zmq::socket_base_t::bind (const char *addr_)
} }
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
if (addr_type == "pgm") { if (addr_type == "pgm" || addr_type == "udp") {
// In the case of PGM bind behaves the same like connect. // In the case of PGM bind behaves the same like connect.
return connect (addr_); return connect (addr_);
} }
...@@ -287,7 +287,12 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -287,7 +287,12 @@ int zmq::socket_base_t::connect (const char *addr_)
} }
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
if (addr_type == "pgm") { if (addr_type == "pgm" || addr_type == "udp") {
// For udp, pgm transport with udp encapsulation is used.
bool udp_encapsulation = false;
if (addr_type == "udp")
udp_encapsulation = true;
switch (type) { switch (type) {
...@@ -298,7 +303,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -298,7 +303,7 @@ int zmq::socket_base_t::connect (const char *addr_)
new pgm_sender_t (choose_io_thread (options.affinity), options, new pgm_sender_t (choose_io_thread (options.affinity), options,
session_name.c_str ()); session_name.c_str ());
int rc = pgm_sender->init (addr_args.c_str ()); int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ());
if (rc != 0) { if (rc != 0) {
delete pgm_sender; delete pgm_sender;
return -1; return -1;
...@@ -320,7 +325,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -320,7 +325,7 @@ int zmq::socket_base_t::connect (const char *addr_)
new pgm_receiver_t (choose_io_thread (options.affinity), options, new pgm_receiver_t (choose_io_thread (options.affinity), options,
session_name.c_str ()); session_name.c_str ());
int rc = pgm_receiver->init (addr_args.c_str ()); int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ());
if (rc != 0) { if (rc != 0) {
delete pgm_receiver; delete pgm_receiver;
return -1; return -1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment