kqueue.cpp 4.69 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2 3
    Copyright (c) 2007-2011 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
4 5 6 7

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
8
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
9 10 11 12 13 14
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
16

17
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
18 19 20
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

21 22
#include "kqueue.hpp"
#if defined ZMQ_USE_KQUEUE
Martin Sustrik's avatar
Martin Sustrik committed
23 24 25 26 27 28 29

#include <sys/time.h>
#include <sys/types.h>
#include <sys/event.h>
#include <stdlib.h>
#include <unistd.h>
#include <algorithm>
30
#include <new>
Martin Sustrik's avatar
Martin Sustrik committed
31 32 33 34 35 36

#include "kqueue.hpp"
#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"

Martin Lucina's avatar
Martin Lucina committed
37 38 39 40 41 42 43 44
//  NetBSD defines (struct kevent).udata as intptr_t, everyone else
//  as void *.
#if defined ZMQ_HAVE_NETBSD
#define kevent_udata_t intptr_t
#else
#define kevent_udata_t void *
#endif

45 46
zmq::kqueue_t::kqueue_t () :
    stopping (false)
Martin Sustrik's avatar
Martin Sustrik committed
47 48 49 50 51 52
{
    //  Create event queue
    kqueue_fd = kqueue ();
    errno_assert (kqueue_fd != -1);
}

Martin Sustrik's avatar
Martin Sustrik committed
53
zmq::kqueue_t::~kqueue_t ()
Martin Sustrik's avatar
Martin Sustrik committed
54
{
55
    worker.stop ();
Martin Sustrik's avatar
Martin Sustrik committed
56 57 58
    close (kqueue_fd);
}

Martin Sustrik's avatar
Martin Sustrik committed
59
void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_)
Martin Sustrik's avatar
Martin Sustrik committed
60 61 62
{
    struct kevent ev;

Martin Lucina's avatar
Martin Lucina committed
63
    EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t)udata_);
Martin Sustrik's avatar
Martin Sustrik committed
64 65 66 67
    int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
    errno_assert (rc != -1);
}

Martin Sustrik's avatar
Martin Sustrik committed
68
void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
Martin Sustrik's avatar
Martin Sustrik committed
69 70 71
{
    struct kevent ev;

72
    EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0);
Martin Sustrik's avatar
Martin Sustrik committed
73 74 75 76
    int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
    errno_assert (rc != -1);
}

77 78
zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
    i_poll_events *reactor_)
Martin Sustrik's avatar
Martin Sustrik committed
79
{
80
    poll_entry_t *pe = new (std::nothrow) poll_entry_t;
81
    alloc_assert (pe);
Martin Sustrik's avatar
Martin Sustrik committed
82 83 84 85 86 87

    pe->fd = fd_;
    pe->flag_pollin = 0;
    pe->flag_pollout = 0;
    pe->reactor = reactor_;

88 89
    adjust_load (1);

90
    return pe;
Martin Sustrik's avatar
Martin Sustrik committed
91 92
}

Martin Sustrik's avatar
Martin Sustrik committed
93
void zmq::kqueue_t::rm_fd (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
94
{
95
    poll_entry_t *pe = (poll_entry_t*) handle_;
Martin Sustrik's avatar
Martin Sustrik committed
96 97 98 99 100 101
    if (pe->flag_pollin)
        kevent_delete (pe->fd, EVFILT_READ);
    if (pe->flag_pollout)
        kevent_delete (pe->fd, EVFILT_WRITE);
    pe->fd = retired_fd;
    retired.push_back (pe);
102 103

    adjust_load (-1);
Martin Sustrik's avatar
Martin Sustrik committed
104 105
}

Martin Sustrik's avatar
Martin Sustrik committed
106
void zmq::kqueue_t::set_pollin (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
107
{
108
    poll_entry_t *pe = (poll_entry_t*) handle_;
Martin Sustrik's avatar
Martin Sustrik committed
109 110 111 112
    pe->flag_pollin = true;
    kevent_add (pe->fd, EVFILT_READ, pe);
}

Martin Sustrik's avatar
Martin Sustrik committed
113
void zmq::kqueue_t::reset_pollin (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
114
{
115
    poll_entry_t *pe = (poll_entry_t*) handle_;
Martin Sustrik's avatar
Martin Sustrik committed
116 117 118 119
    pe->flag_pollin = false;
    kevent_delete (pe->fd, EVFILT_READ);
}

Martin Sustrik's avatar
Martin Sustrik committed
120
void zmq::kqueue_t::set_pollout (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
121
{
122
    poll_entry_t *pe = (poll_entry_t*) handle_;
Martin Sustrik's avatar
Martin Sustrik committed
123 124 125 126
    pe->flag_pollout = true;
    kevent_add (pe->fd, EVFILT_WRITE, pe);
}

Martin Sustrik's avatar
Martin Sustrik committed
127
void zmq::kqueue_t::reset_pollout (handle_t handle_)
Martin Sustrik's avatar
Martin Sustrik committed
128
{
129
    poll_entry_t *pe = (poll_entry_t*) handle_;
Martin Sustrik's avatar
Martin Sustrik committed
130 131 132 133
    pe->flag_pollout = false;
    kevent_delete (pe->fd, EVFILT_WRITE);
}

Martin Sustrik's avatar
Martin Sustrik committed
134
void zmq::kqueue_t::start ()
Martin Sustrik's avatar
Martin Sustrik committed
135 136 137 138
{
    worker.start (worker_routine, this);
}

Martin Sustrik's avatar
Martin Sustrik committed
139
void zmq::kqueue_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
140 141 142 143
{
    stopping = true;
}

Martin Sustrik's avatar
Martin Sustrik committed
144
void zmq::kqueue_t::loop ()
Martin Sustrik's avatar
Martin Sustrik committed
145 146 147
{
    while (!stopping) {

148
        //  Execute any due timers.
149
        int timeout = (int) execute_timers ();
Martin Sustrik's avatar
Martin Sustrik committed
150 151

        //  Wait for events.
152 153 154 155
        struct kevent ev_buf [max_io_events];
        timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
        int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
            timeout ? &ts: NULL);
Martin Sustrik's avatar
Martin Sustrik committed
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
        if (n == -1 && errno == EINTR)
            continue;
        errno_assert (n != -1);

        for (int i = 0; i < n; i ++) {
            poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;

            if (pe->fd == retired_fd)
                continue;
            if (ev_buf [i].flags & EV_EOF)
                pe->reactor->in_event ();
            if (pe->fd == retired_fd)
                continue;
            if (ev_buf [i].filter == EVFILT_WRITE)
                pe->reactor->out_event ();
            if (pe->fd == retired_fd)
                continue;
            if (ev_buf [i].filter == EVFILT_READ)
                pe->reactor->in_event ();
        }

        //  Destroy retired event sources.
        for (retired_t::iterator it = retired.begin (); it != retired.end ();
179
              ++it)
Martin Sustrik's avatar
Martin Sustrik committed
180 181 182 183 184
            delete *it;
        retired.clear ();
    }
}

Martin Sustrik's avatar
Martin Sustrik committed
185
void zmq::kqueue_t::worker_routine (void *arg_)
Martin Sustrik's avatar
Martin Sustrik committed
186 187 188 189 190
{
    ((kqueue_t*) arg_)->loop ();
}

#endif