Commit f34a468a authored by Martin Sustrik's avatar Martin Sustrik

coding style fixed in zmqd

parent 7773fddd
...@@ -24,271 +24,273 @@ ...@@ -24,271 +24,273 @@
#include "../include/zmq.hpp" #include "../include/zmq.hpp"
#include "../foreign/xmlParser/xmlParser.cpp" #include "../foreign/xmlParser/xmlParser.cpp"
namespace { class device_cfg_t
{
class device_cfg_t enum endpoint_direction {connect, bind};
{
enum endpoint_direction {connect, bind}; typedef std::pair <endpoint_direction, std::string> sock_details_t;
typedef std::pair<endpoint_direction, std::string> sock_details_t; typedef std::vector <sock_details_t> vsock_dets_t;
typedef std::vector<sock_details_t> vsock_dets_t; public:
public: explicit device_cfg_t (int type) :
device_type (type),
context (0),
in_socket (0),
out_socket (0)
{
}
explicit device_cfg_t(int type) virtual ~device_cfg_t()
: device_type(type) , context(0) {
, in_socket(0), out_socket(0) delete out_socket;
{ delete in_socket;
} }
virtual ~device_cfg_t() bool init (XMLNode &device)
{ {
delete out_socket; XMLNode in_node = device.getChildNode ("in");
delete in_socket; if (in_node.isEmpty ()) {
fprintf (stderr,
"'in' node is missing in the configuration file\n");
return false;
} }
bool init(XMLNode& device) XMLNode out_node = device.getChildNode ("out");
{ if (out_node.isEmpty ()) {
fprintf (stderr,
"'out' node is missing in the configuration file\n");
return false;
}
XMLNode in_node = device.getChildNode ("in"); if (!process_node (in_node, true, device_cfg_t::bind))
if (in_node.isEmpty ()) { return false;
fprintf (stderr, "'in' node is missing in the configuration file\n"); if (!process_node (in_node, true, device_cfg_t::connect))
return false; return false;
} if (!process_node (out_node, false, device_cfg_t::bind))
return false;
if (!process_node (out_node, false, device_cfg_t::connect))
return false;
XMLNode out_node = device.getChildNode ("out"); return true;
if (out_node.isEmpty ()) { }
fprintf (stderr, "'out' node is missing in the configuration file\n");
return false;
}
if (!process_node(in_node,true,device_cfg_t::bind)) void set_context(zmq::context_t *context_)
return false; {
if (!process_node(in_node,true,device_cfg_t::connect)) context = context_;
return false; }
if (!process_node(out_node,false,device_cfg_t::bind))
return false;
if (!process_node(out_node,false,device_cfg_t::connect))
return false;
return true; zmq::context_t *get_context () const
} {
return context;
}
void set_context(zmq::context_t* context_) virtual bool make_sockets () = 0;
{
context = context_;
}
zmq::context_t *get_context() const bool set_up_connections ()
{ {
return context; for (vsock_dets_t::const_iterator i = in.begin (); i != in.end ();
++i) {
switch (i->first)
{
case device_cfg_t::connect:
in_socket->connect (i->second.c_str ());
break;
case device_cfg_t::bind:
in_socket->bind (i->second.c_str ());
}
} }
virtual bool make_sockets() = 0; for (vsock_dets_t::const_iterator i = out.begin (); i != out.end ();
++i) {
bool set_up_connections() switch (i->first)
{ {
for (vsock_dets_t::const_iterator i = in.begin() ; i != in.end(); case device_cfg_t::connect:
++i) { out_socket->connect (i->second.c_str ());
break;
switch (i->first) case device_cfg_t::bind:
{ out_socket->bind (i->second.c_str ());
case device_cfg_t::connect :
in_socket->connect(i->second.c_str());
break;
case device_cfg_t::bind :
in_socket->bind(i->second.c_str());
}
} }
for (vsock_dets_t::const_iterator i = out.begin() ; i != out.end();
++i) {
switch (i->first)
{
case device_cfg_t::connect :
out_socket->connect(i->second.c_str());
break;
case device_cfg_t::bind :
out_socket->bind(i->second.c_str());
}
}
return true;
} }
return true;
}
void run() void run()
{ {
zmq::device(device_type, *in_socket, *out_socket); zmq::device(device_type, *in_socket, *out_socket);
} }
protected: protected:
bool make_sockets(int in_type, int out_type) bool make_sockets (int in_type, int out_type)
{ {
in_socket = new (std::nothrow) zmq::socket_t(*context, in_type); in_socket = new (std::nothrow) zmq::socket_t (*context, in_type);
if (!in_socket) if (!in_socket)
return false; return false;
out_socket = new (std::nothrow) zmq::socket_t(*context, out_type); out_socket = new (std::nothrow) zmq::socket_t (*context, out_type);
if (!out_socket) { if (!out_socket) {
return false; return false;
}
return true;
} }
return true;
}
int process_node(XMLNode& target_, bool in_, int process_node (XMLNode& target_, bool in_,
device_cfg_t::endpoint_direction ept_) device_cfg_t::endpoint_direction ept_)
{ {
const char *name =
const char * name = (ept_ == device_cfg_t::connect) ? "connect" : "bind";
(ept_ == device_cfg_t::connect) ? "connect" : "bind"; int n = 0;
int n = 0; while (true) {
while (true) { XMLNode connect = target_.getChildNode (name, n);
XMLNode connect = target_.getChildNode (name, n); if (connect.isEmpty ())
if (connect.isEmpty ()) break;
break; const char *addr = connect.getAttribute ("addr");
const char *addr = connect.getAttribute ("addr"); if (!addr) {
if (!addr) { fprintf (stderr, "'%s' node is missing 'addr' attribute\n",
fprintf (stderr, "'%s' node is missing 'addr' attribute\n", name);
name); return 0;
return 0;
}
if (in_)
in.push_back( sock_details_t(ept_, addr));
else
out.push_back( sock_details_t(ept_, addr));
n++;
} }
return 1; if (in_)
in.push_back (sock_details_t (ept_, addr));
else
out.push_back (sock_details_t (ept_, addr));
n++;
} }
return 1;
}
protected: protected:
int device_type; int device_type;
zmq::context_t* context; zmq::context_t *context;
vsock_dets_t in; vsock_dets_t in;
vsock_dets_t out; vsock_dets_t out;
zmq::socket_t* in_socket; zmq::socket_t *in_socket;
zmq::socket_t* out_socket; zmq::socket_t *out_socket;
private: private:
void operator = (device_cfg_t const &);
device_cfg_t(device_cfg_t const &);
};
device_cfg_t (device_cfg_t const&);
void operator = (device_cfg_t const&);
};
class queue_device_cfg_t : public device_cfg_t
{
public:
class queue_device_cfg_t : public device_cfg_t queue_device_cfg_t () :
device_cfg_t (ZMQ_QUEUE)
{ {
public: }
queue_device_cfg_t()
: device_cfg_t(ZMQ_QUEUE) virtual bool make_sockets ()
{} {
virtual bool make_sockets(){ return device_cfg_t::make_sockets (ZMQ_XREP, ZMQ_XREQ);
return device_cfg_t::make_sockets(ZMQ_XREP, ZMQ_XREQ); }
} };
};
class streamer_device_cfg_t : public device_cfg_t class streamer_device_cfg_t : public device_cfg_t
{
public:
streamer_device_cfg_t () :
device_cfg_t (ZMQ_STREAMER)
{ {
public: }
streamer_device_cfg_t()
: device_cfg_t(ZMQ_STREAMER)
{}
virtual bool make_sockets () {
return device_cfg_t::make_sockets(ZMQ_UPSTREAM, ZMQ_DOWNSTREAM);
}
};
class forwarder_device_cfg_t : public device_cfg_t virtual bool make_sockets ()
{ {
public: return device_cfg_t::make_sockets (ZMQ_UPSTREAM, ZMQ_DOWNSTREAM);
forwarder_device_cfg_t() }
: device_cfg_t(ZMQ_FORWARDER) };
{}
virtual bool make_sockets() {
if (!device_cfg_t::make_sockets(ZMQ_SUB, ZMQ_PUB) ) {
return false;
}
in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0);
return true;
}
};
class forwarder_device_cfg_t : public device_cfg_t
{
public:
device_cfg_t* make_device_config(XMLNode& device) forwarder_device_cfg_t() :
device_cfg_t (ZMQ_FORWARDER)
{ {
const char *dev_type = device.getAttribute ("type"); }
if (!dev_type) {
fprintf (stderr, "'device' node is missing 'type' attribute\n");
return NULL;
}
if (strcmp (dev_type, "forwarder") == 0) { virtual bool make_sockets()
return new (std::nothrow) forwarder_device_cfg_t; {
} if (!device_cfg_t::make_sockets (ZMQ_SUB, ZMQ_PUB) ) {
else if (strcmp (dev_type, "streamer") == 0) { return false;
return new (std::nothrow) streamer_device_cfg_t;
}
else if (strcmp (dev_type, "queue") == 0) {
return new (std::nothrow) queue_device_cfg_t;
} }
in_socket->setsockopt (ZMQ_SUBSCRIBE, "", 0);
fprintf (stderr, "type attribute in the device configuration file " return true;
"should be named 'forwarder', 'streamer' or 'queue'\n");
return NULL;
} }
};
extern "C" void* worker_function(void *arg) device_cfg_t *make_device_config (XMLNode& device)
{ {
const char *dev_type = device.getAttribute ("type");
if (!arg) { if (!dev_type) {
fprintf (stderr, "arg is null, returning \n"); fprintf (stderr, "'device' node is missing 'type' attribute\n");
return 0; return NULL;
} }
std::auto_ptr<device_cfg_t> cfg ( (device_cfg_t*) arg ); if (strcmp (dev_type, "forwarder") == 0) {
return new (std::nothrow) forwarder_device_cfg_t;
}
else if (strcmp (dev_type, "streamer") == 0) {
return new (std::nothrow) streamer_device_cfg_t;
}
else if (strcmp (dev_type, "queue") == 0) {
return new (std::nothrow) queue_device_cfg_t;
}
fprintf (stderr, "type attribute in the device configuration file "
"should be named 'forwarder', 'streamer' or 'queue'\n");
zmq::context_t* ctx = cfg->get_context(); return NULL;
}
if (!ctx) {
fprintf (stderr, "no context, returning \n");
return 0;
}
if (! cfg->make_sockets()) { extern "C" void* worker_function(void *arg)
fprintf (stderr, "failed to make sockets, returning \n"); {
return 0; if (!arg) {
} fprintf (stderr, "arg is null, returning \n");
return 0;
}
std::auto_ptr <device_cfg_t> cfg ((device_cfg_t*) arg);
if (! cfg->set_up_connections()) { zmq::context_t *ctx = cfg->get_context ();
fprintf (stderr, "failed to set up connections, returning \n");
return 0;
}
cfg->run(); if (!ctx) {
fprintf (stderr, "no context, returning \n");
return 0;
}
if (!cfg->make_sockets ()) {
fprintf (stderr, "failed to make sockets, returning \n");
return 0; return 0;
}
if (!cfg->set_up_connections ()) {
fprintf (stderr, "failed to set up connections, returning \n");
return 0;
} }
cfg->run();
return 0;
} }
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
if (argc != 2) { if (argc != 2) {
...@@ -309,8 +311,7 @@ int main (int argc, char *argv []) ...@@ -309,8 +311,7 @@ int main (int argc, char *argv [])
return 1; return 1;
} }
std::vector <device_cfg_t*> vdev;
std::vector<device_cfg_t*> vdev;
while (true) { while (true) {
...@@ -319,49 +320,44 @@ int main (int argc, char *argv []) ...@@ -319,49 +320,44 @@ int main (int argc, char *argv [])
if (device.isEmpty()) if (device.isEmpty())
break; break;
device_cfg_t* dev = make_device_config(device); device_cfg_t* dev = make_device_config (device);
if (!dev) { if (!dev) {
fprintf(stderr, "failed to create device config\n"); fprintf(stderr, "failed to create device config\n");
return 1; return 1;
} }
if (! dev->init(device) ) { if (!dev->init (device)) {
fprintf(stderr,"error with initialising device configuration\n"); fprintf(stderr,"error with initialising device configuration\n");
delete dev; delete dev;
return 1; return 1;
} }
vdev.push_back(dev); vdev.push_back (dev);
} }
std::vector<device_cfg_t*>::size_type num_devices = vdev.size(); std::vector <device_cfg_t*>::size_type num_devices = vdev.size ();
if ( num_devices == 0 ) { if (num_devices == 0) {
fprintf(stderr,"no devices in the config file\n"); fprintf(stderr,"no devices in the config file\n");
return 1; return 1;
} }
zmq::context_t ctx (num_devices, 1);
zmq::context_t ctx (num_devices,1);
for (unsigned int i = 0 ; i < num_devices ; ++i) { for (unsigned int i = 0 ; i < num_devices ; ++i) {
vdev[i]->set_context(&ctx); vdev [i]->set_context (&ctx);
if (i) { if (i) {
pthread_t worker; pthread_t worker;
int rc = pthread_create (&worker, NULL, &worker_function, int rc = pthread_create (&worker, NULL, &worker_function,
(void*) vdev[i]); (void*) vdev [i]);
assert (rc == 0); assert (rc == 0);
} }
} }
worker_function ((void*) vdev [0]);
worker_function((void*)vdev[0]);
return 0; return 0;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment