Commit 85cbd7f8 authored by malosek's avatar malosek

added PGM bus functionality

parent 3bd8f83f
...@@ -6,6 +6,7 @@ SUBDIRS = src $(DIR_PERF) devices bindings ...@@ -6,6 +6,7 @@ SUBDIRS = src $(DIR_PERF) devices bindings
DIST_SUBDIRS = src perf devices bindings DIST_SUBDIRS = src perf devices bindings
EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm_basename@.tar.bz2 \ EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm_basename@.tar.bz2 \
$(top_srcdir)/foreign/openpgm/lost_data_tsi.patch \
$(top_srcdir)/foreign/xmlParser/xmlParser.cpp \ $(top_srcdir)/foreign/xmlParser/xmlParser.cpp \
$(top_srcdir)/foreign/xmlParser/xmlParser.hpp $(top_srcdir)/foreign/xmlParser/xmlParser.hpp
......
...@@ -388,11 +388,21 @@ if test "x$with_pgm_ext" != "xno"; then ...@@ -388,11 +388,21 @@ if test "x$with_pgm_ext" != "xno"; then
;; ;;
esac esac
AC_CHECK_HEADERS(openssl/md5.h, [] ,
[AC_MSG_ERROR([To run configure with --with-pgm option, openssl/md5.h has to be usable.])])
AC_CHECK_LIB(ssl, MD5_Init, , [AC_MSG_ERROR([Could not link with libuuid, install develop version.])])
AC_CHECK_PROG(have_tar, tar, yes, no) AC_CHECK_PROG(have_tar, tar, yes, no)
if test "x$have_tar" != "xyes"; then if test "x$have_tar" != "xyes"; then
AC_MSG_ERROR([Could not find tar.]) AC_MSG_ERROR([Could not find tar.])
fi fi
AC_CHECK_PROG(have_patch, patch, yes, no)
if test "x$have_patch" != "xyes"; then
AC_MSG_ERROR([Could not find patch.])
fi
AC_CHECK_PROG(have_bunzip2, bunzip2, yes, no) AC_CHECK_PROG(have_bunzip2, bunzip2, yes, no)
if test "x$have_bunzip2" != "xyes"; then if test "x$have_bunzip2" != "xyes"; then
AC_MSG_ERROR([Could not find bunzip2.]) AC_MSG_ERROR([Could not find bunzip2.])
...@@ -416,7 +426,15 @@ if test "x$with_pgm_ext" != "xno"; then ...@@ -416,7 +426,15 @@ if test "x$with_pgm_ext" != "xno"; then
if tar -xjf foreign/openpgm/${pgm_basename}.tar.bz2 -C foreign/openpgm/; then if tar -xjf foreign/openpgm/${pgm_basename}.tar.bz2 -C foreign/openpgm/; then
AC_MSG_RESULT([yes]) AC_MSG_RESULT([yes])
else else
AC_MSG_ERROR([Could not unpack foreign/openpgm/${pgm_basename}.tar.bz2 file]) AC_MSG_ERROR([Could not unpack foreign/openpgm/${pgm_basename}.tar.bz2 file.])
fi
AC_MSG_CHECKING([Patching ${pgm_basename}])
if patch --silent -p0 < foreign/openpgm/lost_data_tsi.patch; then
AC_MSG_RESULT([yes])
else
AC_MSG_ERROR([Could not apply foreign/openpgm/lost_data_tsi.patch file.])
fi fi
# Generate galois_tables.c # Generate galois_tables.c
......
--- libpgm-1.2.14/openpgm/pgm/transport.c 2009-08-27 04:54:04.000000000 +0200
+++ foreign/openpgm/libpgm-1.2.14/openpgm/pgm/transport.c 2009-09-22 14:36:07.713124619 +0200
@@ -2342,6 +2342,7 @@
if (waiting_rxw->ack_cumulative_losses != waiting_rxw->cumulative_losses)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), waiting_rxw->identifier, sizeof (pgm_tsi_t));
waiting_rxw->pgm_sock_err.lost_count = waiting_rxw->cumulative_losses - waiting_rxw->ack_cumulative_losses;
waiting_rxw->ack_cumulative_losses = waiting_rxw->cumulative_losses;
}
@@ -2705,6 +2706,7 @@
if (waiting_rxw->ack_cumulative_losses != waiting_rxw->cumulative_losses)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), waiting_rxw->identifier, sizeof (pgm_tsi_t));
waiting_rxw->pgm_sock_err.lost_count = waiting_rxw->cumulative_losses - waiting_rxw->ack_cumulative_losses;
waiting_rxw->ack_cumulative_losses = waiting_rxw->cumulative_losses;
}
@@ -3407,6 +3409,7 @@
!sender_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), sender_rxw->identifier, sizeof (pgm_tsi_t));
sender_rxw->pgm_sock_err.lost_count = sender_rxw->cumulative_losses - sender_rxw->ack_cumulative_losses;
sender_rxw->ack_cumulative_losses = sender_rxw->cumulative_losses;
@@ -3823,6 +3826,7 @@
!peer_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), peer_rxw->identifier, sizeof (pgm_tsi_t));
peer_rxw->pgm_sock_err.lost_count = peer_rxw->cumulative_losses - peer_rxw->ack_cumulative_losses;
peer_rxw->ack_cumulative_losses = peer_rxw->cumulative_losses;
@@ -3952,6 +3956,7 @@
!peer_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), peer_rxw->identifier, sizeof (pgm_tsi_t));
peer_rxw->pgm_sock_err.lost_count = peer_rxw->cumulative_losses - peer_rxw->ack_cumulative_losses;
peer_rxw->ack_cumulative_losses = peer_rxw->cumulative_losses;
@@ -4849,6 +4854,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
@@ -5166,6 +5172,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
@@ -5303,6 +5310,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
--- libpgm-1.2.14/openpgm/pgm/include/pgm/transport.h 2009-08-27 04:53:23.000000000 +0200
+++ foreign/openpgm/libpgm-1.2.14/openpgm/pgm/include/pgm/transport.h 2009-09-21 15:49:36.000000000 +0200
@@ -205,6 +205,7 @@
gboolean is_bound;
gboolean is_open;
gboolean has_lost_data;
+ pgm_tsi_t lost_data_tsi;
gboolean will_close_on_failure;
gboolean can_send_data; /* and SPMs */
...@@ -46,26 +46,21 @@ ...@@ -46,26 +46,21 @@
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_, const char *session_name_) : const options_t &options_, const char *session_name_) :
io_object_t (parent_), io_object_t (parent_),
decoder (NULL),
pgm_socket (true, options_), pgm_socket (true, options_),
options (options_), options (options_),
session_name (session_name_), session_name (session_name_),
joined (false),
inout (NULL) inout (NULL)
{ {
} }
zmq::pgm_receiver_t::~pgm_receiver_t () zmq::pgm_receiver_t::~pgm_receiver_t ()
{ {
if (decoder) // Destructor should not be called before unplug.
delete decoder; zmq_assert (peers.empty ());
} }
int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
{ {
decoder = new zmq_decoder_t;
zmq_assert (decoder);
return pgm_socket.init (udp_encapsulation_, network_); return pgm_socket.init (udp_encapsulation_, network_);
} }
...@@ -75,8 +70,6 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_) ...@@ -75,8 +70,6 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
int socket_fd; int socket_fd;
int waiting_pipe_fd; int waiting_pipe_fd;
decoder->set_inout (inout_);
// Fill socket_fd and waiting_pipe_fd from PGM transport // Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
...@@ -95,9 +88,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_) ...@@ -95,9 +88,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
void zmq::pgm_receiver_t::unplug () void zmq::pgm_receiver_t::unplug ()
{ {
// Delete decoders.
for (peer_t::iterator it = peers.begin (); it != peers.end (); it++) {
if (it->second.decoder != NULL)
delete it->second.decoder;
}
peers.clear ();
rm_fd (socket_handle); rm_fd (socket_handle);
rm_fd (pipe_handle); rm_fd (pipe_handle);
decoder->set_inout (NULL);
inout = NULL; inout = NULL;
} }
...@@ -106,102 +106,108 @@ void zmq::pgm_receiver_t::revive () ...@@ -106,102 +106,108 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::pgm_receiver_t::reconnect ()
{
// Save inout ptr.
i_inout *inout_tmp = inout;
// PGM receiver is not joined anymore.
joined = false;
// Unplug - plug PGM transport.
unplug ();
delete decoder;
decoder = new zmq_decoder_t;
zmq_assert (decoder);
plug (inout_tmp);
}
// POLLIN event from socket or waiting_pipe. // POLLIN event from socket or waiting_pipe.
void zmq::pgm_receiver_t::in_event () void zmq::pgm_receiver_t::in_event ()
{ {
void *data_with_offset; // Iterator to peers map.
peer_t::iterator it;
// Data from PGM socket.
unsigned char *raw_data = NULL;
const pgm_tsi_t *tsi = NULL;
ssize_t nbytes = 0; ssize_t nbytes = 0;
// Read all data from pgm socket. do {
while ((nbytes = receive_with_offset (&data_with_offset)) > 0) {
// Push all the data to the decoder.
decoder->write ((unsigned char*)data_with_offset, nbytes);
}
// Flush any messages decoder may have produced to the dispatcher. // Read data from underlying pgm_socket.
inout->flush (); nbytes = pgm_socket.receive ((void**) &raw_data, &tsi);
// Data loss detected. // No ODATA or RDATA.
if (nbytes == -1) { if (!nbytes)
break;
// Recreate PGM transport. // Fid TSI in peers list.
reconnect (); it = peers.find (*tsi);
}
}
void zmq::pgm_receiver_t::out_event () // Data loss.
{ if (nbytes == -1) {
zmq_assert (false);
}
ssize_t zmq::pgm_receiver_t::receive_with_offset zmq_assert (it != peers.end ());
(void **data_)
{
// Data from PGM socket. // Delete decoder and set joined to false.
void *rd = NULL; it->second.joined = false;
unsigned char *raw_data = NULL;
if (it->second.decoder != NULL) {
delete it->second.decoder;
it->second.decoder = NULL;
}
// Read data from underlying pgm_socket. break;
ssize_t nbytes = pgm_socket.receive ((void**) &rd); }
raw_data = (unsigned char*) rd;
// No ODATA or RDATA. // Read offset of the fist message in current APDU.
if (!nbytes) zmq_assert ((size_t) nbytes >= sizeof (uint16_t));
return 0; uint16_t apdu_offset = get_uint16 (raw_data);
// Data loss. // Shift raw_data & decrease nbytes by the first message offset
if (nbytes == -1) { // information (sizeof uint16_t).
return -1; raw_data += sizeof (uint16_t);
} nbytes -= sizeof (uint16_t);
zmq_assert (apdu_offset <= nbytes);
// Read offset of the fist message in current APDU. // New peer.
uint16_t apdu_offset = get_uint16 (raw_data); if (it == peers.end ()) {
// Shift raw_data & decrease nbytes by the first message offset peer_info_t peer_info = {false, NULL};
// information (sizeof uint16_t). it = peers.insert (std::make_pair (*tsi, peer_info)).first;
*data_ = raw_data + sizeof (uint16_t);
nbytes -= sizeof (uint16_t);
// There is not beginning of the message in current APDU and we zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi),
// are not joined jet -> throwing data. __FILE__, __LINE__);
if (apdu_offset == 0xFFFF && !joined) { }
*data_ = NULL;
return 0;
}
// Now is the possibility to join the stream. // There is not beginning of the message in current APDU and we
if (!joined) { // are not joined jet -> throwing data.
if (apdu_offset == 0xFFFF && !it->second.joined) {
// We have to move data to the begining of the first message. break;
*data_ = (unsigned char *)*data_ + apdu_offset; }
nbytes -= apdu_offset;
// Joined the stream. // Now is the possibility to join the stream.
joined = true; if (!it->second.joined) {
zmq_assert (it->second.decoder == NULL);
zmq_log (2, "joined into the stream, %s(%i)\n", // We have to move data to the begining of the first message.
__FILE__, __LINE__); raw_data += apdu_offset;
} nbytes -= apdu_offset;
// Joined the stream.
it->second.joined = true;
// Create and connect decoder for joined peer.
it->second.decoder = new zmq_decoder_t;
it->second.decoder->set_inout (inout);
zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",
pgm_print_tsi (tsi), __FILE__, __LINE__);
}
if (nbytes > 0) {
// Push all the data to the decoder.
it->second.decoder->write (raw_data, nbytes);
}
} while (nbytes > 0);
// Flush any messages decoder may have produced to the dispatcher.
inout->flush ();
}
return nbytes; void zmq::pgm_receiver_t::out_event ()
{
zmq_assert (false);
} }
#endif #endif
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
#include "zmq_decoder.hpp" #include "zmq_decoder.hpp"
#include "pgm_socket.hpp" #include "pgm_socket.hpp"
#include <map>
namespace zmq namespace zmq
{ {
...@@ -45,7 +47,6 @@ namespace zmq ...@@ -45,7 +47,6 @@ namespace zmq
~pgm_receiver_t (); ~pgm_receiver_t ();
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
void reconnect ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (struct i_inout *inout_); void plug (struct i_inout *inout_);
...@@ -57,15 +58,28 @@ namespace zmq ...@@ -57,15 +58,28 @@ namespace zmq
void out_event (); void out_event ();
private: private:
// Read exactly iov_len_ count APDUs, function returns number
// of bytes received. Note that if we did not join message stream // Map to hold TSI, joined and decoder for each peer.
// before and there is not message beginning in the APDUs being struct peer_info_t {
// received iov_len for such a APDUs will be 0. bool joined;
ssize_t receive_with_offset (void **data_); zmq_decoder_t *decoder;
};
// Message decoder.
zmq_decoder_t *decoder; struct tsi_comp {
bool operator () (const pgm_tsi_t &ltsi, const pgm_tsi_t &rtsi) const
{
if (ltsi.sport < rtsi.sport)
return true;
return (std::lexicographical_compare (ltsi.gsi.identifier,
ltsi.gsi.identifier + 6,
rtsi.gsi.identifier, rtsi.gsi.identifier + 6));
}
};
typedef std::map <pgm_tsi_t, peer_info_t, tsi_comp> peer_t;
peer_t peers;
// PGM socket. // PGM socket.
pgm_socket_t pgm_socket; pgm_socket_t pgm_socket;
...@@ -75,9 +89,6 @@ namespace zmq ...@@ -75,9 +89,6 @@ namespace zmq
// Name of the session associated with the connecter. // Name of the session associated with the connecter.
std::string session_name; std::string session_name;
// If receiver joined the messages stream.
bool joined;
// Parent session. // Parent session.
i_inout *inout; i_inout *inout;
......
...@@ -23,10 +23,7 @@ ...@@ -23,10 +23,7 @@
#ifdef ZMQ_HAVE_LINUX #ifdef ZMQ_HAVE_LINUX
#include <pgm/pgm.h> #include <pgm/pgm.h>
#else #include <openssl/md5.h>
#include <Winsock2.h>
#include <Wsrm.h>
#include <ws2spi.h>
#endif #endif
#include <string> #include <string>
...@@ -36,6 +33,7 @@ ...@@ -36,6 +33,7 @@
#include "pgm_socket.hpp" #include "pgm_socket.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "uuid.hpp"
//#define PGM_SOCKET_DEBUG //#define PGM_SOCKET_DEBUG
//#define PGM_SOCKET_DEBUG_LEVEL 1 //#define PGM_SOCKET_DEBUG_LEVEL 1
...@@ -68,6 +66,21 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : ...@@ -68,6 +66,21 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
} }
int zmq::pgm_socket_t::pgm_create_custom_gsi (const char *data_, pgm_gsi_t *gsi_)
{
unsigned char result_md5 [16];
MD5_CTX ctx;
MD5_Init (&ctx);
MD5_Update (&ctx, data_, strlen (data_));
MD5_Final (result_md5, &ctx);
memcpy (gsi_, result_md5 + 10, 6);
return 0;
}
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
{ {
udp_encapsulation = udp_encapsulation_; udp_encapsulation = udp_encapsulation_;
...@@ -118,10 +131,6 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -118,10 +131,6 @@ int zmq::pgm_socket_t::open_transport (void)
// Can not open transport before destroying old one. // Can not open transport before destroying old one.
zmq_assert (g_transport == NULL); zmq_assert (g_transport == NULL);
// Set actual_tsi and prev_tsi to zeros.
memset (&tsi, '\0', sizeof (pgm_tsi_t));
memset (&retired_tsi, '\0', sizeof (pgm_tsi_t));
// Zero counter used in msgrecv. // Zero counter used in msgrecv.
nbytes_rec = 0; nbytes_rec = 0;
nbytes_processed = 0; nbytes_processed = 0;
...@@ -146,12 +155,25 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -146,12 +155,25 @@ int zmq::pgm_socket_t::open_transport (void)
struct group_source_req recv_gsr, send_gsr; struct group_source_req recv_gsr, send_gsr;
size_t recv_gsr_len = 1; size_t recv_gsr_len = 1;
rc = pgm_create_md5_gsi (&gsi); if (options.identity.size () > 0) {
// Create gsi from identity string.
rc = pgm_create_custom_gsi (options.identity.c_str (), &gsi);
} else {
// Generate random gsi.
rc = pgm_create_custom_gsi (uuid_t ().to_string (), &gsi);
}
if (rc != 0) { if (rc != 0) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi),
__FILE__, __LINE__);
// On success, 0 is returned. On invalid arguments, -EINVAL is returned. // On success, 0 is returned. On invalid arguments, -EINVAL is returned.
// If more multicast groups are found than the recv_len parameter, // If more multicast groups are found than the recv_len parameter,
// -ENOMEM is returned. // -ENOMEM is returned.
...@@ -204,14 +226,6 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -204,14 +226,6 @@ int zmq::pgm_socket_t::open_transport (void)
// Receiver transport. // Receiver transport.
if (receiver) { if (receiver) {
// Set transport->may_close_on_failure to true,
// after data los recvmsgv returns -1 errno set to ECONNRESET.
rc = pgm_transport_set_close_on_failure (g_transport, TRUE);
if (rc != 0) {
errno = EINVAL;
return -1;
}
// Set transport->can_send_data = FALSE. // Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport. // Note that NAKs are still generated by the transport.
rc = pgm_transport_set_recv_only (g_transport, false); rc = pgm_transport_set_recv_only (g_transport, false);
...@@ -543,7 +557,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_) ...@@ -543,7 +557,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to // pgm_transport_recvmsgv is called to fill the pgm_msgv array up to
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are // pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// returned. // returned.
ssize_t zmq::pgm_socket_t::receive (void **raw_data_) ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
{ {
// We just sent all data from pgm_transport_recvmsgv up // We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled. // and have to return 0 that another engine in this thread is scheduled.
...@@ -583,9 +597,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) ...@@ -583,9 +597,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
// For data loss nbytes_rec == -1 errno == ECONNRESET. // For data loss nbytes_rec == -1 errno == ECONNRESET.
if (nbytes_rec == -1 && errno == ECONNRESET) { if (nbytes_rec == -1 && errno == ECONNRESET) {
// Save lost data TSI.
*tsi_ = &(g_transport->lost_data_tsi);
// In case of dala loss -1 is returned. // In case of dala loss -1 is returned.
zmq_log (1, "Data loss detected, %s(%i)\n", __FILE__, __LINE__); zmq_log (1, "Data loss detected %s, %s(%i)\n",
pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__);
nbytes_rec = 0; nbytes_rec = 0;
return -1; return -1;
} }
...@@ -610,65 +628,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) ...@@ -610,65 +628,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
*raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base; *raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base;
size_t raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len; size_t raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len;
// Check if peer TSI did not change, this is detection of peer restart. // Save current TSI.
const pgm_tsi_t *current_tsi = pgm_msgv [pgm_msgv_processed].msgv_tsi; *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi;
// If empty store new TSI.
if (tsi_empty (&tsi)) {
// Store current peer TSI.
memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t));
#ifdef PGM_SOCKET_DEBUG
uint8_t *gsi = (uint8_t*)(&tsi)->gsi.identifier;
#endif
zmq_log (1, "First peer TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
ntohs (tsi.sport), __FILE__, __LINE__);
}
// Compare stored TSI with actual.
if (!tsi_equal (&tsi, current_tsi)) {
// Peer change detected.
zmq_log (1, "Peer change detected, %s(%i)\n", __FILE__, __LINE__);
// Compare with retired TSI, in case of match ignore APDU.
if (tsi_equal (&retired_tsi, current_tsi)) {
zmq_log (1, "Retired TSI - ignoring APDU, %s(%i)\n",
__FILE__, __LINE__);
// Move the the next pgm_msgv_t structure.
pgm_msgv_processed++;
nbytes_processed +=raw_data_len;
return 0;
} else {
zmq_log (1, "New TSI, %s(%i)\n", __FILE__, __LINE__);
// Store new TSI and move last valid to retired_tsi
memcpy (&retired_tsi, &tsi, sizeof (pgm_tsi_t));
memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t));
#ifdef PGM_SOCKET_DEBUG
uint8_t *gsi = (uint8_t*)(&retired_tsi)->gsi.identifier;
#endif
zmq_log (1, "retired TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
ntohs (retired_tsi.sport), __FILE__, __LINE__);
#ifdef PGM_SOCKET_DEBUG
gsi = (uint8_t*)(&tsi)->gsi.identifier;
#endif
zmq_log (1, " TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
ntohs (tsi.sport), __FILE__, __LINE__);
// Peers change is recognized as a GAP.
return -1;
}
}
// Move the the next pgm_msgv_t structure. // Move the the next pgm_msgv_t structure.
pgm_msgv_processed++; pgm_msgv_processed++;
nbytes_processed +=raw_data_len; nbytes_processed +=raw_data_len;
...@@ -692,47 +654,6 @@ void zmq::pgm_socket_t::process_upstream (void) ...@@ -692,47 +654,6 @@ void zmq::pgm_socket_t::process_upstream (void)
zmq_assert (dummy_bytes == -1 && errno == EAGAIN); zmq_assert (dummy_bytes == -1 && errno == EAGAIN);
} }
bool zmq::pgm_socket_t::tsi_equal (const pgm_tsi_t *tsi_a_,
const pgm_tsi_t *tsi_b_)
{
// Compare 6B GSI.
const uint8_t *gsi_a = tsi_a_->gsi.identifier;
const uint8_t *gsi_b = tsi_b_->gsi.identifier;
if (gsi_a [0] != gsi_b [0] || gsi_a [1] != gsi_b [1] ||
gsi_a [2] != gsi_b [2] || gsi_a [3] != gsi_b [3] ||
gsi_a [4] != gsi_b [4] || gsi_a [5] != gsi_b [5]) {
return false;
}
// Compare source port.
if (tsi_a_->sport != tsi_b_->sport) {
return false;
}
return true;
}
bool zmq::pgm_socket_t::tsi_empty (const pgm_tsi_t *tsi_)
{
uint8_t *gsi = (uint8_t*)tsi_->gsi.identifier;
// GSI.
if (gsi [0] != 0 || gsi [1] != 0 || gsi [2] != 0 ||
gsi [3] != 0 || gsi [4] != 0 || gsi [5] != 0) {
return false;
}
// Source port.
if (tsi_->sport != 0) {
return false;
}
return true;
}
#endif #endif
#endif #endif
...@@ -77,7 +77,7 @@ namespace zmq ...@@ -77,7 +77,7 @@ namespace zmq
void free_buffer (void *data_); void free_buffer (void *data_);
// Receive data from pgm socket. // Receive data from pgm socket.
ssize_t receive (void **data_); ssize_t receive (void **data_, const pgm_tsi_t **tsi_);
// POLLIN on sender side should mean NAK or SPMR receiving. // POLLIN on sender side should mean NAK or SPMR receiving.
// process_upstream function is used to handle such a situation. // process_upstream function is used to handle such a situation.
...@@ -90,21 +90,18 @@ namespace zmq ...@@ -90,21 +90,18 @@ namespace zmq
private: private:
// Associated socket options.
options_t options;
// Returns max tsdu size without fragmentation. // Returns max tsdu size without fragmentation.
size_t get_max_tsdu_size (void); size_t get_max_tsdu_size (void);
// Returns maximum count of apdus which fills readbuf_size_ // Returns maximum count of apdus which fills readbuf_size_
size_t get_max_apdu_at_once (size_t readbuf_size_); size_t get_max_apdu_at_once (size_t readbuf_size_);
// Return true if TSI has empty GSI ('\0') and sport 0. // Compute gsi from string.
bool tsi_empty (const pgm_tsi_t *tsi_); int pgm_create_custom_gsi (const char *data_, pgm_gsi_t *gsi_);
// Compare TSIs, return true if equal. // Associated socket options.
bool tsi_equal (const pgm_tsi_t *tsi_a_, const pgm_tsi_t *tsi_b_); options_t options;
// true when pgm_socket should create receiving side. // true when pgm_socket should create receiving side.
bool receiver; bool receiver;
...@@ -140,10 +137,10 @@ namespace zmq ...@@ -140,10 +137,10 @@ namespace zmq
enum {pgm_receiver_fd_count = 2}; enum {pgm_receiver_fd_count = 2};
// TSI of the actual peer. // TSI of the actual peer.
pgm_tsi_t tsi; // pgm_tsi_t tsi;
// Previous peer TSI. // Previous peer TSI.
pgm_tsi_t retired_tsi; // pgm_tsi_t retired_tsi;
#endif #endif
}; };
......
...@@ -36,6 +36,9 @@ ...@@ -36,6 +36,9 @@
/* Define to 1 if you have the `socket' library (-lsocket). */ /* Define to 1 if you have the `socket' library (-lsocket). */
#undef HAVE_LIBSOCKET #undef HAVE_LIBSOCKET
/* Define to 1 if you have the `ssl' library (-lssl). */
#undef HAVE_LIBSSL
/* Define to 1 if you have the `stdc++' library (-lstdc++). */ /* Define to 1 if you have the `stdc++' library (-lstdc++). */
#undef HAVE_LIBSTDC__ #undef HAVE_LIBSTDC__
...@@ -61,6 +64,9 @@ ...@@ -61,6 +64,9 @@
/* Define to 1 if you have the <netinet/tcp.h> header file. */ /* Define to 1 if you have the <netinet/tcp.h> header file. */
#undef HAVE_NETINET_TCP_H #undef HAVE_NETINET_TCP_H
/* Define to 1 if you have the <openssl/md5.h> header file. */
#undef HAVE_OPENSSL_MD5_H
/* Define to 1 if you have the `perror' function. */ /* Define to 1 if you have the `perror' function. */
#undef HAVE_PERROR #undef HAVE_PERROR
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment