// Copyright (c) 2014 Baidu, Inc.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Authors: Ge,Jun (gejun@baidu.com)

// A server to receive TrackMeRequest and send back TrackMeResponse.

#include <gflags/gflags.h>
#include <memory>
#include <butil/logging.h>
#include <brpc/server.h>
#include <butil/files/file_watcher.h>
#include <butil/files/scoped_file.h>
#include <brpc/trackme.pb.h>

DEFINE_string(bug_file, "./bugs", "A file containing revision and information of bugs");
DEFINE_int32(port, 8877, "TCP Port of this server");
DEFINE_int32(reporting_interval, 300, "Reporting interval of clients");

struct RevisionInfo {
    int64_t min_rev;
    int64_t max_rev;
    brpc::TrackMeSeverity severity;
    std::string error_text;
};

// Load bugs from a file periodically.
class BugsLoader {
public:
    BugsLoader();
    bool start(const std::string& bugs_file);
    void stop();
    bool find(int64_t revision, brpc::TrackMeResponse* response);
private:
    void load_bugs();
    void run();
    static void* run_this(void* arg);
    typedef std::vector<RevisionInfo> BugList;
    std::string _bugs_file;
    bool _started;
    bool _stop;
    pthread_t _tid;
    std::shared_ptr<BugList> _bug_list;
};

class TrackMeServiceImpl : public brpc::TrackMeService {
public:
    explicit TrackMeServiceImpl(BugsLoader* bugs) : _bugs(bugs) {
    }
    ~TrackMeServiceImpl() {};
    void TrackMe(google::protobuf::RpcController* cntl_base,
                 const brpc::TrackMeRequest* request,
                 brpc::TrackMeResponse* response,
                 google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        brpc::Controller* cntl = (brpc::Controller*)cntl_base;
        // Set to OK by default.
        response->set_severity(brpc::TrackMeOK);
        // Check if the version is affected by bugs if client set it.
        if (request->has_rpc_version()) {
            _bugs->find(request->rpc_version(), response);
        } 
        response->set_new_interval(FLAGS_reporting_interval);
        butil::EndPoint server_addr;
        CHECK_EQ(0, butil::str2endpoint(request->server_addr().c_str(), &server_addr));
        // NOTE(gejun): The ip reported is inaccessible in many cases, use 
        // remote_side instead right now.
        server_addr.ip = cntl->remote_side().ip;
        LOG(INFO) << "Pinged by " << server_addr << " (r"
                  << request->rpc_version() << ")";
    }

private:
    BugsLoader* _bugs;
};

int main(int argc, char* argv[]) {
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    
    brpc::Server server;
    server.set_version("trackme_server");
    BugsLoader bugs;
    if (!bugs.start(FLAGS_bug_file)) {
        LOG(ERROR) << "Fail to start BugsLoader";
        return -1;
    }
    TrackMeServiceImpl echo_service_impl(&bugs);
    if (server.AddService(&echo_service_impl, 
                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }
    brpc::ServerOptions options;
    // I've noticed that many connections do not report. Don't know the
    // root cause yet. Set the idle_time to keep connections clean.
    options.idle_timeout_sec = FLAGS_reporting_interval * 2;
    if (server.Start(FLAGS_port, &options) != 0) {
        LOG(ERROR) << "Fail to start TrackMeServer";
        return -1;
    }
    server.RunUntilAskedToQuit();
    bugs.stop();
    return 0;
}

BugsLoader::BugsLoader()
    : _started(false)
    , _stop(false)
    , _tid(0)
{}

bool BugsLoader::start(const std::string& bugs_file) {
    _bugs_file = bugs_file;
    if (pthread_create(&_tid, NULL, run_this, this) != 0) {
        LOG(ERROR) << "Fail to create loading thread";
        return false;
    }
    _started = true;
    return true;
}

void BugsLoader::stop() {
    if (!_started) {
        return;
    }
    _stop = true;
    pthread_join(_tid, NULL);
}

void* BugsLoader::run_this(void* arg) {
    ((BugsLoader*)arg)->run();
    return NULL;
}

void BugsLoader::run() {
    // Check status of _bugs_files periodically.
    butil::FileWatcher fw;
    if (fw.init(_bugs_file.c_str()) < 0) {
        LOG(ERROR) << "Fail to init FileWatcher on `" << _bugs_file << "'";
        return;
    }
    while (!_stop) {
        load_bugs();
        while (!_stop) {
            butil::FileWatcher::Change change = fw.check_and_consume();
            if (change > 0) {
                break;
            }
            if (change < 0) {
                LOG(ERROR) << "`" << _bugs_file << "' was deleted";
            }
            sleep(1);
        }
    }

}

void BugsLoader::load_bugs() {
    butil::ScopedFILE fp(fopen(_bugs_file.c_str(), "r"));
    if (!fp) {
        PLOG(WARNING) << "Fail to open `" << _bugs_file << '\'';
        return;
    }

    char* line = NULL;
    size_t line_len = 0;
    ssize_t nr = 0;
    int nline = 0;
    std::unique_ptr<BugList> m(new BugList);
    while ((nr = getline(&line, &line_len, fp.get())) != -1) {
        ++nline;
        if (line[nr - 1] == '\n') { // remove ending newline
            --nr;
        }
        // line format: 
        //   min_rev <sp> max_rev <sp> severity <sp> description
        butil::StringMultiSplitter sp(line, line + nr, " \t");
        if (!sp) {
            continue;
        }
        long long min_rev;
        if (sp.to_longlong(&min_rev)) {
            LOG(WARNING) << "[line" << nline << "] Fail to parse column1 as min_rev";
            continue;
        }
        ++sp;
        long long max_rev;
        if (!sp || sp.to_longlong(&max_rev)) {
            LOG(WARNING) << "[line" << nline << "] Fail to parse column2 as max_rev";
            continue;
        }
        if (max_rev < min_rev) {
            LOG(WARNING) << "[line" << nline << "] max_rev=" << max_rev
                         << " is less than min_rev=" << min_rev;
            continue;
        }
        ++sp;
        if (!sp) {
            LOG(WARNING) << "[line" << nline << "] Fail to parse column3 as severity";
            continue;
        }
        brpc::TrackMeSeverity severity = brpc::TrackMeOK;
        butil::StringPiece severity_str(sp.field(), sp.length());
        if (severity_str == "f" || severity_str == "F") {
            severity = brpc::TrackMeFatal;
        } else if (severity_str == "w" || severity_str == "W") {\
            severity = brpc::TrackMeWarning;
        } else {
            LOG(WARNING) << "[line" << nline << "] Invalid severity=" << severity_str;
            continue;
        }
        ++sp;
        if (!sp) {
            LOG(WARNING) << "[line" << nline << "] Fail to parse column4 as string";
            continue;
        }
        // Treat everything until end of the line as description. So don't add 
        // comments starting with # or //, they are not recognized.
        butil::StringPiece description(sp.field(), line + nr - sp.field());
        RevisionInfo info;
        info.min_rev = min_rev;
        info.max_rev = max_rev;
        info.severity = severity;
        info.error_text.assign(description.data(), description.size());
        m->push_back(info);
    }
    LOG(INFO) << "Loaded " << m->size() << " bugs";
    free(line);
    // Just reseting the shared_ptr. Previous BugList will be destroyed when
    // no threads reference it.
    _bug_list.reset(m.release());
}

bool BugsLoader::find(int64_t revision, brpc::TrackMeResponse* response) {
    // Add reference to make sure the bug list is not deleted.
    std::shared_ptr<BugList> local_list = _bug_list;
    if (local_list.get() == NULL) {
        return false;
    }
    // Reading the list in this function is always safe because a BugList
    // is never changed after creation.
    bool found = false;
    for (size_t i = 0; i < local_list->size(); ++i) {
        const RevisionInfo & info = (*local_list)[i];
        if (info.min_rev <= revision && revision <= info.max_rev) {
            found = true;
            if (info.severity > response->severity()) {
                response->set_severity(info.severity);
            }
            if (info.severity != brpc::TrackMeOK) {
                std::string* error = response->mutable_error_text();
                char prefix[64];
                if (info.min_rev != info.max_rev) {
                    snprintf(prefix, sizeof(prefix), "[r%lld-r%lld] ",
                             (long long)info.min_rev, (long long)info.max_rev);
                } else {
                    snprintf(prefix, sizeof(prefix), "[r%lld] ",
                             (long long)info.min_rev);
                }
                error->append(prefix);
                error->append(info.error_text);
                error->append("; ");
            }
        }
    }
    return found;
}