Commit 60504d79 authored by zhujiashun's avatar zhujiashun

Change the interface of OnMetaData and SendMetaData

parent 035cce37
...@@ -2214,8 +2214,9 @@ bool RtmpChunkStream::OnDataMessageAMF0( ...@@ -2214,8 +2214,9 @@ bool RtmpChunkStream::OnDataMessageAMF0(
// Ignore empty metadata (seen in pulling streams from quanmin) // Ignore empty metadata (seen in pulling streams from quanmin)
return false; return false;
} }
AMFObject metadata; RtmpMetaData metadata;
if (!ReadAMFObject(&metadata, &istream)) { metadata.timestamp = mh.timestamp;
if (!ReadAMFObject(&metadata.data, &istream)) {
RTMP_ERROR(socket, mh) << "Fail to read metadata"; RTMP_ERROR(socket, mh) << "Fail to read metadata";
return false; return false;
} }
......
...@@ -118,13 +118,13 @@ butil::Status FlvWriter::Write(const RtmpAudioMessage& msg) { ...@@ -118,13 +118,13 @@ butil::Status FlvWriter::Write(const RtmpAudioMessage& msg) {
return butil::Status::OK(); return butil::Status::OK();
} }
butil::Status FlvWriter::Write(const AMFObject& metadata) { butil::Status FlvWriter::Write(const RtmpMetaData& metadata) {
butil::IOBuf req_buf; butil::IOBuf req_buf;
{ {
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream); AMFOutputStream ostream(&zc_stream);
WriteAMFString(RTMP_AMF0_ON_META_DATA, &ostream); WriteAMFString(RTMP_AMF0_ON_META_DATA, &ostream);
WriteAMFObject(metadata, &ostream); WriteAMFObject(metadata.data, &ostream);
if (!ostream.good()) { if (!ostream.good()) {
return butil::Status(EINVAL, "Fail to serialize metadata"); return butil::Status(EINVAL, "Fail to serialize metadata");
} }
...@@ -140,8 +140,8 @@ butil::Status FlvWriter::Write(const AMFObject& metadata) { ...@@ -140,8 +140,8 @@ butil::Status FlvWriter::Write(const AMFObject& metadata) {
// FLV tag // FLV tag
*p++ = FLV_TAG_SCRIPT_DATA; *p++ = FLV_TAG_SCRIPT_DATA;
policy::WriteBigEndian3Bytes(&p, req_buf.size()); policy::WriteBigEndian3Bytes(&p, req_buf.size());
policy::WriteBigEndian3Bytes(&p, 0); policy::WriteBigEndian3Bytes(&p, (metadata.timestamp & 0xFFFFFF));
*p++ = 0; *p++ = (metadata.timestamp >> 24) & 0xFF;
policy::WriteBigEndian3Bytes(&p, 0); // StreamID policy::WriteBigEndian3Bytes(&p, 0); // StreamID
_buf->append(buf, p - buf); _buf->append(buf, p - buf);
_buf->append(req_buf); _buf->append(req_buf);
...@@ -195,7 +195,7 @@ butil::Status FlvReader::PeekMessageType(FlvTagType* type_out) { ...@@ -195,7 +195,7 @@ butil::Status FlvReader::PeekMessageType(FlvTagType* type_out) {
butil::Status FlvReader::Read(RtmpVideoMessage* msg) { butil::Status FlvReader::Read(RtmpVideoMessage* msg) {
char tags[11]; char tags[11];
const char* p = (const char*)_buf->fetch(tags, sizeof(tags)); const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
if (p == NULL) { if (p == NULL) {
return butil::Status(EAGAIN, "Fail to read, not enough data"); return butil::Status(EAGAIN, "Fail to read, not enough data");
} }
...@@ -223,7 +223,7 @@ butil::Status FlvReader::Read(RtmpVideoMessage* msg) { ...@@ -223,7 +223,7 @@ butil::Status FlvReader::Read(RtmpVideoMessage* msg) {
butil::Status FlvReader::Read(RtmpAudioMessage* msg) { butil::Status FlvReader::Read(RtmpAudioMessage* msg) {
char tags[11]; char tags[11];
const char* p = (const char*)_buf->fetch(tags, sizeof(tags)); const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
if (p == NULL) { if (p == NULL) {
return butil::Status(EAGAIN, "Fail to read, not enough data"); return butil::Status(EAGAIN, "Fail to read, not enough data");
} }
...@@ -250,9 +250,9 @@ butil::Status FlvReader::Read(RtmpAudioMessage* msg) { ...@@ -250,9 +250,9 @@ butil::Status FlvReader::Read(RtmpAudioMessage* msg) {
return butil::Status::OK(); return butil::Status::OK();
} }
butil::Status FlvReader::Read(AMFObject* msg, std::string* name) { butil::Status FlvReader::Read(RtmpMetaData* msg, std::string* name) {
char tags[11]; char tags[11];
const char* p = (const char*)_buf->fetch(tags, sizeof(tags)); const unsigned char* p = (const unsigned char*)_buf->fetch(tags, sizeof(tags));
if (p == NULL) { if (p == NULL) {
return butil::Status(EAGAIN, "Fail to read, not enough data"); return butil::Status(EAGAIN, "Fail to read, not enough data");
} }
...@@ -260,6 +260,8 @@ butil::Status FlvReader::Read(AMFObject* msg, std::string* name) { ...@@ -260,6 +260,8 @@ butil::Status FlvReader::Read(AMFObject* msg, std::string* name) {
return butil::Status(EINVAL, "Fail to parse RtmpScriptMessage"); return butil::Status(EINVAL, "Fail to parse RtmpScriptMessage");
} }
uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1); uint32_t msg_size = policy::ReadBigEndian3Bytes(p + 1);
uint32_t timestamp = policy::ReadBigEndian3Bytes(p + 4);
timestamp |= (*(p + 7) << 24);
if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) { if (_buf->length() < 11 + msg_size + 4/*PreviousTagSize*/) {
return butil::Status(EAGAIN, "Fail to read, not enough data"); return butil::Status(EAGAIN, "Fail to read, not enough data");
} }
...@@ -273,10 +275,11 @@ butil::Status FlvReader::Read(AMFObject* msg, std::string* name) { ...@@ -273,10 +275,11 @@ butil::Status FlvReader::Read(AMFObject* msg, std::string* name) {
if (!ReadAMFString(name, &istream)) { if (!ReadAMFString(name, &istream)) {
return butil::Status(EINVAL, "Fail to read AMF string"); return butil::Status(EINVAL, "Fail to read AMF string");
} }
if (!ReadAMFObject(msg, &istream)) { if (!ReadAMFObject(&msg->data, &istream)) {
return butil::Status(EINVAL, "Fail to read AMF object"); return butil::Status(EINVAL, "Fail to read AMF object");
} }
} }
msg->timestamp = timestamp;
return butil::Status::OK(); return butil::Status::OK();
} }
...@@ -1260,20 +1263,20 @@ int RtmpStreamBase::SendControlMessage( ...@@ -1260,20 +1263,20 @@ int RtmpStreamBase::SendControlMessage(
return _rtmpsock->Write(msg); return _rtmpsock->Write(msg);
} }
int RtmpStreamBase::SendMetaData(const AMFObject& metadata, int RtmpStreamBase::SendMetaData(const RtmpMetaData& metadata,
const butil::StringPiece& name) { const butil::StringPiece& name) {
butil::IOBuf req_buf; butil::IOBuf req_buf;
{ {
butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf); butil::IOBufAsZeroCopyOutputStream zc_stream(&req_buf);
AMFOutputStream ostream(&zc_stream); AMFOutputStream ostream(&zc_stream);
WriteAMFString(name, &ostream); WriteAMFString(name, &ostream);
WriteAMFObject(metadata, &ostream); WriteAMFObject(metadata.data, &ostream);
if (!ostream.good()) { if (!ostream.good()) {
LOG(ERROR) << "Fail to serialize metadata"; LOG(ERROR) << "Fail to serialize metadata";
return -1; return -1;
} }
} }
return SendMessage(0, policy::RTMP_MESSAGE_DATA_AMF0, req_buf); return SendMessage(metadata.timestamp, policy::RTMP_MESSAGE_DATA_AMF0, req_buf);
} }
int RtmpStreamBase::SendSharedObjectMessage(const RtmpSharedObjectMessage&) { int RtmpStreamBase::SendSharedObjectMessage(const RtmpSharedObjectMessage&) {
...@@ -1445,9 +1448,9 @@ void RtmpStreamBase::OnUserData(void*) { ...@@ -1445,9 +1448,9 @@ void RtmpStreamBase::OnUserData(void*) {
<< "] ignored UserData{}"; << "] ignored UserData{}";
} }
void RtmpStreamBase::OnMetaData(AMFObject* metadata, const butil::StringPiece& name) { void RtmpStreamBase::OnMetaData(RtmpMetaData* metadata, const butil::StringPiece& name) {
LOG(INFO) << remote_side() << '[' << stream_id() LOG(INFO) << remote_side() << '[' << stream_id()
<< "] ignored MetaData{" << *metadata << '}' << "] ignored MetaData{" << metadata->data << '}'
<< " name{" << name << '}'; << " name{" << name << '}';
} }
...@@ -1504,7 +1507,7 @@ void RtmpStreamBase::CallOnUserData(void* data) { ...@@ -1504,7 +1507,7 @@ void RtmpStreamBase::CallOnUserData(void* data) {
} }
} }
void RtmpStreamBase::CallOnMetaData(AMFObject* obj, const butil::StringPiece& name) { void RtmpStreamBase::CallOnMetaData(RtmpMetaData* obj, const butil::StringPiece& name) {
if (BeginProcessingMessage("OnMetaData()")) { if (BeginProcessingMessage("OnMetaData()")) {
OnMetaData(obj, name); OnMetaData(obj, name);
EndProcessingMessage(); EndProcessingMessage();
...@@ -2245,7 +2248,7 @@ void RetryingClientMessageHandler::OnUserData(void* msg) { ...@@ -2245,7 +2248,7 @@ void RetryingClientMessageHandler::OnUserData(void* msg) {
_parent->CallOnUserData(msg); _parent->CallOnUserData(msg);
} }
void RetryingClientMessageHandler::OnMetaData(brpc::AMFObject* metadata, const butil::StringPiece& name) { void RetryingClientMessageHandler::OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) {
_parent->CallOnMetaData(metadata, name); _parent->CallOnMetaData(metadata, name);
} }
...@@ -2407,7 +2410,7 @@ int RtmpRetryingClientStream::AcquireStreamToSend( ...@@ -2407,7 +2410,7 @@ int RtmpRetryingClientStream::AcquireStreamToSend(
return 0; return 0;
} }
int RtmpRetryingClientStream::SendMetaData(const AMFObject& obj, const butil::StringPiece& name) { int RtmpRetryingClientStream::SendMetaData(const RtmpMetaData& obj, const butil::StringPiece& name) {
butil::intrusive_ptr<RtmpStreamBase> ptr; butil::intrusive_ptr<RtmpStreamBase> ptr;
if (AcquireStreamToSend(&ptr) != 0) { if (AcquireStreamToSend(&ptr) != 0) {
return -1; return -1;
......
...@@ -365,6 +365,11 @@ enum RtmpObjectEncoding { ...@@ -365,6 +365,11 @@ enum RtmpObjectEncoding {
}; };
const char* RtmpObjectEncoding2Str(RtmpObjectEncoding); const char* RtmpObjectEncoding2Str(RtmpObjectEncoding);
struct RtmpMetaData {
uint32_t timestamp;
AMFObject data;
};
struct RtmpSharedObjectMessage { struct RtmpSharedObjectMessage {
// Not implemented yet. // Not implemented yet.
}; };
...@@ -383,7 +388,7 @@ public: ...@@ -383,7 +388,7 @@ public:
// Append a video/audio/metadata message into the output buffer. // Append a video/audio/metadata message into the output buffer.
butil::Status Write(const RtmpVideoMessage&); butil::Status Write(const RtmpVideoMessage&);
butil::Status Write(const RtmpAudioMessage&); butil::Status Write(const RtmpAudioMessage&);
butil::Status Write(const AMFObject&); butil::Status Write(const RtmpMetaData&);
private: private:
bool _write_header; bool _write_header;
butil::IOBuf* _buf; butil::IOBuf* _buf;
...@@ -409,7 +414,7 @@ public: ...@@ -409,7 +414,7 @@ public:
// PeekMessageType, caller should call Read(RtmpAudioMessage*) subsequently. // PeekMessageType, caller should call Read(RtmpAudioMessage*) subsequently.
butil::Status Read(RtmpVideoMessage* msg); butil::Status Read(RtmpVideoMessage* msg);
butil::Status Read(RtmpAudioMessage* msg); butil::Status Read(RtmpAudioMessage* msg);
butil::Status Read(AMFObject* object, std::string* object_name); butil::Status Read(RtmpMetaData* object, std::string* object_name);
private: private:
butil::Status ReadHeader(); butil::Status ReadHeader();
...@@ -505,7 +510,7 @@ public: ...@@ -505,7 +510,7 @@ public:
// simultaneously. // simultaneously.
// NOTE: Inputs can be modified and consumed. // NOTE: Inputs can be modified and consumed.
virtual void OnUserData(void* msg); virtual void OnUserData(void* msg);
virtual void OnMetaData(AMFObject*, const butil::StringPiece&); virtual void OnMetaData(RtmpMetaData*, const butil::StringPiece&);
virtual void OnSharedObjectMessage(RtmpSharedObjectMessage* msg); virtual void OnSharedObjectMessage(RtmpSharedObjectMessage* msg);
virtual void OnAudioMessage(RtmpAudioMessage* msg); virtual void OnAudioMessage(RtmpAudioMessage* msg);
virtual void OnVideoMessage(RtmpVideoMessage* msg); virtual void OnVideoMessage(RtmpVideoMessage* msg);
...@@ -521,7 +526,7 @@ public: ...@@ -521,7 +526,7 @@ public:
// Send media messages to the peer. // Send media messages to the peer.
// Returns 0 on success, -1 otherwise. // Returns 0 on success, -1 otherwise.
virtual int SendMetaData(const AMFObject&, virtual int SendMetaData(const RtmpMetaData&,
const butil::StringPiece& name = "onMetaData"); const butil::StringPiece& name = "onMetaData");
virtual int SendSharedObjectMessage(const RtmpSharedObjectMessage& msg); virtual int SendSharedObjectMessage(const RtmpSharedObjectMessage& msg);
virtual int SendAudioMessage(const RtmpAudioMessage& msg); virtual int SendAudioMessage(const RtmpAudioMessage& msg);
...@@ -598,7 +603,7 @@ friend class policy::OnServerStreamCreated; ...@@ -598,7 +603,7 @@ friend class policy::OnServerStreamCreated;
bool BeginProcessingMessage(const char* fun_name); bool BeginProcessingMessage(const char* fun_name);
void EndProcessingMessage(); void EndProcessingMessage();
void CallOnUserData(void* data); void CallOnUserData(void* data);
void CallOnMetaData(AMFObject*, const butil::StringPiece&); void CallOnMetaData(RtmpMetaData*, const butil::StringPiece&);
void CallOnSharedObjectMessage(RtmpSharedObjectMessage* msg); void CallOnSharedObjectMessage(RtmpSharedObjectMessage* msg);
void CallOnAudioMessage(RtmpAudioMessage* msg); void CallOnAudioMessage(RtmpAudioMessage* msg);
void CallOnVideoMessage(RtmpVideoMessage* msg); void CallOnVideoMessage(RtmpVideoMessage* msg);
...@@ -869,7 +874,7 @@ class RtmpMessageHandler { ...@@ -869,7 +874,7 @@ class RtmpMessageHandler {
public: public:
virtual void OnPlayable() = 0; virtual void OnPlayable() = 0;
virtual void OnUserData(void*) = 0; virtual void OnUserData(void*) = 0;
virtual void OnMetaData(brpc::AMFObject* metadata, const butil::StringPiece& name) = 0; virtual void OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name) = 0;
virtual void OnAudioMessage(brpc::RtmpAudioMessage* msg) = 0; virtual void OnAudioMessage(brpc::RtmpAudioMessage* msg) = 0;
virtual void OnVideoMessage(brpc::RtmpVideoMessage* msg) = 0; virtual void OnVideoMessage(brpc::RtmpVideoMessage* msg) = 0;
virtual void OnSharedObjectMessage(RtmpSharedObjectMessage* msg) = 0; virtual void OnSharedObjectMessage(RtmpSharedObjectMessage* msg) = 0;
...@@ -886,7 +891,7 @@ public: ...@@ -886,7 +891,7 @@ public:
void OnPlayable(); void OnPlayable();
void OnUserData(void*); void OnUserData(void*);
void OnMetaData(brpc::AMFObject* metadata, const butil::StringPiece& name); void OnMetaData(brpc::RtmpMetaData* metadata, const butil::StringPiece& name);
void OnAudioMessage(brpc::RtmpAudioMessage* msg); void OnAudioMessage(brpc::RtmpAudioMessage* msg);
void OnVideoMessage(brpc::RtmpVideoMessage* msg); void OnVideoMessage(brpc::RtmpVideoMessage* msg);
void OnSharedObjectMessage(RtmpSharedObjectMessage* msg); void OnSharedObjectMessage(RtmpSharedObjectMessage* msg);
...@@ -929,7 +934,7 @@ public: ...@@ -929,7 +934,7 @@ public:
// If the stream is recreated, following methods may return -1 and set // If the stream is recreated, following methods may return -1 and set
// errno to ERTMPPUBLISHABLE for once. (so that users can be notified to // errno to ERTMPPUBLISHABLE for once. (so that users can be notified to
// resend metadata or header messages). // resend metadata or header messages).
int SendMetaData(const AMFObject&, int SendMetaData(const RtmpMetaData&,
const butil::StringPiece& name = "onMetaData"); const butil::StringPiece& name = "onMetaData");
int SendSharedObjectMessage(const RtmpSharedObjectMessage& msg); int SendSharedObjectMessage(const RtmpSharedObjectMessage& msg);
int SendAudioMessage(const RtmpAudioMessage& msg); int SendAudioMessage(const RtmpAudioMessage& msg);
......
...@@ -338,7 +338,7 @@ public: ...@@ -338,7 +338,7 @@ public:
explicit RtmpSubStream(brpc::RtmpMessageHandler* mh) explicit RtmpSubStream(brpc::RtmpMessageHandler* mh)
: _message_handler(mh) {} : _message_handler(mh) {}
// @RtmpStreamBase // @RtmpStreamBase
void OnMetaData(brpc::AMFObject*, const butil::StringPiece&); void OnMetaData(brpc::RtmpMetaData*, const butil::StringPiece&);
void OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg); void OnSharedObjectMessage(brpc::RtmpSharedObjectMessage* msg);
void OnAudioMessage(brpc::RtmpAudioMessage* msg); void OnAudioMessage(brpc::RtmpAudioMessage* msg);
void OnVideoMessage(brpc::RtmpVideoMessage* msg); void OnVideoMessage(brpc::RtmpVideoMessage* msg);
...@@ -352,7 +352,7 @@ void RtmpSubStream::OnFirstMessage() { ...@@ -352,7 +352,7 @@ void RtmpSubStream::OnFirstMessage() {
_message_handler->OnPlayable(); _message_handler->OnPlayable();
} }
void RtmpSubStream::OnMetaData(brpc::AMFObject* obj, const butil::StringPiece& name) { void RtmpSubStream::OnMetaData(brpc::RtmpMetaData* obj, const butil::StringPiece& name) {
_message_handler->OnMetaData(obj, name); _message_handler->OnMetaData(obj, name);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment