Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
c43aded5
Commit
c43aded5
authored
Dec 13, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
debug code removed from PGM engines
parent
f4ac8d7a
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
24 additions
and
156 deletions
+24
-156
pgm_receiver.cpp
src/pgm_receiver.cpp
+0
-21
pgm_receiver.hpp
src/pgm_receiver.hpp
+6
-4
pgm_sender.cpp
src/pgm_sender.cpp
+2
-31
pgm_socket.cpp
src/pgm_socket.cpp
+8
-84
pgm_socket.hpp
src/pgm_socket.hpp
+8
-16
No files found.
src/pgm_receiver.cpp
View file @
c43aded5
...
@@ -34,20 +34,6 @@
...
@@ -34,20 +34,6 @@
#include "wire.hpp"
#include "wire.hpp"
#include "i_inout.hpp"
#include "i_inout.hpp"
//#define PGM_RECEIVER_DEBUG
//#define PGM_RECEIVER_DEBUG_LEVEL 1
// level 1 = key behaviour
// level 2 = processing flow
// level 4 = infos
#ifndef PGM_RECEIVER_DEBUG
# define zmq_log(n, ...) while (0)
#else
# define zmq_log(n, ...) do { if ((n) <= PGM_RECEIVER_DEBUG_LEVEL) \
{ printf (__VA_ARGS__);}} while (0)
#endif
zmq
::
pgm_receiver_t
::
pgm_receiver_t
(
class
io_thread_t
*
parent_
,
zmq
::
pgm_receiver_t
::
pgm_receiver_t
(
class
io_thread_t
*
parent_
,
const
options_t
&
options_
,
const
char
*
session_name_
)
:
const
options_t
&
options_
,
const
char
*
session_name_
)
:
io_object_t
(
parent_
),
io_object_t
(
parent_
),
...
@@ -161,12 +147,8 @@ void zmq::pgm_receiver_t::in_event ()
...
@@ -161,12 +147,8 @@ void zmq::pgm_receiver_t::in_event ()
// New peer.
// New peer.
if
(
it
==
peers
.
end
())
{
if
(
it
==
peers
.
end
())
{
peer_info_t
peer_info
=
{
false
,
NULL
};
peer_info_t
peer_info
=
{
false
,
NULL
};
it
=
peers
.
insert
(
std
::
make_pair
(
*
tsi
,
peer_info
)).
first
;
it
=
peers
.
insert
(
std
::
make_pair
(
*
tsi
,
peer_info
)).
first
;
zmq_log
(
1
,
"New peer TSI: %s, %s(%i).
\n
"
,
pgm_tsi_print
(
tsi
),
__FILE__
,
__LINE__
);
}
}
// There is not beginning of the message in current APDU and we
// There is not beginning of the message in current APDU and we
...
@@ -191,9 +173,6 @@ void zmq::pgm_receiver_t::in_event ()
...
@@ -191,9 +173,6 @@ void zmq::pgm_receiver_t::in_event ()
// Create and connect decoder for joined peer.
// Create and connect decoder for joined peer.
it
->
second
.
decoder
=
new
zmq_decoder_t
(
0
);
it
->
second
.
decoder
=
new
zmq_decoder_t
(
0
);
it
->
second
.
decoder
->
set_inout
(
inout
);
it
->
second
.
decoder
->
set_inout
(
inout
);
zmq_log
(
1
,
"Peer %s joined into the stream, %s(%i)
\n
"
,
pgm_tsi_print
(
tsi
),
__FILE__
,
__LINE__
);
}
}
if
(
nbytes
>
0
)
{
if
(
nbytes
>
0
)
{
...
...
src/pgm_receiver.hpp
View file @
c43aded5
...
@@ -37,7 +37,6 @@
...
@@ -37,7 +37,6 @@
#include "zmq_decoder.hpp"
#include "zmq_decoder.hpp"
#include "pgm_socket.hpp"
#include "pgm_socket.hpp"
namespace
zmq
namespace
zmq
{
{
...
@@ -66,13 +65,16 @@ namespace zmq
...
@@ -66,13 +65,16 @@ namespace zmq
private
:
private
:
// Map to hold TSI, joined and decoder for each peer.
// Map to hold TSI, joined and decoder for each peer.
struct
peer_info_t
{
struct
peer_info_t
{
bool
joined
;
bool
joined
;
zmq_decoder_t
*
decoder
;
zmq_decoder_t
*
decoder
;
};
};
struct
tsi_comp
{
struct
tsi_comp
bool
operator
()
(
const
pgm_tsi_t
&
ltsi
,
const
pgm_tsi_t
&
rtsi
)
const
{
inline
bool
operator
()
(
const
pgm_tsi_t
&
ltsi
,
const
pgm_tsi_t
&
rtsi
)
const
{
{
if
(
ltsi
.
sport
<
rtsi
.
sport
)
if
(
ltsi
.
sport
<
rtsi
.
sport
)
return
true
;
return
true
;
...
...
src/pgm_sender.cpp
View file @
c43aded5
...
@@ -32,20 +32,6 @@
...
@@ -32,20 +32,6 @@
#include "err.hpp"
#include "err.hpp"
#include "wire.hpp"
#include "wire.hpp"
//#define PGM_SENDER_DEBUG
//#define PGM_SENDER_DEBUG_LEVEL 1
// level 1 = key behaviour
// level 2 = processing flow
// level 4 = infos
#ifndef PGM_SENDER_DEBUG
# define zmq_log(n, ...) while (0)
#else
# define zmq_log(n, ...) do { if ((n) <= PGM_SENDER_DEBUG_LEVEL) \
{ printf (__VA_ARGS__);}} while (0)
#endif
zmq
::
pgm_sender_t
::
pgm_sender_t
(
io_thread_t
*
parent_
,
zmq
::
pgm_sender_t
::
pgm_sender_t
(
io_thread_t
*
parent_
,
const
options_t
&
options_
,
const
char
*
session_name_
)
:
const
options_t
&
options_
,
const
char
*
session_name_
)
:
io_object_t
(
parent_
),
io_object_t
(
parent_
),
...
@@ -119,9 +105,6 @@ void zmq::pgm_sender_t::revive ()
...
@@ -119,9 +105,6 @@ void zmq::pgm_sender_t::revive ()
zmq
::
pgm_sender_t
::~
pgm_sender_t
()
zmq
::
pgm_sender_t
::~
pgm_sender_t
()
{
{
zmq_log
(
4
,
"pgm_sender_t destructor, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
if
(
out_buffer
)
{
if
(
out_buffer
)
{
pgm_socket
.
free_buffer
(
out_buffer
);
pgm_socket
.
free_buffer
(
out_buffer
);
out_buffer
=
NULL
;
out_buffer
=
NULL
;
...
@@ -171,17 +154,8 @@ void zmq::pgm_sender_t::out_event ()
...
@@ -171,17 +154,8 @@ void zmq::pgm_sender_t::out_event ()
size_t
nbytes
=
write_one_pkt_with_offset
(
out_buffer
+
write_pos
,
size_t
nbytes
=
write_one_pkt_with_offset
(
out_buffer
+
write_pos
,
write_size
-
write_pos
,
(
uint16_t
)
first_message_offset
);
write_size
-
write_pos
,
(
uint16_t
)
first_message_offset
);
// We can write all data or 0 which means rate limit reached.
// We can write either all data or 0 which means rate limit reached.
if
(
write_size
-
write_pos
!=
nbytes
&&
nbytes
!=
0
)
{
zmq_assert
(
write_size
-
write_pos
==
nbytes
||
nbytes
==
0
);
zmq_log
(
2
,
"write_size - write_pos %i, nbytes %i, %s(%i)"
,
(
int
)(
write_size
-
write_pos
),
(
int
)
nbytes
,
__FILE__
,
__LINE__
);
assert
(
false
);
}
// PGM rate limit reached nbytes is 0.
if
(
!
nbytes
)
{
zmq_log
(
1
,
"pgm rate limit reached, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
}
write_pos
+=
nbytes
;
write_pos
+=
nbytes
;
}
}
...
@@ -191,9 +165,6 @@ void zmq::pgm_sender_t::out_event ()
...
@@ -191,9 +165,6 @@ void zmq::pgm_sender_t::out_event ()
size_t
zmq
::
pgm_sender_t
::
write_one_pkt_with_offset
(
unsigned
char
*
data_
,
size_t
zmq
::
pgm_sender_t
::
write_one_pkt_with_offset
(
unsigned
char
*
data_
,
size_t
size_
,
uint16_t
offset_
)
size_t
size_
,
uint16_t
offset_
)
{
{
zmq_log
(
4
,
"data_size %i, first message offset %i, %s(%i)
\n
"
,
(
int
)
size_
,
offset_
,
__FILE__
,
__LINE__
);
// Put offset information in the buffer.
// Put offset information in the buffer.
put_uint16
(
data_
,
offset_
);
put_uint16
(
data_
,
offset_
);
...
...
src/pgm_socket.cpp
View file @
c43aded5
...
@@ -41,20 +41,6 @@
...
@@ -41,20 +41,6 @@
#include "err.hpp"
#include "err.hpp"
#include "uuid.hpp"
#include "uuid.hpp"
//#define PGM_SOCKET_DEBUG
//#define PGM_SOCKET_DEBUG_LEVEL 4
// level 1 = key behaviour
// level 2 = processing flow
// level 4 = infos
#ifndef PGM_SOCKET_DEBUG
# define zmq_log(n, ...) while (0)
#else
# define zmq_log(n, ...) do { if ((n) <= PGM_SOCKET_DEBUG_LEVEL) \
{ printf (__VA_ARGS__);}} while (0)
#endif
zmq
::
pgm_socket_t
::
pgm_socket_t
(
bool
receiver_
,
const
options_t
&
options_
)
:
zmq
::
pgm_socket_t
::
pgm_socket_t
(
bool
receiver_
,
const
options_t
&
options_
)
:
transport
(
NULL
),
transport
(
NULL
),
options
(
options_
),
options
(
options_
),
...
@@ -91,10 +77,6 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
...
@@ -91,10 +77,6 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
memset
(
network
,
'\0'
,
sizeof
(
network
));
memset
(
network
,
'\0'
,
sizeof
(
network
));
memcpy
(
network
,
network_
,
port_delim
-
network_
);
memcpy
(
network
,
network_
,
port_delim
-
network_
);
zmq_log
(
1
,
"parsed: network %s, port %i, udp encaps. %s, %s(%i)
\n
"
,
network
,
port_number
,
udp_encapsulation
?
"yes"
:
"no"
,
__FILE__
,
__LINE__
);
// Open PGM transport.
// Open PGM transport.
int
rc
=
open_transport
();
int
rc
=
open_transport
();
if
(
rc
!=
0
)
if
(
rc
!=
0
)
...
@@ -105,20 +87,13 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
...
@@ -105,20 +87,13 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if
(
receiver
)
{
if
(
receiver
)
{
pgm_msgv_len
=
get_max_apdu_at_once
(
in_batch_size
);
pgm_msgv_len
=
get_max_apdu_at_once
(
in_batch_size
);
pgm_msgv
=
new
pgm_msgv_t
[
pgm_msgv_len
];
pgm_msgv
=
new
pgm_msgv_t
[
pgm_msgv_len
];
zmq_log
(
1
,
"PGM transport: pgm_msgv_len %i, %s(%i)
\n
"
,
(
int
)
pgm_msgv_len
,
__FILE__
,
__LINE__
);
}
}
return
0
;
return
0
;
}
}
int
zmq
::
pgm_socket_t
::
open_transport
(
void
)
int
zmq
::
pgm_socket_t
::
open_transport
()
{
{
zmq_log
(
1
,
"Opening PGM: network %s, port %i, udp encaps. %s, %s(%i)
\n
"
,
network
,
port_number
,
udp_encapsulation
?
"yes"
:
"no"
,
__FILE__
,
__LINE__
);
// Can not open transport before destroying old one.
// Can not open transport before destroying old one.
zmq_assert
(
transport
==
NULL
);
zmq_assert
(
transport
==
NULL
);
...
@@ -193,8 +168,6 @@ int zmq::pgm_socket_t::open_transport (void)
...
@@ -193,8 +168,6 @@ int zmq::pgm_socket_t::open_transport (void)
pgm_if_free_transport_info
(
res
);
pgm_if_free_transport_info
(
res
);
zmq_log
(
1
,
"PGM transport created, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
// Common parameters for receiver and sender.
// Common parameters for receiver and sender.
// Set maximum transport protocol data unit size (TPDU).
// Set maximum transport protocol data unit size (TPDU).
...
@@ -211,13 +184,11 @@ int zmq::pgm_socket_t::open_transport (void)
...
@@ -211,13 +184,11 @@ int zmq::pgm_socket_t::open_transport (void)
return
-
1
;
return
-
1
;
}
}
#ifdef ZMQ_HAVE_OPENPGM2
// Set nonblocking send/recv sockets.
// Set nonblocking send/recv sockets.
if
(
!
pgm_transport_set_nonblocking
(
transport
,
true
))
{
if
(
!
pgm_transport_set_nonblocking
(
transport
,
true
))
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
#endif
if
(
receiver
)
{
if
(
receiver
)
{
...
@@ -338,8 +309,6 @@ int zmq::pgm_socket_t::open_transport (void)
...
@@ -338,8 +309,6 @@ int zmq::pgm_socket_t::open_transport (void)
return
-
1
;
return
-
1
;
}
}
zmq_log
(
1
,
"PGM transport bound, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
return
0
;
return
0
;
}
}
...
@@ -354,7 +323,7 @@ zmq::pgm_socket_t::~pgm_socket_t ()
...
@@ -354,7 +323,7 @@ zmq::pgm_socket_t::~pgm_socket_t ()
close_transport
();
close_transport
();
}
}
void
zmq
::
pgm_socket_t
::
close_transport
(
void
)
void
zmq
::
pgm_socket_t
::
close_transport
()
{
{
// transport has to be valid.
// transport has to be valid.
zmq_assert
(
transport
);
zmq_assert
(
transport
);
...
@@ -366,7 +335,7 @@ void zmq::pgm_socket_t::close_transport (void)
...
@@ -366,7 +335,7 @@ void zmq::pgm_socket_t::close_transport (void)
// Get receiver fds. recv_fd is from transport->recv_sock
// Get receiver fds. recv_fd is from transport->recv_sock
// waiting_pipe_fd is from transport->waiting_pipe [0]
// waiting_pipe_fd is from transport->waiting_pipe [0]
int
zmq
::
pgm_socket_t
::
get_receiver_fds
(
int
*
receive_fd_
,
void
zmq
::
pgm_socket_t
::
get_receiver_fds
(
int
*
receive_fd_
,
int
*
waiting_pipe_fd_
)
int
*
waiting_pipe_fd_
)
{
{
zmq_assert
(
receive_fd_
);
zmq_assert
(
receive_fd_
);
...
@@ -382,15 +351,13 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
...
@@ -382,15 +351,13 @@ int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
// Take FDs directly from transport.
// Take FDs directly from transport.
*
receive_fd_
=
pgm_transport_get_recv_fd
(
transport
);
*
receive_fd_
=
pgm_transport_get_recv_fd
(
transport
);
*
waiting_pipe_fd_
=
pgm_transport_get_pending_fd
(
transport
);
*
waiting_pipe_fd_
=
pgm_transport_get_pending_fd
(
transport
);
return
pgm_receiver_fd_count
;
}
}
// Get fds and store them into user allocated memory.
// Get fds and store them into user allocated memory.
// sender_fd is from pgm_transport->send_sock.
// sender_fd is from pgm_transport->send_sock.
// receive_fd_ is from transport->recv_sock.
// receive_fd_ is from transport->recv_sock.
// rdata_notify_fd_ is from transport->rdata_notify (PGM2 only).
// rdata_notify_fd_ is from transport->rdata_notify (PGM2 only).
int
zmq
::
pgm_socket_t
::
get_sender_fds
(
int
*
send_fd_
,
int
*
receive_fd_
,
void
zmq
::
pgm_socket_t
::
get_sender_fds
(
int
*
send_fd_
,
int
*
receive_fd_
,
int
*
rdata_notify_fd_
)
int
*
rdata_notify_fd_
)
{
{
zmq_assert
(
send_fd_
);
zmq_assert
(
send_fd_
);
...
@@ -409,8 +376,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
...
@@ -409,8 +376,6 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
*
receive_fd_
=
pgm_transport_get_recv_fd
(
transport
);
*
receive_fd_
=
pgm_transport_get_recv_fd
(
transport
);
*
rdata_notify_fd_
=
pgm_transport_get_repair_fd
(
transport
);
*
rdata_notify_fd_
=
pgm_transport_get_repair_fd
(
transport
);
*
send_fd_
=
pgm_transport_get_send_fd
(
transport
);
*
send_fd_
=
pgm_transport_get_send_fd
(
transport
);
return
pgm_sender_fd_count
;
}
}
// Send one APDU, transmit window owned memory.
// Send one APDU, transmit window owned memory.
...
@@ -421,28 +386,19 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
...
@@ -421,28 +386,19 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
PGMIOStatus
status
=
pgm_send
(
transport
,
data_
,
data_len_
,
&
nbytes
);
PGMIOStatus
status
=
pgm_send
(
transport
,
data_
,
data_len_
,
&
nbytes
);
if
(
nbytes
!=
data_len_
)
{
if
(
nbytes
!=
data_len_
)
{
zmq_log
(
1
,
"status %i, data_len %i, wrote %iB, %s(%i)
\n
"
,
(
int
)
status
,
(
int
)
data_len_
,
(
int
)
nbytes
,
__FILE__
,
__LINE__
);
zmq_assert
(
status
==
PGM_IO_STATUS_RATE_LIMITED
);
zmq_assert
(
status
==
PGM_IO_STATUS_RATE_LIMITED
);
zmq_assert
(
nbytes
==
0
);
zmq_assert
(
nbytes
==
0
);
}
}
zmq_log
(
4
,
"wrote %i/%iB, %s(%i)
\n
"
,
(
int
)
nbytes
,
(
int
)
data_len_
,
__FILE__
,
__LINE__
);
// We have to write all data as one packet.
// We have to write all data as one packet.
if
(
nbytes
>
0
)
{
if
(
nbytes
>
0
)
zmq_log
(
1
,
"data sent %iB, %s(%i)
\n
"
,
(
int
)
nbytes
,
__FILE__
,
__LINE__
);
zmq_assert
((
ssize_t
)
nbytes
==
(
ssize_t
)
data_len_
);
zmq_assert
((
ssize_t
)
nbytes
==
(
ssize_t
)
data_len_
);
}
return
nbytes
;
return
nbytes
;
}
}
// Return max TSDU size without fragmentation from current PGM transport.
// Return max TSDU size without fragmentation from current PGM transport.
size_t
zmq
::
pgm_socket_t
::
get_max_tsdu_size
(
void
)
size_t
zmq
::
pgm_socket_t
::
get_max_tsdu_size
()
{
{
return
(
size_t
)
pgm_transport_max_tsdu
(
transport
,
false
);
return
(
size_t
)
pgm_transport_max_tsdu
(
transport
,
false
);
}
}
...
@@ -494,7 +450,6 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
...
@@ -494,7 +450,6 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
// returned.
// returned.
ssize_t
zmq
::
pgm_socket_t
::
receive
(
void
**
raw_data_
,
const
pgm_tsi_t
**
tsi_
)
ssize_t
zmq
::
pgm_socket_t
::
receive
(
void
**
raw_data_
,
const
pgm_tsi_t
**
tsi_
)
{
{
size_t
raw_data_len
=
0
;
size_t
raw_data_len
=
0
;
// We just sent all data from pgm_transport_recvmsgv up
// We just sent all data from pgm_transport_recvmsgv up
...
@@ -505,7 +460,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -505,7 +460,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
nbytes_rec
=
0
;
nbytes_rec
=
0
;
nbytes_processed
=
0
;
nbytes_processed
=
0
;
pgm_msgv_processed
=
0
;
pgm_msgv_processed
=
0
;
return
0
;
return
0
;
}
}
...
@@ -525,11 +479,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -525,11 +479,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
const
PGMIOStatus
status
=
pgm_recvmsgv
(
transport
,
pgm_msgv
,
const
PGMIOStatus
status
=
pgm_recvmsgv
(
transport
,
pgm_msgv
,
pgm_msgv_len
,
MSG_DONTWAIT
,
&
nbytes_rec
,
&
pgm_error
);
pgm_msgv_len
,
MSG_DONTWAIT
,
&
nbytes_rec
,
&
pgm_error
);
if
(
nbytes_rec
>
0
)
{
zmq_log
(
1
,
"PGMIOStatus %i, nbytes_rec %i, %s(%i).
\n
"
,
status
,
(
int
)
nbytes_rec
,
__FILE__
,
__LINE__
);
}
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// pgm_recvmsg returns ?.
// pgm_recvmsg returns ?.
if
(
status
==
PGM_IO_STATUS_TIMER_PENDING
)
{
if
(
status
==
PGM_IO_STATUS_TIMER_PENDING
)
{
...
@@ -545,17 +494,10 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -545,17 +494,10 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Data loss.
// Data loss.
if
(
status
==
PGM_IO_STATUS_RESET
)
{
if
(
status
==
PGM_IO_STATUS_RESET
)
{
zmq_log
(
1
,
"PGMIOStatus %i, nbytes_rec %i, %s(%i).
\n
"
,
status
,
(
int
)
nbytes_rec
,
__FILE__
,
__LINE__
);
pgm_peer_t
*
peer
=
(
pgm_peer_t
*
)
transport
->
peers_pending
->
data
;
pgm_peer_t
*
peer
=
(
pgm_peer_t
*
)
transport
->
peers_pending
->
data
;
// Save lost data TSI.
// Save lost data TSI.
*
tsi_
=
&
peer
->
tsi
;
*
tsi_
=
&
peer
->
tsi
;
zmq_log
(
1
,
"Data loss detected %s, %s(%i)
\n
"
,
pgm_tsi_print
(
&
peer
->
tsi
),
__FILE__
,
__LINE__
);
nbytes_rec
=
0
;
nbytes_rec
=
0
;
// In case of dala loss -1 is returned.
// In case of dala loss -1 is returned.
...
@@ -563,18 +505,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -563,18 +505,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
}
}
// Catch the rest of the errors.
// Catch the rest of the errors.
if
(
status
!=
PGM_IO_STATUS_NORMAL
)
{
zmq_assert
(
status
==
PGM_IO_STATUS_NORMAL
);
zmq_log
(
1
,
"PGMIOStatus %i, nbytes_rec %i, %s(%i).
\n
"
,
status
,
(
int
)
nbytes_rec
,
__FILE__
,
__LINE__
);
zmq_assert
(
false
);
nbytes_rec
=
0
;
return
-
1
;
}
zmq_log
(
4
,
"received %i bytes
\n
"
,
(
int
)
nbytes_rec
);
}
}
zmq_assert
(
nbytes_rec
>
0
);
zmq_assert
(
nbytes_rec
>
0
);
...
@@ -596,15 +527,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
...
@@ -596,15 +527,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
pgm_msgv_processed
++
;
pgm_msgv_processed
++
;
nbytes_processed
+=
raw_data_len
;
nbytes_processed
+=
raw_data_len
;
zmq_log
(
4
,
"sendig up %i bytes
\n
"
,
(
int
)
raw_data_len
);
return
raw_data_len
;
return
raw_data_len
;
}
}
void
zmq
::
pgm_socket_t
::
process_upstream
(
void
)
void
zmq
::
pgm_socket_t
::
process_upstream
()
{
{
zmq_log
(
1
,
"On upstream packet, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
pgm_msgv_t
dummy_msg
;
pgm_msgv_t
dummy_msg
;
size_t
dummy_bytes
=
0
;
size_t
dummy_bytes
=
0
;
...
@@ -613,9 +540,6 @@ void zmq::pgm_socket_t::process_upstream (void)
...
@@ -613,9 +540,6 @@ void zmq::pgm_socket_t::process_upstream (void)
PGMIOStatus
status
=
pgm_recvmsgv
(
transport
,
&
dummy_msg
,
PGMIOStatus
status
=
pgm_recvmsgv
(
transport
,
&
dummy_msg
,
1
,
MSG_DONTWAIT
,
&
dummy_bytes
,
&
pgm_error
);
1
,
MSG_DONTWAIT
,
&
dummy_bytes
,
&
pgm_error
);
zmq_log
(
1
,
"upstream status %i, nbytes %i, %s(%i)
\n
"
,
(
int
)
status
,
(
int
)
dummy_bytes
,
__FILE__
,
__LINE__
);
// No data should be returned.
// No data should be returned.
zmq_assert
(
dummy_bytes
==
0
&&
(
status
==
PGM_IO_STATUS_TIMER_PENDING
||
zmq_assert
(
dummy_bytes
==
0
&&
(
status
==
PGM_IO_STATUS_TIMER_PENDING
||
status
==
PGM_IO_STATUS_RATE_LIMITED
));
status
==
PGM_IO_STATUS_RATE_LIMITED
));
...
...
src/pgm_socket.hpp
View file @
c43aded5
...
@@ -52,18 +52,18 @@ namespace zmq
...
@@ -52,18 +52,18 @@ namespace zmq
int
init
(
bool
udp_encapsulation_
,
const
char
*
network_
);
int
init
(
bool
udp_encapsulation_
,
const
char
*
network_
);
// Open PGM transport. Parameters are the same as in constructor.
// Open PGM transport. Parameters are the same as in constructor.
int
open_transport
(
void
);
int
open_transport
();
// Close transport.
// Close transport.
void
close_transport
(
void
);
void
close_transport
();
// Get receiver fds and store them into user allocated memory.
// Get receiver fds and store them into user allocated memory.
int
get_receiver_fds
(
int
*
receive_fd_
,
int
*
waiting_pipe_fd_
);
void
get_receiver_fds
(
int
*
receive_fd_
,
int
*
waiting_pipe_fd_
);
// Get sender and receiver fds and store it to user allocated
// Get sender and receiver fds and store it to user allocated
// memory. Receive fd is used to process NAKs from peers.
// memory. Receive fd is used to process NAKs from peers.
int
get_sender_fds
(
int
*
send_fd_
,
int
*
receive_fd_
,
void
get_sender_fds
(
int
*
send_fd_
,
int
*
receive_fd_
,
int
*
rdata_notify_fd_
=
NULL
);
int
*
rdata_notify_fd_
);
// Send data as one APDU, transmit window owned memory.
// Send data as one APDU, transmit window owned memory.
size_t
send
(
unsigned
char
*
data_
,
size_t
data_len_
);
size_t
send
(
unsigned
char
*
data_
,
size_t
data_len_
);
...
@@ -79,17 +79,15 @@ namespace zmq
...
@@ -79,17 +79,15 @@ namespace zmq
// POLLIN on sender side should mean NAK or SPMR receiving.
// POLLIN on sender side should mean NAK or SPMR receiving.
// process_upstream function is used to handle such a situation.
// process_upstream function is used to handle such a situation.
void
process_upstream
(
void
);
void
process_upstream
();
pr
otected
:
pr
ivate
:
// OpenPGM transport
// OpenPGM transport
pgm_transport_t
*
transport
;
pgm_transport_t
*
transport
;
private
:
// Returns max tsdu size without fragmentation.
// Returns max tsdu size without fragmentation.
size_t
get_max_tsdu_size
(
void
);
size_t
get_max_tsdu_size
();
// Returns maximum count of apdus which fills readbuf_size_
// Returns maximum count of apdus which fills readbuf_size_
size_t
get_max_apdu_at_once
(
size_t
readbuf_size_
);
size_t
get_max_apdu_at_once
(
size_t
readbuf_size_
);
...
@@ -124,12 +122,6 @@ namespace zmq
...
@@ -124,12 +122,6 @@ namespace zmq
// Size of pgm_msgv array.
// Size of pgm_msgv array.
size_t
pgm_msgv_len
;
size_t
pgm_msgv_len
;
// Sender transport uses 2 fd.
enum
{
pgm_sender_fd_count
=
3
};
// Receiver transport uses 2 fd.
enum
{
pgm_receiver_fd_count
=
2
};
};
};
}
}
#endif
#endif
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment