Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
4307baf7
Commit
4307baf7
authored
Sep 04, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
python binding functional
parent
450b31c3
Show whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
130 additions
and
48 deletions
+130
-48
local_lat.c
perf/c/local_lat.c
+7
-4
local_thr.c
perf/c/local_thr.c
+6
-4
remote_lat.c
perf/c/remote_lat.c
+7
-4
remote_thr.c
perf/c/remote_thr.c
+7
-4
local_lat.cpp
perf/cpp/local_lat.cpp
+4
-4
local_thr.cpp
perf/cpp/local_thr.cpp
+4
-4
remote_lat.cpp
perf/cpp/remote_lat.cpp
+4
-4
remote_thr.cpp
perf/cpp/remote_thr.cpp
+4
-4
local_lat.py
perf/python/local_lat.py
+3
-3
remote_lat.py
perf/python/remote_lat.py
+2
-2
remote_thr.py
perf/python/remote_thr.py
+1
-1
app_thread.cpp
src/app_thread.cpp
+1
-3
dispatcher.cpp
src/dispatcher.cpp
+35
-1
dispatcher.hpp
src/dispatcher.hpp
+21
-2
object.cpp
src/object.cpp
+5
-0
object.hpp
src/object.hpp
+1
-0
session.cpp
src/session.cpp
+6
-1
socket_base.cpp
src/socket_base.cpp
+11
-1
zmq.cpp
src/zmq.cpp
+1
-2
No files found.
perf/c/local_lat.c
View file @
4307baf7
...
...
@@ -35,13 +35,13 @@ int main (int argc, char *argv [])
struct
zmq_msg_t
msg
;
if
(
argc
!=
4
)
{
printf
(
"usage: local_lat <bind-to> <
roundtrip-count
> "
"<
message-size
>
\n
"
);
printf
(
"usage: local_lat <bind-to> <
message-size
> "
"<
roundtrip-count
>
\n
"
);
return
1
;
}
bind_to
=
argv
[
1
];
roundtrip_count
=
atoi
(
argv
[
2
]);
message_size
=
atoi
(
argv
[
3
]);
message_size
=
atoi
(
argv
[
2
]);
roundtrip_count
=
atoi
(
argv
[
3
]);
ctx
=
zmq_init
(
1
,
1
);
assert
(
ctx
);
...
...
@@ -68,6 +68,9 @@ int main (int argc, char *argv [])
sleep
(
1
);
rc
=
zmq_close
(
s
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
ctx
);
assert
(
rc
==
0
);
...
...
perf/c/local_thr.c
View file @
4307baf7
...
...
@@ -41,13 +41,12 @@ int main (int argc, char *argv [])
double
megabits
;
if
(
argc
!=
4
)
{
printf
(
"usage: local_thr <bind-to> <message-count> "
"<message-size>
\n
"
);
printf
(
"usage: local_thr <bind-to> <message-size> <message-count>
\n
"
);
return
1
;
}
bind_to
=
argv
[
1
];
message_
count
=
atoi
(
argv
[
2
]);
message_
size
=
atoi
(
argv
[
3
]);
message_
size
=
atoi
(
argv
[
2
]);
message_
count
=
atoi
(
argv
[
3
]);
ctx
=
zmq_init
(
1
,
1
);
assert
(
ctx
);
...
...
@@ -92,6 +91,9 @@ int main (int argc, char *argv [])
printf
(
"mean throughput: %d [msg/s]
\n
"
,
(
int
)
throughput
);
printf
(
"mean throughput: %.3f [Mb/s]
\n
"
,
(
double
)
megabits
);
rc
=
zmq_close
(
s
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
ctx
);
assert
(
rc
==
0
);
...
...
perf/c/remote_lat.c
View file @
4307baf7
...
...
@@ -39,13 +39,13 @@ int main (int argc, char *argv [])
double
latency
;
if
(
argc
!=
4
)
{
printf
(
"usage: remote_lat <connect-to> <
roundtrip-count
> "
"<
message-size
>
\n
"
);
printf
(
"usage: remote_lat <connect-to> <
message-size
> "
"<
roundtrip-count
>
\n
"
);
return
1
;
}
connect_to
=
argv
[
1
];
roundtrip_count
=
atoi
(
argv
[
2
]);
message_size
=
atoi
(
argv
[
3
]);
message_size
=
atoi
(
argv
[
2
]);
roundtrip_count
=
atoi
(
argv
[
3
]);
ctx
=
zmq_init
(
1
,
1
);
assert
(
ctx
);
...
...
@@ -87,6 +87,9 @@ int main (int argc, char *argv [])
printf
(
"roundtrip count: %d
\n
"
,
(
int
)
roundtrip_count
);
printf
(
"average latency: %.3f [us]
\n
"
,
(
double
)
latency
);
rc
=
zmq_close
(
s
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
ctx
);
assert
(
rc
==
0
);
...
...
perf/c/remote_thr.c
View file @
4307baf7
...
...
@@ -35,13 +35,13 @@ int main (int argc, char *argv [])
struct
zmq_msg_t
msg
;
if
(
argc
!=
4
)
{
printf
(
"usage: remote_thr <connect-to> <message-
count
> "
"<message-
size
>
\n
"
);
printf
(
"usage: remote_thr <connect-to> <message-
size
> "
"<message-
count
>
\n
"
);
return
1
;
}
connect_to
=
argv
[
1
];
message_
count
=
atoi
(
argv
[
2
]);
message_
size
=
atoi
(
argv
[
3
]);
message_
size
=
atoi
(
argv
[
2
]);
message_
count
=
atoi
(
argv
[
3
]);
ctx
=
zmq_init
(
1
,
1
);
assert
(
ctx
);
...
...
@@ -63,6 +63,9 @@ int main (int argc, char *argv [])
sleep
(
10
);
rc
=
zmq_close
(
s
);
assert
(
rc
==
0
);
rc
=
zmq_term
(
ctx
);
assert
(
rc
==
0
);
...
...
perf/cpp/local_lat.cpp
View file @
4307baf7
...
...
@@ -27,13 +27,13 @@
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
!=
4
)
{
printf
(
"usage: local_lat <bind-to> <
roundtrip-count
> "
"<
message-size
>
\n
"
);
printf
(
"usage: local_lat <bind-to> <
message-size
> "
"<
roundtrip-count
>
\n
"
);
return
1
;
}
const
char
*
bind_to
=
argv
[
1
];
int
roundtrip_count
=
atoi
(
argv
[
2
]);
size_t
message_size
=
(
size_t
)
atoi
(
argv
[
3
]);
size_t
message_size
=
(
size_t
)
atoi
(
argv
[
2
]);
int
roundtrip_count
=
atoi
(
argv
[
3
]);
zmq
::
context_t
ctx
(
1
,
1
);
...
...
perf/cpp/local_thr.cpp
View file @
4307baf7
...
...
@@ -28,13 +28,13 @@
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
!=
4
)
{
printf
(
"usage: local_thr <bind-to> <message-
count
> "
"<message-
size
>
\n
"
);
printf
(
"usage: local_thr <bind-to> <message-
size
> "
"<message-
count
>
\n
"
);
return
1
;
}
const
char
*
bind_to
=
argv
[
1
];
int
message_count
=
atoi
(
argv
[
2
]);
size_t
message_size
=
(
size_t
)
atoi
(
argv
[
3
]);
size_t
message_size
=
(
size_t
)
atoi
(
argv
[
2
]);
int
message_count
=
atoi
(
argv
[
3
]);
zmq
::
context_t
ctx
(
1
,
1
);
...
...
perf/cpp/remote_lat.cpp
View file @
4307baf7
...
...
@@ -27,13 +27,13 @@
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
!=
4
)
{
printf
(
"usage: remote_lat <connect-to> <
roundtrip-count
> "
"<
message-size
>
\n
"
);
printf
(
"usage: remote_lat <connect-to> <
message-size
> "
"<
roundtrip-count
>
\n
"
);
return
1
;
}
const
char
*
connect_to
=
argv
[
1
];
int
roundtrip_count
=
atoi
(
argv
[
2
]);
size_t
message_size
=
(
size_t
)
atoi
(
argv
[
3
]);
size_t
message_size
=
(
size_t
)
atoi
(
argv
[
2
]);
int
roundtrip_count
=
atoi
(
argv
[
3
]);
zmq
::
context_t
ctx
(
1
,
1
);
...
...
perf/cpp/remote_thr.cpp
View file @
4307baf7
...
...
@@ -27,13 +27,13 @@
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
!=
4
)
{
printf
(
"usage: remote_thr <connect-to> <message-
count
> "
"<message-
size
>
\n
"
);
printf
(
"usage: remote_thr <connect-to> <message-
size
> "
"<message-
count
>
\n
"
);
return
1
;
}
const
char
*
connect_to
=
argv
[
1
];
int
message_count
=
atoi
(
argv
[
2
]);
size_t
message_size
=
(
size_t
)
atoi
(
argv
[
3
]);
size_t
message_size
=
(
size_t
)
atoi
(
argv
[
2
]);
int
message_count
=
atoi
(
argv
[
3
]);
zmq
::
context_t
ctx
(
1
,
1
);
...
...
perf/python/local_lat.py
View file @
4307baf7
...
...
@@ -23,13 +23,13 @@ import libpyzmq
def
main
():
if
len
(
sys
.
argv
)
!=
4
:
print
'usage: local_lat <bind-to> <
roundtrip-count> <message-size
>'
print
'usage: local_lat <bind-to> <
message-size> <roundtrip-count
>'
sys
.
exit
(
1
)
try
:
bind_to
=
sys
.
argv
[
1
]
roundtrip_count
=
int
(
sys
.
argv
[
2
])
message_size
=
int
(
sys
.
argv
[
3
])
message_size
=
int
(
sys
.
argv
[
2
])
roundtrip_count
=
int
(
sys
.
argv
[
3
])
except
(
ValueError
,
OverflowError
),
e
:
print
'message-size and roundtrip-count must be integers'
sys
.
exit
(
1
)
...
...
perf/python/remote_lat.py
View file @
4307baf7
...
...
@@ -23,7 +23,7 @@ import libpyzmq
def
main
():
if
len
(
sys
.
argv
)
!=
4
:
print
'usage: remote_lat <connect-to> <
roundtrip-count> <message-size
>'
print
'usage: remote_lat <connect-to> <
message-size> <roundtrip-count
>'
sys
.
exit
(
1
)
try
:
...
...
@@ -49,7 +49,7 @@ def main ():
end
=
datetime
.
now
()
delta
=
(
end
-
start
)
.
microseconds
+
1000000
*
(
end
-
start
)
.
seconds
latency
=
delta
/
roundtrip_count
/
2
latency
=
float
(
delta
)
/
roundtrip_count
/
2
print
"message size:
%.0
f [B]"
%
(
message_size
,
)
print
"roundtrip count:
%.0
f"
%
(
roundtrip_count
,
)
...
...
perf/python/remote_thr.py
View file @
4307baf7
...
...
@@ -27,7 +27,7 @@ def main ():
sys
.
exit
(
1
)
try
:
connect_to
=
argv
[
1
]
connect_to
=
sys
.
argv
[
1
]
message_size
=
int
(
sys
.
argv
[
2
])
message_count
=
int
(
sys
.
argv
[
3
])
except
(
ValueError
,
OverflowError
),
e
:
...
...
src/app_thread.cpp
View file @
4307baf7
...
...
@@ -51,9 +51,7 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
zmq
::
app_thread_t
::~
app_thread_t
()
{
// Destroy all the sockets owned by this application thread.
for
(
sockets_t
::
iterator
it
=
sockets
.
begin
();
it
!=
sockets
.
end
();
it
++
)
delete
*
it
;
zmq_assert
(
sockets
.
empty
());
}
zmq
::
i_signaler
*
zmq
::
app_thread_t
::
get_signaler
()
...
...
src/dispatcher.cpp
View file @
4307baf7
...
...
@@ -30,7 +30,9 @@
#include "windows.h"
#endif
zmq
::
dispatcher_t
::
dispatcher_t
(
int
app_threads_
,
int
io_threads_
)
zmq
::
dispatcher_t
::
dispatcher_t
(
int
app_threads_
,
int
io_threads_
)
:
sockets
(
0
),
terminated
(
false
)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
...
...
@@ -68,6 +70,20 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
io_threads
[
i
]
->
start
();
}
int
zmq
::
dispatcher_t
::
term
()
{
term_sync
.
lock
();
zmq_assert
(
!
terminated
);
terminated
=
true
;
bool
destroy
=
(
sockets
==
0
);
term_sync
.
unlock
();
if
(
destroy
)
delete
this
;
return
0
;
}
zmq
::
dispatcher_t
::~
dispatcher_t
()
{
// Close all application theads, sockets, io_objects etc.
...
...
@@ -111,9 +127,27 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
}
threads_sync
.
unlock
();
term_sync
.
lock
();
sockets
++
;
term_sync
.
unlock
();
return
thread
->
create_socket
(
type_
);
}
void
zmq
::
dispatcher_t
::
destroy_socket
()
{
// If zmq_term was already called and there are no more sockets,
// terminate the whole 0MQ infrastructure.
term_sync
.
lock
();
zmq_assert
(
sockets
>
0
);
sockets
--
;
bool
destroy
=
(
sockets
==
0
&&
terminated
);
term_sync
.
unlock
();
if
(
destroy
)
delete
this
;
}
zmq
::
app_thread_t
*
zmq
::
dispatcher_t
::
choose_app_thread
()
{
// Check whether thread ID is already assigned. If so, return it.
...
...
src/dispatcher.hpp
View file @
4307baf7
...
...
@@ -52,12 +52,18 @@ namespace zmq
// signalers.
dispatcher_t
(
int
app_threads_
,
int
io_threads_
);
// To be called to terminate the whole infrastructure (zmq_term).
~
dispatcher_t
();
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
// down. If there are open sockets still, the deallocation happens
// after the last one is closed.
int
term
();
// Create a socket.
class
socket_base_t
*
create_socket
(
int
type_
);
// Destroy a socket.
void
destroy_socket
();
// Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be
// received.
...
...
@@ -93,6 +99,8 @@ namespace zmq
private
:
~
dispatcher_t
();
// Returns the app thread associated with the current thread.
// NULL if we are out of app thread slots.
class
app_thread_t
*
choose_app_thread
();
...
...
@@ -130,6 +138,17 @@ namespace zmq
// Synchronisation of access to the pipes repository.
mutex_t
pipes_sync
;
// Number of sockets alive.
int
sockets
;
// If true, zmq_term was already called. When last socket is closed
// the whole 0MQ infrastructure should be deallocated.
bool
terminated
;
// Synchronisation of access to the termination data (socket count
// and 'terminated' flag).
mutex_t
term_sync
;
dispatcher_t
(
const
dispatcher_t
&
);
void
operator
=
(
const
dispatcher_t
&
);
};
...
...
src/object.cpp
View file @
4307baf7
...
...
@@ -53,6 +53,11 @@ int zmq::object_t::get_thread_slot ()
return
thread_slot
;
}
zmq
::
dispatcher_t
*
zmq
::
object_t
::
get_dispatcher
()
{
return
dispatcher
;
}
void
zmq
::
object_t
::
process_command
(
command_t
&
cmd_
)
{
switch
(
cmd_
.
type
)
{
...
...
src/object.hpp
View file @
4307baf7
...
...
@@ -40,6 +40,7 @@ namespace zmq
~
object_t
();
int
get_thread_slot
();
dispatcher_t
*
get_dispatcher
();
void
process_command
(
struct
command_t
&
cmd_
);
// Allow pipe to access corresponding dispatcher functions.
...
...
src/session.cpp
View file @
4307baf7
...
...
@@ -54,7 +54,12 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool
zmq
::
session_t
::
write
(
::
zmq_msg_t
*
msg_
)
{
return
out_pipe
->
write
(
msg_
);
if
(
out_pipe
->
write
(
msg_
))
{
zmq_msg_init
(
msg_
);
return
true
;
}
return
false
;
}
void
zmq
::
session_t
::
flush
()
...
...
src/socket_base.cpp
View file @
4307baf7
...
...
@@ -24,7 +24,7 @@
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "
er
r.hpp"
#include "
dispatche
r.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "msg_content.hpp"
...
...
@@ -34,6 +34,7 @@
#include "owned.hpp"
#include "uuid.hpp"
#include "pipe.hpp"
#include "err.hpp"
zmq
::
socket_base_t
::
socket_base_t
(
app_thread_t
*
parent_
)
:
object_t
(
parent_
),
...
...
@@ -288,7 +289,16 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int
zmq
::
socket_base_t
::
close
()
{
app_thread
->
remove_socket
(
this
);
// Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available.
dispatcher_t
*
dispatcher
=
get_dispatcher
();
delete
this
;
// This function must be called after the socket is completely deallocated
// as it may cause termination of the whole 0MQ infrastructure.
dispatcher
->
destroy_socket
();
return
0
;
}
...
...
src/zmq.cpp
View file @
4307baf7
...
...
@@ -183,8 +183,7 @@ void *zmq_init (int app_threads_, int io_threads_)
int
zmq_term
(
void
*
dispatcher_
)
{
delete
(
zmq
::
dispatcher_t
*
)
dispatcher_
;
return
0
;
return
((
zmq
::
dispatcher_t
*
)
dispatcher_
)
->
term
();
}
void
*
zmq_socket
(
void
*
dispatcher_
,
int
type_
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment