Commit 40109b72 authored by Kenton Varda's avatar Kenton Varda

Add convenience readAll{Text,Bytes}() methods to AsyncInputStream.

parent 9182e67f
...@@ -21,12 +21,14 @@ ...@@ -21,12 +21,14 @@
#include "async-io.h" #include "async-io.h"
#include "debug.h" #include "debug.h"
#include "vector.h"
namespace kj { namespace kj {
Promise<void> AsyncInputStream::read(void* buffer, size_t bytes) { Promise<void> AsyncInputStream::read(void* buffer, size_t bytes) {
return read(buffer, bytes, bytes).then([](size_t) {}); return read(buffer, bytes, bytes).then([](size_t) {});
} }
Promise<size_t> AsyncInputStream::read(void* buffer, size_t minBytes, size_t maxBytes) { Promise<size_t> AsyncInputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
return tryRead(buffer, minBytes, maxBytes).then([=](size_t result) { return tryRead(buffer, minBytes, maxBytes).then([=](size_t result) {
KJ_REQUIRE(result >= minBytes, "Premature EOF") { KJ_REQUIRE(result >= minBytes, "Premature EOF") {
...@@ -38,6 +40,72 @@ Promise<size_t> AsyncInputStream::read(void* buffer, size_t minBytes, size_t max ...@@ -38,6 +40,72 @@ Promise<size_t> AsyncInputStream::read(void* buffer, size_t minBytes, size_t max
}); });
} }
namespace {
class AllReader {
public:
AllReader(AsyncInputStream& input): input(input) {}
Promise<Array<byte>> readAllBytes() {
return loop().then([this](uint64_t size) {
auto out = heapArray<byte>(size);
copyInto(out);
return out;
});
}
Promise<String> readAllText() {
return loop().then([this](uint64_t size) {
auto out = heapArray<char>(size + 1);
copyInto(out.slice(0, out.size() - 1).asBytes());
out.back() = '\0';
return String(kj::mv(out));
});
}
private:
AsyncInputStream& input;
Vector<Array<byte>> parts;
Promise<uint64_t> loop(uint64_t total = 0) {
auto part = heapArray<byte>(4096);
auto partPtr = part.asPtr();
parts.add(kj::mv(part));
return input.tryRead(partPtr.begin(), partPtr.size(), partPtr.size())
.then([this,partPtr,total](size_t amount) -> Promise<uint64_t> {
uint64_t newTotal = total + amount;
if (amount < partPtr.size()) {
return newTotal;
} else {
return loop(newTotal);
}
});
}
void copyInto(ArrayPtr<byte> out) {
size_t pos = 0;
for (auto& part: parts) {
size_t n = kj::min(part.size(), out.size() - pos);
memcpy(out.begin() + pos, part.begin(), n);
pos += n;
}
}
};
} // namespace
Promise<Array<byte>> AsyncInputStream::readAllBytes() {
auto reader = kj::heap<AllReader>(*this);
auto promise = reader->readAllBytes();
return promise.attach(kj::mv(reader));
}
Promise<String> AsyncInputStream::readAllText() {
auto reader = kj::heap<AllReader>(*this);
auto promise = reader->readAllText();
return promise.attach(kj::mv(reader));
}
void AsyncIoStream::getsockopt(int level, int option, void* value, uint* length) { void AsyncIoStream::getsockopt(int level, int option, void* value, uint* length) {
KJ_UNIMPLEMENTED("Not a socket."); KJ_UNIMPLEMENTED("Not a socket.");
} }
......
...@@ -54,6 +54,10 @@ public: ...@@ -54,6 +54,10 @@ public:
virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
Promise<void> read(void* buffer, size_t bytes); Promise<void> read(void* buffer, size_t bytes);
Promise<Array<byte>> readAllBytes();
Promise<String> readAllText();
// Read until EOF and return as one big byte array or string.
}; };
class AsyncOutputStream { class AsyncOutputStream {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment