Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
d844a906
Commit
d844a906
authored
Jun 04, 2010
by
Martin Lucina
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
zmqd: Removing for now, not ready for 2.0.7
parent
606c7736
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
3 additions
and
375 deletions
+3
-375
Makefile.am
Makefile.am
+2
-2
configure.in
configure.in
+1
-1
Makefile.am
zmqd/Makefile.am
+0
-8
zmqd.cpp
zmqd/zmqd.cpp
+0
-364
No files found.
Makefile.am
View file @
d844a906
ACLOCAL_AMFLAGS
=
-I
config
SUBDIRS
=
src doc perf devices
zmqd
DIST_SUBDIRS
=
src doc perf devices
zmqd
builds/msvc
SUBDIRS
=
src doc perf devices
DIST_SUBDIRS
=
src doc perf devices builds/msvc
EXTRA_DIST
=
\
$(top_srcdir)
/foreign/openpgm/@pgm_basename@.tar.gz
\
...
...
configure.in
View file @
d844a906
...
...
@@ -373,7 +373,7 @@ AC_OUTPUT(Makefile src/Makefile doc/Makefile
perf/Makefile src/libzmq.pc \
devices/Makefile devices/zmq_forwarder/Makefile \
devices/zmq_streamer/Makefile devices/zmq_queue/Makefile \
zmqd/Makefile
builds/msvc/Makefile)
builds/msvc/Makefile)
# On Linux patch libtool to delete hardcoded paths (rpath).
case "${host_os}" in
...
...
zmqd/Makefile.am
deleted
100644 → 0
View file @
606c7736
INCLUDES
=
-I
$(top_srcdir)
/include
bin_PROGRAMS
=
zmqd
zmqd_LDADD
=
$(top_builddir)
/src/libzmq.la
zmqd_SOURCES
=
zmqd.cpp
zmqd/zmqd.cpp
deleted
100644 → 0
View file @
606c7736
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <vector>
#include <string>
#include <memory>
#include "../include/zmq.hpp"
#include "../foreign/xmlParser/xmlParser.cpp"
class
device_cfg_t
{
enum
endpoint_direction
{
connect
,
bind
};
typedef
std
::
pair
<
endpoint_direction
,
std
::
string
>
sock_details_t
;
typedef
std
::
vector
<
sock_details_t
>
vsock_dets_t
;
public
:
explicit
device_cfg_t
(
int
type
)
:
device_type
(
type
),
context
(
0
),
in_socket
(
0
),
out_socket
(
0
)
{
}
virtual
~
device_cfg_t
()
{
delete
out_socket
;
delete
in_socket
;
}
bool
init
(
XMLNode
&
device
)
{
XMLNode
in_node
=
device
.
getChildNode
(
"in"
);
if
(
in_node
.
isEmpty
())
{
fprintf
(
stderr
,
"'in' node is missing in the configuration file
\n
"
);
return
false
;
}
XMLNode
out_node
=
device
.
getChildNode
(
"out"
);
if
(
out_node
.
isEmpty
())
{
fprintf
(
stderr
,
"'out' node is missing in the configuration file
\n
"
);
return
false
;
}
if
(
!
process_node
(
in_node
,
true
,
device_cfg_t
::
bind
))
return
false
;
if
(
!
process_node
(
in_node
,
true
,
device_cfg_t
::
connect
))
return
false
;
if
(
!
process_node
(
out_node
,
false
,
device_cfg_t
::
bind
))
return
false
;
if
(
!
process_node
(
out_node
,
false
,
device_cfg_t
::
connect
))
return
false
;
return
true
;
}
void
set_context
(
zmq
::
context_t
*
context_
)
{
context
=
context_
;
}
zmq
::
context_t
*
get_context
()
const
{
return
context
;
}
virtual
bool
make_sockets
()
=
0
;
bool
set_up_connections
()
{
for
(
vsock_dets_t
::
const_iterator
i
=
in
.
begin
();
i
!=
in
.
end
();
++
i
)
{
switch
(
i
->
first
)
{
case
device_cfg_t
:
:
connect
:
in_socket
->
connect
(
i
->
second
.
c_str
());
break
;
case
device_cfg_t
:
:
bind
:
in_socket
->
bind
(
i
->
second
.
c_str
());
}
}
for
(
vsock_dets_t
::
const_iterator
i
=
out
.
begin
();
i
!=
out
.
end
();
++
i
)
{
switch
(
i
->
first
)
{
case
device_cfg_t
:
:
connect
:
out_socket
->
connect
(
i
->
second
.
c_str
());
break
;
case
device_cfg_t
:
:
bind
:
out_socket
->
bind
(
i
->
second
.
c_str
());
}
}
return
true
;
}
void
run
()
{
zmq
::
device
(
device_type
,
*
in_socket
,
*
out_socket
);
}
protected
:
bool
make_sockets
(
int
in_type
,
int
out_type
)
{
in_socket
=
new
(
std
::
nothrow
)
zmq
::
socket_t
(
*
context
,
in_type
);
if
(
!
in_socket
)
return
false
;
out_socket
=
new
(
std
::
nothrow
)
zmq
::
socket_t
(
*
context
,
out_type
);
if
(
!
out_socket
)
{
return
false
;
}
return
true
;
}
int
process_node
(
XMLNode
&
target_
,
bool
in_
,
device_cfg_t
::
endpoint_direction
ept_
)
{
const
char
*
name
=
(
ept_
==
device_cfg_t
::
connect
)
?
"connect"
:
"bind"
;
int
n
=
0
;
while
(
true
)
{
XMLNode
connect
=
target_
.
getChildNode
(
name
,
n
);
if
(
connect
.
isEmpty
())
break
;
const
char
*
addr
=
connect
.
getAttribute
(
"addr"
);
if
(
!
addr
)
{
fprintf
(
stderr
,
"'%s' node is missing 'addr' attribute
\n
"
,
name
);
return
0
;
}
if
(
in_
)
in
.
push_back
(
sock_details_t
(
ept_
,
addr
));
else
out
.
push_back
(
sock_details_t
(
ept_
,
addr
));
n
++
;
}
return
1
;
}
protected
:
int
device_type
;
zmq
::
context_t
*
context
;
vsock_dets_t
in
;
vsock_dets_t
out
;
zmq
::
socket_t
*
in_socket
;
zmq
::
socket_t
*
out_socket
;
private
:
device_cfg_t
(
device_cfg_t
const
&
);
void
operator
=
(
device_cfg_t
const
&
);
};
class
queue_device_cfg_t
:
public
device_cfg_t
{
public
:
queue_device_cfg_t
()
:
device_cfg_t
(
ZMQ_QUEUE
)
{
}
virtual
bool
make_sockets
()
{
return
device_cfg_t
::
make_sockets
(
ZMQ_XREP
,
ZMQ_XREQ
);
}
};
class
streamer_device_cfg_t
:
public
device_cfg_t
{
public
:
streamer_device_cfg_t
()
:
device_cfg_t
(
ZMQ_STREAMER
)
{
}
virtual
bool
make_sockets
()
{
return
device_cfg_t
::
make_sockets
(
ZMQ_UPSTREAM
,
ZMQ_DOWNSTREAM
);
}
};
class
forwarder_device_cfg_t
:
public
device_cfg_t
{
public
:
forwarder_device_cfg_t
()
:
device_cfg_t
(
ZMQ_FORWARDER
)
{
}
virtual
bool
make_sockets
()
{
if
(
!
device_cfg_t
::
make_sockets
(
ZMQ_SUB
,
ZMQ_PUB
)
)
{
return
false
;
}
in_socket
->
setsockopt
(
ZMQ_SUBSCRIBE
,
""
,
0
);
return
true
;
}
};
device_cfg_t
*
make_device_config
(
XMLNode
&
device
)
{
const
char
*
dev_type
=
device
.
getAttribute
(
"type"
);
if
(
!
dev_type
)
{
fprintf
(
stderr
,
"'device' node is missing 'type' attribute
\n
"
);
return
NULL
;
}
if
(
strcmp
(
dev_type
,
"forwarder"
)
==
0
)
{
return
new
(
std
::
nothrow
)
forwarder_device_cfg_t
;
}
else
if
(
strcmp
(
dev_type
,
"streamer"
)
==
0
)
{
return
new
(
std
::
nothrow
)
streamer_device_cfg_t
;
}
else
if
(
strcmp
(
dev_type
,
"queue"
)
==
0
)
{
return
new
(
std
::
nothrow
)
queue_device_cfg_t
;
}
fprintf
(
stderr
,
"type attribute in the device configuration file "
"should be named 'forwarder', 'streamer' or 'queue'
\n
"
);
return
NULL
;
}
extern
"C"
void
*
worker_function
(
void
*
arg
)
{
if
(
!
arg
)
{
fprintf
(
stderr
,
"arg is null, returning
\n
"
);
return
0
;
}
std
::
auto_ptr
<
device_cfg_t
>
cfg
((
device_cfg_t
*
)
arg
);
zmq
::
context_t
*
ctx
=
cfg
->
get_context
();
if
(
!
ctx
)
{
fprintf
(
stderr
,
"no context, returning
\n
"
);
return
0
;
}
if
(
!
cfg
->
make_sockets
())
{
fprintf
(
stderr
,
"failed to make sockets, returning
\n
"
);
return
0
;
}
if
(
!
cfg
->
set_up_connections
())
{
fprintf
(
stderr
,
"failed to set up connections, returning
\n
"
);
return
0
;
}
cfg
->
run
();
return
0
;
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
!=
2
)
{
fprintf
(
stderr
,
"usage: zmqd <config-file>
\n
"
);
return
1
;
}
XMLNode
root
=
XMLNode
::
parseFile
(
argv
[
1
]);
if
(
root
.
isEmpty
())
{
fprintf
(
stderr
,
"configuration file not found or not an XML file
\n
"
);
return
1
;
}
if
(
strcmp
(
root
.
getName
(),
"config"
)
!=
0
)
{
fprintf
(
stderr
,
"root element in the configuration file should be "
"named 'config'
\n
"
);
return
1
;
}
std
::
vector
<
device_cfg_t
*>
vdev
;
while
(
true
)
{
XMLNode
device
=
root
.
getChildNode
(
"device"
,
vdev
.
size
());
if
(
device
.
isEmpty
())
break
;
device_cfg_t
*
dev
=
make_device_config
(
device
);
if
(
!
dev
)
{
fprintf
(
stderr
,
"failed to create device config
\n
"
);
return
1
;
}
if
(
!
dev
->
init
(
device
))
{
fprintf
(
stderr
,
"error with initialising device configuration
\n
"
);
delete
dev
;
return
1
;
}
vdev
.
push_back
(
dev
);
}
std
::
vector
<
device_cfg_t
*>::
size_type
num_devices
=
vdev
.
size
();
if
(
num_devices
==
0
)
{
fprintf
(
stderr
,
"no devices in the config file
\n
"
);
return
1
;
}
zmq
::
context_t
ctx
(
num_devices
,
1
);
for
(
unsigned
int
i
=
0
;
i
<
num_devices
;
++
i
)
{
vdev
[
i
]
->
set_context
(
&
ctx
);
if
(
i
)
{
pthread_t
worker
;
int
rc
=
pthread_create
(
&
worker
,
NULL
,
&
worker_function
,
(
void
*
)
vdev
[
i
]);
assert
(
rc
==
0
);
}
}
worker_function
((
void
*
)
vdev
[
0
]);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment