Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
9cee8f9c
Commit
9cee8f9c
authored
Feb 02, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
problem with PGM messages larger than 1 MTU fixed
parent
27e47bdc
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
54 additions
and
50 deletions
+54
-50
pgm_receiver.cpp
src/pgm_receiver.cpp
+54
-50
No files found.
src/pgm_receiver.cpp
View file @
9cee8f9c
...
...
@@ -93,72 +93,76 @@ void zmq::pgm_receiver_t::in_event ()
// Read data from the underlying pgm_socket.
unsigned
char
*
data
=
NULL
;
const
pgm_tsi_t
*
tsi
=
NULL
;
ssize_t
received
=
pgm_socket
.
receive
((
void
**
)
&
data
,
&
tsi
);
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
if
(
received
==
0
)
return
;
// Find the peer based on its TSI.
peers_t
::
iterator
it
=
peers
.
find
(
*
tsi
);
// Data loss. Delete decoder and mark the peer as disjoint.
if
(
received
==
-
1
)
{
zmq_assert
(
it
!=
peers
.
end
());
it
->
second
.
joined
=
false
;
if
(
it
->
second
.
decoder
!=
NULL
)
{
delete
it
->
second
.
decoder
;
it
->
second
.
decoder
=
NULL
;
// TODO: This loop can effectively block other engines in the same I/O
// thread in the case of high load.
while
(
true
)
{
// Get new batch of data.
ssize_t
received
=
pgm_socket
.
receive
((
void
**
)
&
data
,
&
tsi
);
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
if
(
received
==
0
)
break
;
// Find the peer based on its TSI.
peers_t
::
iterator
it
=
peers
.
find
(
*
tsi
);
// Data loss. Delete decoder and mark the peer as disjoint.
if
(
received
==
-
1
)
{
zmq_assert
(
it
!=
peers
.
end
());
it
->
second
.
joined
=
false
;
if
(
it
->
second
.
decoder
!=
NULL
)
{
delete
it
->
second
.
decoder
;
it
->
second
.
decoder
=
NULL
;
}
break
;
}
return
;
}
// New peer. Add it to the list of know but unjoint peers.
if
(
it
==
peers
.
end
())
{
peer_info_t
peer_info
=
{
false
,
NULL
};
it
=
peers
.
insert
(
std
::
make_pair
(
*
tsi
,
peer_info
)).
first
;
}
// New peer. Add it to the list of know but unjoint peers.
if
(
it
==
peers
.
end
())
{
peer_info_t
peer_info
=
{
false
,
NULL
};
it
=
peers
.
insert
(
std
::
make_pair
(
*
tsi
,
peer_info
)).
first
;
}
// Read the offset of the fist message in the current packet.
zmq_assert
((
size_t
)
received
>=
sizeof
(
uint16_t
));
uint16_t
offset
=
get_uint16
(
data
);
data
+=
sizeof
(
uint16_t
);
received
-=
sizeof
(
uint16_t
);
// Read the offset of the fist message in the current packet.
zmq_assert
((
size_t
)
received
>=
sizeof
(
uint16_t
));
uint16_t
offset
=
get_uint16
(
data
);
data
+=
sizeof
(
uint16_t
);
received
-=
sizeof
(
uint16_t
);
// Join the stream if needed.
if
(
!
it
->
second
.
joined
)
{
// Join the stream if needed.
if
(
!
it
->
second
.
joined
)
{
// There is no beginning of the message in current packet.
// Ignore the data.
if
(
offset
==
0xffff
)
return
;
// There is no beginning of the message in current packet.
// Ignore the data.
if
(
offset
==
0xffff
)
continue
;
zmq_assert
(
offset
<=
received
);
zmq_assert
(
it
->
second
.
decoder
==
NULL
);
zmq_assert
(
offset
<=
received
);
zmq_assert
(
it
->
second
.
decoder
==
NULL
);
// We have to move data to the begining of the first message.
data
+=
offset
;
received
-=
offset
;
// We have to move data to the begining of the first message.
data
+=
offset
;
received
-=
offset
;
// Mark the stream as joined.
it
->
second
.
joined
=
true
;
// Mark the stream as joined.
it
->
second
.
joined
=
true
;
// Create and connect decoder for the peer.
it
->
second
.
decoder
=
new
(
std
::
nothrow
)
zmq_decoder_t
(
0
,
NULL
,
0
);
it
->
second
.
decoder
->
set_inout
(
inout
);
}
// Create and connect decoder for the peer.
it
->
second
.
decoder
=
new
(
std
::
nothrow
)
zmq_decoder_t
(
0
,
NULL
,
0
);
it
->
second
.
decoder
->
set_inout
(
inout
);
}
if
(
received
)
{
// Push all the data to the decoder.
// TODO: process_buffer may not process entire buffer!
ssize_t
processed
=
it
->
second
.
decoder
->
process_buffer
(
data
,
received
);
zmq_assert
(
processed
==
received
);
// Flush any messages decoder may have produced.
inout
->
flush
();
}
// Flush any messages decoder may have produced.
inout
->
flush
();
}
#endif
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment