Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
c179ad11
Commit
c179ad11
authored
Nov 13, 2012
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #468 from hurtonm/issue_465
Resolve LIBZMQ-465
parents
99f71444
c543b2ce
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
34 additions
and
27 deletions
+34
-27
decoder.cpp
src/decoder.cpp
+0
-5
decoder.hpp
src/decoder.hpp
+18
-6
i_decoder.hpp
src/i_decoder.hpp
+1
-1
raw_decoder.cpp
src/raw_decoder.cpp
+1
-1
raw_decoder.hpp
src/raw_decoder.hpp
+1
-1
stream_engine.cpp
src/stream_engine.cpp
+10
-5
stream_engine.hpp
src/stream_engine.hpp
+3
-1
v1_decoder.cpp
src/v1_decoder.cpp
+0
-5
v1_decoder.hpp
src/v1_decoder.hpp
+0
-2
No files found.
src/decoder.cpp
View file @
c179ad11
...
@@ -57,11 +57,6 @@ void zmq::decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
...
@@ -57,11 +57,6 @@ void zmq::decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink
=
msg_sink_
;
msg_sink
=
msg_sink_
;
}
}
bool
zmq
::
decoder_t
::
stalled
()
const
{
return
next
==
&
decoder_t
::
message_ready
;
}
bool
zmq
::
decoder_t
::
one_byte_size_ready
()
bool
zmq
::
decoder_t
::
one_byte_size_ready
()
{
{
// First byte of size is read. If it is 0xff read 8-byte size.
// First byte of size is read. If it is 0xff read 8-byte size.
...
...
src/decoder.hpp
View file @
c179ad11
...
@@ -143,6 +143,22 @@ namespace zmq
...
@@ -143,6 +143,22 @@ namespace zmq
}
}
}
}
// Returns true if the decoder has been fed all required data
// but cannot proceed with the next decoding step.
// False is returned if the decoder has encountered an error.
bool
stalled
()
{
while
(
!
to_read
)
{
if
(
!
(
static_cast
<
T
*>
(
this
)
->*
next
)
())
{
if
(
unlikely
(
!
(
static_cast
<
T
*>
(
this
)
->
next
)))
return
false
;
return
true
;
}
}
return
false
;
}
inline
bool
message_ready_size
(
size_t
msg_sz
)
inline
bool
message_ready_size
(
size_t
msg_sz
)
{
{
zmq_assert
(
false
);
zmq_assert
(
false
);
...
@@ -172,13 +188,13 @@ namespace zmq
...
@@ -172,13 +188,13 @@ namespace zmq
next
=
NULL
;
next
=
NULL
;
}
}
private
:
// Next step. If set to NULL, it means that associated data stream
// Next step. If set to NULL, it means that associated data stream
// is dead. Note that there can be still data in the process in such
// is dead. Note that there can be still data in the process in such
// case.
// case.
step_t
next
;
step_t
next
;
private
:
// Where to store the read data.
// Where to store the read data.
unsigned
char
*
read_pos
;
unsigned
char
*
read_pos
;
...
@@ -205,10 +221,6 @@ namespace zmq
...
@@ -205,10 +221,6 @@ namespace zmq
// Set the receiver of decoded messages.
// Set the receiver of decoded messages.
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
// Returns true if there is a decoded message
// waiting to be delivered to the session.
bool
stalled
()
const
;
private
:
private
:
bool
one_byte_size_ready
();
bool
one_byte_size_ready
();
...
...
src/i_decoder.hpp
View file @
c179ad11
...
@@ -40,7 +40,7 @@ namespace zmq
...
@@ -40,7 +40,7 @@ namespace zmq
virtual
size_t
process_buffer
(
unsigned
char
*
data_
,
size_t
size_
)
=
0
;
virtual
size_t
process_buffer
(
unsigned
char
*
data_
,
size_t
size_
)
=
0
;
virtual
bool
stalled
()
const
=
0
;
virtual
bool
stalled
()
=
0
;
virtual
bool
message_ready_size
(
size_t
msg_sz
)
=
0
;
virtual
bool
message_ready_size
(
size_t
msg_sz
)
=
0
;
};
};
...
...
src/raw_decoder.cpp
View file @
c179ad11
...
@@ -53,7 +53,7 @@ void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
...
@@ -53,7 +53,7 @@ void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink
=
msg_sink_
;
msg_sink
=
msg_sink_
;
}
}
bool
zmq
::
raw_decoder_t
::
stalled
()
const
bool
zmq
::
raw_decoder_t
::
stalled
()
{
{
return
false
;
return
false
;
}
}
...
...
src/raw_decoder.hpp
View file @
c179ad11
...
@@ -45,7 +45,7 @@ namespace zmq
...
@@ -45,7 +45,7 @@ namespace zmq
// i_decoder interface.
// i_decoder interface.
virtual
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
virtual
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
virtual
bool
stalled
()
const
;
virtual
bool
stalled
();
virtual
bool
message_ready_size
(
size_t
msg_sz
);
virtual
bool
message_ready_size
(
size_t
msg_sz
);
...
...
src/stream_engine.cpp
View file @
c179ad11
...
@@ -52,10 +52,10 @@
...
@@ -52,10 +52,10 @@
zmq
::
stream_engine_t
::
stream_engine_t
(
fd_t
fd_
,
const
options_t
&
options_
,
const
std
::
string
&
endpoint_
)
:
zmq
::
stream_engine_t
::
stream_engine_t
(
fd_t
fd_
,
const
options_t
&
options_
,
const
std
::
string
&
endpoint_
)
:
s
(
fd_
),
s
(
fd_
),
io_enabled
(
false
),
inpos
(
NULL
),
inpos
(
NULL
),
insize
(
0
),
insize
(
0
),
decoder
(
NULL
),
decoder
(
NULL
),
input_error
(
false
),
outpos
(
NULL
),
outpos
(
NULL
),
outsize
(
0
),
outsize
(
0
),
encoder
(
NULL
),
encoder
(
NULL
),
...
@@ -134,6 +134,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
...
@@ -134,6 +134,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// Connect to I/O threads poller object.
// Connect to I/O threads poller object.
io_object_t
::
plug
(
io_thread_
);
io_object_t
::
plug
(
io_thread_
);
handle
=
add_fd
(
s
);
handle
=
add_fd
(
s
);
io_enabled
=
true
;
if
(
options
.
raw_sock
)
{
if
(
options
.
raw_sock
)
{
// no handshaking for raw sock, instantiate raw encoder and decoders
// no handshaking for raw sock, instantiate raw encoder and decoders
...
@@ -169,7 +170,10 @@ void zmq::stream_engine_t::unplug ()
...
@@ -169,7 +170,10 @@ void zmq::stream_engine_t::unplug ()
plugged
=
false
;
plugged
=
false
;
// Cancel all fd subscriptions.
// Cancel all fd subscriptions.
if
(
io_enabled
)
{
rm_fd
(
handle
);
rm_fd
(
handle
);
io_enabled
=
false
;
}
// Disconnect from I/O threads poller object.
// Disconnect from I/O threads poller object.
io_object_t
::
unplug
();
io_object_t
::
unplug
();
...
@@ -250,9 +254,10 @@ void zmq::stream_engine_t::in_event ()
...
@@ -250,9 +254,10 @@ void zmq::stream_engine_t::in_event ()
// waiting for input events and postpone the termination
// waiting for input events and postpone the termination
// until after the session has accepted the message.
// until after the session has accepted the message.
if
(
disconnection
)
{
if
(
disconnection
)
{
input_error
=
true
;
if
(
decoder
->
stalled
())
{
if
(
decoder
->
stalled
())
rm_fd
(
handle
);
reset_pollin
(
handle
);
io_enabled
=
false
;
}
else
else
error
();
error
();
}
}
...
@@ -319,7 +324,7 @@ void zmq::stream_engine_t::activate_out ()
...
@@ -319,7 +324,7 @@ void zmq::stream_engine_t::activate_out ()
void
zmq
::
stream_engine_t
::
activate_in
()
void
zmq
::
stream_engine_t
::
activate_in
()
{
{
if
(
input_error
)
{
if
(
unlikely
(
!
io_enabled
)
)
{
// There was an input error but the engine could not
// There was an input error but the engine could not
// be terminated (due to the stalled decoder).
// be terminated (due to the stalled decoder).
// Flush the pending message and terminate the engine now.
// Flush the pending message and terminate the engine now.
...
...
src/stream_engine.hpp
View file @
c179ad11
...
@@ -96,12 +96,14 @@ namespace zmq
...
@@ -96,12 +96,14 @@ namespace zmq
// Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
// Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
const
static
size_t
greeting_size
=
12
;
const
static
size_t
greeting_size
=
12
;
// True iff we are registered with an I/O poller.
bool
io_enabled
;
handle_t
handle
;
handle_t
handle
;
unsigned
char
*
inpos
;
unsigned
char
*
inpos
;
size_t
insize
;
size_t
insize
;
i_decoder
*
decoder
;
i_decoder
*
decoder
;
bool
input_error
;
unsigned
char
*
outpos
;
unsigned
char
*
outpos
;
size_t
outsize
;
size_t
outsize
;
...
...
src/v1_decoder.cpp
View file @
c179ad11
...
@@ -58,11 +58,6 @@ void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
...
@@ -58,11 +58,6 @@ void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink
=
msg_sink_
;
msg_sink
=
msg_sink_
;
}
}
bool
zmq
::
v1_decoder_t
::
stalled
()
const
{
return
next
==
&
v1_decoder_t
::
message_ready
;
}
bool
zmq
::
v1_decoder_t
::
flags_ready
()
bool
zmq
::
v1_decoder_t
::
flags_ready
()
{
{
msg_flags
=
0
;
msg_flags
=
0
;
...
...
src/v1_decoder.hpp
View file @
c179ad11
...
@@ -44,8 +44,6 @@ namespace zmq
...
@@ -44,8 +44,6 @@ namespace zmq
// i_decoder interface.
// i_decoder interface.
virtual
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
virtual
void
set_msg_sink
(
i_msg_sink
*
msg_sink_
);
virtual
bool
stalled
()
const
;
private
:
private
:
bool
flags_ready
();
bool
flags_ready
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment