Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
00cd7d49
Commit
00cd7d49
authored
Sep 28, 2010
by
Steven McCoy
Committed by
Martin Sustrik
Sep 28, 2010
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Upgrade to OpenPGM-5.0.78
parent
10bb9d04
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
356 additions
and
332 deletions
+356
-332
configure.in
configure.in
+6
-10
libpgm-2.1.26.tar.gz
foreign/openpgm/libpgm-2.1.26.tar.gz
+0
-0
libpgm-5.0.78.tar.gz
foreign/openpgm/libpgm-5.0.78.tar.gz
+0
-0
Makefile.am
src/Makefile.am
+51
-45
pgm_socket.cpp
src/pgm_socket.cpp
+285
-263
pgm_socket.hpp
src/pgm_socket.hpp
+1
-1
zmq.cpp
src/zmq.cpp
+13
-13
No files found.
configure.in
View file @
00cd7d49
...
@@ -102,6 +102,7 @@ case "${host_os}" in
...
@@ -102,6 +102,7 @@ case "${host_os}" in
# Define on Linux to enable all library features
# Define on Linux to enable all library features
CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS"
CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS])
AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS])
AC_CHECK_LIB(rt, main)
AC_CHECK_LIB(uuid, main, ,
AC_CHECK_LIB(uuid, main, ,
[AC_MSG_ERROR([cannot link with -luuid, install uuid-dev.])])
[AC_MSG_ERROR([cannot link with -luuid, install uuid-dev.])])
;;
;;
...
@@ -269,7 +270,7 @@ fi
...
@@ -269,7 +270,7 @@ fi
# PGM extension
# PGM extension
pgm_ext="no"
pgm_ext="no"
pgm_basename="libpgm-
2.1.26
"
pgm_basename="libpgm-
5.0.78
"
AC_SUBST(pgm_basename)
AC_SUBST(pgm_basename)
...
@@ -280,9 +281,9 @@ AC_ARG_WITH([pgm], [AS_HELP_STRING([--with-pgm],
...
@@ -280,9 +281,9 @@ AC_ARG_WITH([pgm], [AS_HELP_STRING([--with-pgm],
if test "x$with_pgm_ext" != "xno"; then
if test "x$with_pgm_ext" != "xno"; then
AC_MSG_CHECKING([if the PGM extension is supported on this platform])
AC_MSG_CHECKING([if the PGM extension is supported on this platform])
# OpenPGM is only supported by the vendor on x86
and AMD64
platforms...
# OpenPGM is only supported by the vendor on x86
, AMD64, and SPARC
platforms...
case "${host_cpu}" in
case "${host_cpu}" in
i*86|x86_64)
i*86|x86_64
|*sparc*
)
# Supported
# Supported
;;
;;
*)
*)
...
@@ -290,9 +291,9 @@ if test "x$with_pgm_ext" != "xno"; then
...
@@ -290,9 +291,9 @@ if test "x$with_pgm_ext" != "xno"; then
;;
;;
esac
esac
# ... and on Linux/Windows/Solaris systems.
# ... and on Linux/Windows/Solaris
/FreeBSD/OSX
systems.
case "${host_os}" in
case "${host_os}" in
*linux*|*mingw32*|*solaris*)
*linux*|*mingw32*|*solaris*
|*freebsd*|*darwin*
)
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Wno-variadic-macros -Wno-long-long "
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} -Wno-variadic-macros -Wno-long-long "
;;
;;
*)
*)
...
@@ -306,11 +307,6 @@ if test "x$with_pgm_ext" != "xno"; then
...
@@ -306,11 +307,6 @@ if test "x$with_pgm_ext" != "xno"; then
if test "x$have_pkg_config" != "xyes"; then
if test "x$have_pkg_config" != "xyes"; then
AC_MSG_ERROR([the --with-pgm option requires that pkg-config be installed.]);
AC_MSG_ERROR([the --with-pgm option requires that pkg-config be installed.]);
fi
fi
# Check for OpenPGM dependencies
PKG_CHECK_MODULES([GLIB], [glib-2.0 gthread-2.0])
LIBZMQ_EXTRA_CXXFLAGS="${LIBZMQ_EXTRA_CXXFLAGS} ${GLIB_CFLAGS} "
LIBZMQ_EXTRA_LDFLAGS="${LIBZMQ_EXTRA_LDFLAGS} ${GLIB_LIBS} "
fi
fi
# Gzip, Perl and Python are required duing PGM build
# Gzip, Perl and Python are required duing PGM build
...
...
foreign/openpgm/libpgm-2.1.26.tar.gz
deleted
100644 → 0
View file @
10bb9d04
File deleted
foreign/openpgm/libpgm-5.0.78.tar.gz
0 → 100644
View file @
00cd7d49
File added
src/Makefile.am
View file @
00cd7d49
...
@@ -6,7 +6,21 @@ pkgconfig_DATA = libzmq.pc
...
@@ -6,7 +6,21 @@ pkgconfig_DATA = libzmq.pc
include_HEADERS
=
../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h
include_HEADERS
=
../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h
if
BUILD_PGM
if
BUILD_PGM
pgm_sources
=
../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c
\
noinst_LTLIBRARIES
=
libpgm.la
nodist_libpgm_la_SOURCES
=
../foreign/openpgm/@pgm_basename@/openpgm/pgm/thread.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/mem.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/string.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/list.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/slist.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/queue.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/hashtable.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/messages.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/error.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/math.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet_parse.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet_test.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/sockaddr.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/getifaddrs.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/getifaddrs.c
\
...
@@ -16,28 +30,25 @@ pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \
...
@@ -16,28 +30,25 @@ pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \
../foreign/openpgm/@pgm_basename@/openpgm/pgm/nametoindex.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/nametoindex.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/inet_network.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/inet_network.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/md5.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/md5.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/rand.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/gsi.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/gsi.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/tsi.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/tsi.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
signal
.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
txw
.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
txwi
.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
rxw
.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
rxwi
.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
skbuff
.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
transpor
t.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
socke
t.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/source.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/source.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/receiver.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/receiver.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/recv.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/recv.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
pgm
.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/
engine
.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/timer.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/timer.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/net.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/net.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/rate_control.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/rate_control.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/async.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/checksum.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/checksum.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/reed_solomon.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/reed_solomon.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_tables.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_tables.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/wsastrerror.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/wsastrerror.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/glib-compat.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/histogram.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/backtrace.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/log.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/sockaddr.c
\
../foreign/openpgm/@pgm_basename@/openpgm/pgm/version.c
../foreign/openpgm/@pgm_basename@/openpgm/pgm/version.c
../foreign/openpgm/@pgm_basename@/openpgm/pgm/version.c
:
../foreign/openpgm/@pgm_basename@/openpgm/pgm/version_generator.py
../foreign/openpgm/@pgm_basename@/openpgm/pgm/version.c
:
../foreign/openpgm/@pgm_basename@/openpgm/pgm/version_generator.py
...
@@ -45,9 +56,9 @@ pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \
...
@@ -45,9 +56,9 @@ pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \
../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_tables.c
:
../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_generator.pl
../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_tables.c
:
../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_generator.pl
perl ../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_generator.pl
>
$@
perl ../foreign/openpgm/@pgm_basename@/openpgm/pgm/galois_generator.pl
>
$@
endif
nodist_libzmq_la_SOURCES
=
$(pgm_sources)
libpgm_la_LIBADD
=
@LTLIBOBJS@
endif
libzmq_la_SOURCES
=
\
libzmq_la_SOURCES
=
\
array.hpp
\
array.hpp
\
...
@@ -189,53 +200,44 @@ if BUILD_PGM
...
@@ -189,53 +200,44 @@ if BUILD_PGM
if
ON_MINGW
if
ON_MINGW
libpgm_diff_flags
=
\
libpgm_diff_flags
=
\
-D_WIN32_WINNT
=
0x0501
\
-D_WIN32_WINNT
=
0x0501
\
-DCONFIG_16BIT_CHECKSUM
\
-DCONFIG_HAVE_ISO_VARARGS
\
-DCONFIG_HAVE_IFR_NETMASK
\
-DCONFIG_HAVE_TSC
\
-DCONFIG_BIND_INADDR_ANY
\
-DCONFIG_GALOIS_MUL_LUT
\
-DIF_NAMESIZE
=
256
\
-DPGM_GNUC_INTERNAL
=
G_GNUC_INTERNAL
\
-DCONFIG_HAVE_WSACMSGHDR
\
-DCONFIG_HAVE_WSACMSGHDR
\
-D
GETTEXT_PACKAGE
=
'"pgm"'
\
-D
CONFIG_HAVE_DSO_VISIBILITY
\
-D
G_LOG_DOMAIN
=
'"Pgm"'
-D
CONFIG_BIND_INADDR_ANY
else
else
libpgm_diff_flags
=
\
libpgm_diff_flags
=
\
-D__need_IOV_MAX
\
-DCONFIG_HAVE_GETPROTOBYNAME_R2
\
-DCONFIG_16BIT_CHECKSUM
\
-DCONFIG_HAVE_ISO_VARARGS
\
-DCONFIG_HAVE_ALLOCA_H
\
-DCONFIG_HAVE_PROC
\
-DCONFIG_HAVE_BACKTRACE
\
-DCONFIG_HAVE_PSELECT
\
-DCONFIG_HAVE_PSELECT
\
-DCONFIG_HAVE_POLL
\
-DCONFIG_HAVE_PPOLL
\
-DCONFIG_HAVE_EPOLL
\
-DCONFIG_HAVE_CLOCK_GETTIME
\
-DCONFIG_HAVE_CLOCK_NANOSLEEP
\
-DCONFIG_HAVE_NANOSLEEP
\
-DCONFIG_HAVE_USLEEP
\
-DCONFIG_HAVE_RTC
\
-DCONFIG_HAVE_RTC
\
-DCONFIG_HAVE_TSC
\
-DCONFIG_HAVE_TSC
\
-DCONFIG_HAVE_IFR_NETMASK
\
-DCONFIG_HAVE_HPET
\
-DCONFIG_HAVE_POLL
\
-DCONFIG_HAVE_EPOLL
\
-DCONFIG_HAVE_GETIFADDRS
\
-DCONFIG_HAVE_GETIFADDRS
\
-DCONFIG_HAVE_GETHOSTBYNAME2
\
-DCONFIG_HAVE_IFR_NETMASK
\
-DCONFIG_HAVE_GETPROTOBYNAME_R
\
-DCONFIG_BIND_INADDR_ANY
\
-DCONFIG_GALOIS_MUL_LUT
\
-DCONFIG_HAVE_MCAST_JOIN
\
-DCONFIG_HAVE_MCAST_JOIN
\
-DCONFIG_HAVE_IP_MREQN
\
-DCONFIG_HAVE_IP_MREQN
\
-DCONFIG_HAVE_SPRINTF_GROUPING
\
-DCONFIG_HAVE_SPRINTF_GROUPING
\
-DCONFIG_HAVE_
HPET
\
-DCONFIG_HAVE_
VASPRINTF
\
-D
PGM_GNUC_INTERNAL
=
G_GNUC_INTERNAL
\
-D
CONFIG_HAVE_DSO_VISIBILITY
\
-D
GETTEXT_PACKAGE
=
'"pgm"'
\
-D
CONFIG_BIND_INADDR_ANY
\
-D
G_LOG_DOMAIN
=
'"Pgm"'
-D
CONFIG_HAVE_GETOPT
endif
endif
libzmq_la_CFLAGS
=
-I
$(top_srcdir)
/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ @LIBZMQ_EXTRA_CXXFLAGS@
\
libpgm_la_CFLAGS
=
-I
$(top_srcdir)
/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ @LIBZMQ_EXTRA_CXXFLAGS@
\
-Wall
\
-pedantic
\
-std
=
gnu99
\
-std
=
gnu99
\
-
fno-strict-aliasing
\
-
D_XOPEN_SOURCE
=
600
\
-
-param
max-inline-insns-single
=
600
\
-
D_BSD_SOURCE
\
-D_REENTRANT
\
-D_REENTRANT
\
-D_GNU_SOURCE
\
-DCONFIG_16BIT_CHECKSUM
\
-DCONFIG_GALOIS_MUL_LUT
\
-DGETTEXT_PACKAGE
=
'"pgm"'
\
${
libpgm_diff_flags
}
${
libpgm_diff_flags
}
libzmq_la_CXXFLAGS
=
-I
$(top_srcdir)
/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/
\
libzmq_la_CXXFLAGS
=
-I
$(top_srcdir)
/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/
\
...
@@ -246,6 +248,10 @@ if BUILD_NO_PGM
...
@@ -246,6 +248,10 @@ if BUILD_NO_PGM
libzmq_la_CXXFLAGS
=
@LIBZMQ_EXTRA_CXXFLAGS@
libzmq_la_CXXFLAGS
=
@LIBZMQ_EXTRA_CXXFLAGS@
endif
endif
if
BUILD_PGM
libzmq_la_LIBADD
=
libpgm.la
endif
dist-hook
:
dist-hook
:
-
rm
$(distdir)
/platform.hpp
-
rm
$(distdir)
/platform.hpp
...
...
src/pgm_socket.cpp
View file @
00cd7d49
...
@@ -41,7 +41,7 @@
...
@@ -41,7 +41,7 @@
#include "stdint.hpp"
#include "stdint.hpp"
zmq
::
pgm_socket_t
::
pgm_socket_t
(
bool
receiver_
,
const
options_t
&
options_
)
:
zmq
::
pgm_socket_t
::
pgm_socket_t
(
bool
receiver_
,
const
options_t
&
options_
)
:
transport
(
NULL
),
sock
(
NULL
),
options
(
options_
),
options
(
options_
),
receiver
(
receiver_
),
receiver
(
receiver_
),
pgm_msgv
(
NULL
),
pgm_msgv
(
NULL
),
...
@@ -55,10 +55,10 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
...
@@ -55,10 +55,10 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
int
zmq
::
pgm_socket_t
::
init
(
bool
udp_encapsulation_
,
const
char
*
network_
)
int
zmq
::
pgm_socket_t
::
init
(
bool
udp_encapsulation_
,
const
char
*
network_
)
{
{
// Can not open transport before destroying old one.
// Can not open transport before destroying old one.
zmq_assert
(
transport
==
NULL
);
zmq_assert
(
sock
==
NULL
);
// Parse port number
.
// Parse port number
, start from end for IPv6
const
char
*
port_delim
=
strchr
(
network_
,
':'
);
const
char
*
port_delim
=
str
r
chr
(
network_
,
':'
);
if
(
!
port_delim
)
{
if
(
!
port_delim
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
...
@@ -74,260 +74,226 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
...
@@ -74,260 +74,226 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
memset
(
network
,
'\0'
,
sizeof
(
network
));
memset
(
network
,
'\0'
,
sizeof
(
network
));
memcpy
(
network
,
network_
,
port_delim
-
network_
);
memcpy
(
network
,
network_
,
port_delim
-
network_
);
// Zero counter used in msgrecv.
// Validate socket options
nbytes_rec
=
0
;
// Data rate is in [B/s]. options.rate is in [kb/s].
nbytes_processed
=
0
;
if
(
options
.
rate
<=
0
)
{
pgm_msgv_processed
=
0
;
int
rc
;
GError
*
pgm_error
=
NULL
;
// PGM transport GSI.
pgm_gsi_t
gsi
;
std
::
string
gsi_base
;
if
(
options
.
identity
.
size
()
>
0
)
{
// Create gsi from identity.
// TODO: We assume that identity is standard C string here.
// What if it contains binary zeroes?
gsi_base
.
assign
((
const
char
*
)
options
.
identity
.
data
(),
options
.
identity
.
size
());
}
else
{
// Generate random gsi.
gsi_base
=
uuid_t
().
to_string
();
}
rc
=
pgm_gsi_create_from_string
(
&
gsi
,
gsi_base
.
c_str
(),
-
1
);
if
(
rc
!=
TRUE
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
// Recovery interval [s].
struct
pgm_transport_info_t
*
res
=
NULL
;
if
(
options
.
recovery_ivl
<=
0
)
{
struct
pgm_transport_info_t
hint
;
memset
(
&
hint
,
0
,
sizeof
(
hint
));
hint
.
ti_family
=
AF_INET
;
if
(
!
pgm_if_get_transport_info
(
network
,
&
hint
,
&
res
,
&
pgm_error
))
{
if
(
pgm_error
->
domain
==
PGM_IF_ERROR
&&
(
pgm_error
->
code
==
PGM_IF_ERROR_INVAL
||
pgm_error
->
code
==
PGM_IF_ERROR_XDEV
||
pgm_error
->
code
==
PGM_IF_ERROR_NODEV
||
pgm_error
->
code
==
PGM_IF_ERROR_NOTUNIQ
||
pgm_error
->
code
==
PGM_IF_ERROR_ADDRFAMILY
||
pgm_error
->
code
==
PGM_IF_ERROR_FAMILY
||
pgm_error
->
code
==
PGM_IF_ERROR_NODATA
||
pgm_error
->
code
==
PGM_IF_ERROR_NONAME
||
pgm_error
->
code
==
PGM_IF_ERROR_SERVICE
))
{
g_error_free
(
pgm_error
);
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
// Zero counter used in msgrecv.
nbytes_rec
=
0
;
nbytes_processed
=
0
;
pgm_msgv_processed
=
0
;
bool
rc
;
pgm_error_t
*
pgm_error
=
NULL
;
struct
pgm_addrinfo_t
hints
,
*
res
=
NULL
;
sa_family_t
sa_family
;
memset
(
&
hints
,
0
,
sizeof
(
hints
));
hints
.
ai_family
=
AF_UNSPEC
;
if
(
!
pgm_getaddrinfo
(
network
,
NULL
,
&
res
,
&
pgm_error
))
{
if
(
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_IF
&&
(
pgm_error
->
code
==
PGM_ERROR_INVAL
||
pgm_error
->
code
==
PGM_ERROR_XDEV
||
pgm_error
->
code
==
PGM_ERROR_NODEV
||
pgm_error
->
code
==
PGM_ERROR_NOTUNIQ
||
pgm_error
->
code
==
PGM_ERROR_ADDRFAMILY
||
pgm_error
->
code
==
PGM_ERROR_AFNOSUPPORT
||
pgm_error
->
code
==
PGM_ERROR_NODATA
||
pgm_error
->
code
==
PGM_ERROR_NONAME
||
pgm_error
->
code
==
PGM_ERROR_SERVICE
))
goto
err_abort
;
/* fatal OpenPGM API error */
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
res
->
ti_gsi
=
gsi
;
// Pick up detected IP family
res
->
ti_dport
=
port_number
;
sa_family
=
res
->
ai_send_addrs
[
0
].
gsr_group
.
ss_family
;
//
If we are using UDP encapsulation update gsr or res.
//
Create IP/PGM or UDP/PGM socket
if
(
udp_encapsulation_
)
{
if
(
udp_encapsulation_
)
{
res
->
ti_udp_encap_ucast_port
=
port_number
;
if
(
!
pgm_socket
(
&
sock
,
sa_family
,
SOCK_SEQPACKET
,
IPPROTO_UDP
,
&
pgm_error
))
{
res
->
ti_udp_encap_mcast_port
=
port_number
;
if
(
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_SOCKET
&&
(
}
pgm_error
->
code
==
PGM_ERROR_INVAL
||
pgm_error
->
code
==
PGM_ERROR_NODEV
))
goto
err_abort
;
if
(
!
pgm_transport_create
(
&
transport
,
res
,
&
pgm_error
))
{
/* fatal OpenPGM API error */
if
(
pgm_error
->
domain
==
PGM_TRANSPORT_ERROR
&&
(
zmq_assert
(
false
);
pgm_error
->
code
==
PGM_TRANSPORT_ERROR_INVAL
||
pgm_error
->
code
==
PGM_TRANSPORT_ERROR_PERM
||
pgm_error
->
code
==
PGM_TRANSPORT_ERROR_NODEV
))
{
pgm_if_free_transport_info
(
res
);
g_error_free
(
pgm_error
);
errno
=
EINVAL
;
return
-
1
;
}
}
// All options are of data type int
const
int
encapsulation_port
=
port_number
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_UDP_ENCAP_UCAST_PORT
,
&
encapsulation_port
,
sizeof
(
encapsulation_port
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_UDP_ENCAP_UCAST_PORT
,
&
encapsulation_port
,
sizeof
(
encapsulation_port
)))
goto
err_abort
;
}
else
{
if
(
!
pgm_socket
(
&
sock
,
sa_family
,
SOCK_SEQPACKET
,
IPPROTO_PGM
,
&
pgm_error
))
{
if
(
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_SOCKET
&&
(
pgm_error
->
code
==
PGM_ERROR_INVAL
||
pgm_error
->
code
==
PGM_ERROR_PERM
||
pgm_error
->
code
==
PGM_ERROR_NODEV
))
goto
err_abort
;
/* fatal OpenPGM API error */
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
pgm_if_free_transport_info
(
res
);
// Common parameters for receiver and sender.
// Set maximum transport protocol data unit size (TPDU).
rc
=
pgm_transport_set_max_tpdu
(
transport
,
pgm_max_tpdu
);
if
(
rc
!=
TRUE
)
{
errno
=
EINVAL
;
return
-
1
;
}
}
// Set maximum number of network hops to cross.
{
rc
=
pgm_transport_set_hops
(
transport
,
16
);
const
int
rcvbuf
=
(
int
)
options
.
rcvbuf
,
if
(
rc
!=
TRUE
)
{
sndbuf
=
(
int
)
options
.
sndbuf
,
errno
=
EINVAL
;
max_tpdu
=
(
int
)
pgm_max_tpdu
;
return
-
1
;
if
(
rcvbuf
)
{
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_RCVBUF
,
&
rcvbuf
,
sizeof
(
rcvbuf
)))
goto
err_abort
;
}
if
(
sndbuf
)
{
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_RCVBUF
,
&
sndbuf
,
sizeof
(
sndbuf
)))
goto
err_abort
;
}
}
// Set nonblocking send/recv sockets.
// Set maximum transport protocol data unit size (TPDU).
if
(
!
pgm_transport_set_nonblocking
(
transport
,
true
))
{
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MTU
,
&
max_tpdu
,
sizeof
(
max_tpdu
)))
errno
=
EINVAL
;
goto
err_abort
;
return
-
1
;
}
}
if
(
receiver
)
{
if
(
receiver
)
{
const
int
recv_only
=
1
,
// Receiver transport.
rxw_max_rte
=
options
.
rate
*
1000
/
8
,
rxw_secs
=
options
.
recovery_ivl
,
// Note that NAKs are still generated by the transport.
peer_expiry
=
5
*
pgm_msecs
(
8192
),
rc
=
pgm_transport_set_recv_only
(
transport
,
true
,
false
);
spmr_expiry
=
pgm_msecs
(
25
),
zmq_assert
(
rc
==
TRUE
);
nak_bo_ivl
=
pgm_msecs
(
50
),
nak_rpt_ivl
=
pgm_msecs
(
200
),
if
(
options
.
rcvbuf
)
{
nak_rdata_ivl
=
pgm_msecs
(
200
),
rc
=
pgm_transport_set_rcvbuf
(
transport
,
(
int
)
options
.
rcvbuf
);
nak_data_retries
=
5
,
if
(
rc
!=
TRUE
)
nak_ncf_retries
=
2
;
return
-
1
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_ONLY
,
&
recv_only
,
sizeof
(
recv_only
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RXW_MAX_RTE
,
&
rxw_max_rte
,
sizeof
(
rxw_max_rte
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RXW_SECS
,
&
rxw_secs
,
sizeof
(
rxw_secs
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PEER_EXPIRY
,
&
peer_expiry
,
sizeof
(
peer_expiry
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SPMR_EXPIRY
,
&
spmr_expiry
,
sizeof
(
spmr_expiry
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_BO_IVL
,
&
nak_bo_ivl
,
sizeof
(
nak_bo_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_RPT_IVL
,
&
nak_rpt_ivl
,
sizeof
(
nak_rpt_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_RDATA_IVL
,
&
nak_rdata_ivl
,
sizeof
(
nak_rdata_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_DATA_RETRIES
,
&
nak_data_retries
,
sizeof
(
nak_data_retries
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_NCF_RETRIES
,
&
nak_ncf_retries
,
sizeof
(
nak_ncf_retries
)))
goto
err_abort
;
}
else
{
const
int
send_only
=
1
,
txw_max_rte
=
options
.
rate
*
1000
/
8
,
txw_secs
=
options
.
recovery_ivl
,
ambient_spm
=
pgm_msecs
(
8192
),
heartbeat_spm
[]
=
{
pgm_msecs
(
4
),
pgm_msecs
(
4
),
pgm_msecs
(
8
),
pgm_msecs
(
16
),
pgm_msecs
(
32
),
pgm_msecs
(
64
),
pgm_msecs
(
128
),
pgm_msecs
(
256
),
pgm_msecs
(
512
),
pgm_msecs
(
1024
),
pgm_msecs
(
2048
),
pgm_msecs
(
4096
),
pgm_msecs
(
8192
)
};
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SEND_ONLY
,
&
send_only
,
sizeof
(
send_only
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TXW_MAX_RTE
,
&
txw_max_rte
,
sizeof
(
txw_max_rte
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TXW_SECS
,
&
txw_secs
,
sizeof
(
txw_secs
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_AMBIENT_SPM
,
&
ambient_spm
,
sizeof
(
ambient_spm
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_HEARTBEAT_SPM
,
&
heartbeat_spm
,
sizeof
(
heartbeat_spm
)))
goto
err_abort
;
}
}
// Set NAK transmit back-off interval [us].
// PGM transport GSI.
rc
=
pgm_transport_set_nak_bo_ivl
(
transport
,
50
*
1000
);
struct
pgm_sockaddr_t
addr
;
zmq_assert
(
rc
==
TRUE
);
// Set timeout before repeating NAK [us].
rc
=
pgm_transport_set_nak_rpt_ivl
(
transport
,
200
*
1000
);
zmq_assert
(
rc
==
TRUE
);
// Set timeout for receiving RDATA.
rc
=
pgm_transport_set_nak_rdata_ivl
(
transport
,
200
*
1000
);
zmq_assert
(
rc
==
TRUE
);
// Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES).
rc
=
pgm_transport_set_nak_data_retries
(
transport
,
5
);
zmq_assert
(
rc
==
TRUE
);
// Set retries for NCF after NAK (NAK_NCF_RETRIES).
rc
=
pgm_transport_set_nak_ncf_retries
(
transport
,
2
);
zmq_assert
(
rc
==
TRUE
);
// Set timeout for removing a dead peer [us].
rc
=
pgm_transport_set_peer_expiry
(
transport
,
5
*
8192
*
1000
);
zmq_assert
(
rc
==
TRUE
);
// Set expiration time of SPM Requests [us].
memset
(
&
addr
,
0
,
sizeof
(
addr
));
rc
=
pgm_transport_set_spmr_expiry
(
transport
,
25
*
1000
)
;
addr
.
sa_port
=
port_number
;
zmq_assert
(
rc
==
TRUE
)
;
addr
.
sa_addr
.
sport
=
DEFAULT_DATA_SOURCE_PORT
;
// Set the size of the receive window.
if
(
options
.
identity
.
size
()
>
0
)
{
// Data rate is in [B/s]. options.rate is in [kb/s].
if
(
options
.
rate
<=
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
rc
=
pgm_transport_set_rxw_max_rte
(
transport
,
options
.
rate
*
1000
/
8
);
if
(
rc
!=
TRUE
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Recovery interval [s].
if
(
options
.
recovery_ivl
<=
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
rc
=
pgm_transport_set_rxw_secs
(
transport
,
options
.
recovery_ivl
);
if
(
rc
!=
TRUE
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Create gsi from identity.
if
(
!
pgm_gsi_create_from_data
(
&
addr
.
sa_addr
.
gsi
,
options
.
identity
.
data
(),
options
.
identity
.
size
()))
goto
err_abort
;
}
else
{
}
else
{
// Sender transport.
// Generate random gsi.
std
::
string
gsi_base
=
uuid_t
().
to_string
();
// Waiting pipe won't be read.
if
(
!
pgm_gsi_create_from_string
(
&
addr
.
sa_addr
.
gsi
,
gsi_base
.
c_str
(),
-
1
))
rc
=
pgm_transport_set_send_only
(
transport
,
TRUE
);
goto
err_abort
;
zmq_assert
(
rc
==
TRUE
);
if
(
options
.
sndbuf
)
{
rc
=
pgm_transport_set_sndbuf
(
transport
,
(
int
)
options
.
sndbuf
);
if
(
rc
!=
TRUE
)
return
-
1
;
}
// Set the size of the send window.
// Data rate is in [B/s] options.rate is in [kb/s].
if
(
options
.
rate
<=
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
rc
=
pgm_transport_set_txw_max_rte
(
transport
,
options
.
rate
*
1000
/
8
);
if
(
rc
!=
TRUE
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Recovery interval [s].
if
(
options
.
recovery_ivl
<=
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
rc
=
pgm_transport_set_txw_secs
(
transport
,
options
.
recovery_ivl
);
if
(
rc
!=
TRUE
)
{
errno
=
EINVAL
;
return
-
1
;
}
}
// Set interval of background SPM packets [us].
// Bind a transport to the specified network devices.
rc
=
pgm_transport_set_ambient_spm
(
transport
,
8192
*
1000
);
struct
pgm_interface_req_t
if_req
;
zmq_assert
(
rc
==
TRUE
);
memset
(
&
if_req
,
0
,
sizeof
(
if_req
));
if_req
.
ir_interface
=
res
->
ai_recv_addrs
[
0
].
gsr_interface
;
// Set intervals of data flushing SPM packets [us].
if_req
.
ir_scope_id
=
0
;
guint
spm_heartbeat
[]
=
{
4
*
1000
,
4
*
1000
,
8
*
1000
,
16
*
1000
,
if
(
AF_INET6
==
sa_family
)
{
32
*
1000
,
64
*
1000
,
128
*
1000
,
256
*
1000
,
512
*
1000
,
struct
sockaddr_in6
sa6
;
1024
*
1000
,
2048
*
1000
,
4096
*
1000
,
8192
*
1000
};
memcpy
(
&
sa6
,
&
res
->
ai_recv_addrs
[
0
].
gsr_group
,
sizeof
(
sa6
));
rc
=
pgm_transport_set_heartbeat_spm
(
transport
,
spm_heartbeat
,
if_req
.
ir_scope_id
=
sa6
.
sin6_scope_id
;
G_N_ELEMENTS
(
spm_heartbeat
));
}
zmq_assert
(
rc
==
TRUE
);
if
(
!
pgm_bind3
(
sock
,
&
addr
,
sizeof
(
addr
),
&
if_req
,
sizeof
(
if_req
),
&
if_req
,
sizeof
(
if_req
),
&
pgm_error
))
{
if
(
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_IF
&&
(
pgm_error
->
code
==
PGM_ERROR_INVAL
||
pgm_error
->
code
==
PGM_ERROR_XDEV
||
pgm_error
->
code
==
PGM_ERROR_NODEV
||
pgm_error
->
code
==
PGM_ERROR_NOTUNIQ
||
pgm_error
->
code
==
PGM_ERROR_ADDRFAMILY
||
pgm_error
->
code
==
PGM_ERROR_AFNOSUPPORT
||
pgm_error
->
code
==
PGM_ERROR_NODATA
||
pgm_error
->
code
==
PGM_ERROR_NONAME
||
pgm_error
->
code
==
PGM_ERROR_SERVICE
))
goto
err_abort
;
if
(
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_SOCKET
&&
(
pgm_error
->
code
==
PGM_ERROR_FAILED
))
goto
err_abort
;
/* fatal OpenPGM API error */
zmq_assert
(
false
);
}
}
// Enable multicast loopback.
// Join IP multicast groups
if
(
options
.
use_multicast_loop
)
{
for
(
unsigned
i
=
0
;
i
<
res
->
ai_recv_addrs_len
;
i
++
)
rc
=
pgm_transport_set_multicast_loop
(
transport
,
true
);
{
zmq_assert
(
rc
==
TRUE
);
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_JOIN_GROUP
,
&
res
->
ai_recv_addrs
[
i
],
sizeof
(
struct
group_req
)))
goto
err_abort
;
}
}
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SEND_GROUP
,
&
res
->
ai_send_addrs
[
0
],
sizeof
(
struct
group_req
)))
goto
err_abort
;
pgm_freeaddrinfo
(
res
);
// Bind a transport to the specified network devices.
// Set IP level parameters
if
(
!
pgm_transport_bind
(
transport
,
&
pgm_error
))
{
{
if
(
pgm_error
->
domain
==
PGM_IF_ERROR
&&
(
const
int
nonblocking
=
1
,
pgm_error
->
code
==
PGM_IF_ERROR_INVAL
||
multicast_loop
=
options
.
use_multicast_loop
?
1
:
0
,
pgm_error
->
code
==
PGM_IF_ERROR_XDEV
||
multicast_hops
=
16
,
pgm_error
->
code
==
PGM_IF_ERROR_NODEV
||
dscp
=
0x2e
<<
2
;
/* Expedited Forwarding PHB for network elements, no ECN. */
pgm_error
->
code
==
PGM_IF_ERROR_NOTUNIQ
||
pgm_error
->
code
==
PGM_IF_ERROR_ADDRFAMILY
||
pgm_error
->
code
==
PGM_IF_ERROR_FAMILY
||
pgm_error
->
code
==
PGM_IF_ERROR_NODATA
||
pgm_error
->
code
==
PGM_IF_ERROR_NONAME
||
pgm_error
->
code
==
PGM_IF_ERROR_SERVICE
))
{
g_error_free
(
pgm_error
);
errno
=
EINVAL
;
return
-
1
;
}
if
(
pgm_error
->
domain
==
PGM_TRANSPORT_ERROR
&&
(
pgm_error
->
code
==
PGM_TRANSPORT_ERROR_FAILED
))
{
g_error_free
(
pgm_error
);
errno
=
EINVAL
;
return
-
1
;
}
zmq_assert
(
false
);
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_LOOP
,
&
multicast_loop
,
sizeof
(
multicast_loop
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_HOPS
,
&
multicast_hops
,
sizeof
(
multicast_hops
)))
goto
err_abort
;
if
(
AF_INET6
!=
sa_family
&&
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TOS
,
&
dscp
,
sizeof
(
dscp
)));
goto
err_abort
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NOBLOCK
,
&
nonblocking
,
sizeof
(
nonblocking
)))
goto
err_abort
;
}
}
// For receiver transport preallocate pgm_msgv array.
// For receiver transport preallocate pgm_msgv array.
// TODO: ?
if
(
receiver
)
{
if
(
receiver
)
{
zmq_assert
(
in_batch_size
>
0
);
zmq_assert
(
in_batch_size
>
0
);
size_t
max_tsdu_size
=
get_max_tsdu_size
();
size_t
max_tsdu_size
=
get_max_tsdu_size
();
...
@@ -340,74 +306,102 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
...
@@ -340,74 +306,102 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
}
}
return
0
;
return
0
;
err_abort
:
if
(
sock
!=
NULL
)
{
pgm_close
(
sock
,
FALSE
);
sock
=
NULL
;
}
if
(
res
!=
NULL
)
{
pgm_freeaddrinfo
(
res
);
res
=
NULL
;
}
if
(
pgm_error
!=
NULL
)
{
pgm_error_free
(
pgm_error
);
pgm_error
=
NULL
;
}
errno
=
EINVAL
;
return
-
1
;
}
}
zmq
::
pgm_socket_t
::~
pgm_socket_t
()
zmq
::
pgm_socket_t
::~
pgm_socket_t
()
{
{
if
(
pgm_msgv
)
if
(
pgm_msgv
)
free
(
pgm_msgv
);
free
(
pgm_msgv
);
if
(
transport
)
if
(
sock
)
pgm_
transport_destroy
(
transport
,
TRUE
);
pgm_
close
(
sock
,
TRUE
);
}
}
// Get receiver fds. recv_fd is from transport->recv_sock
// Get receiver fds. receive_fd_ is signaled for incoming
// waiting_pipe_fd is from transport->waiting_pipe [0]
// packets, waiting_pipe_fd_ is signaled for state driven
// events and data.
void
zmq
::
pgm_socket_t
::
get_receiver_fds
(
int
*
receive_fd_
,
void
zmq
::
pgm_socket_t
::
get_receiver_fds
(
int
*
receive_fd_
,
int
*
waiting_pipe_fd_
)
int
*
waiting_pipe_fd_
)
{
{
socklen_t
socklen
;
bool
rc
;
zmq_assert
(
receive_fd_
);
zmq_assert
(
receive_fd_
);
zmq_assert
(
waiting_pipe_fd_
);
zmq_assert
(
waiting_pipe_fd_
);
// recv_sock2 should not be used - check it.
socklen
=
sizeof
(
*
receive_fd_
);
zmq_assert
(
transport
->
recv_sock2
==
-
1
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_SOCK
,
receive_fd_
,
&
socklen
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
receive_fd_
));
// Check if transport can receive data and can not send.
socklen
=
sizeof
(
*
waiting_pipe_fd_
);
zmq_assert
(
transport
->
can_recv_data
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PENDING_SOCK
,
waiting_pipe_fd_
,
&
socklen
);
zmq_assert
(
!
transport
->
can_send_data
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
waiting_pipe_fd_
));
// Take FDs directly from transport.
*
receive_fd_
=
pgm_transport_get_recv_fd
(
transport
);
*
waiting_pipe_fd_
=
pgm_transport_get_pending_fd
(
transport
);
}
}
// Get fds and store them into user allocated memory.
// Get fds and store them into user allocated memory.
// send
er_fd is from pgm_transport->send_sock
.
// send
_fd is for non-blocking send wire notifications
.
// receive_fd_ is f
rom transport->recv_sock
.
// receive_fd_ is f
or incoming back-channel protocol packets
.
// rdata_notify_fd_ is
from transport->rdata_notify
.
// rdata_notify_fd_ is
raised for waiting repair transmissions
.
// pending_notify_fd_ is f
rom transport->pending_notify
.
// pending_notify_fd_ is f
or state driven events
.
void
zmq
::
pgm_socket_t
::
get_sender_fds
(
int
*
send_fd_
,
int
*
receive_fd_
,
void
zmq
::
pgm_socket_t
::
get_sender_fds
(
int
*
send_fd_
,
int
*
receive_fd_
,
int
*
rdata_notify_fd_
,
int
*
pending_notify_fd_
)
int
*
rdata_notify_fd_
,
int
*
pending_notify_fd_
)
{
{
socklen_t
socklen
;
bool
rc
;
zmq_assert
(
send_fd_
);
zmq_assert
(
send_fd_
);
zmq_assert
(
receive_fd_
);
zmq_assert
(
receive_fd_
);
zmq_assert
(
rdata_notify_fd_
);
zmq_assert
(
rdata_notify_fd_
);
zmq_assert
(
pending_notify_fd_
);
zmq_assert
(
pending_notify_fd_
);
// recv_sock2 should not be used - check it.
socklen
=
sizeof
(
*
send_fd_
);
zmq_assert
(
transport
->
recv_sock2
==
-
1
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SEND_SOCK
,
send_fd_
,
&
socklen
);
zmq_assert
(
rc
);
// Check if transport can send data and can not receive.
zmq_assert
(
socklen
==
sizeof
(
*
receive_fd_
));
zmq_assert
(
transport
->
can_send_data
);
zmq_assert
(
!
transport
->
can_recv_data
);
socklen
=
sizeof
(
*
receive_fd_
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_SOCK
,
receive_fd_
,
&
socklen
);
// Take FDs from transport.
zmq_assert
(
rc
);
*
send_fd_
=
pgm_transport_get_send_fd
(
transport
);
zmq_assert
(
socklen
==
sizeof
(
*
receive_fd_
));
*
receive_fd_
=
pgm_transport_get_recv_fd
(
transport
);
socklen
=
sizeof
(
*
rdata_notify_fd_
);
*
rdata_notify_fd_
=
pgm_transport_get_repair_fd
(
transport
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_REPAIR_SOCK
,
rdata_notify_fd_
,
&
socklen
);
*
pending_notify_fd_
=
pgm_transport_get_pending_fd
(
transport
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
rdata_notify_fd_
));
socklen
=
sizeof
(
*
pending_notify_fd_
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PENDING_SOCK
,
pending_notify_fd_
,
&
socklen
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
pending_notify_fd_
));
}
}
// Send one APDU, transmit window owned memory.
// Send one APDU, transmit window owned memory.
// data_len_ must be less than one TPDU.
size_t
zmq
::
pgm_socket_t
::
send
(
unsigned
char
*
data_
,
size_t
data_len_
)
size_t
zmq
::
pgm_socket_t
::
send
(
unsigned
char
*
data_
,
size_t
data_len_
)
{
{
size_t
nbytes
=
0
;
size_t
nbytes
=
0
;
PGMIOStatus
status
=
pgm_send
(
transport
,
data_
,
data_len_
,
&
nbytes
);
const
int
status
=
pgm_send
(
sock
,
data_
,
data_len_
,
&
nbytes
);
if
(
nbytes
!=
data_len_
)
{
if
(
nbytes
!=
data_len_
)
{
zmq_assert
(
status
==
PGM_IO_STATUS_RATE_LIMITED
);
zmq_assert
(
status
==
PGM_IO_STATUS_RATE_LIMITED
||
status
==
PGM_IO_STATUS_WOULD_BLOCK
);
zmq_assert
(
nbytes
==
0
);
zmq_assert
(
nbytes
==
0
);
}
}
...
@@ -421,10 +415,16 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
...
@@ -421,10 +415,16 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
// Return max TSDU size without fragmentation from current PGM transport.
// Return max TSDU size without fragmentation from current PGM transport.
size_t
zmq
::
pgm_socket_t
::
get_max_tsdu_size
()
size_t
zmq
::
pgm_socket_t
::
get_max_tsdu_size
()
{
{
return
(
size_t
)
pgm_transport_max_tsdu
(
transport
,
false
);
int
max_tsdu
=
0
;
socklen_t
optlen
=
sizeof
(
max_tsdu
);
bool
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MSS
,
&
max_tsdu
,
&
optlen
);
zmq_assert
(
rc
);
zmq_assert
(
optlen
==
sizeof
(
max_tsdu
));
return
(
size_t
)
max_tsdu
;
}
}
// pgm_
transport_
recvmsgv is called to fill the pgm_msgv array up to
// pgm_recvmsgv is called to fill the pgm_msgv array up to
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// returned.
// returned.
ssize_t
zmq
::
pgm_socket_t
::
receive
(
void
**
raw_data_
,
const
pgm_tsi_t
**
tsi_
)
ssize_t
zmq
::
pgm_socket_t
::
receive
(
void
**
raw_data_
,
const
pgm_tsi_t
**
tsi_
)
...
@@ -453,15 +453,15 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -453,15 +453,15 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Receive a vector of Application Protocol Domain Unit's (APDUs)
// Receive a vector of Application Protocol Domain Unit's (APDUs)
// from the transport.
// from the transport.
GError
*
pgm_error
=
NULL
;
pgm_error_t
*
pgm_error
=
NULL
;
const
PGMIOStatus
status
=
pgm_recvmsgv
(
transport
,
pgm_msgv
,
const
int
status
=
pgm_recvmsgv
(
sock
,
pgm_msgv
,
pgm_msgv_len
,
MSG_
DONTWAIT
,
&
nbytes_rec
,
&
pgm_error
);
pgm_msgv_len
,
MSG_
ERRQUEUE
,
&
nbytes_rec
,
&
pgm_error
);
zmq_assert
(
status
!=
PGM_IO_STATUS_ERROR
);
zmq_assert
(
status
!=
PGM_IO_STATUS_ERROR
);
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// pgm_recvmsg returns
?
.
// pgm_recvmsg returns
PGM_IO_STATUS_TIMER_PENDING
.
if
(
status
==
PGM_IO_STATUS_TIMER_PENDING
)
{
if
(
status
==
PGM_IO_STATUS_TIMER_PENDING
)
{
zmq_assert
(
nbytes_rec
==
0
);
zmq_assert
(
nbytes_rec
==
0
);
...
@@ -472,18 +472,40 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -472,18 +472,40 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
return
0
;
return
0
;
}
}
// Send SPMR, NAK, ACK is rate limited.
if
(
status
==
PGM_IO_STATUS_RATE_LIMITED
)
{
zmq_assert
(
nbytes_rec
==
0
);
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec
=
0
;
return
0
;
}
// No peers and hence no incoming packets.
if
(
status
==
PGM_IO_STATUS_WOULD_BLOCK
)
{
zmq_assert
(
nbytes_rec
==
0
);
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec
=
0
;
return
0
;
}
// Data loss.
// Data loss.
if
(
status
==
PGM_IO_STATUS_RESET
)
{
if
(
status
==
PGM_IO_STATUS_RESET
)
{
pgm_peer_t
*
peer
=
(
pgm_peer_t
*
)
transport
->
peers_pending
->
data
;
struct
pgm_sk_buff_t
*
skb
=
pgm_msgv
[
0
].
msgv_skb
[
0
]
;
// Save lost data TSI.
// Save lost data TSI.
*
tsi_
=
&
peer
->
tsi
;
*
tsi_
=
&
skb
->
tsi
;
nbytes_rec
=
0
;
nbytes_rec
=
0
;
// In case of dala loss -1 is returned.
// In case of dala loss -1 is returned.
errno
=
EINVAL
;
errno
=
EINVAL
;
g_error_free
(
pgm_error
);
pgm_free_skb
(
skb
);
return
-
1
;
return
-
1
;
}
}
...
@@ -522,16 +544,16 @@ void zmq::pgm_socket_t::process_upstream ()
...
@@ -522,16 +544,16 @@ void zmq::pgm_socket_t::process_upstream ()
pgm_msgv_t
dummy_msg
;
pgm_msgv_t
dummy_msg
;
size_t
dummy_bytes
=
0
;
size_t
dummy_bytes
=
0
;
GError
*
pgm_error
=
NULL
;
pgm_error_t
*
pgm_error
=
NULL
;
PGMIOStatus
status
=
pgm_recvmsgv
(
transport
,
&
dummy_msg
,
const
int
status
=
pgm_recvmsgv
(
sock
,
&
dummy_msg
,
1
,
MSG_
DONTWAIT
,
&
dummy_bytes
,
&
pgm_error
);
1
,
MSG_
ERRQUEUE
,
&
dummy_bytes
,
&
pgm_error
);
zmq_assert
(
status
!=
PGM_IO_STATUS_ERROR
);
zmq_assert
(
status
!=
PGM_IO_STATUS_ERROR
);
// No data should be returned.
// No data should be returned.
zmq_assert
(
dummy_bytes
==
0
&&
(
status
==
PGM_IO_STATUS_TIMER_PENDING
||
zmq_assert
(
dummy_bytes
==
0
&&
(
status
==
PGM_IO_STATUS_TIMER_PENDING
||
status
==
PGM_IO_STATUS_RATE_LIMITED
));
status
==
PGM_IO_STATUS_RATE_LIMITED
||
status
==
PGM_IO_STATUS_WOULD_BLOCK
));
}
}
#endif
#endif
...
...
src/pgm_socket.hpp
View file @
00cd7d49
...
@@ -74,7 +74,7 @@ namespace zmq
...
@@ -74,7 +74,7 @@ namespace zmq
private
:
private
:
// OpenPGM transport
// OpenPGM transport
pgm_
transport_t
*
transport
;
pgm_
sock_t
*
sock
;
// Associated socket options.
// Associated socket options.
options_t
options
;
options_t
options
;
...
...
src/zmq.cpp
View file @
00cd7d49
...
@@ -242,20 +242,20 @@ void *zmq_init (int io_threads_)
...
@@ -242,20 +242,20 @@ void *zmq_init (int io_threads_)
// protocol ID. Note that if you want to use gettimeofday and sleep for
// protocol ID. Note that if you want to use gettimeofday and sleep for
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
// PGM_SLEEP to "USLEEP".
// PGM_SLEEP to "USLEEP".
GError
*
pgm_error
=
NULL
;
pgm_error_t
*
pgm_error
=
NULL
;
int
rc
=
pgm_init
(
&
pgm_error
);
bool
rc
=
pgm_init
(
&
pgm_error
);
if
(
rc
!=
TRUE
)
{
if
(
rc
!=
TRUE
)
{
if
(
pgm_error
->
domain
==
PGM_
IF_ERROR
&&
(
if
(
pgm_error
->
domain
==
PGM_
ERROR_DOMAIN_IF
&&
(
pgm_error
->
code
==
PGM_
IF_
ERROR_INVAL
||
pgm_error
->
code
==
PGM_ERROR_INVAL
||
pgm_error
->
code
==
PGM_
IF_
ERROR_XDEV
||
pgm_error
->
code
==
PGM_ERROR_XDEV
||
pgm_error
->
code
==
PGM_
IF_
ERROR_NODEV
||
pgm_error
->
code
==
PGM_ERROR_NODEV
||
pgm_error
->
code
==
PGM_
IF_
ERROR_NOTUNIQ
||
pgm_error
->
code
==
PGM_ERROR_NOTUNIQ
||
pgm_error
->
code
==
PGM_
IF_
ERROR_ADDRFAMILY
||
pgm_error
->
code
==
PGM_ERROR_ADDRFAMILY
||
pgm_error
->
code
==
PGM_
IF_ERROR_FAMILY
||
pgm_error
->
code
==
PGM_
ERROR_AFNOSUPPORT
||
pgm_error
->
code
==
PGM_
IF_
ERROR_NODATA
||
pgm_error
->
code
==
PGM_ERROR_NODATA
||
pgm_error
->
code
==
PGM_
IF_
ERROR_NONAME
||
pgm_error
->
code
==
PGM_ERROR_NONAME
||
pgm_error
->
code
==
PGM_
IF_
ERROR_SERVICE
))
{
pgm_error
->
code
==
PGM_ERROR_SERVICE
))
{
g
_error_free
(
pgm_error
);
pgm
_error_free
(
pgm_error
);
errno
=
EINVAL
;
errno
=
EINVAL
;
return
NULL
;
return
NULL
;
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment