// Copyright (c) 2015 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef PBRPCPRESS_PBRPC_PRESS_H #define PBRPCPRESS_PBRPC_PRESS_H #include <stdio.h> #include <deque> #include <google/protobuf/compiler/importer.h> #include <google/protobuf/dynamic_message.h> #include <bvar/bvar.h> #include <brpc/channel.h> #include "info_thread.h" #include "pb_util.h" namespace pbrpcframework { class JsonUtil; struct PressOptions { std::string service; //service name (packet.rpcservice) std::string method; //method name (rpc service method) int server_type; // server type: 0 = hulu server, 1 = old pbrpc server, 2 = sofa server double test_req_rate; // 0 = no limit int test_thread_num; std::string input; std::string output; std::string host; // server's ip:port, used by hulu server and sofa server std::string channel; // server's channel, used by old pbrpc server //comcfg::Configure conf; std::string conf_dir; std::string conf_file; std::string connection_type; // connection type 0:SINGLE 1:POOLED 2:SHORT int connect_timeout_ms; // connection timeout in milliseconds int timeout_ms; // RPC timeout in milliseconds int max_retry; // Maximum retry times by RPC framework std::string protocol; int request_compress_type; // Snappy:1 Gzip:2 Zlib:3 LZ4:4 None:0 int response_compress_type; // Snappy:1 Gzip:2 Zlib:3 LZ4:4 None:0 int attachment_size; // Snappy:1 Gzip:2 Zlib:3 LZ4:4 None:0 bool auth;// Enable Giano authentication std::string auth_group; std::string lb_policy; // "rr", "Policy of load balance rr ||random" std::string proto_file; std::string proto_includes; PressOptions() : server_type(0), test_req_rate(0), test_thread_num(1), connect_timeout_ms(1000), timeout_ms(1000), max_retry(3), request_compress_type(0), response_compress_type(0), attachment_size(0), auth(false) {} }; class PressClient { public: PressClient(const PressOptions* options, google::protobuf::compiler::Importer* importer, google::protobuf::DynamicMessageFactory* factory) { _method_descriptor = NULL; _response_prototype = NULL; _options = options; _importer = importer; _factory = factory; } google::protobuf::Message* get_output_message() { return _response_prototype->New(); } int init(); void call_method(brpc::Controller* cntl, google::protobuf::Message* request, google::protobuf::Message* response, google::protobuf::Closure* done); public: brpc::Channel _rpc_client; std::string _attachment; const PressOptions* _options; const google::protobuf::MethodDescriptor* _method_descriptor; const google::protobuf::Message* _response_prototype; google::protobuf::compiler::Importer* _importer; google::protobuf::DynamicMessageFactory* _factory; }; class RpcPress { public: RpcPress(); ~RpcPress(); int init(const PressOptions* options); int start(); int stop(); const PressOptions* options() { return &_options; } private: DISALLOW_COPY_AND_ASSIGN(RpcPress); bool new_pbrpc_press_client_by_client_type(int client_type); void sync_client(); void handle_response(brpc::Controller* cntl, google::protobuf::Message* request, google::protobuf::Message* response, int64_t start_time_ns); static void* sync_call_thread(void* arg); bvar::LatencyRecorder _latency_recorder; bvar::Adder<int64_t> _error_count; bvar::Adder<int64_t> _sent_count; std::deque<google::protobuf::Message*> _msgs; PressClient* _pbrpc_client; PressOptions _options; bool _started; bool _stop; FILE* _output_json; google::protobuf::compiler::Importer* _importer; google::protobuf::DynamicMessageFactory _factory; std::vector<pthread_t> _ttid; brpc::InfoThread _info_thr; }; } #endif // PBRPCPRESS_PBRPC_PRESS_H