// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

22 23
#if !_WIN32

24 25 26
#include "async-unix.h"
#include "thread.h"
#include "debug.h"
#include "io.h"
#include <unistd.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <kj/compat/gtest.h>
#include <pthread.h>
#include <algorithm>
#include <sys/wait.h>
#include <sys/time.h>
#include <errno.h>
40 41

namespace kj {
42
namespace {
43 44 45

inline void delay() {
  // Short pause used by helper threads so that the main thread has time to start waiting.
  // usleep() takes microseconds, so this sleeps for 10ms.
  usleep(10000);
}

46 47 48 49 50 51 52
// EXPECT_SI_CODE(expected, actual) checks the si_code field of a received siginfo_t.
//
// On OSX, si_code seems to be zero when SI_USER is expected, so the check is only performed on
// Linux and Cygwin; on every other platform it expands to nothing.
#if __linux__ || __CYGWIN__
#define EXPECT_SI_CODE(expected, actual) EXPECT_EQ(expected, actual)
#else
#define EXPECT_SI_CODE(expected, actual)
#endif

53 54 55 56 57
void captureSignals() {
  static bool captured = false;
  if (!captured) {
    captured = true;

58 59 60 61
    // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
    // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test.  We can't
    // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
    UnixEventPort::captureSignal(SIGURG);
62
    UnixEventPort::captureSignal(SIGIO);
63 64

    UnixEventPort::captureChildExit();
65
  }
66
}
67

68 69
TEST(AsyncUnixTest, Signals) {
  // Basic case: a signal sent to our own process is reported through onSignal().

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // The signal is raised before anything is waiting for it.
  kill(getpid(), SIGURG);

  auto signalPromise = port.onSignal(SIGURG);
  siginfo_t info = signalPromise.wait(waitScope);

  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}

81
#if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
82
TEST(AsyncUnixTest, SignalWithValue) {
  // Checks that an integer value attached to a signal with sigqueue() is received intact.
  //
  // sigqueue() was introduced together with real-time signals, so although the signal we send is
  // plain SIGURG, this only works on platforms that support RT signals.  Hence this test won't
  // run on e.g. Mac OSX.
  //
  // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
  //
  // Also, this test fails on Linux on mipsel: si_value comes back as zero. No one with a mips
  // machine wants to debug the problem but they demand a patch fixing it, so the test is disabled
  // there. Sad. https://github.com/sandstorm-io/capnproto/issues/204

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Zero the whole union first so that no stale bytes surround the int member.
  union sigval value;
  memset(&value, 0, sizeof(value));
  value.sival_int = 123;

  KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
    case ENOSYS:
      // sigqueue() not supported. Maybe running on WSL.
      KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
      return;
    default:
      KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
  }

  auto signalPromise = port.onSignal(SIGURG);
  siginfo_t info = signalPromise.wait(waitScope);

  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(123, info.si_value.sival_int);
}
116

117
TEST(AsyncUnixTest, SignalWithPointerValue) {
  // Same as SignalWithValue, except that the value attached with sigqueue() is a pointer
  // (sival_ptr) rather than an int.
  //
  // sigqueue() was introduced together with real-time signals, so although the signal we send is
  // plain SIGURG, this only works on platforms that support RT signals.  Hence this test won't
  // run on e.g. Mac OSX.
  //
  // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
  //
  // Also, this test fails on Linux on mipsel: si_value comes back as zero. No one with a mips
  // machine wants to debug the problem but they demand a patch fixing it, so the test is disabled
  // there. Sad. https://github.com/sandstorm-io/capnproto/issues/204

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Any pointer with a recognizable value will do; the address of `port` is convenient.
  union sigval value;
  memset(&value, 0, sizeof(value));
  value.sival_ptr = &port;

  KJ_SYSCALL_HANDLE_ERRORS(sigqueue(getpid(), SIGURG, value)) {
    case ENOSYS:
      // sigqueue() not supported. Maybe running on WSL.
      KJ_LOG(WARNING, "sigqueue() is not implemented by your system; skipping test");
      return;
    default:
      KJ_FAIL_SYSCALL("sigqueue(getpid(), SIGURG, value)", error);
  }

  auto signalPromise = port.onSignal(SIGURG);
  siginfo_t info = signalPromise.wait(waitScope);

  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(&port, info.si_value.sival_ptr);
}
151
#endif
152

153 154
TEST(AsyncUnixTest, SignalsMultiListen) {
  // With listeners registered for two different signals, raising one of them must only complete
  // the matching listener.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Nothing in this test sends SIGIO, so this listener is expected never to fire.
  port.onSignal(SIGIO)
      .then([](siginfo_t&&) {
        KJ_FAIL_EXPECT("Received wrong signal.");
      })
      .detach([](kj::Exception&& exception) {
        KJ_FAIL_EXPECT(exception);
      });

  kill(getpid(), SIGURG);

  auto signalPromise = port.onSignal(SIGURG);
  siginfo_t info = signalPromise.wait(waitScope);

  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}

172 173 174 175 176
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.

177 178
TEST(AsyncUnixTest, SignalsMultiReceive) {
  // Two different signals are raised back to back before anything waits; each must then be
  // received by its own onSignal() call.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  kill(getpid(), SIGURG);
  kill(getpid(), SIGIO);

  {
    siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
    EXPECT_EQ(SIGURG, info.si_signo);
    EXPECT_SI_CODE(SI_USER, info.si_code);
  }

  {
    siginfo_t info = port.onSignal(SIGIO).wait(waitScope);
    EXPECT_EQ(SIGIO, info.si_signo);
    EXPECT_SI_CODE(SI_USER, info.si_code);
  }
}

195 196
#endif  // !__CYGWIN32__

197 198
TEST(AsyncUnixTest, SignalsAsync) {
  // The signal is sent by a second thread while this thread is (most likely) already waiting.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Arrange for a signal to be sent from another thread. It targets this thread specifically,
  // after a short delay.
  pthread_t mainThread = pthread_self();
  Thread thread([mainThread]() {
    delay();
    pthread_kill(mainThread, SIGURG);
  });

  auto signalPromise = port.onSignal(SIGURG);
  siginfo_t info = signalPromise.wait(waitScope);

  EXPECT_EQ(SIGURG, info.si_signo);
#if __linux__
  // The expected si_code for pthread_kill() is only checked on Linux.
  EXPECT_SI_CODE(SI_TKILL, info.si_code);
#endif
}

217 218 219 220 221
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.

222
TEST(AsyncUnixTest, SignalsNoWait) {
  // Verify that UnixEventPort::poll() correctly receives pending signals.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  bool receivedSigurg = false;
  bool receivedSigio = false;

  // One detached listener per signal; each records that it ran and checks what it was given.
  port.onSignal(SIGURG)
      .then([&](siginfo_t&& info) {
        receivedSigurg = true;
        EXPECT_EQ(SIGURG, info.si_signo);
        EXPECT_SI_CODE(SI_USER, info.si_code);
      })
      .detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
  port.onSignal(SIGIO)
      .then([&](siginfo_t&& info) {
        receivedSigio = true;
        EXPECT_EQ(SIGIO, info.si_signo);
        EXPECT_SI_CODE(SI_USER, info.si_code);
      })
      .detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });

  kill(getpid(), SIGURG);
  kill(getpid(), SIGIO);

  // Raising the signals alone must not run the listeners...
  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // ...nor may running the loop before the port has been polled...
  loop.run();

  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // ...and poll() by itself is not expected to run the callbacks either.
  port.poll();

  EXPECT_FALSE(receivedSigurg);
  EXPECT_FALSE(receivedSigio);

  // Only once the loop runs after the poll are both listeners expected to have fired.
  loop.run();

  EXPECT_TRUE(receivedSigurg);
  EXPECT_TRUE(receivedSigio);
}

265 266
#endif  // !__CYGWIN32__

267 268
TEST(AsyncUnixTest, ReadObserver) {
  // Observes the read end of a pipe and checks that it is reported readable once data is written.
  // On Linux, atEndHint() is additionally checked before and after the write end is closed.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]);
  kj::AutoCloseFd outfd(pipefds[1]);

  UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);

  KJ_SYSCALL(write(outfd, "foo", 3));

  observer.whenBecomesReadable().wait(waitScope);

#if __linux__  // platform known to support POLLRDHUP
  // The write end is still open, so the hint must say "not at end".
  EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));

  // Drain the three bytes written above.
  char buffer[4096];
  ssize_t n;
  KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
  EXPECT_EQ(3, n);

  // Write once more, then close the write end (assigning nullptr releases the descriptor).
  KJ_SYSCALL(write(outfd, "bar", 3));
  outfd = nullptr;

  observer.whenBecomesReadable().wait(waitScope);

  // With the writer gone, the hint is expected to report end-of-stream.
  EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
#endif
}

300 301
TEST(AsyncUnixTest, ReadObserverMultiListen) {
  // Two pipes are observed but only one is written to; only that one's observer may fire.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // First pipe: never written to, so its observer must stay silent.
  int bogusPipefds[2];
  KJ_SYSCALL(pipe(bogusPipefds));
  KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });

  UnixEventPort::FdObserver bogusObserver(
      port, bogusPipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  bogusObserver.whenBecomesReadable()
      .then([]() {
        ADD_FAILURE() << "Received wrong poll.";
      })
      .detach([](kj::Exception&& exception) {
        ADD_FAILURE() << kj::str(exception).cStr();
      });

  // Second pipe: this is the one that receives data.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  UnixEventPort::FdObserver observer(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  observer.whenBecomesReadable().wait(waitScope);
}

330 331
TEST(AsyncUnixTest, ReadObserverMultiReceive) {
  // Two pipes both hold data before anything waits; both observers must report readability.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // First pipe, already holding data.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  UnixEventPort::FdObserver observer(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  // Second pipe, likewise.
  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });

  UnixEventPort::FdObserver observer2(
      port, pipefds2[0], UnixEventPort::FdObserver::OBSERVE_READ);
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  // Create both promises first, then wait on each in turn.
  auto promise1 = observer.whenBecomesReadable();
  auto promise2 = observer2.whenBecomesReadable();
  promise1.wait(waitScope);
  promise2.wait(waitScope);
}

358 359
TEST(AsyncUnixTest, ReadObserverAsync) {
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Make a pipe and wait on its read end while another thread writes to it.
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });

  UnixEventPort::FdObserver observer(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  // The writer pauses briefly first so that this thread is likely already waiting.
  Thread thread([&]() {
    delay();
    KJ_SYSCALL(write(pipefds[1], "foo", 3));
  });

  // Wait for the event in this thread.
  auto readable = observer.whenBecomesReadable();
  readable.wait(waitScope);
}

380
TEST(AsyncUnixTest, ReadObserverNoWait) {
  // Verify that UnixEventPort::poll() correctly receives pending FD events.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
  UnixEventPort::FdObserver observer(
      port, pipefds[0], UnixEventPort::FdObserver::OBSERVE_READ);

  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
  UnixEventPort::FdObserver observer2(
      port, pipefds2[0], UnixEventPort::FdObserver::OBSERVE_READ);

  // Each observer bumps the shared counter when its pipe is reported readable.
  int receivedCount = 0;
  observer.whenBecomesReadable()
      .then([&]() { receivedCount++; })
      .detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
  observer2.whenBecomesReadable()
      .then([&]() { receivedCount++; })
      .detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });

  KJ_SYSCALL(write(pipefds[1], "foo", 3));
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  // Writing alone must not trigger the callbacks...
  EXPECT_EQ(0, receivedCount);

  // ...nor may running the loop before the port has been polled...
  loop.run();
  EXPECT_EQ(0, receivedCount);

  // ...and poll() by itself is not expected to run them either.
  port.poll();
  EXPECT_EQ(0, receivedCount);

  // Only once the loop runs after the poll are both callbacks expected to have fired.
  loop.run();
  EXPECT_EQ(2, receivedCount);
}

426 427 428 429 430 431 432 433
static void setNonblocking(int fd) {
  // Puts `fd` into non-blocking mode, leaving its other file status flags untouched. Does
  // nothing further if O_NONBLOCK is already set.
  int flags;
  KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
  if ((flags & O_NONBLOCK) != 0) return;

  KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
}

434 435
TEST(AsyncUnixTest, WriteObserver) {
  // Fills a pipe until writes would block, checks that it is not reported writable, then drains
  // it and checks that it becomes writable.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]);
  kj::AutoCloseFd outfd(pipefds[1]);
  setNonblocking(outfd);
  setNonblocking(infd);

  UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);

  // Fill buffer: keep writing until the non-blocking write reports a negative result.
  ssize_t n;
  for (;;) {
    KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
    if (n < 0) break;
  }

  bool writable = false;
  auto promise = observer.whenBecomesWritable()
      .then([&]() { writable = true; })
      .eagerlyEvaluate(nullptr);

  loop.run();
  port.poll();
  loop.run();

  // The pipe is full, so nothing should have fired yet.
  EXPECT_FALSE(writable);

  // Empty the read end so that the write end becomes writable. Note that Linux implements a
  // high watermark / low watermark heuristic which means that only reading one byte is not
  // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
  // 1 page. To be safe, we read everything.
  char buffer[4096];
  for (;;) {
    KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
    if (n <= 0) break;
  }

  loop.run();
  port.poll();
  loop.run();

  EXPECT_TRUE(writable);
}

480 481
#if !__APPLE__
// Disabled on macOS due to https://github.com/sandstorm-io/capnproto/issues/374.
482 483 484
TEST(AsyncUnixTest, UrgentObserver) {
  // Verify that FdObserver correctly detects availability of out-of-band data.
  // Availability of out-of-band data is implementation-specific.
  // Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used
  // for this test.

  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);
  int tmpFd;
  char c;

  // Spawn a TCP server on the loopback interface. The port number is left as zero in the bind()
  // address and the address actually bound is read back with getsockname().
  KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
  kj::AutoCloseFd serverFd(tmpFd);
  sockaddr_in saddr;
  memset(&saddr, 0, sizeof(saddr));
  saddr.sin_family = AF_INET;
  saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
  socklen_t saddrLen = sizeof(saddr);
  KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
  KJ_SYSCALL(listen(serverFd, 1));

  // Create a pipe that the server thread writes to if sending with MSG_OOB returns EINVAL.
  int failpipe[2];
  KJ_SYSCALL(pipe(failpipe));
  KJ_DEFER({
    close(failpipe[0]);
    close(failpipe[1]);
  });

  // Server thread: accept one connection, send an in-band byte and an OOB byte, then wait for a
  // quit message.
  Thread thread([&]() {
    int tmpFd;
    char c;

    sockaddr_in caddr;
    socklen_t caddrLen = sizeof(caddr);
    KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
    kj::AutoCloseFd clientFd(tmpFd);
    delay();

    // Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
    c = 'i';
    KJ_SYSCALL(send(clientFd, &c, 1, 0));
    c = 'o';
    KJ_SYSCALL_HANDLE_ERRORS(send(clientFd, &c, 1, MSG_OOB)) {
      case EINVAL:
        // Looks like MSG_OOB is not supported. (This is the case e.g. on WSL.)
        // Tell the main thread through the failure pipe.
        KJ_SYSCALL(write(failpipe[1], &c, 1));
        break;
      default:
        KJ_FAIL_SYSCALL("send(..., MSG_OOB)", error);
    }

    // Block until the main thread sends the quit byte.
    KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    EXPECT_EQ('q', c);
  });
  KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });

  // Connect to the server from this thread.
  KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
  kj::AutoCloseFd clientFd(tmpFd);
  KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));

  UnixEventPort::FdObserver observer(port, clientFd,
      UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
  UnixEventPort::FdObserver failObserver(port, failpipe[0],
      UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);

  // Whichever happens first decides the outcome: urgent data arriving on the socket (true), or
  // the failure pipe becoming readable (false).
  auto promise = observer.whenUrgentDataAvailable().then([]() { return true; });
  auto failPromise = failObserver.whenBecomesReadable().then([]() { return false; });

  bool oobSupported = promise.exclusiveJoin(kj::mv(failPromise)).wait(waitScope);
  if (oobSupported) {
#if __CYGWIN__
    // On Cygwin, reading the urgent byte first causes the subsequent regular read to block until
    // such a time as the connection closes -- and then the byte is successfully returned. This
    // seems to be a cygwin bug.
    KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    EXPECT_EQ('i', c);
    KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
    EXPECT_EQ('o', c);
#else
    // Attempt to read the urgent byte prior to reading the in-band byte.
    KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
    EXPECT_EQ('o', c);
    KJ_SYSCALL(recv(clientFd, &c, 1, 0));
    EXPECT_EQ('i', c);
#endif
  } else {
    KJ_LOG(WARNING, "MSG_OOB doesn't seem to be supported on your platform.");
  }

  // Allow server thread to let its clientFd go out of scope.
  c = 'q';
  KJ_SYSCALL(send(clientFd, &c, 1, 0));
  KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
}
581
#endif
582

583 584
TEST(AsyncUnixTest, SteadyTimers) {
  // Schedules several timers, some with identical and one with an already-passed deadline, and
  // checks that each one fires no earlier than it was scheduled for.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  auto& timer = port.getTimer();

  auto start = timer.now();
  kj::Vector<TimePoint> expected;
  kj::Vector<TimePoint> actual;

  // Schedules a timer at `start + offset`, recording the earliest acceptable firing time in
  // `expected` and the observed firing time in `actual`.
  auto addTimer = [&](Duration offset) {
    // A deadline before `start` cannot fire before `start`, hence the clamp.
    expected.add(max(start + offset, start));
    timer.atTime(start + offset).then([&]() {
      actual.add(timer.now());
    }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
  };

  addTimer(30 * MILLISECONDS);
  addTimer(40 * MILLISECONDS);
  addTimer(20350 * MICROSECONDS);
  addTimer(30 * MILLISECONDS);
  addTimer(-10 * MILLISECONDS);

  // `actual` is filled in firing order, so compare it against the deadlines in sorted order.
  std::sort(expected.begin(), expected.end());

  // Wait until just past the last deadline so every timer above has had the chance to fire.
  timer.atTime(expected.back() + MILLISECONDS).wait(waitScope);

  ASSERT_EQ(expected.size(), actual.size());
  // size() is unsigned, so use an unsigned index to avoid a signed/unsigned comparison.
  for (size_t i = 0; i < expected.size(); ++i) {
    KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
              i, ((expected[i] - actual[i]) / NANOSECONDS));
  }
}

618 619 620 621 622 623 624 625 626 627 628
// Set to a non-zero value by dummySignalHandler(). The flag is written from inside a signal
// handler and read by the interrupted thread, so it is declared `volatile sig_atomic_t`, the
// type the language standard sanctions for that kind of access.
volatile sig_atomic_t dummySignalHandlerCalled = 0;

// Signal handler that does nothing except record that it ran. The signal number is ignored.
void dummySignalHandler(int) {
  dummySignalHandlerCalled = 1;
}

TEST(AsyncUnixTest, InterruptedTimer) {
  // Checks that a timer wait which gets interrupted by an unrelated signal (EINTR) still lasts
  // for the full requested duration.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

#if __linux__
  // Linux timeslices are 1ms.
  constexpr auto OS_SLOWNESS_FACTOR = 1;
#else
  // OSX timeslices are 10ms, so we need longer timeouts to avoid flakiness.
  // To be safe we'll assume other OS's are similar.
  constexpr auto OS_SLOWNESS_FACTOR = 10;
#endif

  // Schedule a timer event in 100ms (scaled by OS_SLOWNESS_FACTOR).
  auto& timer = port.getTimer();
  auto start = timer.now();
  constexpr auto timeout = 100 * MILLISECONDS * OS_SLOWNESS_FACTOR;

  // Arrange SIGALRM to be delivered halfway through the timeout, handled in an empty signal
  // handler. This will cause our wait to be interrupted with EINTR. We should nevertheless
  // continue waiting for the right amount of time.
  dummySignalHandlerCalled = false;
  if (signal(SIGALRM, &dummySignalHandler) == SIG_ERR) {
    KJ_FAIL_SYSCALL("signal(SIGALRM)", errno);
  }
  struct itimerval itv;
  memset(&itv, 0, sizeof(itv));
  itv.it_value.tv_usec = 50000 * OS_SLOWNESS_FACTOR;  // signal after 50ms (scaled)
  // If the interval timer cannot be armed the signal never arrives; fail here with the real
  // error instead of later with an unexplained "handler not called" expectation.
  KJ_SYSCALL(setitimer(ITIMER_REAL, &itv, nullptr));

  timer.afterDelay(timeout).wait(waitScope);

  KJ_EXPECT(dummySignalHandlerCalled);
  KJ_EXPECT(timer.now() - start >= timeout);
  // Allow 20% overshoot (20ms when OS_SLOWNESS_FACTOR is 1).
  KJ_EXPECT(timer.now() - start <= timeout + (timeout / 5));
}

662 663
TEST(AsyncUnixTest, Wake) {
  // Exercises UnixEventPort::wake(): the return values of poll() and wait() are expected to be
  // true exactly when they were ended by a wake().

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Without a pending wake, poll() reports false; a wake is reported once and then consumed.
  EXPECT_FALSE(port.poll());
  port.wake();
  EXPECT_TRUE(port.poll());
  EXPECT_FALSE(port.poll());

  // A wake issued before wait() is reported by that wait().
  port.wake();
  EXPECT_TRUE(port.wait());

  {
    // A wait() that ends because a timer is due, not because of wake(), reports false.
    auto promise = port.getTimer().atTime(port.getTimer().now());
    EXPECT_FALSE(port.wait());
  }

  // Finally, wake the port from another thread, after a short delay so that this thread is
  // likely already blocked in wait().
  Thread thread([&]() {
    delay();
    port.wake();
  });

  EXPECT_TRUE(port.wait());
}

691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756
// Exit status that exitSignalHandler() terminates the process with.
int exitCodeForSignal = 0;

// Signal handler that immediately ends the process via _exit(), using exitCodeForSignal as the
// status. The signal number is ignored.
void exitSignalHandler(int) {
  _exit(exitCodeForSignal);
}

struct TestChild {
  kj::Maybe<pid_t> pid;
  kj::Promise<int> promise = nullptr;

  TestChild(UnixEventPort& port, int exitCode) {
    pid_t p;
    KJ_SYSCALL(p = fork());
    if (p == 0) {
      // Arrange for SIGTERM to cause the process to exit normally.
      exitCodeForSignal = exitCode;
      signal(SIGTERM, &exitSignalHandler);
      sigset_t sigs;
      sigemptyset(&sigs);
      sigaddset(&sigs, SIGTERM);
      sigprocmask(SIG_UNBLOCK, &sigs, nullptr);

      for (;;) pause();
    }
    pid = p;
    promise = port.onChildExit(pid);
  }

  ~TestChild() noexcept(false) {
    KJ_IF_MAYBE(p, pid) {
      KJ_SYSCALL(::kill(*p, SIGKILL)) { return; }
      int status;
      KJ_SYSCALL(waitpid(*p, &status, 0)) { return; }
    }
  }

  void kill(int signo) {
    KJ_SYSCALL(::kill(KJ_REQUIRE_NONNULL(pid), signo));
  }

  KJ_DISALLOW_COPY(TestChild);
};

TEST(AsyncUnixTest, ChildProcess) {
  // Forks several children and checks that the promises from onChildExit() report the right
  // wait status, both for a normal exit and for death by signal.

  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Block SIGTERM so that we can carefully un-block it in children. The previous mask is
  // restored when the test ends.
  sigset_t sigs, oldsigs;
  KJ_SYSCALL(sigemptyset(&sigs));
  KJ_SYSCALL(sigaddset(&sigs, SIGTERM));
  KJ_SYSCALL(sigprocmask(SIG_BLOCK, &sigs, &oldsigs));
  KJ_DEFER(KJ_SYSCALL(sigprocmask(SIG_SETMASK, &oldsigs, nullptr)) { break; });

  // First child: terminated with SIGTERM, which its handler turns into a normal exit.
  TestChild child1(port, 123);
  KJ_EXPECT(!child1.promise.poll(waitScope));

  child1.kill(SIGTERM);

  {
    int status = child1.promise.wait(waitScope);
    KJ_EXPECT(WIFEXITED(status));
    KJ_EXPECT(WEXITSTATUS(status) == 123);
  }

  // Two more children alive at the same time; neither has exited yet.
  TestChild child2(port, 234);
  TestChild child3(port, 345);

  KJ_EXPECT(!child2.promise.poll(waitScope));
  KJ_EXPECT(!child3.promise.poll(waitScope));

  // SIGKILL must show up as termination by signal rather than as a normal exit.
  child2.kill(SIGKILL);

  {
    int status = child2.promise.wait(waitScope);
    KJ_EXPECT(!WIFEXITED(status));
    KJ_EXPECT(WIFSIGNALED(status));
    KJ_EXPECT(WTERMSIG(status) == SIGKILL);
  }

  // Killing child2 must not have affected child3.
  KJ_EXPECT(!child3.promise.poll(waitScope));

  // child3 will be killed and synchronously waited on the way out.
}

777
}  // namespace
778
}  // namespace kj
779 780

#endif  // !_WIN32