Commit ed3133e1 authored by gejun's avatar gejun

fix an issue that thrift_done->Run() does nothing

parent 3afac97f
...@@ -187,8 +187,9 @@ public: ...@@ -187,8 +187,9 @@ public:
// [Required] Call this to send response back to the client. // [Required] Call this to send response back to the client.
void Run() override; void Run() override;
// Run() is suspended before this method is called. // Suspend/resume calling DoRun().
void AllowToRun(); void SuspendRunning();
void ResumeRunning();
private: private:
void DoRun(); void DoRun();
...@@ -202,21 +203,25 @@ friend void ProcessThriftRequest(InputMessageBase* msg_base); ...@@ -202,21 +203,25 @@ friend void ProcessThriftRequest(InputMessageBase* msg_base);
}; };
inline ThriftClosure::ThriftClosure() inline ThriftClosure::ThriftClosure()
: _run_counter(0), _received_us(0) { : _run_counter(1), _received_us(0) {
} }
ThriftClosure::~ThriftClosure() { ThriftClosure::~ThriftClosure() {
LogErrorTextAndDelete(false)(&_controller); LogErrorTextAndDelete(false)(&_controller);
} }
inline void ThriftClosure::AllowToRun() { inline void ThriftClosure::SuspendRunning() {
if (_run_counter.fetch_add(1, butil::memory_order_relaxed) == 1) { _run_counter.fetch_add(1, butil::memory_order_relaxed);
}
inline void ThriftClosure::ResumeRunning() {
if (_run_counter.fetch_sub(1, butil::memory_order_relaxed) == 1) {
DoRun(); DoRun();
} }
} }
void ThriftClosure::Run() { void ThriftClosure::Run() {
if (_run_counter.fetch_add(1, butil::memory_order_relaxed) == 1) { if (_run_counter.fetch_sub(1, butil::memory_order_relaxed) == 1) {
DoRun(); DoRun();
} }
} }
...@@ -385,8 +390,9 @@ inline void ProcessThriftFramedRequestNoExcept(ThriftService* service, ...@@ -385,8 +390,9 @@ inline void ProcessThriftFramedRequestNoExcept(ThriftService* service,
ThriftFramedMessage* req, ThriftFramedMessage* req,
ThriftFramedMessage* res, ThriftFramedMessage* res,
ThriftClosure* done) { ThriftClosure* done) {
// NOTE: done is not actually run before AllowToRun() is called so that // NOTE: done is not actually run before ResumeRunning() is called so that
// we can still set `cntl' in the catch branch. // we can still set `cntl' in the catch branch.
done->SuspendRunning();
try { try {
service->ProcessThriftFramedRequest(cntl, req, res, done); service->ProcessThriftFramedRequest(cntl, req, res, done);
} catch (std::exception& e) { } catch (std::exception& e) {
...@@ -398,7 +404,7 @@ inline void ProcessThriftFramedRequestNoExcept(ThriftService* service, ...@@ -398,7 +404,7 @@ inline void ProcessThriftFramedRequestNoExcept(ThriftService* service,
} catch (...) { } catch (...) {
cntl->SetFailed(EINTERNAL, "Catched unknown exception"); cntl->SetFailed(EINTERNAL, "Catched unknown exception");
} }
done->AllowToRun(); done->ResumeRunning();
} }
struct CallMethodInBackupThreadArgs { struct CallMethodInBackupThreadArgs {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment