Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
f4ac8d7a
Commit
f4ac8d7a
authored
Dec 13, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
OpenPGM v1 support removed
parent
73b765e4
Show whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
15 additions
and
524 deletions
+15
-524
Makefile.am
Makefile.am
+1
-3
configure.in
configure.in
+12
-122
libpgm-1.2.14.tar.bz2
foreign/openpgm/libpgm-1.2.14.tar.bz2
+0
-0
lost_data_tsi.patch
foreign/openpgm/lost_data_tsi.patch
+0
-76
Makefile.am
src/Makefile.am
+0
-59
pgm_receiver.cpp
src/pgm_receiver.cpp
+0
-10
pgm_sender.cpp
src/pgm_sender.cpp
+0
-18
pgm_sender.hpp
src/pgm_sender.hpp
+0
-2
pgm_socket.cpp
src/pgm_socket.cpp
+2
-218
pgm_socket.hpp
src/pgm_socket.hpp
+0
-16
No files found.
Makefile.am
View file @
f4ac8d7a
...
...
@@ -9,13 +9,11 @@ endif
SUBDIRS
=
src
$(DIR_MAN)
$(DIR_PERF)
devices bindings examples
DIST_SUBDIRS
=
src man perf devices bindings examples
EXTRA_DIST
=
$(top_srcdir)
/foreign/openpgm/@pgm1_basename@.tar.bz2
\
EXTRA_DIST
=
\
$(top_srcdir)
/foreign/openpgm/@pgm2_basename@.tar.bz2
\
$(top_srcdir)
/foreign/openpgm/lost_data_tsi.patch
\
$(top_srcdir)
/foreign/openpgm/create_custom_gsi_1.patch
\
$(top_srcdir)
/foreign/xmlParser/xmlParser.cpp
\
$(top_srcdir)
/foreign/xmlParser/xmlParser.hpp
dist-hook
:
-
rm
-rf
$(distdir)
/foreign/openpgm/@pgm1_basename@
-
rm
-rf
$(distdir)
/foreign/openpgm/@pgm2_basename@
configure.in
View file @
f4ac8d7a
...
...
@@ -407,33 +407,22 @@ if test "x$clzmq" = "xyes"; then
fi
# PGM extension
pgm1_ext="no"
pgm2_ext="no"
pgm1_basename="libpgm-1.2.14"
pgm2_basename="libpgm-2.0.17rc2"
AC_SUBST(pgm1_basename)
AC_SUBST(pgm2_basename)
pgm_basename=""
AC_ARG_WITH([pgm], [AS_HELP_STRING([--with-pgm],
[build libzmq with PGM v1 extension [default=no]])],
[with_pgm1_ext=yes], [with_pgm1_ext=no])
AC_ARG_WITH([pgm2], [AS_HELP_STRING([--with-pgm2],
[build libzmq with PGM v2 extension [default=no]])],
[build libzmq with PGM extension [default=no]])],
[with_pgm2_ext=yes], [with_pgm2_ext=no])
AC_ARG_WITH([pgm
2-examples], [AS_HELP_STRING([--with-pgm2
-examples],
[build PGM
v2
examples [default=no]])],
AC_ARG_WITH([pgm
-examples], [AS_HELP_STRING([--with-pgm
-examples],
[build PGM examples [default=no]])],
[with_pgm2_examples=yes], [with_pgm2_examples=no])
if test "x$with_pgm1_ext" != "xno" -a "x$with_pgm2_ext" != "xno"; then
AC_MSG_ERROR([Can not configure --with-pgm and --with-pgm2.]);
fi
if test "x$with_pgm2_ext" = "xno" -a "x$with_pgm2_examples" = "xyes"; then
AC_MSG_ERROR([Can not configure --with-pgm2-examples without --with-pgm2.]);
fi
...
...
@@ -442,98 +431,6 @@ if test "x$c" = "xno" -a "x$with_pgm2_examples" = "xyes"; then
AC_MSG_ERROR([Can not configure --with-pgm2-examples without --with-c.]);
fi
if test "x$with_pgm1_ext" != "xno"; then
pgm_basename=${pgm1_basename}
# Test if we have pkg-config
if test "x$have_pkg_config" != "xyes"; then
AC_MSG_ERROR([To run configure with --with-pgm option, pkg-config has to be installed.]);
fi
case "${host_os}" in
*linux*)
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Wno-variadic-macros -Wno-long-long "
;;
*)
AC_MSG_ERROR([PGM extesion is not supported on this platform $host.])
;;
esac
AC_CHECK_PROG(have_tar, tar, yes, no)
if test "x$have_tar" != "xyes"; then
AC_MSG_ERROR([Could not find tar.])
fi
AC_CHECK_PROG(have_patch, patch, yes, no)
if test "x$have_patch" != "xyes"; then
AC_MSG_ERROR([Could not find patch.])
fi
AC_CHECK_PROG(have_bunzip2, bunzip2, yes, no)
if test "x$have_bunzip2" != "xyes"; then
AC_MSG_ERROR([Could not find bunzip2.])
fi
AC_CHECK_PROG(have_perl, perl, yes, no)
if test "x$have_perl" != "xyes"; then
AC_MSG_ERROR([Could not find perl.])
fi
if test "x$pyzmq" != "xyes"; then
AC_CHECK_PROG(have_python, python, yes, no)
if test "x$have_python" != "xyes"; then
AC_MSG_ERROR([Could not find python.])
fi
fi
# Unpack libpgm1
AC_MSG_CHECKING([Unpacking ${pgm_basename}.tar.bz2])
if tar -xjf foreign/openpgm/${pgm_basename}.tar.bz2 -C foreign/openpgm/; then
AC_MSG_RESULT([yes])
else
AC_MSG_ERROR([Could not unpack foreign/openpgm/${pgm_basename}.tar.bz2 file.])
fi
AC_MSG_CHECKING([Patching ${pgm_basename}])
if patch --silent -p0 < foreign/openpgm/lost_data_tsi.patch; then
AC_MSG_RESULT([yes])
else
AC_MSG_ERROR([Could not apply foreign/openpgm/lost_data_tsi.patch file.])
fi
AC_MSG_CHECKING([Patching ${pgm_basename}])
if patch --silent -p0 < foreign/openpgm/create_custom_gsi_1.patch; then
AC_MSG_RESULT([yes])
else
AC_MSG_ERROR([Could not apply foreign/openpgm/create_custom_gsi_1.patch file.])
fi
# Generate galois_tables.c
AC_CONFIG_COMMANDS([galois_tables.c],
[perl foreign/openpgm/libpgm-1.2.14/openpgm/pgm/galois_generator.pl > \
foreign/openpgm/libpgm-1.2.14/openpgm/pgm/galois_tables.c])
# Generate version.c
AC_CONFIG_COMMANDS([version.c],
[python foreign/openpgm/libpgm-1.2.14/openpgm/pgm/version_generator.py > \
foreign/openpgm/libpgm-1.2.14/openpgm/pgm/version.c])
# Check for OpenPGM nedded libraries.
PKG_CHECK_MODULES([GLIB], [glib-2.0 gthread-2.0])
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} ${GLIB_CFLAGS} "
LIBZMQ_EXTRA_LDFLAFS="${LIBZMQ_EXTRA_LDFLAFS} ${GLIB_LIBS}"
AC_DEFINE(ZMQ_HAVE_OPENPGM, 1, [Have OpenPGM v1 or v2 extension.])
AC_DEFINE(ZMQ_HAVE_OPENPGM1, 1, [Have OpenPGM v1 extension.])
pgm1_ext="yes"
fi
if test "x$with_pgm2_ext" != "xno"; then
pgm_basename=${pgm2_basename}
...
...
@@ -596,12 +493,12 @@ fi
AC_SUBST(pgm_basename)
# If not on QNX nor OSX nor PGM add -pedantic into LIBZMQ_EXTRA_CXXFLAGS.
if test "x$pedantic" = "xyes" -a "x$pgm
1_ext" = "xno" -a "x$pgm
2_ext" = "xno"; then
if test "x$pedantic" = "xyes" -a "x$pgm2_ext" = "xno"; then
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -pedantic"
fi
# If not on QNX nor --with-pgm
/2
add -Werror into LIBZMQ_EXTRA_CXXFLAGS.
if test "x$werror" = "xyes" -a "x$pgm
1_ext" = "xno" -a "x$pgm
2_ext" = "xno"; then
# If not on QNX nor --with-pgm
2
add -Werror into LIBZMQ_EXTRA_CXXFLAGS.
if test "x$werror" = "xyes" -a "x$pgm2_ext" = "xno"; then
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Werror"
fi
...
...
@@ -655,9 +552,8 @@ AM_CONDITIONAL(BUILD_RUBY, test "x$rbzmq" = "xyes")
AM_CONDITIONAL(BUILD_C, test "x$czmq" = "xyes")
AM_CONDITIONAL(BUILD_CL, test "x$clzmq" = "xyes")
AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes")
AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes")
AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes")
AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno"
-a "x$pgm1_ext" = "xno"
)
AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno")
AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes")
AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes")
...
...
@@ -712,17 +608,11 @@ AC_MSG_RESULT([ Ruby: $rbzmq])
if test "x$rbzmq" = "xyes"; then
AC_MSG_RESULT([ Ruby library install dir: $rubydir])
fi
AC_MSG_RESULT([ Network protocols:])
AC_MSG_RESULT([ TCP: yes])
if test "x$pgm1_ext" = "xyes"; then
AC_MSG_RESULT([ PGM: $pgm1_ext ($pgm_basename)])
fi
if test "x$pgm2_ext" = "xyes"; then
AC_MSG_RESULT([ PGM: $pgm2_ext ($pgm_basename)])
fi
if test "x$pgm1_ext" = "xno" -a "x$pgm2_ext" = "xno"; then
AC_MSG_RESULT([ PGM: no])
fi
AC_MSG_RESULT([ Transports:])
AC_MSG_RESULT([ tcp: yes])
AC_MSG_RESULT([ udp: $pgm2_ext])
AC_MSG_RESULT([ pgm: $pgm2_ext])
AC_MSG_RESULT([ inproc: yes])
AC_MSG_RESULT([ Devices:])
AC_MSG_RESULT([ Forwarder: $forwarder])
AC_MSG_RESULT([ Streamer: $streamer])
...
...
foreign/openpgm/libpgm-1.2.14.tar.bz2
deleted
100644 → 0
View file @
73b765e4
File deleted
foreign/openpgm/lost_data_tsi.patch
deleted
100644 → 0
View file @
73b765e4
--- foreign/openpgm/libpgm-1.2.14/openpgm/pgm/transport.c 2009-08-27 04:54:04.000000000 +0200
+++ foreign/openpgm/libpgm-1.2.14/openpgm/pgm/transport.c 2009-09-22 14:36:07.713124619 +0200
@@ -2342,6 +2342,7 @@
if (waiting_rxw->ack_cumulative_losses != waiting_rxw->cumulative_losses)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), waiting_rxw->identifier, sizeof (pgm_tsi_t));
waiting_rxw->pgm_sock_err.lost_count = waiting_rxw->cumulative_losses - waiting_rxw->ack_cumulative_losses;
waiting_rxw->ack_cumulative_losses = waiting_rxw->cumulative_losses;
}
@@ -2705,6 +2706,7 @@
if (waiting_rxw->ack_cumulative_losses != waiting_rxw->cumulative_losses)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), waiting_rxw->identifier, sizeof (pgm_tsi_t));
waiting_rxw->pgm_sock_err.lost_count = waiting_rxw->cumulative_losses - waiting_rxw->ack_cumulative_losses;
waiting_rxw->ack_cumulative_losses = waiting_rxw->cumulative_losses;
}
@@ -3407,6 +3409,7 @@
!sender_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), sender_rxw->identifier, sizeof (pgm_tsi_t));
sender_rxw->pgm_sock_err.lost_count = sender_rxw->cumulative_losses - sender_rxw->ack_cumulative_losses;
sender_rxw->ack_cumulative_losses = sender_rxw->cumulative_losses;
@@ -3823,6 +3826,7 @@
!peer_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), peer_rxw->identifier, sizeof (pgm_tsi_t));
peer_rxw->pgm_sock_err.lost_count = peer_rxw->cumulative_losses - peer_rxw->ack_cumulative_losses;
peer_rxw->ack_cumulative_losses = peer_rxw->cumulative_losses;
@@ -3952,6 +3956,7 @@
!peer_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), peer_rxw->identifier, sizeof (pgm_tsi_t));
peer_rxw->pgm_sock_err.lost_count = peer_rxw->cumulative_losses - peer_rxw->ack_cumulative_losses;
peer_rxw->ack_cumulative_losses = peer_rxw->cumulative_losses;
@@ -4849,6 +4854,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
@@ -5166,6 +5172,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
@@ -5303,6 +5310,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
--- foreign/openpgm/libpgm-1.2.14/openpgm/pgm/include/pgm/transport.h 2009-08-27 04:53:23.000000000 +0200
+++ foreign/openpgm/libpgm-1.2.14/openpgm/pgm/include/pgm/transport.h 2009-09-21 15:49:36.000000000 +0200
@@ -205,6 +205,7 @@
gboolean is_bound;
gboolean is_open;
gboolean has_lost_data;
+ pgm_tsi_t lost_data_tsi;
gboolean will_close_on_failure;
gboolean can_send_data; /* and SPMs */
src/Makefile.am
View file @
f4ac8d7a
...
...
@@ -15,23 +15,6 @@ include_HEADERS = ../bindings/c/zmq.h
endif
endif
if
BUILD_PGM1
pgm_sources
=
../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/timer.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/gsi.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/signal.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/txwi.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/rxwi.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/transport.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/rate_control.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/async.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/checksum.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/reed_solomon.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/version.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_tables.c
endif
if
BUILD_PGM2
pgm_sources
=
../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c
\
...
...
@@ -179,48 +162,6 @@ libzmq_la_SOURCES = app_thread.hpp \
libzmq_la_LDFLAGS
=
-version-info
@LTVER@ @LIBZMQ_EXTRA_LDFLAFS@
if
BUILD_PGM1
libzmq_la_CXXFLAGS
=
-I
$(top_srcdir)
/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/
-Wall
@LIBZMQ_EXTRA_CXXFLAGS@
libzmq_la_CFLAGS
=
-I
$(top_srcdir)
/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ @LIBZMQ_EXTRA_CXXFLAGS@
\
-pipe
\
-Wall
\
-Wextra
\
-Wfloat-equal
\
-Wshadow
\
-Wunsafe-loop-optimizations
\
-Wpointer-arith
\
-Wbad-function-cast
\
-Wcast-qual
\
-Wcast-align
\
-Wwrite-strings
\
-Waggregate-return
\
-Wstrict-prototypes
\
-Wold-style-definition
\
-Wmissing-prototypes
\
-Wmissing-declarations
\
-Wmissing-noreturn
\
-Wmissing-format-attribute
\
-Wredundant-decls
\
-Wnested-externs
\
-Winline
\
-pedantic
\
-std
=
gnu99
\
--param
max-inline-insns-single
=
600
\
-D_REENTRANT
\
-D_GNU_SOURCE
\
-D__need_IOV_MAX
\
-DCONFIG_HAVE_EPOLL
\
-DCONFIG_HAVE_RTC
\
-DCONFIG_HAVE_TSC
\
-DCONFIG_HAVE_IFR_NETMASK
\
-DCONFIG_HAVE_GETIFADDRS
\
-DCONFIG_HAVE_GETHOSTBYNAME2
\
-DCONFIG_HAVE_GETPROTOBYNAME_R
\
-DCONFIG_HAVE_SIGHANDLER_T
\
-DCONFIG_BIND_INADDR_ANY
\
-DCONFIG_GALOIS_MUL_LUT
endif
if
BUILD_PGM2
libzmq_la_CXXFLAGS
=
-I
$(top_srcdir)
/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/
-Wall
@LIBZMQ_EXTRA_CXXFLAGS@
...
...
src/pgm_receiver.cpp
View file @
f4ac8d7a
...
...
@@ -165,13 +165,8 @@ void zmq::pgm_receiver_t::in_event ()
peer_info_t
peer_info
=
{
false
,
NULL
};
it
=
peers
.
insert
(
std
::
make_pair
(
*
tsi
,
peer_info
)).
first
;
#ifdef ZMQ_HAVE_OPENPGM1
zmq_log
(
1
,
"New peer TSI: %s, %s(%i).
\n
"
,
pgm_print_tsi
(
tsi
),
__FILE__
,
__LINE__
);
#elif ZMQ_HAVE_OPENPGM2
zmq_log
(
1
,
"New peer TSI: %s, %s(%i).
\n
"
,
pgm_tsi_print
(
tsi
),
__FILE__
,
__LINE__
);
#endif
}
// There is not beginning of the message in current APDU and we
...
...
@@ -197,13 +192,8 @@ void zmq::pgm_receiver_t::in_event ()
it
->
second
.
decoder
=
new
zmq_decoder_t
(
0
);
it
->
second
.
decoder
->
set_inout
(
inout
);
#ifdef ZMQ_HAVE_OPENPGM1
zmq_log
(
1
,
"Peer %s joined into the stream, %s(%i)
\n
"
,
pgm_print_tsi
(
tsi
),
__FILE__
,
__LINE__
);
#elif ZMQ_HAVE_OPENPGM2
zmq_log
(
1
,
"Peer %s joined into the stream, %s(%i)
\n
"
,
pgm_tsi_print
(
tsi
),
__FILE__
,
__LINE__
);
#endif
}
if
(
nbytes
>
0
)
{
...
...
src/pgm_sender.cpp
View file @
f4ac8d7a
...
...
@@ -74,19 +74,13 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
// Alocate 2 fds for PGM socket.
int
downlink_socket_fd
=
0
;
int
uplink_socket_fd
=
0
;
#ifdef ZMQ_HAVE_OPENPGM2
int
rdata_notify_fd
=
0
;
#endif
encoder
.
set_inout
(
inout_
);
// Fill fds from PGM transport.
#ifdef ZMQ_HAVE_OPENPGM1
pgm_socket
.
get_sender_fds
(
&
downlink_socket_fd
,
&
uplink_socket_fd
);
#elif ZMQ_HAVE_OPENPGM2
pgm_socket
.
get_sender_fds
(
&
downlink_socket_fd
,
&
uplink_socket_fd
,
&
rdata_notify_fd
);
#endif
// Add downlink_socket_fd into poller.
handle
=
add_fd
(
downlink_socket_fd
);
...
...
@@ -95,16 +89,12 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
uplink_handle
=
add_fd
(
uplink_socket_fd
);
// Add rdata_notify_fd into the poller.
#ifdef ZMQ_HAVE_OPENPGM2
rdata_notify_handle
=
add_fd
(
rdata_notify_fd
);
#endif
// Set POLLIN. We wont never want to stop polling for uplink = we never
// want to stop porocess NAKs.
set_pollin
(
uplink_handle
);
#ifdef ZMQ_HAVE_OPENPGM2
set_pollin
(
rdata_notify_handle
);
#endif
// Set POLLOUT for downlink_socket_handle.
set_pollout
(
handle
);
...
...
@@ -116,9 +106,7 @@ void zmq::pgm_sender_t::unplug ()
{
rm_fd
(
handle
);
rm_fd
(
uplink_handle
);
#ifdef ZMQ_HAVE_OPENPGM2
rm_fd
(
rdata_notify_handle
);
#endif
encoder
.
set_inout
(
NULL
);
inout
=
NULL
;
}
...
...
@@ -195,12 +183,6 @@ void zmq::pgm_sender_t::out_event ()
zmq_log
(
1
,
"pgm rate limit reached, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
}
#ifdef ZMQ_HAVE_OPENPGM1
// After sending data slice is owned by tx window.
if
(
nbytes
)
{
out_buffer
=
NULL
;
}
#endif
write_pos
+=
nbytes
;
}
...
...
src/pgm_sender.hpp
View file @
f4ac8d7a
...
...
@@ -80,9 +80,7 @@ namespace zmq
// Poll handle associated with PGM socket.
handle_t
handle
;
handle_t
uplink_handle
;
#ifdef ZMQ_HAVE_OPENPGM2
handle_t
rdata_notify_handle
;
#endif
// Parent session.
i_inout
*
inout
;
...
...
src/pgm_socket.cpp
View file @
f4ac8d7a
...
...
@@ -127,13 +127,9 @@ int zmq::pgm_socket_t::open_transport (void)
nbytes_processed
=
0
;
pgm_msgv_processed
=
0
;
#ifdef ZMQ_HAVE_OPENPGM1
int
pgm_ok
=
0
;
#elif defined ZMQ_HAVE_OPENPGM2
// TODO: Converting bool to int? Not nice.
int
pgm_ok
=
true
;
GError
*
pgm_error
=
NULL
;
#endif
// Init PGM transport.
// Ensure threading enabled, ensure timer enabled and find PGM protocol id.
...
...
@@ -162,11 +158,7 @@ int zmq::pgm_socket_t::open_transport (void)
gsi_base
=
uuid_t
().
to_string
();
}
#ifdef ZMQ_HAVE_OPENPGM1
rc
=
pgm_create_custom_gsi
(
gsi_base
.
c_str
(),
&
gsi
);
#elif defined ZMQ_HAVE_OPENPGM2
rc
=
pgm_gsi_create_from_string
(
&
gsi
,
gsi_base
.
c_str
(),
-
1
);
#endif
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
...
...
@@ -176,26 +168,6 @@ int zmq::pgm_socket_t::open_transport (void)
//zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi),
// __FILE__, __LINE__);
#ifdef ZMQ_HAVE_OPENPGM1
// PGM transport GSRs.
struct
group_source_req
recv_gsr
,
send_gsr
;
size_t
recv_gsr_len
=
1
;
// On success, 0 is returned. On invalid arguments, -EINVAL is returned.
// If more multicast groups are found than the recv_len parameter,
// -ENOMEM is returned.
rc
=
pgm_if_parse_transport
(
network
,
AF_INET
,
&
recv_gsr
,
&
recv_gsr_len
,
&
send_gsr
);
if
(
rc
!=
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
if
(
recv_gsr_len
!=
1
)
{
errno
=
ENOMEM
;
return
-
1
;
}
#elif defined ZMQ_HAVE_OPENPGM2
struct
pgm_transport_info_t
*
res
=
NULL
;
if
(
!
pgm_if_get_transport_info
(
network
,
NULL
,
&
res
,
&
pgm_error
))
{
...
...
@@ -205,29 +177,13 @@ int zmq::pgm_socket_t::open_transport (void)
res
->
ti_gsi
=
gsi
;
res
->
ti_dport
=
port_number
;
#endif
// If we are using UDP encapsulation update gsr or res.
if
(
udp_encapsulation
)
{
#ifdef ZMQ_HAVE_OPENPGM1
// Use the same port for UDP encapsulation.
((
struct
sockaddr_in
*
)
&
send_gsr
.
gsr_group
)
->
sin_port
=
g_htons
(
port_number
);
((
struct
sockaddr_in
*
)
&
recv_gsr
.
gsr_group
)
->
sin_port
=
g_htons
(
port_number
);
#elif defined ZMQ_HAVE_OPENPGM2
res
->
ti_udp_encap_ucast_port
=
port_number
;
res
->
ti_udp_encap_mcast_port
=
port_number
;
#endif
}
#ifdef ZMQ_HAVE_OPENPGM1
rc
=
pgm_transport_create
(
&
transport
,
&
gsi
,
0
,
port_number
,
&
recv_gsr
,
1
,
&
send_gsr
);
if
(
rc
!=
0
)
{
return
-
1
;
}
#elif defined ZMQ_HAVE_OPENPGM2
if
(
!
pgm_transport_create
(
&
transport
,
res
,
&
pgm_error
))
{
pgm_if_free_transport_info
(
res
);
// TODO: tranlate errors from glib into errnos.
...
...
@@ -236,7 +192,6 @@ int zmq::pgm_socket_t::open_transport (void)
}
pgm_if_free_transport_info
(
res
);
#endif
zmq_log
(
1
,
"PGM transport created, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
...
...
@@ -270,11 +225,8 @@ int zmq::pgm_socket_t::open_transport (void)
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
#if defined ZMQ_HAVE_OPENPGM1
rc
=
pgm_transport_set_recv_only
(
transport
,
false
);
#elif defined ZMQ_HAVE_OPENPGM2
rc
=
pgm_transport_set_recv_only
(
transport
,
true
,
false
);
#endif
zmq_assert
(
rc
==
pgm_ok
);
// Set NAK transmit back-off interval [us].
...
...
@@ -361,23 +313,6 @@ int zmq::pgm_socket_t::open_transport (void)
return
-
1
;
}
#ifdef ZMQ_HAVE_OPENPGM1
// Preallocate full transmit window. For simplification always
// worst case is used (40 bytes ipv6 header and 20 bytes UDP
// encapsulation).
int
to_preallocate
=
options
.
recovery_ivl
*
(
options
.
rate
*
1000
/
8
)
/
(
pgm_max_tpdu
-
40
-
20
);
rc
=
pgm_transport_set_txw_preallocate
(
transport
,
to_preallocate
);
if
(
rc
!=
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
zmq_log
(
2
,
"Preallocated %i slices in TX window. %s(%i)
\n
"
,
to_preallocate
,
__FILE__
,
__LINE__
);
#endif
// Set interval of background SPM packets [us].
rc
=
pgm_transport_set_ambient_spm
(
transport
,
8192
*
1000
);
zmq_assert
(
rc
==
pgm_ok
);
...
...
@@ -398,17 +333,10 @@ int zmq::pgm_socket_t::open_transport (void)
}
// Bind a transport to the specified network devices.
#ifdef ZMQ_HAVE_OPENPGM1
rc
=
pgm_transport_bind
(
transport
);
if
(
rc
!=
0
)
{
return
-
1
;
}
#elif defined ZMQ_HAVE_OPENPGM2
if
(
!
pgm_transport_bind
(
transport
,
&
pgm_error
))
{
// TODO: tranlate errors from glib into errnos.
return
-
1
;
}
#endif
zmq_log
(
1
,
"PGM transport bound, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
...
...
@@ -444,28 +372,6 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
zmq_assert
(
receive_fd_
);
zmq_assert
(
waiting_pipe_fd_
);
#if defined ZMQ_HAVE_OPENPGM1
// For POLLIN there are 2 pollfds in pgm_transport.
int
fds_array_size
=
pgm_receiver_fd_count
;
pollfd
*
fds
=
new
pollfd
[
fds_array_size
];
memset
(
fds
,
'\0'
,
fds_array_size
*
sizeof
(
fds
));
// Retrieve pollfds from pgm_transport.
int
rc
=
pgm_transport_poll_info
(
transport
,
fds
,
&
fds_array_size
,
POLLIN
);
// pgm_transport_poll_info has to return 2 pollfds for POLLIN.
// Note that fds_array_size parameter can be
// changed inside pgm_transport_poll_info call.
zmq_assert
(
rc
==
pgm_receiver_fd_count
);
// Store pfds into user allocated space.
*
receive_fd_
=
fds
[
0
].
fd
;
*
waiting_pipe_fd_
=
fds
[
1
].
fd
;
delete
[]
fds
;
#elif defined ZMQ_HAVE_OPENPGM2
// recv_sock2 should not be used - check it.
zmq_assert
(
transport
->
recv_sock2
==
-
1
);
...
...
@@ -476,7 +382,6 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
// Take FDs directly from transport.
*
receive_fd_
=
pgm_transport_get_recv_fd
(
transport
);
*
waiting_pipe_fd_
=
pgm_transport_get_pending_fd
(
transport
);
#endif
return
pgm_receiver_fd_count
;
}
...
...
@@ -491,31 +396,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
zmq_assert
(
send_fd_
);
zmq_assert
(
receive_fd_
);
#if defined ZMQ_HAVE_OPENPGM1
zmq_assert
(
!
rdata_notify_fd_
);
// Preallocate pollfds array.
int
fds_array_size
=
pgm_sender_fd_count
;
pollfd
*
fds
=
new
pollfd
[
fds_array_size
];
memset
(
fds
,
'\0'
,
fds_array_size
*
sizeof
(
fds
));
// Retrieve pollfds from pgm_transport.
int
rc
=
pgm_transport_poll_info
(
transport
,
fds
,
&
fds_array_size
,
POLLOUT
|
POLLIN
);
// pgm_transport_poll_info has to return one pollfds for POLLOUT and
// second for POLLIN.
// Note that fds_array_size parameter can be
// changed inside pgm_transport_poll_info call.
zmq_assert
(
rc
==
pgm_sender_fd_count
);
// Store pfds into user allocated space.
*
receive_fd_
=
fds
[
0
].
fd
;
*
send_fd_
=
fds
[
1
].
fd
;
delete
[]
fds
;
#elif defined ZMQ_HAVE_OPENPGM2
zmq_assert
(
rdata_notify_fd_
);
// recv_sock2 should not be used - check it.
...
...
@@ -529,7 +409,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
*
receive_fd_
=
pgm_transport_get_recv_fd
(
transport
);
*
rdata_notify_fd_
=
pgm_transport_get_repair_fd
(
transport
);
*
send_fd_
=
pgm_transport_get_send_fd
(
transport
);
#endif
return
pgm_sender_fd_count
;
}
...
...
@@ -537,27 +416,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
// Send one APDU, transmit window owned memory.
size_t
zmq
::
pgm_socket_t
::
send
(
unsigned
char
*
data_
,
size_t
data_len_
)
{
#if defined ZMQ_HAVE_OPENPGM1
ssize_t
nbytes
=
0
;
iovec
iov
=
{
data_
,
data_len_
};
nbytes
=
pgm_transport_send_packetv
(
transport
,
&
iov
,
1
,
MSG_DONTWAIT
|
MSG_WAITALL
,
true
);
zmq_assert
(
nbytes
!=
-
EINVAL
);
if
(
nbytes
==
-
1
&&
errno
!=
EAGAIN
)
{
errno_assert
(
false
);
}
// If nbytes is -1 and errno is EAGAIN means that we can not send data
// now. We have to call write_one_pkt again.
nbytes
=
nbytes
==
-
1
?
0
:
nbytes
;
#elif defined ZMQ_HAVE_OPENPGM2
size_t
nbytes
=
0
;
PGMIOStatus
status
=
pgm_send
(
transport
,
data_
,
data_len_
,
&
nbytes
);
...
...
@@ -569,7 +427,6 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
zmq_assert
(
status
==
PGM_IO_STATUS_RATE_LIMITED
);
zmq_assert
(
nbytes
==
0
);
}
#endif
zmq_log
(
4
,
"wrote %i/%iB, %s(%i)
\n
"
,
(
int
)
nbytes
,
(
int
)
data_len_
,
__FILE__
,
__LINE__
);
...
...
@@ -618,26 +475,17 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)
// Store size.
*
size_
=
get_max_tsdu_size
();
#if defined ZMQ_HAVE_OPENPGM1
// Allocate one packet in tx window.
return
pgm_packetv_alloc
(
transport
,
false
);
#elif defined ZMQ_HAVE_OPENPGM2
// Allocate buffer.
unsigned
char
*
apdu_buff
=
new
unsigned
char
[
*
size_
];
zmq_assert
(
apdu_buff
);
return
apdu_buff
;
#endif
}
// Return an unused packet allocated from the transmit window
// via pgm_packetv_alloc().
void
zmq
::
pgm_socket_t
::
free_buffer
(
void
*
data_
)
{
#if defined ZMQ_HAVE_OPENPGM1
pgm_packetv_free1
(
transport
,
data_
,
false
);
#elif defined ZMQ_HAVE_OPENPGM2
delete
[]
(
unsigned
char
*
)
data_
;
#endif
}
...
...
@@ -672,42 +520,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Receive a vector of Application Protocol Domain Unit's (APDUs)
// from the transport.
#ifdef ZMQ_HAVE_OPENPGM1
nbytes_rec
=
pgm_transport_recvmsgv
(
transport
,
pgm_msgv
,
pgm_msgv_len
,
MSG_DONTWAIT
);
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// pgm_transport_recvmsg returns -1 with errno == EAGAIN.
if
(
nbytes_rec
==
-
1
&&
errno
==
EAGAIN
)
{
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec
=
0
;
return
0
;
}
// For data loss nbytes_rec == -1 errno == ECONNRESET.
if
(
nbytes_rec
==
-
1
&&
errno
==
ECONNRESET
)
{
// Save lost data TSI.
*
tsi_
=
&
transport
->
lost_data_tsi
;
zmq_log
(
1
,
"Data loss detected %s, %s(%i)
\n
"
,
pgm_print_tsi
(
&
transport
->
lost_data_tsi
),
__FILE__
,
__LINE__
);
nbytes_rec
=
0
;
// In case of dala loss -1 is returned.
return
-
1
;
}
// Catch the rest of the errors.
if
(
nbytes_rec
<=
0
)
{
zmq_log
(
2
,
"received %i B, errno %i, %s(%i).
\n
"
,
(
int
)
nbytes_rec
,
errno
,
__FILE__
,
__LINE__
);
errno_assert
(
false
);
}
#elif defined ZMQ_HAVE_OPENPGM2
GError
*
pgm_error
=
NULL
;
const
PGMIOStatus
status
=
pgm_recvmsgv
(
transport
,
pgm_msgv
,
...
...
@@ -760,7 +572,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
nbytes_rec
=
0
;
return
-
1
;
}
#endif
zmq_log
(
4
,
"received %i bytes
\n
"
,
(
int
)
nbytes_rec
);
...
...
@@ -768,17 +579,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert
(
nbytes_rec
>
0
);
#ifdef ZMQ_HAVE_OPENPGM1
// Only one APDU per pgm_msgv_t structure is allowed.
zmq_assert
(
pgm_msgv
[
pgm_msgv_processed
].
msgv_iovlen
==
1
);
// Take pointers from pgm_msgv_t structure.
*
raw_data_
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_iov
->
iov_base
;
raw_data_len
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_iov
->
iov_len
;
// Save current TSI.
*
tsi_
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_tsi
;
#elif defined ZMQ_HAVE_OPENPGM2
// Only one APDU per pgm_msgv_t structure is allowed.
zmq_assert
(
pgm_msgv
[
pgm_msgv_processed
].
msgv_len
==
1
);
...
...
@@ -791,7 +591,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Save current TSI.
*
tsi_
=
&
skb
->
tsi
;
#endif
// Move the the next pgm_msgv_t structure.
pgm_msgv_processed
++
;
...
...
@@ -808,19 +607,6 @@ void zmq::pgm_socket_t::process_upstream (void)
pgm_msgv_t
dummy_msg
;
#ifdef ZMQ_HAVE_OPENPGM1
ssize_t
dummy_bytes
=
0
;
// We acctually do not want to read any data here we are going to
// process NAK.
dummy_bytes
=
pgm_transport_recvmsgv
(
transport
,
&
dummy_msg
,
1
,
MSG_DONTWAIT
);
// No data should be returned.
zmq_assert
(
dummy_bytes
==
-
1
&&
errno
==
EAGAIN
);
#elif defined ZMQ_HAVE_OPENPGM2
size_t
dummy_bytes
=
0
;
GError
*
pgm_error
=
NULL
;
...
...
@@ -833,8 +619,6 @@ void zmq::pgm_socket_t::process_upstream (void)
// No data should be returned.
zmq_assert
(
dummy_bytes
==
0
&&
(
status
==
PGM_IO_STATUS_TIMER_PENDING
||
status
==
PGM_IO_STATUS_RATE_LIMITED
));
#endif
}
#endif
...
...
src/pgm_socket.hpp
View file @
f4ac8d7a
...
...
@@ -114,35 +114,19 @@ namespace zmq
pgm_msgv_t
*
pgm_msgv
;
// How many bytes were read from pgm socket.
#ifdef ZMQ_HAVE_OPENPGM1
ssize_t
nbytes_rec
;
#elif defined ZMQ_HAVE_OPENPGM2
size_t
nbytes_rec
;
#endif
// How many bytes were processed from last pgm socket read.
#ifdef ZMQ_HAVE_OPENPGM1
ssize_t
nbytes_processed
;
#elif defined ZMQ_HAVE_OPENPGM2
size_t
nbytes_processed
;
#endif
// How many messages from pgm_msgv were already sent up.
#ifdef ZMQ_HAVE_OPENPGM1
ssize_t
pgm_msgv_processed
;
#elif defined ZMQ_HAVE_OPENPGM2
size_t
pgm_msgv_processed
;
#endif
// Size of pgm_msgv array.
size_t
pgm_msgv_len
;
// Sender transport uses 2 fd.
#ifdef ZMQ_HAVE_OPENPGM1
enum
{
pgm_sender_fd_count
=
2
};
#elif ZMQ_HAVE_OPENPGM2
enum
{
pgm_sender_fd_count
=
3
};
#endif
// Receiver transport uses 2 fd.
enum
{
pgm_receiver_fd_count
=
2
};
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment