// Baidu RPC - A framework to host and access services throughout Baidu.
// Copyright (c) 2014 Baidu, Inc.

// Date: Fri May 20 15:52:22 CST 2016

#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include "butil/time.h"
#include "butil/macros.h"
#include "brpc/socket.h"
#include "brpc/acceptor.h"
#include "brpc/server.h"
#include "brpc/controller.h"
#include "brpc/rtmp.h"
#include "brpc/amf.h"

int main(int argc, char* argv[]) {
    testing::InitGoogleTest(&argc, argv);
    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
    return RUN_ALL_TESTS();
}

class TestRtmpClientStream : public brpc::RtmpClientStream {
public:
    TestRtmpClientStream()
        : _called_on_stop(0)
        , _called_on_first_message(0)
        , _nvideomsg(0)
        , _naudiomsg(0) {
        LOG(INFO) << __FUNCTION__;
    }
    ~TestRtmpClientStream() {
        LOG(INFO) << __FUNCTION__;
        assertions_on_stop();
    }
    void assertions_on_stop() {
        ASSERT_EQ(1, _called_on_stop);
    }
    void assertions_on_successful_play() {
        ASSERT_EQ(1, _called_on_first_message);
        ASSERT_LT(0, _nvideomsg);
        ASSERT_LT(0, _naudiomsg);
    }
    void assertions_on_failure() {
        ASSERT_EQ(0, _called_on_first_message);
        ASSERT_EQ(0, _nvideomsg);
        ASSERT_EQ(0, _naudiomsg);
        assertions_on_stop();
    }
    void OnFirstMessage() {
        ++_called_on_first_message;
    }
    void OnStop() {
        ++_called_on_stop;
    }
    void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
        ++_nvideomsg;
        // video data is ascii in UT, print it out.
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << *msg << " data=" << msg->data;
    }
    void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
        ++_naudiomsg;
        // audio data is ascii in UT, print it out.
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << *msg << " data=" << msg->data;
    }
private:
    int _called_on_stop;
    int _called_on_first_message;
    int _nvideomsg;
    int _naudiomsg;
};

class TestRtmpRetryingClientStream
    : public brpc::RtmpRetryingClientStream {
public:
    TestRtmpRetryingClientStream()
        : _called_on_stop(0)
        , _called_on_first_message(0)
        , _called_on_playable(0) {
        LOG(INFO) << __FUNCTION__;
    }
    ~TestRtmpRetryingClientStream() {
        LOG(INFO) << __FUNCTION__;
        assertions_on_stop();
    }
    void assertions_on_stop() {
        ASSERT_EQ(1, _called_on_stop);
    }
    void OnStop() {
        ++_called_on_stop;
    }
    void OnFirstMessage() {
        ++_called_on_first_message;
    }
    void OnPlayable() {
        ++_called_on_playable;
    }

    void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
        // video data is ascii in UT, print it out.
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << *msg << " data=" << msg->data;
    }
    void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
        // audio data is ascii in UT, print it out.
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << *msg << " data=" << msg->data;
    }
private:
    int _called_on_stop;
    int _called_on_first_message;
    int _called_on_playable;
};

const char* UNEXIST_NAME = "unexist_stream";

class PlayingDummyStream : public brpc::RtmpServerStream {
public:
    enum State {
        STATE_UNPLAYING,
        STATE_PLAYING,
        STATE_STOPPED
    };
    PlayingDummyStream(int64_t sleep_ms)
        : _state(STATE_UNPLAYING), _sleep_ms(sleep_ms) {
        LOG(INFO) << __FUNCTION__ << "(" << this << ")";
    }
    ~PlayingDummyStream() {
        LOG(INFO) << __FUNCTION__ << "(" << this << ")";
    }
    void OnPlay(const brpc::RtmpPlayOptions& opt,
                butil::Status* status,
                google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got play{stream_name=" << opt.stream_name
                  << " start=" << opt.start
                  << " duration=" << opt.duration
                  << " reset=" << opt.reset << '}';
        if (opt.stream_name == UNEXIST_NAME) {
            status->set_error(EPERM, "Unexist stream");
            return;
        }
        if (_sleep_ms > 0) {
            LOG(INFO) << "Sleep " << _sleep_ms
                      << " ms before responding play request";
            bthread_usleep(_sleep_ms * 1000L);
        }
        int rc = bthread_start_background(&_play_thread, NULL,
                                          RunSendData, this);
        if (rc) {
            status->set_error(rc, "Fail to create thread");
            return;
        }
        State expected = STATE_UNPLAYING;
        if (!_state.compare_exchange_strong(expected, STATE_PLAYING)) {
            if (expected == STATE_STOPPED) {
                bthread_stop(_play_thread);
                bthread_join(_play_thread, NULL);
            } else {
                CHECK(false) << "Impossible";
            }
        }
    }

    void OnStop() {
        LOG(INFO) << "OnStop of PlayingDummyStream=" << this;
        if (_state.exchange(STATE_STOPPED) == STATE_PLAYING) {
            bthread_stop(_play_thread);
            bthread_join(_play_thread, NULL);
        }
    }

    void SendData();
    
private:
    static void* RunSendData(void* arg) {
        ((PlayingDummyStream*)arg)->SendData();
        return NULL;
    }

    butil::atomic<State> _state;
    bthread_t _play_thread;
    int64_t _sleep_ms;
};

void PlayingDummyStream::SendData() {
    LOG(INFO) << "Enter SendData of PlayingDummyStream=" << this;

    brpc::RtmpVideoMessage vmsg;
    brpc::RtmpAudioMessage amsg;

    vmsg.timestamp = 1000;
    amsg.timestamp = 1000;
    for (int i = 0; !bthread_stopped(bthread_self()); ++i) {
        vmsg.timestamp += 20;
        amsg.timestamp += 20;

        vmsg.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME;
        vmsg.codec = brpc::FLV_VIDEO_AVC;
        vmsg.data.clear();
        vmsg.data.append(butil::string_printf("video_%d(ms_id=%u)",
                                             i, stream_id()));
        //failing to send is possible
        SendVideoMessage(vmsg);

        amsg.codec = brpc::FLV_AUDIO_AAC;
        amsg.rate = brpc::FLV_SOUND_RATE_44100HZ;
        amsg.bits = brpc::FLV_SOUND_16BIT;
        amsg.type = brpc::FLV_SOUND_STEREO;
        amsg.data.clear();
        amsg.data.append(butil::string_printf("audio_%d(ms_id=%u)",
                                             i, stream_id()));
        SendAudioMessage(amsg);

        bthread_usleep(1000000);
    }

    LOG(INFO) << "Quit SendData of PlayingDummyStream=" << this;
}

class PlayingDummyService : public brpc::RtmpService {
public:
    PlayingDummyService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) {}

private:
    // Called to create a server-side stream.
    virtual brpc::RtmpServerStream* NewStream(
        const brpc::RtmpConnectRequest&) {
        return new PlayingDummyStream(_sleep_ms);
    }
    int64_t _sleep_ms;
};

class PublishStream : public brpc::RtmpServerStream {
public:
    PublishStream(int64_t sleep_ms)
        : _sleep_ms(sleep_ms)
        , _called_on_stop(0)
        , _called_on_first_message(0)
        , _nvideomsg(0)
        , _naudiomsg(0) {
        LOG(INFO) << __FUNCTION__ << "(" << this << ")";
    }
    ~PublishStream() {
        LOG(INFO) << __FUNCTION__ << "(" << this << ")";
        assertions_on_stop();
    }
    void assertions_on_stop() {
        ASSERT_EQ(1, _called_on_stop);
    }
    void OnPublish(const std::string& stream_name,
                   brpc::RtmpPublishType publish_type,
                   butil::Status* status,
                   google::protobuf::Closure* done) {
        brpc::ClosureGuard done_guard(done);
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got publish{stream_name=" << stream_name
                  << " type=" << brpc::RtmpPublishType2Str(publish_type)
                  << '}';
        if (stream_name == UNEXIST_NAME) {
            status->set_error(EPERM, "Unexist stream");
            return;
        }
        if (_sleep_ms > 0) {
            LOG(INFO) << "Sleep " << _sleep_ms
                      << " ms before responding play request";
            bthread_usleep(_sleep_ms * 1000L);
        }
    }
    void OnFirstMessage() {
        ++_called_on_first_message;
    }
    void OnStop() {
        LOG(INFO) << "OnStop of PublishStream=" << this;
        ++_called_on_stop;
    }
    void OnVideoMessage(brpc::RtmpVideoMessage* msg) {
        ++_nvideomsg;
        // video data is ascii in UT, print it out.
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << *msg << " data=" << msg->data;
    }
    void OnAudioMessage(brpc::RtmpAudioMessage* msg) {
        ++_naudiomsg;
        // audio data is ascii in UT, print it out.
        LOG(INFO) << remote_side() << "|stream=" << stream_id()
                  << ": Got " << *msg << " data=" << msg->data;
    }
private:
    int64_t _sleep_ms;
    int _called_on_stop;
    int _called_on_first_message;
    int _nvideomsg;
    int _naudiomsg;
};

class PublishService : public brpc::RtmpService {
public:
    PublishService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) {
        pthread_mutex_init(&_mutex, NULL);
    }
    ~PublishService() {
        pthread_mutex_destroy(&_mutex);
    }
    void move_created_streams(
        std::vector<butil::intrusive_ptr<PublishStream> >* out) {
        out->clear();
        BAIDU_SCOPED_LOCK(_mutex);
        out->swap(_created_streams);
    }

private:
    // Called to create a server-side stream.
    virtual brpc::RtmpServerStream* NewStream(
        const brpc::RtmpConnectRequest&) {
        PublishStream* stream = new PublishStream(_sleep_ms);
        {
            BAIDU_SCOPED_LOCK(_mutex);
            _created_streams.push_back(stream);
        }
        return stream;
    }
    int64_t _sleep_ms;
    pthread_mutex_t _mutex;
    std::vector<butil::intrusive_ptr<PublishStream> > _created_streams;
};

class RtmpSubStream : public brpc::RtmpClientStream {
public:
    explicit RtmpSubStream(brpc::RtmpMessageHandler* mh)
        : _message_handler(mh) {}
    // @RtmpStreamBase
    void OnMetaData(brpc::AMFObject*, const butil::StringPiece&);
    void OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg);
    void OnAudioMessage(brpc::RtmpAudioMessage* msg);
    void OnVideoMessage(brpc::RtmpVideoMessage* msg);
    void OnFirstMessage();
    void OnStop();
private:
    std::unique_ptr<brpc::RtmpMessageHandler> _message_handler;
};

void RtmpSubStream::OnFirstMessage() {
    _message_handler->OnPlayable();
}

void RtmpSubStream::OnMetaData(brpc::AMFObject* obj, const butil::StringPiece& name) {
    _message_handler->OnMetaData(obj, name);
}

void RtmpSubStream::OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg) {
    _message_handler->OnSharedObjectMessage(msg);
}

void RtmpSubStream::OnAudioMessage(brpc::RtmpAudioMessage* msg) {
    _message_handler->OnAudioMessage(msg);
}

void RtmpSubStream::OnVideoMessage(brpc::RtmpVideoMessage* msg) {
    _message_handler->OnVideoMessage(msg);
}

void RtmpSubStream::OnStop() {
    _message_handler->OnSubStreamStop(this);
}


class RtmpSubStreamCreator : public brpc::SubStreamCreator {
public:
    RtmpSubStreamCreator(const brpc::RtmpClient* client);

    ~RtmpSubStreamCreator();

    // @SubStreamCreator
    void NewSubStream(brpc::RtmpMessageHandler* message_handler,
                      butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream);
    void LaunchSubStream(brpc::RtmpStreamBase* sub_stream,
                         brpc::RtmpRetryingClientStreamOptions* options);

private:
    const brpc::RtmpClient* _client;
};

RtmpSubStreamCreator::RtmpSubStreamCreator(const brpc::RtmpClient* client)
    : _client(client) {}

RtmpSubStreamCreator::~RtmpSubStreamCreator() {}
 
void RtmpSubStreamCreator::NewSubStream(brpc::RtmpMessageHandler* message_handler,
                                        butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream) {
    if (sub_stream) { 
        (*sub_stream).reset(new RtmpSubStream(message_handler));
    }
    return;
}

void RtmpSubStreamCreator::LaunchSubStream(
    brpc::RtmpStreamBase* sub_stream, 
    brpc::RtmpRetryingClientStreamOptions* options) {
    brpc::RtmpClientStreamOptions client_options = *options;
    dynamic_cast<RtmpSubStream*>(sub_stream)->Init(_client, client_options);
}

TEST(RtmpTest, parse_rtmp_url) {
    butil::StringPiece host;
    butil::StringPiece vhost;
    butil::StringPiece port;
    butil::StringPiece app;
    butil::StringPiece stream_name;

    brpc::ParseRtmpURL("rtmp://HOST/APP/STREAM",
                             &host, &vhost, &port, &app, &stream_name);
    ASSERT_EQ("HOST", host);
    ASSERT_TRUE(vhost.empty());
    ASSERT_EQ("1935", port);
    ASSERT_EQ("APP", app);
    ASSERT_EQ("STREAM", stream_name);

    brpc::ParseRtmpURL("HOST/APP/STREAM",
                             &host, &vhost, &port, &app, &stream_name);
    ASSERT_EQ("HOST", host);
    ASSERT_TRUE(vhost.empty());
    ASSERT_EQ("1935", port);
    ASSERT_EQ("APP", app);
    ASSERT_EQ("STREAM", stream_name);

    brpc::ParseRtmpURL("rtmp://HOST:8765//APP?vhost=abc///STREAM?queries",
                             &host, &vhost, &port, &app, &stream_name);
    ASSERT_EQ("HOST", host);
    ASSERT_EQ("abc", vhost);
    ASSERT_EQ("8765", port);
    ASSERT_EQ("APP", app);
    ASSERT_EQ("STREAM?queries", stream_name);

    brpc::ParseRtmpURL("HOST:8765//APP?vhost=abc///STREAM?queries",
                             &host, &vhost, &port, &app, &stream_name);
    ASSERT_EQ("HOST", host);
    ASSERT_EQ("abc", vhost);
    ASSERT_EQ("8765", port);
    ASSERT_EQ("APP", app);
    ASSERT_EQ("STREAM?queries", stream_name);

    brpc::ParseRtmpURL("HOST:8765//APP?vhost=abc///STREAM?queries/",
                             &host, &vhost, &port, &app, &stream_name);
    ASSERT_EQ("HOST", host);
    ASSERT_EQ("abc", vhost);
    ASSERT_EQ("8765", port);
    ASSERT_EQ("APP", app);
    ASSERT_EQ("STREAM?queries/", stream_name);

    brpc::ParseRtmpURL("HOST:8765/APP?vhost=abc",
                             &host, &vhost, &port, &app, &stream_name);
    ASSERT_EQ("HOST", host);
    ASSERT_EQ("abc", vhost);
    ASSERT_EQ("8765", port);
    ASSERT_EQ("APP", app);
    ASSERT_TRUE(stream_name.empty());
}

TEST(RtmpTest, amf) {
    std::string req_buf;
    brpc::RtmpInfo info;
    brpc::AMFObject obj;
    std::string dummy = "_result";
    {
        google::protobuf::io::StringOutputStream zc_stream(&req_buf);
        brpc::AMFOutputStream ostream(&zc_stream);
        brpc::WriteAMFString(dummy, &ostream);
        brpc::WriteAMFUint32(17, &ostream);
        info.set_code("NetConnection.Connect"); // TODO
        info.set_level("error");
        info.set_description("heheda hello foobar");
        brpc::WriteAMFObject(info, &ostream);
        ASSERT_TRUE(ostream.good());
        obj.SetString("code", "foo");
        obj.SetString("level", "bar");
        obj.SetString("description", "heheda");
        brpc::WriteAMFObject(obj, &ostream);
        ASSERT_TRUE(ostream.good());
    }

    google::protobuf::io::ArrayInputStream zc_stream(req_buf.data(), req_buf.size());
    brpc::AMFInputStream istream(&zc_stream);
    std::string result;
    ASSERT_TRUE(brpc::ReadAMFString(&result, &istream));
    ASSERT_EQ(dummy, result);
    uint32_t num = 0;
    ASSERT_TRUE(brpc::ReadAMFUint32(&num, &istream));
    ASSERT_EQ(17u, num);
    brpc::RtmpInfo info2;
    ASSERT_TRUE(brpc::ReadAMFObject(&info2, &istream));
    ASSERT_EQ(info.code(), info2.code());
    ASSERT_EQ(info.level(), info2.level());
    ASSERT_EQ(info.description(), info2.description());
    brpc::RtmpInfo info3;
    ASSERT_TRUE(brpc::ReadAMFObject(&info3, &istream));
    ASSERT_EQ("foo", info3.code());
    ASSERT_EQ("bar", info3.level());
    ASSERT_EQ("heheda", info3.description());
}

TEST(RtmpTest, successfully_play_streams) {
    PlayingDummyService rtmp_service;
    brpc::Server server;
    brpc::ServerOptions server_opt;
    server_opt.rtmp_service = &rtmp_service;
    ASSERT_EQ(0, server.Start(8571, &server_opt));

    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpClientStream);
        brpc::RtmpClientStreamOptions opt;
        opt.play_name = butil::string_printf("play_name_%d", i);
        //opt.publish_name = butil::string_printf("pub_name_%d", i);
        opt.wait_until_play_or_publish_is_sent = true;
        cstreams[i]->Init(&rtmp_client, opt);
    }
    sleep(5);
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i]->assertions_on_successful_play();
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, fail_to_play_streams) {
    PlayingDummyService rtmp_service;
    brpc::Server server;
    brpc::ServerOptions server_opt;
    server_opt.rtmp_service = &rtmp_service;
    ASSERT_EQ(0, server.Start(8571, &server_opt));

    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpClientStream);
        brpc::RtmpClientStreamOptions opt;
        opt.play_name = UNEXIST_NAME;
        opt.wait_until_play_or_publish_is_sent = true;
        cstreams[i]->Init(&rtmp_client, opt);
    }
    sleep(1);
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i]->assertions_on_failure();
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, successfully_publish_streams) {
    PublishService rtmp_service;
    brpc::Server server;
    brpc::ServerOptions server_opt;
    server_opt.rtmp_service = &rtmp_service;
    ASSERT_EQ(0, server.Start(8571, &server_opt));

    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpClientStream);
        brpc::RtmpClientStreamOptions opt;
        opt.publish_name = butil::string_printf("pub_name_%d", i);
        opt.wait_until_play_or_publish_is_sent = true;
        cstreams[i]->Init(&rtmp_client, opt);
    }
    const int REP = 5;
    for (int i = 0; i < REP; ++i) {
        brpc::RtmpVideoMessage vmsg;
        vmsg.timestamp = 1000 + i * 20;
        vmsg.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME;
        vmsg.codec = brpc::FLV_VIDEO_AVC;
        vmsg.data.append(butil::string_printf("video_%d", i));
        for (int j = 0; j < NSTREAM; j += 2) {
            ASSERT_EQ(0, cstreams[j]->SendVideoMessage(vmsg));
        }
        
        brpc::RtmpAudioMessage amsg;
        amsg.timestamp = 1000 + i * 20;
        amsg.codec = brpc::FLV_AUDIO_AAC;
        amsg.rate = brpc::FLV_SOUND_RATE_44100HZ;
        amsg.bits = brpc::FLV_SOUND_16BIT;
        amsg.type = brpc::FLV_SOUND_STEREO;
        amsg.data.append(butil::string_printf("audio_%d", i));
        for (int j = 1; j < NSTREAM; j += 2) {
            ASSERT_EQ(0, cstreams[j]->SendAudioMessage(amsg));
        }
        
        bthread_usleep(500000);
    }
    std::vector<butil::intrusive_ptr<PublishStream> > created_streams;
    rtmp_service.move_created_streams(&created_streams);
    ASSERT_EQ(NSTREAM, (int)created_streams.size());
    for (int i = 0; i < NSTREAM; ++i) {
        EXPECT_EQ(1, created_streams[i]->_called_on_first_message);
    }
    for (int j = 0; j < NSTREAM; j += 2) {
        ASSERT_EQ(REP, created_streams[j]->_nvideomsg);
    }
    for (int j = 1; j < NSTREAM; j += 2) {
        ASSERT_EQ(REP, created_streams[j]->_naudiomsg);
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, failed_to_publish_streams) {
    PublishService rtmp_service;
    brpc::Server server;
    brpc::ServerOptions server_opt;
    server_opt.rtmp_service = &rtmp_service;
    ASSERT_EQ(0, server.Start(8575, &server_opt));

    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8575", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpClientStream);
        brpc::RtmpClientStreamOptions opt;
        opt.publish_name = UNEXIST_NAME;
        opt.wait_until_play_or_publish_is_sent = true;
        cstreams[i]->Init(&rtmp_client, opt);
    }
    sleep(1);
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i]->assertions_on_failure();
    }
    std::vector<butil::intrusive_ptr<PublishStream> > created_streams;
    rtmp_service.move_created_streams(&created_streams);
    ASSERT_EQ(NSTREAM, (int)created_streams.size());
    for (int i = 0; i < NSTREAM; ++i) {
        ASSERT_EQ(0, created_streams[i]->_called_on_first_message);
        ASSERT_EQ(0, created_streams[i]->_nvideomsg);
        ASSERT_EQ(0, created_streams[i]->_naudiomsg);
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, failed_to_connect_client_streams) {
    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8572", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpClientStream);
        brpc::RtmpClientStreamOptions opt;
        opt.play_name = butil::string_printf("play_name_%d", i);
        opt.wait_until_play_or_publish_is_sent = true;
        cstreams[i]->Init(&rtmp_client, opt);
        cstreams[i]->assertions_on_failure();
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, destroy_client_streams_before_init) {
    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8573", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    butil::intrusive_ptr<TestRtmpClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpClientStream);
        cstreams[i]->Destroy();
        ASSERT_EQ(1, cstreams[i]->_called_on_stop);
        ASSERT_EQ(brpc::RtmpClientStream::STATE_DESTROYING, cstreams[i]->_state);
        brpc::RtmpClientStreamOptions opt;
        opt.play_name = butil::string_printf("play_name_%d", i);
        opt.wait_until_play_or_publish_is_sent = true;
        cstreams[i]->Init(&rtmp_client, opt);
        cstreams[i]->assertions_on_failure();
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, destroy_retrying_client_streams_before_init) {
    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8573", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    butil::intrusive_ptr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpRetryingClientStream);
        cstreams[i]->Destroy();
        ASSERT_EQ(1, cstreams[i]->_called_on_stop);
        brpc::RtmpRetryingClientStreamOptions opt;
        opt.play_name = butil::string_printf("play_name_%d", i);
        brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
        cstreams[i]->Init(sc, opt);
        ASSERT_EQ(1, cstreams[i]->_called_on_stop);
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, destroy_client_streams_during_creation) {
    PlayingDummyService rtmp_service(2000/*sleep 2s*/);
    brpc::Server server;
    brpc::ServerOptions server_opt;
    server_opt.rtmp_service = &rtmp_service;
    ASSERT_EQ(0, server.Start(8574, &server_opt));

    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8574", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    butil::intrusive_ptr<TestRtmpClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpClientStream);
        brpc::RtmpClientStreamOptions opt;
        opt.play_name = butil::string_printf("play_name_%d", i);
        cstreams[i]->Init(&rtmp_client, opt);
        ASSERT_EQ(0, cstreams[i]->_called_on_stop);
        usleep(500*1000);
        ASSERT_EQ(0, cstreams[i]->_called_on_stop);
        cstreams[i]->Destroy();
        usleep(10*1000);
        ASSERT_EQ(1, cstreams[i]->_called_on_stop);
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, destroy_retrying_client_streams_during_creation) {
    PlayingDummyService rtmp_service(2000/*sleep 2s*/);
    brpc::Server server;
    brpc::ServerOptions server_opt;
    server_opt.rtmp_service = &rtmp_service;
    ASSERT_EQ(0, server.Start(8574, &server_opt));

    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8574", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    butil::intrusive_ptr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpRetryingClientStream);
        brpc::RtmpRetryingClientStreamOptions opt;
        opt.play_name = butil::string_printf("play_name_%d", i);
        brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
        cstreams[i]->Init(sc, opt);
        ASSERT_EQ(0, cstreams[i]->_called_on_stop);
        usleep(500*1000);
        ASSERT_EQ(0, cstreams[i]->_called_on_stop);
        cstreams[i]->Destroy();
        usleep(10*1000);
        ASSERT_EQ(1, cstreams[i]->_called_on_stop);
    }
    LOG(INFO) << "Quiting program...";
}

TEST(RtmpTest, retrying_stream) {
    PlayingDummyService rtmp_service;
    brpc::Server server;
    brpc::ServerOptions server_opt;
    server_opt.rtmp_service = &rtmp_service;
    ASSERT_EQ(0, server.Start(8576, &server_opt));

    brpc::RtmpClientOptions rtmp_opt;
    rtmp_opt.app = "hello";
    rtmp_opt.swfUrl = "anything";
    rtmp_opt.tcUrl = "rtmp://heheda";
    brpc::RtmpClient rtmp_client;
    ASSERT_EQ(0, rtmp_client.Init("localhost:8576", rtmp_opt));

    // Create multiple streams.
    const int NSTREAM = 2;
    brpc::DestroyingPtr<TestRtmpRetryingClientStream> cstreams[NSTREAM];
    for (int i = 0; i < NSTREAM; ++i) {
        cstreams[i].reset(new TestRtmpRetryingClientStream);
        brpc::Controller cntl;
        brpc::RtmpRetryingClientStreamOptions opt;
        opt.play_name = butil::string_printf("name_%d", i);
        brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client);
        cstreams[i]->Init(sc, opt);
    }
    sleep(3);
    LOG(INFO) << "Stopping server";
    server.Stop(0);
    server.Join();
    LOG(INFO) << "Stopped server and sleep for awhile";
    sleep(3);
    ASSERT_EQ(0, server.Start(8576, &server_opt));
    sleep(3);
    for (int i = 0; i < NSTREAM; ++i) {
        ASSERT_EQ(1, cstreams[i]->_called_on_first_message);
        ASSERT_EQ(2, cstreams[i]->_called_on_playable);
    }
    LOG(INFO) << "Quiting program...";
}