Commit b8bc2bc0 authored by gejun's avatar gejun

untested support for goaway & pass stream_id via HttpResponseSender instead of Controller

parent dc9c05e9
......@@ -362,19 +362,15 @@ void Controller::AppendServerIdentiy() {
}
}
// Defined in http_rpc_protocol.cpp
namespace policy {
int ErrorCode2StatusCode(int error_code);
}
inline void UpdateResponseHeader(Controller* cntl) {
DCHECK(cntl->Failed());
if (cntl->request_protocol() == PROTOCOL_HTTP) {
if (cntl->request_protocol() == PROTOCOL_HTTP ||
cntl->request_protocol() == PROTOCOL_HTTP2) {
if (cntl->ErrorCode() != EHTTP) {
// We assume that status code is already set along with EHTTP.
// Set the related status code
cntl->http_response().set_status_code(
policy::ErrorCode2StatusCode(cntl->ErrorCode()));
}
ErrorCodeToStatusCode(cntl->ErrorCode()));
} // else assume that status code is already set along with EHTTP.
if (cntl->server() != NULL) {
// Override HTTP body at server-side to conduct error text
// to the client.
......
......@@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "brpc/http2.h"
#include "brpc/details/hpack.h"
#include <limits>
#include "butil/logging.h"
#include "brpc/details/hpack.h"
#include "brpc/errno.pb.h"
#include "brpc/http2.h"
namespace brpc {
......
......@@ -24,9 +24,7 @@ namespace brpc {
HttpHeader::HttpHeader()
: _status_code(HTTP_STATUS_OK)
, _method(HTTP_METHOD_GET)
, _version(1, 1)
, _h2_stream_id(0)
, _h2_error(H2_NO_ERROR) {
, _version(1, 1) {
// NOTE: don't forget to clear the field in Clear() as well.
}
......@@ -50,8 +48,6 @@ void HttpHeader::Swap(HttpHeader &rhs) {
_content_type.swap(rhs._content_type);
_unresolved_path.swap(rhs._unresolved_path);
std::swap(_version, rhs._version);
std::swap(_h2_stream_id, rhs._h2_stream_id);
std::swap(_h2_error, rhs._h2_error);
}
void HttpHeader::Clear() {
......@@ -62,8 +58,6 @@ void HttpHeader::Clear() {
_content_type.clear();
_unresolved_path.clear();
_version = std::make_pair(1, 1);
_h2_stream_id = 0;
_h2_error = H2_NO_ERROR;
}
const char* HttpHeader::reason_phrase() const {
......
......@@ -63,12 +63,6 @@ public:
// True if the message is from HTTP2.
bool is_http2() const { return major_version() == 2; }
// Id of the HTTP2 stream where the message is from.
// 0 when is_http2() is false.
int h2_stream_id() const { return _h2_stream_id; }
H2Error h2_error() const { return _h2_error; }
// Get/set "Content-Type". Notice that you can't get "Content-Type"
// via GetHeader().
// possible values: "text/plain", "application/json" ...
......@@ -160,8 +154,6 @@ friend void policy::ProcessHttpRequest(InputMessageBase *msg);
std::string _content_type;
std::string _unresolved_path;
std::pair<int, int> _version;
int _h2_stream_id;
H2Error _h2_error;
};
const HttpHeader& DefaultHttpHeader();
......
......@@ -19,7 +19,7 @@
#include "butil/logging.h" // BAIDU_*
#include "butil/macros.h" // ARRAY_SIZE
#include "butil/thread_local.h" // thread_local
#include "brpc/errno.pb.h"
#include "brpc/http_status_code.h"
......@@ -113,4 +113,30 @@ const char *HttpReasonPhrase(int status_code) {
return tls_phrase_cache;
}
int ErrorCodeToStatusCode(int error_code) {
if (error_code == 0) {
return HTTP_STATUS_OK;
}
switch (error_code) {
case ENOSERVICE:
case ENOMETHOD:
return HTTP_STATUS_NOT_FOUND;
case ERPCAUTH:
return HTTP_STATUS_UNAUTHORIZED;
case EREQUEST:
case EINVAL:
return HTTP_STATUS_BAD_REQUEST;
case ELIMIT:
case ELOGOFF:
return HTTP_STATUS_SERVICE_UNAVAILABLE;
case EPERM:
return HTTP_STATUS_FORBIDDEN;
case ERPCTIMEDOUT:
case ETIMEDOUT:
return HTTP_STATUS_GATEWAY_TIMEOUT;
default:
return HTTP_STATUS_INTERNAL_SERVER_ERROR;
}
}
} // namespace brpc
......@@ -76,6 +76,9 @@ namespace brpc {
// value into a container. Directly copy the memory instead.
const char *HttpReasonPhrase(int status_code);
// Convert brpc error code to related status code.
int ErrorCodeToStatusCode(int error_code);
// Informational 1xx
// This class of status code indicates a provisional response, consisting
// only of the Status-Line and optional headers, and is terminated by an
......
......@@ -26,6 +26,7 @@ namespace brpc {
DECLARE_bool(http_verbose);
DECLARE_int32(http_verbose_max_body_length);
DECLARE_int32(health_check_interval);
DECLARE_bool(usercode_in_pthread);
namespace policy {
......@@ -318,6 +319,7 @@ H2Context::H2Context(Socket* socket, const Server* server)
, _conn_state(H2_CONNECTION_UNINITIALIZED)
, _last_server_stream_id(-1)
, _last_client_stream_id(1)
, _goaway_stream_id(-1)
, _deferred_window_update(0) {
// Stop printing the field which is useless for remote settings.
_remote_settings.connection_window_size = 0;
......@@ -344,49 +346,6 @@ H2Context::~H2Context() {
_pending_streams.clear();
}
H2StreamContext::H2StreamContext()
: _conn_ctx(NULL)
#if defined(BRPC_H2_STREAM_STATE)
, _state(H2_STREAM_IDLE)
#endif
, _stream_ended(false)
, _remote_window_left(0)
, _deferred_window_update(0)
, _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0);
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << 1;
#endif
}
H2StreamContext::~H2StreamContext() {
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << -1;
#endif
}
bool H2StreamContext::ConsumeWindowSize(int64_t size) {
// This method is guaranteed to be called in AppendAndDestroySelf() which
// is run sequentially. As a result, _remote_window_left of this stream
// context will not be decremented (may be incremented) because following
// AppendAndDestroySelf() are not run yet.
// This fact is important to make window_size changes to stream and
// connection contexts transactionally.
if (_remote_window_left.load(butil::memory_order_relaxed) < size) {
return false;
}
if (!MinusWindowSize(&_conn_ctx->_remote_window_left, size)) {
return false;
}
int64_t after_sub = _remote_window_left.fetch_sub(size, butil::memory_order_relaxed) - size;
if (after_sub < 0) {
LOG(FATAL) << "Impossible, the http2 impl is buggy";
_remote_window_left.fetch_add(size, butil::memory_order_relaxed);
return false;
}
return true;
}
int H2Context::Init() {
if (_pending_streams.init(64, 70) != 0) {
LOG(ERROR) << "Fail to init _pending_streams";
......@@ -399,19 +358,6 @@ int H2Context::Init() {
return 0;
}
inline int H2Context::AllocateClientStreamId() {
if (RunOutStreams()) {
return -1;
}
const int id = _last_client_stream_id;
_last_client_stream_id += 2;
return id;
}
inline bool H2Context::RunOutStreams() const {
return (_last_client_stream_id > 0x7FFFFFFF);
}
H2StreamContext* H2Context::RemoveStream(int stream_id) {
H2StreamContext* sctx = NULL;
std::unique_lock<butil::Mutex> mu(_stream_mutex);
......@@ -421,6 +367,35 @@ H2StreamContext* H2Context::RemoveStream(int stream_id) {
return NULL;
}
void H2Context::RemoveGoAwayStreams(
int goaway_stream_id, std::vector<H2StreamContext*>* out_streams) {
out_streams->clear();
if (goaway_stream_id == 0) { // quick path
StreamMap tmp;
{
std::unique_lock<butil::Mutex> mu(_stream_mutex);
_goaway_stream_id = goaway_stream_id;
_pending_streams.swap(tmp);
}
for (StreamMap::const_iterator it = tmp.begin(); it != tmp.end(); ++it) {
out_streams->push_back(it->second);
}
} else {
std::unique_lock<butil::Mutex> mu(_stream_mutex);
_goaway_stream_id = goaway_stream_id;
for (StreamMap::const_iterator it = _pending_streams.begin();
it != _pending_streams.end(); ++it) {
if (it->first > goaway_stream_id) {
out_streams->push_back(it->second);
}
}
for (size_t i = 0; i < out_streams->size(); ++i) {
_pending_streams.erase((*out_streams)[i]->stream_id());
}
}
}
H2StreamContext* H2Context::FindStream(int stream_id) {
std::unique_lock<butil::Mutex> mu(_stream_mutex);
H2StreamContext** psctx = _pending_streams.seek(stream_id);
......@@ -430,18 +405,17 @@ H2StreamContext* H2Context::FindStream(int stream_id) {
return NULL;
}
bool H2Context::TryToInsertStream(int stream_id, H2StreamContext* ctx) {
int H2Context::TryToInsertStream(int stream_id, H2StreamContext* ctx) {
std::unique_lock<butil::Mutex> mu(_stream_mutex);
if (_goaway_stream_id >= 0 && stream_id > _goaway_stream_id) {
return 1;
}
H2StreamContext*& sctx = _pending_streams[stream_id];
if (sctx == NULL) {
sctx = ctx;
return true;
return 0;
}
return false;
}
inline uint32_t H2Context::VolatilePendingStreamSize() const {
return _pending_streams.size();
return -1;
}
ParseResult H2Context::ConsumeFrameHead(
......@@ -598,10 +572,14 @@ H2ParseResult H2Context::OnHeaders(
}
_last_server_stream_id = frame_head.stream_id;
sctx = new H2StreamContext(this, frame_head.stream_id);
if (!TryToInsertStream(frame_head.stream_id, sctx)) {
const int rc = TryToInsertStream(frame_head.stream_id, sctx);
if (rc < 0) {
delete sctx;
LOG(ERROR) << "Fail to insert stream_id=" << frame_head.stream_id;
LOG(ERROR) << "Fail to insert existing stream_id=" << frame_head.stream_id;
return MakeH2Error(H2_PROTOCOL_ERROR);
} else if (rc > 0) {
delete sctx;
return MakeH2Error(H2_REFUSED_STREAM);
}
} else {
sctx = FindStream(frame_head.stream_id);
......@@ -808,7 +786,6 @@ H2ParseResult H2StreamContext::OnResetStream(
}
if (_conn_ctx->is_client_side()) {
sctx->header().set_status_code(HTTP_STATUS_INTERNAL_SERVER_ERROR);
sctx->header()._h2_error = h2_error;
return MakeH2Message(sctx);
} else {
// No need to process the request.
......@@ -937,12 +914,40 @@ H2ParseResult H2Context::OnPing(
return MakeH2Message(NULL);
}
static void* ProcessHttpResponseWrapper(void* void_arg) {
ProcessHttpResponse(static_cast<InputMessageBase*>(void_arg));
return NULL;
}
H2ParseResult H2Context::OnGoAway(
butil::IOBufBytesIterator&, const H2FrameHead&) {
// TODO(zhujiashun): deal with the stream identifier of the
// last peer-initiated stream that was or might be processed
// on the sending endpoint in this connection.
return MakeH2Message(NULL);
butil::IOBufBytesIterator&, const H2FrameHead& h) {
if (is_client_side()) {
// The socket will not be selected for further requests.
_socket->SetLogOff();
std::vector<H2StreamContext*> goaway_streams;
RemoveGoAwayStreams(h.stream_id, &goaway_streams);
if (goaway_streams.empty()) {
return MakeH2Message(NULL);
}
for (size_t i = 0; i < goaway_streams.size(); ++i) {
H2StreamContext* sctx = goaway_streams[i];
sctx->header().set_status_code(HTTP_STATUS_SERVICE_UNAVAILABLE);
}
for (size_t i = 1; i < goaway_streams.size(); ++i) {
bthread_t th;
bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL);
tmp.keytable_pool = _socket->keytable_pool();
CHECK_EQ(0, bthread_start_background(
&th, &tmp, ProcessHttpResponseWrapper, goaway_streams[i]));
}
return MakeH2Message(goaway_streams[0]);
} else {
// server serves requests on-demand, ignoring GOAWAY is OK.
return MakeH2Message(NULL);
}
}
H2ParseResult H2Context::OnWindowUpdate(
......@@ -1105,11 +1110,27 @@ void H2Context::ClearAbandonedStreamsImpl() {
}
}
H2StreamContext::H2StreamContext()
: _conn_ctx(NULL)
#if defined(BRPC_H2_STREAM_STATE)
, _state(H2_STREAM_IDLE)
#endif
, _stream_id(0)
, _stream_ended(false)
, _remote_window_left(0)
, _deferred_window_update(0)
, _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0);
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << 1;
#endif
}
void H2StreamContext::Init(H2Context* conn_ctx, int stream_id) {
_conn_ctx = conn_ctx;
_stream_id = stream_id;
_remote_window_left.store(conn_ctx->remote_settings().stream_window_size,
butil::memory_order_relaxed);
header()._h2_stream_id = stream_id;
}
H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id)
......@@ -1117,17 +1138,23 @@ H2StreamContext::H2StreamContext(H2Context* conn_ctx, int stream_id)
#if defined(BRPC_H2_STREAM_STATE)
, _state(H2_STREAM_IDLE)
#endif
, _stream_id(stream_id)
, _stream_ended(false)
, _remote_window_left(conn_ctx->remote_settings().stream_window_size)
, _deferred_window_update(0)
, _correlation_id(INVALID_BTHREAD_ID.value) {
header().set_version(2, 0);
header()._h2_stream_id = stream_id;
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << 1;
#endif
}
H2StreamContext::~H2StreamContext() {
#ifndef NDEBUG
get_http2_bvars()->h2_stream_context_count << -1;
#endif
}
#if defined(BRPC_H2_STREAM_STATE)
void H2StreamContext::SetState(H2StreamState state) {
const H2StreamState old_state = _state;
......@@ -1138,6 +1165,28 @@ void H2StreamContext::SetState(H2StreamState state) {
}
#endif
bool H2StreamContext::ConsumeWindowSize(int64_t size) {
// This method is guaranteed to be called in AppendAndDestroySelf() which
// is run sequentially. As a result, _remote_window_left of this stream
// context will not be decremented (may be incremented) because following
// AppendAndDestroySelf() are not run yet.
// This fact is important to make window_size changes to stream and
// connection contexts transactionally.
if (_remote_window_left.load(butil::memory_order_relaxed) < size) {
return false;
}
if (!MinusWindowSize(&_conn_ctx->_remote_window_left, size)) {
return false;
}
int64_t after_sub = _remote_window_left.fetch_sub(size, butil::memory_order_relaxed) - size;
if (after_sub < 0) {
LOG(FATAL) << "Impossible, the http2 impl is buggy";
_remote_window_left.fetch_add(size, butil::memory_order_relaxed);
return false;
}
return true;
}
int H2StreamContext::ConsumeHeaders(butil::IOBufBytesIterator& it) {
HPacker& hpacker = _conn_ctx->hpacker();
HttpHeader& h = header();
......@@ -1425,7 +1474,7 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
ctx = new H2Context(socket, NULL);
if (ctx->Init() != 0) {
delete ctx;
return butil::Status(EINVAL, "Fail to init H2Context");
return butil::Status(EINTERNAL, "Fail to init H2Context");
}
socket->initialize_parsing_context(&ctx);
......@@ -1440,10 +1489,9 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
out->append(settingsbuf, nb);
}
// FIXME(gejun): Replace EAGAIN
// TODO(zhujiashun): also check this in server push
if (ctx->VolatilePendingStreamSize() > ctx->remote_settings().max_concurrent_streams) {
return butil::Status(EAGAIN, "Pending Stream count exceeds max concurrent stream");
return butil::Status(ELIMIT, "Pending Stream count exceeds max concurrent stream");
}
// Although the critical section looks huge, it should rarely be contended
......@@ -1464,9 +1512,13 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
}
H2StreamContext* sctx = _sctx.release();
sctx->Init(ctx, id);
if (!ctx->TryToInsertStream(id, sctx)) {
const int rc = ctx->TryToInsertStream(id, sctx);
if (rc < 0) {
delete sctx;
return butil::Status(EINTERNAL, "Fail to insert existing stream_id");
} else if (rc > 0) {
delete sctx;
return butil::Status(ECANCELED, "stream_id already exists");
return butil::Status(ELOGOFF, "the connection just issued GOAWAY");
}
_stream_id = sctx->stream_id();
......@@ -1474,7 +1526,7 @@ H2UnsentRequest::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
if (!_cntl->request_attachment().empty()) {
const int64_t data_size = _cntl->request_attachment().size();
if (!sctx->ConsumeWindowSize(data_size)) {
return butil::Status(EAGAIN, "remote_window_left is not enough, data_size=%" PRId64, data_size);
return butil::Status(ELIMIT, "remote_window_left is not enough, data_size=%" PRId64, data_size);
}
}
......@@ -1555,7 +1607,7 @@ void H2UnsentRequest::Describe(butil::IOBuf* desc) const {
}
}
H2UnsentResponse* H2UnsentResponse::New(Controller* c) {
H2UnsentResponse* H2UnsentResponse::New(Controller* c, int stream_id) {
const HttpHeader* const h = &c->http_response();
const CommonStrings* const common = get_common_strings();
const bool need_content_length =
......@@ -1566,7 +1618,7 @@ H2UnsentResponse* H2UnsentResponse::New(Controller* c) {
+ (size_t)need_content_type;
const size_t memsize = offsetof(H2UnsentResponse, _list) +
sizeof(HPacker::Header) * maxsize;
H2UnsentResponse* msg = new (malloc(memsize)) H2UnsentResponse(c);
H2UnsentResponse* msg = new (malloc(memsize)) H2UnsentResponse(c, stream_id);
// :status
if (h->status_code() == 200) {
msg->push(common->H2_STATUS, common->STATUS_200);
......@@ -1614,7 +1666,7 @@ H2UnsentResponse::AppendAndDestroySelf(butil::IOBuf* out, Socket* socket) {
// window size is useless, however it's not true when progressive request
// is supported.
if (!MinusWindowSize(&ctx->_remote_window_left, _data.size())) {
return butil::Status(EAGAIN, "Remote window size is not enough");
return butil::Status(ELIMIT, "Remote window size is not enough");
}
HPacker& hpacker = ctx->hpacker();
......
......@@ -185,7 +185,7 @@ private:
private:
butil::atomic<int> _nref;
uint32_t _size;
uint32_t _stream_id;
int _stream_id;
mutable butil::Mutex _mutex;
Controller* _cntl;
std::unique_ptr<H2StreamContext> _sctx;
......@@ -194,7 +194,7 @@ private:
class H2UnsentResponse : public SocketMessage {
public:
static H2UnsentResponse* New(Controller* c);
static H2UnsentResponse* New(Controller* c, int stream_id);
void Destroy();
void Describe(butil::IOBuf*) const;
// @SocketMessage
......@@ -208,9 +208,9 @@ private:
void push(const std::string& name, const std::string& value)
{ new (&_list[_size++]) HPacker::Header(name, value); }
H2UnsentResponse(Controller* c)
H2UnsentResponse(Controller* c, int stream_id)
: _size(0)
, _stream_id(c->http_request().h2_stream_id())
, _stream_id(stream_id)
, _http_response(c->release_http_response()) {
_data.swap(c->response_attachment());
}
......@@ -251,7 +251,7 @@ public:
void set_correlation_id(uint64_t cid) { _correlation_id = cid; }
size_t parsed_length() const { return this->_parsed_length; }
int stream_id() const { return header().h2_stream_id(); }
int stream_id() const { return _stream_id; }
int64_t ReleaseDeferredWindowUpdate() {
if (_deferred_window_update.load(butil::memory_order_relaxed) == 0) {
......@@ -272,6 +272,7 @@ friend class H2Context;
#if defined(BRPC_H2_STREAM_STATE)
H2StreamState _state;
#endif
int _stream_id;
bool _stream_ended;
butil::atomic<int64_t> _remote_window_left;
butil::atomic<int64_t> _deferred_window_update;
......@@ -336,9 +337,9 @@ public:
int AllocateClientStreamId();
bool RunOutStreams() const;
// Try to map stream_id to ctx if stream_id does not exist before
// Returns true on success, false otherwise.
bool TryToInsertStream(int stream_id, H2StreamContext* ctx);
uint32_t VolatilePendingStreamSize() const;
// Returns 0 on success, -1 on exist, 1 on goaway.
int TryToInsertStream(int stream_id, H2StreamContext* ctx);
size_t VolatilePendingStreamSize() const { return _pending_streams.size(); }
HPacker& hpacker() { return _hpacker; }
const H2Settings& remote_settings() const { return _remote_settings; }
......@@ -372,6 +373,8 @@ friend void InitFrameHandlers();
H2ParseResult OnContinuation(butil::IOBufBytesIterator&, const H2FrameHead&);
H2StreamContext* RemoveStream(int stream_id);
void RemoveGoAwayStreams(int goaway_stream_id, std::vector<H2StreamContext*>* out_streams);
H2StreamContext* FindStream(int stream_id);
void ClearAbandonedStreamsImpl();
......@@ -382,6 +385,7 @@ friend void InitFrameHandlers();
H2ConnectionState _conn_state;
int _last_server_stream_id;
uint32_t _last_client_stream_id;
int _goaway_stream_id;
H2Settings _remote_settings;
H2Settings _local_settings;
H2Settings _unack_local_settings;
......@@ -394,6 +398,19 @@ friend void InitFrameHandlers();
butil::atomic<int64_t> _deferred_window_update;
};
inline int H2Context::AllocateClientStreamId() {
if (RunOutStreams()) {
return -1;
}
const int id = _last_client_stream_id;
_last_client_stream_id += 2;
return id;
}
inline bool H2Context::RunOutStreams() const {
return (_last_client_stream_id > 0x7FFFFFFF);
}
} // namespace policy
} // namespace brpc
......
......@@ -536,46 +536,59 @@ inline bool SupportGzip(Controller* cntl) {
return encodings->find(common->GZIP) != std::string::npos;
}
// Called in controller.cpp as well
int ErrorCode2StatusCode(int error_code) {
switch (error_code) {
case ENOSERVICE:
case ENOMETHOD:
return HTTP_STATUS_NOT_FOUND;
case ERPCAUTH:
return HTTP_STATUS_UNAUTHORIZED;
case EREQUEST:
case EINVAL:
return HTTP_STATUS_BAD_REQUEST;
case ELIMIT:
case ELOGOFF:
return HTTP_STATUS_SERVICE_UNAVAILABLE;
case EPERM:
return HTTP_STATUS_FORBIDDEN;
case ERPCTIMEDOUT:
case ETIMEDOUT:
return HTTP_STATUS_GATEWAY_TIMEOUT;
default:
return HTTP_STATUS_INTERNAL_SERVER_ERROR;
}
}
class HttpResponseSender {
friend class HttpResponseSenderAsDone;
public:
HttpResponseSender()
: _method_status(NULL), _received_us(0), _h2_stream_id(-1) {}
HttpResponseSender(Controller* cntl/*own*/)
: _cntl(cntl), _method_status(NULL), _received_us(0), _h2_stream_id(-1) {}
HttpResponseSender(HttpResponseSender&& s)
: _cntl(std::move(s._cntl))
, _req(std::move(s._req))
, _res(std::move(s._res))
, _method_status(std::move(s._method_status))
, _received_us(s._received_us)
, _h2_stream_id(s._h2_stream_id) {
}
~HttpResponseSender();
void own_request(google::protobuf::Message* req) { _req.reset(req); }
void own_response(google::protobuf::Message* res) { _res.reset(res); }
void set_method_status(MethodStatus* ms) { _method_status = ms; }
void set_received_us(int64_t t) { _received_us = t; }
void set_h2_stream_id(int id) { _h2_stream_id = id; }
private:
std::unique_ptr<Controller, LogErrorTextAndDelete> _cntl;
std::unique_ptr<google::protobuf::Message> _req;
std::unique_ptr<google::protobuf::Message> _res;
MethodStatus* _method_status;
int64_t _received_us;
int _h2_stream_id;
};
static void SendHttpResponse(Controller *cntl,
const google::protobuf::Message *req,
const google::protobuf::Message *res,
const Server* server,
MethodStatus* method_status,
int64_t received_us) {
class HttpResponseSenderAsDone : public google::protobuf::Closure {
public:
HttpResponseSenderAsDone(HttpResponseSender* s) : _sender(std::move(*s)) {}
void Run() override { delete this; }
private:
HttpResponseSender _sender;
};
HttpResponseSender::~HttpResponseSender() {
Controller* cntl = _cntl.get();
if (cntl == NULL) {
return;
}
ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span();
if (span) {
span->set_start_send_us(butil::cpuwide_time_us());
}
std::unique_ptr<Controller, LogErrorTextAndDelete> recycle_cntl(cntl);
ConcurrencyRemover concurrency_remover(method_status, cntl, received_us);
std::unique_ptr<const google::protobuf::Message> recycle_req(req);
std::unique_ptr<const google::protobuf::Message> recycle_res(res);
ConcurrencyRemover concurrency_remover(_method_status, cntl, _received_us);
Socket* socket = accessor.get_sending_socket();
const google::protobuf::Message* res = _res.get();
if (cntl->IsCloseConnection()) {
socket->SetFailed();
......@@ -668,7 +681,7 @@ static void SendHttpResponse(Controller *cntl,
// Set status-code with default value(converted from error code)
// if user did not set it.
if (res_header->status_code() == HTTP_STATUS_OK) {
res_header->set_status_code(ErrorCode2StatusCode(cntl->ErrorCode()));
res_header->set_status_code(ErrorCodeToStatusCode(cntl->ErrorCode()));
}
// Fill ErrorCode into header
res_header->SetHeader(common->ERROR_CODE,
......@@ -719,7 +732,8 @@ static void SendHttpResponse(Controller *cntl,
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (req_header->is_http2()) {
SocketMessagePtr<H2UnsentResponse> h2_response(H2UnsentResponse::New(cntl));
SocketMessagePtr<H2UnsentResponse>
h2_response(H2UnsentResponse::New(cntl, _h2_stream_id));
if (h2_response == NULL) {
LOG(ERROR) << "Fail to make http2 response";
errno = EINVAL;
......@@ -764,13 +778,6 @@ static void SendHttpResponse(Controller *cntl,
}
}
inline void SendHttpResponse(Controller *cntl, const Server* svr,
MethodStatus* method_status,
int64_t received_us) {
SendHttpResponse(cntl, NULL, NULL, svr, method_status, received_us);
}
// Normalize the sub string of `uri_path' covered by `splitter' and
// put it into `unresolved_path'
static void FillUnresolvedPath(std::string* unresolved_path,
......@@ -1070,13 +1077,22 @@ void ProcessHttpRequest(InputMessageBase *msg) {
Socket* socket = socket_guard.get();
const Server* server = static_cast<const Server*>(msg->arg());
ScopedNonServiceError non_service_error(server);
std::unique_ptr<Controller> cntl(new (std::nothrow) Controller);
if (NULL == cntl.get()) {
Controller* cntl = new (std::nothrow) Controller;
if (NULL == cntl) {
LOG(FATAL) << "Fail to new Controller";
return;
}
ControllerPrivateAccessor accessor(cntl.get());
HttpResponseSender resp_sender(cntl);
resp_sender.set_received_us(msg->received_us());
const bool is_http2 = imsg_guard->header().is_http2();
if (is_http2) {
H2StreamContext* http2_sctx = static_cast<H2StreamContext*>(msg);
resp_sender.set_h2_stream_id(http2_sctx->stream_id());
}
ControllerPrivateAccessor accessor(cntl);
HttpHeader& req_header = cntl->http_request();
imsg_guard->header().Swap(req_header);
butil::IOBuf& req_body = imsg_guard->body();
......@@ -1150,7 +1166,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (!server->IsRunning()) {
cntl->SetFailed(ELOGOFF, "Server is stopping");
return SendHttpResponse(cntl.release(), server, NULL, msg->received_us());
return;
}
if (server->options().http_master_service) {
......@@ -1160,23 +1176,18 @@ void ProcessHttpRequest(InputMessageBase *msg) {
svc->GetDescriptor()->FindMethodByName(common->DEFAULT_METHOD);
if (md == NULL) {
cntl->SetFailed(ENOMETHOD, "No default_method in http_master_service");
return SendHttpResponse(cntl.release(), server, NULL, msg->received_us());
return;
}
accessor.set_method(md);
cntl->request_attachment().swap(req_body);
google::protobuf::Closure* done = brpc::NewCallback<
Controller*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus *, int64_t>(
&SendHttpResponse, cntl.get(), NULL, NULL, server,
NULL, msg->received_us());
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
if (span) {
span->ResetServerSpanName(md->full_name());
span->set_start_callback_us(butil::cpuwide_time_us());
span->AsParent();
}
// `cntl', `req' and `res' will be deleted inside `done'
return svc->CallMethod(md, cntl.release(), NULL, NULL, done);
return svc->CallMethod(md, cntl, NULL, NULL, done);
}
const Server::MethodProperty* const sp =
......@@ -1189,24 +1200,25 @@ void ProcessHttpRequest(InputMessageBase *msg) {
} else {
cntl->SetFailed(ENOMETHOD, "Fail to find method on `%s'", path.c_str());
}
return SendHttpResponse(cntl.release(), server, NULL, msg->received_us());
return;
} else if (sp->service->GetDescriptor() == BadMethodService::descriptor()) {
BadMethodRequest breq;
BadMethodResponse bres;
butil::StringSplitter split(path.c_str(), '/');
breq.set_service_name(std::string(split.field(), split.length()));
sp->service->CallMethod(sp->method, cntl.get(), &breq, &bres, NULL);
return SendHttpResponse(cntl.release(), server, NULL, msg->received_us());
sp->service->CallMethod(sp->method, cntl, &breq, &bres, NULL);
return;
}
// Switch to service-specific error.
non_service_error.release();
MethodStatus* method_status = sp->status;
resp_sender.set_method_status(method_status);
if (method_status) {
int rejected_cc = 0;
if (!method_status->OnRequested(&rejected_cc)) {
cntl->SetFailed(ELIMIT, "Rejected by %s's ConcurrencyLimiter, concurrency=%d",
sp->method->full_name().c_str(), rejected_cc);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
}
......@@ -1219,36 +1231,37 @@ void ProcessHttpRequest(InputMessageBase *msg) {
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
if (!server_accessor.AddConcurrency(cntl.get())) {
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on");
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
} else if (security_mode) {
cntl->SetFailed(EPERM, "Not allowed to access builtin services, try "
"ServerOptions.internal_port=%d instead if you're in"
" internal network", server->options().internal_port);
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
google::protobuf::Service* svc = sp->service;
const google::protobuf::MethodDescriptor* method = sp->method;
accessor.set_method(method);
std::unique_ptr<google::protobuf::Message> req(
svc->GetRequestPrototype(method).New());
std::unique_ptr<google::protobuf::Message> res(
svc->GetResponsePrototype(method).New());
google::protobuf::Message* req = svc->GetRequestPrototype(method).New();
resp_sender.own_request(req);
google::protobuf::Message* res = svc->GetResponsePrototype(method).New();
resp_sender.own_response(res);
if (__builtin_expect(!req || !res, 0)) {
PLOG(FATAL) << "Fail to new req or res";
cntl->SetFailed("Fail to new req or res");
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
if (sp->params.allow_http_body_to_pb &&
method->input_type()->field_count() > 0) {
......@@ -1262,7 +1275,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
cntl->SetFailed(EREQUEST, "%s needs to be created from a"
" non-empty json, it has required fields.",
req->GetDescriptor()->full_name().c_str());
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
} // else all fields of the request are optional.
} else {
const std::string* encoding =
......@@ -1273,15 +1286,15 @@ void ProcessHttpRequest(InputMessageBase *msg) {
butil::IOBuf uncompressed;
if (!policy::GzipDecompress(req_body, &uncompressed)) {
cntl->SetFailed(EREQUEST, "Fail to un-gzip request body");
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
req_body.swap(uncompressed);
}
if (ParseContentType(req_header.content_type()) == HTTP_CONTENT_PROTO) {
if (!ParsePbFromIOBuf(req.get(), req_body)) {
if (!ParsePbFromIOBuf(req, req_body)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s",
req->GetDescriptor()->full_name().c_str());
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
} else {
butil::IOBufAsZeroCopyInputStream wrapper(req_body);
......@@ -1289,10 +1302,10 @@ void ProcessHttpRequest(InputMessageBase *msg) {
json2pb::Json2PbOptions options;
options.base64_to_bytes = sp->params.pb_bytes_to_base64;
cntl->set_pb_bytes_to_base64(sp->params.pb_bytes_to_base64);
if (!json2pb::JsonToProtoMessage(&wrapper, req.get(), options, &err)) {
if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) {
cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s",
req->GetDescriptor()->full_name().c_str(), err.c_str());
return SendHttpResponse(cntl.release(), server, method_status, msg->received_us());
return;
}
}
}
......@@ -1301,14 +1314,7 @@ void ProcessHttpRequest(InputMessageBase *msg) {
cntl->request_attachment().swap(req_body);
}
google::protobuf::Closure* done = brpc::NewCallback<
Controller*, const google::protobuf::Message*,
const google::protobuf::Message*, const Server*,
MethodStatus *, int64_t>(
&SendHttpResponse, cntl.get(),
req.get(), res.get(), server,
method_status, msg->received_us());
google::protobuf::Closure* done = new HttpResponseSenderAsDone(&resp_sender);
imsg_guard.reset(); // optional, just release resourse ASAP
if (span) {
......@@ -1316,17 +1322,13 @@ void ProcessHttpRequest(InputMessageBase *msg) {
span->AsParent();
}
if (!FLAGS_usercode_in_pthread) {
return svc->CallMethod(method, cntl.release(),
req.release(), res.release(), done);
return svc->CallMethod(method, cntl, req, res, done);
}
if (BeginRunningUserCode()) {
svc->CallMethod(method, cntl.release(),
req.release(), res.release(), done);
svc->CallMethod(method, cntl, req, res, done);
return EndRunningUserCodeInPlace();
} else {
return EndRunningCallMethodInPool(
svc, method, cntl.release(),
req.release(), res.release(), done);
return EndRunningCallMethodInPool(svc, method, cntl, req, res, done);
}
}
......
......@@ -489,6 +489,8 @@ public:
// Returns true if the remote side is overcrowded.
bool is_overcrowded() const { return _overcrowded; }
bthread_keytable_pool_t* keytable_pool() const { return _keytable_pool; }
private:
DISALLOW_COPY_AND_ASSIGN(Socket);
......
......@@ -211,13 +211,12 @@ protected:
brpc::Controller cntl;
test::EchoResponse res;
res.set_message(EXP_RESPONSE);
cntl.http_request()._h2_stream_id = h2_stream_id;
cntl.http_request().set_content_type("application/proto");
{
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl.response_attachment());
EXPECT_TRUE(res.SerializeToZeroCopyStream(&wrapper));
}
brpc::policy::H2UnsentResponse* h2_res = brpc::policy::H2UnsentResponse::New(&cntl);
brpc::policy::H2UnsentResponse* h2_res = brpc::policy::H2UnsentResponse::New(&cntl, h2_stream_id);
butil::Status st = h2_res->AppendAndDestroySelf(out, _h2_client_sock.get());
ASSERT_TRUE(st.ok());
}
......@@ -687,7 +686,7 @@ TEST_F(HttpTest, read_long_body_progressively) {
{
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP2;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
......@@ -1109,7 +1108,7 @@ TEST_F(HttpTest, http2_window_used_up) {
if (i == nsuc) {
// the last message should fail according to flow control policy.
ASSERT_FALSE(st.ok());
ASSERT_TRUE(st.error_code() == EAGAIN);
ASSERT_TRUE(st.error_code() == brpc::ELIMIT);
ASSERT_TRUE(butil::StringPiece(st.error_str()).starts_with("remote_window_left is not enough"));
} else {
ASSERT_TRUE(st.ok());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment