Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
dcd1f203
Commit
dcd1f203
authored
May 05, 2014
by
Martin Hurton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Minor renaming
parent
2c786a20
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
29 additions
and
30 deletions
+29
-30
plain_mechanism.cpp
src/plain_mechanism.cpp
+0
-1
stream_engine.cpp
src/stream_engine.cpp
+25
-25
stream_engine.hpp
src/stream_engine.hpp
+4
-4
No files found.
src/plain_mechanism.cpp
View file @
dcd1f203
...
@@ -22,7 +22,6 @@
...
@@ -22,7 +22,6 @@
#include "windows.hpp"
#include "windows.hpp"
#endif
#endif
#include <string.h>
#include <string>
#include <string>
#include "msg.hpp"
#include "msg.hpp"
...
...
src/stream_engine.cpp
View file @
dcd1f203
...
@@ -73,8 +73,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
...
@@ -73,8 +73,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
options
(
options_
),
options
(
options_
),
endpoint
(
endpoint_
),
endpoint
(
endpoint_
),
plugged
(
false
),
plugged
(
false
),
read_msg
(
&
stream_engine_t
::
read_identity
),
next_msg
(
&
stream_engine_t
::
identity_msg
),
write_msg
(
&
stream_engine_t
::
write_identity
),
process_msg
(
&
stream_engine_t
::
process_identity_msg
),
io_error
(
false
),
io_error
(
false
),
subscription_required
(
false
),
subscription_required
(
false
),
mechanism
(
NULL
),
mechanism
(
NULL
),
...
@@ -185,14 +185,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
...
@@ -185,14 +185,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// disable handshaking for raw socket
// disable handshaking for raw socket
handshaking
=
false
;
handshaking
=
false
;
read
_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
next
_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
write
_msg
=
&
stream_engine_t
::
push_msg_to_session
;
process
_msg
=
&
stream_engine_t
::
push_msg_to_session
;
// For raw sockets, send an initial 0-length message to the
// For raw sockets, send an initial 0-length message to the
// application so that it knows a peer has connected.
// application so that it knows a peer has connected.
msg_t
connector
;
msg_t
connector
;
connector
.
init
();
connector
.
init
();
(
this
->*
write_msg
)
(
&
connector
);
push_msg_to_session
(
&
connector
);
connector
.
close
();
connector
.
close
();
session
->
flush
();
session
->
flush
();
}
}
...
@@ -286,7 +286,7 @@ void zmq::stream_engine_t::in_event ()
...
@@ -286,7 +286,7 @@ void zmq::stream_engine_t::in_event ()
insize
-=
processed
;
insize
-=
processed
;
if
(
rc
==
0
||
rc
==
-
1
)
if
(
rc
==
0
||
rc
==
-
1
)
break
;
break
;
rc
=
(
this
->*
write
_msg
)
(
decoder
->
msg
());
rc
=
(
this
->*
process
_msg
)
(
decoder
->
msg
());
if
(
rc
==
-
1
)
if
(
rc
==
-
1
)
break
;
break
;
}
}
...
@@ -324,7 +324,7 @@ void zmq::stream_engine_t::out_event ()
...
@@ -324,7 +324,7 @@ void zmq::stream_engine_t::out_event ()
outsize
=
encoder
->
encode
(
&
outpos
,
0
);
outsize
=
encoder
->
encode
(
&
outpos
,
0
);
while
(
outsize
<
out_batch_size
)
{
while
(
outsize
<
out_batch_size
)
{
if
((
this
->*
read
_msg
)
(
&
tx_msg
)
==
-
1
)
if
((
this
->*
next
_msg
)
(
&
tx_msg
)
==
-
1
)
break
;
break
;
encoder
->
load_msg
(
&
tx_msg
);
encoder
->
load_msg
(
&
tx_msg
);
unsigned
char
*
bufptr
=
outpos
+
outsize
;
unsigned
char
*
bufptr
=
outpos
+
outsize
;
...
@@ -391,7 +391,7 @@ void zmq::stream_engine_t::restart_input ()
...
@@ -391,7 +391,7 @@ void zmq::stream_engine_t::restart_input ()
zmq_assert
(
session
!=
NULL
);
zmq_assert
(
session
!=
NULL
);
zmq_assert
(
decoder
!=
NULL
);
zmq_assert
(
decoder
!=
NULL
);
int
rc
=
(
this
->*
write
_msg
)
(
decoder
->
msg
());
int
rc
=
(
this
->*
process
_msg
)
(
decoder
->
msg
());
if
(
rc
==
-
1
)
{
if
(
rc
==
-
1
)
{
if
(
errno
==
EAGAIN
)
if
(
errno
==
EAGAIN
)
session
->
flush
();
session
->
flush
();
...
@@ -408,7 +408,7 @@ void zmq::stream_engine_t::restart_input ()
...
@@ -408,7 +408,7 @@ void zmq::stream_engine_t::restart_input ()
insize
-=
processed
;
insize
-=
processed
;
if
(
rc
==
0
||
rc
==
-
1
)
if
(
rc
==
0
||
rc
==
-
1
)
break
;
break
;
rc
=
(
this
->*
write
_msg
)
(
decoder
->
msg
());
rc
=
(
this
->*
process
_msg
)
(
decoder
->
msg
());
if
(
rc
==
-
1
)
if
(
rc
==
-
1
)
break
;
break
;
}
}
...
@@ -550,10 +550,10 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -550,10 +550,10 @@ bool zmq::stream_engine_t::handshake ()
// We are sending our identity now and the next message
// We are sending our identity now and the next message
// will come from the socket.
// will come from the socket.
read
_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
next
_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
// We are expecting identity message.
// We are expecting identity message.
write_msg
=
&
stream_engine_t
::
write_identity
;
process_msg
=
&
stream_engine_t
::
process_identity_msg
;
}
}
else
else
if
(
greeting_recv
[
revision_pos
]
==
ZMTP_1_0
)
{
if
(
greeting_recv
[
revision_pos
]
==
ZMTP_1_0
)
{
...
@@ -619,8 +619,8 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -619,8 +619,8 @@ bool zmq::stream_engine_t::handshake ()
error
();
error
();
return
false
;
return
false
;
}
}
read
_msg
=
&
stream_engine_t
::
next_handshake_command
;
next
_msg
=
&
stream_engine_t
::
next_handshake_command
;
write
_msg
=
&
stream_engine_t
::
process_handshake_command
;
process
_msg
=
&
stream_engine_t
::
process_handshake_command
;
}
}
// Start polling for output if necessary.
// Start polling for output if necessary.
...
@@ -634,17 +634,17 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -634,17 +634,17 @@ bool zmq::stream_engine_t::handshake ()
return
true
;
return
true
;
}
}
int
zmq
::
stream_engine_t
::
read_identity
(
msg_t
*
msg_
)
int
zmq
::
stream_engine_t
::
identity_msg
(
msg_t
*
msg_
)
{
{
int
rc
=
msg_
->
init_size
(
options
.
identity_size
);
int
rc
=
msg_
->
init_size
(
options
.
identity_size
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
if
(
options
.
identity_size
>
0
)
if
(
options
.
identity_size
>
0
)
memcpy
(
msg_
->
data
(),
options
.
identity
,
options
.
identity_size
);
memcpy
(
msg_
->
data
(),
options
.
identity
,
options
.
identity_size
);
read
_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
next
_msg
=
&
stream_engine_t
::
pull_msg_from_session
;
return
0
;
return
0
;
}
}
int
zmq
::
stream_engine_t
::
write_identity
(
msg_t
*
msg_
)
int
zmq
::
stream_engine_t
::
process_identity_msg
(
msg_t
*
msg_
)
{
{
if
(
options
.
recv_identity
)
{
if
(
options
.
recv_identity
)
{
msg_
->
set_flags
(
msg_t
::
identity
);
msg_
->
set_flags
(
msg_t
::
identity
);
...
@@ -659,9 +659,9 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
...
@@ -659,9 +659,9 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
}
}
if
(
subscription_required
)
if
(
subscription_required
)
write
_msg
=
&
stream_engine_t
::
write_subscription_msg
;
process
_msg
=
&
stream_engine_t
::
write_subscription_msg
;
else
else
write
_msg
=
&
stream_engine_t
::
push_msg_to_session
;
process
_msg
=
&
stream_engine_t
::
push_msg_to_session
;
return
0
;
return
0
;
}
}
...
@@ -734,8 +734,8 @@ void zmq::stream_engine_t::mechanism_ready ()
...
@@ -734,8 +734,8 @@ void zmq::stream_engine_t::mechanism_ready ()
session
->
flush
();
session
->
flush
();
}
}
read
_msg
=
&
stream_engine_t
::
pull_and_encode
;
next
_msg
=
&
stream_engine_t
::
pull_and_encode
;
write
_msg
=
&
stream_engine_t
::
write_credential
;
process
_msg
=
&
stream_engine_t
::
write_credential
;
// Compile metadata.
// Compile metadata.
typedef
metadata_t
::
dict_t
properties_t
;
typedef
metadata_t
::
dict_t
properties_t
;
...
@@ -792,7 +792,7 @@ int zmq::stream_engine_t::write_credential (msg_t *msg_)
...
@@ -792,7 +792,7 @@ int zmq::stream_engine_t::write_credential (msg_t *msg_)
return
-
1
;
return
-
1
;
}
}
}
}
write
_msg
=
&
stream_engine_t
::
decode_and_push
;
process
_msg
=
&
stream_engine_t
::
decode_and_push
;
return
decode_and_push
(
msg_
);
return
decode_and_push
(
msg_
);
}
}
...
@@ -817,7 +817,7 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
...
@@ -817,7 +817,7 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
msg_
->
set_metadata
(
metadata
);
msg_
->
set_metadata
(
metadata
);
if
(
session
->
push_msg
(
msg_
)
==
-
1
)
{
if
(
session
->
push_msg
(
msg_
)
==
-
1
)
{
if
(
errno
==
EAGAIN
)
if
(
errno
==
EAGAIN
)
write
_msg
=
&
stream_engine_t
::
push_one_then_decode_and_push
;
process
_msg
=
&
stream_engine_t
::
push_one_then_decode_and_push
;
return
-
1
;
return
-
1
;
}
}
return
0
;
return
0
;
...
@@ -827,7 +827,7 @@ int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
...
@@ -827,7 +827,7 @@ int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
{
{
const
int
rc
=
session
->
push_msg
(
msg_
);
const
int
rc
=
session
->
push_msg
(
msg_
);
if
(
rc
==
0
)
if
(
rc
==
0
)
write
_msg
=
&
stream_engine_t
::
decode_and_push
;
process
_msg
=
&
stream_engine_t
::
decode_and_push
;
return
rc
;
return
rc
;
}
}
...
@@ -844,7 +844,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
...
@@ -844,7 +844,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
if
(
rc
==
-
1
)
if
(
rc
==
-
1
)
return
-
1
;
return
-
1
;
write
_msg
=
&
stream_engine_t
::
push_msg_to_session
;
process
_msg
=
&
stream_engine_t
::
push_msg_to_session
;
return
push_msg_to_session
(
msg_
);
return
push_msg_to_session
(
msg_
);
}
}
...
@@ -855,7 +855,7 @@ void zmq::stream_engine_t::error ()
...
@@ -855,7 +855,7 @@ void zmq::stream_engine_t::error ()
// so that it knows the peer has been disconnected.
// so that it knows the peer has been disconnected.
msg_t
terminator
;
msg_t
terminator
;
terminator
.
init
();
terminator
.
init
();
(
this
->*
write
_msg
)
(
&
terminator
);
(
this
->*
process
_msg
)
(
&
terminator
);
terminator
.
close
();
terminator
.
close
();
}
}
zmq_assert
(
session
);
zmq_assert
(
session
);
...
...
src/stream_engine.hpp
View file @
dcd1f203
...
@@ -93,8 +93,8 @@ namespace zmq
...
@@ -93,8 +93,8 @@ namespace zmq
// Zero indicates the peer has closed the connection.
// Zero indicates the peer has closed the connection.
int
read
(
void
*
data_
,
size_t
size_
);
int
read
(
void
*
data_
,
size_t
size_
);
int
read_identity
(
msg_t
*
msg_
);
int
identity_msg
(
msg_t
*
msg_
);
int
write_identity
(
msg_t
*
msg_
);
int
process_identity_msg
(
msg_t
*
msg_
);
int
next_handshake_command
(
msg_t
*
msg
);
int
next_handshake_command
(
msg_t
*
msg
);
int
process_handshake_command
(
msg_t
*
msg
);
int
process_handshake_command
(
msg_t
*
msg
);
...
@@ -168,9 +168,9 @@ namespace zmq
...
@@ -168,9 +168,9 @@ namespace zmq
bool
plugged
;
bool
plugged
;
int
(
stream_engine_t
::*
read
_msg
)
(
msg_t
*
msg_
);
int
(
stream_engine_t
::*
next
_msg
)
(
msg_t
*
msg_
);
int
(
stream_engine_t
::*
write
_msg
)
(
msg_t
*
msg_
);
int
(
stream_engine_t
::*
process
_msg
)
(
msg_t
*
msg_
);
bool
io_error
;
bool
io_error
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment