Commit 72c5c5ff authored by malosek's avatar malosek

--with-pgm2 compilable

parent 33afdcd1
...@@ -129,11 +129,6 @@ case "${host_os}" in ...@@ -129,11 +129,6 @@ case "${host_os}" in
;; ;;
esac esac
# If not on QNX nor OSX add -pedantic into LIBZMQ_EXTRA_CXXFLAGS.
if test "x$pedantic" = "xyes"; then
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -pedantic"
fi
# Check if we are running at sparc harware # Check if we are running at sparc harware
AC_MSG_CHECKING([wheter __sparc__ is defined]) AC_MSG_CHECKING([wheter __sparc__ is defined])
AC_COMPILE_IFELSE([AC_LANG_PROGRAM( AC_COMPILE_IFELSE([AC_LANG_PROGRAM(
...@@ -522,6 +517,11 @@ fi ...@@ -522,6 +517,11 @@ fi
AC_SUBST(pgm_basename) AC_SUBST(pgm_basename)
# If not on QNX nor OSX nor PGM add -pedantic into LIBZMQ_EXTRA_CXXFLAGS.
if test "x$pedantic" = "xyes" -a "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -pedantic"
fi
# If not on QNX nor --with-pgm/2add -Werror into LIBZMQ_EXTRA_CXXFLAGS. # If not on QNX nor --with-pgm/2add -Werror into LIBZMQ_EXTRA_CXXFLAGS.
if test "x$werror" = "xyes" -a "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then if test "x$werror" = "xyes" -a "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Werror" LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Werror"
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM
#include <iostream> #include <iostream>
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM
#include "io_object.hpp" #include "io_object.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM
#include <iostream> #include <iostream>
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM
#include "stdint.hpp" #include "stdint.hpp"
#include "io_object.hpp" #include "io_object.hpp"
......
...@@ -19,9 +19,12 @@ ...@@ -19,9 +19,12 @@
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM1 #ifdef ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_LINUX #ifdef ZMQ_HAVE_LINUX
// TODO: add this into platform.hpp?
#define CONFIG_HAVE_POLL
#include <pgm/pgm.h> #include <pgm/pgm.h>
#include <openssl/md5.h> #include <openssl/md5.h>
#endif #endif
...@@ -151,10 +154,6 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -151,10 +154,6 @@ int zmq::pgm_socket_t::open_transport (void)
// PGM transport GSI. // PGM transport GSI.
pgm_gsi_t gsi; pgm_gsi_t gsi;
// PGM transport GSRs.
struct group_source_req recv_gsr, send_gsr;
size_t recv_gsr_len = 1;
if (options.identity.size () > 0) { if (options.identity.size () > 0) {
// Create gsi from identity string. // Create gsi from identity string.
...@@ -174,6 +173,11 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -174,6 +173,11 @@ int zmq::pgm_socket_t::open_transport (void)
zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi), zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi),
__FILE__, __LINE__); __FILE__, __LINE__);
#ifdef ZMQ_HAVE_OPENPGM1
// PGM transport GSRs.
struct group_source_req recv_gsr, send_gsr;
size_t recv_gsr_len = 1;
// On success, 0 is returned. On invalid arguments, -EINVAL is returned. // On success, 0 is returned. On invalid arguments, -EINVAL is returned.
// If more multicast groups are found than the recv_len parameter, // If more multicast groups are found than the recv_len parameter,
// -ENOMEM is returned. // -ENOMEM is returned.
...@@ -188,24 +192,54 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -188,24 +192,54 @@ int zmq::pgm_socket_t::open_transport (void)
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
#endif
// If we are using UDP encapsulation update send_gsr & recv_gsr #ifdef ZMQ_HAVE_OPENPGM2
// structures. Note that send_gsr & recv_gsr has to be updated after struct pgm_transport_info_t* res = NULL;
// pgm_if_parse_transport call.
if (udp_encapsulation) { if (!pgm_if_get_transport_info (network, NULL, &res, NULL)) {
errno = EINVAL;
return -1;
}
res->ti_gsi = gsi;
#endif
// If we are using UDP encapsulation update gsr or res.
if (udp_encapsulation) {
#ifdef ZMQ_HAVE_OPENPGM1
// Use the same port for UDP encapsulation. // Use the same port for UDP encapsulation.
((struct sockaddr_in*)&send_gsr.gsr_group)->sin_port = ((struct sockaddr_in*)&send_gsr.gsr_group)->sin_port =
g_htons (port_number); g_htons (port_number);
((struct sockaddr_in*)&recv_gsr.gsr_group)->sin_port = ((struct sockaddr_in*)&recv_gsr.gsr_group)->sin_port =
g_htons (port_number); g_htons (port_number);
#endif
#ifdef ZMQ_HAVE_OPENPGM2
res->ti_udp_encap_ucast_port = port_number;
res->ti_udp_encap_mcast_port = port_number;
#endif
} }
#ifdef ZMQ_HAVE_OPENPGM1
rc = pgm_transport_create (&g_transport, &gsi, 0, port_number, &recv_gsr, rc = pgm_transport_create (&g_transport, &gsi, 0, port_number, &recv_gsr,
1, &send_gsr); 1, &send_gsr);
if (rc != 0) { if (rc != 0) {
return -1; return -1;
} }
#endif
#ifdef ZMQ_HAVE_OPENPGM2
if (!pgm_transport_create (&g_transport, res, NULL)) {
pgm_if_free_transport_info (res);
// TODO: tranlate errors from glib into errnos.
errno = EINVAL;
return -1;
}
pgm_if_free_transport_info (res);
#endif
// Common parameters for receiver and sender. // Common parameters for receiver and sender.
...@@ -347,6 +381,7 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -347,6 +381,7 @@ int zmq::pgm_socket_t::open_transport (void)
return -1; return -1;
} }
#ifdef ZMQ_HAVE_OPENPGM1
// Preallocate full transmit window. For simplification always // Preallocate full transmit window. For simplification always
// worst case is used (40 bytes ipv6 header and 20 bytes UDP // worst case is used (40 bytes ipv6 header and 20 bytes UDP
// encapsulation). // encapsulation).
...@@ -361,6 +396,7 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -361,6 +396,7 @@ int zmq::pgm_socket_t::open_transport (void)
zmq_log (2, "Preallocated %i slices in TX window. %s(%i)\n", zmq_log (2, "Preallocated %i slices in TX window. %s(%i)\n",
to_preallocate, __FILE__, __LINE__); to_preallocate, __FILE__, __LINE__);
#endif
// Set interval of background SPM packets [us]. // Set interval of background SPM packets [us].
rc = pgm_transport_set_ambient_spm (g_transport, 8192 * 1000); rc = pgm_transport_set_ambient_spm (g_transport, 8192 * 1000);
...@@ -392,10 +428,19 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -392,10 +428,19 @@ int zmq::pgm_socket_t::open_transport (void)
} }
// Bind a transport to the specified network devices. // Bind a transport to the specified network devices.
#ifdef ZMQ_HAVE_OPENPGM1
rc = pgm_transport_bind (g_transport); rc = pgm_transport_bind (g_transport);
if (rc != 0) { if (rc != 0) {
return -1; return -1;
} }
#endif
#ifdef ZMQ_HAVE_OPENPGM2
if (!pgm_transport_bind (g_transport, NULL)) {
// TODO: tranlate errors from glib into errnos.
return -1;
}
#endif
return 0; return 0;
} }
...@@ -484,9 +529,13 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) ...@@ -484,9 +529,13 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{ {
ssize_t nbytes = 0;
#ifdef ZMQ_HAVE_OPENPGM1
iovec iov = {data_,data_len_}; iovec iov = {data_,data_len_};
ssize_t nbytes = pgm_transport_send_packetv (g_transport, &iov, 1, nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,
MSG_DONTWAIT | MSG_WAITALL, true); MSG_DONTWAIT | MSG_WAITALL, true);
zmq_assert (nbytes != -EINVAL); zmq_assert (nbytes != -EINVAL);
...@@ -505,6 +554,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) ...@@ -505,6 +554,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
if (nbytes > 0) { if (nbytes > 0) {
zmq_assert (nbytes == (ssize_t)data_len_); zmq_assert (nbytes == (ssize_t)data_len_);
} }
#endif
return nbytes; return nbytes;
} }
...@@ -535,6 +585,7 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_) ...@@ -535,6 +585,7 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_)
return apdu_count; return apdu_count;
} }
#ifdef ZMQ_HAVE_OPENPGM1
// Allocate buffer for one packet from the transmit window, The memory buffer // Allocate buffer for one packet from the transmit window, The memory buffer
// is owned by the transmit window and so must be returned to the window with // is owned by the transmit window and so must be returned to the window with
// content via pgm_transport_send() calls or unused with pgm_packetv_free1(). // content via pgm_transport_send() calls or unused with pgm_packetv_free1().
...@@ -553,12 +604,17 @@ void zmq::pgm_socket_t::free_buffer (void *data_) ...@@ -553,12 +604,17 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
{ {
pgm_packetv_free1 (g_transport, data_, false); pgm_packetv_free1 (g_transport, data_, false);
} }
#endif
// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to // pgm_transport_recvmsgv is called to fill the pgm_msgv array up to
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are // pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// returned. // returned.
ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
{ {
size_t raw_data_len = 0;
#ifdef ZMQ_HAVE_OPENPGM1
// We just sent all data from pgm_transport_recvmsgv up // We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled. // and have to return 0 that another engine in this thread is scheduled.
if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
...@@ -626,7 +682,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -626,7 +682,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Take pointers from pgm_msgv_t structure. // Take pointers from pgm_msgv_t structure.
*raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base; *raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base;
size_t raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len; raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len;
// Save current TSI. // Save current TSI.
*tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi; *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi;
...@@ -635,6 +691,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -635,6 +691,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
pgm_msgv_processed++; pgm_msgv_processed++;
nbytes_processed +=raw_data_len; nbytes_processed +=raw_data_len;
#endif
zmq_log (4, "sendig up %i bytes\n", (int)raw_data_len); zmq_log (4, "sendig up %i bytes\n", (int)raw_data_len);
return raw_data_len; return raw_data_len;
...@@ -643,12 +701,21 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -643,12 +701,21 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
void zmq::pgm_socket_t::process_upstream (void) void zmq::pgm_socket_t::process_upstream (void)
{ {
zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__); zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__);
ssize_t dummy_bytes = 0;
#ifdef ZMQ_HAVE_OPENPGM1
// We acctually do not want to read any data here we are going to // We acctually do not want to read any data here we are going to
// process NAK. // process NAK.
pgm_msgv_t dummy_msg; pgm_msgv_t dummy_msg;
ssize_t dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg, dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg,
1, MSG_DONTWAIT); 1, MSG_DONTWAIT);
#endif
#ifdef ZMQ_HAVE_OPENPGM2
zmq_assert (false);
#endif
// No data should be returned. // No data should be returned.
zmq_assert (dummy_bytes == -1 && errno == EAGAIN); zmq_assert (dummy_bytes == -1 && errno == EAGAIN);
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_LINUX #ifdef ZMQ_HAVE_LINUX
#include <glib.h> #include <glib.h>
...@@ -135,13 +135,6 @@ namespace zmq ...@@ -135,13 +135,6 @@ namespace zmq
// Receiver transport uses 2 fd. // Receiver transport uses 2 fd.
enum {pgm_receiver_fd_count = 2}; enum {pgm_receiver_fd_count = 2};
// TSI of the actual peer.
// pgm_tsi_t tsi;
// Previous peer TSI.
// pgm_tsi_t retired_tsi;
#endif #endif
}; };
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment