Commit 70d08547 authored by gejun's avatar gejun

call_id cannot be destroyed (via RPC interfaces) before being used by an RPC &…

call_id cannot be destroyed (via RPC interfaces) before being used by an RPC & Replace Controller._run_done_state with more general allow_done_to_run_in_place
parent 7bef18dd
...@@ -234,33 +234,6 @@ static void HandleBackupRequest(void* arg) { ...@@ -234,33 +234,6 @@ static void HandleBackupRequest(void* arg) {
bthread_id_error(correlation_id, EBACKUPREQUEST); bthread_id_error(correlation_id, EBACKUPREQUEST);
} }
static void* RunDone(void* arg) {
static_cast<google::protobuf::Closure*>(arg)->Run();
return NULL;
}
static void RunDoneInAnotherThread(google::protobuf::Closure* done) {
bthread_t bh;
bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
if (bthread_start_background(&bh, &attr, RunDone, done) != 0) {
LOG(FATAL) << "Fail to start bthread";
done->Run();
}
}
void RunDoneByState(Controller* cntl,
google::protobuf::Closure* done) {
if (done) {
if (cntl->_run_done_state == Controller::CALLMETHOD_CAN_RUN_DONE) {
cntl->_run_done_state = Controller::CALLMETHOD_DID_RUN_DONE;
done->Run();
} else {
RunDoneInAnotherThread(done);
}
}
}
void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base, google::protobuf::RpcController* controller_base,
const google::protobuf::Message* request, const google::protobuf::Message* request,
...@@ -284,19 +257,27 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, ...@@ -284,19 +257,27 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
correlation_id, NULL, 2 + cntl->max_retry()); correlation_id, NULL, 2 + cntl->max_retry());
if (rc != 0) { if (rc != 0) {
CHECK_EQ(EINVAL, rc); CHECK_EQ(EINVAL, rc);
const int err = cntl->ErrorCode(); if (!cntl->FailedInline()) {
if (err != ECANCELED) { cntl->SetFailed(EINVAL, "Fail to lock call_id=%" PRId64,
// it's very likely that user reused a un-Reset() Controller.
cntl->SetFailed((err ? err : EINVAL), "call_id=%" PRId64 " was "
"destroyed before CallMethod(), did you forget to "
"Reset() the Controller?",
correlation_id.value); correlation_id.value);
} else {
// not warn for canceling which is common.
} }
RunDoneByState(cntl, done); LOG_IF(ERROR, cntl->is_used_by_rpc())
<< "Controller=" << cntl << " was used by another RPC before. "
"Did you forget to Reset() it before reuse?";
// Have to run done in-place. If the done runs in another thread,
// Join() on this RPC is no-op and probably ends earlier than running
// the callback and releases resources used in the callback.
// Since this branch is only entered by wrongly-used RPC, the
// potentially introduced deadlock(caused by locking RPC and done with
// the same non-recursive lock) is acceptable and removable by fixing
// user's code.
if (done) {
done->Run();
}
return; return;
} }
cntl->set_used_by_rpc();
if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) { if (cntl->_sender == NULL && IsTraceable(Span::tls_parent())) {
const int64_t start_send_us = butil::cpuwide_time_us(); const int64_t start_send_us = butil::cpuwide_time_us();
const std::string* method_name = NULL; const std::string* method_name = NULL;
...@@ -353,6 +334,11 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, ...@@ -353,6 +334,11 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method,
"-usercode_in_pthread is on"); "-usercode_in_pthread is on");
return cntl->HandleSendFailed(); return cntl->HandleSendFailed();
} }
if (cntl->FailedInline()) {
// probably failed before RPC, not called until all necessary
// parameters in `cntl' are set.
return cntl->HandleSendFailed();
}
_serialize_request(&cntl->_request_buf, cntl, request); _serialize_request(&cntl->_request_buf, cntl, request);
if (cntl->FailedInline()) { if (cntl->FailedInline()) {
return cntl->HandleSendFailed(); return cntl->HandleSendFailed();
......
...@@ -156,7 +156,7 @@ void Controller::DeleteStuff() { ...@@ -156,7 +156,7 @@ void Controller::DeleteStuff() {
if (_correlation_id != INVALID_BTHREAD_ID && if (_correlation_id != INVALID_BTHREAD_ID &&
!has_flag(FLAGS_DESTROYED_CID)) { !has_flag(FLAGS_DESTROYED_CID)) {
bthread_id_cancel(_correlation_id); CHECK_NE(EPERM, bthread_id_cancel(_correlation_id));
} }
if (_oncancel_id != INVALID_BTHREAD_ID) { if (_oncancel_id != INVALID_BTHREAD_ID) {
bthread_id_error(_oncancel_id, 0); bthread_id_error(_oncancel_id, 0);
...@@ -206,8 +206,6 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -206,8 +206,6 @@ void Controller::InternalReset(bool in_constructor) {
_error_code = 0; _error_code = 0;
_remote_side = butil::EndPoint(); _remote_side = butil::EndPoint();
_local_side = butil::EndPoint(); _local_side = butil::EndPoint();
_begin_time_us = 0;
_end_time_us = 0;
_session_local_data = NULL; _session_local_data = NULL;
_server = NULL; _server = NULL;
_oncancel_id = INVALID_BTHREAD_ID; _oncancel_id = INVALID_BTHREAD_ID;
...@@ -222,8 +220,10 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -222,8 +220,10 @@ void Controller::InternalReset(bool in_constructor) {
_backup_request_ms = UNSET_MAGIC_NUM; _backup_request_ms = UNSET_MAGIC_NUM;
_connect_timeout_ms = UNSET_MAGIC_NUM; _connect_timeout_ms = UNSET_MAGIC_NUM;
_abstime_us = -1; _abstime_us = -1;
_timeout_id = 0;
_begin_time_us = 0;
_end_time_us = 0;
_tos = 0; _tos = 0;
_run_done_state = CALLMETHOD_CANNOT_RUN_DONE;
_preferred_index = -1; _preferred_index = -1;
_request_compress_type = COMPRESS_TYPE_NONE; _request_compress_type = COMPRESS_TYPE_NONE;
_response_compress_type = COMPRESS_TYPE_NONE; _response_compress_type = COMPRESS_TYPE_NONE;
...@@ -242,7 +242,6 @@ void Controller::InternalReset(bool in_constructor) { ...@@ -242,7 +242,6 @@ void Controller::InternalReset(bool in_constructor) {
_pack_request = NULL; _pack_request = NULL;
_method = NULL; _method = NULL;
_auth = NULL; _auth = NULL;
_timeout_id = 0;
_idl_names = idl_single_req_single_res; _idl_names = idl_single_req_single_res;
_idl_result = IDL_VOID_RESULT; _idl_result = IDL_VOID_RESULT;
_http_request = NULL; _http_request = NULL;
...@@ -630,11 +629,10 @@ END_OF_RPC: ...@@ -630,11 +629,10 @@ END_OF_RPC:
bthread_attr_t attr = (FLAGS_usercode_in_pthread ? bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
_tmp_completion_info = info; _tmp_completion_info = info;
if (bthread_start_background(&bt, &attr, RunEndRPC, this) == 0) { if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) {
return;
}
LOG(FATAL) << "Fail to start bthread"; LOG(FATAL) << "Fail to start bthread";
EndRPC(info); EndRPC(info);
}
} else { } else {
if (_done != NULL/*Note[_done]*/ && if (_done != NULL/*Note[_done]*/ &&
!has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) { !has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) {
...@@ -894,18 +892,20 @@ void Controller::SubmitSpan() { ...@@ -894,18 +892,20 @@ void Controller::SubmitSpan() {
} }
void Controller::HandleSendFailed() { void Controller::HandleSendFailed() {
// NOTE: Must launch new thread to run the callback in an asynchronous
// call. Users may hold a lock before asynchronus CallMethod returns and
// grab the same lock inside done->Run(). If we call done->Run() in the
// same stack of CallMethod, we're deadlocked.
// We don't need to run the callback with new thread in a sync call since
// the created thread needs to be joined anyway before CallMethod ends.
if (!FailedInline()) { if (!FailedInline()) {
SetFailed("Must be SetFailed() before calling HandleSendFailed()"); SetFailed("Must be SetFailed() before calling HandleSendFailed()");
LOG(FATAL) << ErrorText(); LOG(FATAL) << ErrorText();
} }
CompletionInfo info = { current_id(), false }; const CompletionInfo info = { current_id(), false };
OnVersionedRPCReturned(info, _done != NULL, _error_code); // NOTE: Launch new thread to run the callback in an asynchronous call
// (and done is not allowed to run in-place)
// Users may hold a lock before asynchronus CallMethod returns and
// grab the same lock inside done->Run(). If done->Run() is called in the
// same stack of CallMethod, the code is deadlocked.
// We don't need to run the callback in new thread in a sync call since
// the created thread needs to be joined anyway before end of CallMethod.
const bool new_bthread = (_done != NULL && !is_done_allowed_to_run_in_place());
OnVersionedRPCReturned(info, new_bthread, _error_code);
} }
void Controller::IssueRPC(int64_t start_realtime_us) { void Controller::IssueRPC(int64_t start_realtime_us) {
...@@ -1122,6 +1122,16 @@ void Controller::set_auth_context(const AuthContext* ctx) { ...@@ -1122,6 +1122,16 @@ void Controller::set_auth_context(const AuthContext* ctx) {
int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code, int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code,
const std::string& error_text) { const std::string& error_text) {
Controller* cntl = static_cast<Controller*>(data); Controller* cntl = static_cast<Controller*>(data);
if (!cntl->is_used_by_rpc()) {
// Cannot destroy the call_id before RPC otherwise an async RPC
// using the controller cannot be joined and related resources may be
// destroyed before done->Run() running in another bthread.
// The error set will be detected in Channel::CallMethod and fail
// the RPC.
cntl->SetFailed(error_code, "Cancel call_id=%" PRId64
" before CallMethod()", id.value);
return bthread_id_unlock(id);
}
const int saved_error = cntl->ErrorCode(); const int saved_error = cntl->ErrorCode();
if (error_code == ERPCTIMEDOUT) { if (error_code == ERPCTIMEDOUT) {
cntl->SetFailed(error_code, "Reached timeout=%" PRId64 "ms @%s", cntl->SetFailed(error_code, "Reached timeout=%" PRId64 "ms @%s",
......
...@@ -96,7 +96,6 @@ friend class SelectiveChannel; ...@@ -96,7 +96,6 @@ friend class SelectiveChannel;
friend class schan::Sender; friend class schan::Sender;
friend class schan::SubDone; friend class schan::SubDone;
friend class policy::OnServerStreamCreated; friend class policy::OnServerStreamCreated;
friend void RunDoneByState(Controller*, google::protobuf::Closure*);
friend int StreamCreate(StreamId*, Controller&, const StreamOptions*); friend int StreamCreate(StreamId*, Controller&, const StreamOptions*);
friend int StreamAccept(StreamId*, Controller&, const StreamOptions*); friend int StreamAccept(StreamId*, Controller&, const StreamOptions*);
friend void policy::ProcessMongoRequest(InputMessageBase*); friend void policy::ProcessMongoRequest(InputMessageBase*);
...@@ -116,6 +115,8 @@ friend void policy::ProcessMongoRequest(InputMessageBase*); ...@@ -116,6 +115,8 @@ friend void policy::ProcessMongoRequest(InputMessageBase*);
static const uint32_t FLAGS_LOG_ID = (1 << 9); // log_id is set static const uint32_t FLAGS_LOG_ID = (1 << 9); // log_id is set
static const uint32_t FLAGS_REQUEST_CODE = (1 << 10); static const uint32_t FLAGS_REQUEST_CODE = (1 << 10);
static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11); static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11);
static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12);
static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
public: public:
Controller(); Controller();
...@@ -262,7 +263,20 @@ public: ...@@ -262,7 +263,20 @@ public:
// Set if the field of bytes in protobuf message should be encoded // Set if the field of bytes in protobuf message should be encoded
// to base64 string in HTTP request. // to base64 string in HTTP request.
void set_pb_bytes_to_base64(bool f) { set_flag(FLAGS_PB_BYTES_TO_BASE64, f); } void set_pb_bytes_to_base64(bool f) { set_flag(FLAGS_PB_BYTES_TO_BASE64, f); }
bool has_pb_bytes_to_base64() { return has_flag(FLAGS_PB_BYTES_TO_BASE64); } bool has_pb_bytes_to_base64() const { return has_flag(FLAGS_PB_BYTES_TO_BASE64); }
// Tell RPC that done of the RPC can be run in the same thread where
// the RPC is issued, otherwise done is always run in a different thread.
// In current implementation, this option only affects RPC that fails
// before sending the request.
// This option is *rarely* needed by ordinary users. Don't set this option
// if you don't know the consequences. Read implementions in channel.cpp
// and controller.cpp to know more.
void allow_done_to_run_in_place()
{ add_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }
// True iff above method was called.
bool is_done_allowed_to_run_in_place() const
{ return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Server-side methods. // Server-side methods.
...@@ -506,11 +520,6 @@ private: ...@@ -506,11 +520,6 @@ private:
// Append server information to `_error_text' // Append server information to `_error_text'
void AppendServerIdentiy(); void AppendServerIdentiy();
// Used by ParallelChannel
static const int8_t CALLMETHOD_CANNOT_RUN_DONE = 0;
static const int8_t CALLMETHOD_CAN_RUN_DONE = 1;
static const int8_t CALLMETHOD_DID_RUN_DONE = 2;
// Contexts for tracking and ending a sent request. // Contexts for tracking and ending a sent request.
// One RPC to a channel may send several requests due to retrying. // One RPC to a channel may send several requests due to retrying.
struct Call { struct Call {
...@@ -558,6 +567,9 @@ private: ...@@ -558,6 +567,9 @@ private:
{ return t ? add_flag(f) : clear_flag(f); } { return t ? add_flag(f) : clear_flag(f); }
inline bool has_flag(uint32_t f) const { return _flags & f; } inline bool has_flag(uint32_t f) const { return _flags & f; }
void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); }
bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); }
private: private:
// NOTE: align and group fields to make Controller as compact as possible. // NOTE: align and group fields to make Controller as compact as possible.
...@@ -603,9 +615,7 @@ private: ...@@ -603,9 +615,7 @@ private:
// Begin/End time of a single RPC call (since Epoch in microseconds) // Begin/End time of a single RPC call (since Epoch in microseconds)
int64_t _begin_time_us; int64_t _begin_time_us;
int64_t _end_time_us; int64_t _end_time_us;
short _tos; // Type of service. short _tos; // Type of service.
int8_t _run_done_state;
// The index of parse function which `InputMessenger' will use // The index of parse function which `InputMessenger' will use
int _preferred_index; int _preferred_index;
CompressType _request_compress_type; CompressType _request_compress_type;
......
...@@ -51,7 +51,9 @@ private: ...@@ -51,7 +51,9 @@ private:
, _current_fail(0) , _current_fail(0)
, _current_done(0) , _current_done(0)
, _cntl(cntl) , _cntl(cntl)
, _user_done(user_done) { , _user_done(user_done)
, _callmethod_bthread(INVALID_BTHREAD)
, _callmethod_pthread(0) {
} }
~ParallelChannelDone() { } ~ParallelChannelDone() { }
...@@ -137,7 +139,7 @@ public: ...@@ -137,7 +139,7 @@ public:
for (int i = 0; i < ndone; ++i) { for (int i = 0; i < ndone; ++i) {
new (d->sub_done(i)) SubDone; new (d->sub_done(i)) SubDone;
d->sub_done(i)->cntl.ApplyClientSettings(settings); d->sub_done(i)->cntl.ApplyClientSettings(settings);
d->sub_done(i)->cntl._run_done_state = Controller::CALLMETHOD_CAN_RUN_DONE; d->sub_done(i)->cntl.allow_done_to_run_in_place();
} }
// Setup the map for finding sub_done of i-th sub_channel // Setup the map for finding sub_done of i-th sub_channel
if (ndone != nchan) { if (ndone != nchan) {
...@@ -196,6 +198,21 @@ public: ...@@ -196,6 +198,21 @@ public:
return NULL; return NULL;
} }
// For otherwhere to know if they're in the same thread.
void SaveThreadInfoOfCallsite() {
_callmethod_bthread = bthread_self();
if (_callmethod_bthread == INVALID_BTHREAD) {
_callmethod_pthread = pthread_self();
}
}
bool IsSameThreadAsCallMethod() const {
if (_callmethod_bthread != INVALID_BTHREAD) {
return bthread_self() == _callmethod_bthread;
}
return pthread_self() == _callmethod_pthread;
}
void OnSubDoneRun(SubDone* fin) { void OnSubDoneRun(SubDone* fin) {
if (fin != NULL) { if (fin != NULL) {
// [ called from SubDone::Run() ] // [ called from SubDone::Run() ]
...@@ -261,7 +278,8 @@ public: ...@@ -261,7 +278,8 @@ public:
butil::atomic_thread_fence(butil::memory_order_acquire); butil::atomic_thread_fence(butil::memory_order_acquire);
if (fin != NULL && if (fin != NULL &&
fin->cntl._run_done_state == Controller::CALLMETHOD_DID_RUN_DONE) { !_cntl->is_done_allowed_to_run_in_place() &&
IsSameThreadAsCallMethod()) {
// A sub channel's CallMethod calls a subdone directly, create a // A sub channel's CallMethod calls a subdone directly, create a
// thread to run OnComplete. // thread to run OnComplete.
bthread_t bh; bthread_t bh;
...@@ -413,9 +431,12 @@ private: ...@@ -413,9 +431,12 @@ private:
butil::atomic<uint32_t> _current_done; butil::atomic<uint32_t> _current_done;
Controller* _cntl; Controller* _cntl;
google::protobuf::Closure* _user_done; google::protobuf::Closure* _user_done;
bthread_t _callmethod_bthread;
pthread_t _callmethod_pthread;
SubDone _sub_done[0]; SubDone _sub_done[0];
}; };
// Used in controller.cpp
void DestroyParallelChannelDone(google::protobuf::Closure* c) { void DestroyParallelChannelDone(google::protobuf::Closure* c) {
ParallelChannelDone::Destroy(static_cast<ParallelChannelDone*>(c)); ParallelChannelDone::Destroy(static_cast<ParallelChannelDone*>(c));
} }
...@@ -512,7 +533,17 @@ static void HandleTimeout(void* arg) { ...@@ -512,7 +533,17 @@ static void HandleTimeout(void* arg) {
bthread_id_error(correlation_id, ERPCTIMEDOUT); bthread_id_error(correlation_id, ERPCTIMEDOUT);
} }
void RunDoneByState(Controller*, google::protobuf::Closure*); void* ParallelChannel::RunDoneAndDestroy(void* arg) {
Controller* c = static_cast<Controller*>(arg);
// Move done out from the controller.
google::protobuf::Closure* done = c->_done;
c->_done = NULL;
// Save call_id from the controller which may be deleted after Run().
const bthread_id_t cid = c->call_id();
done->Run();
CHECK_EQ(0, bthread_id_unlock_and_destroy(cid));
return NULL;
}
void ParallelChannel::CallMethod( void ParallelChannel::CallMethod(
const google::protobuf::MethodDescriptor* method, const google::protobuf::MethodDescriptor* method,
...@@ -530,25 +561,30 @@ void ParallelChannel::CallMethod( ...@@ -530,25 +561,30 @@ void ParallelChannel::CallMethod(
const int rc = bthread_id_lock(cid, NULL); const int rc = bthread_id_lock(cid, NULL);
if (rc != 0) { if (rc != 0) {
CHECK_EQ(EINVAL, rc); CHECK_EQ(EINVAL, rc);
const int err = cntl->ErrorCode(); if (!cntl->FailedInline()) {
if (err != ECANCELED) { cntl->SetFailed(EINVAL, "Fail to lock call_id=%" PRId64, cid.value);
// it's very likely that user reused a un-Reset() Controller. }
cntl->SetFailed((err ? err : EINVAL), LOG_IF(ERROR, cntl->is_used_by_rpc())
"call_id=%lld was destroyed before CallMethod(), " << "Controller=" << cntl << " was used by another RPC before. "
"did you forget to Reset() the Controller?", "Did you forget to Reset() it before reuse?";
(long long)cid.value); // Have to run done in-place.
} else { // Read comment in CallMethod() in channel.cpp for details.
// not warn for canceling which is common. if (done) {
done->Run();
} }
RunDoneByState(cntl, done);
return; return;
} }
cntl->set_used_by_rpc();
ParallelChannelDone* d = NULL; ParallelChannelDone* d = NULL;
int ndone = nchan; int ndone = nchan;
int fail_limit = 1; int fail_limit = 1;
DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64); DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);
if (cntl->FailedInline()) {
// The call_id is cancelled before RPC.
goto FAIL;
}
// we don't support http whose response is NULL. // we don't support http whose response is NULL.
if (response == NULL) { if (response == NULL) {
cntl->SetFailed(EINVAL, "response must be non-NULL"); cntl->SetFailed(EINVAL, "response must be non-NULL");
...@@ -634,8 +670,9 @@ void ParallelChannel::CallMethod( ...@@ -634,8 +670,9 @@ void ParallelChannel::CallMethod(
} else { } else {
cntl->_abstime_us = -1; cntl->_abstime_us = -1;
} }
d->SaveThreadInfoOfCallsite();
CHECK_EQ(0, bthread_id_unlock(cid)); CHECK_EQ(0, bthread_id_unlock(cid));
// Don't touch cntl again. // Don't touch `cntl' and `d' again (for async RPC)
for (int i = 0, j = 0; i < nchan; ++i) { for (int i = 0, j = 0; i < nchan; ++i) {
if (!aps[i].is_skip()) { if (!aps[i].is_skip()) {
...@@ -655,13 +692,27 @@ void ParallelChannel::CallMethod( ...@@ -655,13 +692,27 @@ void ParallelChannel::CallMethod(
return; return;
FAIL: FAIL:
// The RPC was failed after locking call_id and before calling sub channels.
if (d) { if (d) {
// The call ends before calling CallMethod of any sub channel, we // Set the _done to NULL to make sure cntl->sub(any_index) is NULL.
// set the _done to NULL to make sure cntl->sub(any_index) is NULL.
cntl->_done = NULL; cntl->_done = NULL;
ParallelChannelDone::Destroy(d); ParallelChannelDone::Destroy(d);
} }
RunDoneByState(cntl, done); if (done) {
if (!cntl->is_done_allowed_to_run_in_place()) {
bthread_t bh;
bthread_attr_t attr = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL);
// Hack: save done in cntl->_done to remove a malloc of args.
cntl->_done = done;
if (bthread_start_background(&bh, &attr, RunDoneAndDestroy, cntl) == 0) {
return;
}
cntl->_done = NULL;
LOG(FATAL) << "Fail to start bthread";
}
done->Run();
}
CHECK_EQ(0, bthread_id_unlock_and_destroy(cid)); CHECK_EQ(0, bthread_id_unlock_and_destroy(cid));
} }
......
...@@ -240,6 +240,7 @@ public: ...@@ -240,6 +240,7 @@ public:
typedef std::vector<SubChan> ChannelList; typedef std::vector<SubChan> ChannelList;
protected: protected:
static void* RunDoneAndDestroy(void* arg);
int CheckHealth(); int CheckHealth();
ParallelChannelOptions _options; ParallelChannelOptions _options;
......
...@@ -249,8 +249,6 @@ int PartitionChannel::Init(int num_partition_kinds, ...@@ -249,8 +249,6 @@ int PartitionChannel::Init(int num_partition_kinds,
return 0; return 0;
} }
void RunDoneByState(Controller*, google::protobuf::Closure*);
void PartitionChannel::CallMethod( void PartitionChannel::CallMethod(
const google::protobuf::MethodDescriptor* method, const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller, google::protobuf::RpcController* controller,
...@@ -263,7 +261,11 @@ void PartitionChannel::CallMethod( ...@@ -263,7 +261,11 @@ void PartitionChannel::CallMethod(
Controller* cntl = static_cast<Controller*>(controller); Controller* cntl = static_cast<Controller*>(controller);
cntl->SetFailed(EINVAL, "PartitionChannel=%p is not initialized yet", cntl->SetFailed(EINVAL, "PartitionChannel=%p is not initialized yet",
this); this);
RunDoneByState(cntl, done); // This is a branch only entered by wrongly-used RPC, just call done
// in-place. See comments in channel.cpp on deadlock concerns.
if (done) {
done->Run();
}
} }
} }
......
...@@ -531,8 +531,6 @@ void SelectiveChannel::RemoveAndDestroyChannel(ChannelHandle handle) { ...@@ -531,8 +531,6 @@ void SelectiveChannel::RemoveAndDestroyChannel(ChannelHandle handle) {
lb->RemoveAndDestroyChannel(handle); lb->RemoveAndDestroyChannel(handle);
} }
void RunDoneByState(Controller*, google::protobuf::Closure*);
void SelectiveChannel::CallMethod( void SelectiveChannel::CallMethod(
const google::protobuf::MethodDescriptor* method, const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller_base, google::protobuf::RpcController* controller_base,
...@@ -543,7 +541,6 @@ void SelectiveChannel::CallMethod( ...@@ -543,7 +541,6 @@ void SelectiveChannel::CallMethod(
if (!initialized()) { if (!initialized()) {
cntl->SetFailed(EINVAL, "SelectiveChannel=%p is not initialized yet", cntl->SetFailed(EINVAL, "SelectiveChannel=%p is not initialized yet",
this); this);
return RunDoneByState(cntl, user_done);
} }
schan::Sender* sndr = new schan::Sender(cntl, request, response, user_done); schan::Sender* sndr = new schan::Sender(cntl, request, response, user_done);
cntl->_sender = sndr; cntl->_sender = sndr;
......
...@@ -2440,7 +2440,7 @@ TEST_F(ChannelTest, global_channel_should_quit_successfully) { ...@@ -2440,7 +2440,7 @@ TEST_F(ChannelTest, global_channel_should_quit_successfully) {
g_chan.Init("bns://qa-pbrpc.SAT.tjyx", "rr", NULL); g_chan.Init("bns://qa-pbrpc.SAT.tjyx", "rr", NULL);
} }
TEST_F(ChannelTest, unused) { TEST_F(ChannelTest, unused_call_id) {
{ {
brpc::Controller cntl; brpc::Controller cntl;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment