Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
C
capnproto
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
capnproto
Commits
d3bc948e
Commit
d3bc948e
authored
Aug 31, 2018
by
Kenton Varda
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Make it possible to configure HttpClient to not reuse connections at all.
parent
ab359776
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
93 additions
and
7 deletions
+93
-7
http-test.c++
c++/src/kj/compat/http-test.c++
+89
-4
http.c++
c++/src/kj/compat/http.c++
+3
-2
http.h
c++/src/kj/compat/http.h
+1
-1
No files found.
c++/src/kj/compat/http-test.c++
View file @
d3bc948e
...
@@ -2604,16 +2604,18 @@ public:
...
@@ -2604,16 +2604,18 @@ public:
class
CountingNetworkAddress
final
:
public
kj
::
NetworkAddress
{
class
CountingNetworkAddress
final
:
public
kj
::
NetworkAddress
{
public
:
public
:
CountingNetworkAddress
(
kj
::
NetworkAddress
&
inner
,
uint
&
count
)
CountingNetworkAddress
(
kj
::
NetworkAddress
&
inner
,
uint
&
count
,
uint
&
cumulative
)
:
inner
(
inner
),
count
(
count
),
addrCount
(
ownAddrCount
)
{}
:
inner
(
inner
),
count
(
count
),
addrCount
(
ownAddrCount
)
,
cumulative
(
cumulative
)
{}
CountingNetworkAddress
(
kj
::
Own
<
kj
::
NetworkAddress
>
inner
,
uint
&
count
,
uint
&
addrCount
)
CountingNetworkAddress
(
kj
::
Own
<
kj
::
NetworkAddress
>
inner
,
uint
&
count
,
uint
&
addrCount
)
:
inner
(
*
inner
),
ownInner
(
kj
::
mv
(
inner
)),
count
(
count
),
addrCount
(
addrCount
)
{}
:
inner
(
*
inner
),
ownInner
(
kj
::
mv
(
inner
)),
count
(
count
),
addrCount
(
addrCount
),
cumulative
(
ownCumulative
)
{}
~
CountingNetworkAddress
()
noexcept
(
false
)
{
~
CountingNetworkAddress
()
noexcept
(
false
)
{
--
addrCount
;
--
addrCount
;
}
}
kj
::
Promise
<
kj
::
Own
<
kj
::
AsyncIoStream
>>
connect
()
override
{
kj
::
Promise
<
kj
::
Own
<
kj
::
AsyncIoStream
>>
connect
()
override
{
++
count
;
++
count
;
++
cumulative
;
return
inner
.
connect
()
return
inner
.
connect
()
.
then
([
this
](
kj
::
Own
<
kj
::
AsyncIoStream
>
stream
)
->
kj
::
Own
<
kj
::
AsyncIoStream
>
{
.
then
([
this
](
kj
::
Own
<
kj
::
AsyncIoStream
>
stream
)
->
kj
::
Own
<
kj
::
AsyncIoStream
>
{
return
kj
::
heap
<
CountingIoStream
>
(
kj
::
mv
(
stream
),
count
);
return
kj
::
heap
<
CountingIoStream
>
(
kj
::
mv
(
stream
),
count
);
...
@@ -2630,6 +2632,8 @@ private:
...
@@ -2630,6 +2632,8 @@ private:
uint
&
count
;
uint
&
count
;
uint
ownAddrCount
=
1
;
uint
ownAddrCount
=
1
;
uint
&
addrCount
;
uint
&
addrCount
;
uint
ownCumulative
=
0
;
uint
&
cumulative
;
};
};
class
ConnectionCountingNetwork
final
:
public
kj
::
Network
{
class
ConnectionCountingNetwork
final
:
public
kj
::
Network
{
...
@@ -2710,7 +2714,8 @@ KJ_TEST("HttpClient connection management") {
...
@@ -2710,7 +2714,8 @@ KJ_TEST("HttpClient connection management") {
auto
addr
=
io
.
provider
->
getNetwork
().
parseAddress
(
"localhost"
,
listener
->
getPort
())
auto
addr
=
io
.
provider
->
getNetwork
().
parseAddress
(
"localhost"
,
listener
->
getPort
())
.
wait
(
io
.
waitScope
);
.
wait
(
io
.
waitScope
);
uint
count
=
0
;
uint
count
=
0
;
CountingNetworkAddress
countingAddr
(
*
addr
,
count
);
uint
cumulative
=
0
;
CountingNetworkAddress
countingAddr
(
*
addr
,
count
,
cumulative
);
FakeEntropySource
entropySource
;
FakeEntropySource
entropySource
;
HttpClientSettings
clientSettings
;
HttpClientSettings
clientSettings
;
...
@@ -2718,6 +2723,7 @@ KJ_TEST("HttpClient connection management") {
...
@@ -2718,6 +2723,7 @@ KJ_TEST("HttpClient connection management") {
auto
client
=
newHttpClient
(
clientTimer
,
headerTable
,
countingAddr
,
clientSettings
);
auto
client
=
newHttpClient
(
clientTimer
,
headerTable
,
countingAddr
,
clientSettings
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
0
);
uint
i
=
0
;
uint
i
=
0
;
auto
doRequest
=
[
&
]()
{
auto
doRequest
=
[
&
]()
{
...
@@ -2736,6 +2742,7 @@ KJ_TEST("HttpClient connection management") {
...
@@ -2736,6 +2742,7 @@ KJ_TEST("HttpClient connection management") {
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
1
);
// But if we do two in parallel, we'll end up with two connections.
// But if we do two in parallel, we'll end up with two connections.
auto
req1
=
doRequest
();
auto
req1
=
doRequest
();
...
@@ -2743,6 +2750,7 @@ KJ_TEST("HttpClient connection management") {
...
@@ -2743,6 +2750,7 @@ KJ_TEST("HttpClient connection management") {
req1
.
wait
(
io
.
waitScope
);
req1
.
wait
(
io
.
waitScope
);
req2
.
wait
(
io
.
waitScope
);
req2
.
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
2
);
KJ_EXPECT
(
count
==
2
);
KJ_EXPECT
(
cumulative
==
2
);
// We can reuse after a POST, provided we write the whole POST body properly.
// We can reuse after a POST, provided we write the whole POST body properly.
{
{
...
@@ -2752,8 +2760,10 @@ KJ_TEST("HttpClient connection management") {
...
@@ -2752,8 +2760,10 @@ KJ_TEST("HttpClient connection management") {
req
.
response
.
wait
(
io
.
waitScope
).
body
->
readAllBytes
().
wait
(
io
.
waitScope
);
req
.
response
.
wait
(
io
.
waitScope
).
body
->
readAllBytes
().
wait
(
io
.
waitScope
);
}
}
KJ_EXPECT
(
count
==
2
);
KJ_EXPECT
(
count
==
2
);
KJ_EXPECT
(
cumulative
==
2
);
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
2
);
KJ_EXPECT
(
count
==
2
);
KJ_EXPECT
(
cumulative
==
2
);
// Advance time for half the timeout, then exercise one of the connections.
// Advance time for half the timeout, then exercise one of the connections.
clientTimer
.
advanceTo
(
clientTimer
.
now
()
+
clientSettings
.
idleTimout
/
2
);
clientTimer
.
advanceTo
(
clientTimer
.
now
()
+
clientSettings
.
idleTimout
/
2
);
...
@@ -2761,52 +2771,65 @@ KJ_TEST("HttpClient connection management") {
...
@@ -2761,52 +2771,65 @@ KJ_TEST("HttpClient connection management") {
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
io
.
waitScope
.
poll
();
io
.
waitScope
.
poll
();
KJ_EXPECT
(
count
==
2
);
KJ_EXPECT
(
count
==
2
);
KJ_EXPECT
(
cumulative
==
2
);
// Advance time past when the other connection should time out. It should be dropped.
// Advance time past when the other connection should time out. It should be dropped.
clientTimer
.
advanceTo
(
clientTimer
.
now
()
+
clientSettings
.
idleTimout
*
3
/
4
);
clientTimer
.
advanceTo
(
clientTimer
.
now
()
+
clientSettings
.
idleTimout
*
3
/
4
);
io
.
waitScope
.
poll
();
io
.
waitScope
.
poll
();
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
2
);
// Wait for the other to drop.
// Wait for the other to drop.
clientTimer
.
advanceTo
(
clientTimer
.
now
()
+
clientSettings
.
idleTimout
/
2
);
clientTimer
.
advanceTo
(
clientTimer
.
now
()
+
clientSettings
.
idleTimout
/
2
);
io
.
waitScope
.
poll
();
io
.
waitScope
.
poll
();
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
2
);
// New request creates a new connection again.
// New request creates a new connection again.
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
3
);
// WebSocket connections are not reused.
// WebSocket connections are not reused.
client
->
openWebSocket
(
kj
::
str
(
"/websocket"
),
HttpHeaders
(
headerTable
))
client
->
openWebSocket
(
kj
::
str
(
"/websocket"
),
HttpHeaders
(
headerTable
))
.
wait
(
io
.
waitScope
);
.
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
3
);
// Errored connections are not reused.
// Errored connections are not reused.
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
4
);
client
->
request
(
HttpMethod
::
GET
,
kj
::
str
(
"/throw"
),
HttpHeaders
(
headerTable
)).
response
client
->
request
(
HttpMethod
::
GET
,
kj
::
str
(
"/throw"
),
HttpHeaders
(
headerTable
)).
response
.
wait
(
io
.
waitScope
).
body
->
readAllBytes
().
wait
(
io
.
waitScope
);
.
wait
(
io
.
waitScope
).
body
->
readAllBytes
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
4
);
// Connections where we failed to read the full response body are not reused.
// Connections where we failed to read the full response body are not reused.
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
5
);
client
->
request
(
HttpMethod
::
GET
,
kj
::
str
(
"/foo"
),
HttpHeaders
(
headerTable
)).
response
client
->
request
(
HttpMethod
::
GET
,
kj
::
str
(
"/foo"
),
HttpHeaders
(
headerTable
)).
response
.
wait
(
io
.
waitScope
);
.
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
5
);
// Connections where we didn't even wait for the response headers are not reused.
// Connections where we didn't even wait for the response headers are not reused.
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
6
);
client
->
request
(
HttpMethod
::
GET
,
kj
::
str
(
"/foo"
),
HttpHeaders
(
headerTable
));
client
->
request
(
HttpMethod
::
GET
,
kj
::
str
(
"/foo"
),
HttpHeaders
(
headerTable
));
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
6
);
// Connections where we failed to write the full request body are not reused.
// Connections where we failed to write the full request body are not reused.
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
7
);
client
->
request
(
HttpMethod
::
POST
,
kj
::
str
(
"/foo"
),
HttpHeaders
(
headerTable
),
size_t
(
6
)).
response
client
->
request
(
HttpMethod
::
POST
,
kj
::
str
(
"/foo"
),
HttpHeaders
(
headerTable
),
size_t
(
6
)).
response
.
wait
(
io
.
waitScope
).
body
->
readAllBytes
().
wait
(
io
.
waitScope
);
.
wait
(
io
.
waitScope
).
body
->
readAllBytes
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
7
);
#if __linux__
#if __linux__
// TODO(someday): Figure out why this doesn't work on Windows and is flakey on Mac. My guess is
// TODO(someday): Figure out why this doesn't work on Windows and is flakey on Mac. My guess is
...
@@ -2821,14 +2844,76 @@ KJ_TEST("HttpClient connection management") {
...
@@ -2821,14 +2844,76 @@ KJ_TEST("HttpClient connection management") {
// If the server times out the connection, we figure it out on the client.
// If the server times out the connection, we figure it out on the client.
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
8
);
serverTimer
.
advanceTo
(
serverTimer
.
now
()
+
serverSettings
.
pipelineTimeout
*
2
);
serverTimer
.
advanceTo
(
serverTimer
.
now
()
+
serverSettings
.
pipelineTimeout
*
2
);
io
.
waitScope
.
poll
();
io
.
waitScope
.
poll
();
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
8
);
#else
++
cumulative
;
// hack
#endif
#endif
// Can still make requests.
// Can still make requests.
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
count
==
1
);
KJ_EXPECT
(
cumulative
==
9
);
}
KJ_TEST
(
"HttpClient disable connection reuse"
)
{
auto
io
=
kj
::
setupAsyncIo
();
kj
::
TimerImpl
serverTimer
(
kj
::
origin
<
kj
::
TimePoint
>
());
kj
::
TimerImpl
clientTimer
(
kj
::
origin
<
kj
::
TimePoint
>
());
HttpHeaderTable
headerTable
;
auto
listener
=
io
.
provider
->
getNetwork
().
parseAddress
(
"localhost"
,
0
)
.
wait
(
io
.
waitScope
)
->
listen
();
DummyService
service
(
headerTable
);
HttpServerSettings
serverSettings
;
HttpServer
server
(
serverTimer
,
headerTable
,
service
,
serverSettings
);
auto
listenTask
=
server
.
listenHttp
(
*
listener
);
auto
addr
=
io
.
provider
->
getNetwork
().
parseAddress
(
"localhost"
,
listener
->
getPort
())
.
wait
(
io
.
waitScope
);
uint
count
=
0
;
uint
cumulative
=
0
;
CountingNetworkAddress
countingAddr
(
*
addr
,
count
,
cumulative
);
FakeEntropySource
entropySource
;
HttpClientSettings
clientSettings
;
clientSettings
.
entropySource
=
entropySource
;
clientSettings
.
idleTimout
=
0
*
kj
::
SECONDS
;
auto
client
=
newHttpClient
(
clientTimer
,
headerTable
,
countingAddr
,
clientSettings
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
0
);
uint
i
=
0
;
auto
doRequest
=
[
&
]()
{
uint
n
=
i
++
;
return
client
->
request
(
HttpMethod
::
GET
,
kj
::
str
(
"/"
,
n
),
HttpHeaders
(
headerTable
)).
response
.
then
([](
HttpClient
::
Response
&&
response
)
{
auto
promise
=
response
.
body
->
readAllText
();
return
promise
.
attach
(
kj
::
mv
(
response
.
body
));
}).
then
([
n
](
kj
::
String
body
)
{
KJ_EXPECT
(
body
==
kj
::
str
(
"null:/"
,
n
));
});
};
// We can do several requests in a row and only have one connection.
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
doRequest
().
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
3
);
// But if we do two in parallel, we'll end up with two connections.
auto
req1
=
doRequest
();
auto
req2
=
doRequest
();
req1
.
wait
(
io
.
waitScope
);
req2
.
wait
(
io
.
waitScope
);
KJ_EXPECT
(
count
==
0
);
KJ_EXPECT
(
cumulative
==
5
);
}
}
KJ_TEST
(
"HttpClient multi host"
)
{
KJ_TEST
(
"HttpClient multi host"
)
{
...
...
c++/src/kj/compat/http.c++
View file @
d3bc948e
...
@@ -3516,8 +3516,9 @@ private:
...
@@ -3516,8 +3516,9 @@ private:
}
}
void
returnClientToAvailable
(
kj
::
Own
<
HttpClientImpl
>
client
)
{
void
returnClientToAvailable
(
kj
::
Own
<
HttpClientImpl
>
client
)
{
// Only return the connection to the pool if it is reusable.
// Only return the connection to the pool if it is reusable and if our settings indicate we
if
(
client
->
canReuse
())
{
// should reuse connections.
if
(
client
->
canReuse
()
&&
settings
.
idleTimout
>
0
*
kj
::
SECONDS
)
{
availableClients
.
push_back
(
AvailableClient
{
availableClients
.
push_back
(
AvailableClient
{
kj
::
mv
(
client
),
timer
.
now
()
+
settings
.
idleTimout
kj
::
mv
(
client
),
timer
.
now
()
+
settings
.
idleTimout
});
});
...
...
c++/src/kj/compat/http.h
View file @
d3bc948e
...
@@ -583,7 +583,7 @@ public:
...
@@ -583,7 +583,7 @@ public:
struct
HttpClientSettings
{
struct
HttpClientSettings
{
kj
::
Duration
idleTimout
=
5
*
kj
::
SECONDS
;
kj
::
Duration
idleTimout
=
5
*
kj
::
SECONDS
;
// For clients which automatically create new connections, any connection idle for at least this
// For clients which automatically create new connections, any connection idle for at least this
// long will be closed.
// long will be closed.
Set this to 0 to prevent connection reuse entirely.
kj
::
Maybe
<
EntropySource
&>
entropySource
=
nullptr
;
kj
::
Maybe
<
EntropySource
&>
entropySource
=
nullptr
;
// Must be provided in order to use `openWebSocket`. If you don't need WebSockets, this can be
// Must be provided in order to use `openWebSocket`. If you don't need WebSockets, this can be
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment