async-unix-test.c++ 21.1 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21

22 23
#if !_WIN32

24 25 26
#include "async-unix.h"
#include "thread.h"
#include "debug.h"
27
#include "io.h"
28 29 30
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
31
#include <sys/socket.h>
32
#include <sys/stat.h>
33
#include <netinet/in.h>
34
#include <kj/compat/gtest.h>
35
#include <pthread.h>
36
#include <algorithm>
37
#include <sys/wait.h>
38 39
#include <sys/time.h>
#include <errno.h>
40 41

namespace kj {
42
namespace {
43 44 45

inline void delay() { usleep(10000); }

46 47 48 49 50 51 52
// On OSX, si_code seems to be zero when SI_USER is expected.
#if __linux__ || __CYGWIN__
#define EXPECT_SI_CODE EXPECT_EQ
#else
#define EXPECT_SI_CODE(a,b)
#endif

53 54 55 56 57
void captureSignals() {
  static bool captured = false;
  if (!captured) {
    captured = true;

58 59 60 61
    // We use SIGIO and SIGURG as our test signals because they're two signals that we can be
    // reasonably confident won't otherwise be delivered to any KJ or Cap'n Proto test.  We can't
    // use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
    UnixEventPort::captureSignal(SIGURG);
62
    UnixEventPort::captureSignal(SIGIO);
63 64

    UnixEventPort::captureChildExit();
65
  }
66
}
67

68 69
TEST(AsyncUnixTest, Signals) {
  captureSignals();
70 71
  UnixEventPort port;
  EventLoop loop(port);
72
  WaitScope waitScope(loop);
73

74
  kill(getpid(), SIGURG);
75

76 77
  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
78
  EXPECT_SI_CODE(SI_USER, info.si_code);
79 80
}

81
#if defined(SIGRTMIN) && !__BIONIC__ && !(__linux__ && __mips__)
82
TEST(AsyncUnixTest, SignalWithValue) {
83 84
  // This tests that if we use sigqueue() to attach a value to the signal, that value is received
  // correctly.  Note that this only works on platforms that support real-time signals -- even
85
  // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
86
  // signals.  Hence this test won't run on e.g. Mac OSX.
Kenton Varda's avatar
Kenton Varda committed
87 88
  //
  // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
89 90 91 92
  //
  // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
  // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
  // Sad. https://github.com/sandstorm-io/capnproto/issues/204
93

94
  captureSignals();
95 96
  UnixEventPort port;
  EventLoop loop(port);
97
  WaitScope waitScope(loop);
98

99
  union sigval value;
100
  memset(&value, 0, sizeof(value));
101
  value.sival_int = 123;
102
  sigqueue(getpid(), SIGURG, value);
103

104 105
  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
106
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
107 108
  EXPECT_EQ(123, info.si_value.sival_int);
}
109

110
TEST(AsyncUnixTest, SignalWithPointerValue) {
111 112 113 114
  // This tests that if we use sigqueue() to attach a value to the signal, that value is received
  // correctly.  Note that this only works on platforms that support real-time signals -- even
  // though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
  // signals.  Hence this test won't run on e.g. Mac OSX.
Kenton Varda's avatar
Kenton Varda committed
115 116
  //
  // Also, Android's bionic does not appear to support sigqueue() even though the kernel does.
117 118 119 120
  //
  // Also, this test fails on Linux on mipsel. si_value comes back as zero. No one with a mips
  // machine wants to debug the problem but they demand a patch fixing it, so we disable the test.
  // Sad. https://github.com/sandstorm-io/capnproto/issues/204
121

122
  captureSignals();
123 124 125 126 127 128 129 130 131 132 133 134 135 136
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  union sigval value;
  memset(&value, 0, sizeof(value));
  value.sival_ptr = &port;
  sigqueue(getpid(), SIGURG, value);

  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
  EXPECT_SI_CODE(SI_QUEUE, info.si_code);
  EXPECT_EQ(&port, info.si_value.sival_ptr);
}
137
#endif
138

139 140
TEST(AsyncUnixTest, SignalsMultiListen) {
  captureSignals();
141 142
  UnixEventPort port;
  EventLoop loop(port);
143
  WaitScope waitScope(loop);
144

145
  port.onSignal(SIGIO).then([](siginfo_t&&) {
146
    KJ_FAIL_EXPECT("Received wrong signal.");
147
  }).detach([](kj::Exception&& exception) {
148
    KJ_FAIL_EXPECT(exception);
149
  });
150

151
  kill(getpid(), SIGURG);
152

153 154
  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
155
  EXPECT_SI_CODE(SI_USER, info.si_code);
156 157
}

158 159 160 161 162
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.

163 164
TEST(AsyncUnixTest, SignalsMultiReceive) {
  captureSignals();
165 166
  UnixEventPort port;
  EventLoop loop(port);
167
  WaitScope waitScope(loop);
168

169
  kill(getpid(), SIGURG);
170 171
  kill(getpid(), SIGIO);

172 173
  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
174 175
  EXPECT_SI_CODE(SI_USER, info.si_code);

176
  info = port.onSignal(SIGIO).wait(waitScope);
177 178 179 180
  EXPECT_EQ(SIGIO, info.si_signo);
  EXPECT_SI_CODE(SI_USER, info.si_code);
}

181 182
#endif  // !__CYGWIN32__

183 184
TEST(AsyncUnixTest, SignalsAsync) {
  captureSignals();
185 186
  UnixEventPort port;
  EventLoop loop(port);
187
  WaitScope waitScope(loop);
188 189 190

  // Arrange for a signal to be sent from another thread.
  pthread_t mainThread = pthread_self();
191
  Thread thread([&]() {
192
    delay();
193
    pthread_kill(mainThread, SIGURG);
194
  });
195

196 197
  siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
  EXPECT_EQ(SIGURG, info.si_signo);
198
#if __linux__
199
  EXPECT_SI_CODE(SI_TKILL, info.si_code);
200
#endif
201 202
}

203 204 205 206 207
#if !__CYGWIN32__
// Cygwin32 (but not Cygwin64) appears not to deliver SIGURG in the following test (but it does
// deliver SIGIO, if you reverse the order of the waits).  Since this doesn't occur on any other
// platform I'm assuming it's a Cygwin bug.

208
TEST(AsyncUnixTest, SignalsNoWait) {
209 210
  // Verify that UnixEventPort::poll() correctly receives pending signals.

211
  captureSignals();
212 213
  UnixEventPort port;
  EventLoop loop(port);
214
  WaitScope waitScope(loop);
215

216
  bool receivedSigurg = false;
217
  bool receivedSigio = false;
218 219 220
  port.onSignal(SIGURG).then([&](siginfo_t&& info) {
    receivedSigurg = true;
    EXPECT_EQ(SIGURG, info.si_signo);
221
    EXPECT_SI_CODE(SI_USER, info.si_code);
222
  }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
223 224 225 226
  port.onSignal(SIGIO).then([&](siginfo_t&& info) {
    receivedSigio = true;
    EXPECT_EQ(SIGIO, info.si_signo);
    EXPECT_SI_CODE(SI_USER, info.si_code);
227
  }).detach([](Exception&& e) { KJ_FAIL_EXPECT(e); });
228

229
  kill(getpid(), SIGURG);
230 231
  kill(getpid(), SIGIO);

232
  EXPECT_FALSE(receivedSigurg);
233 234 235 236
  EXPECT_FALSE(receivedSigio);

  loop.run();

237
  EXPECT_FALSE(receivedSigurg);
238 239 240 241
  EXPECT_FALSE(receivedSigio);

  port.poll();

242
  EXPECT_FALSE(receivedSigurg);
243 244 245 246
  EXPECT_FALSE(receivedSigio);

  loop.run();

247
  EXPECT_TRUE(receivedSigurg);
248 249 250
  EXPECT_TRUE(receivedSigio);
}

251 252
#endif  // !__CYGWIN32__

253 254
TEST(AsyncUnixTest, ReadObserver) {
  captureSignals();
255 256
  UnixEventPort port;
  EventLoop loop(port);
257
  WaitScope waitScope(loop);
258 259 260

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
261 262
  kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);

263
  UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);
264 265 266

  KJ_SYSCALL(write(outfd, "foo", 3));

267
  observer.whenBecomesReadable().wait(waitScope);
268 269

#if __linux__  // platform known to support POLLRDHUP
270
  EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));
271

272 273 274 275 276 277 278 279
  char buffer[4096];
  ssize_t n;
  KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
  EXPECT_EQ(3, n);

  KJ_SYSCALL(write(outfd, "bar", 3));
  outfd = nullptr;

280
  observer.whenBecomesReadable().wait(waitScope);
281

282
  EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
283
#endif
284 285
}

286 287
TEST(AsyncUnixTest, ReadObserverMultiListen) {
  captureSignals();
288 289
  UnixEventPort port;
  EventLoop loop(port);
290
  WaitScope waitScope(loop);
291 292 293 294 295

  int bogusPipefds[2];
  KJ_SYSCALL(pipe(bogusPipefds));
  KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });

296 297 298 299
  UnixEventPort::FdObserver bogusObserver(port, bogusPipefds[0],
      UnixEventPort::FdObserver::OBSERVE_READ);

  bogusObserver.whenBecomesReadable().then([]() {
300
    ADD_FAILURE() << "Received wrong poll.";
301
  }).detach([](kj::Exception&& exception) {
302 303
    ADD_FAILURE() << kj::str(exception).cStr();
  });
304 305 306 307

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
308

309 310
  UnixEventPort::FdObserver observer(port, pipefds[0],
      UnixEventPort::FdObserver::OBSERVE_READ);
311 312
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

313
  observer.whenBecomesReadable().wait(waitScope);
314 315
}

316 317
TEST(AsyncUnixTest, ReadObserverMultiReceive) {
  captureSignals();
318 319
  UnixEventPort port;
  EventLoop loop(port);
320
  WaitScope waitScope(loop);
321 322 323 324

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
325

326 327
  UnixEventPort::FdObserver observer(port, pipefds[0],
      UnixEventPort::FdObserver::OBSERVE_READ);
328 329 330 331 332
  KJ_SYSCALL(write(pipefds[1], "foo", 3));

  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
333

334 335
  UnixEventPort::FdObserver observer2(port, pipefds2[0],
      UnixEventPort::FdObserver::OBSERVE_READ);
336 337
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

338 339 340 341
  auto promise1 = observer.whenBecomesReadable();
  auto promise2 = observer2.whenBecomesReadable();
  promise1.wait(waitScope);
  promise2.wait(waitScope);
342 343
}

344 345
TEST(AsyncUnixTest, ReadObserverAsync) {
  captureSignals();
346 347
  UnixEventPort port;
  EventLoop loop(port);
348
  WaitScope waitScope(loop);
349

350
  // Make a pipe and wait on its read end while another thread writes to it.
351 352
  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
353
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
354 355
  UnixEventPort::FdObserver observer(port, pipefds[0],
      UnixEventPort::FdObserver::OBSERVE_READ);
356

357 358 359
  Thread thread([&]() {
    delay();
    KJ_SYSCALL(write(pipefds[1], "foo", 3));
360 361
  });

362
  // Wait for the event in this thread.
363
  observer.whenBecomesReadable().wait(waitScope);
364 365
}

366
TEST(AsyncUnixTest, ReadObserverNoWait) {
367 368
  // Verify that UnixEventPort::poll() correctly receives pending FD events.

369
  captureSignals();
370 371
  UnixEventPort port;
  EventLoop loop(port);
372
  WaitScope waitScope(loop);
373 374 375 376

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
377 378
  UnixEventPort::FdObserver observer(port, pipefds[0],
      UnixEventPort::FdObserver::OBSERVE_READ);
379 380 381 382

  int pipefds2[2];
  KJ_SYSCALL(pipe(pipefds2));
  KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
383 384
  UnixEventPort::FdObserver observer2(port, pipefds2[0],
      UnixEventPort::FdObserver::OBSERVE_READ);
385 386

  int receivedCount = 0;
387
  observer.whenBecomesReadable().then([&]() {
388
    receivedCount++;
389
  }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
390
  observer2.whenBecomesReadable().then([&]() {
391
    receivedCount++;
392
  }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409

  KJ_SYSCALL(write(pipefds[1], "foo", 3));
  KJ_SYSCALL(write(pipefds2[1], "bar", 3));

  EXPECT_EQ(0, receivedCount);

  loop.run();

  EXPECT_EQ(0, receivedCount);

  port.poll();

  EXPECT_EQ(0, receivedCount);

  loop.run();

  EXPECT_EQ(2, receivedCount);
410 411
}

412 413 414 415 416 417 418 419
static void setNonblocking(int fd) {
  int flags;
  KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
  if ((flags & O_NONBLOCK) == 0) {
    KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
  }
}

420 421
TEST(AsyncUnixTest, WriteObserver) {
  captureSignals();
422 423 424 425 426 427 428 429
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  int pipefds[2];
  KJ_SYSCALL(pipe(pipefds));
  kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
  setNonblocking(outfd);
430
  setNonblocking(infd);
431

432
  UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);
433 434 435 436 437 438 439 440

  // Fill buffer.
  ssize_t n;
  do {
    KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
  } while (n >= 0);

  bool writable = false;
441
  auto promise = observer.whenBecomesWritable()
442 443 444 445 446 447 448 449
      .then([&]() { writable = true; }).eagerlyEvaluate(nullptr);

  loop.run();
  port.poll();
  loop.run();

  EXPECT_FALSE(writable);

450 451 452 453
  // Empty the read end so that the write end becomes writable. Note that Linux implements a
  // high watermark / low watermark heuristic which means that only reading one byte is not
  // sufficient. The amount we have to read is in fact architecture-dependent -- it appears to be
  // 1 page. To be safe, we read everything.
454
  char buffer[4096];
455
  do {
456
    KJ_NONBLOCKING_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
457
  } while (n > 0);
458 459 460 461 462 463 464 465

  loop.run();
  port.poll();
  loop.run();

  EXPECT_TRUE(writable);
}

466 467
#if !__APPLE__
// Disabled on macOS due to https://github.com/sandstorm-io/capnproto/issues/374.
468 469 470
TEST(AsyncUnixTest, UrgentObserver) {
  // Verify that FdObserver correctly detects availability of out-of-band data.
  // Availability of out-of-band data is implementation-specific.
471 472
  // Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used
  // for this test.
473 474 475 476 477

  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);
  int tmpFd;
478
  char c;
479

480
  // Spawn a TCP server
481 482
  KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
  kj::AutoCloseFd serverFd(tmpFd);
483 484
  sockaddr_in saddr;
  memset(&saddr, 0, sizeof(saddr));
485 486 487
  saddr.sin_family = AF_INET;
  saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
488 489
  socklen_t saddrLen = sizeof(saddr);
  KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
490 491
  KJ_SYSCALL(listen(serverFd, 1));

492
  // Accept one connection, send in-band and OOB byte, wait for a quit message
493
  Thread thread([&]() {
494 495 496
    int tmpFd;
    char c;

497 498 499 500 501
    sockaddr_in caddr;
    socklen_t caddrLen = sizeof(caddr);
    KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
    kj::AutoCloseFd clientFd(tmpFd);
    delay();
502 503 504 505 506

    // Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
    c = 'i';
    KJ_SYSCALL(send(clientFd, &c, 1, 0));
    c = 'o';
507
    KJ_SYSCALL(send(clientFd, &c, 1, MSG_OOB));
508

509
    KJ_SYSCALL(recv(clientFd, &c, 1, 0));
510
    EXPECT_EQ('q', c);
511
  });
512
  KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });
513 514 515

  KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
  kj::AutoCloseFd clientFd(tmpFd);
516
  KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));
517

518 519 520
  UnixEventPort::FdObserver observer(port, clientFd,
      UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);

521
  observer.whenUrgentDataAvailable().wait(waitScope);
Kenton Varda's avatar
Kenton Varda committed
522 523 524 525 526 527 528 529 530 531 532

#if __CYGWIN__
  // On Cygwin, reading the urgent byte first causes the subsequent regular read to block until
  // such a time as the connection closes -- and then the byte is successfully returned. This
  // seems to be a cygwin bug.
  KJ_SYSCALL(recv(clientFd, &c, 1, 0));
  EXPECT_EQ('i', c);
  KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
  EXPECT_EQ('o', c);
#else
  // Attempt to read the urgent byte prior to reading the in-band byte.
533 534 535 536
  KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
  EXPECT_EQ('o', c);
  KJ_SYSCALL(recv(clientFd, &c, 1, 0));
  EXPECT_EQ('i', c);
Kenton Varda's avatar
Kenton Varda committed
537
#endif
538

539 540
  // Allow server thread to let its clientFd go out of scope.
  c = 'q';
541
  KJ_SYSCALL(send(clientFd, &c, 1, 0));
542
  KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
543
}
544
#endif
545

546 547
TEST(AsyncUnixTest, SteadyTimers) {
  captureSignals();
548 549 550 551
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

552 553 554
  auto& timer = port.getTimer();

  auto start = timer.now();
555 556
  kj::Vector<TimePoint> expected;
  kj::Vector<TimePoint> actual;
557

558 559
  auto addTimer = [&](Duration delay) {
    expected.add(max(start + delay, start));
560 561
    timer.atTime(start + delay).then([&]() {
      actual.add(timer.now());
562 563 564
    }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
  };

565 566 567 568 569
  addTimer(30 * MILLISECONDS);
  addTimer(40 * MILLISECONDS);
  addTimer(20350 * MICROSECONDS);
  addTimer(30 * MILLISECONDS);
  addTimer(-10 * MILLISECONDS);
570 571

  std::sort(expected.begin(), expected.end());
572
  timer.atTime(expected.back() + MILLISECONDS).wait(waitScope);
573 574 575

  ASSERT_EQ(expected.size(), actual.size());
  for (int i = 0; i < expected.size(); ++i) {
576 577
    KJ_EXPECT(expected[i] <= actual[i], "Actual time for timer i is too early.",
              i, ((expected[i] - actual[i]) / NANOSECONDS));
578 579 580
  }
}

581 582 583 584 585 586 587 588 589 590 591
bool dummySignalHandlerCalled = false;
void dummySignalHandler(int) {
  dummySignalHandlerCalled = true;
}

TEST(AsyncUnixTest, InterruptedTimer) {
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

592 593 594 595 596 597
#if __linux__
  // Linux timeslices are 1ms.
  constexpr auto OS_SLOWNESS_FACTOR = 1;
#else
  // OSX timeslices are 10ms, so we need longer timeouts to avoid flakiness.
  // To be safe we'll assume other OS's are similar.
598
  constexpr auto OS_SLOWNESS_FACTOR = 10;
599 600
#endif

601
  // Schedule a timer event in 100ms.
602 603
  auto& timer = port.getTimer();
  auto start = timer.now();
604
  constexpr auto timeout = 100 * MILLISECONDS * OS_SLOWNESS_FACTOR;
605

606
  // Arrange SIGALRM to be delivered in 50ms, handled in an empty signal handler. This will cause
607 608 609 610 611 612 613 614
  // our wait to be interrupted with EINTR. We should nevertheless continue waiting for the right
  // amount of time.
  dummySignalHandlerCalled = false;
  if (signal(SIGALRM, &dummySignalHandler) == SIG_ERR) {
    KJ_FAIL_SYSCALL("signal(SIGALRM)", errno);
  }
  struct itimerval itv;
  memset(&itv, 0, sizeof(itv));
615
  itv.it_value.tv_usec = 50000 * OS_SLOWNESS_FACTOR;  // signal after 50ms
616 617 618 619 620 621
  setitimer(ITIMER_REAL, &itv, nullptr);

  timer.afterDelay(timeout).wait(waitScope);

  KJ_EXPECT(dummySignalHandlerCalled);
  KJ_EXPECT(timer.now() - start >= timeout);
622
  KJ_EXPECT(timer.now() - start <= timeout + (timeout / 5));  // allow 20ms error
623 624
}

625 626
TEST(AsyncUnixTest, Wake) {
  captureSignals();
627 628 629 630 631 632 633 634 635 636 637 638 639
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  EXPECT_FALSE(port.poll());
  port.wake();
  EXPECT_TRUE(port.poll());
  EXPECT_FALSE(port.poll());

  port.wake();
  EXPECT_TRUE(port.wait());

  {
640
    auto promise = port.getTimer().atTime(port.getTimer().now());
641 642 643 644 645 646 647 648 649 650 651 652 653
    EXPECT_FALSE(port.wait());
  }

  bool woken = false;
  Thread thread([&]() {
    delay();
    woken = true;
    port.wake();
  });

  EXPECT_TRUE(port.wait());
}

654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719
int exitCodeForSignal = 0;
void exitSignalHandler(int) {
  _exit(exitCodeForSignal);
}

struct TestChild {
  kj::Maybe<pid_t> pid;
  kj::Promise<int> promise = nullptr;

  TestChild(UnixEventPort& port, int exitCode) {
    pid_t p;
    KJ_SYSCALL(p = fork());
    if (p == 0) {
      // Arrange for SIGTERM to cause the process to exit normally.
      exitCodeForSignal = exitCode;
      signal(SIGTERM, &exitSignalHandler);
      sigset_t sigs;
      sigemptyset(&sigs);
      sigaddset(&sigs, SIGTERM);
      sigprocmask(SIG_UNBLOCK, &sigs, nullptr);

      for (;;) pause();
    }
    pid = p;
    promise = port.onChildExit(pid);
  }

  ~TestChild() noexcept(false) {
    KJ_IF_MAYBE(p, pid) {
      KJ_SYSCALL(::kill(*p, SIGKILL)) { return; }
      int status;
      KJ_SYSCALL(waitpid(*p, &status, 0)) { return; }
    }
  }

  void kill(int signo) {
    KJ_SYSCALL(::kill(KJ_REQUIRE_NONNULL(pid), signo));
  }

  KJ_DISALLOW_COPY(TestChild);
};

TEST(AsyncUnixTest, ChildProcess) {
  captureSignals();
  UnixEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // Block SIGTERM so that we can carefully un-block it in children.
  sigset_t sigs, oldsigs;
  KJ_SYSCALL(sigemptyset(&sigs));
  KJ_SYSCALL(sigaddset(&sigs, SIGTERM));
  KJ_SYSCALL(sigprocmask(SIG_BLOCK, &sigs, &oldsigs));
  KJ_DEFER(KJ_SYSCALL(sigprocmask(SIG_SETMASK, &oldsigs, nullptr)) { break; });

  TestChild child1(port, 123);
  KJ_EXPECT(!child1.promise.poll(waitScope));

  child1.kill(SIGTERM);

  {
    int status = child1.promise.wait(waitScope);
    KJ_EXPECT(WIFEXITED(status));
    KJ_EXPECT(WEXITSTATUS(status) == 123);
  }

720 721 722
  TestChild child2(port, 234);
  TestChild child3(port, 345);

723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739
  KJ_EXPECT(!child2.promise.poll(waitScope));
  KJ_EXPECT(!child3.promise.poll(waitScope));

  child2.kill(SIGKILL);

  {
    int status = child2.promise.wait(waitScope);
    KJ_EXPECT(!WIFEXITED(status));
    KJ_EXPECT(WIFSIGNALED(status));
    KJ_EXPECT(WTERMSIG(status) == SIGKILL);
  }

  KJ_EXPECT(!child3.promise.poll(waitScope));

  // child3 will be killed and synchronously waited on the way out.
}

740
}  // namespace
741
}  // namespace kj
742 743

#endif  // !_WIN32