Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
8aa09086
Commit
8aa09086
authored
Dec 15, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
all news converted to nothrow variant
parent
2cef05d8
Show whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
85 additions
and
47 deletions
+85
-47
app_thread.cpp
src/app_thread.cpp
+12
-11
dispatcher.cpp
src/dispatcher.cpp
+8
-4
epoll.cpp
src/epoll.cpp
+2
-1
io_thread.cpp
src/io_thread.cpp
+3
-1
kqueue.cpp
src/kqueue.cpp
+2
-1
pgm_receiver.cpp
src/pgm_receiver.cpp
+3
-1
pgm_socket.cpp
src/pgm_socket.cpp
+2
-0
session.cpp
src/session.cpp
+6
-2
socket_base.cpp
src/socket_base.cpp
+21
-12
tcp_listener.cpp
src/tcp_listener.cpp
+0
-1
yqueue.hpp
src/yqueue.hpp
+3
-2
zmq.cpp
src/zmq.cpp
+2
-2
zmq_connecter.cpp
src/zmq_connecter.cpp
+4
-2
zmq_connecter_init.cpp
src/zmq_connecter_init.cpp
+4
-1
zmq_engine.cpp
src/zmq_engine.cpp
+3
-1
zmq_listener.cpp
src/zmq_listener.cpp
+4
-2
zmq_listener_init.cpp
src/zmq_listener_init.cpp
+6
-3
No files found.
src/app_thread.cpp
View file @
8aa09086
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include <algorithm>
#include <algorithm>
#include "../bindings/c/zmq.h"
#include "../bindings/c/zmq.h"
...
@@ -65,11 +66,11 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
...
@@ -65,11 +66,11 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
last_processing_time
(
0
)
last_processing_time
(
0
)
{
{
if
(
flags_
&
ZMQ_POLL
)
{
if
(
flags_
&
ZMQ_POLL
)
{
signaler
=
new
fd_signaler_t
;
signaler
=
new
(
std
::
nothrow
)
fd_signaler_t
;
zmq_assert
(
signaler
);
zmq_assert
(
signaler
);
}
}
else
{
else
{
signaler
=
new
ypollset_t
;
signaler
=
new
(
std
::
nothrow
)
ypollset_t
;
zmq_assert
(
signaler
);
zmq_assert
(
signaler
);
}
}
}
}
...
@@ -163,31 +164,31 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
...
@@ -163,31 +164,31 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
socket_base_t
*
s
=
NULL
;
socket_base_t
*
s
=
NULL
;
switch
(
type_
)
{
switch
(
type_
)
{
case
ZMQ_P2P
:
case
ZMQ_P2P
:
s
=
new
p2p_t
(
this
);
s
=
new
(
std
::
nothrow
)
p2p_t
(
this
);
break
;
break
;
case
ZMQ_PUB
:
case
ZMQ_PUB
:
s
=
new
pub_t
(
this
);
s
=
new
(
std
::
nothrow
)
pub_t
(
this
);
break
;
break
;
case
ZMQ_SUB
:
case
ZMQ_SUB
:
s
=
new
sub_t
(
this
);
s
=
new
(
std
::
nothrow
)
sub_t
(
this
);
break
;
break
;
case
ZMQ_REQ
:
case
ZMQ_REQ
:
s
=
new
req_t
(
this
);
s
=
new
(
std
::
nothrow
)
req_t
(
this
);
break
;
break
;
case
ZMQ_REP
:
case
ZMQ_REP
:
s
=
new
rep_t
(
this
);
s
=
new
(
std
::
nothrow
)
rep_t
(
this
);
break
;
break
;
case
ZMQ_XREQ
:
case
ZMQ_XREQ
:
s
=
new
xreq_t
(
this
);
s
=
new
(
std
::
nothrow
)
xreq_t
(
this
);
break
;
break
;
case
ZMQ_XREP
:
case
ZMQ_XREP
:
s
=
new
xrep_t
(
this
);
s
=
new
(
std
::
nothrow
)
xrep_t
(
this
);
break
;
break
;
case
ZMQ_UPSTREAM
:
case
ZMQ_UPSTREAM
:
s
=
new
upstream_t
(
this
);
s
=
new
(
std
::
nothrow
)
upstream_t
(
this
);
break
;
break
;
case
ZMQ_DOWNSTREAM
:
case
ZMQ_DOWNSTREAM
:
s
=
new
downstream_t
(
this
);
s
=
new
(
std
::
nothrow
)
downstream_t
(
this
);
break
;
break
;
default
:
default
:
// TODO: This should be EINVAL.
// TODO: This should be EINVAL.
...
...
src/dispatcher.cpp
View file @
8aa09086
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include "../bindings/c/zmq.h"
#include "../bindings/c/zmq.h"
#include "dispatcher.hpp"
#include "dispatcher.hpp"
...
@@ -49,7 +51,8 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
...
@@ -49,7 +51,8 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create application thread proxies.
// Create application thread proxies.
for
(
int
i
=
0
;
i
!=
app_threads_
;
i
++
)
{
for
(
int
i
=
0
;
i
!=
app_threads_
;
i
++
)
{
app_thread_t
*
app_thread
=
new
app_thread_t
(
this
,
i
,
flags_
);
app_thread_t
*
app_thread
=
new
(
std
::
nothrow
)
app_thread_t
(
this
,
i
,
flags_
);
zmq_assert
(
app_thread
);
zmq_assert
(
app_thread
);
app_threads
.
push_back
(
app_thread
);
app_threads
.
push_back
(
app_thread
);
signalers
.
push_back
(
app_thread
->
get_signaler
());
signalers
.
push_back
(
app_thread
->
get_signaler
());
...
@@ -57,15 +60,16 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
...
@@ -57,15 +60,16 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create I/O thread objects.
// Create I/O thread objects.
for
(
int
i
=
0
;
i
!=
io_threads_
;
i
++
)
{
for
(
int
i
=
0
;
i
!=
io_threads_
;
i
++
)
{
io_thread_t
*
io_thread
=
new
io_thread_t
(
this
,
i
+
app_threads_
,
io_thread_t
*
io_thread
=
new
(
std
::
nothrow
)
io_thread_t
(
this
,
flags_
);
i
+
app_threads_
,
flags_
);
zmq_assert
(
io_thread
);
zmq_assert
(
io_thread
);
io_threads
.
push_back
(
io_thread
);
io_threads
.
push_back
(
io_thread
);
signalers
.
push_back
(
io_thread
->
get_signaler
());
signalers
.
push_back
(
io_thread
->
get_signaler
());
}
}
// Create command pipe matrix.
// Create command pipe matrix.
command_pipes
=
new
command_pipe_t
[
signalers
.
size
()
*
signalers
.
size
()];
command_pipes
=
new
(
std
::
nothrow
)
command_pipe_t
[
signalers
.
size
()
*
signalers
.
size
()];
zmq_assert
(
command_pipes
);
zmq_assert
(
command_pipes
);
// Launch I/O threads.
// Launch I/O threads.
...
...
src/epoll.cpp
View file @
8aa09086
...
@@ -26,6 +26,7 @@
...
@@ -26,6 +26,7 @@
#include <string.h>
#include <string.h>
#include <unistd.h>
#include <unistd.h>
#include <algorithm>
#include <algorithm>
#include <new>
#include "epoll.hpp"
#include "epoll.hpp"
#include "err.hpp"
#include "err.hpp"
...
@@ -54,7 +55,7 @@ zmq::epoll_t::~epoll_t ()
...
@@ -54,7 +55,7 @@ zmq::epoll_t::~epoll_t ()
zmq
::
epoll_t
::
handle_t
zmq
::
epoll_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
zmq
::
epoll_t
::
handle_t
zmq
::
epoll_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
{
{
poll_entry_t
*
pe
=
new
poll_entry_t
;
poll_entry_t
*
pe
=
new
(
std
::
nothrow
)
poll_entry_t
;
zmq_assert
(
pe
!=
NULL
);
zmq_assert
(
pe
!=
NULL
);
// The memset is not actually needed. It's here to prevent debugging
// The memset is not actually needed. It's here to prevent debugging
...
...
src/io_thread.cpp
View file @
8aa09086
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include "../bindings/c/zmq.h"
#include "../bindings/c/zmq.h"
#include "io_thread.hpp"
#include "io_thread.hpp"
...
@@ -31,7 +33,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
...
@@ -31,7 +33,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int
flags_
)
:
int
flags_
)
:
object_t
(
dispatcher_
,
thread_slot_
)
object_t
(
dispatcher_
,
thread_slot_
)
{
{
poller
=
new
poller_t
;
poller
=
new
(
std
::
nothrow
)
poller_t
;
zmq_assert
(
poller
);
zmq_assert
(
poller
);
signaler_handle
=
poller
->
add_fd
(
signaler
.
get_fd
(),
this
);
signaler_handle
=
poller
->
add_fd
(
signaler
.
get_fd
(),
this
);
...
...
src/kqueue.cpp
View file @
8aa09086
...
@@ -27,6 +27,7 @@
...
@@ -27,6 +27,7 @@
#include <stdlib.h>
#include <stdlib.h>
#include <unistd.h>
#include <unistd.h>
#include <algorithm>
#include <algorithm>
#include <new>
#include "kqueue.hpp"
#include "kqueue.hpp"
#include "err.hpp"
#include "err.hpp"
...
@@ -72,7 +73,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
...
@@ -72,7 +73,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
zmq
::
kqueue_t
::
handle_t
zmq
::
kqueue_t
::
add_fd
(
fd_t
fd_
,
zmq
::
kqueue_t
::
handle_t
zmq
::
kqueue_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
reactor_
)
i_poll_events
*
reactor_
)
{
{
poll_entry_t
*
pe
=
new
poll_entry_t
;
poll_entry_t
*
pe
=
new
(
std
::
nothrow
)
poll_entry_t
;
zmq_assert
(
pe
!=
NULL
);
zmq_assert
(
pe
!=
NULL
);
pe
->
fd
=
fd_
;
pe
->
fd
=
fd_
;
...
...
src/pgm_receiver.cpp
View file @
8aa09086
...
@@ -21,6 +21,8 @@
...
@@ -21,6 +21,8 @@
#if defined ZMQ_HAVE_OPENPGM
#if defined ZMQ_HAVE_OPENPGM
#include <new>
#ifdef ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#include "windows.hpp"
#endif
#endif
...
@@ -171,7 +173,7 @@ void zmq::pgm_receiver_t::in_event ()
...
@@ -171,7 +173,7 @@ void zmq::pgm_receiver_t::in_event ()
it
->
second
.
joined
=
true
;
it
->
second
.
joined
=
true
;
// Create and connect decoder for joined peer.
// Create and connect decoder for joined peer.
it
->
second
.
decoder
=
new
zmq_decoder_t
(
0
,
NULL
,
0
);
it
->
second
.
decoder
=
new
(
std
::
nothrow
)
zmq_decoder_t
(
0
,
NULL
,
0
);
it
->
second
.
decoder
->
set_inout
(
inout
);
it
->
second
.
decoder
->
set_inout
(
inout
);
}
}
...
...
src/pgm_socket.cpp
View file @
8aa09086
...
@@ -86,6 +86,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
...
@@ -86,6 +86,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
// in_batch_size configured in confing.hpp
// in_batch_size configured in confing.hpp
if
(
receiver
)
{
if
(
receiver
)
{
pgm_msgv_len
=
get_max_apdu_at_once
(
in_batch_size
);
pgm_msgv_len
=
get_max_apdu_at_once
(
in_batch_size
);
// TODO: use malloc instead of new
pgm_msgv
=
new
pgm_msgv_t
[
pgm_msgv_len
];
pgm_msgv
=
new
pgm_msgv_t
[
pgm_msgv_len
];
}
}
...
@@ -443,6 +444,7 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)
...
@@ -443,6 +444,7 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)
*
size_
=
get_max_tsdu_size
();
*
size_
=
get_max_tsdu_size
();
// Allocate buffer.
// Allocate buffer.
// TODO: use malloc instead of new
unsigned
char
*
apdu_buff
=
new
unsigned
char
[
*
size_
];
unsigned
char
*
apdu_buff
=
new
unsigned
char
[
*
size_
];
zmq_assert
(
apdu_buff
);
zmq_assert
(
apdu_buff
);
return
apdu_buff
;
return
apdu_buff
;
...
...
src/session.cpp
View file @
8aa09086
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include "session.hpp"
#include "session.hpp"
#include "i_engine.hpp"
#include "i_engine.hpp"
#include "err.hpp"
#include "err.hpp"
...
@@ -157,14 +159,16 @@ void zmq::session_t::process_plug ()
...
@@ -157,14 +159,16 @@ void zmq::session_t::process_plug ()
pipe_t
*
outbound
=
NULL
;
pipe_t
*
outbound
=
NULL
;
if
(
options
.
requires_out
)
{
if
(
options
.
requires_out
)
{
inbound
=
new
pipe_t
(
this
,
owner
,
options
.
hwm
,
options
.
lwm
);
inbound
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
owner
,
options
.
hwm
,
options
.
lwm
);
zmq_assert
(
inbound
);
zmq_assert
(
inbound
);
in_pipe
=
&
inbound
->
reader
;
in_pipe
=
&
inbound
->
reader
;
in_pipe
->
set_endpoint
(
this
);
in_pipe
->
set_endpoint
(
this
);
}
}
if
(
options
.
requires_in
)
{
if
(
options
.
requires_in
)
{
outbound
=
new
pipe_t
(
owner
,
this
,
options
.
hwm
,
options
.
lwm
);
outbound
=
new
(
std
::
nothrow
)
pipe_t
(
owner
,
this
,
options
.
hwm
,
options
.
lwm
);
zmq_assert
(
outbound
);
zmq_assert
(
outbound
);
out_pipe
=
&
outbound
->
writer
;
out_pipe
=
&
outbound
->
writer
;
out_pipe
->
set_endpoint
(
this
);
out_pipe
->
set_endpoint
(
this
);
...
...
src/socket_base.cpp
View file @
8aa09086
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include <string>
#include <string>
#include <algorithm>
#include <algorithm>
...
@@ -87,8 +88,9 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -87,8 +88,9 @@ int zmq::socket_base_t::bind (const char *addr_)
return
register_endpoint
(
addr_args
.
c_str
(),
this
);
return
register_endpoint
(
addr_args
.
c_str
(),
this
);
if
(
addr_type
==
"tcp"
)
{
if
(
addr_type
==
"tcp"
)
{
zmq_listener_t
*
listener
=
new
zmq_listener_t
(
zmq_listener_t
*
listener
=
new
(
std
::
nothrow
)
zmq_listener_t
(
choose_io_thread
(
options
.
affinity
),
this
,
options
);
choose_io_thread
(
options
.
affinity
),
this
,
options
);
zmq_assert
(
listener
);
int
rc
=
listener
->
set_address
(
addr_args
.
c_str
());
int
rc
=
listener
->
set_address
(
addr_args
.
c_str
());
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
-
1
;
...
@@ -143,13 +145,15 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -143,13 +145,15 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
// Create inbound pipe, if required.
if
(
options
.
requires_in
)
{
if
(
options
.
requires_in
)
{
in_pipe
=
new
pipe_t
(
this
,
peer
,
options
.
hwm
,
options
.
lwm
);
in_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
peer
,
options
.
hwm
,
options
.
lwm
);
zmq_assert
(
in_pipe
);
zmq_assert
(
in_pipe
);
}
}
// Create outbound pipe, if required.
// Create outbound pipe, if required.
if
(
options
.
requires_out
)
{
if
(
options
.
requires_out
)
{
out_pipe
=
new
pipe_t
(
peer
,
this
,
options
.
hwm
,
options
.
lwm
);
out_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
peer
,
this
,
options
.
hwm
,
options
.
lwm
);
zmq_assert
(
out_pipe
);
zmq_assert
(
out_pipe
);
}
}
...
@@ -168,8 +172,8 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -168,8 +172,8 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the session.
// Create the session.
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
session_t
*
session
=
new
session_t
(
io_thread
,
this
,
session_name
.
c_str
()
,
session_t
*
session
=
new
(
std
::
nothrow
)
session_t
(
io_thread
,
this
,
options
,
true
);
session_name
.
c_str
(),
options
,
true
);
zmq_assert
(
session
);
zmq_assert
(
session
);
pipe_t
*
in_pipe
=
NULL
;
pipe_t
*
in_pipe
=
NULL
;
...
@@ -177,14 +181,16 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -177,14 +181,16 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
// Create inbound pipe, if required.
if
(
options
.
requires_in
)
{
if
(
options
.
requires_in
)
{
in_pipe
=
new
pipe_t
(
this
,
session
,
options
.
hwm
,
options
.
lwm
);
in_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
this
,
session
,
options
.
hwm
,
options
.
lwm
);
zmq_assert
(
in_pipe
);
zmq_assert
(
in_pipe
);
}
}
// Create outbound pipe, if required.
// Create outbound pipe, if required.
if
(
options
.
requires_out
)
{
if
(
options
.
requires_out
)
{
out_pipe
=
new
pipe_t
(
session
,
this
,
options
.
hwm
,
options
.
lwm
);
out_pipe
=
new
(
std
::
nothrow
)
pipe_t
(
session
,
this
,
options
.
hwm
,
options
.
lwm
);
zmq_assert
(
out_pipe
);
zmq_assert
(
out_pipe
);
}
}
...
@@ -205,9 +211,10 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -205,9 +211,10 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the connecter object. Supply it with the session name
// Create the connecter object. Supply it with the session name
// so that it can bind the new connection to the session once
// so that it can bind the new connection to the session once
// it is established.
// it is established.
zmq_connecter_t
*
connecter
=
new
zmq_connecter_t
(
zmq_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
zmq_connecter_t
(
choose_io_thread
(
options
.
affinity
),
this
,
options
,
choose_io_thread
(
options
.
affinity
),
this
,
options
,
session_name
.
c_str
(),
false
);
session_name
.
c_str
(),
false
);
zmq_assert
(
connecter
);
int
rc
=
connecter
->
set_address
(
addr_args
.
c_str
());
int
rc
=
connecter
->
set_address
(
addr_args
.
c_str
());
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
delete
connecter
;
delete
connecter
;
...
@@ -237,9 +244,10 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -237,9 +244,10 @@ int zmq::socket_base_t::connect (const char *addr_)
if
(
options
.
requires_out
)
{
if
(
options
.
requires_out
)
{
// PGM sender.
// PGM sender.
pgm_sender_t
*
pgm_sender
=
pgm_sender_t
*
pgm_sender
=
new
(
std
::
nothrow
)
pgm_sender_t
(
new
pgm_sender_t
(
choose_io_thread
(
options
.
affinity
),
options
,
choose_io_thread
(
options
.
affinity
),
options
,
session_name
.
c_str
());
session_name
.
c_str
());
zmq_assert
(
pgm_sender
);
int
rc
=
pgm_sender
->
init
(
udp_encapsulation
,
addr_args
.
c_str
());
int
rc
=
pgm_sender
->
init
(
udp_encapsulation
,
addr_args
.
c_str
());
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
...
@@ -252,9 +260,10 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -252,9 +260,10 @@ int zmq::socket_base_t::connect (const char *addr_)
else
if
(
options
.
requires_in
)
{
else
if
(
options
.
requires_in
)
{
// PGM receiver.
// PGM receiver.
pgm_receiver_t
*
pgm_receiver
=
pgm_receiver_t
*
pgm_receiver
=
new
(
std
::
nothrow
)
pgm_receiver_t
(
new
pgm_receiver_t
(
choose_io_thread
(
options
.
affinity
),
options
,
choose_io_thread
(
options
.
affinity
),
options
,
session_name
.
c_str
());
session_name
.
c_str
());
zmq_assert
(
pgm_receiver
);
int
rc
=
pgm_receiver
->
init
(
udp_encapsulation
,
addr_args
.
c_str
());
int
rc
=
pgm_receiver
->
init
(
udp_encapsulation
,
addr_args
.
c_str
());
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
...
...
src/tcp_listener.cpp
View file @
8aa09086
...
@@ -66,7 +66,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
...
@@ -66,7 +66,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
// Bind the socket to the network interface and port.
// Bind the socket to the network interface and port.
rc
=
bind
(
s
,
(
struct
sockaddr
*
)
&
addr
,
sizeof
(
addr
));
rc
=
bind
(
s
,
(
struct
sockaddr
*
)
&
addr
,
sizeof
(
addr
));
// TODO: Convert error code to errno.
if
(
rc
==
SOCKET_ERROR
)
{
if
(
rc
==
SOCKET_ERROR
)
{
wsa_error_to_errno
();
wsa_error_to_errno
();
return
-
1
;
return
-
1
;
...
...
src/yqueue.hpp
View file @
8aa09086
...
@@ -20,6 +20,7 @@
...
@@ -20,6 +20,7 @@
#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
#define __ZMQ_YQUEUE_HPP_INCLUDED__
#define __ZMQ_YQUEUE_HPP_INCLUDED__
#include <new>
#include <stddef.h>
#include <stddef.h>
#include "err.hpp"
#include "err.hpp"
...
@@ -47,7 +48,7 @@ namespace zmq
...
@@ -47,7 +48,7 @@ namespace zmq
// Create the queue.
// Create the queue.
inline
yqueue_t
()
inline
yqueue_t
()
{
{
begin_chunk
=
new
chunk_t
;
begin_chunk
=
new
(
std
::
nothrow
)
chunk_t
;
zmq_assert
(
begin_chunk
);
zmq_assert
(
begin_chunk
);
begin_pos
=
0
;
begin_pos
=
0
;
back_chunk
=
NULL
;
back_chunk
=
NULL
;
...
@@ -93,7 +94,7 @@ namespace zmq
...
@@ -93,7 +94,7 @@ namespace zmq
if
(
++
end_pos
!=
N
)
if
(
++
end_pos
!=
N
)
return
;
return
;
end_chunk
->
next
=
new
chunk_t
;
end_chunk
->
next
=
new
(
std
::
nothrow
)
chunk_t
;
zmq_assert
(
end_chunk
->
next
);
zmq_assert
(
end_chunk
->
next
);
end_chunk
=
end_chunk
->
next
;
end_chunk
=
end_chunk
->
next
;
end_pos
=
0
;
end_pos
=
0
;
...
...
src/zmq.cpp
View file @
8aa09086
...
@@ -208,8 +208,8 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
...
@@ -208,8 +208,8 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
return
NULL
;
return
NULL
;
}
}
zmq
::
dispatcher_t
*
dispatcher
=
new
zmq
::
dispatcher_t
(
app_threads_
,
zmq
::
dispatcher_t
*
dispatcher
=
new
(
std
::
nothrow
)
zmq
::
dispatcher_t
(
io_threads_
,
flags_
);
app_threads_
,
io_threads_
,
flags_
);
zmq_assert
(
dispatcher
);
zmq_assert
(
dispatcher
);
return
(
void
*
)
dispatcher
;
return
(
void
*
)
dispatcher
;
}
}
...
...
src/zmq_connecter.cpp
View file @
8aa09086
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include "zmq_connecter.hpp"
#include "zmq_connecter.hpp"
#include "zmq_connecter_init.hpp"
#include "zmq_connecter_init.hpp"
#include "io_thread.hpp"
#include "io_thread.hpp"
...
@@ -87,8 +89,8 @@ void zmq::zmq_connecter_t::out_event ()
...
@@ -87,8 +89,8 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object.
// Create an init object.
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
zmq_connecter_init_t
*
init
=
new
zmq_connecter_init_t
(
io_thread
,
owner
,
zmq_connecter_init_t
*
init
=
new
(
std
::
nothrow
)
zmq_connecter_init_t
(
fd
,
options
,
session_name
.
c_str
(),
address
.
c_str
());
io_thread
,
owner
,
fd
,
options
,
session_name
.
c_str
(),
address
.
c_str
());
zmq_assert
(
init
);
zmq_assert
(
init
);
send_plug
(
init
);
send_plug
(
init
);
send_own
(
owner
,
init
);
send_own
(
owner
,
init
);
...
...
src/zmq_connecter_init.cpp
View file @
8aa09086
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include "zmq_connecter_init.hpp"
#include "zmq_connecter_init.hpp"
#include "zmq_connecter.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
#include "io_thread.hpp"
...
@@ -31,7 +33,8 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
...
@@ -31,7 +33,8 @@ zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
session_name
(
session_name_
)
session_name
(
session_name_
)
{
{
// Create associated engine object.
// Create associated engine object.
engine
=
new
zmq_engine_t
(
parent_
,
fd_
,
options
,
true
,
address_
);
engine
=
new
(
std
::
nothrow
)
zmq_engine_t
(
parent_
,
fd_
,
options
,
true
,
address_
);
zmq_assert
(
engine
);
zmq_assert
(
engine
);
}
}
...
...
src/zmq_engine.cpp
View file @
8aa09086
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include "zmq_engine.hpp"
#include "zmq_engine.hpp"
#include "zmq_connecter.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
#include "io_thread.hpp"
...
@@ -155,7 +157,7 @@ void zmq::zmq_engine_t::error ()
...
@@ -155,7 +157,7 @@ void zmq::zmq_engine_t::error ()
// Create a connecter object to attempt reconnect.
// Create a connecter object to attempt reconnect.
// Ask it to wait for a while before reconnecting.
// Ask it to wait for a while before reconnecting.
reconnecter
=
new
zmq_connecter_t
(
reconnecter
=
new
(
std
::
nothrow
)
zmq_connecter_t
(
inout
->
get_io_thread
(),
inout
->
get_owner
(),
inout
->
get_io_thread
(),
inout
->
get_owner
(),
options
,
inout
->
get_session_name
(),
true
);
options
,
inout
->
get_session_name
(),
true
);
zmq_assert
(
reconnecter
);
zmq_assert
(
reconnecter
);
...
...
src/zmq_listener.cpp
View file @
8aa09086
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include "zmq_listener.hpp"
#include "zmq_listener.hpp"
#include "zmq_listener_init.hpp"
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "io_thread.hpp"
...
@@ -62,8 +64,8 @@ void zmq::zmq_listener_t::in_event ()
...
@@ -62,8 +64,8 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object.
// Create an init object.
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
zmq_listener_init_t
*
init
=
new
zmq_listener_init_t
(
io_thread
,
owner
,
zmq_listener_init_t
*
init
=
new
(
std
::
nothrow
)
zmq_listener_init_t
(
fd
,
options
);
io_thread
,
owner
,
fd
,
options
);
zmq_assert
(
init
);
zmq_assert
(
init
);
send_plug
(
init
);
send_plug
(
init
);
send_own
(
owner
,
init
);
send_own
(
owner
,
init
);
...
...
src/zmq_listener_init.cpp
View file @
8aa09086
...
@@ -17,6 +17,8 @@
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <new>
#include "zmq_listener_init.hpp"
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "session.hpp"
...
@@ -29,7 +31,8 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
...
@@ -29,7 +31,8 @@ zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
has_peer_identity
(
false
)
has_peer_identity
(
false
)
{
{
// Create associated engine object.
// Create associated engine object.
engine
=
new
zmq_engine_t
(
parent_
,
fd_
,
options
,
false
,
NULL
);
engine
=
new
(
std
::
nothrow
)
zmq_engine_t
(
parent_
,
fd_
,
options
,
false
,
NULL
);
zmq_assert
(
engine
);
zmq_assert
(
engine
);
}
}
...
@@ -74,8 +77,8 @@ void zmq::zmq_listener_init_t::flush ()
...
@@ -74,8 +77,8 @@ void zmq::zmq_listener_init_t::flush ()
session
=
owner
->
find_session
(
peer_identity
.
c_str
());
session
=
owner
->
find_session
(
peer_identity
.
c_str
());
if
(
!
session
)
{
if
(
!
session
)
{
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
session
=
new
session_t
(
io_thread
,
owner
,
peer_identity
.
c_str
()
,
session
=
new
(
std
::
nothrow
)
session_t
(
io_thread
,
owner
,
options
,
false
);
peer_identity
.
c_str
(),
options
,
false
);
zmq_assert
(
session
);
zmq_assert
(
session
);
send_plug
(
session
);
send_plug
(
session
);
send_own
(
owner
,
session
);
send_own
(
owner
,
session
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment