// Baidu RPC - A framework to host and access services throughout Baidu. // Copyright (c) 2014 Baidu, Inc. // Date: Fri May 20 15:52:22 CST 2016 #include <sys/ioctl.h> #include <sys/types.h> #include <sys/socket.h> #include <gtest/gtest.h> #include <gflags/gflags.h> #include <google/protobuf/descriptor.h> #include <google/protobuf/io/zero_copy_stream_impl_lite.h> #include "butil/time.h" #include "butil/macros.h" #include "brpc/socket.h" #include "brpc/acceptor.h" #include "brpc/server.h" #include "brpc/controller.h" #include "brpc/rtmp.h" #include "brpc/amf.h" int main(int argc, char* argv[]) { testing::InitGoogleTest(&argc, argv); GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); return RUN_ALL_TESTS(); } class TestRtmpClientStream : public brpc::RtmpClientStream { public: TestRtmpClientStream() : _called_on_stop(0) , _called_on_first_message(0) , _nvideomsg(0) , _naudiomsg(0) { LOG(INFO) << __FUNCTION__; } ~TestRtmpClientStream() { LOG(INFO) << __FUNCTION__; assertions_on_stop(); } void assertions_on_stop() { ASSERT_EQ(1, _called_on_stop); } void assertions_on_successful_play() { ASSERT_EQ(1, _called_on_first_message); ASSERT_LT(0, _nvideomsg); ASSERT_LT(0, _naudiomsg); } void assertions_on_failure() { ASSERT_EQ(0, _called_on_first_message); ASSERT_EQ(0, _nvideomsg); ASSERT_EQ(0, _naudiomsg); assertions_on_stop(); } void OnFirstMessage() { ++_called_on_first_message; } void OnStop() { ++_called_on_stop; } void OnVideoMessage(brpc::RtmpVideoMessage* msg) { ++_nvideomsg; // video data is ascii in UT, print it out. LOG(INFO) << remote_side() << "|stream=" << stream_id() << ": Got " << *msg << " data=" << msg->data; } void OnAudioMessage(brpc::RtmpAudioMessage* msg) { ++_naudiomsg; // audio data is ascii in UT, print it out. LOG(INFO) << remote_side() << "|stream=" << stream_id() << ": Got " << *msg << " data=" << msg->data; } private: int _called_on_stop; int _called_on_first_message; int _nvideomsg; int _naudiomsg; }; class TestRtmpRetryingClientStream : public brpc::RtmpRetryingClientStream { public: TestRtmpRetryingClientStream() : _called_on_stop(0) , _called_on_first_message(0) , _called_on_playable(0) { LOG(INFO) << __FUNCTION__; } ~TestRtmpRetryingClientStream() { LOG(INFO) << __FUNCTION__; assertions_on_stop(); } void assertions_on_stop() { ASSERT_EQ(1, _called_on_stop); } void OnStop() { ++_called_on_stop; } void OnFirstMessage() { ++_called_on_first_message; } void OnPlayable() { ++_called_on_playable; } void OnVideoMessage(brpc::RtmpVideoMessage* msg) { // video data is ascii in UT, print it out. LOG(INFO) << remote_side() << "|stream=" << stream_id() << ": Got " << *msg << " data=" << msg->data; } void OnAudioMessage(brpc::RtmpAudioMessage* msg) { // audio data is ascii in UT, print it out. LOG(INFO) << remote_side() << "|stream=" << stream_id() << ": Got " << *msg << " data=" << msg->data; } private: int _called_on_stop; int _called_on_first_message; int _called_on_playable; }; const char* UNEXIST_NAME = "unexist_stream"; class PlayingDummyStream : public brpc::RtmpServerStream { public: enum State { STATE_UNPLAYING, STATE_PLAYING, STATE_STOPPED }; PlayingDummyStream(int64_t sleep_ms) : _state(STATE_UNPLAYING), _sleep_ms(sleep_ms) { LOG(INFO) << __FUNCTION__ << "(" << this << ")"; } ~PlayingDummyStream() { LOG(INFO) << __FUNCTION__ << "(" << this << ")"; } void OnPlay(const brpc::RtmpPlayOptions& opt, butil::Status* status, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); LOG(INFO) << remote_side() << "|stream=" << stream_id() << ": Got play{stream_name=" << opt.stream_name << " start=" << opt.start << " duration=" << opt.duration << " reset=" << opt.reset << '}'; if (opt.stream_name == UNEXIST_NAME) { status->set_error(EPERM, "Unexist stream"); return; } if (_sleep_ms > 0) { LOG(INFO) << "Sleep " << _sleep_ms << " ms before responding play request"; bthread_usleep(_sleep_ms * 1000L); } int rc = bthread_start_background(&_play_thread, NULL, RunSendData, this); if (rc) { status->set_error(rc, "Fail to create thread"); return; } State expected = STATE_UNPLAYING; if (!_state.compare_exchange_strong(expected, STATE_PLAYING)) { if (expected == STATE_STOPPED) { bthread_stop(_play_thread); bthread_join(_play_thread, NULL); } else { CHECK(false) << "Impossible"; } } } void OnStop() { LOG(INFO) << "OnStop of PlayingDummyStream=" << this; if (_state.exchange(STATE_STOPPED) == STATE_PLAYING) { bthread_stop(_play_thread); bthread_join(_play_thread, NULL); } } void SendData(); private: static void* RunSendData(void* arg) { ((PlayingDummyStream*)arg)->SendData(); return NULL; } butil::atomic<State> _state; bthread_t _play_thread; int64_t _sleep_ms; }; void PlayingDummyStream::SendData() { LOG(INFO) << "Enter SendData of PlayingDummyStream=" << this; brpc::RtmpVideoMessage vmsg; brpc::RtmpAudioMessage amsg; vmsg.timestamp = 1000; amsg.timestamp = 1000; for (int i = 0; !bthread_stopped(bthread_self()); ++i) { vmsg.timestamp += 20; amsg.timestamp += 20; vmsg.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME; vmsg.codec = brpc::FLV_VIDEO_AVC; vmsg.data.clear(); vmsg.data.append(butil::string_printf("video_%d(ms_id=%u)", i, stream_id())); //failing to send is possible SendVideoMessage(vmsg); amsg.codec = brpc::FLV_AUDIO_AAC; amsg.rate = brpc::FLV_SOUND_RATE_44100HZ; amsg.bits = brpc::FLV_SOUND_16BIT; amsg.type = brpc::FLV_SOUND_STEREO; amsg.data.clear(); amsg.data.append(butil::string_printf("audio_%d(ms_id=%u)", i, stream_id())); SendAudioMessage(amsg); bthread_usleep(1000000); } LOG(INFO) << "Quit SendData of PlayingDummyStream=" << this; } class PlayingDummyService : public brpc::RtmpService { public: PlayingDummyService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) {} private: // Called to create a server-side stream. virtual brpc::RtmpServerStream* NewStream( const brpc::RtmpConnectRequest&) { return new PlayingDummyStream(_sleep_ms); } int64_t _sleep_ms; }; class PublishStream : public brpc::RtmpServerStream { public: PublishStream(int64_t sleep_ms) : _sleep_ms(sleep_ms) , _called_on_stop(0) , _called_on_first_message(0) , _nvideomsg(0) , _naudiomsg(0) { LOG(INFO) << __FUNCTION__ << "(" << this << ")"; } ~PublishStream() { LOG(INFO) << __FUNCTION__ << "(" << this << ")"; assertions_on_stop(); } void assertions_on_stop() { ASSERT_EQ(1, _called_on_stop); } void OnPublish(const std::string& stream_name, brpc::RtmpPublishType publish_type, butil::Status* status, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); LOG(INFO) << remote_side() << "|stream=" << stream_id() << ": Got publish{stream_name=" << stream_name << " type=" << brpc::RtmpPublishType2Str(publish_type) << '}'; if (stream_name == UNEXIST_NAME) { status->set_error(EPERM, "Unexist stream"); return; } if (_sleep_ms > 0) { LOG(INFO) << "Sleep " << _sleep_ms << " ms before responding play request"; bthread_usleep(_sleep_ms * 1000L); } } void OnFirstMessage() { ++_called_on_first_message; } void OnStop() { LOG(INFO) << "OnStop of PublishStream=" << this; ++_called_on_stop; } void OnVideoMessage(brpc::RtmpVideoMessage* msg) { ++_nvideomsg; // video data is ascii in UT, print it out. LOG(INFO) << remote_side() << "|stream=" << stream_id() << ": Got " << *msg << " data=" << msg->data; } void OnAudioMessage(brpc::RtmpAudioMessage* msg) { ++_naudiomsg; // audio data is ascii in UT, print it out. LOG(INFO) << remote_side() << "|stream=" << stream_id() << ": Got " << *msg << " data=" << msg->data; } private: int64_t _sleep_ms; int _called_on_stop; int _called_on_first_message; int _nvideomsg; int _naudiomsg; }; class PublishService : public brpc::RtmpService { public: PublishService(int64_t sleep_ms = 0) : _sleep_ms(sleep_ms) { pthread_mutex_init(&_mutex, NULL); } ~PublishService() { pthread_mutex_destroy(&_mutex); } void move_created_streams( std::vector<butil::intrusive_ptr<PublishStream> >* out) { out->clear(); BAIDU_SCOPED_LOCK(_mutex); out->swap(_created_streams); } private: // Called to create a server-side stream. virtual brpc::RtmpServerStream* NewStream( const brpc::RtmpConnectRequest&) { PublishStream* stream = new PublishStream(_sleep_ms); { BAIDU_SCOPED_LOCK(_mutex); _created_streams.push_back(stream); } return stream; } int64_t _sleep_ms; pthread_mutex_t _mutex; std::vector<butil::intrusive_ptr<PublishStream> > _created_streams; }; class RtmpSubStream : public brpc::RtmpClientStream { public: explicit RtmpSubStream(brpc::RtmpMessageHandler* mh) : _message_handler(mh) {} // @RtmpStreamBase void OnMetaData(brpc::AMFObject*, const butil::StringPiece&); void OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg); void OnAudioMessage(brpc::RtmpAudioMessage* msg); void OnVideoMessage(brpc::RtmpVideoMessage* msg); void OnFirstMessage(); void OnStop(); private: std::unique_ptr<brpc::RtmpMessageHandler> _message_handler; }; void RtmpSubStream::OnFirstMessage() { _message_handler->OnPlayable(); } void RtmpSubStream::OnMetaData(brpc::AMFObject* obj, const butil::StringPiece& name) { _message_handler->OnMetaData(obj, name); } void RtmpSubStream::OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg) { _message_handler->OnSharedObjectMessage(msg); } void RtmpSubStream::OnAudioMessage(brpc::RtmpAudioMessage* msg) { _message_handler->OnAudioMessage(msg); } void RtmpSubStream::OnVideoMessage(brpc::RtmpVideoMessage* msg) { _message_handler->OnVideoMessage(msg); } void RtmpSubStream::OnStop() { _message_handler->OnSubStreamStop(this); } class RtmpSubStreamCreator : public brpc::SubStreamCreator { public: RtmpSubStreamCreator(const brpc::RtmpClient* client); ~RtmpSubStreamCreator(); // @SubStreamCreator void NewSubStream(brpc::RtmpMessageHandler* message_handler, butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream); void LaunchSubStream(brpc::RtmpStreamBase* sub_stream, brpc::RtmpRetryingClientStreamOptions* options); private: const brpc::RtmpClient* _client; }; RtmpSubStreamCreator::RtmpSubStreamCreator(const brpc::RtmpClient* client) : _client(client) {} RtmpSubStreamCreator::~RtmpSubStreamCreator() {} void RtmpSubStreamCreator::NewSubStream(brpc::RtmpMessageHandler* message_handler, butil::intrusive_ptr<brpc::RtmpStreamBase>* sub_stream) { if (sub_stream) { (*sub_stream).reset(new RtmpSubStream(message_handler)); } return; } void RtmpSubStreamCreator::LaunchSubStream( brpc::RtmpStreamBase* sub_stream, brpc::RtmpRetryingClientStreamOptions* options) { brpc::RtmpClientStreamOptions client_options = *options; dynamic_cast<RtmpSubStream*>(sub_stream)->Init(_client, client_options); } TEST(RtmpTest, parse_rtmp_url) { butil::StringPiece host; butil::StringPiece vhost; butil::StringPiece port; butil::StringPiece app; butil::StringPiece stream_name; brpc::ParseRtmpURL("rtmp://HOST/APP/STREAM", &host, &vhost, &port, &app, &stream_name); ASSERT_EQ("HOST", host); ASSERT_TRUE(vhost.empty()); ASSERT_EQ("1935", port); ASSERT_EQ("APP", app); ASSERT_EQ("STREAM", stream_name); brpc::ParseRtmpURL("HOST/APP/STREAM", &host, &vhost, &port, &app, &stream_name); ASSERT_EQ("HOST", host); ASSERT_TRUE(vhost.empty()); ASSERT_EQ("1935", port); ASSERT_EQ("APP", app); ASSERT_EQ("STREAM", stream_name); brpc::ParseRtmpURL("rtmp://HOST:8765//APP?vhost=abc///STREAM?queries", &host, &vhost, &port, &app, &stream_name); ASSERT_EQ("HOST", host); ASSERT_EQ("abc", vhost); ASSERT_EQ("8765", port); ASSERT_EQ("APP", app); ASSERT_EQ("STREAM?queries", stream_name); brpc::ParseRtmpURL("HOST:8765//APP?vhost=abc///STREAM?queries", &host, &vhost, &port, &app, &stream_name); ASSERT_EQ("HOST", host); ASSERT_EQ("abc", vhost); ASSERT_EQ("8765", port); ASSERT_EQ("APP", app); ASSERT_EQ("STREAM?queries", stream_name); brpc::ParseRtmpURL("HOST:8765//APP?vhost=abc///STREAM?queries/", &host, &vhost, &port, &app, &stream_name); ASSERT_EQ("HOST", host); ASSERT_EQ("abc", vhost); ASSERT_EQ("8765", port); ASSERT_EQ("APP", app); ASSERT_EQ("STREAM?queries/", stream_name); brpc::ParseRtmpURL("HOST:8765/APP?vhost=abc", &host, &vhost, &port, &app, &stream_name); ASSERT_EQ("HOST", host); ASSERT_EQ("abc", vhost); ASSERT_EQ("8765", port); ASSERT_EQ("APP", app); ASSERT_TRUE(stream_name.empty()); } TEST(RtmpTest, amf) { std::string req_buf; brpc::RtmpInfo info; brpc::AMFObject obj; std::string dummy = "_result"; { google::protobuf::io::StringOutputStream zc_stream(&req_buf); brpc::AMFOutputStream ostream(&zc_stream); brpc::WriteAMFString(dummy, &ostream); brpc::WriteAMFUint32(17, &ostream); info.set_code("NetConnection.Connect"); // TODO info.set_level("error"); info.set_description("heheda hello foobar"); brpc::WriteAMFObject(info, &ostream); ASSERT_TRUE(ostream.good()); obj.SetString("code", "foo"); obj.SetString("level", "bar"); obj.SetString("description", "heheda"); brpc::WriteAMFObject(obj, &ostream); ASSERT_TRUE(ostream.good()); } google::protobuf::io::ArrayInputStream zc_stream(req_buf.data(), req_buf.size()); brpc::AMFInputStream istream(&zc_stream); std::string result; ASSERT_TRUE(brpc::ReadAMFString(&result, &istream)); ASSERT_EQ(dummy, result); uint32_t num = 0; ASSERT_TRUE(brpc::ReadAMFUint32(&num, &istream)); ASSERT_EQ(17u, num); brpc::RtmpInfo info2; ASSERT_TRUE(brpc::ReadAMFObject(&info2, &istream)); ASSERT_EQ(info.code(), info2.code()); ASSERT_EQ(info.level(), info2.level()); ASSERT_EQ(info.description(), info2.description()); brpc::RtmpInfo info3; ASSERT_TRUE(brpc::ReadAMFObject(&info3, &istream)); ASSERT_EQ("foo", info3.code()); ASSERT_EQ("bar", info3.level()); ASSERT_EQ("heheda", info3.description()); } TEST(RtmpTest, successfully_play_streams) { PlayingDummyService rtmp_service; brpc::Server server; brpc::ServerOptions server_opt; server_opt.rtmp_service = &rtmp_service; ASSERT_EQ(0, server.Start(8571, &server_opt)); brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpClientStream); brpc::RtmpClientStreamOptions opt; opt.play_name = butil::string_printf("play_name_%d", i); //opt.publish_name = butil::string_printf("pub_name_%d", i); opt.wait_until_play_or_publish_is_sent = true; cstreams[i]->Init(&rtmp_client, opt); } sleep(5); for (int i = 0; i < NSTREAM; ++i) { cstreams[i]->assertions_on_successful_play(); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, fail_to_play_streams) { PlayingDummyService rtmp_service; brpc::Server server; brpc::ServerOptions server_opt; server_opt.rtmp_service = &rtmp_service; ASSERT_EQ(0, server.Start(8571, &server_opt)); brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpClientStream); brpc::RtmpClientStreamOptions opt; opt.play_name = UNEXIST_NAME; opt.wait_until_play_or_publish_is_sent = true; cstreams[i]->Init(&rtmp_client, opt); } sleep(1); for (int i = 0; i < NSTREAM; ++i) { cstreams[i]->assertions_on_failure(); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, successfully_publish_streams) { PublishService rtmp_service; brpc::Server server; brpc::ServerOptions server_opt; server_opt.rtmp_service = &rtmp_service; ASSERT_EQ(0, server.Start(8571, &server_opt)); brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8571", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpClientStream); brpc::RtmpClientStreamOptions opt; opt.publish_name = butil::string_printf("pub_name_%d", i); opt.wait_until_play_or_publish_is_sent = true; cstreams[i]->Init(&rtmp_client, opt); } const int REP = 5; for (int i = 0; i < REP; ++i) { brpc::RtmpVideoMessage vmsg; vmsg.timestamp = 1000 + i * 20; vmsg.frame_type = brpc::FLV_VIDEO_FRAME_KEYFRAME; vmsg.codec = brpc::FLV_VIDEO_AVC; vmsg.data.append(butil::string_printf("video_%d", i)); for (int j = 0; j < NSTREAM; j += 2) { ASSERT_EQ(0, cstreams[j]->SendVideoMessage(vmsg)); } brpc::RtmpAudioMessage amsg; amsg.timestamp = 1000 + i * 20; amsg.codec = brpc::FLV_AUDIO_AAC; amsg.rate = brpc::FLV_SOUND_RATE_44100HZ; amsg.bits = brpc::FLV_SOUND_16BIT; amsg.type = brpc::FLV_SOUND_STEREO; amsg.data.append(butil::string_printf("audio_%d", i)); for (int j = 1; j < NSTREAM; j += 2) { ASSERT_EQ(0, cstreams[j]->SendAudioMessage(amsg)); } bthread_usleep(500000); } std::vector<butil::intrusive_ptr<PublishStream> > created_streams; rtmp_service.move_created_streams(&created_streams); ASSERT_EQ(NSTREAM, (int)created_streams.size()); for (int i = 0; i < NSTREAM; ++i) { EXPECT_EQ(1, created_streams[i]->_called_on_first_message); } for (int j = 0; j < NSTREAM; j += 2) { ASSERT_EQ(REP, created_streams[j]->_nvideomsg); } for (int j = 1; j < NSTREAM; j += 2) { ASSERT_EQ(REP, created_streams[j]->_naudiomsg); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, failed_to_publish_streams) { PublishService rtmp_service; brpc::Server server; brpc::ServerOptions server_opt; server_opt.rtmp_service = &rtmp_service; ASSERT_EQ(0, server.Start(8575, &server_opt)); brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8575", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpClientStream); brpc::RtmpClientStreamOptions opt; opt.publish_name = UNEXIST_NAME; opt.wait_until_play_or_publish_is_sent = true; cstreams[i]->Init(&rtmp_client, opt); } sleep(1); for (int i = 0; i < NSTREAM; ++i) { cstreams[i]->assertions_on_failure(); } std::vector<butil::intrusive_ptr<PublishStream> > created_streams; rtmp_service.move_created_streams(&created_streams); ASSERT_EQ(NSTREAM, (int)created_streams.size()); for (int i = 0; i < NSTREAM; ++i) { ASSERT_EQ(0, created_streams[i]->_called_on_first_message); ASSERT_EQ(0, created_streams[i]->_nvideomsg); ASSERT_EQ(0, created_streams[i]->_naudiomsg); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, failed_to_connect_client_streams) { brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8572", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; brpc::DestroyingPtr<TestRtmpClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpClientStream); brpc::RtmpClientStreamOptions opt; opt.play_name = butil::string_printf("play_name_%d", i); opt.wait_until_play_or_publish_is_sent = true; cstreams[i]->Init(&rtmp_client, opt); cstreams[i]->assertions_on_failure(); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, destroy_client_streams_before_init) { brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8573", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; butil::intrusive_ptr<TestRtmpClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpClientStream); cstreams[i]->Destroy(); ASSERT_EQ(1, cstreams[i]->_called_on_stop); ASSERT_EQ(brpc::RtmpClientStream::STATE_DESTROYING, cstreams[i]->_state); brpc::RtmpClientStreamOptions opt; opt.play_name = butil::string_printf("play_name_%d", i); opt.wait_until_play_or_publish_is_sent = true; cstreams[i]->Init(&rtmp_client, opt); cstreams[i]->assertions_on_failure(); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, destroy_retrying_client_streams_before_init) { brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8573", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; butil::intrusive_ptr<TestRtmpRetryingClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpRetryingClientStream); cstreams[i]->Destroy(); ASSERT_EQ(1, cstreams[i]->_called_on_stop); brpc::RtmpRetryingClientStreamOptions opt; opt.play_name = butil::string_printf("play_name_%d", i); brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client); cstreams[i]->Init(sc, opt); ASSERT_EQ(1, cstreams[i]->_called_on_stop); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, destroy_client_streams_during_creation) { PlayingDummyService rtmp_service(2000/*sleep 2s*/); brpc::Server server; brpc::ServerOptions server_opt; server_opt.rtmp_service = &rtmp_service; ASSERT_EQ(0, server.Start(8574, &server_opt)); brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8574", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; butil::intrusive_ptr<TestRtmpClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpClientStream); brpc::RtmpClientStreamOptions opt; opt.play_name = butil::string_printf("play_name_%d", i); cstreams[i]->Init(&rtmp_client, opt); ASSERT_EQ(0, cstreams[i]->_called_on_stop); usleep(500*1000); ASSERT_EQ(0, cstreams[i]->_called_on_stop); cstreams[i]->Destroy(); usleep(10*1000); ASSERT_EQ(1, cstreams[i]->_called_on_stop); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, destroy_retrying_client_streams_during_creation) { PlayingDummyService rtmp_service(2000/*sleep 2s*/); brpc::Server server; brpc::ServerOptions server_opt; server_opt.rtmp_service = &rtmp_service; ASSERT_EQ(0, server.Start(8574, &server_opt)); brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8574", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; butil::intrusive_ptr<TestRtmpRetryingClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpRetryingClientStream); brpc::RtmpRetryingClientStreamOptions opt; opt.play_name = butil::string_printf("play_name_%d", i); brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client); cstreams[i]->Init(sc, opt); ASSERT_EQ(0, cstreams[i]->_called_on_stop); usleep(500*1000); ASSERT_EQ(0, cstreams[i]->_called_on_stop); cstreams[i]->Destroy(); usleep(10*1000); ASSERT_EQ(1, cstreams[i]->_called_on_stop); } LOG(INFO) << "Quiting program..."; } TEST(RtmpTest, retrying_stream) { PlayingDummyService rtmp_service; brpc::Server server; brpc::ServerOptions server_opt; server_opt.rtmp_service = &rtmp_service; ASSERT_EQ(0, server.Start(8576, &server_opt)); brpc::RtmpClientOptions rtmp_opt; rtmp_opt.app = "hello"; rtmp_opt.swfUrl = "anything"; rtmp_opt.tcUrl = "rtmp://heheda"; brpc::RtmpClient rtmp_client; ASSERT_EQ(0, rtmp_client.Init("localhost:8576", rtmp_opt)); // Create multiple streams. const int NSTREAM = 2; brpc::DestroyingPtr<TestRtmpRetryingClientStream> cstreams[NSTREAM]; for (int i = 0; i < NSTREAM; ++i) { cstreams[i].reset(new TestRtmpRetryingClientStream); brpc::Controller cntl; brpc::RtmpRetryingClientStreamOptions opt; opt.play_name = butil::string_printf("name_%d", i); brpc::SubStreamCreator* sc = new RtmpSubStreamCreator(&rtmp_client); cstreams[i]->Init(sc, opt); } sleep(3); LOG(INFO) << "Stopping server"; server.Stop(0); server.Join(); LOG(INFO) << "Stopped server and sleep for awhile"; sleep(3); ASSERT_EQ(0, server.Start(8576, &server_opt)); sleep(3); for (int i = 0; i < NSTREAM; ++i) { ASSERT_EQ(1, cstreams[i]->_called_on_first_message); ASSERT_EQ(2, cstreams[i]->_called_on_playable); } LOG(INFO) << "Quiting program..."; }