Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
cae5d3b8
Commit
cae5d3b8
authored
Nov 22, 2012
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #478 from methodmissing/event-messages-3.2
Event message memory corruption fixes
parents
d1cbf96c
359a5059
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
122 additions
and
56 deletions
+122
-56
ipc_connecter.cpp
src/ipc_connecter.cpp
+4
-4
ipc_listener.cpp
src/ipc_listener.cpp
+5
-5
socket_base.cpp
src/socket_base.cpp
+43
-24
socket_base.hpp
src/socket_base.hpp
+19
-11
stream_engine.cpp
src/stream_engine.cpp
+1
-3
stream_engine.hpp
src/stream_engine.hpp
+1
-1
tcp_connecter.cpp
src/tcp_connecter.cpp
+4
-4
tcp_listener.cpp
src/tcp_listener.cpp
+4
-4
zmq.cpp
src/zmq.cpp
+41
-0
No files found.
src/ipc_connecter.cpp
View file @
cae5d3b8
...
@@ -122,7 +122,7 @@ void zmq::ipc_connecter_t::out_event ()
...
@@ -122,7 +122,7 @@ void zmq::ipc_connecter_t::out_event ()
// Shut the connecter down.
// Shut the connecter down.
terminate
();
terminate
();
socket
->
event_connected
(
endpoint
.
c_str
()
,
fd
);
socket
->
event_connected
(
endpoint
,
fd
);
}
}
void
zmq
::
ipc_connecter_t
::
timer_event
(
int
id_
)
void
zmq
::
ipc_connecter_t
::
timer_event
(
int
id_
)
...
@@ -150,7 +150,7 @@ void zmq::ipc_connecter_t::start_connecting ()
...
@@ -150,7 +150,7 @@ void zmq::ipc_connecter_t::start_connecting ()
handle
=
add_fd
(
s
);
handle
=
add_fd
(
s
);
handle_valid
=
true
;
handle_valid
=
true
;
set_pollout
(
handle
);
set_pollout
(
handle
);
socket
->
event_connect_delayed
(
endpoint
.
c_str
()
,
zmq_errno
());
socket
->
event_connect_delayed
(
endpoint
,
zmq_errno
());
}
}
// Handle any other error condition by eventual reconnect.
// Handle any other error condition by eventual reconnect.
...
@@ -165,7 +165,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer()
...
@@ -165,7 +165,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer()
{
{
int
rc_ivl
=
get_new_reconnect_ivl
();
int
rc_ivl
=
get_new_reconnect_ivl
();
add_timer
(
rc_ivl
,
reconnect_timer_id
);
add_timer
(
rc_ivl
,
reconnect_timer_id
);
socket
->
event_connect_retried
(
endpoint
.
c_str
()
,
rc_ivl
);
socket
->
event_connect_retried
(
endpoint
,
rc_ivl
);
timer_started
=
true
;
timer_started
=
true
;
}
}
...
@@ -226,7 +226,7 @@ int zmq::ipc_connecter_t::close ()
...
@@ -226,7 +226,7 @@ int zmq::ipc_connecter_t::close ()
zmq_assert
(
s
!=
retired_fd
);
zmq_assert
(
s
!=
retired_fd
);
int
rc
=
::
close
(
s
);
int
rc
=
::
close
(
s
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
socket
->
event_closed
(
endpoint
.
c_str
()
,
s
);
socket
->
event_closed
(
endpoint
,
s
);
s
=
retired_fd
;
s
=
retired_fd
;
return
0
;
return
0
;
}
}
...
...
src/ipc_listener.cpp
View file @
cae5d3b8
...
@@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event ()
...
@@ -76,7 +76,7 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if
(
fd
==
retired_fd
)
{
if
(
fd
==
retired_fd
)
{
socket
->
event_accept_failed
(
endpoint
.
c_str
()
,
zmq_errno
());
socket
->
event_accept_failed
(
endpoint
,
zmq_errno
());
return
;
return
;
}
}
...
@@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event ()
...
@@ -96,7 +96,7 @@ void zmq::ipc_listener_t::in_event ()
session
->
inc_seqnum
();
session
->
inc_seqnum
();
launch_child
(
session
);
launch_child
(
session
);
send_attach
(
session
,
engine
,
false
);
send_attach
(
session
,
engine
,
false
);
socket
->
event_accepted
(
endpoint
.
c_str
()
,
fd
);
socket
->
event_accepted
(
endpoint
,
fd
);
}
}
int
zmq
::
ipc_listener_t
::
get_address
(
std
::
string
&
addr_
)
int
zmq
::
ipc_listener_t
::
get_address
(
std
::
string
&
addr_
)
...
@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
...
@@ -155,7 +155,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if
(
rc
!=
0
)
if
(
rc
!=
0
)
goto
error
;
goto
error
;
socket
->
event_listening
(
endpoint
.
c_str
()
,
s
);
socket
->
event_listening
(
endpoint
,
s
);
return
0
;
return
0
;
error
:
error
:
...
@@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close ()
...
@@ -178,12 +178,12 @@ int zmq::ipc_listener_t::close ()
if
(
has_file
&&
!
filename
.
empty
())
{
if
(
has_file
&&
!
filename
.
empty
())
{
rc
=
::
unlink
(
filename
.
c_str
());
rc
=
::
unlink
(
filename
.
c_str
());
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
socket
->
event_close_failed
(
endpoint
.
c_str
()
,
zmq_errno
());
socket
->
event_close_failed
(
endpoint
,
zmq_errno
());
return
-
1
;
return
-
1
;
}
}
}
}
socket
->
event_closed
(
endpoint
.
c_str
()
,
s
);
socket
->
event_closed
(
endpoint
,
s
);
return
0
;
return
0
;
}
}
...
...
src/socket_base.cpp
View file @
cae5d3b8
...
@@ -357,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -357,7 +357,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int
rc
=
listener
->
set_address
(
address
.
c_str
());
int
rc
=
listener
->
set_address
(
address
.
c_str
());
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
delete
listener
;
delete
listener
;
event_bind_failed
(
addr
_
,
zmq_errno
());
event_bind_failed
(
addr
ess
,
zmq_errno
());
return
-
1
;
return
-
1
;
}
}
...
@@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -376,7 +376,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int
rc
=
listener
->
set_address
(
address
.
c_str
());
int
rc
=
listener
->
set_address
(
address
.
c_str
());
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
delete
listener
;
delete
listener
;
event_bind_failed
(
addr
_
,
zmq_errno
());
event_bind_failed
(
addr
ess
,
zmq_errno
());
return
-
1
;
return
-
1
;
}
}
...
@@ -1047,122 +1047,141 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
...
@@ -1047,122 +1047,141 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
return
rc
;
return
rc
;
}
}
void
zmq
::
socket_base_t
::
event_connected
(
const
char
*
addr_
,
int
fd_
)
void
zmq
::
socket_base_t
::
event_connected
(
std
::
string
&
addr_
,
int
fd_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_CONNECTED
)
{
if
(
monitor_events
&
ZMQ_EVENT_CONNECTED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_CONNECTED
;
event
.
event
=
ZMQ_EVENT_CONNECTED
;
event
.
data
.
connected
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
connected
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
connected
.
addr
,
addr_
);
event
.
data
.
connected
.
fd
=
fd_
;
event
.
data
.
connected
.
fd
=
fd_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_connect_delayed
(
const
char
*
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_connect_delayed
(
std
::
string
&
addr_
,
int
err_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_CONNECT_DELAYED
)
{
if
(
monitor_events
&
ZMQ_EVENT_CONNECT_DELAYED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_CONNECT_DELAYED
;
event
.
event
=
ZMQ_EVENT_CONNECT_DELAYED
;
event
.
data
.
connect_delayed
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
connect_delayed
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
connect_delayed
.
addr
,
addr_
);
event
.
data
.
connect_delayed
.
err
=
err_
;
event
.
data
.
connect_delayed
.
err
=
err_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_connect_retried
(
const
char
*
addr_
,
int
interval_
)
void
zmq
::
socket_base_t
::
event_connect_retried
(
std
::
string
&
addr_
,
int
interval_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_CONNECT_RETRIED
)
{
if
(
monitor_events
&
ZMQ_EVENT_CONNECT_RETRIED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_CONNECT_RETRIED
;
event
.
event
=
ZMQ_EVENT_CONNECT_RETRIED
;
event
.
data
.
connect_retried
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
connect_retried
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
connect_retried
.
addr
,
addr_
);
event
.
data
.
connect_retried
.
interval
=
interval_
;
event
.
data
.
connect_retried
.
interval
=
interval_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_listening
(
const
char
*
addr_
,
int
fd_
)
void
zmq
::
socket_base_t
::
event_listening
(
std
::
string
&
addr_
,
int
fd_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_LISTENING
)
{
if
(
monitor_events
&
ZMQ_EVENT_LISTENING
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_LISTENING
;
event
.
event
=
ZMQ_EVENT_LISTENING
;
event
.
data
.
listening
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
listening
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
listening
.
addr
,
addr_
);
event
.
data
.
listening
.
fd
=
fd_
;
event
.
data
.
listening
.
fd
=
fd_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_bind_failed
(
const
char
*
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_bind_failed
(
std
::
string
&
addr_
,
int
err_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_BIND_FAILED
)
{
if
(
monitor_events
&
ZMQ_EVENT_BIND_FAILED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_BIND_FAILED
;
event
.
event
=
ZMQ_EVENT_BIND_FAILED
;
event
.
data
.
bind_failed
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
bind_failed
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
bind_failed
.
addr
,
addr_
);
event
.
data
.
bind_failed
.
err
=
err_
;
event
.
data
.
bind_failed
.
err
=
err_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_accepted
(
const
char
*
addr_
,
int
fd_
)
void
zmq
::
socket_base_t
::
event_accepted
(
std
::
string
&
addr_
,
int
fd_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_ACCEPTED
)
{
if
(
monitor_events
&
ZMQ_EVENT_ACCEPTED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_ACCEPTED
;
event
.
event
=
ZMQ_EVENT_ACCEPTED
;
event
.
data
.
accepted
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
accepted
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
accepted
.
addr
,
addr_
);
event
.
data
.
accepted
.
fd
=
fd_
;
event
.
data
.
accepted
.
fd
=
fd_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_accept_failed
(
const
char
*
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_accept_failed
(
std
::
string
&
addr_
,
int
err_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_ACCEPT_FAILED
)
{
if
(
monitor_events
&
ZMQ_EVENT_ACCEPT_FAILED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_ACCEPT_FAILED
;
event
.
event
=
ZMQ_EVENT_ACCEPT_FAILED
;
event
.
data
.
accept_failed
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
accept_failed
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
accept_failed
.
addr
,
addr_
);
event
.
data
.
accept_failed
.
err
=
err_
;
event
.
data
.
accept_failed
.
err
=
err_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_closed
(
const
char
*
addr_
,
int
fd_
)
void
zmq
::
socket_base_t
::
event_closed
(
std
::
string
&
addr_
,
int
fd_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_CLOSED
)
{
if
(
monitor_events
&
ZMQ_EVENT_CLOSED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_CLOSED
;
event
.
event
=
ZMQ_EVENT_CLOSED
;
event
.
data
.
closed
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
closed
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
closed
.
addr
,
addr_
);
event
.
data
.
closed
.
fd
=
fd_
;
event
.
data
.
closed
.
fd
=
fd_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_close_failed
(
const
char
*
addr_
,
int
err_
)
void
zmq
::
socket_base_t
::
event_close_failed
(
std
::
string
&
addr_
,
int
err_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_CLOSE_FAILED
)
{
if
(
monitor_events
&
ZMQ_EVENT_CLOSE_FAILED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_CLOSE_FAILED
;
event
.
event
=
ZMQ_EVENT_CLOSE_FAILED
;
event
.
data
.
close_failed
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
close_failed
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
close_failed
.
addr
,
addr_
);
event
.
data
.
close_failed
.
err
=
err_
;
event
.
data
.
close_failed
.
err
=
err_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
event_disconnected
(
const
char
*
addr_
,
int
fd_
)
void
zmq
::
socket_base_t
::
event_disconnected
(
std
::
string
&
addr_
,
int
fd_
)
{
{
if
(
monitor_events
&
ZMQ_EVENT_DISCONNECTED
)
{
if
(
monitor_events
&
ZMQ_EVENT_DISCONNECTED
)
{
zmq_event_t
event
;
zmq_event_t
event
;
event
.
event
=
ZMQ_EVENT_DISCONNECTED
;
event
.
event
=
ZMQ_EVENT_DISCONNECTED
;
event
.
data
.
disconnected
.
addr
=
const_cast
<
char
*>
(
addr_
);
event
.
data
.
disconnected
.
addr
=
(
char
*
)
malloc
(
addr_
.
size
()
+
1
);
copy_monitor_address
(
event
.
data
.
disconnected
.
addr
,
addr_
);
event
.
data
.
disconnected
.
fd
=
fd_
;
event
.
data
.
disconnected
.
fd
=
fd_
;
monitor_event
(
event
);
monitor_event
(
event
);
}
}
}
}
void
zmq
::
socket_base_t
::
copy_monitor_address
(
char
*
dest_
,
std
::
string
&
src_
)
{
alloc_assert
(
dest_
);
dest_
[
src_
.
size
()]
=
0
;
memcpy
(
dest_
,
src_
.
c_str
(),
src_
.
size
());
}
void
zmq
::
socket_base_t
::
monitor_event
(
zmq_event_t
event_
)
void
zmq
::
socket_base_t
::
monitor_event
(
zmq_event_t
event_
)
{
{
if
(
monitor_socket
)
{
if
(
monitor_socket
)
{
zmq_msg_t
msg
;
zmq_msg_t
msg
;
zmq_msg_init_size
(
&
msg
,
sizeof
(
event_
));
void
*
event_data
=
malloc
(
sizeof
(
event_
));
memcpy
(
zmq_msg_data
(
&
msg
),
&
event_
,
sizeof
(
event_
));
alloc_assert
(
event_data
);
memcpy
(
event_data
,
&
event_
,
sizeof
(
event_
));
zmq_msg_init_data
(
&
msg
,
event_data
,
sizeof
(
event_
),
zmq_free_event
,
NULL
);
zmq_sendmsg
(
monitor_socket
,
&
msg
,
0
);
zmq_sendmsg
(
monitor_socket
,
&
msg
,
0
);
zmq_msg_close
(
&
msg
);
zmq_msg_close
(
&
msg
);
}
}
...
...
src/socket_base.hpp
View file @
cae5d3b8
...
@@ -38,6 +38,11 @@
...
@@ -38,6 +38,11 @@
#include "clock.hpp"
#include "clock.hpp"
#include "pipe.hpp"
#include "pipe.hpp"
extern
"C"
{
void
zmq_free_event
(
void
*
data
,
void
*
hint
);
}
namespace
zmq
namespace
zmq
{
{
...
@@ -102,18 +107,18 @@ namespace zmq
...
@@ -102,18 +107,18 @@ namespace zmq
void
lock
();
void
lock
();
void
unlock
();
void
unlock
();
int
monitor
(
const
char
*
endpoint_
,
int
events_
);
int
monitor
(
const
char
*
endpoint_
,
int
events_
);
void
event_connected
(
const
char
*
addr_
,
int
fd_
);
void
event_connected
(
std
::
string
&
addr_
,
int
fd_
);
void
event_connect_delayed
(
const
char
*
addr_
,
int
err_
);
void
event_connect_delayed
(
std
::
string
&
addr_
,
int
err_
);
void
event_connect_retried
(
const
char
*
addr_
,
int
interval_
);
void
event_connect_retried
(
std
::
string
&
addr_
,
int
interval_
);
void
event_listening
(
const
char
*
addr_
,
int
fd_
);
void
event_listening
(
std
::
string
&
addr_
,
int
fd_
);
void
event_bind_failed
(
const
char
*
addr_
,
int
err_
);
void
event_bind_failed
(
std
::
string
&
addr_
,
int
err_
);
void
event_accepted
(
const
char
*
addr_
,
int
fd_
);
void
event_accepted
(
std
::
string
&
addr_
,
int
fd_
);
void
event_accept_failed
(
const
char
*
addr_
,
int
err_
);
void
event_accept_failed
(
std
::
string
&
addr_
,
int
err_
);
void
event_closed
(
const
char
*
addr_
,
int
fd_
);
void
event_closed
(
std
::
string
&
addr_
,
int
fd_
);
void
event_close_failed
(
const
char
*
addr_
,
int
fd_
);
void
event_close_failed
(
std
::
string
&
addr_
,
int
fd_
);
void
event_disconnected
(
const
char
*
addr_
,
int
fd_
);
void
event_disconnected
(
std
::
string
&
addr_
,
int
fd_
);
protected
:
protected
:
...
@@ -151,6 +156,9 @@ namespace zmq
...
@@ -151,6 +156,9 @@ namespace zmq
// Socket event data dispath
// Socket event data dispath
void
monitor_event
(
zmq_event_t
data_
);
void
monitor_event
(
zmq_event_t
data_
);
// Copy monitor specific event endpoints to event messages
void
copy_monitor_address
(
char
*
dest_
,
std
::
string
&
src_
);
// Monitor socket cleanup
// Monitor socket cleanup
void
stop_monitor
();
void
stop_monitor
();
...
...
src/stream_engine.cpp
View file @
cae5d3b8
...
@@ -63,6 +63,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
...
@@ -63,6 +63,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
greeting_bytes_read
(
0
),
greeting_bytes_read
(
0
),
session
(
NULL
),
session
(
NULL
),
options
(
options_
),
options
(
options_
),
endpoint
(
endpoint_
),
plugged
(
false
),
plugged
(
false
),
socket
(
NULL
)
socket
(
NULL
)
{
{
...
@@ -95,8 +96,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
...
@@ -95,8 +96,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
int
rc
=
setsockopt
(
s
,
SOL_SOCKET
,
SO_NOSIGPIPE
,
&
set
,
sizeof
(
int
));
int
rc
=
setsockopt
(
s
,
SOL_SOCKET
,
SO_NOSIGPIPE
,
&
set
,
sizeof
(
int
));
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
#endif
#endif
endpoint
=
new
char
[
endpoint_
.
length
()
+
1
];
strcpy
(
endpoint
,
endpoint_
.
c_str
());
}
}
zmq
::
stream_engine_t
::~
stream_engine_t
()
zmq
::
stream_engine_t
::~
stream_engine_t
()
...
@@ -118,7 +117,6 @@ zmq::stream_engine_t::~stream_engine_t ()
...
@@ -118,7 +117,6 @@ zmq::stream_engine_t::~stream_engine_t ()
delete
encoder
;
delete
encoder
;
if
(
decoder
!=
NULL
)
if
(
decoder
!=
NULL
)
delete
decoder
;
delete
decoder
;
delete
[]
endpoint
;
}
}
void
zmq
::
stream_engine_t
::
plug
(
io_thread_t
*
io_thread_
,
void
zmq
::
stream_engine_t
::
plug
(
io_thread_t
*
io_thread_
,
...
...
src/stream_engine.hpp
View file @
cae5d3b8
...
@@ -132,7 +132,7 @@ namespace zmq
...
@@ -132,7 +132,7 @@ namespace zmq
options_t
options
;
options_t
options
;
// String representation of endpoint
// String representation of endpoint
char
*
endpoint
;
std
::
string
endpoint
;
bool
plugged
;
bool
plugged
;
...
...
src/tcp_connecter.cpp
View file @
cae5d3b8
...
@@ -136,7 +136,7 @@ void zmq::tcp_connecter_t::out_event ()
...
@@ -136,7 +136,7 @@ void zmq::tcp_connecter_t::out_event ()
// Shut the connecter down.
// Shut the connecter down.
terminate
();
terminate
();
socket
->
event_connected
(
endpoint
.
c_str
()
,
fd
);
socket
->
event_connected
(
endpoint
,
fd
);
}
}
void
zmq
::
tcp_connecter_t
::
timer_event
(
int
id_
)
void
zmq
::
tcp_connecter_t
::
timer_event
(
int
id_
)
...
@@ -164,7 +164,7 @@ void zmq::tcp_connecter_t::start_connecting ()
...
@@ -164,7 +164,7 @@ void zmq::tcp_connecter_t::start_connecting ()
handle
=
add_fd
(
s
);
handle
=
add_fd
(
s
);
handle_valid
=
true
;
handle_valid
=
true
;
set_pollout
(
handle
);
set_pollout
(
handle
);
socket
->
event_connect_delayed
(
endpoint
.
c_str
()
,
zmq_errno
());
socket
->
event_connect_delayed
(
endpoint
,
zmq_errno
());
}
}
// Handle any other error condition by eventual reconnect.
// Handle any other error condition by eventual reconnect.
...
@@ -179,7 +179,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer()
...
@@ -179,7 +179,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer()
{
{
int
rc_ivl
=
get_new_reconnect_ivl
();
int
rc_ivl
=
get_new_reconnect_ivl
();
add_timer
(
rc_ivl
,
reconnect_timer_id
);
add_timer
(
rc_ivl
,
reconnect_timer_id
);
socket
->
event_connect_retried
(
endpoint
.
c_str
()
,
rc_ivl
);
socket
->
event_connect_retried
(
endpoint
,
rc_ivl
);
timer_started
=
true
;
timer_started
=
true
;
}
}
...
@@ -305,6 +305,6 @@ void zmq::tcp_connecter_t::close ()
...
@@ -305,6 +305,6 @@ void zmq::tcp_connecter_t::close ()
int
rc
=
::
close
(
s
);
int
rc
=
::
close
(
s
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
#endif
#endif
socket
->
event_closed
(
endpoint
.
c_str
()
,
s
);
socket
->
event_closed
(
endpoint
,
s
);
s
=
retired_fd
;
s
=
retired_fd
;
}
}
src/tcp_listener.cpp
View file @
cae5d3b8
...
@@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event ()
...
@@ -85,7 +85,7 @@ void zmq::tcp_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it.
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if
(
fd
==
retired_fd
)
{
if
(
fd
==
retired_fd
)
{
socket
->
event_accept_failed
(
endpoint
.
c_str
()
,
zmq_errno
());
socket
->
event_accept_failed
(
endpoint
,
zmq_errno
());
return
;
return
;
}
}
...
@@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event ()
...
@@ -108,7 +108,7 @@ void zmq::tcp_listener_t::in_event ()
session
->
inc_seqnum
();
session
->
inc_seqnum
();
launch_child
(
session
);
launch_child
(
session
);
send_attach
(
session
,
engine
,
false
);
send_attach
(
session
,
engine
,
false
);
socket
->
event_accepted
(
endpoint
.
c_str
()
,
fd
);
socket
->
event_accepted
(
endpoint
,
fd
);
}
}
void
zmq
::
tcp_listener_t
::
close
()
void
zmq
::
tcp_listener_t
::
close
()
...
@@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close ()
...
@@ -121,7 +121,7 @@ void zmq::tcp_listener_t::close ()
int
rc
=
::
close
(
s
);
int
rc
=
::
close
(
s
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
#endif
#endif
socket
->
event_closed
(
endpoint
.
c_str
()
,
s
);
socket
->
event_closed
(
endpoint
,
s
);
s
=
retired_fd
;
s
=
retired_fd
;
}
}
...
@@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
...
@@ -223,7 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
goto
error
;
goto
error
;
#endif
#endif
socket
->
event_listening
(
endpoint
.
c_str
()
,
s
);
socket
->
event_listening
(
endpoint
,
s
);
return
0
;
return
0
;
error
:
error
:
...
...
src/zmq.cpp
View file @
cae5d3b8
...
@@ -987,6 +987,47 @@ int zmq_device (int type, void *frontend_, void *backend_)
...
@@ -987,6 +987,47 @@ int zmq_device (int type, void *frontend_, void *backend_)
(
zmq
::
socket_base_t
*
)
backend_
,
NULL
);
(
zmq
::
socket_base_t
*
)
backend_
,
NULL
);
}
}
// Callback to free socket event data
void
zmq_free_event
(
void
*
event_data
,
void
*
hint
)
{
zmq_event_t
*
event
=
(
zmq_event_t
*
)
event_data
;
switch
(
event
->
event
)
{
case
ZMQ_EVENT_CONNECTED
:
free
(
event
->
data
.
connected
.
addr
);
break
;
case
ZMQ_EVENT_CONNECT_DELAYED
:
free
(
event
->
data
.
connect_delayed
.
addr
);
break
;
case
ZMQ_EVENT_CONNECT_RETRIED
:
free
(
event
->
data
.
connect_retried
.
addr
);
break
;
case
ZMQ_EVENT_LISTENING
:
free
(
event
->
data
.
listening
.
addr
);
break
;
case
ZMQ_EVENT_BIND_FAILED
:
free
(
event
->
data
.
bind_failed
.
addr
);
break
;
case
ZMQ_EVENT_ACCEPTED
:
free
(
event
->
data
.
accepted
.
addr
);
break
;
case
ZMQ_EVENT_ACCEPT_FAILED
:
free
(
event
->
data
.
accept_failed
.
addr
);
break
;
case
ZMQ_EVENT_CLOSED
:
free
(
event
->
data
.
closed
.
addr
);
break
;
case
ZMQ_EVENT_CLOSE_FAILED
:
free
(
event
->
data
.
close_failed
.
addr
);
break
;
case
ZMQ_EVENT_DISCONNECTED
:
free
(
event
->
data
.
disconnected
.
addr
);
break
;
}
free
(
event_data
);
}
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// 0MQ utils - to be used by perf tests
// 0MQ utils - to be used by perf tests
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment