/*
    Copyright (c) 2007-2010 iMatix Corporation

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "platform.hpp"

Martin Lucina's avatar
Martin Lucina committed
22 23
#if defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_OPENBSD ||\
    defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_NETBSD
Martin Sustrik's avatar
Martin Sustrik committed
24 25 26 27 28 29 30

#include <sys/time.h>
#include <sys/types.h>
#include <sys/event.h>
#include <stdlib.h>
#include <unistd.h>
#include <algorithm>
31
#include <new>
Martin Sustrik's avatar
Martin Sustrik committed
32 33 34 35 36 37

#include "kqueue.hpp"
#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"

Martin Lucina's avatar
Martin Lucina committed
38 39 40 41 42 43 44 45
//  NetBSD defines (struct kevent).udata as intptr_t, everyone else
//  as void *. kevent_udata_t names whichever type the platform uses so
//  that the value handed to EV_SET can be cast to it in one place.
#if defined ZMQ_HAVE_NETBSD
#define kevent_udata_t intptr_t
#else
#define kevent_udata_t void *
#endif

46 47
//  Constructs the poller: clears the stop flag and opens the kernel
//  event queue whose descriptor is kept in kqueue_fd.
zmq::kqueue_t::kqueue_t () :
    stopping (false)
{
    //  A failure to obtain the queue descriptor is treated as fatal.
    kqueue_fd = kqueue ();
    errno_assert (kqueue_fd != -1);
}

Martin Sustrik's avatar
Martin Sustrik committed
54
//  Tears the poller down. worker.stop () is invoked first; the kqueue
//  descriptor is closed only after that call has returned.
zmq::kqueue_t::~kqueue_t ()
{
    worker.stop ();

    //  Release the kernel event queue.
    close (kqueue_fd);
}

Martin Sustrik's avatar
Martin Sustrik committed
60
//  Registers filter_ (e.g. EVFILT_READ or EVFILT_WRITE) for descriptor
//  fd_ with the kernel queue. udata_ is stored in the kernel event and
//  comes back in the udata field of every event reported for it.
void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_)
{
    struct kevent change;
    EV_SET (&change, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t) udata_);

    //  Submit the single change; no events are fetched by this call.
    int rc = kevent (kqueue_fd, &change, 1, NULL, 0, NULL);
    errno_assert (rc != -1);
}

Martin Sustrik's avatar
Martin Sustrik committed
69
//  Removes the registration of filter_ for descriptor fd_ from the
//  kernel queue. Any failure reported by kevent trips errno_assert.
void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
{
    struct kevent change;

    //  No user data is needed when deleting, so a null value is passed.
    EV_SET (&change, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t) 0);

    int rc = kevent (kqueue_fd, &change, 1, NULL, 0, NULL);
    errno_assert (rc != -1);
}

78 79
//  Creates the bookkeeping entry for descriptor fd_ and returns it as
//  the handle used by the other poller calls. No kernel filter is
//  registered here; both poll flags start out cleared. reactor_ is the
//  object whose in_event/out_event callbacks the loop invokes.
zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
    i_poll_events *reactor_)
{
    poll_entry_t *pe = new (std::nothrow) poll_entry_t;
    zmq_assert (pe != NULL);

    pe->reactor = reactor_;
    pe->fd = fd_;
    pe->flag_pollout = false;
    pe->flag_pollin = false;

    //  Account for the newly added descriptor.
    adjust_load (1);

    return pe;
}

Martin Sustrik's avatar
Martin Sustrik committed
94
//  Detaches the descriptor behind handle_ from the poller. Filters that
//  are currently registered are removed from the kernel queue, the
//  entry is marked retired and queued on the retired list; the entry
//  itself is deleted later by loop ().
void zmq::kqueue_t::rm_fd (handle_t handle_)
{
    poll_entry_t *entry = (poll_entry_t*) handle_;

    if (entry->flag_pollin)
        kevent_delete (entry->fd, EVFILT_READ);

    if (entry->flag_pollout)
        kevent_delete (entry->fd, EVFILT_WRITE);

    //  Mark the entry so that loop () skips events still referring to it.
    entry->fd = retired_fd;
    retired.push_back (entry);

    adjust_load (-1);
}

Martin Sustrik's avatar
Martin Sustrik committed
107
//  Starts polling the descriptor behind handle_ for input. If the read
//  filter is already registered (flag_pollin set) nothing is done, which
//  avoids a redundant kevent system call.
void zmq::kqueue_t::set_pollin (handle_t handle_)
{
    poll_entry_t *pe = (poll_entry_t*) handle_;
    if (pe->flag_pollin)
        return;

    pe->flag_pollin = true;

    //  The entry itself is passed as user data so that loop () can map
    //  a reported event back to it.
    kevent_add (pe->fd, EVFILT_READ, pe);
}

Martin Sustrik's avatar
Martin Sustrik committed
114
//  Stops polling the descriptor behind handle_ for input. If the read
//  filter is not registered (flag_pollin clear) nothing is done:
//  deleting a filter that was never added makes kevent fail, and
//  kevent_delete treats any kevent failure as fatal.
void zmq::kqueue_t::reset_pollin (handle_t handle_)
{
    poll_entry_t *pe = (poll_entry_t*) handle_;
    if (!pe->flag_pollin)
        return;

    pe->flag_pollin = false;
    kevent_delete (pe->fd, EVFILT_READ);
}

Martin Sustrik's avatar
Martin Sustrik committed
121
//  Starts polling the descriptor behind handle_ for output. If the
//  write filter is already registered (flag_pollout set) nothing is
//  done, which avoids a redundant kevent system call.
void zmq::kqueue_t::set_pollout (handle_t handle_)
{
    poll_entry_t *pe = (poll_entry_t*) handle_;
    if (pe->flag_pollout)
        return;

    pe->flag_pollout = true;

    //  The entry itself is passed as user data so that loop () can map
    //  a reported event back to it.
    kevent_add (pe->fd, EVFILT_WRITE, pe);
}

Martin Sustrik's avatar
Martin Sustrik committed
128
//  Stops polling the descriptor behind handle_ for output. If the write
//  filter is not registered (flag_pollout clear) nothing is done:
//  deleting a filter that was never added makes kevent fail, and
//  kevent_delete treats any kevent failure as fatal.
void zmq::kqueue_t::reset_pollout (handle_t handle_)
{
    poll_entry_t *pe = (poll_entry_t*) handle_;
    if (!pe->flag_pollout)
        return;

    pe->flag_pollout = false;
    kevent_delete (pe->fd, EVFILT_WRITE);
}

Martin Sustrik's avatar
Martin Sustrik committed
135
void zmq::kqueue_t::start ()
Martin Sustrik's avatar
Martin Sustrik committed
136 137 138 139
{
    worker.start (worker_routine, this);
}

Martin Sustrik's avatar
Martin Sustrik committed
140
void zmq::kqueue_t::stop ()
Martin Sustrik's avatar
Martin Sustrik committed
141 142 143 144
{
    stopping = true;
}

Martin Sustrik's avatar
Martin Sustrik committed
145
void zmq::kqueue_t::loop ()
Martin Sustrik's avatar
Martin Sustrik committed
146 147 148
{
    while (!stopping) {

149
        //  Execute any due timers.
150
        int timeout = (int) execute_timers ();
Martin Sustrik's avatar
Martin Sustrik committed
151 152

        //  Wait for events.
153 154 155 156
        struct kevent ev_buf [max_io_events];
        timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
        int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
            timeout ? &ts: NULL);
Martin Sustrik's avatar
Martin Sustrik committed
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
        if (n == -1 && errno == EINTR)
            continue;
        errno_assert (n != -1);

        for (int i = 0; i < n; i ++) {
            poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;

            if (pe->fd == retired_fd)
                continue;
            if (ev_buf [i].flags & EV_EOF)
                pe->reactor->in_event ();
            if (pe->fd == retired_fd)
                continue;
            if (ev_buf [i].filter == EVFILT_WRITE)
                pe->reactor->out_event ();
            if (pe->fd == retired_fd)
                continue;
            if (ev_buf [i].filter == EVFILT_READ)
                pe->reactor->in_event ();
        }

        //  Destroy retired event sources.
        for (retired_t::iterator it = retired.begin (); it != retired.end ();
              it ++)
            delete *it;
        retired.clear ();
    }
}

Martin Sustrik's avatar
Martin Sustrik committed
186
void zmq::kqueue_t::worker_routine (void *arg_)
Martin Sustrik's avatar
Martin Sustrik committed
187 188 189 190
{
    ((kqueue_t*) arg_)->loop ();
}

Martin Lucina's avatar
Martin Lucina committed
191 192
//  Don't pollute namespace with defines local to this file:
//  kevent_udata_t is only needed by the EV_SET calls above.
#undef kevent_udata_t
Martin Sustrik's avatar
Martin Sustrik committed
193
#endif