io_thread.cpp 2.4 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
Martin Sustrik's avatar
Martin Sustrik committed
2
    Copyright (c) 2009-2011 250bpm s.r.o.
3
    Copyright (c) 2007-2009 iMatix Corporation
4
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
5 6 7 8

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
9
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
10 11 12 13 14 15
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
17

18
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
19 20 21
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

22 23
#include <new>

Martin Sustrik's avatar
Martin Sustrik committed
24 25 26
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
27
#include "ctx.hpp"
Martin Sustrik's avatar
Martin Sustrik committed
28

Martin Sustrik's avatar
Martin Sustrik committed
29 30
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
    object_t (ctx_, tid_)
Martin Sustrik's avatar
Martin Sustrik committed
31
{
32
    poller = new (std::nothrow) poller_t;
33
    alloc_assert (poller);
Martin Sustrik's avatar
Martin Sustrik committed
34

35 36
    mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
    poller->set_pollin (mailbox_handle);
Martin Sustrik's avatar
Martin Sustrik committed
37 38
}

Martin Sustrik's avatar
Martin Sustrik committed
39
zmq::io_thread_t::~io_thread_t ()
Martin Sustrik's avatar
Martin Sustrik committed
40 41 42 43
{
    delete poller;
}

Martin Sustrik's avatar
Martin Sustrik committed
44
void zmq::io_thread_t::start ()
Martin Sustrik's avatar
Martin Sustrik committed
45 46 47 48 49
{
    //  Start the underlying I/O thread.
    poller->start ();
}

Martin Sustrik's avatar
Martin Sustrik committed
50
void zmq::io_thread_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
51 52 53 54
{
    send_stop ();
}

55
zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
Martin Sustrik's avatar
Martin Sustrik committed
56
{
57
    return &mailbox;
Martin Sustrik's avatar
Martin Sustrik committed
58 59
}

Martin Sustrik's avatar
Martin Sustrik committed
60
int zmq::io_thread_t::get_load ()
Martin Sustrik's avatar
Martin Sustrik committed
61 62 63 64
{
    return poller->get_load ();
}

Martin Sustrik's avatar
Martin Sustrik committed
65
void zmq::io_thread_t::in_event ()
Martin Sustrik's avatar
Martin Sustrik committed
66
{
67 68 69
    //  TODO: Do we want to limit number of commands I/O thread can
    //  process in a single go?

70 71 72 73 74 75 76
    command_t cmd;
    int rc = mailbox.recv (&cmd, 0);

    while (rc == 0 || errno == EINTR) {
        if (rc == 0)
            cmd.destination->process_command (cmd);
        rc = mailbox.recv (&cmd, 0);
Martin Sustrik's avatar
Martin Sustrik committed
77
    }
78 79

    errno_assert (rc != 0 && errno == EAGAIN);
Martin Sustrik's avatar
Martin Sustrik committed
80 81
}

Martin Sustrik's avatar
Martin Sustrik committed
82
void zmq::io_thread_t::out_event ()
Martin Sustrik's avatar
Martin Sustrik committed
83 84
{
    //  We are never polling for POLLOUT here. This function is never called.
Martin Sustrik's avatar
Martin Sustrik committed
85
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
86 87
}

88
void zmq::io_thread_t::timer_event (int)
Martin Sustrik's avatar
Martin Sustrik committed
89 90
{
    //  No timers here. This function is never called.
Martin Sustrik's avatar
Martin Sustrik committed
91
    zmq_assert (false);
Martin Sustrik's avatar
Martin Sustrik committed
92 93
}

94
zmq::poller_t *zmq::io_thread_t::get_poller ()
Martin Sustrik's avatar
Martin Sustrik committed
95
{
Martin Sustrik's avatar
Martin Sustrik committed
96
    zmq_assert (poller);
Martin Sustrik's avatar
Martin Sustrik committed
97 98 99
    return poller;
}

Martin Sustrik's avatar
Martin Sustrik committed
100
void zmq::io_thread_t::process_stop ()
Martin Sustrik's avatar
Martin Sustrik committed
101
{
102
    poller->rm_fd (mailbox_handle);
Martin Sustrik's avatar
Martin Sustrik committed
103 104
    poller->stop ();
}