// Copyright (c) 2015 Baidu, Inc.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Authors: Ge,Jun (gejun@baidu.com)

#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION
#include <algorithm>
#include <google/protobuf/stubs/once.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/wire_format_lite_inl.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/reflection_ops.h>
#include <google/protobuf/wire_format.h>
#include "butil/string_printf.h"
#include "butil/macros.h"
#include "butil/sys_byteorder.h"
#include "brpc/controller.h"
#include "brpc/memcache.h"
#include "brpc/policy/memcache_binary_header.h"


namespace brpc {

// Internal implementation detail -- do not call these.
// Forward declarations of the descriptor bootstrap routines defined below:
//   AddDesc..._impl : registers the file descriptor and creates the default
//                     instances of both messages.
//   AddDesc...      : runs the _impl routine through GoogleOnceInit.
//   AssignDesc...   : looks up the message descriptors in the generated pool.
//   ShutdownFile... : deletes the default instances.
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_impl();
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
void protobuf_AssignDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
void protobuf_ShutdownFile_baidu_2frpc_2fmemcache_5fbase_2eproto();

namespace {

// File-local descriptor pointers. They stay NULL until
// protobuf_AssignDesc_baidu_2frpc_2fmemcache_5fbase_2eproto() fills them from
// the generated descriptor pool.
const ::google::protobuf::Descriptor* MemcacheRequest_descriptor_ = NULL;
const ::google::protobuf::Descriptor* MemcacheResponse_descriptor_ = NULL;

}  // namespace

// Resolves the descriptors of MemcacheRequest (message #0) and
// MemcacheResponse (message #1) from the generated descriptor pool and caches
// them in the file-local pointers.
void protobuf_AssignDesc_baidu_2frpc_2fmemcache_5fbase_2eproto() {
    // The file must be registered before it can be looked up by name.
    protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();

    const ::google::protobuf::DescriptorPool* pool =
        ::google::protobuf::DescriptorPool::generated_pool();
    const ::google::protobuf::FileDescriptor* file =
        pool->FindFileByName("baidu/rpc/memcache_base.proto");
    GOOGLE_CHECK(file != NULL);

    MemcacheRequest_descriptor_ = file->message_type(0);
    MemcacheResponse_descriptor_ = file->message_type(1);
}

namespace {

// Guards the one-time execution of the descriptor assignment routine.
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AssignDescriptors_once_);
// Runs protobuf_AssignDesc_baidu_2frpc_2fmemcache_5fbase_2eproto() through
// GoogleOnceInit so that repeated calls do not redo the lookup.
inline void protobuf_AssignDescriptorsOnce() {
    ::google::protobuf::GoogleOnceInit(&protobuf_AssignDescriptors_once_,
                                       &protobuf_AssignDesc_baidu_2frpc_2fmemcache_5fbase_2eproto);
}

// Callback handed to MessageFactory::InternalRegisterGeneratedFile: binds each
// message descriptor to the default instance of its class. The string
// argument is unused.
void protobuf_RegisterTypes(const ::std::string&) {
    // Descriptors must be assigned before they can be registered.
    protobuf_AssignDescriptorsOnce();
    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
        MemcacheRequest_descriptor_, &MemcacheRequest::default_instance());
    ::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
        MemcacheResponse_descriptor_, &MemcacheResponse::default_instance());
}

}  // namespace

void protobuf_ShutdownFile_baidu_2frpc_2fmemcache_5fbase_2eproto() {
    delete MemcacheRequest::default_instance_;
    delete MemcacheResponse::default_instance_;
}

// One-time registration of baidu/rpc/memcache_base.proto: adds the encoded
// file descriptor to the generated pool, registers the type-registration
// callback, creates the default instances of both messages and installs the
// shutdown hook that deletes them.
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_impl() {
    GOOGLE_PROTOBUF_VERIFY_VERSION;

    // Bring up the protobuf runtime pieces this file depends on; the entry
    // point differs between protobuf versions.
#if GOOGLE_PROTOBUF_VERSION >= 3002000
    ::google::protobuf::internal::InitProtobufDefaults();
#else
    ::google::protobuf::protobuf_AddDesc_google_2fprotobuf_2fdescriptor_2eproto();
#endif
    // Encoded descriptor (120 bytes) naming the file and its two messages,
    // MemcacheRequest and MemcacheResponse.
    ::google::protobuf::DescriptorPool::InternalAddGeneratedFile(
        "\n\035baidu/rpc/memcache_base.proto\022\tbaidu.r"
        "pc\032 google/protobuf/descriptor.proto\"\021\n\017"
        "MemcacheRequest\"\022\n\020MemcacheResponseB\003\200\001\001", 120);
    ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
        "baidu/rpc/memcache_base.proto", &protobuf_RegisterTypes);
    // Create both default instances first, then run their init hooks.
    MemcacheRequest::default_instance_ = new MemcacheRequest();
    MemcacheResponse::default_instance_ = new MemcacheResponse();
    MemcacheRequest::default_instance_->InitAsDefaultInstance();
    MemcacheResponse::default_instance_->InitAsDefaultInstance();
    // The default instances are released by the shutdown routine.
    ::google::protobuf::internal::OnShutdown(&protobuf_ShutdownFile_baidu_2frpc_2fmemcache_5fbase_2eproto);
}

// Guards the one-time execution of the registration routine above.
GOOGLE_PROTOBUF_DECLARE_ONCE(protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_once);
// Public entry point for registration: forwards to the _impl routine through
// GoogleOnceInit, so calling it repeatedly is harmless.
void protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto() {
    ::google::protobuf::GoogleOnceInit(
        &protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_once,
        &protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto_impl);
}

// Force AddDescriptors() to be called at static initialization time.
// The constructor of the single global object below triggers the
// registration routine when this translation unit is initialized.
struct StaticDescriptorInitializer_baidu_2frpc_2fmemcache_5fbase_2eproto {
    StaticDescriptorInitializer_baidu_2frpc_2fmemcache_5fbase_2eproto() {
        protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
    }
} static_descriptor_initializer_baidu_2frpc_2fmemcache_5fbase_2eproto_;


// ===================================================================

#ifndef _MSC_VER
#endif  // !_MSC_VER

// Creates an empty request: no buffered commands, counters reset by
// SharedCtor().
MemcacheRequest::MemcacheRequest() : ::google::protobuf::Message() {
    SharedCtor();
}

// Called on the default instance right after it is created during
// registration. This message needs no extra setup, so the body is empty.
void MemcacheRequest::InitAsDefaultInstance() {}

// Copy constructor: starts from an empty request and merges `from' into it,
// which copies the buffered commands and the pipelined count.
MemcacheRequest::MemcacheRequest(const MemcacheRequest& from)
    : ::google::protobuf::Message() {
    SharedCtor();
    MergeFrom(from);
}

// Initialization shared by all constructors: zeroes the cached size and the
// number of pipelined commands.
void MemcacheRequest::SharedCtor() {
    _cached_size_ = 0;
    _pipelined_count = 0;
}

// Destructor: delegates to SharedDtor(); members clean up themselves.
MemcacheRequest::~MemcacheRequest() {
    SharedDtor();
}

// Teardown shared by destructors. Nothing needs releasing for either the
// default instance or a regular one.
void MemcacheRequest::SharedDtor() {
    if (this == default_instance_) {
        return;
    }
}

// Stores `size' as the cached serialized size. The member is written from a
// const method, hence the concurrent-write markers around the assignment.
void MemcacheRequest::SetCachedSize(int size) const {
    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
    _cached_size_ = size;
    GOOGLE_SAFE_CONCURRENT_WRITES_END();
}
// Returns the descriptor of MemcacheRequest, assigning the descriptors on
// first use.
const ::google::protobuf::Descriptor* MemcacheRequest::descriptor() {
    protobuf_AssignDescriptorsOnce();
    const ::google::protobuf::Descriptor* const desc = MemcacheRequest_descriptor_;
    return desc;
}

// Returns the shared default instance, running the file registration first
// if the instance has not been created yet.
const MemcacheRequest& MemcacheRequest::default_instance() {
    if (!default_instance_) {
        protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
    }
    return *default_instance_;
}

// Created by the registration routine, deleted by the shutdown routine.
MemcacheRequest* MemcacheRequest::default_instance_ = NULL;

// Allocates a fresh, empty MemcacheRequest; the caller owns the result.
MemcacheRequest* MemcacheRequest::New() const {
    return new MemcacheRequest();
}

// Drops all buffered commands and resets the pipelined command counter.
void MemcacheRequest::Clear() {
    _pipelined_count = 0;
    _buf.clear();
}

// Parses a sequence of raw memcache binary requests from `input' and appends
// them to this request. The data is accepted only if it is made of complete
// commands, each starting with the request magic byte; otherwise false is
// returned and this request is left unchanged.
bool MemcacheRequest::MergePartialFromCodedStream(
    ::google::protobuf::io::CodedInputStream* input) {
    LOG(WARNING) << "You're not supposed to parse a MemcacheRequest";

    // Simple approach: drain the whole stream into a temporary buffer.
    butil::IOBuf pending;
    const void* chunk = NULL;
    int chunk_len = 0;
    while (input->GetDirectBufferPointer(&chunk, &chunk_len)) {
        pending.append(chunk, chunk_len);
        input->Skip(chunk_len);
    }

    // `pending' is consumed by the validation loop, so keep a copy to append
    // once everything has been checked.
    const butil::IOBuf whole = pending;
    int num_commands = 0;
    while (!pending.empty()) {
        char scratch[sizeof(policy::MemcacheRequestHeader)];
        const policy::MemcacheRequestHeader* hdr =
            (const policy::MemcacheRequestHeader*)pending.fetch(scratch, sizeof(scratch));
        // A missing header or a wrong magic byte means malformed input.
        if (hdr == NULL || hdr->magic != (uint8_t)policy::MC_MAGIC_REQUEST) {
            return false;
        }
        const uint32_t body_len = butil::NetToHost32(hdr->total_body_length);
        const size_t command_len = sizeof(*hdr) + body_len;
        if (pending.size() < command_len) {
            // The last command is truncated.
            return false;
        }
        pending.pop_front(command_len);
        ++num_commands;
    }
    _buf.append(whole);
    _pipelined_count += num_commands;
    return true;
}

// Writes the buffered raw commands to `output' verbatim, one underlying
// block at a time.
void MemcacheRequest::SerializeWithCachedSizes(
    ::google::protobuf::io::CodedOutputStream* output) const {
    LOG(WARNING) << "You're not supposed to serialize a MemcacheRequest";

    // Simple approach: walk _buf through a zero-copy stream wrapper.
    butil::IOBufAsZeroCopyInputStream blocks(_buf);
    const void* block = NULL;
    int block_len = 0;
    for (;;) {
        if (!blocks.Next(&block, &block_len)) {
            break;
        }
        output->WriteRaw(block, block_len);
    }
}

// Serializes the buffered raw commands into the flat array at `target' and
// returns the position right after the last written byte.
// The number of bytes written must match ByteSize() (i.e. _buf.size());
// returning `target' untouched would make serialization disagree with the
// reported size whenever the request is non-empty.
// `target' must have room for ByteSize() bytes.
::google::protobuf::uint8* MemcacheRequest::SerializeWithCachedSizesToArray(
    ::google::protobuf::uint8* target) const {
    const size_t len = _buf.size();
    _buf.copy_to(target, len);
    return target + len;
}

// Returns the serialized size, which is the size of the raw command buffer,
// and remembers it as the cached size.
int MemcacheRequest::ByteSize() const {
    const int byte_count = _buf.size();
    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
    _cached_size_ = byte_count;
    GOOGLE_SAFE_CONCURRENT_WRITES_END();
    return byte_count;
}

// Generic merge: uses the typed overload when `from' is a MemcacheRequest and
// falls back to reflection-based merging otherwise. Merging a message into
// itself is a checked error.
void MemcacheRequest::MergeFrom(const ::google::protobuf::Message& from) {
    GOOGLE_CHECK_NE(&from, this);
    const MemcacheRequest* typed =
        ::google::protobuf::internal::dynamic_cast_if_available<const MemcacheRequest*>(&from);
    if (typed != NULL) {
        MergeFrom(*typed);
        return;
    }
    ::google::protobuf::internal::ReflectionOps::Merge(from, this);
}

// Typed merge: appends the commands buffered in `from' after the ones already
// in this request and adds up the pipelined command counts.
void MemcacheRequest::MergeFrom(const MemcacheRequest& from) {
    GOOGLE_CHECK_NE(&from, this);
    _pipelined_count += from._pipelined_count;
    _buf.append(from._buf);
}

// Replaces the content of this request with the content of `from'.
// Copying a message onto itself is a no-op.
void MemcacheRequest::CopyFrom(const ::google::protobuf::Message& from) {
    if (&from != this) {
        Clear();
        MergeFrom(from);
    }
}

// Typed variant of CopyFrom; same contract as the generic one.
void MemcacheRequest::CopyFrom(const MemcacheRequest& from) {
    if (&from != this) {
        Clear();
        MergeFrom(from);
    }
}

// A request counts as initialized once it holds at least one command.
bool MemcacheRequest::IsInitialized() const {
    if (_pipelined_count == 0) {
        return false;
    }
    return true;
}

// Exchanges the command buffer, the pipelined count and the cached size with
// `other'. Swapping with itself does nothing.
void MemcacheRequest::Swap(MemcacheRequest* other) {
    if (other == this) {
        return;
    }
    _buf.swap(other->_buf);
    std::swap(_pipelined_count, other->_pipelined_count);
    std::swap(_cached_size_, other->_cached_size_);
}

// Returns the metadata of this message: the descriptor is filled in, the
// reflection pointer is left NULL.
::google::protobuf::Metadata MemcacheRequest::GetMetadata() const {
    protobuf_AssignDescriptorsOnce();
    ::google::protobuf::Metadata result;
    result.reflection = NULL;
    result.descriptor = MemcacheRequest_descriptor_;
    return result;
}

// ===================================================================

#ifndef _MSC_VER
#endif  // !_MSC_VER

// Creates an empty response; SharedCtor() resets the cached size.
MemcacheResponse::MemcacheResponse() : ::google::protobuf::Message() {
    SharedCtor();
}

// Called on the default instance right after it is created during
// registration. This message needs no extra setup, so the body is empty.
void MemcacheResponse::InitAsDefaultInstance() {}

// Copy constructor: starts from an empty response and merges `from' into it,
// which copies the error text and the buffered raw responses.
MemcacheResponse::MemcacheResponse(const MemcacheResponse& from)
    : ::google::protobuf::Message() {
    SharedCtor();
    MergeFrom(from);
}

// Initialization shared by all constructors: only the cached size needs an
// explicit reset.
void MemcacheResponse::SharedCtor() {
    _cached_size_ = 0;
}

// Destructor: delegates to SharedDtor(); members clean up themselves.
MemcacheResponse::~MemcacheResponse() {
    SharedDtor();
}

// Teardown shared by destructors. Nothing needs releasing for either the
// default instance or a regular one.
void MemcacheResponse::SharedDtor() {
    if (this == default_instance_) {
        return;
    }
}

// Stores `size' as the cached serialized size. The member is written from a
// const method, hence the concurrent-write markers around the assignment.
void MemcacheResponse::SetCachedSize(int size) const {
    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
    _cached_size_ = size;
    GOOGLE_SAFE_CONCURRENT_WRITES_END();
}
// Returns the descriptor of MemcacheResponse, assigning the descriptors on
// first use.
const ::google::protobuf::Descriptor* MemcacheResponse::descriptor() {
    protobuf_AssignDescriptorsOnce();
    const ::google::protobuf::Descriptor* const desc = MemcacheResponse_descriptor_;
    return desc;
}

// Returns the shared default instance, running the file registration first
// if the instance has not been created yet.
const MemcacheResponse& MemcacheResponse::default_instance() {
    if (!default_instance_) {
        protobuf_AddDesc_baidu_2frpc_2fmemcache_5fbase_2eproto();
    }
    return *default_instance_;
}

// Created by the registration routine, deleted by the shutdown routine.
MemcacheResponse* MemcacheResponse::default_instance_ = NULL;

// Allocates a fresh, empty MemcacheResponse; the caller owns the result.
MemcacheResponse* MemcacheResponse::New() const {
    return new MemcacheResponse();
}

void MemcacheResponse::Clear() {
}

// Appends everything readable from `input' to the raw response buffer
// without validating it. Always returns true.
bool MemcacheResponse::MergePartialFromCodedStream(
    ::google::protobuf::io::CodedInputStream* input) {
    LOG(WARNING) << "You're not supposed to parse a MemcacheResponse";

    // Simple approach: copy each directly accessible chunk, then skip it.
    const void* chunk = NULL;
    int chunk_len = 0;
    for (;;) {
        if (!input->GetDirectBufferPointer(&chunk, &chunk_len)) {
            break;
        }
        _buf.append(chunk, chunk_len);
        input->Skip(chunk_len);
    }
    return true;
}

// Writes the buffered raw responses to `output' verbatim, one underlying
// block at a time.
void MemcacheResponse::SerializeWithCachedSizes(
    ::google::protobuf::io::CodedOutputStream* output) const {
    LOG(WARNING) << "You're not supposed to serialize a MemcacheResponse";

    // Simple approach: walk _buf through a zero-copy stream wrapper.
    butil::IOBufAsZeroCopyInputStream blocks(_buf);
    const void* block = NULL;
    int block_len = 0;
    for (;;) {
        if (!blocks.Next(&block, &block_len)) {
            break;
        }
        output->WriteRaw(block, block_len);
    }
}

// Serializes the buffered raw responses into the flat array at `target' and
// returns the position right after the last written byte.
// The number of bytes written must match ByteSize() (i.e. _buf.size());
// returning `target' untouched would make serialization disagree with the
// reported size whenever the response is non-empty.
// `target' must have room for ByteSize() bytes.
::google::protobuf::uint8* MemcacheResponse::SerializeWithCachedSizesToArray(
    ::google::protobuf::uint8* target) const {
    const size_t len = _buf.size();
    _buf.copy_to(target, len);
    return target + len;
}

// Returns the serialized size, which is the size of the raw response buffer,
// and remembers it as the cached size.
int MemcacheResponse::ByteSize() const {
    const int byte_count = _buf.size();
    GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
    _cached_size_ = byte_count;
    GOOGLE_SAFE_CONCURRENT_WRITES_END();
    return byte_count;
}

// Generic merge: uses the typed overload when `from' is a MemcacheResponse
// and falls back to reflection-based merging otherwise. Merging a message
// into itself is a checked error.
void MemcacheResponse::MergeFrom(const ::google::protobuf::Message& from) {
    GOOGLE_CHECK_NE(&from, this);
    const MemcacheResponse* typed =
        ::google::protobuf::internal::dynamic_cast_if_available<const MemcacheResponse*>(&from);
    if (typed != NULL) {
        MergeFrom(*typed);
        return;
    }
    ::google::protobuf::internal::ReflectionOps::Merge(from, this);
}

// Typed merge: appends the raw responses of `from' after the ones already
// buffered here and takes over the error text of `from'.
void MemcacheResponse::MergeFrom(const MemcacheResponse& from) {
    GOOGLE_CHECK_NE(&from, this);
    // Memcached responses can be concatenated directly because of their
    // binary layout.
    _buf.append(from._buf);
    _err = from._err;
}

// Clears this response and then merges `from' into it.
// Copying a message onto itself is a no-op.
void MemcacheResponse::CopyFrom(const ::google::protobuf::Message& from) {
    if (&from != this) {
        Clear();
        MergeFrom(from);
    }
}

// Typed variant of CopyFrom; same contract as the generic one.
void MemcacheResponse::CopyFrom(const MemcacheResponse& from) {
    if (&from != this) {
        Clear();
        MergeFrom(from);
    }
}

// A response counts as initialized once it holds some raw data.
bool MemcacheResponse::IsInitialized() const {
    if (_buf.empty()) {
        return false;
    }
    return true;
}

// Exchanges the whole state with `other': the raw response buffer, the last
// error text and the cached size. Swapping with itself does nothing.
// _err is part of the message state (MergeFrom copies it), so it has to
// travel together with the buffer it describes.
void MemcacheResponse::Swap(MemcacheResponse* other) {
    if (other != this) {
        _buf.swap(other->_buf);
        _err.swap(other->_err);
        std::swap(_cached_size_, other->_cached_size_);
    }
}

// Returns the metadata of this message: the descriptor is filled in, the
// reflection pointer is left NULL.
::google::protobuf::Metadata MemcacheResponse::GetMetadata() const {
    protobuf_AssignDescriptorsOnce();
    ::google::protobuf::Metadata result;
    result.reflection = NULL;
    result.descriptor = MemcacheResponse_descriptor_;
    return result;
}

// ===================================================================

// Maps a response status to a static, human-readable description.
// Values not listed below yield "Unknown status".
const char* MemcacheResponse::status_str(Status st) {
    const char* text = "Unknown status";
    switch (st) {
    case STATUS_SUCCESS:         text = "SUCCESS"; break;
    case STATUS_KEY_ENOENT:      text = "The key does not exist"; break;
    case STATUS_KEY_EEXISTS:     text = "The key exists"; break;
    case STATUS_E2BIG:           text = "Arg list is too long"; break;
    case STATUS_EINVAL:          text = "Invalid argument"; break;
    case STATUS_NOT_STORED:      text = "Not stored"; break;
    case STATUS_DELTA_BADVAL:    text = "Bad delta"; break;
    case STATUS_AUTH_ERROR:      text = "authentication error"; break;
    case STATUS_AUTH_CONTINUE:   text = "authentication continue"; break;
    case STATUS_UNKNOWN_COMMAND: text = "Unknown command"; break;
    case STATUS_ENOMEM:          text = "Out of memory"; break;
    }
    return text;
}

// Appends a GET or DELETE command (selected by `command') for `key'.
// Request layout:
//   MUST NOT have extras.
//   MUST have key.
//   MUST NOT have value.
// Returns true when the command was appended and counted, false if the key
// does not fit the protocol or appending to the buffer failed.
bool MemcacheRequest::GetOrDelete(uint8_t command, const butil::StringPiece& key) {
    // key_length is a 16-bit field on the wire. A longer key would be
    // silently truncated there while total_body_length still counted all of
    // it, producing a malformed request.
    if (key.size() > 0xFFFFu) {
        LOG(ERROR) << "key is too long, size=" << key.size();
        return false;
    }
    const policy::MemcacheRequestHeader header = {
        policy::MC_MAGIC_REQUEST,
        command,
        butil::HostToNet16(key.size()),
        0,
        policy::MC_BINARY_RAW_BYTES,
        0,
        butil::HostToNet32(key.size()),
        0,
        0
    };
    if (_buf.append(&header, sizeof(header))) {
        return false;
    }
    if (_buf.append(key.data(), key.size())) {
        return false;
    }
    ++_pipelined_count;
    return true;
}

// Appends a GET command for `key'. Returns false if it could not be added.
bool MemcacheRequest::Get(const butil::StringPiece& key) {
    const uint8_t opcode = policy::MC_BINARY_GET;
    return GetOrDelete(opcode, key);
}

// Appends a DELETE command for `key'. Returns false if it could not be added.
bool MemcacheRequest::Delete(const butil::StringPiece& key) {
    const uint8_t opcode = policy::MC_BINARY_DELETE;
    return GetOrDelete(opcode, key);
}

// Wire image of a FLUSH request: the common request header immediately
// followed by the 4-byte expiration extra. Packed so that the struct can be
// appended to the buffer as-is; the assertion pins the expected 28 bytes.
struct FlushHeaderWithExtras {
    policy::MemcacheRequestHeader header;
    uint32_t exptime;
} __attribute__((packed));
BAIDU_CASSERT(sizeof(FlushHeaderWithExtras) == 28, must_match);

// MAY have extras.
// MUST NOT have key.
// MUST NOT have value.
// Extra data for flush:
//    Byte/     0       |       1       |       2       |       3       |
//       /              |               |               |               |
//      |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
//      +---------------+---------------+---------------+---------------+
//     0| Expiration                                                    |
//      +---------------+---------------+---------------+---------------+
//    Total 4 bytes
bool MemcacheRequest::Flush(uint32_t timeout) {
    const uint8_t FLUSH_EXTRAS = (timeout == 0 ? 0 : 4);
    FlushHeaderWithExtras header_with_extras = {{
            policy::MC_MAGIC_REQUEST,
            policy::MC_BINARY_FLUSH,
            0,
            FLUSH_EXTRAS,
            policy::MC_BINARY_RAW_BYTES,
            0,
            butil::HostToNet32(FLUSH_EXTRAS),
            0,
            0 }, butil::HostToNet32(timeout) };
    if (FLUSH_EXTRAS == 0) {
        if (_buf.append(&header_with_extras.header,
                       sizeof(policy::MemcacheRequestHeader))) {
            return false;
        }
    } else {
        if (_buf.append(&header_with_extras, sizeof(header_with_extras))) {
            return false;
        }
    }
    ++_pipelined_count;
    return true;
}

// Pops the leading GET response from the buffer.
// Response layout (if found):
//   MUST have extras.
//   MAY have key.
//   MAY have value.
// Extra data for the get commands:
// Byte/     0       |       1       |       2       |       3       |
//    /              |               |               |               |
//   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
//   +---------------+---------------+---------------+---------------+
//  0| Flags                                                         |
//   +---------------+---------------+---------------+---------------+
//   Total 4 bytes
// On success returns true and fills `value', `flags' and `cas_value'; each
// of them may be NULL when the caller is not interested in it.
// On failure returns false and the reason is stored in _err. When the server
// replied with a non-success status, the response is consumed and its body is
// used as the error text; for malformed data the buffer is left untouched.
bool MemcacheResponse::PopGet(
    butil::IOBuf* value, uint32_t* flags, uint64_t* cas_value) {
    const size_t n = _buf.size();
    policy::MemcacheResponseHeader header;
    if (n < sizeof(header)) {
        butil::string_printf(&_err, "buffer is too small to contain a header");
        return false;
    }
    _buf.copy_to(&header, sizeof(header));
    if (header.command != (uint8_t)policy::MC_BINARY_GET) {
        butil::string_printf(&_err, "not a GET response");
        return false;
    }
    if (n < sizeof(header) + header.total_body_length) {
        butil::string_printf(&_err, "response=%u < header=%u + body=%u",
                  (unsigned)n, (unsigned)sizeof(header), header.total_body_length);
        return false;
    }
    if (header.status != (uint16_t)STATUS_SUCCESS) {
        LOG_IF(ERROR, header.extras_length != 0) << "GET response must not have flags";
        LOG_IF(ERROR, header.key_length != 0) << "GET response must not have key";
        const int value_size = (int)header.total_body_length - (int)header.extras_length
            - (int)header.key_length;
        if (value_size < 0) {
            butil::string_printf(&_err, "value_size=%d is negative", value_size);
            return false;
        }
        // The body of a failed response carries the error text.
        _buf.pop_front(sizeof(header) + header.extras_length +
                      header.key_length);
        _err.clear();
        _buf.cutn(&_err, value_size);
        return false;
    }
    if (header.extras_length != 4u) {
        butil::string_printf(&_err, "GET response must have flags as extras, actual length=%u",
                  header.extras_length);
        return false;
    }
    if (header.key_length != 0) {
        butil::string_printf(&_err, "GET response must not have key");
        return false;
    }
    const int value_size = (int)header.total_body_length - (int)header.extras_length
        - (int)header.key_length;
    if (value_size < 0) {
        butil::string_printf(&_err, "value_size=%d is negative", value_size);
        return false;
    }
    _buf.pop_front(sizeof(header));
    uint32_t raw_flags = 0;
    _buf.cutn(&raw_flags, sizeof(raw_flags));
    if (flags) {
        *flags = butil::NetToHost32(raw_flags);
    }
    if (value) {
        value->clear();
        _buf.cutn(value, value_size);
    } else {
        // The value must be consumed even when the caller does not want it,
        // otherwise the next Pop would read value bytes as a header.
        _buf.pop_front(value_size);
    }
    if (cas_value) {
        *cas_value = header.cas_value;
    }
    _err.clear();
    return true;
}

// Same as the IOBuf overload, but delivers the value as a std::string.
// `value' is written only on success and, like `flags' and `cas_value', may
// be NULL when the caller is not interested in it.
bool MemcacheResponse::PopGet(
    std::string* value, uint32_t* flags, uint64_t* cas_value) {
    butil::IOBuf tmp;
    if (!PopGet(&tmp, flags, cas_value)) {
        return false;
    }
    // Accept NULL just like the IOBuf overload does.
    if (value) {
        tmp.copy_to(value);
    }
    return true;
}

// DELETE and FLUSH responses share the layout handled by PopStore:
//   MUST NOT have extras
//   MUST NOT have key
//   MUST NOT have value
// No CAS value is requested for either of them.

// Pops the leading DELETE response.
bool MemcacheResponse::PopDelete() {
    uint64_t* const no_cas = NULL;
    return PopStore(policy::MC_BINARY_DELETE, no_cas);
}

// Pops the leading FLUSH response.
bool MemcacheResponse::PopFlush() {
    uint64_t* const no_cas = NULL;
    return PopStore(policy::MC_BINARY_FLUSH, no_cas);
}

// Wire image of a set/add/replace/append/prepend request: the common request
// header immediately followed by the flags and expiration extras. Packed so
// that the struct can be appended to the buffer as-is; the assertion pins the
// expected 32 bytes.
struct StoreHeaderWithExtras {
    policy::MemcacheRequestHeader header;
    uint32_t flags;
    uint32_t exptime;
} __attribute__((packed));
BAIDU_CASSERT(sizeof(StoreHeaderWithExtras) == 32, must_match);
// Number of extras bytes that follow the header in a store request.
const size_t STORE_EXTRAS = sizeof(StoreHeaderWithExtras) -
                                                    sizeof(policy::MemcacheRequestHeader);
// Appends a store-like command (selected by `command') that writes `value'
// under `key' with the given flags, expiration and CAS value.
// Request layout:
//   MUST have extras.
//   MUST have key.
//   MAY have value.
// Extra data for set/add/replace:
// Byte/     0       |       1       |       2       |       3       |
//    /              |               |               |               |
//   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
//   +---------------+---------------+---------------+---------------+
//  0| Flags                                                         |
//   +---------------+---------------+---------------+---------------+
//  4| Expiration                                                    |
//   +---------------+---------------+---------------+---------------+
//   Total 8 bytes
// Returns true when the command was appended and counted, false if the key
// does not fit the protocol or appending to the buffer failed.
bool MemcacheRequest::Store(
    uint8_t command, const butil::StringPiece& key, const butil::StringPiece& value,
    uint32_t flags, uint32_t exptime, uint64_t cas_value) {
    // key_length is a 16-bit field on the wire. A longer key would be
    // silently truncated there while total_body_length still counted all of
    // it, producing a malformed request.
    if (key.size() > 0xFFFFu) {
        LOG(ERROR) << "key is too long, size=" << key.size();
        return false;
    }
    StoreHeaderWithExtras header_with_extras = {{
            policy::MC_MAGIC_REQUEST,
            command,
            butil::HostToNet16(key.size()),
            STORE_EXTRAS,
            policy::MC_BINARY_RAW_BYTES,
            0,
            butil::HostToNet32(STORE_EXTRAS + key.size() + value.size()),
            0,
            butil::HostToNet64(cas_value)
        }, butil::HostToNet32(flags), butil::HostToNet32(exptime)};
    if (_buf.append(&header_with_extras, sizeof(header_with_extras))) {
        return false;
    }
    if (_buf.append(key.data(), key.size())) {
        return false;
    }
    if (_buf.append(value.data(), value.size())) {
        return false;
    }
    ++_pipelined_count;
    return true;
}

// Pops the leading response of a store-like command (`command' is the opcode
// the response must carry). Response layout:
//   MUST have CAS
//   MUST NOT have extras
//   MUST NOT have key
//   MUST NOT have value
// On success returns true and, when `cas_value' is not NULL, stores the CAS
// value of the response into it.
// On failure returns false and the reason is stored in _err. When the server
// replied with a non-success status, the response is consumed and its body is
// used as the error text; for malformed data the buffer is left untouched.
bool MemcacheResponse::PopStore(uint8_t command, uint64_t* cas_value) {
    const size_t n = _buf.size();
    policy::MemcacheResponseHeader header;
    if (n < sizeof(header)) {
        butil::string_printf(&_err, "buffer is too small to contain a header");
        return false;
    }
    _buf.copy_to(&header, sizeof(header));
    if (header.command != command) {
        butil::string_printf(&_err, "Not a STORE response");
        return false;
    }
    if (n < sizeof(header) + header.total_body_length) {
        butil::string_printf(&_err, "Not enough data");
        return false;
    }
    LOG_IF(ERROR, header.extras_length != 0) << "STORE response must not have flags";
    LOG_IF(ERROR, header.key_length != 0) << "STORE response must not have key";
    const int value_size = (int)header.total_body_length - (int)header.extras_length
        - (int)header.key_length;
    // A negative size means the length fields contradict each other; it must
    // not reach cutn(), where it would turn into a huge unsigned count.
    if (value_size < 0) {
        butil::string_printf(&_err, "value_size=%d is negative", value_size);
        return false;
    }
    if (header.status != (uint16_t)STATUS_SUCCESS) {
        // The body of a failed response carries the error text.
        _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
        _err.clear();
        _buf.cutn(&_err, value_size);
        return false;
    }
    LOG_IF(ERROR, value_size != 0) << "STORE response must not have value, actually="
                                   << value_size;
    _buf.pop_front(sizeof(header) + header.total_body_length);
    if (cas_value) {
        CHECK(header.cas_value);
        *cas_value = header.cas_value;
    }
    _err.clear();
    return true;
}

// Appends a SET command. See Store() for parameters and return value.
bool MemcacheRequest::Set(
    const butil::StringPiece& key, const butil::StringPiece& value,
    uint32_t flags, uint32_t exptime, uint64_t cas_value) {
    const uint8_t opcode = policy::MC_BINARY_SET;
    return Store(opcode, key, value, flags, exptime, cas_value);
}

// Appends an ADD command. See Store() for parameters and return value.
bool MemcacheRequest::Add(
    const butil::StringPiece& key, const butil::StringPiece& value,
    uint32_t flags, uint32_t exptime, uint64_t cas_value) {
    const uint8_t opcode = policy::MC_BINARY_ADD;
    return Store(opcode, key, value, flags, exptime, cas_value);
}

// Appends a REPLACE command. See Store() for parameters and return value.
bool MemcacheRequest::Replace(
    const butil::StringPiece& key, const butil::StringPiece& value,
    uint32_t flags, uint32_t exptime, uint64_t cas_value) {
    const uint8_t opcode = policy::MC_BINARY_REPLACE;
    return Store(opcode, key, value, flags, exptime, cas_value);
}
    
// Appends an APPEND command. An empty `value' is rejected with an error log
// and a false return; otherwise behaves like Store().
bool MemcacheRequest::Append(
    const butil::StringPiece& key, const butil::StringPiece& value,
    uint32_t flags, uint32_t exptime, uint64_t cas_value) {
    if (!value.empty()) {
        return Store(policy::MC_BINARY_APPEND, key, value, flags, exptime, cas_value);
    }
    LOG(ERROR) << "value to append must be non-empty";
    return false;
}

// Appends a PREPEND command. An empty `value' is rejected with an error log
// and a false return; otherwise behaves like Store().
bool MemcacheRequest::Prepend(
    const butil::StringPiece& key, const butil::StringPiece& value,
    uint32_t flags, uint32_t exptime, uint64_t cas_value) {
    if (!value.empty()) {
        return Store(policy::MC_BINARY_PREPEND, key, value, flags, exptime, cas_value);
    }
    LOG(ERROR) << "value to prepend must be non-empty";
    return false;
}

// The five functions below pop the leading response of the matching store
// command through PopStore(); `cas_value' may be NULL.

// Pops a SET response.
bool MemcacheResponse::PopSet(uint64_t* cas_value) {
    const uint8_t opcode = policy::MC_BINARY_SET;
    return PopStore(opcode, cas_value);
}

// Pops an ADD response.
bool MemcacheResponse::PopAdd(uint64_t* cas_value) {
    const uint8_t opcode = policy::MC_BINARY_ADD;
    return PopStore(opcode, cas_value);
}

// Pops a REPLACE response.
bool MemcacheResponse::PopReplace(uint64_t* cas_value) {
    const uint8_t opcode = policy::MC_BINARY_REPLACE;
    return PopStore(opcode, cas_value);
}

// Pops an APPEND response.
bool MemcacheResponse::PopAppend(uint64_t* cas_value) {
    const uint8_t opcode = policy::MC_BINARY_APPEND;
    return PopStore(opcode, cas_value);
}

// Pops a PREPEND response.
bool MemcacheResponse::PopPrepend(uint64_t* cas_value) {
    const uint8_t opcode = policy::MC_BINARY_PREPEND;
    return PopStore(opcode, cas_value);
}

// Wire image of an increment/decrement request: the common request header
// immediately followed by the delta, initial value and expiration extras.
// Packed so that the struct can be appended to the buffer as-is; the
// assertion pins the expected 44 bytes.
struct IncrHeaderWithExtras {
    policy::MemcacheRequestHeader header;
    uint64_t delta;
    uint64_t initial_value;
    uint32_t exptime;
} __attribute__((packed));
BAIDU_CASSERT(sizeof(IncrHeaderWithExtras) == 44, must_match);

// Number of extras bytes that follow the header in an incr/decr request.
const size_t INCR_EXTRAS = sizeof(IncrHeaderWithExtras) -
    sizeof(policy::MemcacheRequestHeader);

// Appends an increment or decrement command (selected by `command') that
// changes the counter stored under `key' by `delta'; `initial_value' and
// `exptime' are sent along as extras.
// Request layout:
//   MUST have extras.
//   MUST have key.
//   MUST NOT have value.
// Extra data for incr/decr:
// Byte/     0       |       1       |       2       |       3       |
//    /              |               |               |               |
//   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
//   +---------------+---------------+---------------+---------------+
//  0| Delta to add / subtract                                      |
//   |                                                               |
//   +---------------+---------------+---------------+---------------+
//  8| Initial value                                                 |
//   |                                                               |
//   +---------------+---------------+---------------+---------------+
// 16| Expiration                                                    |
//   +---------------+---------------+---------------+---------------+
//   Total 20 bytes
// Returns true when the command was appended and counted, false if the key
// does not fit the protocol or appending to the buffer failed.
bool MemcacheRequest::Counter(
    uint8_t command, const butil::StringPiece& key, uint64_t delta,
    uint64_t initial_value, uint32_t exptime) {
    // key_length is a 16-bit field on the wire. A longer key would be
    // silently truncated there while total_body_length still counted all of
    // it, producing a malformed request.
    if (key.size() > 0xFFFFu) {
        LOG(ERROR) << "key is too long, size=" << key.size();
        return false;
    }
    IncrHeaderWithExtras header_with_extras = {{
            policy::MC_MAGIC_REQUEST,
            command,
            butil::HostToNet16(key.size()),
            INCR_EXTRAS,
            policy::MC_BINARY_RAW_BYTES,
            0,
            butil::HostToNet32(INCR_EXTRAS + key.size()),
            0,
            0 }, butil::HostToNet64(delta), butil::HostToNet64(initial_value), butil::HostToNet32(exptime) };
    if (_buf.append(&header_with_extras, sizeof(header_with_extras))) {
        return false;
    }
    if (_buf.append(key.data(), key.size())) {
        return false;
    }
    ++_pipelined_count;
    return true;
}

// Appends an INCREMENT command. See Counter() for parameters and return
// value.
bool MemcacheRequest::Increment(const butil::StringPiece& key, uint64_t delta,
                                uint64_t initial_value, uint32_t exptime) {
    const uint8_t opcode = policy::MC_BINARY_INCREMENT;
    return Counter(opcode, key, delta, initial_value, exptime);
}

// Appends a DECREMENT command. See Counter() for parameters and return
// value.
bool MemcacheRequest::Decrement(const butil::StringPiece& key, uint64_t delta,
                                uint64_t initial_value, uint32_t exptime) {
    const uint8_t opcode = policy::MC_BINARY_DECREMENT;
    return Counter(opcode, key, delta, initial_value, exptime);
}

// Pops the leading increment/decrement response (`command' is the opcode the
// response must carry). Response layout:
//   MUST NOT have extras.
//   MUST NOT have key.
//   MUST have value.
// Byte/     0       |       1       |       2       |       3       |
//    /              |               |               |               |
//   |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
//   +---------------+---------------+---------------+---------------+
//  0| 64-bit unsigned response.                                     |
//   |                                                               |
//   +---------------+---------------+---------------+---------------+
//   Total 8 bytes
// On success returns true and fills `new_value' and `cas_value'; either may
// be NULL when the caller is not interested in it.
// On failure returns false and the reason is stored in _err; for a
// non-success status the body of the response is used as the error text.
bool MemcacheResponse::PopCounter(
    uint8_t command, uint64_t* new_value, uint64_t* cas_value) {
    const size_t n = _buf.size();
    policy::MemcacheResponseHeader header;
    if (n < sizeof(header)) {
        butil::string_printf(&_err, "buffer is too small to contain a header");
        return false;
    }
    _buf.copy_to(&header, sizeof(header));
    if (header.command != command) {
        butil::string_printf(&_err, "not a INCR/DECR response");
        return false;
    }
    if (n < sizeof(header) + header.total_body_length) {
        butil::string_printf(&_err, "response=%u < header=%u + body=%u",
                  (unsigned)n, (unsigned)sizeof(header), header.total_body_length);
        return false;
    }
    LOG_IF(ERROR, header.extras_length != 0) << "INCR/DECR response must not have flags";
    LOG_IF(ERROR, header.key_length != 0) << "INCR/DECR response must not have key";
    const int value_size = (int)header.total_body_length - (int)header.extras_length
        - (int)header.key_length;
    _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);

    if (header.status != (uint16_t)STATUS_SUCCESS) {
        if (value_size < 0) {
            butil::string_printf(&_err, "value_size=%d is negative", value_size);
        } else {
            // The body of a failed response carries the error text.
            _err.clear();
            _buf.cutn(&_err, value_size);
        }
        return false;
    }
    if (value_size != 8) {
        butil::string_printf(&_err, "value_size=%d is not 8", value_size);
        return false;
    }
    // Always consume the 8-byte value; hand it out only if it is wanted.
    uint64_t raw_value = 0;
    _buf.cutn(&raw_value, sizeof(raw_value));
    if (new_value) {
        *new_value = butil::NetToHost64(raw_value);
    }
    if (cas_value) {
        *cas_value = header.cas_value;
    }
    _err.clear();
    return true;
}

// Pops the leading INCREMENT response. See PopCounter() for the contract.
bool MemcacheResponse::PopIncrement(uint64_t* new_value, uint64_t* cas_value) {
    const uint8_t opcode = policy::MC_BINARY_INCREMENT;
    return PopCounter(opcode, new_value, cas_value);
}

// Pops the leading DECREMENT response. See PopCounter() for the contract.
bool MemcacheResponse::PopDecrement(uint64_t* new_value, uint64_t* cas_value) {
    const uint8_t opcode = policy::MC_BINARY_DECREMENT;
    return PopCounter(opcode, new_value, cas_value);
}

// Touch request layout:
// MUST have extras.
// MUST have key.
// MUST NOT have value.
// Wire image of the fixed part of a TOUCH request: the common request header
// immediately followed by the 4-byte expiration extra. Packed so that the
// struct can be appended to the buffer as-is; the assertion pins the expected
// 28 bytes.
struct TouchHeaderWithExtras {
    policy::MemcacheRequestHeader header;
    uint32_t exptime;
} __attribute__((packed));
BAIDU_CASSERT(sizeof(TouchHeaderWithExtras) == 28, must_match);
// Number of extras bytes that follow the header in a touch request.
const size_t TOUCH_EXTRAS = sizeof(TouchHeaderWithExtras) - sizeof(policy::MemcacheRequestHeader);

// Appends a TOUCH command that sets the expiration of `key' to `exptime'.
// Request layout:
//   MUST have extras.
//   MUST have key.
//   MUST NOT have value.
// Extra data for touch:
//    Byte/     0       |       1       |       2       |       3       |
//       /              |               |               |               |
//      |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
//      +---------------+---------------+---------------+---------------+
//     0| Expiration                                                    |
//      +---------------+---------------+---------------+---------------+
//    Total 4 bytes
// Returns true when the command was appended and counted, false if the key
// does not fit the protocol or appending to the buffer failed.
bool MemcacheRequest::Touch(const butil::StringPiece& key, uint32_t exptime) {
    // key_length is a 16-bit field on the wire. A longer key would be
    // silently truncated there while total_body_length still counted all of
    // it, producing a malformed request.
    if (key.size() > 0xFFFFu) {
        LOG(ERROR) << "key is too long, size=" << key.size();
        return false;
    }
    TouchHeaderWithExtras header_with_extras = {{
            policy::MC_MAGIC_REQUEST,
            policy::MC_BINARY_TOUCH,
            butil::HostToNet16(key.size()),
            TOUCH_EXTRAS,
            policy::MC_BINARY_RAW_BYTES,
            0,
            butil::HostToNet32(TOUCH_EXTRAS + key.size()),
            0,
            0 }, butil::HostToNet32(exptime) };
    if (_buf.append(&header_with_extras, sizeof(header_with_extras))) {
        return false;
    }
    if (_buf.append(key.data(), key.size())) {
        return false;
    }
    ++_pipelined_count;
    return true;
}

// MUST NOT have extras.
// MUST NOT have key.
// MUST NOT have value.
bool MemcacheRequest::Version() {
    const policy::MemcacheRequestHeader header = {
        policy::MC_MAGIC_REQUEST,
        policy::MC_BINARY_VERSION,
        0,
        0,
        policy::MC_BINARY_RAW_BYTES,
        0,
        0,
        0,
        0
    };
    if (_buf.append(&header, sizeof(header))) {
        return false;
    }
    ++_pipelined_count;
    return true;
}

// Pops the leading VERSION response. Response layout:
//   MUST NOT have extras.
//   MUST NOT have key.
//   MUST have value.
// On success returns true and stores the value of the response into
// `version', which may be NULL when the caller is not interested in it.
// On failure returns false and the reason is stored in _err; for a
// non-success status the body of the response is used as the error text.
bool MemcacheResponse::PopVersion(std::string* version) {
    const size_t n = _buf.size();
    policy::MemcacheResponseHeader header;
    if (n < sizeof(header)) {
        butil::string_printf(&_err, "buffer is too small to contain a header");
        return false;
    }
    _buf.copy_to(&header, sizeof(header));
    if (header.command != policy::MC_BINARY_VERSION) {
        butil::string_printf(&_err, "not a VERSION response");
        return false;
    }
    if (n < sizeof(header) + header.total_body_length) {
        butil::string_printf(&_err, "response=%u < header=%u + body=%u",
                  (unsigned)n, (unsigned)sizeof(header), header.total_body_length);
        return false;
    }
    LOG_IF(ERROR, header.extras_length != 0) << "VERSION response must not have flags";
    LOG_IF(ERROR, header.key_length != 0) << "VERSION response must not have key";
    const int value_size = (int)header.total_body_length - (int)header.extras_length
        - (int)header.key_length;
    _buf.pop_front(sizeof(header) + header.extras_length + header.key_length);
    if (value_size < 0) {
        butil::string_printf(&_err, "value_size=%d is negative", value_size);
        return false;
    }
    if (header.status != (uint16_t)STATUS_SUCCESS) {
        // The body of a failed response carries the error text.
        _err.clear();
        _buf.cutn(&_err, value_size);
        return false;
    }
    if (version) {
        version->clear();
        _buf.cutn(version, value_size);
    } else {
        // The value must be consumed even when the caller does not want it,
        // otherwise the next Pop would read value bytes as a header.
        _buf.pop_front(value_size);
    }
    _err.clear();
    return true;
}
 
} // namespace brpc