Commit 86410ee7 authored by Kenton Varda's avatar Kenton Varda

Add optimization helpers to async streams.

- AsyncInputStream::tryGetLength() can report the amount of data the stream will produce. Useful for implementing HTTP Content-Length header.
- AsyncInputStream::pumpTo() copies data from an input stream to an output stream, using double-dispatch to find an optimal approach.
parent 40109b72
......@@ -40,6 +40,56 @@ Promise<size_t> AsyncInputStream::read(void* buffer, size_t minBytes, size_t max
});
}
Maybe<uint64_t> AsyncInputStream::tryGetLength() { return nullptr; }
namespace {
class AsyncPump {
public:
AsyncPump(AsyncInputStream& input, AsyncOutputStream& output, uint64_t limit)
: input(input), output(output), limit(limit) {}
Promise<uint64_t> pump() {
// TODO(perf): This could be more efficient by reading half a buffer at a time and then
// starting the next read concurrent with writing the data from the previous read.
uint64_t n = kj::min(limit - doneSoFar, sizeof(buffer));
if (n == 0) return doneSoFar;
return input.tryRead(buffer, 1, sizeof(buffer))
.then([this](size_t amount) -> Promise<uint64_t> {
if (amount == 0) return doneSoFar; // EOF
doneSoFar += amount;
return output.write(buffer, amount)
.then([this]() {
return pump();
});
});
}
private:
AsyncInputStream& input;
AsyncOutputStream& output;
uint64_t limit;
uint64_t doneSoFar = 0;
byte buffer[4096];
};
} // namespace
Promise<uint64_t> AsyncInputStream::pumpTo(
AsyncOutputStream& output, uint64_t amount) {
// See if output wants to dispatch on us.
KJ_IF_MAYBE(result, output.tryPumpFrom(*this, amount)) {
return kj::mv(*result);
}
// OK, fall back to naive approach.
auto pump = heap<AsyncPump>(*this, output, amount);
auto promise = pump->pump();
return promise.attach(kj::mv(pump));
}
namespace {
class AllReader {
......@@ -106,6 +156,11 @@ Promise<String> AsyncInputStream::readAllText() {
return promise.attach(kj::mv(reader));
}
Maybe<Promise<uint64_t>> AsyncOutputStream::tryPumpFrom(
AsyncInputStream& input, uint64_t amount) {
return nullptr;
}
void AsyncIoStream::getsockopt(int level, int option, void* value, uint* length) {
KJ_UNIMPLEMENTED("Not a socket.");
}
......
......@@ -42,6 +42,7 @@ class UnixEventPort;
#endif
class NetworkAddress;
class AsyncOutputStream;
// =======================================================================================
// Streaming I/O
......@@ -55,6 +56,27 @@ public:
Promise<void> read(void* buffer, size_t bytes);
virtual Maybe<uint64_t> tryGetLength();
// Get the remaining number of bytes that will be produced by this stream, if known.
//
// This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the
// HTTP implementation may need to fall back to Transfer-Encoding: chunked.
//
// The default implementation always returns null.
virtual Promise<uint64_t> pumpTo(
AsyncOutputStream& output, uint64_t amount = kj::maxValue);
// Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the
// total bytes actually pumped (which is only less than `amount` if EOF was reached).
//
// Override this if your stream type knows how to pump itself to certain kinds of output
// streams more efficiently than via the naive approach. You can use
// kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match,
// delegate to the default implementation.
//
// The default implementation first tries calling output.tryPumpFrom(), but if that fails, it
// performs a naive pump by allocating a buffer and reading to it / writing from it in a loop.
Promise<Array<byte>> readAllBytes();
Promise<String> readAllText();
// Read until EOF and return as one big byte array or string.
......@@ -66,6 +88,17 @@ class AsyncOutputStream {
public:
virtual Promise<void> write(const void* buffer, size_t size) = 0;
virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0;
virtual Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& input, uint64_t amount = kj::maxValue);
// Implements double-dispatch for AsyncInputStream::pumpTo().
//
// This method should only be called from within an implementation of pumpTo().
//
// This method examines the type of `input` to find optimized ways to pump data from it to this
// output stream. If it finds one, it performs the pump. Otherwise, it returns null.
//
// The default implementation always returns null.
};
class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment