Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
5852db45
Commit
5852db45
authored
Dec 28, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
PGM code cleanup
parent
aebff623
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
107 additions
and
201 deletions
+107
-201
pgm_receiver.cpp
src/pgm_receiver.cpp
+57
-91
pgm_receiver.hpp
src/pgm_receiver.hpp
+6
-7
pgm_sender.cpp
src/pgm_sender.cpp
+35
-59
pgm_sender.hpp
src/pgm_sender.hpp
+3
-13
pgm_socket.cpp
src/pgm_socket.cpp
+0
-0
pgm_socket.hpp
src/pgm_socket.hpp
+6
-31
No files found.
src/pgm_receiver.cpp
View file @
5852db45
...
...
@@ -27,9 +27,6 @@
#include "windows.hpp"
#endif
#include <pgm/pgm.h>
#include <iostream>
#include "pgm_receiver.hpp"
#include "err.hpp"
#include "stdint.hpp"
...
...
@@ -58,20 +55,12 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
void
zmq
::
pgm_receiver_t
::
plug
(
i_inout
*
inout_
)
{
//
Allocate 2 fds one for socket second for waiting pipe
.
//
Retrieve PGM fds and start polling
.
int
socket_fd
;
int
waiting_pipe_fd
;
// Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket
.
get_receiver_fds
(
&
socket_fd
,
&
waiting_pipe_fd
);
// Add socket_fd into poller.
socket_handle
=
add_fd
(
socket_fd
);
// Add waiting_pipe_fd into poller.
pipe_handle
=
add_fd
(
waiting_pipe_fd
);
// Set POLLIN for both handlers.
set_pollin
(
pipe_handle
);
set_pollin
(
socket_handle
);
...
...
@@ -81,15 +70,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
void
zmq
::
pgm_receiver_t
::
unplug
()
{
// Delete decoders.
for
(
peer_t
::
iterator
it
=
peers
.
begin
();
it
!=
peers
.
end
();
it
++
)
{
for
(
peer
s
_t
::
iterator
it
=
peers
.
begin
();
it
!=
peers
.
end
();
it
++
)
{
if
(
it
->
second
.
decoder
!=
NULL
)
delete
it
->
second
.
decoder
;
}
peers
.
clear
();
// Stop polling.
rm_fd
(
socket_handle
);
rm_fd
(
pipe_handle
);
inout
=
NULL
;
}
...
...
@@ -98,101 +88,77 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert
(
false
);
}
// POLLIN event from socket or waiting_pipe.
void
zmq
::
pgm_receiver_t
::
in_event
()
{
// Iterator to peers map.
peer_t
::
iterator
it
;
// Data from PGM socket.
unsigned
char
*
raw_data
=
NULL
;
// Read data from the underlying pgm_socket.
unsigned
char
*
data
=
NULL
;
const
pgm_tsi_t
*
tsi
=
NULL
;
ssize_t
nbytes
=
0
;
do
{
// Read data from underlying pgm_socket.
nbytes
=
pgm_socket
.
receive
((
void
**
)
&
raw_data
,
&
tsi
);
// No ODATA or RDATA.
if
(
!
nbytes
)
break
;
ssize_t
received
=
pgm_socket
.
receive
((
void
**
)
&
data
,
&
tsi
);
// Fid TSI in peers list.
it
=
peers
.
find
(
*
tsi
);
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
if
(
received
==
0
)
return
;
// Data loss
.
if
(
nbytes
==
-
1
)
{
// Find the peer based on its TSI
.
peers_t
::
iterator
it
=
peers
.
find
(
*
tsi
);
zmq_assert
(
it
!=
peers
.
end
());
// Delete decoder and set joined to false.
it
->
second
.
joined
=
false
;
if
(
it
->
second
.
decoder
!=
NULL
)
{
delete
it
->
second
.
decoder
;
it
->
second
.
decoder
=
NULL
;
}
break
;
// Data loss. Delete decoder and mark the peer as disjoint.
if
(
received
==
-
1
)
{
zmq_assert
(
it
!=
peers
.
end
());
it
->
second
.
joined
=
false
;
if
(
it
->
second
.
decoder
!=
NULL
)
{
delete
it
->
second
.
decoder
;
it
->
second
.
decoder
=
NULL
;
}
return
;
}
// Read offset of the fist message in current APDU.
zmq_assert
((
size_t
)
nbytes
>=
sizeof
(
uint16_t
));
uint16_t
apdu_offset
=
get_uint16
(
raw_data
);
// New peer. Add it to the list of know but unjoint peers.
if
(
it
==
peers
.
end
())
{
peer_info_t
peer_info
=
{
false
,
NULL
};
it
=
peers
.
insert
(
std
::
make_pair
(
*
tsi
,
peer_info
)).
first
;
}
// Shift raw_data & decrease nbytes by the first message offset
// information (sizeof uint16_t).
raw_data
+=
sizeof
(
uint16_t
);
nbytes
-=
sizeof
(
uint16_t
);
// Read the offset of the fist message in the current packet.
zmq_assert
((
size_t
)
received
>=
sizeof
(
uint16_t
));
uint16_t
offset
=
get_uint16
(
data
);
data
+=
sizeof
(
uint16_t
);
received
-=
sizeof
(
uint16_t
);
// New peer.
if
(
it
==
peers
.
end
())
{
peer_info_t
peer_info
=
{
false
,
NULL
};
it
=
peers
.
insert
(
std
::
make_pair
(
*
tsi
,
peer_info
)).
first
;
}
// Join the stream if needed.
if
(
!
it
->
second
.
joined
)
{
// There is not beginning of the message in current APDU and we
// are not joined jet -> throwing data.
if
(
apdu_offset
==
0xFFFF
&&
!
it
->
second
.
joined
)
{
break
;
}
// There is no beginning of the message in current packet.
// Ignore the data.
if
(
offset
==
0xffff
)
return
;
// Now is the possibility to join the stream.
if
(
!
it
->
second
.
joined
)
{
zmq_assert
(
apdu_offset
<=
nbytes
);
zmq_assert
(
it
->
second
.
decoder
==
NULL
);
zmq_assert
(
offset
<=
received
);
zmq_assert
(
it
->
second
.
decoder
==
NULL
);
// We have to move data to the begining of the first message.
raw_data
+=
apdu_
offset
;
nbytes
-=
apdu_
offset
;
// We have to move data to the begining of the first message.
data
+=
offset
;
received
-=
offset
;
// Joined the stream
.
it
->
second
.
joined
=
true
;
// Mark the stream as joined
.
it
->
second
.
joined
=
true
;
// Create and connect decoder for joined peer.
it
->
second
.
decoder
=
new
(
std
::
nothrow
)
zmq_decoder_t
(
0
,
NULL
,
0
);
it
->
second
.
decoder
->
set_inout
(
inout
);
}
if
(
nbytes
>
0
)
{
// Push all the data to the decoder.
// TODO: process_buffer may not process entire buffer!
it
->
second
.
decoder
->
process_buffer
(
raw_data
,
nbytes
);
}
// Create and connect decoder for the peer.
it
->
second
.
decoder
=
new
(
std
::
nothrow
)
zmq_decoder_t
(
0
,
NULL
,
0
);
it
->
second
.
decoder
->
set_inout
(
inout
);
}
}
while
(
nbytes
>
0
);
if
(
received
)
{
// Flush any messages decoder may have produced to the dispatch
er.
inout
->
flush
();
}
// Push all the data to the decod
er.
// TODO: process_buffer may not process entire buffer!
size_t
processed
=
it
->
second
.
decoder
->
process_buffer
(
data
,
received
);
zmq_assert
(
processed
==
received
);
void
zmq
::
pgm_receiver_t
::
out_event
()
{
zmq_assert
(
false
);
// Flush any messages decoder may have produced.
inout
->
flush
();
}
}
#endif
...
...
src/pgm_receiver.hpp
View file @
5852db45
...
...
@@ -29,7 +29,7 @@
#endif
#include <map>
#include <
pgm/pgm.h
>
#include <
algorithm
>
#include "io_object.hpp"
#include "i_engine.hpp"
...
...
@@ -45,8 +45,6 @@ namespace zmq
public
:
// Creates gm_engine. Underlying PGM connection is initialised
// using network_ parameter.
pgm_receiver_t
(
class
io_thread_t
*
parent_
,
const
options_t
&
options_
);
~
pgm_receiver_t
();
...
...
@@ -59,11 +57,12 @@ namespace zmq
// i_poll_events interface implementation.
void
in_event
();
void
out_event
();
private
:
// Map to hold TSI, joined and decoder for each peer.
// If joined is true we are already getting messages from the peer.
// It it's false, we are getting data but still we haven't seen
// beginning of a message.
struct
peer_info_t
{
bool
joined
;
...
...
@@ -84,8 +83,8 @@ namespace zmq
}
};
typedef
std
::
map
<
pgm_tsi_t
,
peer_info_t
,
tsi_comp
>
peer_t
;
peer_t
peers
;
typedef
std
::
map
<
pgm_tsi_t
,
peer_info_t
,
tsi_comp
>
peer
s
_t
;
peer
s
_t
peers
;
// PGM socket.
pgm_socket_t
pgm_socket
;
...
...
src/pgm_sender.cpp
View file @
5852db45
...
...
@@ -25,12 +25,13 @@
#include "windows.hpp"
#endif
#include <
iostream
>
#include <
stdlib.h
>
#include "io_thread.hpp"
#include "pgm_sender.hpp"
#include "err.hpp"
#include "wire.hpp"
#include "stdint.hpp"
zmq
::
pgm_sender_t
::
pgm_sender_t
(
io_thread_t
*
parent_
,
const
options_t
&
options_
)
:
...
...
@@ -38,18 +39,21 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
encoder
(
0
,
false
),
pgm_socket
(
false
,
options_
),
options
(
options_
),
inout
(
NULL
),
out_buffer
(
NULL
),
out_buffer_size
(
0
),
write_size
(
0
),
write_pos
(
0
),
first_message_offset
(
-
1
)
write_size
(
0
)
{
}
int
zmq
::
pgm_sender_t
::
init
(
bool
udp_encapsulation_
,
const
char
*
network_
)
{
return
pgm_socket
.
init
(
udp_encapsulation_
,
network_
);
int
rc
=
pgm_socket
.
init
(
udp_encapsulation_
,
network_
);
if
(
rc
!=
0
)
return
rc
;
out_buffer_size
=
pgm_socket
.
get_max_tsdu_size
();
out_buffer
=
(
unsigned
char
*
)
malloc
(
out_buffer_size
);
zmq_assert
(
out_buffer
);
}
void
zmq
::
pgm_sender_t
::
plug
(
i_inout
*
inout_
)
...
...
@@ -61,17 +65,11 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
encoder
.
set_inout
(
inout_
);
// Fill fds from PGM transport.
pgm_socket
.
get_sender_fds
(
&
downlink_socket_fd
,
&
uplink_socket_fd
,
&
rdata_notify_fd
);
// Add downlink_socket_fd into poller.
// Fill fds from PGM transport and add them to the poller.
pgm_socket
.
get_sender_fds
(
&
downlink_socket_fd
,
&
uplink_socket_fd
,
&
rdata_notify_fd
);
handle
=
add_fd
(
downlink_socket_fd
);
// Add uplink_socket_fd into the poller.
uplink_handle
=
add_fd
(
uplink_socket_fd
);
// Add rdata_notify_fd into the poller.
rdata_notify_handle
=
add_fd
(
rdata_notify_fd
);
// Set POLLIN. We wont never want to stop polling for uplink = we never
...
...
@@ -81,8 +79,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
// Set POLLOUT for downlink_socket_handle.
set_pollout
(
handle
);
inout
=
inout_
;
}
void
zmq
::
pgm_sender_t
::
unplug
()
...
...
@@ -91,7 +87,6 @@ void zmq::pgm_sender_t::unplug ()
rm_fd
(
uplink_handle
);
rm_fd
(
rdata_notify_handle
);
encoder
.
set_inout
(
NULL
);
inout
=
NULL
;
}
void
zmq
::
pgm_sender_t
::
revive
()
...
...
@@ -103,14 +98,14 @@ void zmq::pgm_sender_t::revive ()
zmq
::
pgm_sender_t
::~
pgm_sender_t
()
{
if
(
out_buffer
)
{
pgm_socket
.
free_buffer
(
out_buffer
);
free
(
out_buffer
);
out_buffer
=
NULL
;
}
}
// In event on sender side means NAK or SPMR receiving from some peer.
void
zmq
::
pgm_sender_t
::
in_event
()
{
// In event on sender side means NAK or SPMR receiving from some peer.
pgm_socket
.
process_upstream
();
}
...
...
@@ -118,55 +113,36 @@ void zmq::pgm_sender_t::out_event ()
{
// POLLOUT event from send socket. If write buffer is empty,
// try to read new data from the encoder.
if
(
write_pos
==
write_size
)
{
// Get buffer if we do not have already one.
if
(
!
out_buffer
)
{
out_buffer
=
(
unsigned
char
*
)
pgm_socket
.
get_buffer
(
&
out_buffer_size
);
}
if
(
write_size
==
0
)
{
assert
(
out_buffer_size
>
0
);
// First two bytes /sizeof (uint16_t)/ are used to store message
// offset in following steps.
// First two bytes (sizeof uint16_t) are used to store message
// offset in following steps. Note that by passing our buffer to
// the get data function we prevent it from returning its own buffer.
unsigned
char
*
bf
=
out_buffer
+
sizeof
(
uint16_t
);
write_size
=
out_buffer_size
-
sizeof
(
uint16_t
);
encoder
.
get_data
(
&
bf
,
&
write_size
,
&
first_message_offset
)
;
write_pos
=
0
;
size_t
bfsz
=
out_buffer_size
-
sizeof
(
uint16_t
);
int
offset
=
-
1
;
encoder
.
get_data
(
&
bf
,
&
bfsz
,
&
offset
)
;
// If there are no data to write stop polling for output.
if
(
!
write_size
)
{
if
(
!
bfsz
)
{
reset_pollout
(
handle
);
}
else
{
// Addning uint16_t for offset in a case when encoder returned > 0B.
write_size
+=
sizeof
(
uint16_t
);
return
;
}
}
// If there are any data to write, write them into the socket.
// Note that all data has to written in one write_one_pkt_with_offset call.
if
(
write_pos
<
write_size
)
{
size_t
nbytes
=
write_one_pkt_with_offset
(
out_buffer
+
write_pos
,
write_size
-
write_pos
,
(
uint16_t
)
first_message_offset
);
// We can write either all data or 0 which means rate limit reached.
zmq_assert
(
write_size
-
write_pos
==
nbytes
||
nbytes
==
0
);
write_pos
+=
nbytes
;
// Put offset information in the buffer.
write_size
=
bfsz
+
sizeof
(
uint16_t
);
put_uint16
(
out_buffer
,
offset
==
-
1
?
0xffff
:
(
uint16_t
)
offset
);
}
}
size_t
zmq
::
pgm_sender_t
::
write_one_pkt_with_offset
(
unsigned
char
*
data_
,
size_t
size_
,
uint16_t
offset_
)
{
// Put offset information in the buffer.
put_uint16
(
data_
,
offset_
);
// Send data.
size_t
nbytes
=
pgm_socket
.
send
(
data_
,
size_
);
// Send the data.
size_t
nbytes
=
pgm_socket
.
send
(
out_buffer
,
write_size
);
return
nbytes
;
// We can write either all data or 0 which means rate limit reached.
if
(
nbytes
==
write_size
)
write_size
=
0
;
else
zmq_assert
(
nbytes
==
0
);
}
#endif
src/pgm_sender.hpp
View file @
5852db45
...
...
@@ -42,6 +42,7 @@ namespace zmq
{
public
:
pgm_sender_t
(
class
io_thread_t
*
parent_
,
const
options_t
&
options_
);
~
pgm_sender_t
();
...
...
@@ -58,12 +59,6 @@ namespace zmq
private
:
// Send one APDU with first message offset information.
// Note that first 2 bytes in data_ are used to store the offset_
// and thus user data has to start at data_ + sizeof (uint16_t).
size_t
write_one_pkt_with_offset
(
unsigned
char
*
data_
,
size_t
size_
,
uint16_t
offset_
);
// Message encoder.
zmq_encoder_t
encoder
;
...
...
@@ -78,20 +73,15 @@ namespace zmq
handle_t
uplink_handle
;
handle_t
rdata_notify_handle
;
// Parent session.
i_inout
*
inout
;
// Output buffer from pgm_socket.
unsigned
char
*
out_buffer
;
// Output buffer size.
size_t
out_buffer_size
;
// Number of bytes in the buffer to be written to the socket.
// If zero, there are no data to be sent.
size_t
write_size
;
size_t
write_pos
;
// Offset of the first mesage in data chunk taken from encoder.
int
first_message_offset
;
pgm_sender_t
(
const
pgm_sender_t
&
);
void
operator
=
(
const
pgm_sender_t
&
);
...
...
src/pgm_socket.cpp
View file @
5852db45
This diff is collapsed.
Click to expand it.
src/pgm_socket.hpp
View file @
5852db45
...
...
@@ -30,7 +30,6 @@
#include <pgm/pgm.h>
#include "stdint.hpp"
#include "options.hpp"
namespace
zmq
...
...
@@ -62,11 +61,8 @@ namespace zmq
// Send data as one APDU, transmit window owned memory.
size_t
send
(
unsigned
char
*
data_
,
size_t
data_len_
);
// Allocates one slice for packet in tx window.
void
*
get_buffer
(
size_t
*
size_
);
// Fees memory allocated by get_buffer.
void
free_buffer
(
void
*
data_
);
// Returns max tsdu size without fragmentation.
size_t
get_max_tsdu_size
();
// Receive data from pgm socket.
ssize_t
receive
(
void
**
data_
,
const
pgm_tsi_t
**
tsi_
);
...
...
@@ -76,21 +72,9 @@ namespace zmq
void
process_upstream
();
private
:
// Open PGM transport.
int
open_transport
();
// Close transport.
void
close_transport
();
// OpenPGM transport
pgm_transport_t
*
transport
;
// Returns max tsdu size without fragmentation.
size_t
get_max_tsdu_size
();
// Returns maximum count of apdus which fills readbuf_size_
size_t
get_max_apdu_at_once
(
size_t
readbuf_size_
);
// Associated socket options.
options_t
options
;
...
...
@@ -98,19 +82,13 @@ namespace zmq
// true when pgm_socket should create receiving side.
bool
receiver
;
// TIBCO Rendezvous format network info.
char
network
[
256
];
// PGM transport port number.
uint16_t
port_number
;
// If we are using UDP encapsulation.
bool
udp_encapsulation
;
// Array of pgm_msgv_t structures to store received data
// Array of pgm_msgv_t structures to store received data
// from the socket (pgm_transport_recvmsgv).
pgm_msgv_t
*
pgm_msgv
;
// Size of pgm_msgv array.
size_t
pgm_msgv_len
;
// How many bytes were read from pgm socket.
size_t
nbytes_rec
;
...
...
@@ -119,9 +97,6 @@ namespace zmq
// How many messages from pgm_msgv were already sent up.
size_t
pgm_msgv_processed
;
// Size of pgm_msgv array.
size_t
pgm_msgv_len
;
};
}
#endif
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment