Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
0e9ab2e8
Commit
0e9ab2e8
authored
Nov 21, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
inproc transport - initial commit
parent
14f2fecd
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
165 additions
and
4 deletions
+165
-4
dispatcher.cpp
src/dispatcher.cpp
+56
-0
dispatcher.hpp
src/dispatcher.hpp
+12
-0
object.cpp
src/object.cpp
+15
-0
object.hpp
src/object.hpp
+6
-0
socket_base.cpp
src/socket_base.cpp
+60
-2
socket_base.hpp
src/socket_base.hpp
+12
-0
zmq.cpp
src/zmq.cpp
+4
-2
No files found.
src/dispatcher.cpp
View file @
0e9ab2e8
...
@@ -20,6 +20,7 @@
...
@@ -20,6 +20,7 @@
#include "../bindings/c/zmq.h"
#include "../bindings/c/zmq.h"
#include "dispatcher.hpp"
#include "dispatcher.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "platform.hpp"
...
@@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
...
@@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
zmq_assert
(
erased
==
1
);
zmq_assert
(
erased
==
1
);
pipes_sync
.
unlock
();
pipes_sync
.
unlock
();
}
}
int
zmq
::
dispatcher_t
::
register_endpoint
(
const
char
*
addr_
,
socket_base_t
*
socket_
)
{
endpoints_sync
.
lock
();
bool
inserted
=
endpoints
.
insert
(
std
::
make_pair
(
addr_
,
socket_
)).
second
;
if
(
!
inserted
)
{
errno
=
EADDRINUSE
;
endpoints_sync
.
unlock
();
return
-
1
;
}
endpoints_sync
.
unlock
();
return
0
;
}
void
zmq
::
dispatcher_t
::
unregister_endpoints
(
socket_base_t
*
socket_
)
{
endpoints_sync
.
lock
();
endpoints_t
::
iterator
it
=
endpoints
.
begin
();
while
(
it
!=
endpoints
.
end
())
{
if
(
it
->
second
==
socket_
)
{
endpoints_t
::
iterator
to_erase
=
it
;
it
++
;
endpoints
.
erase
(
to_erase
);
continue
;
}
it
++
;
}
endpoints_sync
.
unlock
();
}
zmq
::
socket_base_t
*
zmq
::
dispatcher_t
::
find_endpoint
(
const
char
*
addr_
)
{
endpoints_sync
.
lock
();
endpoints_t
::
iterator
it
=
endpoints
.
find
(
addr_
);
if
(
it
==
endpoints
.
end
())
{
endpoints_sync
.
unlock
();
errno
=
ECONNREFUSED
;
return
NULL
;
}
socket_base_t
*
endpoint
=
it
->
second
;
// Increment the command sequence number of the peer so that it won't
// get deallocated until "bind" command is issued by the caller.
endpoint
->
inc_seqnum
();
endpoints_sync
.
unlock
();
return
endpoint
;
}
src/dispatcher.hpp
View file @
0e9ab2e8
...
@@ -97,6 +97,11 @@ namespace zmq
...
@@ -97,6 +97,11 @@ namespace zmq
void
register_pipe
(
class
pipe_t
*
pipe_
);
void
register_pipe
(
class
pipe_t
*
pipe_
);
void
unregister_pipe
(
class
pipe_t
*
pipe_
);
void
unregister_pipe
(
class
pipe_t
*
pipe_
);
// Management of inproc endpoints.
int
register_endpoint
(
const
char
*
addr_
,
class
socket_base_t
*
socket_
);
void
unregister_endpoints
(
class
socket_base_t
*
socket_
);
class
socket_base_t
*
find_endpoint
(
const
char
*
addr_
);
private
:
private
:
~
dispatcher_t
();
~
dispatcher_t
();
...
@@ -149,6 +154,13 @@ namespace zmq
...
@@ -149,6 +154,13 @@ namespace zmq
// and 'terminated' flag).
// and 'terminated' flag).
mutex_t
term_sync
;
mutex_t
term_sync
;
// List of inproc endpoints within this context.
typedef
std
::
map
<
std
::
string
,
class
socket_base_t
*>
endpoints_t
;
endpoints_t
endpoints
;
// Synchronisation of access to the list of inproc endpoints.
mutex_t
endpoints_sync
;
dispatcher_t
(
const
dispatcher_t
&
);
dispatcher_t
(
const
dispatcher_t
&
);
void
operator
=
(
const
dispatcher_t
&
);
void
operator
=
(
const
dispatcher_t
&
);
};
};
...
...
src/object.cpp
View file @
0e9ab2e8
...
@@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
...
@@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
dispatcher
->
unregister_pipe
(
pipe_
);
dispatcher
->
unregister_pipe
(
pipe_
);
}
}
int
zmq
::
object_t
::
register_endpoint
(
const
char
*
addr_
,
socket_base_t
*
socket_
)
{
return
dispatcher
->
register_endpoint
(
addr_
,
socket_
);
}
void
zmq
::
object_t
::
unregister_endpoints
(
socket_base_t
*
socket_
)
{
return
dispatcher
->
unregister_endpoints
(
socket_
);
}
zmq
::
socket_base_t
*
zmq
::
object_t
::
find_endpoint
(
const
char
*
addr_
)
{
return
dispatcher
->
find_endpoint
(
addr_
);
}
zmq
::
io_thread_t
*
zmq
::
object_t
::
choose_io_thread
(
uint64_t
taskset_
)
zmq
::
io_thread_t
*
zmq
::
object_t
::
choose_io_thread
(
uint64_t
taskset_
)
{
{
return
dispatcher
->
choose_io_thread
(
taskset_
);
return
dispatcher
->
choose_io_thread
(
taskset_
);
...
...
src/object.hpp
View file @
0e9ab2e8
...
@@ -49,6 +49,12 @@ namespace zmq
...
@@ -49,6 +49,12 @@ namespace zmq
protected
:
protected
:
// Using following function, socket is able to access global
// repository of inproc endpoints.
int
register_endpoint
(
const
char
*
addr_
,
class
socket_base_t
*
socket_
);
void
unregister_endpoints
(
class
socket_base_t
*
socket_
);
class
socket_base_t
*
find_endpoint
(
const
char
*
addr_
);
// Derived object can use following functions to interact with
// Derived object can use following functions to interact with
// global repositories. See dispatcher.hpp for function details.
// global repositories. See dispatcher.hpp for function details.
int
thread_slot_count
();
int
thread_slot_count
();
...
...
src/socket_base.cpp
View file @
0e9ab2e8
...
@@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
...
@@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
pending_term_acks
(
0
),
pending_term_acks
(
0
),
ticks
(
0
),
ticks
(
0
),
app_thread
(
parent_
),
app_thread
(
parent_
),
shutting_down
(
false
)
shutting_down
(
false
),
sent_seqnum
(
0
),
processed_seqnum
(
0
)
{
{
}
}
...
@@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_)
addr_type
=
addr
.
substr
(
0
,
pos
);
addr_type
=
addr
.
substr
(
0
,
pos
);
addr_args
=
addr
.
substr
(
pos
+
3
);
addr_args
=
addr
.
substr
(
pos
+
3
);
if
(
addr_type
==
"inproc"
)
return
register_endpoint
(
addr_args
.
c_str
(),
this
);
if
(
addr_type
==
"tcp"
)
{
if
(
addr_type
==
"tcp"
)
{
zmq_listener_t
*
listener
=
new
zmq_listener_t
(
zmq_listener_t
*
listener
=
new
zmq_listener_t
(
choose_io_thread
(
options
.
affinity
),
this
,
options
);
choose_io_thread
(
options
.
affinity
),
this
,
options
);
...
@@ -126,6 +131,42 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -126,6 +131,42 @@ int zmq::socket_base_t::connect (const char *addr_)
addr_type
=
addr
.
substr
(
0
,
pos
);
addr_type
=
addr
.
substr
(
0
,
pos
);
addr_args
=
addr
.
substr
(
pos
+
3
);
addr_args
=
addr
.
substr
(
pos
+
3
);
if
(
addr_type
==
"inproc"
)
{
// Find the peer socket.
socket_base_t
*
peer
=
find_endpoint
(
addr_args
.
c_str
());
if
(
!
peer
)
return
-
1
;
pipe_t
*
in_pipe
=
NULL
;
pipe_t
*
out_pipe
=
NULL
;
// Create inbound pipe, if required.
if
(
options
.
requires_in
)
{
in_pipe
=
new
pipe_t
(
this
,
peer
,
options
.
hwm
,
options
.
lwm
);
zmq_assert
(
in_pipe
);
}
// Create outbound pipe, if required.
if
(
options
.
requires_out
)
{
out_pipe
=
new
pipe_t
(
peer
,
this
,
options
.
hwm
,
options
.
lwm
);
zmq_assert
(
out_pipe
);
}
// Attach the pipes to this socket object.
attach_pipes
(
in_pipe
?
&
in_pipe
->
reader
:
NULL
,
out_pipe
?
&
out_pipe
->
writer
:
NULL
);
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. When this command
// is delivered, peer will consider the seqnum to be processed.
// TODO: Seems that 'session' parameter is unused...
send_bind
(
peer
,
NULL
,
out_pipe
?
&
out_pipe
->
reader
:
NULL
,
in_pipe
?
&
in_pipe
->
writer
:
NULL
);
return
0
;
}
// Create the session.
// Create the session.
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
session_t
*
session
=
new
session_t
(
io_thread
,
this
,
session_name
.
c_str
(),
session_t
*
session
=
new
session_t
(
io_thread
,
this
,
session_name
.
c_str
(),
...
@@ -319,13 +360,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
...
@@ -319,13 +360,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int
zmq
::
socket_base_t
::
close
()
int
zmq
::
socket_base_t
::
close
()
{
{
shutting_down
=
true
;
// Let the thread know that the socket is no longer available.
app_thread
->
remove_socket
(
this
);
app_thread
->
remove_socket
(
this
);
// Pointer to the dispatcher must be retrieved before the socket is
// Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available.
// deallocated. Afterwards it is not available.
dispatcher_t
*
dispatcher
=
get_dispatcher
();
dispatcher_t
*
dispatcher
=
get_dispatcher
();
shutting_down
=
true
;
// Unregister all inproc endpoints associated with this socket.
// From this point we are sure that inc_seqnum won't be called again
// on this object.
dispatcher
->
unregister_endpoints
(
this
);
// Wait till all undelivered commands are delivered. This should happen
// very quickly. There's no way to wait here for extensive period of time.
while
(
processed_seqnum
!=
sent_seqnum
.
get
())
app_thread
->
process_commands
(
true
,
false
);
while
(
true
)
{
while
(
true
)
{
...
@@ -364,6 +416,12 @@ int zmq::socket_base_t::close ()
...
@@ -364,6 +416,12 @@ int zmq::socket_base_t::close ()
return
0
;
return
0
;
}
}
void
zmq
::
socket_base_t
::
inc_seqnum
()
{
// NB: This function may be called from a different thread!
sent_seqnum
.
add
(
1
);
}
zmq
::
app_thread_t
*
zmq
::
socket_base_t
::
get_thread
()
zmq
::
app_thread_t
*
zmq
::
socket_base_t
::
get_thread
()
{
{
return
app_thread
;
return
app_thread
;
...
...
src/socket_base.hpp
View file @
0e9ab2e8
...
@@ -33,6 +33,7 @@
...
@@ -33,6 +33,7 @@
#include "mutex.hpp"
#include "mutex.hpp"
#include "options.hpp"
#include "options.hpp"
#include "stdint.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
namespace
zmq
namespace
zmq
{
{
...
@@ -54,6 +55,11 @@ namespace zmq
...
@@ -54,6 +55,11 @@ namespace zmq
int
recv
(
zmq_msg_t
*
msg_
,
int
flags_
);
int
recv
(
zmq_msg_t
*
msg_
,
int
flags_
);
int
close
();
int
close
();
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
// before the command is delivered.
void
inc_seqnum
();
// This function is used by the polling mechanism to determine
// This function is used by the polling mechanism to determine
// whether the socket belongs to the application thread the poll
// whether the socket belongs to the application thread the poll
// is called from.
// is called from.
...
@@ -132,6 +138,12 @@ namespace zmq
...
@@ -132,6 +138,12 @@ namespace zmq
// started.
// started.
bool
shutting_down
;
bool
shutting_down
;
// Sequence number of the last command sent to this object.
atomic_counter_t
sent_seqnum
;
// Sequence number of the last command processed by this object.
uint64_t
processed_seqnum
;
// List of existing sessions. This list is never referenced from within
// List of existing sessions. This list is never referenced from within
// the socket, instead it is used by I/O objects owned by the session.
// the socket, instead it is used by I/O objects owned by the session.
// As those objects can live in different threads, the access is
// As those objects can live in different threads, the access is
...
...
src/zmq.cpp
View file @
0e9ab2e8
...
@@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
...
@@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
void
*
zmq_init
(
int
app_threads_
,
int
io_threads_
,
int
flags_
)
void
*
zmq_init
(
int
app_threads_
,
int
io_threads_
,
int
flags_
)
{
{
// There should be at least a single thread managed by the dispatcher.
// There should be at least a single application thread managed
if
(
app_threads_
<=
0
||
io_threads_
<=
0
||
// by the dispatcher. There's no need for I/O threads if 0MQ is used
// only for inproc messaging
if
(
app_threads_
<
1
||
io_threads_
<
0
||
app_threads_
>
63
||
io_threads_
>
63
)
{
app_threads_
>
63
||
io_threads_
>
63
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
NULL
;
return
NULL
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment