Commit 3b7c81a2 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #404 from sandstorm-io/http

Add HTTP client and server implementation.
parents e5962ef2 180e6772
......@@ -333,6 +333,16 @@ TEST(Array, Map) {
EXPECT_STREQ("bcde", str(bar).cStr());
}
TEST(Array, MapRawArray) {
uint foo[4] = {1, 2, 3, 4};
Array<uint> bar = KJ_MAP(i, foo) -> uint { return i * i; };
ASSERT_EQ(4, bar.size());
EXPECT_EQ(1, bar[0]);
EXPECT_EQ(4, bar[1]);
EXPECT_EQ(9, bar[2]);
EXPECT_EQ(16, bar[3]);
}
TEST(Array, ReleaseAsBytesOrChars) {
{
Array<char> chars = kj::heapArray<char>("foo", 3);
......
......@@ -294,7 +294,7 @@ template <typename T> Array<T> heapArray(std::initializer_list<T> init);
// Allocate a heap array containing a copy of the given content.
template <typename T, typename Container>
Array<T> heapArrayFromIterable(Container&& a) { return heapArray(a.begin(), a.end()); }
Array<T> heapArrayFromIterable(Container&& a) { return heapArray<T>(a.begin(), a.end()); }
template <typename T>
Array<T> heapArrayFromIterable(Array<T>&& a) { return mv(a); }
......@@ -501,7 +501,8 @@ private:
// KJ_MAP
#define KJ_MAP(elementName, array) \
::kj::_::Mapper<KJ_DECLTYPE_REF(array)>(array) * [&](decltype(*(array).begin()) elementName)
::kj::_::Mapper<KJ_DECLTYPE_REF(array)>(array) * \
[&](typename ::kj::_::Mapper<KJ_DECLTYPE_REF(array)>::Element elementName)
// Applies some function to every element of an array, returning an Array of the results, with
// nice syntax. Example:
//
......@@ -523,6 +524,22 @@ struct Mapper {
}
return builder.finish();
}
typedef decltype(*(array).begin()) Element;
};
template <typename T, size_t s>
struct Mapper<T(&)[s]> {
T* array;
Mapper(T* array): array(array) {}
template <typename Func>
auto operator*(Func&& func) -> Array<decltype(func(*array))> {
auto builder = heapArrayBuilder<decltype(func(*array))>(s);
for (size_t i = 0; i < s; i++) {
builder.add(func(array[i]));
}
return builder.finish();
}
typedef decltype(*array)& Element;
};
} // namespace _ (private)
......
......@@ -21,12 +21,14 @@
#include "async-io.h"
#include "debug.h"
#include "vector.h"
namespace kj {
Promise<void> AsyncInputStream::read(void* buffer, size_t bytes) {
return read(buffer, bytes, bytes).then([](size_t) {});
}
Promise<size_t> AsyncInputStream::read(void* buffer, size_t minBytes, size_t maxBytes) {
return tryRead(buffer, minBytes, maxBytes).then([=](size_t result) {
KJ_REQUIRE(result >= minBytes, "Premature EOF") {
......@@ -38,6 +40,127 @@ Promise<size_t> AsyncInputStream::read(void* buffer, size_t minBytes, size_t max
});
}
Maybe<uint64_t> AsyncInputStream::tryGetLength() { return nullptr; }
namespace {
class AsyncPump {
public:
AsyncPump(AsyncInputStream& input, AsyncOutputStream& output, uint64_t limit)
: input(input), output(output), limit(limit) {}
Promise<uint64_t> pump() {
// TODO(perf): This could be more efficient by reading half a buffer at a time and then
// starting the next read concurrent with writing the data from the previous read.
uint64_t n = kj::min(limit - doneSoFar, sizeof(buffer));
if (n == 0) return doneSoFar;
return input.tryRead(buffer, 1, sizeof(buffer))
.then([this](size_t amount) -> Promise<uint64_t> {
if (amount == 0) return doneSoFar; // EOF
doneSoFar += amount;
return output.write(buffer, amount)
.then([this]() {
return pump();
});
});
}
private:
AsyncInputStream& input;
AsyncOutputStream& output;
uint64_t limit;
uint64_t doneSoFar = 0;
byte buffer[4096];
};
} // namespace
Promise<uint64_t> AsyncInputStream::pumpTo(
AsyncOutputStream& output, uint64_t amount) {
// See if output wants to dispatch on us.
KJ_IF_MAYBE(result, output.tryPumpFrom(*this, amount)) {
return kj::mv(*result);
}
// OK, fall back to naive approach.
auto pump = heap<AsyncPump>(*this, output, amount);
auto promise = pump->pump();
return promise.attach(kj::mv(pump));
}
namespace {
class AllReader {
public:
AllReader(AsyncInputStream& input): input(input) {}
Promise<Array<byte>> readAllBytes() {
return loop().then([this](uint64_t size) {
auto out = heapArray<byte>(size);
copyInto(out);
return out;
});
}
Promise<String> readAllText() {
return loop().then([this](uint64_t size) {
auto out = heapArray<char>(size + 1);
copyInto(out.slice(0, out.size() - 1).asBytes());
out.back() = '\0';
return String(kj::mv(out));
});
}
private:
AsyncInputStream& input;
Vector<Array<byte>> parts;
Promise<uint64_t> loop(uint64_t total = 0) {
auto part = heapArray<byte>(4096);
auto partPtr = part.asPtr();
parts.add(kj::mv(part));
return input.tryRead(partPtr.begin(), partPtr.size(), partPtr.size())
.then([this,partPtr,total](size_t amount) -> Promise<uint64_t> {
uint64_t newTotal = total + amount;
if (amount < partPtr.size()) {
return newTotal;
} else {
return loop(newTotal);
}
});
}
void copyInto(ArrayPtr<byte> out) {
size_t pos = 0;
for (auto& part: parts) {
size_t n = kj::min(part.size(), out.size() - pos);
memcpy(out.begin() + pos, part.begin(), n);
pos += n;
}
}
};
} // namespace
Promise<Array<byte>> AsyncInputStream::readAllBytes() {
auto reader = kj::heap<AllReader>(*this);
auto promise = reader->readAllBytes();
return promise.attach(kj::mv(reader));
}
Promise<String> AsyncInputStream::readAllText() {
auto reader = kj::heap<AllReader>(*this);
auto promise = reader->readAllText();
return promise.attach(kj::mv(reader));
}
Maybe<Promise<uint64_t>> AsyncOutputStream::tryPumpFrom(
AsyncInputStream& input, uint64_t amount) {
return nullptr;
}
void AsyncIoStream::getsockopt(int level, int option, void* value, uint* length) {
KJ_UNIMPLEMENTED("Not a socket.");
}
......
......@@ -42,6 +42,7 @@ class UnixEventPort;
#endif
class NetworkAddress;
class AsyncOutputStream;
// =======================================================================================
// Streaming I/O
......@@ -54,6 +55,31 @@ public:
virtual Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
Promise<void> read(void* buffer, size_t bytes);
virtual Maybe<uint64_t> tryGetLength();
// Get the remaining number of bytes that will be produced by this stream, if known.
//
// This is used e.g. to fill in the Content-Length header of an HTTP message. If unknown, the
// HTTP implementation may need to fall back to Transfer-Encoding: chunked.
//
// The default implementation always returns null.
virtual Promise<uint64_t> pumpTo(
AsyncOutputStream& output, uint64_t amount = kj::maxValue);
// Read `amount` bytes from this stream (or to EOF) and write them to `output`, returning the
// total bytes actually pumped (which is only less than `amount` if EOF was reached).
//
// Override this if your stream type knows how to pump itself to certain kinds of output
// streams more efficiently than via the naive approach. You can use
// kj::dynamicDowncastIfAvailable() to test for stream types you recognize, and if none match,
// delegate to the default implementation.
//
// The default implementation first tries calling output.tryPumpFrom(), but if that fails, it
// performs a naive pump by allocating a buffer and reading to it / writing from it in a loop.
Promise<Array<byte>> readAllBytes();
Promise<String> readAllText();
// Read until EOF and return as one big byte array or string.
};
class AsyncOutputStream {
......@@ -62,6 +88,17 @@ class AsyncOutputStream {
public:
virtual Promise<void> write(const void* buffer, size_t size) = 0;
virtual Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) = 0;
virtual Maybe<Promise<uint64_t>> tryPumpFrom(
AsyncInputStream& input, uint64_t amount = kj::maxValue);
// Implements double-dispatch for AsyncInputStream::pumpTo().
//
// This method should only be called from within an implementation of pumpTo().
//
// This method examines the type of `input` to find optimized ways to pump data from it to this
// output stream. If it finds one, it performs the pump. Otherwise, it returns null.
//
// The default implementation always returns null.
};
class AsyncIoStream: public AsyncInputStream, public AsyncOutputStream {
......
......@@ -737,6 +737,7 @@ public:
inline Iterator end() const { return Iterator(value, count); }
inline size_t size() const { return count; }
inline T operator[](ptrdiff_t) const { return value; }
private:
T value;
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -107,11 +107,15 @@ public:
return CharGroup_(~bits[0], ~bits[1], ~bits[2], ~bits[3]);
}
constexpr inline bool contains(char c) const {
return (bits[c / 64] & (1ll << (c % 64))) != 0;
}
template <typename Input>
Maybe<char> operator()(Input& input) const {
if (input.atEnd()) return nullptr;
unsigned char c = input.current();
if ((bits[c / 64] & (1ll << (c % 64))) != 0) {
if (contains(c)) {
input.next();
return c;
} else {
......
......@@ -170,6 +170,10 @@ public:
inline ArrayPtr<const byte> asBytes() const { return asArray().asBytes(); }
// Result does not include NUL terminator.
inline Array<char> releaseArray() { return kj::mv(content); }
// Disowns the backing array (which includes the NUL terminator) and returns it. The String value
// is clobbered (as if moved away).
inline const char* cStr() const;
inline size_t size() const;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment