Commit 35cd7b1c authored by Zhu Jiashun's avatar Zhu Jiashun Committed by gejun

Add http2.

TODO: 1. socket management 2. window mechanism
parent a9e95487
...@@ -212,6 +212,11 @@ public: ...@@ -212,6 +212,11 @@ public:
return *_http_request; return *_http_request;
} }
bool has_http_request() const { return _http_request; } bool has_http_request() const { return _http_request; }
HttpHeader* release_http_request() {
HttpHeader* const tmp = _http_request;
_http_request = NULL;
return tmp;
}
// User attached data or body of http request, which is wired to network // User attached data or body of http request, which is wired to network
// directly instead of being serialized into protobuf messages. // directly instead of being serialized into protobuf messages.
...@@ -334,6 +339,11 @@ public: ...@@ -334,6 +339,11 @@ public:
return *_http_response; return *_http_response;
} }
bool has_http_response() const { return _http_response; } bool has_http_response() const { return _http_response; }
HttpHeader* release_http_response() {
HttpHeader* const tmp = _http_response;
_http_response = NULL;
return tmp;
}
// User attached data or body of http response, which is wired to network // User attached data or body of http response, which is wired to network
// directly instead of being serialized into protobuf messages. // directly instead of being serialized into protobuf messages.
......
This diff is collapsed.
...@@ -19,20 +19,21 @@ ...@@ -19,20 +19,21 @@
#include "butil/iobuf.h" // butil::IOBuf #include "butil/iobuf.h" // butil::IOBuf
#include "butil/strings/string_piece.h" // butil::StringPiece #include "butil/strings/string_piece.h" // butil::StringPiece
#include "brpc/http2.h"
#include "brpc/describable.h"
namespace brpc { namespace brpc {
enum HeaderIndexPolicy { enum HeaderIndexPolicy {
// Append this header, alerting the decoder dynamic table // Append this header, alerting the decoder dynamic table
// - If the given header matches one of the indexed header, this header // - If the given header matches one of the indexed header, this header
// replaced by the index. // is replaced by the index.
// - If not, append this header into the decoder dynamic table // - If not, append this header into the decoder dynamic table
HPACK_INDEX_HEADER = 0, HPACK_INDEX_HEADER = 0,
// Append this header, without alerting the decoder dynamic table // Append this header, without alerting the decoder dynamic table
// - If the given header matches one of the indexed header, this header // - If the given header matches one of the indexed header, this header
// replaced by the index. // is replaced by the index.
// - If not, append this header directly *WITHOUT* any modification on the // - If not, append this header directly *WITHOUT* any modification on the
// decoder dynamic table // decoder dynamic table
HPACK_NOT_INDEX_HEADER = 1, HPACK_NOT_INDEX_HEADER = 1,
...@@ -79,35 +80,36 @@ class IndexTable; ...@@ -79,35 +80,36 @@ class IndexTable;
// header field names MUST be treated as malformed // header field names MUST be treated as malformed
// Not supported methods: // Not supported methods:
// - Resize dynamic table. // - Resize dynamic table.
class HPacker { class HPacker : public Describable {
public: public:
struct Header { struct Header {
std::string name; std::string name;
std::string value; std::string value;
Header() {}
explicit Header(const std::string& name2) : name(name2) {}
Header(const std::string& name2, const std::string& value2)
: name(name2), value(value2) {}
}; };
HPacker(); HPacker();
~HPacker(); ~HPacker();
// According to rfc7540#section-6.5.2.
// The initial value of SETTING_HEADER_TABLE_SIZE is 4096 octets.
const static size_t DEFAULT_HEADER_TABLE_SIZE = 4096;
// Initialize the instance. // Initialize the instance.
// Returns 0 on success, -1 otherwise. // Returns 0 on success, -1 otherwise.
int Init(size_t max_table_size = DEFAULT_HEADER_TABLE_SIZE); int Init(size_t max_table_size = H2Settings::DEFAULT_HEADER_TABLE_SIZE);
// Encode header and append the encoded buffer to |out| // Encode header and append the encoded buffer to |out|
// Returns the size of encoded buffer on success, -1 otherwise // Returns true on success.
ssize_t Encode(butil::IOBufAppender* out, const Header& header, void Encode(butil::IOBufAppender* out, const Header& header,
const HPackOptions& options); const HPackOptions& options);
ssize_t Encode(butil::IOBufAppender* out, const Header& header) void Encode(butil::IOBufAppender* out, const Header& header)
{ return Encode(out, header, HPackOptions()); } { return Encode(out, header, HPackOptions()); }
// Try to decode at most one Header from source and erase correspoding // Try to decode at most one Header from source and erase corresponding
// buffer. // buffer.
// Returns: // Returns:
// * $size of decoded buffer is a header is succesfully decoded // * $size of decoded buffer when a header is succesfully decoded
// * 0 when the source is incompleted // * 0 when the source is incompleted
// * -1 when the source is malformed // * -1 when the source is malformed
ssize_t Decode(butil::IOBuf* source, Header* h); ssize_t Decode(butil::IOBuf* source, Header* h);
...@@ -115,7 +117,11 @@ public: ...@@ -115,7 +117,11 @@ public:
// Like the previous function, except that the source is from // Like the previous function, except that the source is from
// IOBufBytesIterator. // IOBufBytesIterator.
ssize_t Decode(butil::IOBufBytesIterator& source, Header* h); ssize_t Decode(butil::IOBufBytesIterator& source, Header* h);
void Describe(std::ostream& os, const DescribeOptions&) const;
private: private:
DISALLOW_COPY_AND_ASSIGN(HPacker);
int FindHeaderFromIndexTable(const Header& h) const; int FindHeaderFromIndexTable(const Header& h) const;
int FindNameFromIndexTable(const std::string& name) const; int FindNameFromIndexTable(const std::string& name) const;
const Header* HeaderAt(int index) const; const Header* HeaderAt(int index) const;
...@@ -126,6 +132,9 @@ private: ...@@ -126,6 +132,9 @@ private:
IndexTable* _decode_table; IndexTable* _decode_table;
}; };
// Lowercase the input string, a fast implementation.
void tolower(std::string* s);
inline ssize_t HPacker::Decode(butil::IOBuf* source, Header* h) { inline ssize_t HPacker::Decode(butil::IOBuf* source, Header* h) {
butil::IOBufBytesIterator iter(*source); butil::IOBufBytesIterator iter(*source);
const ssize_t nc = Decode(iter, h); const ssize_t nc = Decode(iter, h);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "butil/macros.h" #include "butil/macros.h"
#include "butil/iobuf.h" // butil::IOBuf #include "butil/iobuf.h" // butil::IOBuf
#include "butil/scoped_lock.h" // butil::unique_lock #include "butil/scoped_lock.h" // butil::unique_lock
#include "butil/endpoint.h"
#include "brpc/details/http_parser.h" // http_parser #include "brpc/details/http_parser.h" // http_parser
#include "brpc/http_header.h" // HttpHeader #include "brpc/http_header.h" // HttpHeader
#include "brpc/progressive_reader.h" // ProgressiveReader #include "brpc/progressive_reader.h" // ProgressiveReader
......
...@@ -49,6 +49,7 @@ ...@@ -49,6 +49,7 @@
#include "brpc/protocol.h" #include "brpc/protocol.h"
#include "brpc/policy/baidu_rpc_protocol.h" #include "brpc/policy/baidu_rpc_protocol.h"
#include "brpc/policy/http_rpc_protocol.h" #include "brpc/policy/http_rpc_protocol.h"
#include "brpc/policy/http2_rpc_protocol.h"
#include "brpc/policy/hulu_pbrpc_protocol.h" #include "brpc/policy/hulu_pbrpc_protocol.h"
#include "brpc/policy/nova_pbrpc_protocol.h" #include "brpc/policy/nova_pbrpc_protocol.h"
#include "brpc/policy/public_pbrpc_protocol.h" #include "brpc/policy/public_pbrpc_protocol.h"
...@@ -396,6 +397,17 @@ static void GlobalInitializeOrDieImpl() { ...@@ -396,6 +397,17 @@ static void GlobalInitializeOrDieImpl() {
exit(1); exit(1);
} }
Protocol http2_protocol = { ParseH2Message,
SerializeHttpRequest, PackH2Request,
ProcessHttpRequest, ProcessHttpResponse,
VerifyHttpRequest, ParseHttpServerAddress,
GetHttpMethodName,
CONNECTION_TYPE_SINGLE,
"h2c" };
if (RegisterProtocol(PROTOCOL_HTTP2, http2_protocol) != 0) {
exit(1);
}
Protocol hulu_protocol = { ParseHuluMessage, Protocol hulu_protocol = { ParseHuluMessage,
SerializeRequestDefault, PackHuluRequest, SerializeRequestDefault, PackHuluRequest,
ProcessHuluRequest, ProcessHuluResponse, ProcessHuluRequest, ProcessHuluResponse,
......
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "brpc/http2.h"
#include "brpc/details/hpack.h"
#include <limits>
#include "butil/logging.h"
namespace brpc {
enum H2SettingsIdentifier {
HTTP2_SETTINGS_HEADER_TABLE_SIZE = 0x1,
HTTP2_SETTINGS_ENABLE_PUSH = 0x2,
HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS = 0x3,
HTTP2_SETTINGS_INITIAL_WINDOW_SIZE = 0x4,
HTTP2_SETTINGS_MAX_FRAME_SIZE = 0x5,
HTTP2_SETTINGS_MAX_HEADER_LIST_SIZE = 0x6
};
inline uint16_t LoadUint16(butil::IOBufBytesIterator& it) {
uint16_t v = *it; ++it;
v = ((v << 8) | *it); ++it;
return v;
}
inline uint32_t LoadUint32(butil::IOBufBytesIterator& it) {
uint32_t v = *it; ++it;
v = ((v << 8) | *it); ++it;
v = ((v << 8) | *it); ++it;
v = ((v << 8) | *it); ++it;
return v;
}
inline void SaveUint16(void* out, uint16_t v) {
uint8_t* p = (uint8_t*)out;
p[0] = (v >> 8) & 0xFF;
p[1] = v & 0xFF;
}
inline void SaveUint32(void* out, uint32_t v) {
uint8_t* p = (uint8_t*)out;
p[0] = (v >> 24) & 0xFF;
p[1] = (v >> 16) & 0xFF;
p[2] = (v >> 8) & 0xFF;
p[3] = v & 0xFF;
}
H2Settings::H2Settings()
: header_table_size(DEFAULT_HEADER_TABLE_SIZE)
, enable_push(DEFAULT_ENABLE_PUSH)
, max_concurrent_streams(std::numeric_limits<uint32_t>::max())
, initial_window_size(DEFAULT_INITIAL_WINDOW_SIZE)
, max_frame_size(DEFAULT_MAX_FRAME_SIZE)
, max_header_list_size(std::numeric_limits<uint32_t>::max()) {
}
bool H2Settings::ParseFrom(butil::IOBufBytesIterator& it, size_t n) {
const uint32_t npairs = n / 6;
if (npairs * 6 != n) {
LOG(ERROR) << "Invalid payload_size=" << n;
return false;
}
for (uint32_t i = 0; i < npairs; ++i) {
uint16_t id = LoadUint16(it);
uint32_t value = LoadUint32(it);
switch (static_cast<H2SettingsIdentifier>(id)) {
case HTTP2_SETTINGS_HEADER_TABLE_SIZE:
header_table_size = value;
break;
case HTTP2_SETTINGS_ENABLE_PUSH:
if (value > 1) {
LOG(ERROR) << "Invalid value=" << value << " for ENABLE_PUSH";
return false;
}
enable_push = value;
break;
case HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS:
max_concurrent_streams = value;
break;
case HTTP2_SETTINGS_INITIAL_WINDOW_SIZE:
if (value > MAX_INITIAL_WINDOW_SIZE) {
LOG(ERROR) << "Invalid initial_window_size=" << value;
return false;
}
initial_window_size = value;
break;
case HTTP2_SETTINGS_MAX_FRAME_SIZE:
if (value > MAX_OF_MAX_FRAME_SIZE ||
value < DEFAULT_MAX_FRAME_SIZE) {
LOG(ERROR) << "Invalid max_frame_size=" << value;
return false;
}
max_frame_size = value;
break;
case HTTP2_SETTINGS_MAX_HEADER_LIST_SIZE:
max_header_list_size = value;
break;
default:
// An endpoint that receives a SETTINGS frame with any unknown or
// unsupported identifier MUST ignore that setting (section 6.5.2)
LOG(WARNING) << "Unknown setting, id=" << id << " value=" << value;
break;
}
}
return true;
}
size_t H2Settings::ByteSize() const {
size_t size = 0;
if (header_table_size != DEFAULT_HEADER_TABLE_SIZE) {
size += 6;
}
if (enable_push != DEFAULT_ENABLE_PUSH) {
size += 6;
}
if (max_concurrent_streams != std::numeric_limits<uint32_t>::max()) {
size += 6;
}
if (initial_window_size != DEFAULT_INITIAL_WINDOW_SIZE) {
size += 6;
}
if (max_frame_size != DEFAULT_MAX_FRAME_SIZE) {
size += 6;
}
if (max_header_list_size != std::numeric_limits<uint32_t>::max()) {
size += 6;
}
return size;
}
void H2Settings::SerializeTo(void* out) const {
uint8_t* p = (uint8_t*)out;
if (header_table_size != DEFAULT_HEADER_TABLE_SIZE) {
SaveUint16(p, HTTP2_SETTINGS_HEADER_TABLE_SIZE);
SaveUint32(p + 2, header_table_size);
p += 6;
}
if (enable_push != DEFAULT_ENABLE_PUSH) {
SaveUint16(p, HTTP2_SETTINGS_ENABLE_PUSH);
SaveUint32(p + 2, enable_push);
p += 6;
}
if (max_concurrent_streams != std::numeric_limits<uint32_t>::max()) {
SaveUint16(p, HTTP2_SETTINGS_MAX_CONCURRENT_STREAMS);
SaveUint32(p + 2, max_concurrent_streams);
p += 6;
}
if (initial_window_size != DEFAULT_INITIAL_WINDOW_SIZE) {
SaveUint16(p, HTTP2_SETTINGS_INITIAL_WINDOW_SIZE);
SaveUint32(p + 2, initial_window_size);
p += 6;
}
if (max_frame_size != DEFAULT_MAX_FRAME_SIZE) {
SaveUint16(p, HTTP2_SETTINGS_MAX_FRAME_SIZE);
SaveUint32(p + 2, max_frame_size);
p += 6;
}
if (max_header_list_size != std::numeric_limits<uint32_t>::max()) {
SaveUint16(p, HTTP2_SETTINGS_MAX_HEADER_LIST_SIZE);
SaveUint32(p + 2, max_header_list_size);
p += 6;
}
}
void H2Settings::Print(std::ostream& os) const {
os << "{header_table_size=" << header_table_size
<< " enable_push=" << enable_push
<< " max_concurrent_streams=" << max_concurrent_streams
<< " initial_window_size=" << initial_window_size
<< " max_frame_size=" << max_frame_size
<< " max_header_list_size=" << max_header_list_size
<< '}';
}
const char* H2ErrorToString(H2Error e) {
switch (e) {
case H2_NO_ERROR: return "NO_ERROR";
case H2_PROTOCOL_ERROR: return "PROTOCOL_ERROR";
case H2_INTERNAL_ERROR: return "INTERNAL_ERROR";
case H2_FLOW_CONTROL_ERROR: return "FLOW_CONTROL_ERROR";
case H2_SETTINGS_TIMEOUT: return "SETTINGS_TIMEOUT";
case H2_STREAM_CLOSED_ERROR: return "STREAM_CLOSED";
case H2_FRAME_SIZE_ERROR: return "FRAME_SIZE_ERROR";
case H2_REFUSED_STREAM: return "REFUSED_STREAM";
case H2_CANCEL: return "CANCEL";
case H2_COMPRESSION_ERROR: return "COMPRESSION_ERROR";
case H2_CONNECT_ERROR: return "CONNECT_ERROR";
case H2_ENHANCE_YOUR_CALM: return "ENHANCE_YOUR_CALM";
case H2_INADEQUATE_SECURITY: return "INADEQUATE_SECURITY";
case H2_HTTP_1_1_REQUIRED: return "HTTP_1_1_REQUIRED";
}
return "Unknown-H2Error";
}
} // namespace brpc
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef BAIDU_RPC_HTTP2_H
#define BAIDU_RPC_HTTP2_H
#include "butil/iobuf.h"
// To baidu-rpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.
namespace brpc {
struct H2Settings {
// Allows the sender to inform the remote endpoint of the maximum size of
// the header compression table used to decode header blocks, in octets.
// The encoder can select any size equal to or less than this value by
// using signaling specific to the header compression format inside a
// header block (see [COMPRESSION]).
// Default: 4096
static const uint32_t DEFAULT_HEADER_TABLE_SIZE = 4096;
uint32_t header_table_size;
// Enable server push or not (Section 8.2).
// An endpoint MUST NOT send a PUSH_PROMISE frame if it receives this
// parameter set to a value of 0. An endpoint that has both set this
// parameter to 0 and had it acknowledged MUST treat the receipt of a
// PUSH_PROMISE frame as a connection error (Section 5.4.1) of type
// PROTOCOL_ERROR.
// Default: true (server push is permitted)
static const bool DEFAULT_ENABLE_PUSH = true;
bool enable_push;
// The maximum number of concurrent streams that the sender will allow.
// This limit is directional: it applies to the number of streams that the
// sender permits the receiver to create. It is recommended that this value
// be no smaller than 100, so as to not unnecessarily limit parallelism.
// 0 prevents the creation of new streams. However, this can also happen
// for any limit that is exhausted with active streams. Servers SHOULD only
// set a zero value for short durations; if a server does not wish to
// accept requests, closing the connection is more appropriate.
// Default: unlimited
uint32_t max_concurrent_streams;
// Sender's initial window size (in octets) for stream-level flow control.
// This setting affects the window size of all streams (see Section 6.9.2).
// Values above the maximum flow-control window size of 2^31-1 are treated
// as a connection error (Section 5.4.1) of type FLOW_CONTROL_ERROR
// Default: 65535
static const uint32_t DEFAULT_INITIAL_WINDOW_SIZE = 65535;
static const uint32_t MAX_INITIAL_WINDOW_SIZE = (1u << 31) - 1;
uint32_t initial_window_size;
// Size of the largest frame payload that the sender is willing to receive,
// in octets. The value advertised by an endpoint MUST be between 16384 and
// 16777215, inclusive. Values outside this range are treated as a
// connection error(Section 5.4.1) of type PROTOCOL_ERROR.
// Default: 16384
static const uint32_t DEFAULT_MAX_FRAME_SIZE = 16384;
static const uint32_t MAX_OF_MAX_FRAME_SIZE = 16777215;
uint32_t max_frame_size;
// This advisory setting informs a peer of the maximum size of header list
// that the sender is prepared to accept, in octets. The value is based on
// the uncompressed size of header fields, including the length of the name
// and value in octets plus an overhead of 32 octets for each header field.
// For any given request, a lower limit than what is advertised MAY be
// enforced.
// Default: unlimited.
uint32_t max_header_list_size;
// Construct with default values.
H2Settings();
// True iff all fields are valid.
bool IsValid() const;
// [ https://tools.ietf.org/html/rfc7540#section-6.5.1 ]
// Parse from n bytes from the iterator.
// Returns true on success.
bool ParseFrom(butil::IOBufBytesIterator&, size_t n);
// Bytes of serialized data.
size_t ByteSize() const;
// Serialize to `out' which is at least ByteSize() bytes long.
void SerializeTo(void* out) const;
void Print(std::ostream&) const;
};
inline std::ostream& operator<<(std::ostream& os, const H2Settings& s) {
s.Print(os);
return os;
}
enum H2Error {
H2_NO_ERROR = 0x0, // Graceful shutdown
H2_PROTOCOL_ERROR = 0x1, // Protocol error detected
H2_INTERNAL_ERROR = 0x2, // Implementation fault
H2_FLOW_CONTROL_ERROR = 0x3, // Flow-control limits exceeded
H2_SETTINGS_TIMEOUT = 0x4, // Settings not acknowledged
H2_STREAM_CLOSED_ERROR = 0x5, // Frame received for closed stream
H2_FRAME_SIZE_ERROR = 0x6, // Frame size incorrect
H2_REFUSED_STREAM = 0x7, // Stream not processed
H2_CANCEL = 0x8, // Stream cancelled
H2_COMPRESSION_ERROR = 0x9, // Compression state not updated
H2_CONNECT_ERROR = 0xa, // TCP connection error for CONNECT method
H2_ENHANCE_YOUR_CALM = 0xb, // Processing capacity exceeded
H2_INADEQUATE_SECURITY = 0xc, // Negotiated TLS parameters not acceptable
H2_HTTP_1_1_REQUIRED = 0xd, // Use HTTP/1.1 for the request
};
const char* H2ErrorToString(H2Error e);
} // namespace brpc
#endif // BAIDU_RPC_HTTP2_H
...@@ -24,7 +24,9 @@ namespace brpc { ...@@ -24,7 +24,9 @@ namespace brpc {
HttpHeader::HttpHeader() HttpHeader::HttpHeader()
: _status_code(HTTP_STATUS_OK) : _status_code(HTTP_STATUS_OK)
, _method(HTTP_METHOD_GET) , _method(HTTP_METHOD_GET)
, _version(1, 1) { , _version(1, 1)
, _h2_stream_id(0)
, _h2_error(H2_NO_ERROR) {
// NOTE: don't forget to clear the field in Clear() as well. // NOTE: don't forget to clear the field in Clear() as well.
} }
...@@ -48,6 +50,8 @@ void HttpHeader::Swap(HttpHeader &rhs) { ...@@ -48,6 +50,8 @@ void HttpHeader::Swap(HttpHeader &rhs) {
_content_type.swap(rhs._content_type); _content_type.swap(rhs._content_type);
_unresolved_path.swap(rhs._unresolved_path); _unresolved_path.swap(rhs._unresolved_path);
std::swap(_version, rhs._version); std::swap(_version, rhs._version);
std::swap(_h2_stream_id, rhs._h2_stream_id);
std::swap(_h2_error, rhs._h2_error);
} }
void HttpHeader::Clear() { void HttpHeader::Clear() {
...@@ -58,6 +62,8 @@ void HttpHeader::Clear() { ...@@ -58,6 +62,8 @@ void HttpHeader::Clear() {
_content_type.clear(); _content_type.clear();
_unresolved_path.clear(); _unresolved_path.clear();
_version = std::make_pair(1, 1); _version = std::make_pair(1, 1);
_h2_stream_id = 0;
_h2_error = H2_NO_ERROR;
} }
const char* HttpHeader::reason_phrase() const { const char* HttpHeader::reason_phrase() const {
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "brpc/uri.h" // URI #include "brpc/uri.h" // URI
#include "brpc/http_method.h" // HttpMethod #include "brpc/http_method.h" // HttpMethod
#include "brpc/http_status_code.h" #include "brpc/http_status_code.h"
#include "brpc/http2.h"
// To rpc developers: DON'T put impl. details here, use opaque pointers instead. // To rpc developers: DON'T put impl. details here, use opaque pointers instead.
...@@ -31,6 +32,7 @@ namespace brpc { ...@@ -31,6 +32,7 @@ namespace brpc {
class InputMessageBase; class InputMessageBase;
namespace policy { namespace policy {
void ProcessHttpRequest(InputMessageBase *msg); void ProcessHttpRequest(InputMessageBase *msg);
class H2StreamContext;
} }
// Non-body part of a HTTP message. // Non-body part of a HTTP message.
...@@ -61,6 +63,12 @@ public: ...@@ -61,6 +63,12 @@ public:
// True if the message is from HTTP2. // True if the message is from HTTP2.
bool is_http2() const { return major_version() == 2; } bool is_http2() const { return major_version() == 2; }
// Id of the HTTP2 stream where the message is from.
// 0 when is_http2() is false.
int h2_stream_id() const { return _h2_stream_id; }
H2Error h2_error() const { return _h2_error; }
// Get/set "Content-Type". Notice that you can't get "Content-Type" // Get/set "Content-Type". Notice that you can't get "Content-Type"
// via GetHeader(). // via GetHeader().
// possible values: "text/plain", "application/json" ... // possible values: "text/plain", "application/json" ...
...@@ -135,6 +143,7 @@ public: ...@@ -135,6 +143,7 @@ public:
private: private:
friend class HttpMessage; friend class HttpMessage;
friend class HttpMessageSerializer; friend class HttpMessageSerializer;
friend class policy::H2StreamContext;
friend void policy::ProcessHttpRequest(InputMessageBase *msg); friend void policy::ProcessHttpRequest(InputMessageBase *msg);
std::string& GetOrAddHeader(const std::string& key) { std::string& GetOrAddHeader(const std::string& key) {
...@@ -151,6 +160,8 @@ friend void policy::ProcessHttpRequest(InputMessageBase *msg); ...@@ -151,6 +160,8 @@ friend void policy::ProcessHttpRequest(InputMessageBase *msg);
std::string _content_type; std::string _content_type;
std::string _unresolved_path; std::string _unresolved_path;
std::pair<int, int> _version; std::pair<int, int> _version;
int _h2_stream_id;
H2Error _h2_error;
}; };
const HttpHeader& DefaultHttpHeader(); const HttpHeader& DefaultHttpHeader();
......
...@@ -46,6 +46,7 @@ enum ProtocolType { ...@@ -46,6 +46,7 @@ enum ProtocolType {
// Reserve special protocol for cds-agent, which depends on FIFO right now // Reserve special protocol for cds-agent, which depends on FIFO right now
PROTOCOL_CDS_AGENT = 24; // Client side only PROTOCOL_CDS_AGENT = 24; // Client side only
PROTOCOL_ESP = 25; // Client side only PROTOCOL_ESP = 25; // Client side only
PROTOCOL_HTTP2 = 26;
} }
enum CompressType { enum CompressType {
......
diff a/src/brpc/options.proto b/src/brpc/options.proto (rejected hunks)
@@ -53,6 +53,7 @@ enum ProtocolType {
// Reserve special protocol for cds-agent, which depends on FIFO right now
PROTOCOL_CDS_AGENT = 23; // Client side only
PROTOCOL_ESP = 24; // Client side only
+ PROTOCOL_HTTP2 = 25;
}
message ChunkInfo {
This diff is collapsed.
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H
#define BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H
#include "brpc/policy/http_rpc_protocol.h" // HttpContext
#include "brpc/input_message_base.h"
#include "brpc/protocol.h"
#include "brpc/details/hpack.h"
#include "brpc/stream_creator.h"
#include "brpc/controller.h"
namespace brpc {
namespace policy {
class H2StreamContext;
class H2ParseResult {
public:
explicit H2ParseResult(H2Error err, int stream_id)
: _msg(NULL), _err(err), _stream_id(stream_id) {}
explicit H2ParseResult(H2StreamContext* msg)
: _msg(msg), _err(H2_NO_ERROR), _stream_id(0) {}
// Return H2_NO_ERROR when the result is successful.
H2Error error() const { return _err; }
const char* error_str() const { return H2ErrorToString(_err); }
bool is_ok() const { return error() == H2_NO_ERROR; }
int stream_id() const { return _stream_id; }
// definitely NULL when result is failed.
H2StreamContext* message() const { return _msg; }
private:
H2StreamContext* _msg;
H2Error _err;
int _stream_id;
};
inline H2ParseResult MakeH2Error(H2Error err, int stream_id)
{ return H2ParseResult(err, stream_id); }
inline H2ParseResult MakeH2Error(H2Error err)
{ return H2ParseResult(err, 0); }
inline H2ParseResult MakeH2Message(H2StreamContext* msg)
{ return H2ParseResult(msg); }
class H2Context;
class H2FrameHead;
enum H2StreamState {
H2_STREAM_IDLE = 0,
H2_STREAM_RESERVED_LOCAL,
H2_STREAM_RESERVED_REMOTE,
H2_STREAM_OPEN,
H2_STREAM_HALF_CLOSED_LOCAL,
H2_STREAM_HALF_CLOSED_REMOTE,
H2_STREAM_CLOSED,
};
const char* H2StreamState2Str(H2StreamState);
class H2UnsentRequest : public SocketMessage, public StreamCreator {
public:
static H2UnsentRequest* New(Controller* c, uint64_t correlation_id);
void Describe(butil::IOBuf*) const;
int AddRefManually()
{ return _nref.fetch_add(1, butil::memory_order_relaxed); }
void RemoveRefManually() {
if (_nref.fetch_sub(1, butil::memory_order_release) == 1) {
butil::atomic_thread_fence(butil::memory_order_acquire);
Destroy();
}
}
// @StreamCreator
void ReplaceSocketForStream(SocketUniquePtr* inout, Controller* cntl);
void OnStreamCreationDone(SocketUniquePtr& sending_sock, Controller* cntl);
void CleanupSocketForStream(Socket* prev_sock, Controller* cntl,
int error_code);
// @SocketMessage
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*);
size_t EstimatedByteSize();
private:
std::string& push(const std::string& name)
{ return (new (&_list[_size++]) HPacker::Header(name))->value; }
void push(const std::string& name, const std::string& value)
{ new (&_list[_size++]) HPacker::Header(name, value); }
H2UnsentRequest(Controller* c)
: _nref(1)
, _size(0)
, _stream_id(0)
, _cntl(c) {}
~H2UnsentRequest() {}
H2UnsentRequest(const H2UnsentRequest&);
void operator=(const H2UnsentRequest&);
void Destroy();
private:
butil::atomic<int> _nref;
uint32_t _size;
uint32_t _stream_id;
mutable butil::Mutex _mutex;
Controller* _cntl;
std::unique_ptr<H2StreamContext> _sctx;
HPacker::Header _list[0];
};
class H2UnsentResponse : public SocketMessage {
public:
static H2UnsentResponse* New(Controller* c);
void Destroy();
void Describe(butil::IOBuf*) const;
// @SocketMessage
butil::Status AppendAndDestroySelf(butil::IOBuf* out, Socket*);
size_t EstimatedByteSize();
private:
std::string& push(const std::string& name)
{ return (new (&_list[_size++]) HPacker::Header(name))->value; }
void push(const std::string& name, const std::string& value)
{ new (&_list[_size++]) HPacker::Header(name, value); }
H2UnsentResponse(Controller* c)
: _size(0)
, _stream_id(c->http_request().h2_stream_id())
, _http_response(c->release_http_response()) {
_data.swap(c->response_attachment());
}
~H2UnsentResponse() {}
H2UnsentResponse(const H2UnsentResponse&);
void operator=(const H2UnsentResponse&);
private:
uint32_t _size;
uint32_t _stream_id;
std::unique_ptr<HttpHeader> _http_response;
butil::IOBuf _data;
HPacker::Header _list[0];
};
// Used in http_rpc_protocol.cpp
class H2StreamContext : public HttpContext {
public:
H2StreamContext();
void Init(H2Context* conn_ctx, int stream_id);
H2StreamContext(H2Context* conn_ctx, int stream_id);
// Decode headers in HPACK from *it and set into this->header(). The input
// does not need to complete.
// Returns 0 on success, -1 otherwise.
int ConsumeHeaders(butil::IOBufBytesIterator& it);
H2ParseResult EndRemoteStream();
H2ParseResult OnData(butil::IOBufBytesIterator&, const H2FrameHead&,
uint32_t frag_size, uint8_t pad_length);
H2ParseResult OnHeaders(butil::IOBufBytesIterator&, const H2FrameHead&,
uint32_t frag_size, uint8_t pad_length);
H2ParseResult OnContinuation(butil::IOBufBytesIterator&, const H2FrameHead&);
H2ParseResult OnResetStream(H2Error h2_error, const H2FrameHead&);
uint64_t correlation_id() const { return _correlation_id; }
void set_correlation_id(uint64_t cid) { _correlation_id = cid; }
size_t parsed_length() const { return this->_parsed_length; }
int stream_id() const { return header().h2_stream_id(); }
#ifdef HAS_H2_STREAM_STATE
H2StreamState state() const { return _state; }
void SetState(H2StreamState state);
#endif
friend class H2Context;
H2Context* _conn_ctx;
#ifdef HAS_H2_STREAM_STATE
H2StreamState _state;
#endif
bool _stream_ended;
butil::atomic<int64_t> _remote_window_size;
uint64_t _correlation_id;
butil::IOBuf _remaining_header_fragment;
};
StreamCreator* get_h2_global_stream_creator();
ParseResult ParseH2Message(butil::IOBuf *source, Socket *socket,
bool read_eof, const void *arg);
void PackH2Request(butil::IOBuf* buf,
SocketMessage** user_message_out,
uint64_t correlation_id,
const google::protobuf::MethodDescriptor* method,
Controller* controller,
const butil::IOBuf& request,
const Authenticator* auth);
} // namespace policy
} // namespace brpc
#endif // BAIDU_RPC_POLICY_HTTP2_RPC_PROTOCOL_H
...@@ -35,8 +35,8 @@ ...@@ -35,8 +35,8 @@
#include "brpc/details/controller_private_accessor.h" #include "brpc/details/controller_private_accessor.h"
#include "brpc/builtin/index_service.h" // IndexService #include "brpc/builtin/index_service.h" // IndexService
#include "brpc/policy/gzip_compress.h" #include "brpc/policy/gzip_compress.h"
#include "brpc/policy/http2_rpc_protocol.h"
#include "brpc/details/usercode_backup_pool.h" #include "brpc/details/usercode_backup_pool.h"
#include "brpc/policy/http_rpc_protocol.h"
extern "C" { extern "C" {
void bthread_assign_data(void* data); void bthread_assign_data(void* data);
...@@ -215,7 +215,14 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -215,7 +215,14 @@ void ProcessHttpResponse(InputMessageBase* msg) {
const int64_t start_parse_us = butil::cpuwide_time_us(); const int64_t start_parse_us = butil::cpuwide_time_us();
DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg)); DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg));
Socket* socket = imsg_guard->socket(); Socket* socket = imsg_guard->socket();
uint64_t cid_value = socket->correlation_id(); uint64_t cid_value;
const bool is_http2 = imsg_guard->header().is_http2();
if (is_http2) {
H2StreamContext* http2_sctx = static_cast<H2StreamContext*>(msg);
cid_value = http2_sctx->correlation_id();
} else {
cid_value = socket->correlation_id();
}
if (cid_value == 0) { if (cid_value == 0) {
LOG(WARNING) << "Fail to find correlation_id from " << *socket; LOG(WARNING) << "Fail to find correlation_id from " << *socket;
return; return;
...@@ -247,6 +254,7 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -247,6 +254,7 @@ void ProcessHttpResponse(InputMessageBase* msg) {
const int saved_error = cntl->ErrorCode(); const int saved_error = cntl->ErrorCode();
do { do {
if (!is_http2) {
// If header has "Connection: close", close the connection. // If header has "Connection: close", close the connection.
const std::string* conn_cmd = res_header->GetHeader(common->CONNECTION); const std::string* conn_cmd = res_header->GetHeader(common->CONNECTION);
if (conn_cmd != NULL && 0 == strcasecmp(conn_cmd->c_str(), "close")) { if (conn_cmd != NULL && 0 == strcasecmp(conn_cmd->c_str(), "close")) {
...@@ -258,6 +266,7 @@ void ProcessHttpResponse(InputMessageBase* msg) { ...@@ -258,6 +266,7 @@ void ProcessHttpResponse(InputMessageBase* msg) {
socket->SetFailed(); socket->SetFailed();
} }
} }
}
if (imsg_guard->read_body_progressively()) { if (imsg_guard->read_body_progressively()) {
// Set RPA if needed // Set RPA if needed
...@@ -632,9 +641,11 @@ static void SendHttpResponse(Controller *cntl, ...@@ -632,9 +641,11 @@ static void SendHttpResponse(Controller *cntl,
// or the server sent a Connection: close response header. If such a // or the server sent a Connection: close response header. If such a
// response header exists, the client must close its end of the connection // response header exists, the client must close its end of the connection
// after receiving the response. // after receiving the response.
if (!req_header->is_http2()) {
const std::string* res_conn = res_header->GetHeader(common->CONNECTION); const std::string* res_conn = res_header->GetHeader(common->CONNECTION);
if (res_conn == NULL || strcasecmp(res_conn->c_str(), "close") != 0) { if (res_conn == NULL || strcasecmp(res_conn->c_str(), "close") != 0) {
const std::string* req_conn = req_header->GetHeader(common->CONNECTION); const std::string* req_conn =
req_header->GetHeader(common->CONNECTION);
if (req_header->before_http_1_1()) { if (req_header->before_http_1_1()) {
if (req_conn != NULL && if (req_conn != NULL &&
strcasecmp(req_conn->c_str(), "keep-alive") == 0) { strcasecmp(req_conn->c_str(), "keep-alive") == 0) {
...@@ -648,7 +659,7 @@ static void SendHttpResponse(Controller *cntl, ...@@ -648,7 +659,7 @@ static void SendHttpResponse(Controller *cntl,
} }
} // else user explicitly set Connection:close, clients of } // else user explicitly set Connection:close, clients of
// HTTP 1.1/1.0/0.9 should all close the connection. // HTTP 1.1/1.0/0.9 should all close the connection.
}
if (cntl->Failed()) { if (cntl->Failed()) {
// Set status-code with default value(converted from error code) // Set status-code with default value(converted from error code)
// if user did not set it. // if user did not set it.
...@@ -703,6 +714,24 @@ static void SendHttpResponse(Controller *cntl, ...@@ -703,6 +714,24 @@ static void SendHttpResponse(Controller *cntl,
// users to set max_concurrency. // users to set max_concurrency.
Socket::WriteOptions wopt; Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true; wopt.ignore_eovercrowded = true;
if (req_header->is_http2()) {
SocketMessagePtr<H2UnsentResponse> h2_response(H2UnsentResponse::New(cntl));
if (h2_response == NULL) {
LOG(ERROR) << "Fail to make http2 response";
errno = EINVAL;
rc = -1;
} else {
if (FLAGS_http_verbose) {
butil::IOBuf desc;
h2_response->Describe(&desc);
std::cerr << desc << std::endl;
}
if (span) {
span->set_response_size(h2_response->EstimatedByteSize());
}
rc = socket->Write(h2_response, &wopt);
}
} else {
butil::IOBuf* content = NULL; butil::IOBuf* content = NULL;
if (cntl->Failed() || !cntl->has_progressive_writer()) { if (cntl->Failed() || !cntl->has_progressive_writer()) {
content = &cntl->response_attachment(); content = &cntl->response_attachment();
...@@ -716,6 +745,7 @@ static void SendHttpResponse(Controller *cntl, ...@@ -716,6 +745,7 @@ static void SendHttpResponse(Controller *cntl,
span->set_response_size(res_buf.size()); span->set_response_size(res_buf.size());
} }
rc = socket->Write(&res_buf, &wopt); rc = socket->Write(&res_buf, &wopt);
}
if (rc != 0) { if (rc != 0) {
// EPIPE is common in pooled connections + backup requests. // EPIPE is common in pooled connections + backup requests.
...@@ -1110,7 +1140,7 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1110,7 +1140,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
span->set_remote_side(user_addr); span->set_remote_side(user_addr);
span->set_received_us(msg->received_us()); span->set_received_us(msg->received_us());
span->set_start_parse_us(start_parse_us); span->set_start_parse_us(start_parse_us);
span->set_protocol(PROTOCOL_HTTP); span->set_protocol(req_header.is_http2() ? PROTOCOL_HTTP2 : PROTOCOL_HTTP);
span->set_request_size(imsg_guard->parsed_length()); span->set_request_size(imsg_guard->parsed_length());
} }
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
#define BRPC_POLICY_HTTP_RPC_PROTOCOL_H #define BRPC_POLICY_HTTP_RPC_PROTOCOL_H
#include "brpc/details/http_message.h" // HttpMessage #include "brpc/details/http_message.h" // HttpMessage
#include "brpc/details/controller_private_accessor.h"
#include "brpc/input_messenger.h" // InputMessenger #include "brpc/input_messenger.h" // InputMessenger
#include "brpc/protocol.h" #include "brpc/protocol.h"
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include "brpc/details/profiler_linker.h" #include "brpc/details/profiler_linker.h"
#include "brpc/health_reporter.h" #include "brpc/health_reporter.h"
#include "brpc/adaptive_max_concurrency.h" #include "brpc/adaptive_max_concurrency.h"
#include "brpc/http2.h"
namespace brpc { namespace brpc {
...@@ -229,6 +230,9 @@ struct ServerOptions { ...@@ -229,6 +230,9 @@ struct ServerOptions {
// Default: empty (all protocols) // Default: empty (all protocols)
std::string enabled_protocols; std::string enabled_protocols;
// Customize parameters of HTTP2, defined in http2.h
H2Settings http2_settings;
private: private:
// SSLOptions is large and not often used, allocate it on heap to // SSLOptions is large and not often used, allocate it on heap to
// prevent ServerOptions from being bloated in most cases. // prevent ServerOptions from being bloated in most cases.
......
...@@ -167,9 +167,10 @@ public: ...@@ -167,9 +167,10 @@ public:
// Remove |key| and the associated value // Remove |key| and the associated value
// Returns: 1 on erased, 0 otherwise. // Returns: 1 on erased, 0 otherwise.
template <typename K2> size_t erase(const K2& key);
// Remove all items. Allocated spaces are NOT returned by system. // Remove all items. Allocated spaces are NOT returned by system.
template <typename K2>
size_t erase(const K2& key, mapped_type* old_value = NULL);
void clear(); void clear();
// Remove all items and return all allocated spaces to system. // Remove all items and return all allocated spaces to system.
...@@ -300,7 +301,7 @@ public: ...@@ -300,7 +301,7 @@ public:
{ return _map.insert(key, FlatMapVoid()); } { return _map.insert(key, FlatMapVoid()); }
template <typename K2> template <typename K2>
size_t erase(const K2& key) { return _map.erase(key); } size_t erase(const K2& key) { return _map.erase(key, NULL); }
void clear() { return _map.clear(); } void clear() { return _map.clear(); }
void clear_and_reset_pool() { return _map.clear_and_reset_pool(); } void clear_and_reset_pool() { return _map.clear_and_reset_pool(); }
......
...@@ -375,7 +375,7 @@ _T* FlatMap<_K, _T, _H, _E, _S>::insert(const key_type& key, ...@@ -375,7 +375,7 @@ _T* FlatMap<_K, _T, _H, _E, _S>::insert(const key_type& key,
template <typename _K, typename _T, typename _H, typename _E, bool _S> template <typename _K, typename _T, typename _H, typename _E, bool _S>
template <typename K2> template <typename K2>
size_t FlatMap<_K, _T, _H, _E, _S>::erase(const K2& key) { size_t FlatMap<_K, _T, _H, _E, _S>::erase(const K2& key, _T* old_value) {
if (!initialized()) { if (!initialized()) {
return 0; return 0;
} }
...@@ -386,6 +386,9 @@ size_t FlatMap<_K, _T, _H, _E, _S>::erase(const K2& key) { ...@@ -386,6 +386,9 @@ size_t FlatMap<_K, _T, _H, _E, _S>::erase(const K2& key) {
return 0; return 0;
} }
if (_eql(first_node.element().first_ref(), key)) { if (_eql(first_node.element().first_ref(), key)) {
if (old_value) {
*old_value = first_node.element().second_ref();
}
if (first_node.next == NULL) { if (first_node.next == NULL) {
first_node.element().~Element(); first_node.element().~Element();
first_node.set_invalid(); first_node.set_invalid();
...@@ -421,6 +424,9 @@ size_t FlatMap<_K, _T, _H, _E, _S>::erase(const K2& key) { ...@@ -421,6 +424,9 @@ size_t FlatMap<_K, _T, _H, _E, _S>::erase(const K2& key) {
Bucket *last_p = &first_node; Bucket *last_p = &first_node;
while (p) { while (p) {
if (_eql(p->element().first_ref(), key)) { if (_eql(p->element().first_ref(), key)) {
if (old_value) {
*old_value = p->element().second_ref();
}
last_p->next = p->next; last_p->next = p->next;
p->element().~Element(); p->element().~Element();
_pool.back(p); _pool.back(p);
......
...@@ -2059,6 +2059,37 @@ IOBufAppender::IOBufAppender() ...@@ -2059,6 +2059,37 @@ IOBufAppender::IOBufAppender()
, _zc_stream(&_buf) { , _zc_stream(&_buf) {
} }
size_t IOBufBytesIterator::append_and_forward(butil::IOBuf* buf, size_t n) {
size_t nc = 0;
while (nc < n && _bytes_left != 0) {
const IOBuf::BlockRef& r = _buf->_ref_at(_block_count - 1);
const size_t block_size = _block_end - _block_begin;
const size_t to_copy = std::min(block_size, n - nc);
IOBuf::BlockRef r2 = { (uint32_t)(_block_begin - r.block->data),
(uint32_t)to_copy, r.block };
buf->_push_back_ref(r2);
_block_begin += to_copy;
_bytes_left -= to_copy;
nc += to_copy;
if (_block_begin == _block_end) {
try_next_block();
}
}
return nc;
}
bool IOBufBytesIterator::forward_one_block(const void** data, size_t* size) {
if (_bytes_left == 0) {
return false;
}
const size_t block_size = _block_end - _block_begin;
*data = _block_begin;
*size = block_size;
_bytes_left -= block_size;
try_next_block();
return true;
}
} // namespace butil } // namespace butil
void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n) { void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n) {
......
...@@ -53,6 +53,7 @@ namespace butil { ...@@ -53,6 +53,7 @@ namespace butil {
class IOBuf { class IOBuf {
friend class IOBufAsZeroCopyInputStream; friend class IOBufAsZeroCopyInputStream;
friend class IOBufAsZeroCopyOutputStream; friend class IOBufAsZeroCopyOutputStream;
friend class IOBufBytesIterator;
public: public:
static const size_t DEFAULT_BLOCK_SIZE = 8192; static const size_t DEFAULT_BLOCK_SIZE = 8192;
static const size_t INITIAL_CAP = 32; // must be power of 2 static const size_t INITIAL_CAP = 32; // must be power of 2
...@@ -650,19 +651,30 @@ private: ...@@ -650,19 +651,30 @@ private:
}; };
// Iterate bytes of a IOBuf. // Iterate bytes of a IOBuf.
// During iteration, the iobuf should NOT be changed. For example, // During iteration, the iobuf should NOT be changed.
// IOBufBytesIterator will not iterate more data appended to the iobuf after
// iterator's creation. This is for performance consideration.
class IOBufBytesIterator { class IOBufBytesIterator {
public: public:
explicit IOBufBytesIterator(const butil::IOBuf& buf); explicit IOBufBytesIterator(const butil::IOBuf& buf);
char operator*() const { return *_block_begin; } // Construct from another iterator.
IOBufBytesIterator(const IOBufBytesIterator& it);
IOBufBytesIterator(const IOBufBytesIterator& it, size_t bytes_left);
// Returning unsigned is safer than char which would be more error prone
// to bitwise operations. For example: in "uint32_t value = *it", value
// is (unexpected) 4294967168 when *it returns (char)128.
unsigned char operator*() const { return (unsigned char)*_block_begin; }
operator const void*() const { return (const void*)!!_bytes_left; } operator const void*() const { return (const void*)!!_bytes_left; }
void operator++(); void operator++();
void operator++(int) { return operator++(); } void operator++(int) { return operator++(); }
// Copy at most n bytes into buf, forwarding this iterator. // Copy at most n bytes into buf, forwarding this iterator.
// Returns bytes copied.
size_t copy_and_forward(void* buf, size_t n); size_t copy_and_forward(void* buf, size_t n);
size_t copy_and_forward(std::string* s, size_t n); size_t copy_and_forward(std::string* s, size_t n);
// Just forward this iterator for at most n bytes.
size_t forward(size_t n);
// Append at most n bytes into buf, forwarding this iterator. Data are
// referenced rather than copied.
size_t append_and_forward(butil::IOBuf* buf, size_t n);
bool forward_one_block(const void** data, size_t* size);
size_t bytes_left() const { return _bytes_left; } size_t bytes_left() const { return _bytes_left; }
private: private:
void try_next_block(); void try_next_block();
......
...@@ -249,6 +249,27 @@ inline IOBufBytesIterator::IOBufBytesIterator(const butil::IOBuf& buf) ...@@ -249,6 +249,27 @@ inline IOBufBytesIterator::IOBufBytesIterator(const butil::IOBuf& buf)
try_next_block(); try_next_block();
} }
inline IOBufBytesIterator::IOBufBytesIterator(const IOBufBytesIterator& it)
: _block_begin(it._block_begin)
, _block_end(it._block_end)
, _block_count(it._block_count)
, _bytes_left(it._bytes_left)
, _buf(it._buf) {
}
inline IOBufBytesIterator::IOBufBytesIterator(
const IOBufBytesIterator& it, size_t bytes_left)
: _block_begin(it._block_begin)
, _block_end(it._block_end)
, _block_count(it._block_count)
, _bytes_left(bytes_left)
, _buf(it._buf) {
//CHECK_LE(_bytes_left, it._bytes_left);
if (_block_end > _block_begin + _bytes_left) {
_block_end = _block_begin + _bytes_left;
}
}
inline void IOBufBytesIterator::try_next_block() { inline void IOBufBytesIterator::try_next_block() {
if (_bytes_left == 0) { if (_bytes_left == 0) {
return; return;
...@@ -268,7 +289,7 @@ inline void IOBufBytesIterator::operator++() { ...@@ -268,7 +289,7 @@ inline void IOBufBytesIterator::operator++() {
inline size_t IOBufBytesIterator::copy_and_forward(void* buf, size_t n) { inline size_t IOBufBytesIterator::copy_and_forward(void* buf, size_t n) {
size_t nc = 0; size_t nc = 0;
while (nc < n && *this != NULL) { while (nc < n && _bytes_left != 0) {
const size_t block_size = _block_end - _block_begin; const size_t block_size = _block_end - _block_begin;
const size_t to_copy = std::min(block_size, n - nc); const size_t to_copy = std::min(block_size, n - nc);
fast_memcpy((char*)buf + nc, _block_begin, to_copy); fast_memcpy((char*)buf + nc, _block_begin, to_copy);
...@@ -295,6 +316,21 @@ inline size_t IOBufBytesIterator::copy_and_forward(std::string* s, size_t n) { ...@@ -295,6 +316,21 @@ inline size_t IOBufBytesIterator::copy_and_forward(std::string* s, size_t n) {
return nc; return nc;
} }
inline size_t IOBufBytesIterator::forward(size_t n) {
size_t nc = 0;
while (nc < n && _bytes_left != 0) {
const size_t block_size = _block_end - _block_begin;
const size_t to_copy = std::min(block_size, n - nc);
_block_begin += to_copy;
_bytes_left -= to_copy;
nc += to_copy;
if (_block_begin == _block_end) {
try_next_block();
}
}
return nc;
}
} // namespace butil } // namespace butil
#endif // BUTIL_IOBUF_INL_H #endif // BUTIL_IOBUF_INL_H
...@@ -12,18 +12,22 @@ class HPackTest : public testing::Test { ...@@ -12,18 +12,22 @@ class HPackTest : public testing::Test {
// Copied test cases from example of rfc7541 // Copied test cases from example of rfc7541
TEST_F(HPackTest, header_with_indexing) { TEST_F(HPackTest, header_with_indexing) {
char c = 128;
uint8_t c2 = c;
printf("%u %u %d %d\n", (uint32_t)c, (uint32_t)c2, (int)c, (int)c2);
brpc::HPacker p1; brpc::HPacker p1;
ASSERT_EQ(0, p1.Init(4096)); ASSERT_EQ(0, p1.Init(4096));
brpc::HPacker p2; brpc::HPacker p2;
ASSERT_EQ(0, p2.Init(4096)); ASSERT_EQ(0, p2.Init(4096));
brpc::HPacker::Header h; brpc::HPacker::Header h;
h.name = "custom-key"; h.name = "Custom-Key";
h.value = "custom-header"; h.value = "custom-header";
brpc::HPackOptions options; brpc::HPackOptions options;
options.index_policy = brpc::HPACK_INDEX_HEADER; options.index_policy = brpc::HPACK_INDEX_HEADER;
butil::IOBufAppender buf; butil::IOBufAppender buf;
ssize_t nwrite = p1.Encode(&buf, h, options); p1.Encode(&buf, h, options);
ASSERT_EQ((size_t)nwrite, buf.buf().length()); const size_t nwrite = buf.buf().size();
LOG(INFO) << butil::PrintedAsBinary(buf.buf()); LOG(INFO) << butil::PrintedAsBinary(buf.buf());
uint8_t expected[] = { uint8_t expected[] = {
0x40, 0x0a, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x2d, 0x6b, 0x65, 0x79, 0x40, 0x0a, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x2d, 0x6b, 0x65, 0x79,
...@@ -35,7 +39,9 @@ TEST_F(HPackTest, header_with_indexing) { ...@@ -35,7 +39,9 @@ TEST_F(HPackTest, header_with_indexing) {
ssize_t nread = p2.Decode(&buf.buf(), &h2); ssize_t nread = p2.Decode(&buf.buf(), &h2);
ASSERT_EQ(nread, nwrite); ASSERT_EQ(nread, nwrite);
ASSERT_TRUE(buf.buf().empty()); ASSERT_TRUE(buf.buf().empty());
ASSERT_EQ(h.name, h2.name); std::string lowercase_name = h.name;
brpc::tolower(&lowercase_name);
ASSERT_EQ(lowercase_name, h2.name);
ASSERT_EQ(h.value, h2.value); ASSERT_EQ(h.value, h2.value);
} }
...@@ -50,8 +56,8 @@ TEST_F(HPackTest, header_without_indexing) { ...@@ -50,8 +56,8 @@ TEST_F(HPackTest, header_without_indexing) {
brpc::HPackOptions options; brpc::HPackOptions options;
options.index_policy = brpc::HPACK_NOT_INDEX_HEADER; options.index_policy = brpc::HPACK_NOT_INDEX_HEADER;
butil::IOBufAppender buf; butil::IOBufAppender buf;
ssize_t nwrite = p1.Encode(&buf, h, options); p1.Encode(&buf, h, options);
ASSERT_EQ((size_t)nwrite, buf.buf().length()); const size_t nwrite = buf.buf().size();
LOG(INFO) << butil::PrintedAsBinary(buf.buf()); LOG(INFO) << butil::PrintedAsBinary(buf.buf());
uint8_t expected[] = { uint8_t expected[] = {
0x04, 0x0c, 0x2f, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2f, 0x70, 0x61, 0x04, 0x0c, 0x2f, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2f, 0x70, 0x61,
...@@ -63,7 +69,9 @@ TEST_F(HPackTest, header_without_indexing) { ...@@ -63,7 +69,9 @@ TEST_F(HPackTest, header_without_indexing) {
ssize_t nread = p2.Decode(&buf.buf(), &h2); ssize_t nread = p2.Decode(&buf.buf(), &h2);
ASSERT_EQ(nread, nwrite); ASSERT_EQ(nread, nwrite);
ASSERT_TRUE(buf.buf().empty()); ASSERT_TRUE(buf.buf().empty());
ASSERT_EQ(h.name, h2.name); std::string lowercase_name = h.name;
brpc::tolower(&lowercase_name);
ASSERT_EQ(lowercase_name, h2.name);
ASSERT_EQ(h.value, h2.value); ASSERT_EQ(h.value, h2.value);
} }
...@@ -78,11 +86,12 @@ TEST_F(HPackTest, header_never_indexed) { ...@@ -78,11 +86,12 @@ TEST_F(HPackTest, header_never_indexed) {
brpc::HPackOptions options; brpc::HPackOptions options;
options.index_policy = brpc::HPACK_NEVER_INDEX_HEADER; options.index_policy = brpc::HPACK_NEVER_INDEX_HEADER;
butil::IOBufAppender buf; butil::IOBufAppender buf;
ssize_t nwrite = p1.Encode(&buf, h, options); p1.Encode(&buf, h, options);
ASSERT_EQ((size_t)nwrite, buf.buf().length()); const size_t nwrite = buf.buf().size();
LOG(INFO) << butil::PrintedAsBinary(buf.buf()); LOG(INFO) << butil::PrintedAsBinary(buf.buf());
uint8_t expected[] = { uint8_t expected[] = {
0x10, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x10, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64,
0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74,
}; };
butil::StringPiece sp((char*)expected, sizeof(expected)); butil::StringPiece sp((char*)expected, sizeof(expected));
ASSERT_TRUE(buf.buf().equals(sp)); ASSERT_TRUE(buf.buf().equals(sp));
...@@ -105,8 +114,8 @@ TEST_F(HPackTest, indexed_header) { ...@@ -105,8 +114,8 @@ TEST_F(HPackTest, indexed_header) {
brpc::HPackOptions options; brpc::HPackOptions options;
options.index_policy = brpc::HPACK_INDEX_HEADER; options.index_policy = brpc::HPACK_INDEX_HEADER;
butil::IOBufAppender buf; butil::IOBufAppender buf;
ssize_t nwrite = p1.Encode(&buf, h, options); p1.Encode(&buf, h, options);
ASSERT_EQ((size_t)nwrite, buf.buf().length()); const ssize_t nwrite = buf.buf().size();
LOG(INFO) << butil::PrintedAsBinary(buf.buf()); LOG(INFO) << butil::PrintedAsBinary(buf.buf());
uint8_t expected[] = { uint8_t expected[] = {
0x82, 0x82,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment