Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
b0a1a16f
Commit
b0a1a16f
authored
Sep 23, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' of git@github.com:sustrik/zeromq2
parents
088a2db6
25211307
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
266 additions
and
230 deletions
+266
-230
Makefile.am
Makefile.am
+1
-0
configure.in
configure.in
+19
-1
lost_data_tsi.patch
foreign/openpgm/lost_data_tsi.patch
+76
-0
options.cpp
src/options.cpp
+1
-1
pgm_receiver.cpp
src/pgm_receiver.cpp
+91
-85
pgm_receiver.hpp
src/pgm_receiver.hpp
+24
-13
pgm_socket.cpp
src/pgm_socket.cpp
+40
-119
pgm_socket.hpp
src/pgm_socket.hpp
+8
-11
platform.hpp.in
src/platform.hpp.in
+6
-0
No files found.
Makefile.am
View file @
b0a1a16f
...
@@ -6,6 +6,7 @@ SUBDIRS = src $(DIR_PERF) devices bindings
...
@@ -6,6 +6,7 @@ SUBDIRS = src $(DIR_PERF) devices bindings
DIST_SUBDIRS
=
src perf devices bindings
DIST_SUBDIRS
=
src perf devices bindings
EXTRA_DIST
=
$(top_srcdir)
/foreign/openpgm/@pgm_basename@.tar.bz2
\
EXTRA_DIST
=
$(top_srcdir)
/foreign/openpgm/@pgm_basename@.tar.bz2
\
$(top_srcdir)
/foreign/openpgm/lost_data_tsi.patch
\
$(top_srcdir)
/foreign/xmlParser/xmlParser.cpp
\
$(top_srcdir)
/foreign/xmlParser/xmlParser.cpp
\
$(top_srcdir)
/foreign/xmlParser/xmlParser.hpp
$(top_srcdir)
/foreign/xmlParser/xmlParser.hpp
...
...
configure.in
View file @
b0a1a16f
...
@@ -388,11 +388,21 @@ if test "x$with_pgm_ext" != "xno"; then
...
@@ -388,11 +388,21 @@ if test "x$with_pgm_ext" != "xno"; then
;;
;;
esac
esac
AC_CHECK_HEADERS(openssl/md5.h, [] ,
[AC_MSG_ERROR([To run configure with --with-pgm option, openssl/md5.h has to be usable.])])
AC_CHECK_LIB(ssl, MD5_Init, , [AC_MSG_ERROR([Could not link with libuuid, install develop version.])])
AC_CHECK_PROG(have_tar, tar, yes, no)
AC_CHECK_PROG(have_tar, tar, yes, no)
if test "x$have_tar" != "xyes"; then
if test "x$have_tar" != "xyes"; then
AC_MSG_ERROR([Could not find tar.])
AC_MSG_ERROR([Could not find tar.])
fi
fi
AC_CHECK_PROG(have_patch, patch, yes, no)
if test "x$have_patch" != "xyes"; then
AC_MSG_ERROR([Could not find patch.])
fi
AC_CHECK_PROG(have_bunzip2, bunzip2, yes, no)
AC_CHECK_PROG(have_bunzip2, bunzip2, yes, no)
if test "x$have_bunzip2" != "xyes"; then
if test "x$have_bunzip2" != "xyes"; then
AC_MSG_ERROR([Could not find bunzip2.])
AC_MSG_ERROR([Could not find bunzip2.])
...
@@ -416,7 +426,15 @@ if test "x$with_pgm_ext" != "xno"; then
...
@@ -416,7 +426,15 @@ if test "x$with_pgm_ext" != "xno"; then
if tar -xjf foreign/openpgm/${pgm_basename}.tar.bz2 -C foreign/openpgm/; then
if tar -xjf foreign/openpgm/${pgm_basename}.tar.bz2 -C foreign/openpgm/; then
AC_MSG_RESULT([yes])
AC_MSG_RESULT([yes])
else
else
AC_MSG_ERROR([Could not unpack foreign/openpgm/${pgm_basename}.tar.bz2 file])
AC_MSG_ERROR([Could not unpack foreign/openpgm/${pgm_basename}.tar.bz2 file.])
fi
AC_MSG_CHECKING([Patching ${pgm_basename}])
if patch --silent -p0 < foreign/openpgm/lost_data_tsi.patch; then
AC_MSG_RESULT([yes])
else
AC_MSG_ERROR([Could not apply foreign/openpgm/lost_data_tsi.patch file.])
fi
fi
# Generate galois_tables.c
# Generate galois_tables.c
...
...
foreign/openpgm/lost_data_tsi.patch
0 → 100644
View file @
b0a1a16f
--- libpgm-1.2.14/openpgm/pgm/transport.c 2009-08-27 04:54:04.000000000 +0200
+++ foreign/openpgm/libpgm-1.2.14/openpgm/pgm/transport.c 2009-09-22 14:36:07.713124619 +0200
@@ -2342,6 +2342,7 @@
if (waiting_rxw->ack_cumulative_losses != waiting_rxw->cumulative_losses)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), waiting_rxw->identifier, sizeof (pgm_tsi_t));
waiting_rxw->pgm_sock_err.lost_count = waiting_rxw->cumulative_losses - waiting_rxw->ack_cumulative_losses;
waiting_rxw->ack_cumulative_losses = waiting_rxw->cumulative_losses;
}
@@ -2705,6 +2706,7 @@
if (waiting_rxw->ack_cumulative_losses != waiting_rxw->cumulative_losses)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), waiting_rxw->identifier, sizeof (pgm_tsi_t));
waiting_rxw->pgm_sock_err.lost_count = waiting_rxw->cumulative_losses - waiting_rxw->ack_cumulative_losses;
waiting_rxw->ack_cumulative_losses = waiting_rxw->cumulative_losses;
}
@@ -3407,6 +3409,7 @@
!sender_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), sender_rxw->identifier, sizeof (pgm_tsi_t));
sender_rxw->pgm_sock_err.lost_count = sender_rxw->cumulative_losses - sender_rxw->ack_cumulative_losses;
sender_rxw->ack_cumulative_losses = sender_rxw->cumulative_losses;
@@ -3823,6 +3826,7 @@
!peer_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), peer_rxw->identifier, sizeof (pgm_tsi_t));
peer_rxw->pgm_sock_err.lost_count = peer_rxw->cumulative_losses - peer_rxw->ack_cumulative_losses;
peer_rxw->ack_cumulative_losses = peer_rxw->cumulative_losses;
@@ -3952,6 +3956,7 @@
!peer_rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), peer_rxw->identifier, sizeof (pgm_tsi_t));
peer_rxw->pgm_sock_err.lost_count = peer_rxw->cumulative_losses - peer_rxw->ack_cumulative_losses;
peer_rxw->ack_cumulative_losses = peer_rxw->cumulative_losses;
@@ -4849,6 +4854,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
@@ -5166,6 +5172,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
@@ -5303,6 +5310,7 @@
!rxw->waiting_link.data)
{
transport->has_lost_data = TRUE;
+ memcpy (&(transport->lost_data_tsi), rxw->identifier, sizeof (pgm_tsi_t));
rxw->pgm_sock_err.lost_count = rxw->cumulative_losses - rxw->ack_cumulative_losses;
rxw->ack_cumulative_losses = rxw->cumulative_losses;
--- libpgm-1.2.14/openpgm/pgm/include/pgm/transport.h 2009-08-27 04:53:23.000000000 +0200
+++ foreign/openpgm/libpgm-1.2.14/openpgm/pgm/include/pgm/transport.h 2009-09-21 15:49:36.000000000 +0200
@@ -205,6 +205,7 @@
gboolean is_bound;
gboolean is_open;
gboolean has_lost_data;
+ pgm_tsi_t lost_data_tsi;
gboolean will_close_on_failure;
gboolean can_send_data; /* and SPMs */
src/options.cpp
View file @
b0a1a16f
...
@@ -29,7 +29,7 @@ zmq::options_t::options_t () :
...
@@ -29,7 +29,7 @@ zmq::options_t::options_t () :
affinity
(
0
),
affinity
(
0
),
rate
(
100
),
rate
(
100
),
recovery_ivl
(
10
),
recovery_ivl
(
10
),
use_multicast_loop
(
fals
e
),
use_multicast_loop
(
tru
e
),
requires_in
(
false
),
requires_in
(
false
),
requires_out
(
false
)
requires_out
(
false
)
{
{
...
...
src/pgm_receiver.cpp
View file @
b0a1a16f
...
@@ -46,26 +46,21 @@
...
@@ -46,26 +46,21 @@
zmq
::
pgm_receiver_t
::
pgm_receiver_t
(
class
io_thread_t
*
parent_
,
zmq
::
pgm_receiver_t
::
pgm_receiver_t
(
class
io_thread_t
*
parent_
,
const
options_t
&
options_
,
const
char
*
session_name_
)
:
const
options_t
&
options_
,
const
char
*
session_name_
)
:
io_object_t
(
parent_
),
io_object_t
(
parent_
),
decoder
(
NULL
),
pgm_socket
(
true
,
options_
),
pgm_socket
(
true
,
options_
),
options
(
options_
),
options
(
options_
),
session_name
(
session_name_
),
session_name
(
session_name_
),
joined
(
false
),
inout
(
NULL
)
inout
(
NULL
)
{
{
}
}
zmq
::
pgm_receiver_t
::~
pgm_receiver_t
()
zmq
::
pgm_receiver_t
::~
pgm_receiver_t
()
{
{
if
(
decoder
)
// Destructor should not be called before unplug.
delete
decoder
;
zmq_assert
(
peers
.
empty
())
;
}
}
int
zmq
::
pgm_receiver_t
::
init
(
bool
udp_encapsulation_
,
const
char
*
network_
)
int
zmq
::
pgm_receiver_t
::
init
(
bool
udp_encapsulation_
,
const
char
*
network_
)
{
{
decoder
=
new
zmq_decoder_t
;
zmq_assert
(
decoder
);
return
pgm_socket
.
init
(
udp_encapsulation_
,
network_
);
return
pgm_socket
.
init
(
udp_encapsulation_
,
network_
);
}
}
...
@@ -75,8 +70,6 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
...
@@ -75,8 +70,6 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
int
socket_fd
;
int
socket_fd
;
int
waiting_pipe_fd
;
int
waiting_pipe_fd
;
decoder
->
set_inout
(
inout_
);
// Fill socket_fd and waiting_pipe_fd from PGM transport
// Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket
.
get_receiver_fds
(
&
socket_fd
,
&
waiting_pipe_fd
);
pgm_socket
.
get_receiver_fds
(
&
socket_fd
,
&
waiting_pipe_fd
);
...
@@ -95,9 +88,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
...
@@ -95,9 +88,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
void
zmq
::
pgm_receiver_t
::
unplug
()
void
zmq
::
pgm_receiver_t
::
unplug
()
{
{
// Delete decoders.
for
(
peer_t
::
iterator
it
=
peers
.
begin
();
it
!=
peers
.
end
();
it
++
)
{
if
(
it
->
second
.
decoder
!=
NULL
)
delete
it
->
second
.
decoder
;
}
peers
.
clear
();
rm_fd
(
socket_handle
);
rm_fd
(
socket_handle
);
rm_fd
(
pipe_handle
);
rm_fd
(
pipe_handle
);
decoder
->
set_inout
(
NULL
);
inout
=
NULL
;
inout
=
NULL
;
}
}
...
@@ -106,102 +106,108 @@ void zmq::pgm_receiver_t::revive ()
...
@@ -106,102 +106,108 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
void
zmq
::
pgm_receiver_t
::
reconnect
()
{
// Save inout ptr.
i_inout
*
inout_tmp
=
inout
;
// PGM receiver is not joined anymore.
joined
=
false
;
// Unplug - plug PGM transport.
unplug
();
delete
decoder
;
decoder
=
new
zmq_decoder_t
;
zmq_assert
(
decoder
);
plug
(
inout_tmp
);
}
// POLLIN event from socket or waiting_pipe.
// POLLIN event from socket or waiting_pipe.
void
zmq
::
pgm_receiver_t
::
in_event
()
void
zmq
::
pgm_receiver_t
::
in_event
()
{
{
void
*
data_with_offset
;
// Iterator to peers map.
peer_t
::
iterator
it
;
// Data from PGM socket.
unsigned
char
*
raw_data
=
NULL
;
const
pgm_tsi_t
*
tsi
=
NULL
;
ssize_t
nbytes
=
0
;
ssize_t
nbytes
=
0
;
// Read all data from pgm socket.
do
{
while
((
nbytes
=
receive_with_offset
(
&
data_with_offset
))
>
0
)
{
// Push all the data to the decoder.
decoder
->
write
((
unsigned
char
*
)
data_with_offset
,
nbytes
);
}
// Flush any messages decoder may have produced to the dispatcher
.
// Read data from underlying pgm_socket
.
inout
->
flush
(
);
nbytes
=
pgm_socket
.
receive
((
void
**
)
&
raw_data
,
&
tsi
);
// Data loss detected.
// No ODATA or RDATA.
if
(
nbytes
==
-
1
)
{
if
(
!
nbytes
)
break
;
// Recreate PGM transport.
// Fid TSI in peers list.
reconnect
();
it
=
peers
.
find
(
*
tsi
);
}
}
void
zmq
::
pgm_receiver_t
::
out_event
()
// Data loss.
{
if
(
nbytes
==
-
1
)
{
zmq_assert
(
false
);
}
ssize_t
zmq
::
pgm_receiver_t
::
receive_with_offset
zmq_assert
(
it
!=
peers
.
end
());
(
void
**
data_
)
{
// Data from PGM socket.
// Delete decoder and set joined to false.
void
*
rd
=
NULL
;
it
->
second
.
joined
=
false
;
unsigned
char
*
raw_data
=
NULL
;
if
(
it
->
second
.
decoder
!=
NULL
)
{
delete
it
->
second
.
decoder
;
it
->
second
.
decoder
=
NULL
;
}
// Read data from underlying pgm_socket.
break
;
ssize_t
nbytes
=
pgm_socket
.
receive
((
void
**
)
&
rd
);
}
raw_data
=
(
unsigned
char
*
)
rd
;
// No ODATA or RDATA
.
// Read offset of the fist message in current APDU
.
if
(
!
nbytes
)
zmq_assert
((
size_t
)
nbytes
>=
sizeof
(
uint16_t
));
return
0
;
uint16_t
apdu_offset
=
get_uint16
(
raw_data
)
;
// Data loss.
// Shift raw_data & decrease nbytes by the first message offset
if
(
nbytes
==
-
1
)
{
// information (sizeof uint16_t).
return
-
1
;
raw_data
+=
sizeof
(
uint16_t
);
}
nbytes
-=
sizeof
(
uint16_t
);
zmq_assert
(
apdu_offset
<=
nbytes
);
// Read offset of the fist message in current APDU
.
// New peer
.
uint16_t
apdu_offset
=
get_uint16
(
raw_data
);
if
(
it
==
peers
.
end
())
{
// Shift raw_data & decrease nbytes by the first message offset
peer_info_t
peer_info
=
{
false
,
NULL
};
// information (sizeof uint16_t).
it
=
peers
.
insert
(
std
::
make_pair
(
*
tsi
,
peer_info
)).
first
;
*
data_
=
raw_data
+
sizeof
(
uint16_t
);
nbytes
-=
sizeof
(
uint16_t
);
// There is not beginning of the message in current APDU and we
zmq_log
(
1
,
"New peer TSI: %s, %s(%i).
\n
"
,
pgm_print_tsi
(
tsi
),
// are not joined jet -> throwing data.
__FILE__
,
__LINE__
);
if
(
apdu_offset
==
0xFFFF
&&
!
joined
)
{
}
*
data_
=
NULL
;
return
0
;
}
// Now is the possibility to join the stream.
// There is not beginning of the message in current APDU and we
if
(
!
joined
)
{
// are not joined jet -> throwing data.
if
(
apdu_offset
==
0xFFFF
&&
!
it
->
second
.
joined
)
{
// We have to move data to the begining of the first message.
break
;
*
data_
=
(
unsigned
char
*
)
*
data_
+
apdu_offset
;
}
nbytes
-=
apdu_offset
;
// Joined the stream.
// Now is the possibility to join the stream.
joined
=
true
;
if
(
!
it
->
second
.
joined
)
{
zmq_assert
(
it
->
second
.
decoder
==
NULL
);
zmq_log
(
2
,
"joined into the stream, %s(%i)
\n
"
,
// We have to move data to the begining of the first message.
__FILE__
,
__LINE__
);
raw_data
+=
apdu_offset
;
}
nbytes
-=
apdu_offset
;
// Joined the stream.
it
->
second
.
joined
=
true
;
// Create and connect decoder for joined peer.
it
->
second
.
decoder
=
new
zmq_decoder_t
;
it
->
second
.
decoder
->
set_inout
(
inout
);
zmq_log
(
1
,
"Peer %s joined into the stream, %s(%i)
\n
"
,
pgm_print_tsi
(
tsi
),
__FILE__
,
__LINE__
);
}
if
(
nbytes
>
0
)
{
// Push all the data to the decoder.
it
->
second
.
decoder
->
write
(
raw_data
,
nbytes
);
}
}
while
(
nbytes
>
0
);
// Flush any messages decoder may have produced to the dispatcher.
inout
->
flush
();
}
return
nbytes
;
void
zmq
::
pgm_receiver_t
::
out_event
()
{
zmq_assert
(
false
);
}
}
#endif
#endif
src/pgm_receiver.hpp
View file @
b0a1a16f
...
@@ -30,6 +30,8 @@
...
@@ -30,6 +30,8 @@
#include "zmq_decoder.hpp"
#include "zmq_decoder.hpp"
#include "pgm_socket.hpp"
#include "pgm_socket.hpp"
#include <map>
namespace
zmq
namespace
zmq
{
{
...
@@ -45,7 +47,6 @@ namespace zmq
...
@@ -45,7 +47,6 @@ namespace zmq
~
pgm_receiver_t
();
~
pgm_receiver_t
();
int
init
(
bool
udp_encapsulation_
,
const
char
*
network_
);
int
init
(
bool
udp_encapsulation_
,
const
char
*
network_
);
void
reconnect
();
// i_engine interface implementation.
// i_engine interface implementation.
void
plug
(
struct
i_inout
*
inout_
);
void
plug
(
struct
i_inout
*
inout_
);
...
@@ -57,15 +58,28 @@ namespace zmq
...
@@ -57,15 +58,28 @@ namespace zmq
void
out_event
();
void
out_event
();
private
:
private
:
// Read exactly iov_len_ count APDUs, function returns number
// of bytes received. Note that if we did not join message stream
// Map to hold TSI, joined and decoder for each peer.
// before and there is not message beginning in the APDUs being
struct
peer_info_t
{
// received iov_len for such a APDUs will be 0.
bool
joined
;
ssize_t
receive_with_offset
(
void
**
data_
);
zmq_decoder_t
*
decoder
;
};
// Message decoder.
zmq_decoder_t
*
decoder
;
struct
tsi_comp
{
bool
operator
()
(
const
pgm_tsi_t
&
ltsi
,
const
pgm_tsi_t
&
rtsi
)
const
{
if
(
ltsi
.
sport
<
rtsi
.
sport
)
return
true
;
return
(
std
::
lexicographical_compare
(
ltsi
.
gsi
.
identifier
,
ltsi
.
gsi
.
identifier
+
6
,
rtsi
.
gsi
.
identifier
,
rtsi
.
gsi
.
identifier
+
6
));
}
};
typedef
std
::
map
<
pgm_tsi_t
,
peer_info_t
,
tsi_comp
>
peer_t
;
peer_t
peers
;
// PGM socket.
// PGM socket.
pgm_socket_t
pgm_socket
;
pgm_socket_t
pgm_socket
;
...
@@ -75,9 +89,6 @@ namespace zmq
...
@@ -75,9 +89,6 @@ namespace zmq
// Name of the session associated with the connecter.
// Name of the session associated with the connecter.
std
::
string
session_name
;
std
::
string
session_name
;
// If receiver joined the messages stream.
bool
joined
;
// Parent session.
// Parent session.
i_inout
*
inout
;
i_inout
*
inout
;
...
...
src/pgm_socket.cpp
View file @
b0a1a16f
...
@@ -23,10 +23,7 @@
...
@@ -23,10 +23,7 @@
#ifdef ZMQ_HAVE_LINUX
#ifdef ZMQ_HAVE_LINUX
#include <pgm/pgm.h>
#include <pgm/pgm.h>
#else
#include <openssl/md5.h>
#include <Winsock2.h>
#include <Wsrm.h>
#include <ws2spi.h>
#endif
#endif
#include <string>
#include <string>
...
@@ -36,6 +33,7 @@
...
@@ -36,6 +33,7 @@
#include "pgm_socket.hpp"
#include "pgm_socket.hpp"
#include "config.hpp"
#include "config.hpp"
#include "err.hpp"
#include "err.hpp"
#include "uuid.hpp"
//#define PGM_SOCKET_DEBUG
//#define PGM_SOCKET_DEBUG
//#define PGM_SOCKET_DEBUG_LEVEL 1
//#define PGM_SOCKET_DEBUG_LEVEL 1
...
@@ -68,6 +66,21 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
...
@@ -68,6 +66,21 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
}
}
int
zmq
::
pgm_socket_t
::
pgm_create_custom_gsi
(
const
char
*
data_
,
pgm_gsi_t
*
gsi_
)
{
unsigned
char
result_md5
[
16
];
MD5_CTX
ctx
;
MD5_Init
(
&
ctx
);
MD5_Update
(
&
ctx
,
data_
,
strlen
(
data_
));
MD5_Final
(
result_md5
,
&
ctx
);
memcpy
(
gsi_
,
result_md5
+
10
,
6
);
return
0
;
}
int
zmq
::
pgm_socket_t
::
init
(
bool
udp_encapsulation_
,
const
char
*
network_
)
int
zmq
::
pgm_socket_t
::
init
(
bool
udp_encapsulation_
,
const
char
*
network_
)
{
{
udp_encapsulation
=
udp_encapsulation_
;
udp_encapsulation
=
udp_encapsulation_
;
...
@@ -118,10 +131,6 @@ int zmq::pgm_socket_t::open_transport (void)
...
@@ -118,10 +131,6 @@ int zmq::pgm_socket_t::open_transport (void)
// Can not open transport before destroying old one.
// Can not open transport before destroying old one.
zmq_assert
(
g_transport
==
NULL
);
zmq_assert
(
g_transport
==
NULL
);
// Set actual_tsi and prev_tsi to zeros.
memset
(
&
tsi
,
'\0'
,
sizeof
(
pgm_tsi_t
));
memset
(
&
retired_tsi
,
'\0'
,
sizeof
(
pgm_tsi_t
));
// Zero counter used in msgrecv.
// Zero counter used in msgrecv.
nbytes_rec
=
0
;
nbytes_rec
=
0
;
nbytes_processed
=
0
;
nbytes_processed
=
0
;
...
@@ -146,12 +155,25 @@ int zmq::pgm_socket_t::open_transport (void)
...
@@ -146,12 +155,25 @@ int zmq::pgm_socket_t::open_transport (void)
struct
group_source_req
recv_gsr
,
send_gsr
;
struct
group_source_req
recv_gsr
,
send_gsr
;
size_t
recv_gsr_len
=
1
;
size_t
recv_gsr_len
=
1
;
rc
=
pgm_create_md5_gsi
(
&
gsi
);
if
(
options
.
identity
.
size
()
>
0
)
{
// Create gsi from identity string.
rc
=
pgm_create_custom_gsi
(
options
.
identity
.
c_str
(),
&
gsi
);
}
else
{
// Generate random gsi.
rc
=
pgm_create_custom_gsi
(
uuid_t
().
to_string
(),
&
gsi
);
}
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
zmq_log
(
1
,
"Transport GSI: %s, %s(%i)
\n
"
,
pgm_print_gsi
(
&
gsi
),
__FILE__
,
__LINE__
);
// On success, 0 is returned. On invalid arguments, -EINVAL is returned.
// On success, 0 is returned. On invalid arguments, -EINVAL is returned.
// If more multicast groups are found than the recv_len parameter,
// If more multicast groups are found than the recv_len parameter,
// -ENOMEM is returned.
// -ENOMEM is returned.
...
@@ -204,14 +226,6 @@ int zmq::pgm_socket_t::open_transport (void)
...
@@ -204,14 +226,6 @@ int zmq::pgm_socket_t::open_transport (void)
// Receiver transport.
// Receiver transport.
if
(
receiver
)
{
if
(
receiver
)
{
// Set transport->may_close_on_failure to true,
// after data los recvmsgv returns -1 errno set to ECONNRESET.
rc
=
pgm_transport_set_close_on_failure
(
g_transport
,
TRUE
);
if
(
rc
!=
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set transport->can_send_data = FALSE.
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
// Note that NAKs are still generated by the transport.
rc
=
pgm_transport_set_recv_only
(
g_transport
,
false
);
rc
=
pgm_transport_set_recv_only
(
g_transport
,
false
);
...
@@ -543,7 +557,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
...
@@ -543,7 +557,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to
// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// returned.
// returned.
ssize_t
zmq
::
pgm_socket_t
::
receive
(
void
**
raw_data_
)
ssize_t
zmq
::
pgm_socket_t
::
receive
(
void
**
raw_data_
,
const
pgm_tsi_t
**
tsi_
)
{
{
// We just sent all data from pgm_transport_recvmsgv up
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
// and have to return 0 that another engine in this thread is scheduled.
...
@@ -583,9 +597,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
...
@@ -583,9 +597,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
// For data loss nbytes_rec == -1 errno == ECONNRESET.
// For data loss nbytes_rec == -1 errno == ECONNRESET.
if
(
nbytes_rec
==
-
1
&&
errno
==
ECONNRESET
)
{
if
(
nbytes_rec
==
-
1
&&
errno
==
ECONNRESET
)
{
// Save lost data TSI.
*
tsi_
=
&
(
g_transport
->
lost_data_tsi
);
// In case of dala loss -1 is returned.
// In case of dala loss -1 is returned.
zmq_log
(
1
,
"Data loss detected, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
zmq_log
(
1
,
"Data loss detected %s, %s(%i)
\n
"
,
pgm_print_tsi
(
&
(
g_transport
->
lost_data_tsi
)),
__FILE__
,
__LINE__
);
nbytes_rec
=
0
;
nbytes_rec
=
0
;
return
-
1
;
return
-
1
;
}
}
...
@@ -610,65 +628,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
...
@@ -610,65 +628,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
*
raw_data_
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_iov
->
iov_base
;
*
raw_data_
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_iov
->
iov_base
;
size_t
raw_data_len
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_iov
->
iov_len
;
size_t
raw_data_len
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_iov
->
iov_len
;
// Check if peer TSI did not change, this is detection of peer restart.
// Save current TSI.
const
pgm_tsi_t
*
current_tsi
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_tsi
;
*
tsi_
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_tsi
;
// If empty store new TSI.
if
(
tsi_empty
(
&
tsi
))
{
// Store current peer TSI.
memcpy
(
&
tsi
,
current_tsi
,
sizeof
(
pgm_tsi_t
));
#ifdef PGM_SOCKET_DEBUG
uint8_t
*
gsi
=
(
uint8_t
*
)(
&
tsi
)
->
gsi
.
identifier
;
#endif
zmq_log
(
1
,
"First peer TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)
\n
"
,
gsi
[
0
],
gsi
[
1
],
gsi
[
2
],
gsi
[
3
],
gsi
[
4
],
gsi
[
5
],
ntohs
(
tsi
.
sport
),
__FILE__
,
__LINE__
);
}
// Compare stored TSI with actual.
if
(
!
tsi_equal
(
&
tsi
,
current_tsi
))
{
// Peer change detected.
zmq_log
(
1
,
"Peer change detected, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
// Compare with retired TSI, in case of match ignore APDU.
if
(
tsi_equal
(
&
retired_tsi
,
current_tsi
))
{
zmq_log
(
1
,
"Retired TSI - ignoring APDU, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
// Move the the next pgm_msgv_t structure.
pgm_msgv_processed
++
;
nbytes_processed
+=
raw_data_len
;
return
0
;
}
else
{
zmq_log
(
1
,
"New TSI, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
// Store new TSI and move last valid to retired_tsi
memcpy
(
&
retired_tsi
,
&
tsi
,
sizeof
(
pgm_tsi_t
));
memcpy
(
&
tsi
,
current_tsi
,
sizeof
(
pgm_tsi_t
));
#ifdef PGM_SOCKET_DEBUG
uint8_t
*
gsi
=
(
uint8_t
*
)(
&
retired_tsi
)
->
gsi
.
identifier
;
#endif
zmq_log
(
1
,
"retired TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)
\n
"
,
gsi
[
0
],
gsi
[
1
],
gsi
[
2
],
gsi
[
3
],
gsi
[
4
],
gsi
[
5
],
ntohs
(
retired_tsi
.
sport
),
__FILE__
,
__LINE__
);
#ifdef PGM_SOCKET_DEBUG
gsi
=
(
uint8_t
*
)(
&
tsi
)
->
gsi
.
identifier
;
#endif
zmq_log
(
1
,
" TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)
\n
"
,
gsi
[
0
],
gsi
[
1
],
gsi
[
2
],
gsi
[
3
],
gsi
[
4
],
gsi
[
5
],
ntohs
(
tsi
.
sport
),
__FILE__
,
__LINE__
);
// Peers change is recognized as a GAP.
return
-
1
;
}
}
// Move the the next pgm_msgv_t structure.
// Move the the next pgm_msgv_t structure.
pgm_msgv_processed
++
;
pgm_msgv_processed
++
;
nbytes_processed
+=
raw_data_len
;
nbytes_processed
+=
raw_data_len
;
...
@@ -692,47 +654,6 @@ void zmq::pgm_socket_t::process_upstream (void)
...
@@ -692,47 +654,6 @@ void zmq::pgm_socket_t::process_upstream (void)
zmq_assert
(
dummy_bytes
==
-
1
&&
errno
==
EAGAIN
);
zmq_assert
(
dummy_bytes
==
-
1
&&
errno
==
EAGAIN
);
}
}
bool
zmq
::
pgm_socket_t
::
tsi_equal
(
const
pgm_tsi_t
*
tsi_a_
,
const
pgm_tsi_t
*
tsi_b_
)
{
// Compare 6B GSI.
const
uint8_t
*
gsi_a
=
tsi_a_
->
gsi
.
identifier
;
const
uint8_t
*
gsi_b
=
tsi_b_
->
gsi
.
identifier
;
if
(
gsi_a
[
0
]
!=
gsi_b
[
0
]
||
gsi_a
[
1
]
!=
gsi_b
[
1
]
||
gsi_a
[
2
]
!=
gsi_b
[
2
]
||
gsi_a
[
3
]
!=
gsi_b
[
3
]
||
gsi_a
[
4
]
!=
gsi_b
[
4
]
||
gsi_a
[
5
]
!=
gsi_b
[
5
])
{
return
false
;
}
// Compare source port.
if
(
tsi_a_
->
sport
!=
tsi_b_
->
sport
)
{
return
false
;
}
return
true
;
}
bool
zmq
::
pgm_socket_t
::
tsi_empty
(
const
pgm_tsi_t
*
tsi_
)
{
uint8_t
*
gsi
=
(
uint8_t
*
)
tsi_
->
gsi
.
identifier
;
// GSI.
if
(
gsi
[
0
]
!=
0
||
gsi
[
1
]
!=
0
||
gsi
[
2
]
!=
0
||
gsi
[
3
]
!=
0
||
gsi
[
4
]
!=
0
||
gsi
[
5
]
!=
0
)
{
return
false
;
}
// Source port.
if
(
tsi_
->
sport
!=
0
)
{
return
false
;
}
return
true
;
}
#endif
#endif
#endif
#endif
src/pgm_socket.hpp
View file @
b0a1a16f
...
@@ -77,7 +77,7 @@ namespace zmq
...
@@ -77,7 +77,7 @@ namespace zmq
void
free_buffer
(
void
*
data_
);
void
free_buffer
(
void
*
data_
);
// Receive data from pgm socket.
// Receive data from pgm socket.
ssize_t
receive
(
void
**
data_
);
ssize_t
receive
(
void
**
data_
,
const
pgm_tsi_t
**
tsi_
);
// POLLIN on sender side should mean NAK or SPMR receiving.
// POLLIN on sender side should mean NAK or SPMR receiving.
// process_upstream function is used to handle such a situation.
// process_upstream function is used to handle such a situation.
...
@@ -90,21 +90,18 @@ namespace zmq
...
@@ -90,21 +90,18 @@ namespace zmq
private
:
private
:
// Associated socket options.
options_t
options
;
// Returns max tsdu size without fragmentation.
// Returns max tsdu size without fragmentation.
size_t
get_max_tsdu_size
(
void
);
size_t
get_max_tsdu_size
(
void
);
// Returns maximum count of apdus which fills readbuf_size_
// Returns maximum count of apdus which fills readbuf_size_
size_t
get_max_apdu_at_once
(
size_t
readbuf_size_
);
size_t
get_max_apdu_at_once
(
size_t
readbuf_size_
);
//
Return true if TSI has empty GSI ('\0') and sport 0
.
//
Compute gsi from string
.
bool
tsi_empty
(
const
pgm_tsi_t
*
t
si_
);
int
pgm_create_custom_gsi
(
const
char
*
data_
,
pgm_gsi_t
*
g
si_
);
//
Compare TSIs, return true if equal
.
//
Associated socket options
.
bool
tsi_equal
(
const
pgm_tsi_t
*
tsi_a_
,
const
pgm_tsi_t
*
tsi_b_
)
;
options_t
options
;
// true when pgm_socket should create receiving side.
// true when pgm_socket should create receiving side.
bool
receiver
;
bool
receiver
;
...
@@ -140,10 +137,10 @@ namespace zmq
...
@@ -140,10 +137,10 @@ namespace zmq
enum
{
pgm_receiver_fd_count
=
2
};
enum
{
pgm_receiver_fd_count
=
2
};
// TSI of the actual peer.
// TSI of the actual peer.
pgm_tsi_t
tsi
;
//
pgm_tsi_t tsi;
// Previous peer TSI.
// Previous peer TSI.
pgm_tsi_t
retired_tsi
;
//
pgm_tsi_t retired_tsi;
#endif
#endif
};
};
...
...
src/platform.hpp.in
View file @
b0a1a16f
...
@@ -36,6 +36,9 @@
...
@@ -36,6 +36,9 @@
/* Define to 1 if you have the `socket' library (-lsocket). */
/* Define to 1 if you have the `socket' library (-lsocket). */
#undef HAVE_LIBSOCKET
#undef HAVE_LIBSOCKET
/* Define to 1 if you have the `ssl' library (-lssl). */
#undef HAVE_LIBSSL
/* Define to 1 if you have the `stdc++' library (-lstdc++). */
/* Define to 1 if you have the `stdc++' library (-lstdc++). */
#undef HAVE_LIBSTDC__
#undef HAVE_LIBSTDC__
...
@@ -61,6 +64,9 @@
...
@@ -61,6 +64,9 @@
/* Define to 1 if you have the <netinet/tcp.h> header file. */
/* Define to 1 if you have the <netinet/tcp.h> header file. */
#undef HAVE_NETINET_TCP_H
#undef HAVE_NETINET_TCP_H
/* Define to 1 if you have the <openssl/md5.h> header file. */
#undef HAVE_OPENSSL_MD5_H
/* Define to 1 if you have the `perror' function. */
/* Define to 1 if you have the `perror' function. */
#undef HAVE_PERROR
#undef HAVE_PERROR
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment