Commit 6418de1d authored by Kenton Varda's avatar Kenton Varda

Tweak timer implementation:

- Define separate times for durations vs. absolute times.
- Make more consistent with existing precedents in KJ style.
parent 8d6b39c8
...@@ -4,7 +4,7 @@ under the same BSD license terms as the rest of the library. ...@@ -4,7 +4,7 @@ under the same BSD license terms as the rest of the library.
Kenton Varda <temporal@gmail.com>: Primary Author Kenton Varda <temporal@gmail.com>: Primary Author
Jason Choy <jjwchoy@gmail.com>: kj/threadlocal.h and other iOS tweaks Jason Choy <jjwchoy@gmail.com>: kj/threadlocal.h and other iOS tweaks
Remy Blank <rblank@google.com> (contributions copyright Google Inc.) Remy Blank <rblank@google.com> (contributions copyright Google Inc.): KJ Timers
This file does not list people who maintain their own Cap'n Proto This file does not list people who maintain their own Cap'n Proto
implementations as separate projects. Those people are awesome too! :) implementations as separate projects. Those people are awesome too! :)
...@@ -743,13 +743,13 @@ class TimerImpl final: public Timer { ...@@ -743,13 +743,13 @@ class TimerImpl final: public Timer {
public: public:
TimerImpl(UnixEventPort& eventPort): eventPort(eventPort) {} TimerImpl(UnixEventPort& eventPort): eventPort(eventPort) {}
virtual Time steadyTime() { return eventPort.steadyTime(); } TimePoint now() override { return eventPort.steadyTime(); }
virtual Promise<void> atSteadyTime(Time time) { Promise<void> atTime(TimePoint time) override {
return eventPort.atSteadyTime(time); return eventPort.atSteadyTime(time);
} }
virtual Promise<void> atTimeFromNow(Time delay) { Promise<void> afterDelay(Duration delay) override {
return eventPort.atSteadyTime(eventPort.steadyTime() + delay); return eventPort.atSteadyTime(eventPort.steadyTime() + delay);
} }
......
...@@ -199,7 +199,12 @@ public: ...@@ -199,7 +199,12 @@ public:
// much at once but I'm not sure how to cleanly break it down. // much at once but I'm not sure how to cleanly break it down.
virtual Timer& getTimer() = 0; virtual Timer& getTimer() = 0;
// Returns a Timer interface for the underlying event loop. // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
// it only updates when the event loop polls for system events. This means that calling `now()`
// on this timer does not require a system call.
//
// This timer is not affected by changes to the system date. It is unspecified whether the timer
// continues to count while the system is suspended.
}; };
class LowLevelAsyncIoProvider { class LowLevelAsyncIoProvider {
...@@ -276,7 +281,12 @@ public: ...@@ -276,7 +281,12 @@ public:
// `flags` is a bitwise-OR of the values of the `Flags` enum. // `flags` is a bitwise-OR of the values of the `Flags` enum.
virtual Timer& getTimer() = 0; virtual Timer& getTimer() = 0;
// Returns a Timer interface for the underlying event loop. // Returns a `Timer` based on real time. Time does not pass while event handlers are running --
// it only updates when the event loop polls for system events. This means that calling `now()`
// on this timer does not require a system call.
//
// This timer is not affected by changes to the system date. It is unspecified whether the timer
// continues to count while the system is suspended.
}; };
Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
......
...@@ -321,30 +321,29 @@ TEST_F(AsyncUnixTest, SteadyTimers) { ...@@ -321,30 +321,29 @@ TEST_F(AsyncUnixTest, SteadyTimers) {
WaitScope waitScope(loop); WaitScope waitScope(loop);
auto start = port.steadyTime(); auto start = port.steadyTime();
std::vector<Time> expected; kj::Vector<TimePoint> expected;
std::vector<Time> actual; kj::Vector<TimePoint> actual;
auto addTimer = [&](Time delay) { auto addTimer = [&](Duration delay) {
expected.push_back(start + std::max(delay, Time())); expected.add(max(start + delay, start));
port.atSteadyTime(start + delay).then([&]() { port.atSteadyTime(start + delay).then([&]() {
actual.push_back(port.steadyTime()); actual.add(port.steadyTime());
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); }); }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
}; };
addTimer(30 * MILLISECOND); addTimer(30 * MILLISECONDS);
addTimer(40 * MILLISECOND); addTimer(40 * MILLISECONDS);
addTimer(20350 * MICROSECOND); addTimer(20350 * MICROSECONDS);
addTimer(30 * MILLISECOND); addTimer(30 * MILLISECONDS);
addTimer(-10 * MILLISECOND); addTimer(-10 * MILLISECONDS);
std::sort(expected.begin(), expected.end()); std::sort(expected.begin(), expected.end());
port.atSteadyTime(expected.back() + MILLISECOND).wait(waitScope); port.atSteadyTime(expected.back() + MILLISECONDS).wait(waitScope);
ASSERT_EQ(expected.size(), actual.size()); ASSERT_EQ(expected.size(), actual.size());
for (int i = 0; i < expected.size(); ++i) { for (int i = 0; i < expected.size(); ++i) {
EXPECT_LE(expected[i], actual[i]) << "Actual time for timer " << i << "(" EXPECT_LE(expected[i], actual[i]) << "Actual time for timer " << i << " is "
<< actual[i] / MICROSECOND << ") lower than expected time (" << ((expected[i] - actual[i]) / NANOSECONDS) << " ns too early.";
<< expected[i] / MICROSECOND << ")";
} }
} }
......
...@@ -28,7 +28,8 @@ ...@@ -28,7 +28,8 @@
#include <errno.h> #include <errno.h>
#include <inttypes.h> #include <inttypes.h>
#include <limits> #include <limits>
#include <chrono> #include <set>
#include <time.h>
namespace kj { namespace kj {
...@@ -83,6 +84,16 @@ pthread_once_t registerReservedSignalOnce = PTHREAD_ONCE_INIT; ...@@ -83,6 +84,16 @@ pthread_once_t registerReservedSignalOnce = PTHREAD_ONCE_INIT;
// ======================================================================================= // =======================================================================================
struct UnixEventPort::TimerSet {
struct TimerBefore {
bool operator()(TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs);
};
using Timers = std::multiset<TimerPromiseAdapter*, TimerBefore>;
Timers timers;
};
// =======================================================================================
class UnixEventPort::SignalPromiseAdapter { class UnixEventPort::SignalPromiseAdapter {
public: public:
inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller, inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller,
...@@ -166,36 +177,38 @@ public: ...@@ -166,36 +177,38 @@ public:
class UnixEventPort::TimerPromiseAdapter { class UnixEventPort::TimerPromiseAdapter {
public: public:
TimerPromiseAdapter(PromiseFulfiller<void>& fulfiller, UnixEventPort& port, Time time) TimerPromiseAdapter(PromiseFulfiller<void>& fulfiller, UnixEventPort& port, TimePoint time)
: time(time), fulfiller(fulfiller), port(port) { : time(time), fulfiller(fulfiller), port(port) {
pos = port.timers.insert(this); pos = port.timers->timers.insert(this);
} }
~TimerPromiseAdapter() { ~TimerPromiseAdapter() {
if (pos != port.timers.end()) { if (pos != port.timers->timers.end()) {
port.timers.erase(pos); port.timers->timers.erase(pos);
} }
} }
void fulfill() { void fulfill() {
fulfiller.fulfill(); fulfiller.fulfill();
port.timers.erase(pos); port.timers->timers.erase(pos);
pos = port.timers.end(); pos = port.timers->timers.end();
} }
const Time time; const TimePoint time;
PromiseFulfiller<void>& fulfiller; PromiseFulfiller<void>& fulfiller;
UnixEventPort& port; UnixEventPort& port;
Timers::const_iterator pos; TimerSet::Timers::const_iterator pos;
}; };
bool UnixEventPort::TimerBefore::operator()(TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs) { bool UnixEventPort::TimerSet::TimerBefore::operator()(
TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs) {
return lhs->time < rhs->time; return lhs->time < rhs->time;
} }
UnixEventPort::UnixEventPort() { UnixEventPort::UnixEventPort()
: timers(kj::heap<TimerSet>()),
frozenSteadyTime(currentSteadyTime()) {
pthread_once(&registerReservedSignalOnce, &registerReservedSignal); pthread_once(&registerReservedSignalOnce, &registerReservedSignal);
frozenSteadyTime = currentSteadyTime();
} }
UnixEventPort::~UnixEventPort() {} UnixEventPort::~UnixEventPort() {}
...@@ -278,7 +291,7 @@ private: ...@@ -278,7 +291,7 @@ private:
int pollError = 0; int pollError = 0;
}; };
Promise<void> UnixEventPort::atSteadyTime(Time time) { Promise<void> UnixEventPort::atSteadyTime(TimePoint time) {
return newAdaptedPromise<void, TimerPromiseAdapter>(*this, time); return newAdaptedPromise<void, TimerPromiseAdapter>(*this, time);
} }
...@@ -316,22 +329,23 @@ void UnixEventPort::wait() { ...@@ -316,22 +329,23 @@ void UnixEventPort::wait() {
threadCapture = &capture; threadCapture = &capture;
sigprocmask(SIG_UNBLOCK, &newMask, &origMask); sigprocmask(SIG_UNBLOCK, &newMask, &origMask);
constexpr Time MAX_TIMEOUT = // poll()'s timeout is an `int` count of milliseconds, so truncate to that.
std::numeric_limits<int>::digits < std::numeric_limits<uint64_t>::digits ? // Also, make sure that we aren't within a millisecond of overflowing a `Duration` since that
int64_t(std::numeric_limits<int>::max() - 1) * MILLISECOND : // will break the math below.
Time(std::numeric_limits<uint64_t>::max()) - MILLISECOND; constexpr Duration MAX_TIMEOUT =
min(int(maxValue) * MILLISECONDS, Duration(maxValue) - MILLISECONDS);
int pollTimeout = -1; int pollTimeout = -1;
auto timer = timers.begin(); auto timer = timers->timers.begin();
if (timer != timers.end()) { if (timer != timers->timers.end()) {
Time timeout = (*timer)->time - currentSteadyTime(); Duration timeout = (*timer)->time - currentSteadyTime();
if (timeout < Time()) { if (timeout < 0 * SECONDS) {
pollTimeout = 0; pollTimeout = 0;
} else if (timeout <= MAX_TIMEOUT) { } else if (timeout < MAX_TIMEOUT) {
// Round up to the next millisecond // Round up to the next millisecond
pollTimeout = (timeout + MILLISECOND - unit<Time>()) / MILLISECOND; pollTimeout = (timeout + 1 * MILLISECONDS - unit<Duration>()) / MILLISECONDS;
} else { } else {
pollTimeout = MAX_TIMEOUT / MILLISECOND; pollTimeout = MAX_TIMEOUT / MILLISECONDS;
} }
} }
pollContext.run(pollTimeout); pollContext.run(pollTimeout);
...@@ -404,16 +418,17 @@ void UnixEventPort::gotSignal(const siginfo_t& siginfo) { ...@@ -404,16 +418,17 @@ void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
} }
} }
Time UnixEventPort::currentSteadyTime() { TimePoint UnixEventPort::currentSteadyTime() {
return Time(std::chrono::duration_cast<std::chrono::microseconds>( struct timespec tp;
std::chrono::steady_clock::now().time_since_epoch()).count()); KJ_SYSCALL(clock_gettime(CLOCK_MONOTONIC, &tp));
return origin<TimePoint>() + (tp.tv_sec * SECONDS + tp.tv_sec * NANOSECONDS);
} }
void UnixEventPort::processTimers() { void UnixEventPort::processTimers() {
frozenSteadyTime = currentSteadyTime(); frozenSteadyTime = currentSteadyTime();
for (;;) { for (;;) {
auto front = timers.begin(); auto front = timers->timers.begin();
if (front == timers.end() || (*front)->time > frozenSteadyTime) { if (front == timers->timers.end() || (*front)->time > frozenSteadyTime) {
break; break;
} }
(*front)->fulfill(); (*front)->fulfill();
......
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
#include <signal.h> #include <signal.h>
#include <poll.h> #include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <set>
namespace kj { namespace kj {
...@@ -85,8 +84,8 @@ public: ...@@ -85,8 +84,8 @@ public:
// needs to use SIGUSR1, call this at startup (before any calls to `captureSignal()` and before // needs to use SIGUSR1, call this at startup (before any calls to `captureSignal()` and before
// constructing an `UnixEventPort`) to offer a different signal. // constructing an `UnixEventPort`) to offer a different signal.
Time steadyTime() { return frozenSteadyTime; } TimePoint steadyTime() { return frozenSteadyTime; }
Promise<void> atSteadyTime(Time time); Promise<void> atSteadyTime(TimePoint time);
// implements EventPort ------------------------------------------------------ // implements EventPort ------------------------------------------------------
void wait() override; void wait() override;
...@@ -98,21 +97,18 @@ private: ...@@ -98,21 +97,18 @@ private:
class TimerPromiseAdapter; class TimerPromiseAdapter;
class PollContext; class PollContext;
struct TimerBefore { struct TimerSet; // Defined in source file to avoid STL include.
bool operator()(TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs);
};
using Timers = std::multiset<TimerPromiseAdapter*, TimerBefore>;
PollPromiseAdapter* pollHead = nullptr; PollPromiseAdapter* pollHead = nullptr;
PollPromiseAdapter** pollTail = &pollHead; PollPromiseAdapter** pollTail = &pollHead;
SignalPromiseAdapter* signalHead = nullptr; SignalPromiseAdapter* signalHead = nullptr;
SignalPromiseAdapter** signalTail = &signalHead; SignalPromiseAdapter** signalTail = &signalHead;
Timers timers; Own<TimerSet> timers;
Time frozenSteadyTime; TimePoint frozenSteadyTime;
void gotSignal(const siginfo_t& siginfo); void gotSignal(const siginfo_t& siginfo);
Time currentSteadyTime(); TimePoint currentSteadyTime();
void processTimers(); void processTimers();
friend class TimerPromiseAdapter; friend class TimerPromiseAdapter;
......
// Copyright (c) 2014, Kenton Varda <temporal@gmail.com> // Copyright (c) 2014 Google Inc. (contributed by Remy Blank <rblank@google.com>)
// Copyright (c) 2014 Kenton Varda <temporal@gmail.com>
// All rights reserved. // All rights reserved.
// //
// Redistribution and use in source and binary forms, with or without // Redistribution and use in source and binary forms, with or without
...@@ -31,38 +32,52 @@ ...@@ -31,38 +32,52 @@
namespace kj { namespace kj {
namespace _ { // private namespace _ { // private
class MicrosecondLabel; class NanosecondLabel;
class TimeLabel;
class DateLabel;
} // namespace _ (private) } // namespace _ (private)
using Time = Quantity<int64_t, _::MicrosecondLabel>; using Duration = Quantity<int64_t, _::NanosecondLabel>;
// A time value, in microseconds. // A time value, in microseconds.
constexpr Time MICROSECOND = unit<Time>(); constexpr Duration NANOSECONDS = unit<Duration>();
constexpr Time MILLISECOND = 1000 * MICROSECOND; constexpr Duration MICROSECONDS = 1000 * NANOSECONDS;
constexpr Time SECOND = 1000 * MILLISECOND; constexpr Duration MILLISECONDS = 1000 * MICROSECONDS;
constexpr Time MINUTE = 60 * SECOND; constexpr Duration SECONDS = 1000 * MILLISECONDS;
constexpr Time HOUR = 60 * MINUTE; constexpr Duration MINUTES = 60 * SECONDS;
constexpr Time DAY = 24 * HOUR; constexpr Duration HOURS = 60 * MINUTES;
constexpr Duration DAYS = 24 * HOURS;
using TimePoint = Absolute<Duration, _::TimeLabel>;
// An absolute time measured by some particular instance of `Timer`. `Time`s from two different
// `Timer`s may be measured from different origins and so are not necessarily compatible.
using Date = Absolute<Duration, _::DateLabel>;
// A point in real-world time, measured relative to the Unix epoch (Jan 1, 1970 00:00:00 UTC).
constexpr Date UNIX_EPOCH = origin<Date>();
// The `Date` representing Jan 1, 1970 00:00:00 UTC.
class Timer { class Timer {
// Interface to time and timer functionality. The underlying time unit comes from // Interface to time and timer functionality.
// a steady clock, i.e. a clock that increments steadily and is independent of //
// system (or wall) time. // Each `Timer` may have a different origin, and some `Timer`s may in fact tick at a different
// rate than real time (e.g. a `Timer` could represent CPU time consumed by a thread). However,
// all `Timer`s are monotonic: time will never appear to move backwards, even if the calendar
// date as tracked by the system is manually modified.
public: public:
virtual Time steadyTime() = 0; virtual TimePoint now() = 0;
// Returns the current value of a clock that moves steadily forward, independent of any // Returns the current value of a clock that moves steadily forward, independent of any
// changes in the wall clock. The value is updated every time the event loop waits, // changes in the wall clock. The value is updated every time the event loop waits,
// and is constant in-between waits. // and is constant in-between waits.
virtual Promise<void> atSteadyTime(Time time) = 0; virtual Promise<void> atTime(TimePoint time) = 0;
// Schedules a timer that will trigger when the value returned by steadyTime() reaches // Returns a promise that returns as soon as now() >= time.
// the given time.
virtual Promise<void> atTimeFromNow(Time delay) = 0; virtual Promise<void> afterDelay(Duration delay) = 0;
// Schedules a timer that will trigger after the given delay. The timer uses steadyTime() // Equivalent to atTime(now() + delay).
// and is therefore immune to system clock updates.
}; };
} // namespace kj } // namespace kj
......
...@@ -219,6 +219,10 @@ class Quantity { ...@@ -219,6 +219,10 @@ class Quantity {
public: public:
inline constexpr Quantity() {} inline constexpr Quantity() {}
inline constexpr Quantity(decltype(maxValue)): value(maxValue) {}
inline constexpr Quantity(decltype(minValue)): value(minValue) {}
// Allow initialization from maxValue and minValue.
inline explicit constexpr Quantity(Number value): value(value) {} inline explicit constexpr Quantity(Number value): value(value) {}
// This constructor was intended to be private, but GCC complains about it being private in a // This constructor was intended to be private, but GCC complains about it being private in a
// bunch of places that don't appear to even call it, so I made it public. Oh well. // bunch of places that don't appear to even call it, so I made it public. Oh well.
...@@ -362,6 +366,63 @@ inline constexpr auto operator*(UnitRatio<Number1, Unit2, Unit> ratio, ...@@ -362,6 +366,63 @@ inline constexpr auto operator*(UnitRatio<Number1, Unit2, Unit> ratio,
return measure * ratio; return measure * ratio;
} }
// =======================================================================================
// Absolute measures
template <typename T, typename Label>
class Absolute {
// Wraps some other value -- typically a Quantity -- but represents a value measured based on
// some absolute origin. For exmaple, if `Duration` is a type representing a time duration,
// Absolute<Duration, UnixEpoch> might be a calendar date.
//
// Since Absolute represents measurements relative to some arbitrary origin, the only sensible
// arithmetic to perform on them is addition and subtraction.
// TODO(someday): Do the same automatic expansion of integer width that Quantity does? Doesn't
// matter for our time use case, where we always use 64-bit anyway. Note that fixing this
// would implicitly allow things like multiplying an Absolute by a UnitRatio to change its
// units, which is actually totally logical and kind of neat.
public:
inline constexpr Absolute operator+(const T& other) const { return Absolute(value + other); }
inline constexpr Absolute operator-(const T& other) const { return Absolute(value - other); }
inline constexpr T operator-(const Absolute& other) const { return value - other.value; }
inline Absolute& operator+=(const T& other) { value += other; return *this; }
inline Absolute& operator-=(const T& other) { value -= other; return *this; }
inline constexpr bool operator==(const Absolute& other) const { return value == other.value; }
inline constexpr bool operator!=(const Absolute& other) const { return value != other.value; }
inline constexpr bool operator<=(const Absolute& other) const { return value <= other.value; }
inline constexpr bool operator>=(const Absolute& other) const { return value >= other.value; }
inline constexpr bool operator< (const Absolute& other) const { return value < other.value; }
inline constexpr bool operator> (const Absolute& other) const { return value > other.value; }
private:
T value;
explicit constexpr Absolute(T value): value(value) {}
template <typename U>
friend inline constexpr U origin();
};
template <typename T, typename Label>
inline constexpr Absolute<T, Label> operator+(const T& a, const Absolute<T, Label>& b) {
return b + a;
}
template <typename T> struct UnitOf_ { typedef T Type; };
template <typename T, typename Label> struct UnitOf_<Absolute<T, Label>> { typedef T Type; };
template <typename T>
using UnitOf = typename UnitOf_<T>::Type;
// UnitOf<Absolute<T, U>> is T. UnitOf<AnythingElse> is AnythingElse.
template <typename T>
inline constexpr T origin() { return T(0 * unit<UnitOf<T>>()); }
// origin<Absolute<T, U>>() returns an Absolute of value 0. It also, intentionally, works on basic
// numeric types.
} // namespace kj } // namespace kj
#endif // KJ_UNITS_H_ #endif // KJ_UNITS_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment