// Copyright (c) 2016 Baidu, Inc.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Authors: Ge,Jun (gejun@baidu.com)

// Access many HTTP servers in parallel; much faster than curl (even when curl
// is invoked in batches).

#include <stdlib.h>
#include <deque>
#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <butil/logging.h>
#include <butil/files/scoped_file.h>
#include <brpc/channel.h>

// Command-line flags. Each one is read in this file as FLAGS_<name>.

// Where urls come from. main() reads one url per line and skips lines that
// are empty after trimming.
DEFINE_string(url_file, "", "The file containing urls to fetch. If this flag is"
              " empty, read urls from stdin");
// Used as the channel's timeout_ms; half of it is used as connect_timeout_ms.
DEFINE_int32(timeout_ms, 1000, "RPC timeout in milliseconds");
// Copied into the channel's max_retry.
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
// Number of bthreads started by main(); urls are distributed round-robin
// among them.
DEFINE_int32(thread_num, 8, "Number of threads to access urls");
// Global budget of in-flight calls; each thread gets concurrency / thread_num.
DEFINE_int32(concurrency, 1000, "Max number of http calls in parallel");
// Output format switches, consulted when main() prints each result.
DEFINE_bool(one_line_mode, false, "Output as `URL HTTP-RESPONSE' on true");
DEFINE_bool(only_show_host, false, "Print host name only");

struct AccessThreadArgs {
    const std::deque<std::string>* url_list;
    size_t offset;
    std::deque<std::pair<std::string, butil::IOBuf> > output_queue;
    butil::Mutex output_queue_mutex;
    butil::atomic<int> current_concurrency;
};

class OnHttpCallEnd : public google::protobuf::Closure {
public:
    void Run();
public:
    brpc::Controller cntl;
    AccessThreadArgs* args;
    std::string url;
};

void OnHttpCallEnd::Run() {
    std::unique_ptr<OnHttpCallEnd> delete_self(this);
    {
        BAIDU_SCOPED_LOCK(args->output_queue_mutex);
        if (cntl.Failed()) {
            args->output_queue.push_back(std::make_pair(url, butil::IOBuf()));
        } else {
            args->output_queue.push_back(
                std::make_pair(url, cntl.response_attachment()));
        }
    }
    args->current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
}

void* access_thread(void* void_args) {
    AccessThreadArgs* args = (AccessThreadArgs*)void_args;
    brpc::ChannelOptions options;
    options.protocol = brpc::PROTOCOL_HTTP;
    options.connect_timeout_ms = FLAGS_timeout_ms / 2;
    options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
    options.max_retry = FLAGS_max_retry;
    const int concurrency_for_this_thread = FLAGS_concurrency / FLAGS_thread_num;

    for (size_t i = args->offset; i < args->url_list->size(); i += FLAGS_thread_num) {
        std::string const& url = (*args->url_list)[i];
        brpc::Channel channel;
        if (channel.Init(url.c_str(), &options) != 0) {
            LOG(ERROR) << "Fail to create channel to url=" << url;
            BAIDU_SCOPED_LOCK(args->output_queue_mutex);
            args->output_queue.push_back(std::make_pair(url, butil::IOBuf()));
            continue;
        }
        while (args->current_concurrency.fetch_add(1, butil::memory_order_relaxed)
               > concurrency_for_this_thread) {
            args->current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
            bthread_usleep(5000);
        }
        OnHttpCallEnd* done = new OnHttpCallEnd;
        done->cntl.http_request().uri() = url;
        done->args = args;
        done->url = url;
        channel.CallMethod(NULL, &done->cntl, NULL, NULL, done);
    }
    return NULL;
}

int main(int argc, char** argv) {
    // Parse gflags. We recommend you to use gflags as well.
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    
    // if (FLAGS_path.empty() || FLAGS_path[0] != '/') {
    //     FLAGS_path = "/" + FLAGS_path;
    // }

    butil::ScopedFILE fp_guard;
    FILE* fp = NULL;
    if (!FLAGS_url_file.empty()) {
        fp_guard.reset(fopen(FLAGS_url_file.c_str(), "r"));
        if (!fp_guard) {
            PLOG(ERROR) << "Fail to open `" << FLAGS_url_file << '\'';
            return -1;
        }
        fp = fp_guard.get();
    } else {
        fp = stdin;
    }
    char* line_buf = NULL;
    size_t line_len = 0;
    ssize_t nr = 0;
    std::deque<std::string> url_list;
    while ((nr = getline(&line_buf, &line_len, fp)) != -1) {
        if (line_buf[nr - 1] == '\n') { // remove ending newline
            line_buf[nr - 1] = '\0';
            --nr;
        }
        butil::StringPiece line(line_buf, nr);
        line.trim_spaces();
        if (!line.empty()) {
            url_list.push_back(line.as_string());
        }
    }
    if (url_list.empty()) {
        return 0;
    }
    AccessThreadArgs* args = new AccessThreadArgs[FLAGS_thread_num];
    for (int i = 0; i < FLAGS_thread_num; ++i) {
        args[i].url_list = &url_list;
        args[i].offset = i;
        args[i].current_concurrency.store(0, butil::memory_order_relaxed);
    }
    std::vector<bthread_t> tids;
    tids.resize(FLAGS_thread_num);
    for (int i = 0; i < FLAGS_thread_num; ++i) {
        CHECK_EQ(0, bthread_start_background(&tids[i], NULL, access_thread, &args[i]));
    }
    std::deque<std::pair<std::string, butil::IOBuf> > output_queue;
    size_t nprinted = 0;
    while (nprinted != url_list.size()) {
        for (int i = 0; i < FLAGS_thread_num; ++i) {
            {
                BAIDU_SCOPED_LOCK(args[i].output_queue_mutex);
                output_queue.swap(args[i].output_queue);
            }
            for (size_t i = 0; i < output_queue.size(); ++i) {
                butil::StringPiece url = output_queue[i].first;
                butil::StringPiece hostname;
                if (url.starts_with("http://")) {
                    url.remove_prefix(7);
                }
                size_t slash_pos = url.find('/');
                if (slash_pos != butil::StringPiece::npos) {
                    hostname = url.substr(0, slash_pos);
                } else {
                    hostname = url;
                }
                if (FLAGS_one_line_mode) {
                    if (FLAGS_only_show_host) {
                        std::cout << hostname;
                    } else {
                        std::cout << "http://" << url;
                    }
                    if (output_queue[i].second.empty()) {
                        std::cout << " ERROR" << std::endl;
                    } else {
                        std::cout << ' ' << output_queue[i].second << std::endl;
                    }
                } else {
                    // The prefix is unlikely be part of a ordinary http body,
                    // thus the line can be easily removed by shell utilities.
                    std::cout << "#### ";
                    if (FLAGS_only_show_host) {
                        std::cout << hostname;
                    } else {
                        std::cout << "http://" << url;
                    }
                    if (output_queue[i].second.empty()) {
                        std::cout << " ERROR" << std::endl;
                    } else {
                        std::cout << '\n' << output_queue[i].second << std::endl;
                    }
                }
            }
            nprinted += output_queue.size();
            output_queue.clear();
        }
        usleep(10000);
    }

    for (int i = 0; i < FLAGS_thread_num; ++i) {
        bthread_join(tids[i], NULL);
    }
    for (int i = 0; i < FLAGS_thread_num; ++i) {
        while (args[i].current_concurrency.load(butil::memory_order_relaxed) != 0) {
            usleep(10000);
        }
    }
    return 0;
}