Commit 3b85e1c1 authored by Kenton Varda's avatar Kenton Varda

Address Harris' review comments.

parent 76104a88
...@@ -2435,6 +2435,16 @@ public: ...@@ -2435,6 +2435,16 @@ public:
}; };
class WebSocketPipeImpl final: public AbortableWebSocket, public kj::Refcounted { class WebSocketPipeImpl final: public AbortableWebSocket, public kj::Refcounted {
// Represents one direction of a WebSocket pipe.
//
// This class behaves as a "loopback" WebSocket: a message sent using send() is received using
// receive(), on the same object. This is *not* how WebSocket implementations usually behave.
// But, this object is actually used to implement only one direction of a bidirectional pipe. At
// another layer above this, the pipe is actually composed of two WebSocketPipeEnd instances,
// which layer on top of two WebSocketPipeImpl instances representing the two directions. So,
// send() calls on a WebSocketPipeImpl instance always come from one of the two WebSocketPipeEnds
// while receive() calls come from the other end.
public: public:
~WebSocketPipeImpl() noexcept(false) { ~WebSocketPipeImpl() noexcept(false) {
KJ_REQUIRE(state == nullptr || ownState.get() != nullptr, KJ_REQUIRE(state == nullptr || ownState.get() != nullptr,
...@@ -2508,8 +2518,9 @@ public: ...@@ -2508,8 +2518,9 @@ public:
private: private:
kj::Maybe<AbortableWebSocket&> state; kj::Maybe<AbortableWebSocket&> state;
// Represents the state of the rendezvous for this end's send() calls / the other end's // Object-oriented state! If any method call is blocked waiting on activity from the other end,
// receive() calls. (otherEnd.sendState provides the reverse state.) // then `state` is non-null and method calls should be forwarded to it. If no calls are
// outstanding, `state` is null.
kj::Own<AbortableWebSocket> ownState; kj::Own<AbortableWebSocket> ownState;
...@@ -2736,6 +2747,7 @@ private: ...@@ -2736,6 +2747,7 @@ private:
kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override { kj::Maybe<kj::Promise<void>> tryPumpFrom(WebSocket& other) override {
KJ_REQUIRE(canceler.isEmpty(), "already pumping"); KJ_REQUIRE(canceler.isEmpty(), "already pumping");
return canceler.wrap(other.receive().then([this,&other](Message message) { return canceler.wrap(other.receive().then([this,&other](Message message) {
canceler.release();
fulfiller.fulfill(kj::mv(message)); fulfiller.fulfill(kj::mv(message));
pipe.endState(*this); pipe.endState(*this);
return other.pumpTo(pipe); return other.pumpTo(pipe);
......
...@@ -646,7 +646,7 @@ struct WebSocketPipe { ...@@ -646,7 +646,7 @@ struct WebSocketPipe {
WebSocketPipe newWebSocketPipe(); WebSocketPipe newWebSocketPipe();
// Create a WebSocket pipe. Messages written to one end of the pipe will be readable from the other // Create a WebSocket pipe. Messages written to one end of the pipe will be readable from the other
// end. No buffering occurs -- a message send does not complete under a corresponding receive // end. No buffering occurs -- a message send does not complete until a corresponding receive
// accepts the message. // accepts the message.
struct HttpServerSettings { struct HttpServerSettings {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment