// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#pragma once

#if _WIN32
#error "This file is Unix-specific. On Windows, include async-win32.h instead."
#endif

#if defined(__GNUC__) && !KJ_HEADER_WARNINGS
// Treat this header as a system header under GCC-compatible compilers so that its warnings are
// suppressed, unless the build opts in to header warnings via KJ_HEADER_WARNINGS.
#pragma GCC system_header
#endif

#include "async.h"
#include "timer.h"
#include "vector.h"
#include "io.h"
#include <signal.h>

#if __linux__ && !__BIONIC__ && !defined(KJ_USE_EPOLL)
// Default to epoll on Linux, except on Bionic (Android) which doesn't have signalfd.h.
// Because of the `!defined(KJ_USE_EPOLL)` check, defining KJ_USE_EPOLL before including this
// header overrides the default.
#define KJ_USE_EPOLL 1
#endif

namespace kj {

45 46
class UnixEventPort: public EventPort {
  // An EventPort implementation which can wait for events on file descriptors as well as signals.
47 48 49 50 51 52 53
  // This API only makes sense on Unix.
  //
  // The implementation uses `poll()` or possibly a platform-specific API (e.g. epoll, kqueue).
  // To also wait on signals without race conditions, the implementation may block signals until
  // just before `poll()` while using a signal handler which `siglongjmp()`s back to just before
  // the signal was unblocked, or it may use a nicer platform-specific API like signalfd.
  //
54 55 56
  // The implementation reserves a signal for internal use.  By default, it uses SIGUSR1.  If you
  // need to use SIGUSR1 for something else, you must offer a different signal by calling
  // setReservedSignal() at startup.
57 58 59 60 61 62
  //
  // WARNING: A UnixEventPort can only be used in the thread and process that created it. In
  //   particular, note that after a fork(), a UnixEventPort created in the parent process will
  //   not work correctly in the child, even if the parent ceases to use its copy. In particular
  //   note that this means that server processes which daemonize themselves at startup must wait
  //   until after daemonization to create a UnixEventPort.
63 64

public:
65
  UnixEventPort();
66
  ~UnixEventPort() noexcept(false);
67

68 69
  class FdObserver;
  // Class that watches an fd for readability or writability. See definition below.
70

71
  Promise<siginfo_t> onSignal(int signum);
72 73 74 75 76 77 78 79 80 81 82
  // When the given signal is delivered to this thread, return the corresponding siginfo_t.
  // The signal must have been captured using `captureSignal()`.
  //
  // If `onSignal()` has not been called, the signal will remain blocked in this thread.
  // Therefore, a signal which arrives before `onSignal()` was called will not be "missed" -- the
  // next call to 'onSignal()' will receive it.  Also, you can control which thread receives a
  // process-wide signal by only calling `onSignal()` on that thread's event loop.
  //
  // The result of waiting on the same signal twice at once is undefined.

  static void captureSignal(int signum);
83
  // Arranges for the given signal to be captured and handled via UnixEventPort, so that you may
84 85 86 87 88 89 90 91 92 93
  // then pass it to `onSignal()`.  This method is static because it registers a signal handler
  // which applies process-wide.  If any other threads exist in the process when `captureSignal()`
  // is called, you *must* set the signal mask in those threads to block this signal, otherwise
  // terrible things will happen if the signal happens to be delivered to those threads.  If at
  // all possible, call `captureSignal()` *before* creating threads, so that threads you create in
  // the future will inherit the proper signal mask.
  //
  // To un-capture a signal, simply install a different signal handler and then un-block it from
  // the signal mask.

94 95 96 97 98
  static void setReservedSignal(int signum);
  // Sets the signal number which `UnixEventPort` reserves for internal use.  If your application
  // needs to use SIGUSR1, call this at startup (before any calls to `captureSignal()` and before
  // constructing an `UnixEventPort`) to offer a different signal.

99
  Timer& getTimer() { return timerImpl; }
100

101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
  Promise<int> onChildExit(Maybe<pid_t>& pid);
  // When the given child process exits, resolves to its wait status, as returned by wait(2). You
  // will need to use the WIFEXITED() etc. macros to interpret the status code.
  //
  // You must call onChildExit() immediately after the child is created, before returning to the
  // event loop. Otherwise, you may miss the child exit event.
  //
  // `pid` is a reference to a Maybe<pid_t> which must be non-null at the time of the call. When
  // wait() is invoked (and indicates this pid has finished), `pid` will be nulled out. This is
  // necessary to avoid a race condition: as soon as the child has been wait()ed, the PID table
  // entry is freed and can then be reused. So, if you ever want safely to call `kill()` on the
  // PID, it's necessary to know whether it has been wait()ed already. Since the promise's
  // .then() continuation may not run immediately, we need a more precise way, hence we null out
  // the Maybe.
  //
  // You must call `kj::UnixEventPort::captureChildExit()` early in your program if you want to use
  // `onChildExit()`.
  //
  // WARNING: Only one UnixEventPort per process is allowed to use onChildExit(). This is because
  //   child exit is signaled to the process via SIGCHLD, and Unix does not allow the program to
  //   control which thread receives the signal. (We may fix this in the future by automatically
  //   coordinating between threads when multiple threads are expecting child exits.)
  // WARNING 2: If any UnixEventPort in the process is currently waiting for onChildExit(), then
  //   *only* that port's thread can safely wait on child processes, even synchronously. This is
  //   because the thread which used onChildExit() uses wait() to reap children, without specifying
  //   which child, and therefore it may inadvertently reap children created by other threads.

  static void captureChildExit();
129
  // Arranges for child process exit to be captured and handled via UnixEventPort, so that you may
130 131 132 133 134 135
  // call `onChildExit()`. Much like `captureSignal()`, this static method must be called early on
  // in program startup.
  //
  // This method may capture the `SIGCHLD` signal. You must not use `captureSignal(SIGCHLD)` nor
  // `onSignal(SIGCHLD)` in your own code if you use `captureChildExit()`.

136
  // implements EventPort ------------------------------------------------------
137 138 139
  bool wait() override;
  bool poll() override;
  void wake() const override;
140 141

private:
142
  class SignalPromiseAdapter;
143
  class ChildExitPromiseAdapter;
144

145
  TimerImpl timerImpl;
146

147 148
  SignalPromiseAdapter* signalHead = nullptr;
  SignalPromiseAdapter** signalTail = &signalHead;
149

150
  TimePoint readClock();
151
  void gotSignal(const siginfo_t& siginfo);
152 153

  friend class TimerPromiseAdapter;
154 155 156 157 158 159 160 161 162 163

#if KJ_USE_EPOLL
  AutoCloseFd epollFd;
  AutoCloseFd signalFd;
  AutoCloseFd eventFd;   // Used for cross-thread wakeups.

  sigset_t signalFdSigset;
  // Signal mask as currently set on the signalFd. Tracked so we can detect whether or not it
  // needs updating.

164
  bool doEpollWait(int timeout);
165 166 167 168 169 170

#else
  class PollContext;

  FdObserver* observersHead = nullptr;
  FdObserver** observersTail = &observersHead;
171 172

  unsigned long long threadId;  // actually pthread_t
173
#endif
174 175 176

  struct ChildSet;
  Maybe<Own<ChildSet>> childSet;
177 178
};

179 180
class UnixEventPort::FdObserver {
  // Object which watches a file descriptor to determine when it is readable or writable.
181 182 183 184 185
  //
  // For listen sockets, "readable" means that there is a connection to accept(). For everything
  // else, it means that read() (or recv()) will return data.
  //
  // The presence of out-of-band data should NOT fire this event. However, the event may
David Renshaw's avatar
David Renshaw committed
186 187
  // occasionally fire spuriously (when there is actually no data to read), and one thing that can
  // cause such spurious events is the arrival of OOB data on certain platforms whose event
188 189 190 191 192 193 194
  // interfaces fail to distinguish between regular and OOB data (e.g. Mac OSX).
  //
  // WARNING: The exact behavior of this class differs across systems, since event interfaces
  //   vary wildly. Be sure to read the documentation carefully and avoid depending on unspecified
  //   behavior. If at all possible, use the higher-level AsyncInputStream interface instead.

public:
195 196 197
  enum Flags {
    OBSERVE_READ = 1,
    OBSERVE_WRITE = 2,
198
    OBSERVE_URGENT = 4,
199 200 201 202
    OBSERVE_READ_WRITE = OBSERVE_READ | OBSERVE_WRITE
  };

  FdObserver(UnixEventPort& eventPort, int fd, uint flags);
203 204 205
  // Begin watching the given file descriptor for readability. Only one ReadObserver may exist
  // for a given file descriptor at a time.

206 207 208
  ~FdObserver() noexcept(false);

  KJ_DISALLOW_COPY(FdObserver);
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268

  Promise<void> whenBecomesReadable();
  // Resolves the next time the file descriptor transitions from having no data to read to having
  // some data to read.
  //
  // KJ uses "edge-triggered" event notification whenever possible. As a result, it is an error
  // to call this method when there is already data in the read buffer which has been there since
  // prior to the last turn of the event loop or prior to creation FdWatcher. In this case, it is
  // unspecified whether the promise will ever resolve -- it depends on the underlying event
  // mechanism being used.
  //
  // In order to avoid this problem, make sure that you only call `whenBecomesReadable()`
  // only at times when you know the buffer is empty. You know this for sure when one of the
  // following happens:
  // * read() or recv() fails with EAGAIN or EWOULDBLOCK. (You MUST have non-blocking mode
  //   enabled on the fd!)
  // * The file descriptor is a regular byte-oriented object (like a socket or pipe),
  //   read() or recv() returns fewer than the number of bytes requested, and `atEndHint()`
  //   returns false. This can only happen if the buffer is empty but EOF is not reached. (Note,
  //   though, that for record-oriented file descriptors like Linux's inotify interface, this
  //   rule does not hold, because it could simply be that the next record did not fit into the
  //   space available.)
  //
  // It is an error to call `whenBecomesReadable()` again when the promise returned previously
  // has not yet resolved. If you do this, the previous promise may throw an exception.

  inline Maybe<bool> atEndHint() { return atEnd; }
  // Returns true if the event system has indicated that EOF has been received. There may still
  // be data in the read buffer, but once that is gone, there's nothing left.
  //
  // Returns false if the event system has indicated that EOF had NOT been received as of the
  // last turn of the event loop.
  //
  // Returns nullptr if the event system does not know whether EOF has been reached. In this
  // case, the only way to know for sure is to call read() or recv() and check if it returns
  // zero.
  //
  // This hint may be useful as an optimization to avoid an unnecessary system call.

  Promise<void> whenBecomesWritable();
  // Resolves the next time the file descriptor transitions from having no space available in the
  // write buffer to having some space available.
  //
  // KJ uses "edge-triggered" event notification whenever possible. As a result, it is an error
  // to call this method when there is already space in the write buffer which has been there
  // since prior to the last turn of the event loop or prior to creation FdWatcher. In this case,
  // it is unspecified whether the promise will ever resolve -- it depends on the underlying
  // event mechanism being used.
  //
  // In order to avoid this problem, make sure that you only call `whenBecomesWritable()`
  // only at times when you know the buffer is full. You know this for sure when one of the
  // following happens:
  // * write() or send() fails with EAGAIN or EWOULDBLOCK. (You MUST have non-blocking mode
  //   enabled on the fd!)
  // * write() or send() succeeds but accepts fewer than the number of bytes provided. This can
  //   only happen if the buffer is full.
  //
  // It is an error to call `whenBecomesWritable()` again when the promise returned previously
  // has not yet resolved. If you do this, the previous promise may throw an exception.

269 270 271 272 273 274 275 276
  Promise<void> whenUrgentDataAvailable();
  // Resolves the next time the file descriptor's read buffer contains "urgent" data.
  //
  // The conditions for availability of urgent data are specific to the file descriptor's
  // underlying implementation.
  //
  // It is an error to call `whenUrgentDataAvailable()` again when the promise returned previously
  // has not yet resolved. If you do this, the previous promise may throw an exception.
277 278 279
  //
  // WARNING: This has some known weird behavior on macOS. See
  //   https://github.com/sandstorm-io/capnproto/issues/374.
280

281 282 283
private:
  UnixEventPort& eventPort;
  int fd;
284
  uint flags;
285

286 287
  kj::Maybe<Own<PromiseFulfiller<void>>> readFulfiller;
  kj::Maybe<Own<PromiseFulfiller<void>>> writeFulfiller;
288
  kj::Maybe<Own<PromiseFulfiller<void>>> urgentFulfiller;
289 290 291 292 293 294 295 296 297 298 299 300 301 302
  // Replaced each time `whenBecomesReadable()` or `whenBecomesWritable()` is called. Reverted to
  // null every time an event is fired.

  Maybe<bool> atEnd;

  void fire(short events);

#if !KJ_USE_EPOLL
  FdObserver* next;
  FdObserver** prev;
  // Linked list of observers which currently have a non-null readFulfiller or writeFulfiller.
  // If `prev` is null then the observer is not currently in the list.

  short getEventMask();
303 304 305
#endif

  friend class UnixEventPort;
306 307 308
};

}  // namespace kj