Commit 66578349 authored by zhujiashun's avatar zhujiashun

Change Controller::rpc_dump_meta to Controller::sampled_request

parent 585d5393
...@@ -41,7 +41,7 @@ ...@@ -41,7 +41,7 @@
#include "brpc/retry_policy.h" #include "brpc/retry_policy.h"
#include "brpc/stream_impl.h" #include "brpc/stream_impl.h"
#include "brpc/policy/streaming_rpc_protocol.h" // FIXME #include "brpc/policy/streaming_rpc_protocol.h" // FIXME
#include "brpc/rpc_dump.pb.h" #include "brpc/rpc_dump.h"
#include "brpc/details/usercode_backup_pool.h" // RunUserCode #include "brpc/details/usercode_backup_pool.h" // RunUserCode
#include "brpc/mongo_service_adaptor.h" #include "brpc/mongo_service_adaptor.h"
...@@ -159,7 +159,7 @@ void Controller::ResetNonPods() { ...@@ -159,7 +159,7 @@ void Controller::ResetNonPods() {
_server->_session_local_data_pool->Return(_session_local_data); _server->_session_local_data_pool->Return(_session_local_data);
} }
_mongo_session_data.reset(); _mongo_session_data.reset();
delete _rpc_dump_meta; delete _sampled_request;
if (!is_used_by_rpc() && _correlation_id != INVALID_BTHREAD_ID) { if (!is_used_by_rpc() && _correlation_id != INVALID_BTHREAD_ID) {
CHECK_NE(EPERM, bthread_id_cancel(_correlation_id)); CHECK_NE(EPERM, bthread_id_cancel(_correlation_id));
...@@ -213,7 +213,7 @@ void Controller::ResetPods() { ...@@ -213,7 +213,7 @@ void Controller::ResetPods() {
_server = NULL; _server = NULL;
_oncancel_id = INVALID_BTHREAD_ID; _oncancel_id = INVALID_BTHREAD_ID;
_auth_context = NULL; _auth_context = NULL;
_rpc_dump_meta = NULL; _sampled_request = NULL;
_request_protocol = PROTOCOL_UNKNOWN; _request_protocol = PROTOCOL_UNKNOWN;
_max_retry = UNSET_MAGIC_NUM; _max_retry = UNSET_MAGIC_NUM;
_retry_policy = NULL; _retry_policy = NULL;
...@@ -1331,9 +1331,9 @@ void WebEscape(const std::string& source, std::string* output) { ...@@ -1331,9 +1331,9 @@ void WebEscape(const std::string& source, std::string* output) {
} }
} }
void Controller::reset_rpc_dump_meta(RpcDumpMeta* meta) { void Controller::reset_sampled_request(SampledRequest* req) {
delete _rpc_dump_meta; delete _sampled_request;
_rpc_dump_meta = meta; _sampled_request = req;
} }
void Controller::set_stream_creator(StreamCreator* sc) { void Controller::set_stream_creator(StreamCreator* sc) {
......
...@@ -63,7 +63,7 @@ class SharedLoadBalancer; ...@@ -63,7 +63,7 @@ class SharedLoadBalancer;
class ExcludedServers; class ExcludedServers;
class RPCSender; class RPCSender;
class StreamSettings; class StreamSettings;
class RpcDumpMeta; class SampledRequest;
class MongoContext; class MongoContext;
class RetryPolicy; class RetryPolicy;
class InputMessageBase; class InputMessageBase;
...@@ -258,10 +258,10 @@ public: ...@@ -258,10 +258,10 @@ public:
int sub_count() const; int sub_count() const;
const Controller* sub(int index) const; const Controller* sub(int index) const;
// Get/own RpcDumpMeta for sending dumped requests. // Get/own SampledRquest for sending dumped requests.
// Deleted along with controller. // Deleted along with controller.
void reset_rpc_dump_meta(RpcDumpMeta* meta); void reset_sampled_request(SampledRequest* req);
const RpcDumpMeta* rpc_dump_meta() { return _rpc_dump_meta; } const SampledRequest* sampled_request() { return _sampled_request; }
// Attach a StreamCreator to this RPC. Notice that the ownership of sc has // Attach a StreamCreator to this RPC. Notice that the ownership of sc has
// been transferred to cntl, and sc->DestroyStreamCreator() would be called // been transferred to cntl, and sc->DestroyStreamCreator() would be called
...@@ -672,7 +672,7 @@ private: ...@@ -672,7 +672,7 @@ private:
bthread_id_t _oncancel_id; bthread_id_t _oncancel_id;
const AuthContext* _auth_context; // Authentication result const AuthContext* _auth_context; // Authentication result
butil::intrusive_ptr<MongoContext> _mongo_session_data; butil::intrusive_ptr<MongoContext> _mongo_session_data;
RpcDumpMeta* _rpc_dump_meta; SampledRequest* _sampled_request;
ProtocolType _request_protocol; ProtocolType _request_protocol;
// Some of them are copied from `Channel' which might be destroyed // Some of them are copied from `Channel' which might be destroyed
......
...@@ -636,11 +636,11 @@ void PackRpcRequest(butil::IOBuf* req_buf, ...@@ -636,11 +636,11 @@ void PackRpcRequest(butil::IOBuf* req_buf,
method->service()->name()); method->service()->name());
request_meta->set_method_name(method->name()); request_meta->set_method_name(method->name());
meta.set_compress_type(cntl->request_compress_type()); meta.set_compress_type(cntl->request_compress_type());
} else if (cntl->rpc_dump_meta()) { } else if (cntl->sampled_request()) {
// Replaying. Keep service-name as the one seen by server. // Replaying. Keep service-name as the one seen by server.
request_meta->set_service_name(cntl->rpc_dump_meta()->service_name()); request_meta->set_service_name(cntl->sampled_request()->meta.service_name());
request_meta->set_method_name(cntl->rpc_dump_meta()->method_name()); request_meta->set_method_name(cntl->sampled_request()->meta.method_name());
meta.set_compress_type(cntl->rpc_dump_meta()->compress_type()); meta.set_compress_type(cntl->sampled_request()->meta.compress_type());
} else { } else {
return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __FUNCTION__); return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __FUNCTION__);
} }
......
...@@ -639,13 +639,13 @@ void PackHuluRequest(butil::IOBuf* req_buf, ...@@ -639,13 +639,13 @@ void PackHuluRequest(butil::IOBuf* req_buf,
meta.set_service_name(method->service()->name()); meta.set_service_name(method->service()->name());
meta.set_method_index(method->index()); meta.set_method_index(method->index());
meta.set_compress_type(CompressType2Hulu(cntl->request_compress_type())); meta.set_compress_type(CompressType2Hulu(cntl->request_compress_type()));
} else if (cntl->rpc_dump_meta()) { } else if (cntl->sampled_request()) {
// Replaying. Keep service-name as the one seen by server. // Replaying. Keep service-name as the one seen by server.
meta.set_service_name(cntl->rpc_dump_meta()->service_name()); meta.set_service_name(cntl->sampled_request()->meta.service_name());
meta.set_method_index(cntl->rpc_dump_meta()->method_index()); meta.set_method_index(cntl->sampled_request()->meta.method_index());
meta.set_compress_type( meta.set_compress_type(
CompressType2Hulu(cntl->rpc_dump_meta()->compress_type())); CompressType2Hulu(cntl->sampled_request()->meta.compress_type()));
meta.set_user_data(cntl->rpc_dump_meta()->user_data()); meta.set_user_data(cntl->sampled_request()->meta.user_data());
} else { } else {
return cntl->SetFailed(ENOMETHOD, "method is NULL"); return cntl->SetFailed(ENOMETHOD, "method is NULL");
} }
......
...@@ -545,11 +545,11 @@ void PackSofaRequest(butil::IOBuf* req_buf, ...@@ -545,11 +545,11 @@ void PackSofaRequest(butil::IOBuf* req_buf,
if (method) { if (method) {
meta.set_method(method->full_name()); meta.set_method(method->full_name());
meta.set_compress_type(CompressType2Sofa(cntl->request_compress_type())); meta.set_compress_type(CompressType2Sofa(cntl->request_compress_type()));
} else if (cntl->rpc_dump_meta()) { } else if (cntl->sampled_request()) {
// Replaying. // Replaying.
meta.set_method(cntl->rpc_dump_meta()->method_name()); meta.set_method(cntl->sampled_request()->meta.method_name());
meta.set_compress_type( meta.set_compress_type(
CompressType2Sofa(cntl->rpc_dump_meta()->compress_type())); CompressType2Sofa(cntl->sampled_request()->meta.compress_type()));
} else { } else {
return cntl->SetFailed(ENOMETHOD, "method is NULL"); return cntl->SetFailed(ENOMETHOD, "method is NULL");
} }
......
...@@ -31,7 +31,6 @@ namespace bvar { ...@@ -31,7 +31,6 @@ namespace bvar {
std::string read_command_name(); std::string read_command_name();
} }
namespace brpc { namespace brpc {
DECLARE_uint64(max_body_size); DECLARE_uint64(max_body_size);
......
...@@ -147,21 +147,21 @@ static void* replay_thread(void* arg) { ...@@ -147,21 +147,21 @@ static void* replay_thread(void* arg) {
continue; continue;
} }
brpc::Channel* chan = brpc::Channel* chan =
chan_group->channel(sample->protocol_type()); chan_group->channel(sample->meta.protocol_type());
if (chan == NULL) { if (chan == NULL) {
LOG(ERROR) << "No channel on protocol=" LOG(ERROR) << "No channel on protocol="
<< sample->protocol_type(); << sample->meta.protocol_type();
continue; continue;
} }
brpc::Controller* cntl = new brpc::Controller; brpc::Controller* cntl = new brpc::Controller;
req.Clear(); req.Clear();
cntl->reset_rpc_dump_meta(sample_guard.release()); cntl->reset_sampled_request(sample_guard.release());
if (sample->attachment_size() > 0) { if (sample->meta.attachment_size() > 0) {
sample->request.cutn( sample->request.cutn(
&req.serialized_data(), &req.serialized_data(),
sample->request.size() - sample->attachment_size()); sample->request.size() - sample->meta.attachment_size());
cntl->request_attachment() = sample->request.movable(); cntl->request_attachment() = sample->request.movable();
} else { } else {
req.serialized_data() = sample->request.movable(); req.serialized_data() = sample->request.movable();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment