Commit 1f2b637a authored by caidaojin's avatar caidaojin

couchbase channel

parent 5047332c
...@@ -10,29 +10,11 @@ config_setting( ...@@ -10,29 +10,11 @@ config_setting(
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
) )
config_setting(
name = "with_thrift",
define_values = {"with_thrift": "true"},
visibility = ["//visibility:public"],
)
config_setting( config_setting(
name = "unittest", name = "unittest",
define_values = {"unittest": "true"}, define_values = {"unittest": "true"},
) )
config_setting(
name = "darwin",
values = {"cpu": "darwin"},
visibility = ["//visibility:public"],
)
config_setting(
name = "linux",
values = {"cpu": "linux"},
visibility = ["//visibility:public"],
)
COPTS = [ COPTS = [
"-DBTHREAD_USE_FAST_PTHREAD_MUTEX", "-DBTHREAD_USE_FAST_PTHREAD_MUTEX",
"-D__const__=", "-D__const__=",
...@@ -46,40 +28,16 @@ COPTS = [ ...@@ -46,40 +28,16 @@ COPTS = [
] + select({ ] + select({
":with_glog": ["-DBRPC_WITH_GLOG=1"], ":with_glog": ["-DBRPC_WITH_GLOG=1"],
"//conditions:default": ["-DBRPC_WITH_GLOG=0"], "//conditions:default": ["-DBRPC_WITH_GLOG=0"],
}) + select({
":with_thrift": ["-DENABLE_THRIFT_FRAMED_PROTOCOL=1"],
"//conditions:default": [""],
}) })
LINKOPTS = [ LINKOPTS = [
"-lpthread", "-lpthread",
"-ldl", "-lrt",
"-lz",
"-lssl", "-lssl",
"-lcrypto", "-lcrypto",
] + select({ "-ldl",
":darwin": [ "-lz",
"-framework CoreFoundation", ]
"-framework CoreGraphics",
"-framework CoreData",
"-framework CoreText",
"-framework Security",
"-framework Foundation",
"-Wl,-U,_MallocExtension_ReleaseFreeMemory",
"-Wl,-U,_ProfilerStart",
"-Wl,-U,_ProfilerStop",
"-Wl,-U,_RegisterThriftProtocol",
],
"//conditions:default": [
"-lrt",
],
}) + select({
":with_thrift": [
"-lthriftnb",
"-levent",
"-lthrift"],
"//conditions:default": [],
})
genrule( genrule(
name = "config_h", name = "config_h",
...@@ -118,6 +76,10 @@ BUTIL_SRCS = [ ...@@ -118,6 +76,10 @@ BUTIL_SRCS = [
"src/butil/third_party/snappy/snappy-stubs-internal.cc", "src/butil/third_party/snappy/snappy-stubs-internal.cc",
"src/butil/third_party/snappy/snappy.cc", "src/butil/third_party/snappy/snappy.cc",
"src/butil/third_party/murmurhash3/murmurhash3.cpp", "src/butil/third_party/murmurhash3/murmurhash3.cpp",
"src/butil/third_party/libvbucket/cJSON.c",
"src/butil/third_party/libvbucket/crc32.c",
"src/butil/third_party/libvbucket/ketama.c",
"src/butil/third_party/libvbucket/vbucket.c",
"src/butil/arena.cpp", "src/butil/arena.cpp",
"src/butil/at_exit.cc", "src/butil/at_exit.cc",
"src/butil/atomicops_internals_x86_gcc.cc", "src/butil/atomicops_internals_x86_gcc.cc",
...@@ -145,6 +107,7 @@ BUTIL_SRCS = [ ...@@ -145,6 +107,7 @@ BUTIL_SRCS = [
"src/butil/files/scoped_file.cc", "src/butil/files/scoped_file.cc",
"src/butil/files/scoped_temp_dir.cc", "src/butil/files/scoped_temp_dir.cc",
"src/butil/file_util.cc", "src/butil/file_util.cc",
"src/butil/file_util_linux.cc",
"src/butil/file_util_posix.cc", "src/butil/file_util_posix.cc",
"src/butil/guid.cc", "src/butil/guid.cc",
"src/butil/guid_posix.cc", "src/butil/guid_posix.cc",
...@@ -159,7 +122,6 @@ BUTIL_SRCS = [ ...@@ -159,7 +122,6 @@ BUTIL_SRCS = [
"src/butil/memory/weak_ptr.cc", "src/butil/memory/weak_ptr.cc",
"src/butil/posix/file_descriptor_shuffle.cc", "src/butil/posix/file_descriptor_shuffle.cc",
"src/butil/posix/global_descriptors.cc", "src/butil/posix/global_descriptors.cc",
"src/butil/process_util.cc",
"src/butil/rand_util.cc", "src/butil/rand_util.cc",
"src/butil/rand_util_posix.cc", "src/butil/rand_util_posix.cc",
"src/butil/fast_rand.cpp", "src/butil/fast_rand.cpp",
...@@ -175,6 +137,7 @@ BUTIL_SRCS = [ ...@@ -175,6 +137,7 @@ BUTIL_SRCS = [
"src/butil/strings/string_util.cc", "src/butil/strings/string_util.cc",
"src/butil/strings/string_util_constants.cc", "src/butil/strings/string_util_constants.cc",
"src/butil/strings/stringprintf.cc", "src/butil/strings/stringprintf.cc",
"src/butil/strings/sys_string_conversions_posix.cc",
"src/butil/strings/utf_offset_string_conversions.cc", "src/butil/strings/utf_offset_string_conversions.cc",
"src/butil/strings/utf_string_conversion_utils.cc", "src/butil/strings/utf_string_conversion_utils.cc",
"src/butil/strings/utf_string_conversions.cc", "src/butil/strings/utf_string_conversions.cc",
...@@ -182,6 +145,7 @@ BUTIL_SRCS = [ ...@@ -182,6 +145,7 @@ BUTIL_SRCS = [
"src/butil/synchronization/condition_variable_posix.cc", "src/butil/synchronization/condition_variable_posix.cc",
"src/butil/synchronization/waitable_event_posix.cc", "src/butil/synchronization/waitable_event_posix.cc",
"src/butil/threading/non_thread_safe_impl.cc", "src/butil/threading/non_thread_safe_impl.cc",
"src/butil/threading/platform_thread_linux.cc",
"src/butil/threading/platform_thread_posix.cc", "src/butil/threading/platform_thread_posix.cc",
"src/butil/threading/simple_thread.cc", "src/butil/threading/simple_thread.cc",
"src/butil/threading/thread_checker_impl.cc", "src/butil/threading/thread_checker_impl.cc",
...@@ -217,82 +181,8 @@ BUTIL_SRCS = [ ...@@ -217,82 +181,8 @@ BUTIL_SRCS = [
"src/butil/containers/case_ignored_flat_map.cpp", "src/butil/containers/case_ignored_flat_map.cpp",
"src/butil/iobuf.cpp", "src/butil/iobuf.cpp",
"src/butil/popen.cpp", "src/butil/popen.cpp",
] + select({ ]
":darwin": [
"src/butil/time/time_mac.cc",
"src/butil/mac/scoped_mach_port.cc",
],
"//conditions:default": [
"src/butil/file_util_linux.cc",
"src/butil/threading/platform_thread_linux.cc",
"src/butil/strings/sys_string_conversions_posix.cc",
],
})
objc_library(
name = "macos_lib",
hdrs = [":config_h",
"src/butil/atomicops.h",
"src/butil/atomicops_internals_atomicword_compat.h",
"src/butil/atomicops_internals_mac.h",
"src/butil/base_export.h",
"src/butil/basictypes.h",
"src/butil/build_config.h",
"src/butil/compat.h",
"src/butil/compiler_specific.h",
"src/butil/containers/hash_tables.h",
"src/butil/debug/debugger.h",
"src/butil/debug/leak_annotations.h",
"src/butil/file_util.h",
"src/butil/file_descriptor_posix.h",
"src/butil/files/file_path.h",
"src/butil/files/file.h",
"src/butil/files/scoped_file.h",
"src/butil/lazy_instance.h",
"src/butil/logging.h",
"src/butil/mac/bundle_locations.h",
"src/butil/mac/foundation_util.h",
"src/butil/mac/scoped_cftyperef.h",
"src/butil/mac/scoped_typeref.h",
"src/butil/macros.h",
"src/butil/memory/aligned_memory.h",
"src/butil/memory/scoped_policy.h",
"src/butil/memory/scoped_ptr.h",
"src/butil/move.h",
"src/butil/port.h",
"src/butil/posix/eintr_wrapper.h",
"src/butil/scoped_generic.h",
"src/butil/strings/string16.h",
"src/butil/strings/string_piece.h",
"src/butil/strings/string_util.h",
"src/butil/strings/string_util_posix.h",
"src/butil/strings/sys_string_conversions.h",
"src/butil/synchronization/lock.h",
"src/butil/time/time.h",
"src/butil/time.h",
"src/butil/third_party/dynamic_annotations/dynamic_annotations.h",
"src/butil/threading/platform_thread.h",
"src/butil/threading/thread_restrictions.h",
"src/butil/threading/thread_id_name_manager.h",
"src/butil/type_traits.h",
],
non_arc_srcs = [
"src/butil/mac/bundle_locations.mm",
"src/butil/mac/foundation_util.mm",
"src/butil/file_util_mac.mm",
"src/butil/threading/platform_thread_mac.mm",
"src/butil/strings/sys_string_conversions_mac.mm",
],
deps = [
"@com_github_gflags_gflags//:gflags",
] + select({
":with_glog": ["@com_github_google_glog//:glog"],
"//conditions:default": [],
}),
includes = ["src/"],
enable_modules = True,
tags = ["manual"],
)
cc_library( cc_library(
name = "butil", name = "butil",
...@@ -313,7 +203,6 @@ cc_library( ...@@ -313,7 +203,6 @@ cc_library(
"@com_github_gflags_gflags//:gflags", "@com_github_gflags_gflags//:gflags",
] + select({ ] + select({
":with_glog": ["@com_github_google_glog//:glog"], ":with_glog": ["@com_github_google_glog//:glog"],
":darwin": [":macos_lib"],
"//conditions:default": [], "//conditions:default": [],
}), }),
includes = [ includes = [
...@@ -458,17 +347,7 @@ cc_library( ...@@ -458,17 +347,7 @@ cc_library(
srcs = glob([ srcs = glob([
"src/brpc/*.cpp", "src/brpc/*.cpp",
"src/brpc/**/*.cpp", "src/brpc/**/*.cpp",
], ]),
exclude = [
"src/brpc/thrift_service.cpp",
"src/brpc/thrift_message.cpp",
"src/brpc/policy/thrift_protocol.cpp",
]) + select({
":with_thrift" : glob([
"src/brpc/thrift*.cpp",
"src/brpc/**/thrift*.cpp"]),
"//conditions:default" : [],
}),
hdrs = glob([ hdrs = glob([
"src/brpc/*.h", "src/brpc/*.h",
"src/brpc/**/*.h" "src/brpc/**/*.h"
......
...@@ -2,9 +2,7 @@ cmake_minimum_required(VERSION 2.8.10) ...@@ -2,9 +2,7 @@ cmake_minimum_required(VERSION 2.8.10)
project(brpc C CXX) project(brpc C CXX)
# Enable MACOSX_RPATH. Run "cmake --help-policy CMP0042" for policy details. # Enable MACOSX_RPATH. Run "cmake --help-policy CMP0042" for policy details.
if(POLICY CMP0042) cmake_policy(SET CMP0042 NEW)
cmake_policy(SET CMP0042 NEW)
endif()
set(BRPC_VERSION 0.9.0) set(BRPC_VERSION 0.9.0)
...@@ -22,14 +20,14 @@ else() ...@@ -22,14 +20,14 @@ else()
message(WARNING "You are using an unsupported compiler! Compilation has only been tested with Clang and GCC.") message(WARNING "You are using an unsupported compiler! Compilation has only been tested with Clang and GCC.")
endif() endif()
option(WITH_GLOG "With glog" OFF) option(BRPC_WITH_GLOG "With glog" OFF)
option(DEBUG "Print debug logs" OFF) option(DEBUG "Print debug logs" OFF)
option(WITH_DEBUG_SYMBOLS "With debug symbols" ON) option(WITH_DEBUG_SYMBOLS "With debug symbols" ON)
option(WITH_THRIFT "With thrift framed protocol supported" OFF) option(BRPC_WITH_THRIFT "With thrift framed protocol supported" OFF)
option(BUILD_UNIT_TESTS "Whether to build unit tests" OFF) option(BUILD_UNIT_TESTS "Whether to build unit tests" OFF)
set(WITH_GLOG_VAL "0") set(WITH_GLOG_VAL "0")
if(WITH_GLOG) if(BRPC_WITH_GLOG)
set(WITH_GLOG_VAL "1") set(WITH_GLOG_VAL "1")
endif() endif()
...@@ -37,10 +35,10 @@ if(WITH_DEBUG_SYMBOLS) ...@@ -37,10 +35,10 @@ if(WITH_DEBUG_SYMBOLS)
set(DEBUG_SYMBOL "-g") set(DEBUG_SYMBOL "-g")
endif() endif()
if(WITH_THRIFT) if(BRPC_WITH_THRIFT)
set(THRIFT_CPP_FLAG "-DENABLE_THRIFT_FRAMED_PROTOCOL") set(THRIFT_CPP_FLAG "-DENABLE_THRIFT_FRAMED_PROTOCOL")
set(THRIFTNB_LIB "thriftnb") set(THRIFT_LIB "thriftnb")
set(THRIFT_LIB "thrift") message("Enable thrift framed procotol")
endif() endif()
include(GNUInstallDirs) include(GNUInstallDirs)
...@@ -121,7 +119,7 @@ if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) ...@@ -121,7 +119,7 @@ if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
message(FATAL_ERROR "Fail to find leveldb") message(FATAL_ERROR "Fail to find leveldb")
endif() endif()
if(WITH_GLOG) if(BRPC_WITH_GLOG)
find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h) find_path(GLOG_INCLUDE_PATH NAMES glog/logging.h)
find_library(GLOG_LIB NAMES glog) find_library(GLOG_LIB NAMES glog)
if((NOT GLOG_INCLUDE_PATH) OR (NOT GLOG_LIB)) if((NOT GLOG_INCLUDE_PATH) OR (NOT GLOG_LIB))
...@@ -157,7 +155,6 @@ set(DYNAMIC_LIB ...@@ -157,7 +155,6 @@ set(DYNAMIC_LIB
${PROTOC_LIB} ${PROTOC_LIB}
${CMAKE_THREAD_LIBS_INIT} ${CMAKE_THREAD_LIBS_INIT}
${THRIFT_LIB} ${THRIFT_LIB}
${THRIFTNB_LIB}
${OPENSSL_LIBRARIES} ${OPENSSL_LIBRARIES}
${OPENSSL_CRYPTO_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY}
dl dl
...@@ -165,7 +162,7 @@ set(DYNAMIC_LIB ...@@ -165,7 +162,7 @@ set(DYNAMIC_LIB
) )
set(BRPC_PRIVATE_LIBS "-lgflags -lprotobuf -lleveldb -lprotoc -lssl -lcrypto -ldl -lz") set(BRPC_PRIVATE_LIBS "-lgflags -lprotobuf -lleveldb -lprotoc -lssl -lcrypto -ldl -lz")
if(WITH_GLOG) if(BRPC_WITH_GLOG)
set(DYNAMIC_LIB ${DYNAMIC_LIB} ${GLOG_LIB}) set(DYNAMIC_LIB ${DYNAMIC_LIB} ${GLOG_LIB})
set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -lglog") set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -lglog")
endif() endif()
...@@ -184,7 +181,8 @@ elseif(CMAKE_SYSTEM_NAME STREQUAL "Darwin") ...@@ -184,7 +181,8 @@ elseif(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
"-framework Foundation" "-framework Foundation"
"-Wl,-U,_MallocExtension_ReleaseFreeMemory" "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
"-Wl,-U,_ProfilerStart" "-Wl,-U,_ProfilerStart"
"-Wl,-U,_ProfilerStop") "-Wl,-U,_ProfilerStop"
"-Wl,-U,_RegisterThriftProtocol")
endif() endif()
# for *.so # for *.so
...@@ -207,6 +205,10 @@ set(BUTIL_SOURCES ...@@ -207,6 +205,10 @@ set(BUTIL_SOURCES
${PROJECT_SOURCE_DIR}/src/butil/third_party/snappy/snappy-stubs-internal.cc ${PROJECT_SOURCE_DIR}/src/butil/third_party/snappy/snappy-stubs-internal.cc
${PROJECT_SOURCE_DIR}/src/butil/third_party/snappy/snappy.cc ${PROJECT_SOURCE_DIR}/src/butil/third_party/snappy/snappy.cc
${PROJECT_SOURCE_DIR}/src/butil/third_party/murmurhash3/murmurhash3.cpp ${PROJECT_SOURCE_DIR}/src/butil/third_party/murmurhash3/murmurhash3.cpp
${PROJECT_SOURCE_DIR}/src/butil/third_party/libvbucket/cJSON.c
${PROJECT_SOURCE_DIR}/src/butil/third_party/libvbucket/crc32.c
${PROJECT_SOURCE_DIR}/src/butil/third_party/libvbucket/ketama.c
${PROJECT_SOURCE_DIR}/src/butil/third_party/libvbucket/vbucket.c
${PROJECT_SOURCE_DIR}/src/butil/arena.cpp ${PROJECT_SOURCE_DIR}/src/butil/arena.cpp
${PROJECT_SOURCE_DIR}/src/butil/at_exit.cc ${PROJECT_SOURCE_DIR}/src/butil/at_exit.cc
${PROJECT_SOURCE_DIR}/src/butil/atomicops_internals_x86_gcc.cc ${PROJECT_SOURCE_DIR}/src/butil/atomicops_internals_x86_gcc.cc
...@@ -379,7 +381,6 @@ set(SOURCES ...@@ -379,7 +381,6 @@ set(SOURCES
${JSON2PB_SOURCES} ${JSON2PB_SOURCES}
${MCPACK2PB_SOURCES} ${MCPACK2PB_SOURCES}
${BRPC_SOURCES} ${BRPC_SOURCES}
${THRIFT_SOURCES}
) )
add_subdirectory(src) add_subdirectory(src)
......
...@@ -47,6 +47,10 @@ BUTIL_SOURCES = \ ...@@ -47,6 +47,10 @@ BUTIL_SOURCES = \
src/butil/third_party/snappy/snappy-stubs-internal.cc \ src/butil/third_party/snappy/snappy-stubs-internal.cc \
src/butil/third_party/snappy/snappy.cc \ src/butil/third_party/snappy/snappy.cc \
src/butil/third_party/murmurhash3/murmurhash3.cpp \ src/butil/third_party/murmurhash3/murmurhash3.cpp \
src/butil/third_party/libvbucket/cJSON.c \
src/butil/third_party/libvbucket/crc32.c \
src/butil/third_party/libvbucket/ketama.c \
src/butil/third_party/libvbucket/vbucket.c \
src/butil/arena.cpp \ src/butil/arena.cpp \
src/butil/at_exit.cc \ src/butil/at_exit.cc \
src/butil/atomicops_internals_x86_gcc.cc \ src/butil/atomicops_internals_x86_gcc.cc \
......
...@@ -124,6 +124,7 @@ struct ChannelOptions { ...@@ -124,6 +124,7 @@ struct ChannelOptions {
class Channel : public ChannelBase { class Channel : public ChannelBase {
friend class Controller; friend class Controller;
friend class SelectiveChannel; friend class SelectiveChannel;
friend class CouchbaseChannel;
public: public:
Channel(ProfilerLinker = ProfilerLinker()); Channel(ProfilerLinker = ProfilerLinker());
~Channel(); ~Channel();
......
...@@ -105,6 +105,8 @@ friend class ControllerPrivateAccessor; ...@@ -105,6 +105,8 @@ friend class ControllerPrivateAccessor;
friend class ServerPrivateAccessor; friend class ServerPrivateAccessor;
friend class SelectiveChannel; friend class SelectiveChannel;
friend class ThriftStub; friend class ThriftStub;
friend class CouchbaseChannel;
friend class CouchbaseDone;
friend class schan::Sender; friend class schan::Sender;
friend class schan::SubDone; friend class schan::SubDone;
friend class policy::OnServerStreamCreated; friend class policy::OnServerStreamCreated;
...@@ -145,6 +147,15 @@ public: ...@@ -145,6 +147,15 @@ public:
void set_timeout_ms(int64_t timeout_ms); void set_timeout_ms(int64_t timeout_ms);
int64_t timeout_ms() const { return _timeout_ms; } int64_t timeout_ms() const { return _timeout_ms; }
// Set timeout of the request trace deadline (in milliseconds)
void set_request_trace_timeout_ms(int64_t timeout_ms);
// Set the request trace deadline. We suggest you to use
// set_request_trace_timeout_ms for root request.
void set_request_trace_deadline(int64_t request_trace_deadline) {
_request_trace_deadline = request_trace_deadline;
}
// Set/get the delay to send backup request in milliseconds. Use // Set/get the delay to send backup request in milliseconds. Use
// ChannelOptions.backup_request_ms on unset. // ChannelOptions.backup_request_ms on unset.
void set_backup_request_ms(int64_t timeout_ms); void set_backup_request_ms(int64_t timeout_ms);
...@@ -373,6 +384,11 @@ public: ...@@ -373,6 +384,11 @@ public:
// Get the data attached to a mongo session(practically a socket). // Get the data attached to a mongo session(practically a socket).
MongoContext* mongo_session_data() { return _mongo_session_data.get(); } MongoContext* mongo_session_data() { return _mongo_session_data.get(); }
// Get a request trace deadline timestamp.
int64_t request_trace_deadline() const;
// Get remain milliseconds to the request trace deadline.
int64_t get_request_trace_remain_ms() const;
// ------------------------------------------------------------------- // -------------------------------------------------------------------
// Both-side methods. // Both-side methods.
// Following methods can be called from both client and server. But they // Following methods can be called from both client and server. But they
...@@ -454,7 +470,14 @@ public: ...@@ -454,7 +470,14 @@ public:
void set_idl_result(int64_t result) { _idl_result = result; } void set_idl_result(int64_t result) { _idl_result = result; }
int64_t idl_result() const { return _idl_result; } int64_t idl_result() const { return _idl_result; }
const std::string& thrift_method_name() { return _thrift_method_name; } bool has_request_trace_deadline() const {
return _request_trace_deadline != UNSET_MAGIC_NUM;
}
void set_thrift_method_name(const std::string& method_name) {
_thrift_method_name = method_name;
}
std::string thrift_method_name() { return _thrift_method_name; }
private: private:
struct CompletionInfo { struct CompletionInfo {
...@@ -628,6 +651,10 @@ private: ...@@ -628,6 +651,10 @@ private:
int32_t _timeout_ms; int32_t _timeout_ms;
int32_t _connect_timeout_ms; int32_t _connect_timeout_ms;
int32_t _backup_request_ms; int32_t _backup_request_ms;
// Deadline of this rpc trace(since the Epoch in microseconds),
// set by root request of the rpc trace, and each child node of trace
// can judge root rpc request timed out or not according to the value.
int64_t _request_trace_deadline;
// Deadline of this RPC (since the Epoch in microseconds). // Deadline of this RPC (since the Epoch in microseconds).
int64_t _abstime_us; int64_t _abstime_us;
// Timer registered to trigger RPC timeout event // Timer registered to trigger RPC timeout event
...@@ -689,6 +716,7 @@ private: ...@@ -689,6 +716,7 @@ private:
// Thrift method name, only used when thrift protocol enabled // Thrift method name, only used when thrift protocol enabled
std::string _thrift_method_name; std::string _thrift_method_name;
uint32_t _thrift_seq_id;
}; };
// Advises the RPC system that the caller desires that the RPC call be // Advises the RPC system that the caller desires that the RPC call be
......
// Copyright (c) 2018 Qiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Daojin Cai (caidaojin@qiyi.com)
#include "brpc/couchbase.h"
#include "brpc/policy/memcache_binary_header.h"
#include "butil/string_printf.h"
#include "butil/sys_byteorder.h"
namespace brpc {
int CouchbaseRequest::ParseRequest(
std::string* key, policy::MemcacheBinaryCommand* command) const {
const size_t n = _buf.size();
policy::MemcacheRequestHeader header;
if (n < sizeof(header)) {
return -1;
}
_buf.copy_to(&header, sizeof(header));
// TODO: need check header.total_body_length
if (header.key_length == 0) {
return 1;
}
*command = static_cast<policy::MemcacheBinaryCommand>(header.command);
_buf.copy_to(key, header.key_length, sizeof(header) + header.extras_length);
return 0;
}
bool CouchbaseRequest::ReplicasGet(const butil::StringPiece& key) {
const policy::MemcacheRequestHeader header = {
policy::MC_MAGIC_REQUEST,
0x83,
butil::HostToNet16(key.size()),
0,
policy::MC_BINARY_RAW_BYTES,
0,
butil::HostToNet32(key.size()),
0,
0
};
if (_buf.append(&header, sizeof(header))) {
return false;
}
if (_buf.append(key.data(), key.size())) {
return false;
}
++_pipelined_count;
return true;
}
bool CouchbaseResponse::GetStatus(Status* st) {
const size_t n = _buf.size();
policy::MemcacheResponseHeader header;
if (n < sizeof(header)) {
butil::string_printf(&_err, "buffer is too small to contain a header");
return false;
}
_buf.copy_to(&header, sizeof(header));
if (n < sizeof(header) + header.total_body_length) {
butil::string_printf(&_err, "response=%u < header=%u + body=%u",
(unsigned)n, (unsigned)sizeof(header), header.total_body_length);
return false;
}
*st = static_cast<Status>(header.status);
return true;
}
} // namespace brpc
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#define BRPC_COUCHBASE_H #define BRPC_COUCHBASE_H
#include "brpc/memcache.h" #include "brpc/memcache.h"
#include "brpc/policy/memcache_binary_header.h"
namespace brpc { namespace brpc {
...@@ -102,6 +103,11 @@ public: ...@@ -102,6 +103,11 @@ public:
MemcacheRequest::CopyFrom(from); MemcacheRequest::CopyFrom(from);
} }
int ParseRequest(std::string* key,
policy::MemcacheBinaryCommand* command) const;
bool ReplicasGet(const butil::StringPiece& key);
private: private:
void MergeFrom(const CouchbaseRequest& from); void MergeFrom(const CouchbaseRequest& from);
...@@ -122,6 +128,8 @@ public: ...@@ -122,6 +128,8 @@ public:
MemcacheResponse::CopyFrom(from); MemcacheResponse::CopyFrom(from);
} }
bool GetStatus(Status* status);
private: private:
void MergeFrom(const CouchbaseResponse& from); void MergeFrom(const CouchbaseResponse& from);
......
This diff is collapsed.
...@@ -21,21 +21,35 @@ ...@@ -21,21 +21,35 @@
#include <unordered_map> #include <unordered_map>
#include "brpc/channel.h" #include "brpc/channel.h"
#include "brpc/couchbase.h"
#include "butil/containers/doubly_buffered_data.h" #include "butil/containers/doubly_buffered_data.h"
namespace brpc { namespace brpc {
using CouchbaseChannelMap =
std::unordered_map<std::string, std::unique_ptr<Channel>>;
class CouchbaseServerListener; class CouchbaseServerListener;
// A couchbase channel maps different key to sub memcache channel according to
// current vbuckets mapping. It retrieves current vbuckets mapping by maintain
// an connection for streaming updates from ther couchbase server.
//
// CAUTION:
// ========
// For async rpc, Should not delete this channel until rpc done.
class CouchbaseChannel : public ChannelBase/*non-copyable*/ { class CouchbaseChannel : public ChannelBase/*non-copyable*/ {
friend class CouchbaseServerListener;
public: public:
CouchbaseChannel() = default; CouchbaseChannel();
~CouchbaseChannel(); ~CouchbaseChannel();
// You MUST initialize a couchbasechannel before using it. 'Server_addr' // You MUST initialize a couchbasechannel before using it.
// is address of couchbase server. 'options' is used for each channel to // 'Server_addr': address list of couchbase servers. On these addresses, we
// real servers of bucket. The protocol should be PROTOCOL_MEMCACHE. // can get vbucket map.
// If 'options' is null, use default options. // 'options': is used for each memcache channel of vbucket. The protocol
// should be PROTOCOL_MEMCACHE. If 'options' is null,
// use default options.
int Init(const char* server_addr, const ChannelOptions* options); int Init(const char* server_addr, const ChannelOptions* options);
// TODO: Do not support pipeline mode now. // TODO: Do not support pipeline mode now.
...@@ -46,51 +60,71 @@ public: ...@@ -46,51 +60,71 @@ public:
google::protobuf::Message* response, google::protobuf::Message* response,
google::protobuf::Closure* done); google::protobuf::Closure* done);
void Describe(std::ostream& os, const DescribeOptions& options) const; void Describe(std::ostream& os, const DescribeOptions& options);
private: // Couchbase has two type of distribution used to map keys to servers.
// TODO: This struct describes map between vbucket and real memcache server. // One is vbucket distribution and other is ketama distribution.
// '_hash_algorithm': The hash algorithm couchbase used. // This struct describes vbucket distribution of couchbase.
// '_vbucket_servers': server list of vbuckets, like "list://addr1:port1, // 'num_replicas': the number of copies that will be stored on servers of one
// addr2:port2...". // vbucket. Each vbucket must have this number of servers
// '_channel_map': the channel for each vbucket. // indexes plus one.
// '_vbucket': A zero-based indexed by vBucketId. The entries in the _vbucket
// are arrays of integers, where each integer is a zero-based
// index into the '_servers'.
// '_fvbucket': It is fast forward map with same struct as _vbucket. It is
// used to provide the final vBubcket-to-server map during the
// statrt of the rebalance.
// '_servers': all servers of a bucket.
// '_channel_map': the memcache channel for each server.
// TODO: support ketama vbucket distribution
struct VBucketServerMap { struct VBucketServerMap {
std::string _hash_algorithm; uint64_t _version = 0;
std::vector<std::string> _vbucket_servers; int _num_replicas = 0;
std::unordered_map<std::string, std::unique_ptr<Channel>> _channel_map; std::vector<std::vector<int>> _vbucket;
std::vector<std::vector<int>> _fvbucket;
std::vector<std::string> _servers;
CouchbaseChannelMap _channel_map;
}; };
private:
int CheckHealth(); int CheckHealth();
bool GetKeyFromRequest(const google::protobuf::Message* request, Channel* SelectMasterChannel(const VBucketServerMap* vb_map,
butil::StringPiece* key); const size_t vb_index);
Channel* GetMappedChannel(const std::string* server,
const VBucketServerMap* vb_map);
Channel* SelectChannel(const butil::StringPiece& key, const CouchbaseChannelMap& GetChannelMap();
const VBucketServerMap* vbucket_map);
//TODO: Get different hash algorithm if needed. const std::string* GetMaster(const VBucketServerMap* vb_map,
size_t Hash(const std::string& type, const size_t vb_index, int* index = nullptr);
const butil::StringPiece& key,
const size_t size); size_t Hash(const butil::StringPiece& key, const size_t vbuckets_num);
bool UpdateVBucketServerMap( bool UpdateVBucketServerMap(
const std::string* hash_algo, const int num_replicas,
std::vector<std::string>* vbucket_servers, std::vector<std::vector<int>>& vbucket,
const std::vector<std::string>* added_vbuckets, std::vector<std::vector<int>>& fvbucket,
const std::vector<std::string>* removed_vbuckets); std::vector<std::string>& servers,
const std::vector<std::string>& added_servers,
const std::vector<std::string>& removed_serverss);
static bool Update(VBucketServerMap& vbucket_map, static bool Update(VBucketServerMap& vbucket_map,
const ChannelOptions* options, const ChannelOptions* options,
const std::string* hash_algo, const int num_replicas,
std::vector<std::string>* vbucket_servers, std::vector<std::vector<int>>& vbucket,
const std::vector<std::string>* added_vbuckets, std::vector<std::vector<int>>& fvbucket,
const std::vector<std::string>* removed_vbuckets); std::vector<std::string>& servers,
const std::vector<std::string>& added_servers,
const std::vector<std::string>& removed_servers);
std::string GetAuthentication() const;
// Options for each memcache channel of vbucket. // Options for each memcache channel of vbucket.
ChannelOptions _common_options; ChannelOptions _common_options;
// Listener monitor and update vbucket map information.
std::unique_ptr<CouchbaseServerListener> _listener; std::unique_ptr<CouchbaseServerListener> _listener;
// Memcache channel of each vbucket of couchbase. The key is the server list
// of this vbucket, like 'list://addr1:port1,addr2:port2...'.
butil::DoublyBufferedData<VBucketServerMap> _vbucket_map; butil::DoublyBufferedData<VBucketServerMap> _vbucket_map;
}; };
......
This diff is collapsed.
This diff is collapsed.
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* The crc32 functions and data was originally written by Spencer
* Garrett <srg@quick.com> and was gleaned from the PostgreSQL source
* tree via the files contrib/ltree/crc32.[ch] and from FreeBSD at
* src/usr.bin/cksum/crc32.c.
*/
#include "butil/third_party/libvbucket/hash.h"
static const uint32_t crc32tab[256] = {
0x00000000, 0x77073096, 0xee0e612c, 0x990951ba,
0x076dc419, 0x706af48f, 0xe963a535, 0x9e6495a3,
0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91,
0x1db71064, 0x6ab020f2, 0xf3b97148, 0x84be41de,
0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec,
0x14015c4f, 0x63066cd9, 0xfa0f3d63, 0x8d080df5,
0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b,
0x35b5a8fa, 0x42b2986c, 0xdbbbc9d6, 0xacbcf940,
0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116,
0x21b4f4b5, 0x56b3c423, 0xcfba9599, 0xb8bda50f,
0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d,
0x76dc4190, 0x01db7106, 0x98d220bc, 0xefd5102a,
0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818,
0x7f6a0dbb, 0x086d3d2d, 0x91646c97, 0xe6635c01,
0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457,
0x65b0d9c6, 0x12b7e950, 0x8bbeb8ea, 0xfcb9887c,
0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2,
0x4adfa541, 0x3dd895d7, 0xa4d1c46d, 0xd3d6f4fb,
0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9,
0x5005713c, 0x270241aa, 0xbe0b1010, 0xc90c2086,
0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4,
0x59b33d17, 0x2eb40d81, 0xb7bd5c3b, 0xc0ba6cad,
0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683,
0xe3630b12, 0x94643b84, 0x0d6d6a3e, 0x7a6a5aa8,
0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe,
0xf762575d, 0x806567cb, 0x196c3671, 0x6e6b06e7,
0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5,
0xd6d6a3e8, 0xa1d1937e, 0x38d8c2c4, 0x4fdff252,
0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60,
0xdf60efc3, 0xa867df55, 0x316e8eef, 0x4669be79,
0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f,
0xc5ba3bbe, 0xb2bd0b28, 0x2bb45a92, 0x5cb36a04,
0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a,
0x9c0906a9, 0xeb0e363f, 0x72076785, 0x05005713,
0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21,
0x86d3d2d4, 0xf1d4e242, 0x68ddb3f8, 0x1fda836e,
0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c,
0x8f659eff, 0xf862ae69, 0x616bffd3, 0x166ccf45,
0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db,
0xaed16a4a, 0xd9d65adc, 0x40df0b66, 0x37d83bf0,
0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6,
0xbad03605, 0xcdd70693, 0x54de5729, 0x23d967bf,
0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d,
};
uint32_t hash_crc32(const char *key, size_t key_length)
{
uint64_t x;
uint32_t crc= UINT32_MAX;
for (x= 0; x < key_length; x++)
crc= (crc >> 8) ^ crc32tab[(crc ^ (uint64_t)key[x]) & 0xff];
return ((~crc) >> 16) & 0x7fff;
}
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2010 NorthScale, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBVBUCKET_HASH_H
#define LIBVBUCKET_HASH_H 1
#include <stdint.h>
#include <sys/types.h>
#include <stdio.h>
#ifdef __cplusplus
extern "C" {
namespace butil {
#endif
uint32_t hash_crc32(const char *key, size_t key_length);
uint32_t hash_ketama(const char *key, size_t key_length);
void hash_md5(const char *key, size_t key_length, unsigned char *result);
void* hash_md5_update(void *ctx, const char *key, size_t key_length);
void hash_md5_final(void *ctx, unsigned char *result);
#ifdef __cplusplus
} // namespace butil
}
#endif
#endif
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include <stdlib.h>
#include "butil/third_party/libvbucket/hash.h"
/* This library uses the reference MD5 implementation from [RFC1321] */
#define PROTOTYPES 1
#include "butil/third_party/libvbucket/rfc1321/md5c.c"
#undef PROTOTYPES
void hash_md5(const char *key, size_t key_length, unsigned char *result)
{
MD5_CTX ctx;
MD5Init(&ctx);
MD5Update(&ctx, (unsigned char *)key, key_length);
MD5Final(result, &ctx);
}
void* hash_md5_update(void *ctx, const char *key, size_t key_length)
{
if (ctx == NULL) {
ctx = calloc(1, sizeof(MD5_CTX));
MD5Init(ctx);
}
MD5Update(ctx, (unsigned char *)key, key_length);
return ctx;
}
void hash_md5_final(void *ctx, unsigned char *result)
{
if (ctx == NULL) {
return;
}
MD5Final(result, ctx);
free(ctx);
}
uint32_t hash_ketama(const char *key, size_t key_length)
{
unsigned char digest[16];
hash_md5(key, key_length, digest);
return (uint32_t) ( (digest[3] << 24)
|(digest[2] << 16)
|(digest[1] << 8)
| digest[0]);
}
/* GLOBAL.H - RSAREF types and constants
*/
/* PROTOTYPES should be set to one if and only if the compiler supports
function argument prototyping.
The following makes PROTOTYPES default to 0 if it has not already
been defined with C compiler flags.
*/
#ifndef PROTOTYPES
#define PROTOTYPES 0
#endif
#include <stdint.h>
/* POINTER defines a generic pointer type */
typedef unsigned char *POINTER;
/* UINT2 defines a two byte word */
typedef uint16_t UINT2;
/* UINT4 defines a four byte word */
typedef uint32_t UINT4;
/* PROTO_LIST is defined depending on how PROTOTYPES is defined above.
If using PROTOTYPES, then PROTO_LIST returns the list, otherwise it
returns an empty list.
*/
#if PROTOTYPES
#define PROTO_LIST(list) list
#else
#define PROTO_LIST(list) ()
#endif
/* MD5.H - header file for MD5C.C
*/
/* Copyright (C) 1991-2, RSA Data Security, Inc. Created 1991. All
rights reserved.
License to copy and use this software is granted provided that it
is identified as the "RSA Data Security, Inc. MD5 Message-Digest
Algorithm" in all material mentioning or referencing this software
or this function.
License is also granted to make and use derivative works provided
that such works are identified as "derived from the RSA Data
Security, Inc. MD5 Message-Digest Algorithm" in all material
mentioning or referencing the derived work.
RSA Data Security, Inc. makes no representations concerning either
the merchantability of this software or the suitability of this
software for any particular purpose. It is provided "as is"
without express or implied warranty of any kind.
These notices must be retained in any copies of any part of this
documentation and/or software.
*/
/* MD5 context. */
typedef struct {
UINT4 state[4]; /* state (ABCD) */
UINT4 count[2]; /* number of bits, modulo 2^64 (lsb first) */
unsigned char buffer[64]; /* input buffer */
} MD5_CTX;
void MD5Init PROTO_LIST ((MD5_CTX *));
void MD5Update PROTO_LIST ((MD5_CTX *, unsigned char *, unsigned int));
void MD5Final PROTO_LIST ((unsigned char [16], MD5_CTX *));
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2010 NorthScale, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBVBUCKET_VISIBILITY_H
#define LIBVBUCKET_VISIBILITY_H 1
#ifdef BUILDING_LIBVBUCKET
#if defined (__SUNPRO_C) && (__SUNPRO_C >= 0x550)
#define LIBVBUCKET_PUBLIC_API __global
#elif defined __GNUC__
#define LIBVBUCKET_PUBLIC_API __attribute__ ((visibility("default")))
#elif defined(_MSC_VER)
#define LIBVBUCKET_PUBLIC_API extern __declspec(dllexport)
#else
/* unknown compiler */
#define LIBVBUCKET_PUBLIC_API
#endif
#else
#if defined(_MSC_VER)
#define LIBVBUCKET_PUBLIC_API extern __declspec(dllimport)
#else
#define LIBVBUCKET_PUBLIC_API
#endif
#endif
#endif /* LIBVBUCKET_VISIBILITY_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment