// Copyright (c) 2016 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Authors: Ge,Jun (gejun@baidu.com) // Access many http servers in parallel, much faster than curl (even called in batch) #include <gflags/gflags.h> #include <deque> #include <bthread/bthread.h> #include <butil/logging.h> #include <butil/files/scoped_file.h> #include <brpc/channel.h> DEFINE_string(url_file, "", "The file containing urls to fetch. If this flag is" " empty, read urls from stdin"); DEFINE_int32(timeout_ms, 1000, "RPC timeout in milliseconds"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); DEFINE_int32(thread_num, 8, "Number of threads to access urls"); DEFINE_int32(concurrency, 1000, "Max number of http calls in parallel"); DEFINE_bool(one_line_mode, false, "Output as `URL HTTP-RESPONSE' on true"); DEFINE_bool(only_show_host, false, "Print host name only"); struct AccessThreadArgs { const std::deque<std::string>* url_list; size_t offset; std::deque<std::pair<std::string, butil::IOBuf> > output_queue; butil::Mutex output_queue_mutex; butil::atomic<int> current_concurrency; }; class OnHttpCallEnd : public google::protobuf::Closure { public: void Run(); public: brpc::Controller cntl; AccessThreadArgs* args; std::string url; }; void OnHttpCallEnd::Run() { std::unique_ptr<OnHttpCallEnd> delete_self(this); { BAIDU_SCOPED_LOCK(args->output_queue_mutex); if (cntl.Failed()) { args->output_queue.push_back(std::make_pair(url, butil::IOBuf())); } else { args->output_queue.push_back( std::make_pair(url, cntl.response_attachment())); } } args->current_concurrency.fetch_sub(1, butil::memory_order_relaxed); } void* access_thread(void* void_args) { AccessThreadArgs* args = (AccessThreadArgs*)void_args; brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_HTTP; options.connect_timeout_ms = FLAGS_timeout_ms / 2; options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; options.max_retry = FLAGS_max_retry; const int concurrency_for_this_thread = FLAGS_concurrency / FLAGS_thread_num; for (size_t i = args->offset; i < args->url_list->size(); i += FLAGS_thread_num) { std::string const& url = (*args->url_list)[i]; brpc::Channel channel; if (channel.Init(url.c_str(), &options) != 0) { LOG(ERROR) << "Fail to create channel to url=" << url; BAIDU_SCOPED_LOCK(args->output_queue_mutex); args->output_queue.push_back(std::make_pair(url, butil::IOBuf())); continue; } while (args->current_concurrency.fetch_add(1, butil::memory_order_relaxed) > concurrency_for_this_thread) { args->current_concurrency.fetch_sub(1, butil::memory_order_relaxed); bthread_usleep(5000); } OnHttpCallEnd* done = new OnHttpCallEnd; done->cntl.http_request().uri() = url; done->args = args; done->url = url; channel.CallMethod(NULL, &done->cntl, NULL, NULL, done); } return NULL; } int main(int argc, char** argv) { // Parse gflags. We recommend you to use gflags as well. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); // if (FLAGS_path.empty() || FLAGS_path[0] != '/') { // FLAGS_path = "/" + FLAGS_path; // } butil::ScopedFILE fp_guard; FILE* fp = NULL; if (!FLAGS_url_file.empty()) { fp_guard.reset(fopen(FLAGS_url_file.c_str(), "r")); if (!fp_guard) { PLOG(ERROR) << "Fail to open `" << FLAGS_url_file << '\''; return -1; } fp = fp_guard.get(); } else { fp = stdin; } char* line_buf = NULL; size_t line_len = 0; ssize_t nr = 0; std::deque<std::string> url_list; while ((nr = getline(&line_buf, &line_len, fp)) != -1) { if (line_buf[nr - 1] == '\n') { // remove ending newline line_buf[nr - 1] = '\0'; --nr; } butil::StringPiece line(line_buf, nr); line.trim_spaces(); if (!line.empty()) { url_list.push_back(line.as_string()); } } if (url_list.empty()) { return 0; } AccessThreadArgs* args = new AccessThreadArgs[FLAGS_thread_num]; for (int i = 0; i < FLAGS_thread_num; ++i) { args[i].url_list = &url_list; args[i].offset = i; args[i].current_concurrency.store(0, butil::memory_order_relaxed); } std::vector<bthread_t> tids; tids.resize(FLAGS_thread_num); for (int i = 0; i < FLAGS_thread_num; ++i) { CHECK_EQ(0, bthread_start_background(&tids[i], NULL, access_thread, &args[i])); } std::deque<std::pair<std::string, butil::IOBuf> > output_queue; size_t nprinted = 0; while (nprinted != url_list.size()) { for (int i = 0; i < FLAGS_thread_num; ++i) { { BAIDU_SCOPED_LOCK(args[i].output_queue_mutex); output_queue.swap(args[i].output_queue); } for (size_t i = 0; i < output_queue.size(); ++i) { butil::StringPiece url = output_queue[i].first; butil::StringPiece hostname; if (url.starts_with("http://")) { url.remove_prefix(7); } size_t slash_pos = url.find('/'); if (slash_pos != butil::StringPiece::npos) { hostname = url.substr(0, slash_pos); } else { hostname = url; } if (FLAGS_one_line_mode) { if (FLAGS_only_show_host) { std::cout << hostname; } else { std::cout << "http://" << url; } if (output_queue[i].second.empty()) { std::cout << " ERROR" << std::endl; } else { std::cout << ' ' << output_queue[i].second << std::endl; } } else { // The prefix is unlikely be part of a ordinary http body, // thus the line can be easily removed by shell utilities. std::cout << "#### "; if (FLAGS_only_show_host) { std::cout << hostname; } else { std::cout << "http://" << url; } if (output_queue[i].second.empty()) { std::cout << " ERROR" << std::endl; } else { std::cout << '\n' << output_queue[i].second << std::endl; } } } nprinted += output_queue.size(); output_queue.clear(); } usleep(10000); } for (int i = 0; i < FLAGS_thread_num; ++i) { bthread_join(tids[i], NULL); } for (int i = 0; i < FLAGS_thread_num; ++i) { while (args[i].current_concurrency.load(butil::memory_order_relaxed) != 0) { usleep(10000); } } return 0; }