Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
00dae2e2
Commit
00dae2e2
authored
6 years ago
by
root
Committed by
caidaojin
6 years ago
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
support rebalance handling
parent
3d7bcc67
master
0.9.8-rc01
0.9.7
0.9.7-rc03
0.9.7-rc02
0.9.7-rc01
0.9.6
0.9.6-rc03
0.9.6-rc02
0.9.6-rc01
No related merge requests found
Hide whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
1206 additions
and
155 deletions
+1206
-155
BUILD
BUILD
+1
-0
CMakeLists.txt
CMakeLists.txt
+1
-0
Makefile
Makefile
+1
-0
channel.h
src/brpc/channel.h
+2
-0
controller.h
src/brpc/controller.h
+0
-2
couchbase.cpp
src/brpc/couchbase.cpp
+29
-4
couchbase.h
src/brpc/couchbase.h
+17
-7
couchbase_channel.cpp
src/brpc/couchbase_channel.cpp
+697
-105
couchbase_channel.h
src/brpc/couchbase_channel.h
+88
-34
global.cpp
src/brpc/global.cpp
+3
-0
couchbase_naming_service.cpp
src/brpc/policy/couchbase_naming_service.cpp
+213
-0
couchbase_naming_service.h
src/brpc/policy/couchbase_naming_service.h
+91
-0
memcache_binary_header.h
src/brpc/policy/memcache_binary_header.h
+4
-1
memcache_binary_protocol.cpp
src/brpc/policy/memcache_binary_protocol.cpp
+1
-0
ketama.c
src/butil/third_party/libvbucket/ketama.c
+1
-1
md5.h
src/butil/third_party/libvbucket/rfc1321/md5.h
+3
-0
md5c.c
src/butil/third_party/libvbucket/rfc1321/md5c.c
+0
-1
vbucket.c
src/butil/third_party/libvbucket/vbucket.c
+21
-0
vbucket.h
src/butil/third_party/libvbucket/vbucket.h
+33
-0
No files found.
BUILD
View file @
00dae2e2
...
...
@@ -118,6 +118,7 @@ BUTIL_SRCS = [
"src/butil/third_party/snappy/snappy-stubs-internal.cc",
"src/butil/third_party/snappy/snappy.cc",
"src/butil/third_party/murmurhash3/murmurhash3.cpp",
"src/butil/third_party/libvbucket/rfc1321/md5c.c",
"src/butil/third_party/libvbucket/cJSON.c",
"src/butil/third_party/libvbucket/crc32.c",
"src/butil/third_party/libvbucket/ketama.c",
...
...
This diff is collapsed.
Click to expand it.
CMakeLists.txt
View file @
00dae2e2
...
...
@@ -207,6 +207,7 @@ set(BUTIL_SOURCES
${
PROJECT_SOURCE_DIR
}
/src/butil/third_party/snappy/snappy-stubs-internal.cc
${
PROJECT_SOURCE_DIR
}
/src/butil/third_party/snappy/snappy.cc
${
PROJECT_SOURCE_DIR
}
/src/butil/third_party/murmurhash3/murmurhash3.cpp
${
PROJECT_SOURCE_DIR
}
/src/butil/third_party/libvbucket/rfc1321/md5c.c
${
PROJECT_SOURCE_DIR
}
/src/butil/third_party/libvbucket/cJSON.c
${
PROJECT_SOURCE_DIR
}
/src/butil/third_party/libvbucket/crc32.c
${
PROJECT_SOURCE_DIR
}
/src/butil/third_party/libvbucket/ketama.c
...
...
This diff is collapsed.
Click to expand it.
Makefile
View file @
00dae2e2
...
...
@@ -47,6 +47,7 @@ BUTIL_SOURCES = \
src/butil/third_party/snappy/snappy-stubs-internal.cc
\
src/butil/third_party/snappy/snappy.cc
\
src/butil/third_party/murmurhash3/murmurhash3.cpp
\
src/butil/third_party/libvbucket/rfc1321/md5c.c
\
src/butil/third_party/libvbucket/cJSON.c
\
src/butil/third_party/libvbucket/crc32.c
\
src/butil/third_party/libvbucket/ketama.c
\
...
...
This diff is collapsed.
Click to expand it.
src/brpc/channel.h
View file @
00dae2e2
...
...
@@ -124,6 +124,8 @@ struct ChannelOptions {
class
Channel
:
public
ChannelBase
{
friend
class
Controller
;
friend
class
SelectiveChannel
;
friend
class
CouchbaseChannel
;
friend
class
CouchbaseServerListener
;
public
:
Channel
(
ProfilerLinker
=
ProfilerLinker
());
~
Channel
();
...
...
This diff is collapsed.
Click to expand it.
src/brpc/controller.h
View file @
00dae2e2
...
...
@@ -62,8 +62,6 @@ class MongoContext;
class
RetryPolicy
;
class
InputMessageBase
;
class
ThriftStub
;
class
CouchbaseChannel
;
class
CouchbaseDone
;
namespace
policy
{
class
OnServerStreamCreated
;
void
ProcessMongoRequest
(
InputMessageBase
*
);
...
...
This diff is collapsed.
Click to expand it.
src/brpc/couchbase.cpp
View file @
00dae2e2
...
...
@@ -38,8 +38,8 @@ int CouchbaseRequest::ParseRequest(
return
0
;
}
bool
CouchbaseRequest
::
Build
NewWithVBucketId
(
CouchbaseRequest
*
request
,
const
size_t
vbucket_id
)
const
{
bool
CouchbaseRequest
::
Build
VBucketId
(
const
size_t
vbucket_id
,
CouchbaseRequest
*
request
)
const
{
if
(
this
==
request
)
{
return
false
;
}
...
...
@@ -56,17 +56,19 @@ bool CouchbaseRequest::BuildNewWithVBucketId(CouchbaseRequest* request,
}
_buf
.
append_to
(
&
request
->
_buf
,
n
-
sizeof
(
header
),
sizeof
(
header
));
request
->
_pipelined_count
=
_pipelined_count
;
request
->
_read_replicas
=
_read_replicas
;
return
true
;
}
bool
CouchbaseRequest
::
ReplicasGet
(
const
butil
::
StringPiece
&
key
)
{
bool
CouchbaseRequest
::
ReplicasGet
(
const
butil
::
StringPiece
&
key
,
const
size_t
vbucket_id
)
{
const
policy
::
MemcacheRequestHeader
header
=
{
policy
::
MC_MAGIC_REQUEST
,
0x83
,
butil
::
HostToNet16
(
key
.
size
()),
0
,
policy
::
MC_BINARY_RAW_BYTES
,
0
,
butil
::
HostToNet16
(
vbucket_id
)
,
butil
::
HostToNet32
(
key
.
size
()),
0
,
0
...
...
@@ -77,10 +79,33 @@ bool CouchbaseRequest::ReplicasGet(const butil::StringPiece& key) {
if
(
_buf
.
append
(
key
.
data
(),
key
.
size
()))
{
return
false
;
}
_read_replicas
=
true
;
++
_pipelined_count
;
return
true
;
}
bool
CouchbaseResponse
::
RecoverOptCodeForReplicasRead
()
{
const
size_t
n
=
_buf
.
size
();
policy
::
MemcacheResponseHeader
header
;
if
(
n
<
sizeof
(
header
))
{
butil
::
string_printf
(
&
_err
,
"buffer is too small to contain a header"
);
return
false
;
}
_buf
.
copy_to
(
&
header
,
sizeof
(
header
));
if
(
header
.
command
!=
(
uint8_t
)
policy
::
MC_BINARY_REPLICAS_READ
)
{
butil
::
string_printf
(
&
_err
,
"not a replicas get response"
);
return
false
;
}
header
.
command
=
(
uint8_t
)
policy
::
MC_BINARY_GET
;
CouchbaseResponse
response
;
if
(
response
.
_buf
.
append
(
&
header
,
sizeof
(
header
)))
{
return
false
;
}
_buf
.
append_to
(
&
response
.
_buf
,
n
-
sizeof
(
header
),
sizeof
(
header
));
Swap
(
&
response
);
return
true
;
}
bool
CouchbaseResponse
::
GetStatus
(
Status
*
st
)
{
const
size_t
n
=
_buf
.
size
();
policy
::
MemcacheResponseHeader
header
;
...
...
This diff is collapsed.
Click to expand it.
src/brpc/couchbase.h
View file @
00dae2e2
...
...
@@ -24,14 +24,18 @@ namespace brpc {
// Request to couchbase.
// Do not support pipeline multiple operations in one request and sent now.
// Do not support Flush/Version
class
CouchbaseRequest
:
public
MemcacheRequest
{
friend
class
CouchbaseChannel
;
friend
class
VBucketContext
;
public
:
void
Swap
(
CouchbaseRequest
*
other
)
{
MemcacheRequest
::
Swap
(
other
);
}
bool
Get
(
const
butil
::
StringPiece
&
key
)
{
bool
Get
(
const
butil
::
StringPiece
&
key
,
bool
read_replicas
=
false
)
{
MemcacheRequest
::
Clear
();
_read_replicas
=
read_replicas
;
return
MemcacheRequest
::
Get
(
key
);
}
...
...
@@ -101,24 +105,28 @@ public:
void
CopyFrom
(
const
CouchbaseRequest
&
from
)
{
MemcacheRequest
::
CopyFrom
(
from
);
_read_replicas
=
from
.
_read_replicas
;
}
private
:
int
ParseRequest
(
std
::
string
*
key
,
policy
::
MemcacheBinaryCommand
*
command
)
const
;
bool
Build
NewWithVBucketId
(
CouchbaseRequest
*
request
,
const
size_t
vbucket_id
)
const
;
bool
Build
VBucketId
(
const
size_t
vbucket_id
,
CouchbaseRequest
*
request
)
const
;
bool
ReplicasGet
(
const
butil
::
StringPiece
&
key
);
bool
ReplicasGet
(
const
butil
::
StringPiece
&
key
,
const
size_t
vbucket_id
);
private
:
void
MergeFrom
(
const
CouchbaseRequest
&
from
);
int
pipelined_count
();
bool
read_replicas
()
const
{
return
_read_replicas
;
}
bool
_read_replicas
=
false
;
};
// Request to couchbase.
// Do not support pipeline multiple operations in one request and sent now.
// Response from couchbase.
class
CouchbaseResponse
:
public
MemcacheResponse
{
public
:
void
Swap
(
CouchbaseResponse
*
other
)
{
...
...
@@ -133,6 +141,8 @@ public:
bool
GetStatus
(
Status
*
status
);
bool
RecoverOptCodeForReplicasRead
();
private
:
void
MergeFrom
(
const
CouchbaseResponse
&
from
);
...
...
This diff is collapsed.
Click to expand it.
src/brpc/couchbase_channel.cpp
View file @
00dae2e2
...
...
@@ -16,9 +16,13 @@
#include "brpc/couchbase_channel.h"
#include "brpc/policy/couchbase_authenticator.h"
#include "brpc/policy/couchbase_naming_service.h"
#include "brpc/progressive_reader.h"
#include "bthread/bthread.h"
#include "butil/atomicops.h"
#include "butil/base64.h"
#include "butil/string_splitter.h"
#include "butil/strings/string_number_conversions.h"
#include "butil/third_party/libvbucket/hash.h"
#include "butil/third_party/libvbucket/vbucket.h"
...
...
@@ -26,37 +30,80 @@ namespace brpc {
DEFINE_string
(
couchbase_authorization_http_basic
,
""
,
"Http basic authorization of couchbase"
);
DEFINE_string
(
couchbase_bucket_init_string
,
""
,
"If the string is set, 'CouchbaseServerListener' will build vbucket map"
"directly by parsing from this string in initialization"
);
DEFINE_string
(
couchbase_bucket_streaming_url
,
"/pools/default/bucketsStreaming/"
,
"Monitor couchbase vbuckets map through this url"
);
DEFINE_int32
(
listener_retry_times
,
5
,
DEFINE_string
(
couchbase_bucket_name
,
""
,
"couchbase bucket name to access"
);
DEFINE_int32
(
couchbase_listen_retry_times
,
5
,
"Retry times to create couchbase vbucket map monitoring connection."
"Listen thread will sleep a while when reach this times."
);
DEFINE_int32
(
listener_sleep_interval_ms
,
1
00
,
DEFINE_int32
(
couchbase_listen_interval_ms
,
10
00
,
"Listen thread sleep for the number of milliseconds after creating"
"vbucket map monitoring connection failure."
);
DEFINE_bool
(
retry_during_rebalance
,
true
,
"A swith indicating whether to open retry during rebalance"
);
DEFINE_bool
(
replicas_read_flag
,
false
,
"Read replicas for get request in case of master node failure."
"This does not ensure that the data is the most current."
);
DEFINE_bool
(
couchbase_disable_retry_during_rebalance
,
false
,
"A swith indicating whether to open retry during rebalance status"
);
DEFINE_bool
(
couchbase_disable_retry_during_active
,
false
,
"A swith indicating whether to open retry during active status"
);
// Define error_code about retry during rebalance.
enum
RetryReason
{
// No need retry, dummy value.
DEFAULT_DUMMY
=
0
,
// No need retry, Rpc failed except cases include in SERVER_DOWN.
RPC_FAILED
=
1
,
// Server is down, need retry other servers during rebalance.
SERVER_DOWN
=
2
,
// Server is not mapped to the bucket, need retry other servers during rebalance.
RPC_SUCCESS_BUT_WRONG_SERVER
=
3
,
// Server is mapped to the bucket, retry the same server.
RPC_SUCCESS_BUT_RESPONSE_FAULT
=
4
,
// No need retry, response is ok.
RESPONSE_OK
=
5
,
};
enum
ServerType
{
// Master server choosed.
MASTER_SERVER
=
0x00
,
// Detected server choosed during rebalance
DETECTED_SERVER
=
0x01
,
// Replica server choosed for replicas read.
REPLICA_SERVER
=
0x02
,
};
namespace
{
const
butil
::
StringPiece
kSeparator
(
"
\n\n\n\n
"
,
4
);
const
std
::
string
kBucketStreamingUrlPrefix
(
"/pools/default/bucketsStreaming/"
);
const
std
::
string
kBucketUrlPrefix
(
"/pools/default/buckets/"
);
// The maximum number of vbuckets that couchbase support
const
size_t
kCouchbaseMaxVBuckets
=
65536
;
}
class
CouchbaseServerListener
;
class
LoadBalancerWithNaming
;
class
VBucketContext
;
enum
RetryReason
{
RPC_FAILED
=
0
,
WRONG_SERVER
=
1
,
RESPONSE_FAULT
=
2
,
};
// Get master server address(addr:port) of a vbucket.
// Return pointer to server if found, otherwise nullptr.
// If 'index' is not null, set server index to it.
const
std
::
string
*
GetMaster
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
int
*
index
=
nullptr
);
// Get forward master server address(addr:port) of a vbucket.
// Return pointer to server if found, otherwise nullptr.
// If 'index' is not null, set index of found server to it.
const
std
::
string
*
GetForwardMaster
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
int
*
index
=
nullptr
);
// Get replicas server address(addr:port) of a vbucket.
// Return pointer to server if found, otherwise nullptr.
// 'offset': zero-based index of vbucket. 0 indicating the first replica server.
// 'from_fvb': Get replica from fvbucket if true, otherwise get from vbucket.
const
std
::
string
*
GetReplica
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
);
// Get vbucket id of key belonged to.
size_t
Hash
(
const
butil
::
StringPiece
&
key
,
const
size_t
vbuckets_num
);
class
CouchbaseServerListener
;
class
VBucketMapReader
:
public
ProgressiveReader
{
public
:
...
...
@@ -84,16 +131,28 @@ public:
butil
::
Mutex
_mutex
;
};
// TODO: Inherit from SharedObject. Couchbase channels to connect same server
// can share the same listener.
class
CouchbaseServerListener
{
public
:
CouchbaseServerListener
(
const
char
*
server_
addr
,
CouchbaseChannel
*
channel
)
:
_server_addr
(
server_addr
),
_url
(
FLAGS_couchbase_bucket_streaming_url
),
CouchbaseServerListener
(
const
char
*
server_
list
,
const
char
*
bucket_name
,
CouchbaseChannel
*
channel
)
:
_streaming_url
(
kBucketStreamingUrlPrefix
+
bucket_name
),
_cb_channel
(
channel
),
_reader
(
new
VBucketMapReader
(
this
))
{
Init
();
Init
(
server_list
,
kBucketUrlPrefix
+
bucket_name
);
}
CouchbaseServerListener
(
const
char
*
listen_url
,
CouchbaseChannel
*
channel
)
:
_cb_channel
(
channel
),
_reader
(
new
VBucketMapReader
(
this
))
{
std
::
string
init_url
;
std
::
string
server
;
if
(
!
policy
::
CouchbaseNamingService
::
ParseListenUrl
(
listen_url
,
&
server
,
&
_streaming_url
,
&
init_url
))
{
return
;
}
Init
(
server
.
c_str
(),
init_url
);
}
~
CouchbaseServerListener
();
void
UpdateVBucketMap
(
butil
::
VBUCKET_CONFIG_HANDLE
vbucket
);
...
...
@@ -104,21 +163,21 @@ private:
CouchbaseServerListener
(
const
CouchbaseServerListener
&
)
=
delete
;
CouchbaseServerListener
&
operator
=
(
const
CouchbaseServerListener
&
)
=
delete
;
void
Init
();
void
Init
(
const
char
*
server_list
,
const
std
::
string
&
init_url
);
void
InitVBucketMap
(
const
std
::
string
&
str
);
bool
InitVBucketMap
(
const
std
::
string
&
url
);
static
void
*
ListenThread
(
void
*
arg
);
bthread_t
_listen_bth
;
// Server address list of couchbase servers. From these servers(host:port),
// we can monitor vbucket map.
const
std
::
string
_server_addr
;
// REST/JSON url to monitor vbucket map.
const
std
::
string
_url
;
std
::
string
_streaming
_url
;
std
::
string
_auth
;
std
::
string
_listen_port
;
std
::
string
_service_name
;
CouchbaseChannel
*
_cb_channel
;
// Monitor couchbase vbuckets map on this channel.
// TODO: Add/removed server due to rebalance/failover.
Channel
_listen_channel
;
// If _reader is not attached to listen socket, it will be released in
// CouchbaseServerListener desconstruction. Otherwise, it will be released
...
...
@@ -137,11 +196,11 @@ butil::Status VBucketMapReader::OnReadOnePart(const void* data, size_t length) {
size_t
pos
=
0
;
size_t
new_pos
=
_buf
.
find
(
kSeparator
.
data
(),
pos
,
kSeparator
.
size
());
while
(
new_pos
!=
std
::
string
::
npos
)
{
std
::
string
complete
=
_buf
.
substr
(
pos
,
new_pos
);
std
::
string
complete
=
_buf
.
substr
(
pos
,
new_pos
-
pos
);
butil
::
VBUCKET_CONFIG_HANDLE
vb
=
butil
::
vbucket_config_parse_string
(
complete
.
c_str
());
_listener
->
UpdateVBucketMap
(
vb
);
if
(
vb
!=
nullptr
)
{
_listener
->
UpdateVBucketMap
(
vb
);
butil
::
vbucket_config_destroy
(
vb
);
}
pos
=
new_pos
+
kSeparator
.
size
();
...
...
@@ -166,11 +225,26 @@ void VBucketMapReader::OnEndOfMessage(const butil::Status& status) {
return
;
}
}
// If '_listener' is desconstructed, release this object.
std
::
unique_ptr
<
VBucketMapReader
>
release
(
this
);
}
void
CouchbaseServerListener
::
Init
()
{
CouchbaseServerListener
::~
CouchbaseServerListener
()
{
std
::
unique_lock
<
butil
::
Mutex
>
mu
(
_reader
->
_mutex
);
bthread_stop
(
_listen_bth
);
bthread_join
(
_listen_bth
,
nullptr
);
if
(
!
_reader
->
IsAttached
())
{
mu
.
unlock
();
std
::
unique_ptr
<
VBucketMapReader
>
p
(
_reader
);
}
else
{
_reader
->
Destroy
();
}
policy
::
CouchbaseNamingService
::
ClearNamingServiceData
(
_service_name
);
}
void
CouchbaseServerListener
::
Init
(
const
char
*
server_list
,
const
std
::
string
&
init_url
)
{
if
(
!
FLAGS_couchbase_authorization_http_basic
.
empty
())
{
butil
::
Base64Encode
(
FLAGS_couchbase_authorization_http_basic
,
&
_auth
);
_auth
=
"Basic "
+
_auth
;
...
...
@@ -183,33 +257,52 @@ void CouchbaseServerListener::Init() {
}
ChannelOptions
options
;
options
.
protocol
=
PROTOCOL_HTTP
;
CHECK
(
_listen_channel
.
Init
(
_server_addr
.
c_str
(),
"rr"
,
&
options
)
==
0
)
<<
"Failed to init listen channel."
;
if
(
!
FLAGS_couchbase_bucket_init_string
.
empty
()
)
{
InitVBucketMap
(
FLAGS_couchbase_bucket_init_string
);
std
::
string
ns_servers
;
butil
::
StringPiece
servers
(
server_list
)
;
if
(
servers
.
find
(
"//"
)
==
servers
.
npos
)
{
ns_servers
.
append
(
"couchbase_list://"
);
}
CreateListener
();
}
CouchbaseServerListener
::~
CouchbaseServerListener
()
{
std
::
unique_lock
<
butil
::
Mutex
>
mu
(
_reader
->
_mutex
);
bthread_stop
(
_listen_bth
);
bthread_join
(
_listen_bth
,
nullptr
);
if
(
!
_reader
->
IsAttached
())
{
mu
.
unlock
();
std
::
unique_ptr
<
VBucketMapReader
>
p
(
_reader
);
ns_servers
.
append
(
server_list
);
if
(
policy
::
CouchbaseNamingService
::
ParseNamingServiceUrl
(
ns_servers
,
&
_listen_port
))
{
std
::
string
unique_id
=
"_"
+
butil
::
Uint64ToString
(
reinterpret_cast
<
uint64_t
>
(
this
));
ns_servers
+=
unique_id
;
_service_name
=
server_list
+
unique_id
;
CHECK
(
_listen_channel
.
Init
(
ns_servers
.
c_str
(),
"rr"
,
&
options
)
==
0
)
<<
"Failed to init listen channel."
;
}
else
{
_reader
->
Destroy
();
LOG
(
FATAL
)
<<
"Failed to init couchbase listener."
;
return
;
}
if
(
!
InitVBucketMap
(
init_url
))
{
LOG
(
ERROR
)
<<
"Failed to init vbucket map."
;
}
CreateListener
();
}
void
CouchbaseServerListener
::
InitVBucketMap
(
const
std
::
string
&
str
)
{
butil
::
VBUCKET_CONFIG_HANDLE
vb
=
butil
::
vbucket_config_parse_string
(
str
.
c_str
());
UpdateVBucketMap
(
vb
);
if
(
vb
!=
nullptr
)
{
butil
::
vbucket_config_destroy
(
vb
);
bool
CouchbaseServerListener
::
InitVBucketMap
(
const
std
::
string
&
uri
)
{
Controller
cntl
;
for
(
int
i
=
0
;
i
!=
FLAGS_couchbase_listen_retry_times
;
++
i
)
{
cntl
.
Reset
();
if
(
!
_auth
.
empty
())
{
cntl
.
http_request
().
SetHeader
(
"Authorization"
,
_auth
);
}
cntl
.
http_request
().
uri
()
=
uri
;
_listen_channel
.
CallMethod
(
nullptr
,
&
cntl
,
nullptr
,
nullptr
,
nullptr
);
if
(
cntl
.
Failed
())
{
LOG
(
ERROR
)
<<
"Failed to get vbucket map: "
<<
cntl
.
ErrorText
();
continue
;
}
std
::
string
str
=
cntl
.
response_attachment
().
to_string
();
butil
::
VBUCKET_CONFIG_HANDLE
vb
=
butil
::
vbucket_config_parse_string
(
str
.
c_str
());
if
(
vb
!=
nullptr
)
{
UpdateVBucketMap
(
vb
);
butil
::
vbucket_config_destroy
(
vb
);
return
true
;
}
}
return
false
;
}
void
*
CouchbaseServerListener
::
ListenThread
(
void
*
arg
)
{
...
...
@@ -219,11 +312,11 @@ void* CouchbaseServerListener::ListenThread(void* arg) {
listener
->
_reader
->
Detach
();
Controller
cntl
;
int
i
=
0
;
for
(;
i
!=
FLAGS_
listener
_retry_times
;
++
i
)
{
for
(;
i
!=
FLAGS_
couchbase_listen
_retry_times
;
++
i
)
{
if
(
!
listener
->
_auth
.
empty
())
{
cntl
.
http_request
().
SetHeader
(
"Authorization"
,
listener
->
_auth
);
}
cntl
.
http_request
().
uri
()
=
listener
->
_url
;
cntl
.
http_request
().
uri
()
=
listener
->
_
streaming_
url
;
cntl
.
response_will_be_read_progressively
();
listener
->
_listen_channel
.
CallMethod
(
nullptr
,
&
cntl
,
nullptr
,
nullptr
,
nullptr
);
...
...
@@ -236,8 +329,8 @@ void* CouchbaseServerListener::ListenThread(void* arg) {
break
;
}
if
(
i
==
FLAGS_
listener
_retry_times
)
{
if
(
bthread_usleep
(
FLAGS_
listener_sleep
_interval_ms
*
1000
)
<
0
)
{
if
(
i
==
FLAGS_
couchbase_listen
_retry_times
)
{
if
(
bthread_usleep
(
FLAGS_
couchbase_listen
_interval_ms
*
1000
)
<
0
)
{
if
(
errno
==
ESTOP
)
{
LOG
(
INFO
)
<<
"ListenThread is stopped."
;
break
;
...
...
@@ -271,50 +364,197 @@ void CouchbaseServerListener::UpdateVBucketMap(
// TODO: ketama distribution
if
(
butil
::
vbucket_config_get_distribution_type
(
vb_conf
)
==
butil
::
VBUCKET_DISTRIBUTION_KETAMA
)
{
LOG
(
FATAL
)
<<
"
Not support ketama
distribution."
;
!=
butil
::
VBUCKET_DISTRIBUTION_VBUCKET
)
{
LOG
(
FATAL
)
<<
"
Only support vbucket
distribution."
;
return
;
}
const
CouchbaseChannelMap
&
channel_map
=
_cb_channel
->
GetChannelM
ap
();
in
t
vb_num
=
butil
::
vbucket_config_get_num_vbuckets
(
vb_conf
);
in
t
replicas_num
=
butil
::
vbucket_config_get_num_replicas
(
vb_conf
);
in
t
server_num
=
butil
::
vbucket_config_get_num_servers
(
vb_conf
);
const
VBucketServerMap
*
vb_map
=
_cb_channel
->
vbucket_m
ap
();
const
size_
t
vb_num
=
butil
::
vbucket_config_get_num_vbuckets
(
vb_conf
);
const
size_
t
replicas_num
=
butil
::
vbucket_config_get_num_replicas
(
vb_conf
);
const
size_
t
server_num
=
butil
::
vbucket_config_get_num_servers
(
vb_conf
);
std
::
vector
<
std
::
vector
<
int
>>
vbuckets
(
vb_num
);
std
::
vector
<
std
::
vector
<
int
>>
fvbuckets
;
std
::
vector
<
std
::
string
>
servers
(
server_num
);
std
::
vector
<
std
::
string
>
added_servers
;
std
::
vector
<
std
::
string
>
removed_servers
;
for
(
int
i
=
0
;
i
!=
vb_num
;
++
i
)
{
if
(
butil
::
vbucket_config_has_forward_vbuckets
(
vb_conf
))
{
fvbuckets
.
resize
(
vb_num
);
}
for
(
size_t
i
=
0
;
i
!=
vb_num
;
++
i
)
{
if
(
butil
::
vbucket_config_has_forward_vbuckets
(
vb_conf
))
{
fvbuckets
[
i
].
resize
(
replicas_num
+
1
,
-
1
);
}
vbuckets
[
i
].
resize
(
replicas_num
+
1
,
-
1
);
vbuckets
[
i
][
0
]
=
butil
::
vbucket_get_master
(
vb_conf
,
i
);
for
(
int
j
=
1
;
j
<=
replicas_num
;
++
j
)
{
vbuckets
[
i
][
j
]
=
butil
::
vbucket_get_replica
(
vb_conf
,
i
,
j
);
if
(
butil
::
vbucket_config_has_forward_vbuckets
(
vb_conf
))
{
fvbuckets
[
i
][
0
]
=
butil
::
fvbucket_get_master
(
vb_conf
,
i
);
}
for
(
size_t
j
=
0
;
j
<
replicas_num
;
++
j
)
{
vbuckets
[
i
][
j
+
1
]
=
butil
::
vbucket_get_replica
(
vb_conf
,
i
,
j
);
if
(
butil
::
vbucket_config_has_forward_vbuckets
(
vb_conf
))
{
fvbuckets
[
i
][
j
+
1
]
=
butil
::
fvbucket_get_replica
(
vb_conf
,
i
,
j
);
}
}
}
for
(
int
i
=
0
;
i
!=
server_num
;
++
i
)
{
std
::
vector
<
size_t
>
keeping_servers
;
for
(
size_t
i
=
0
;
i
!=
server_num
;
++
i
)
{
servers
[
i
]
=
butil
::
vbucket_config_get_server
(
vb_conf
,
i
);
const
auto
iter
=
channel_map
.
find
(
servers
[
i
]);
if
(
iter
==
channel_map
.
end
())
{
const
auto
iter
=
vb_map
->
_
channel_map
.
find
(
servers
[
i
]);
if
(
iter
==
vb_map
->
_
channel_map
.
end
())
{
added_servers
.
emplace_back
(
servers
[
i
]);
}
else
{
keeping_servers
.
emplace_back
(
i
);
}
}
for
(
size_t
i
=
0
;
i
!=
vb_map
->
_servers
.
size
();
++
i
)
{
size_t
j
=
0
;
for
(;
j
!=
keeping_servers
.
size
();
++
j
)
{
if
(
vb_map
->
_servers
[
i
]
==
servers
[
keeping_servers
[
j
]])
{
break
;
}
}
if
(
j
==
keeping_servers
.
size
())
{
removed_servers
.
emplace_back
(
vb_map
->
_servers
[
i
]);
}
}
// Reset new server list of listen channel.
if
(
!
added_servers
.
empty
()
||
!
removed_servers
.
empty
())
{
std
::
string
server_list
;
for
(
const
auto
&
server
:
servers
)
{
const
size_t
pos
=
server
.
find
(
':'
);
server_list
.
append
(
server
.
data
(),
pos
);
server_list
+=
":"
+
_listen_port
+
","
;
}
server_list
.
pop_back
();
policy
::
CouchbaseNamingService
::
ResetCouchbaseListenerServers
(
_service_name
,
server_list
);
}
bool
curr_rebalance
=
_cb_channel
->
IsInRebalancing
(
vb_map
);
bool
update_rebalance
=
!
fvbuckets
.
empty
();
uint64_t
version
=
vb_map
->
_version
;
_cb_channel
->
UpdateVBucketServerMap
(
replicas_num
,
vbuckets
,
fvbuckets
,
servers
,
added_servers
,
removed_servers
);
if
(
!
curr_rebalance
&&
update_rebalance
)
{
LOG
(
ERROR
)
<<
"Couchbase enters into rebalance status from version "
<<
++
version
;
}
if
(
curr_rebalance
&&
!
update_rebalance
)
{
DetectedVBucketMap
&
detect
=
*
_cb_channel
->
_detected_vbucket_map
;
for
(
size_t
vb_index
=
0
;
vb_index
!=
vb_num
;
++
vb_index
)
{
detect
[
vb_index
].
_verified
.
store
(
false
,
butil
::
memory_order_relaxed
);
detect
[
vb_index
].
_index
.
store
(
-
1
,
butil
::
memory_order_relaxed
);
}
LOG
(
ERROR
)
<<
"Couchbase quit rebalance status from version "
<<
++
version
;
}
}
class
VBucketContext
{
public
:
VBucketContext
()
=
default
;
~
VBucketContext
()
=
default
;
bool
Init
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
const
int
server_type
,
const
int
server_index
,
const
CouchbaseRequest
*
request
,
const
std
::
string
&
key
,
const
policy
::
MemcacheBinaryCommand
command
);
VBucketStatus
Update
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
);
const
CouchbaseRequest
*
GetReplicasReadRequest
();
public
:
size_t
_retried_count
=
0
;
uint64_t
_version
=
0
;
int
_server_type
=
0
;
int
_server_type
=
MASTER_SERVER
;
int
_server_index
=
0
;
size_t
_vbucket_index
;
policy
::
MemcacheBinaryCommand
_command
;
std
::
string
_forward_master
;
std
::
string
_master
;
std
::
string
_key
;
policy
::
MemcacheBinaryCommand
_command
;
CouchbaseRequest
_request
;
CouchbaseRequest
_replica
s_req
;
CouchbaseRequest
_replica
_request
;
};
bool
VBucketContext
::
Init
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
const
int
server_type
,
const
int
server_index
,
const
CouchbaseRequest
*
request
,
const
std
::
string
&
key
,
const
policy
::
MemcacheBinaryCommand
command
)
{
if
(
vb_map
->
_version
==
0
)
{
return
false
;
}
_version
=
vb_map
->
_version
;
_vbucket_index
=
vb_index
;
_server_type
=
server_type
;
_server_index
=
server_index
;
_command
=
command
;
_key
=
key
;
const
std
::
string
*
fm
=
GetForwardMaster
(
vb_map
,
vb_index
);
if
(
fm
!=
nullptr
)
{
_forward_master
=
*
fm
;
}
const
std
::
string
*
master
=
GetMaster
(
vb_map
,
vb_index
);
_master
=
*
master
;
if
(
!
request
->
BuildVBucketId
(
vb_index
,
&
_request
))
{
return
false
;
}
return
true
;
}
VBucketStatus
VBucketContext
::
Update
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
)
{
VBucketStatus
change
=
NO_CHANGE
;
if
(
_version
==
vb_map
->
_version
)
{
change
=
NO_CHANGE
;
return
change
;
}
_version
=
vb_map
->
_version
;
const
std
::
string
*
fm
=
GetForwardMaster
(
vb_map
,
vb_index
);
const
std
::
string
*
master
=
GetMaster
(
vb_map
,
vb_index
);
if
(
_forward_master
.
empty
())
{
if
(
fm
==
nullptr
)
{
if
(
_master
==
*
master
)
{
change
=
MASTER_KEEPING_WITHOUT_F
;
}
else
{
change
=
MASTER_CHANGE_WITHOUT_F
;
}
}
else
{
change
=
FORWARD_CREATE
;
}
}
else
{
if
(
fm
==
nullptr
)
{
change
=
FORWARD_FINISH
;
}
else
{
if
(
_forward_master
==
*
fm
)
{
change
=
FORWARD_KEEPING
;
}
else
{
change
=
FORWARD_CHANGE
;
}
}
}
if
(
fm
!=
nullptr
)
{
_forward_master
=
*
fm
;
}
_master
=
*
master
;
return
change
;
}
const
CouchbaseRequest
*
VBucketContext
::
GetReplicasReadRequest
()
{
if
(
!
_replica_request
.
IsInitialized
())
{
_replica_request
.
ReplicasGet
(
_key
,
_vbucket_index
);
}
return
&
_replica_request
;
}
class
CouchbaseDone
:
public
google
::
protobuf
::
Closure
{
friend
class
CouchbaseChannel
;
public
:
CouchbaseDone
(
CouchbaseChannel
*
cb_channel
,
brpc
::
Controller
*
cntl
,
CouchbaseResponse
*
response
,
google
::
protobuf
::
Closure
*
done
)
...
...
@@ -333,11 +573,66 @@ private:
void
CouchbaseDone
::
Run
()
{
std
::
unique_ptr
<
CouchbaseDone
>
self_guard
(
this
);
while
(
FLAGS_retry_during_rebalance
)
{
//TODO: retry in case of rebalance/failover.
break
;
ClosureGuard
done_guard
(
_done
);
if
(
FLAGS_couchbase_disable_retry_during_rebalance
)
{
return
;
}
int
reason
=
0
;
std
::
string
error_text
;
bool
retry
=
_cb_channel
->
IsNeedRetry
(
_cntl
,
_vb_context
,
_response
,
&
reason
,
&
error_text
);
_cb_channel
->
UpdateDetectedMasterIfNeeded
(
reason
,
_vb_context
);
if
(
!
retry
)
{
return
;
}
int64_t
remain_ms
=
_cntl
->
timeout_ms
()
-
_cntl
->
latency_us
()
/
1000
;
if
(
remain_ms
<=
0
)
{
_cntl
->
SetFailed
(
ERPCTIMEDOUT
,
"reach timeout, finish retry"
);
return
;
}
if
(
reason
!=
SERVER_DOWN
)
{
_cntl
->
SetFailed
(
error_text
);
}
Controller
retry_cntl
;
retry_cntl
.
set_timeout_ms
(
remain_ms
);
// TODO: Inherit other fields except of timeout_ms of _cntl.
retry_cntl
.
set_log_id
(
_cntl
->
log_id
());
retry_cntl
.
set_max_retry
(
0
);
while
(
true
)
{
// TODO: _cntl cancel
if
(
!
_cb_channel
->
DoRetry
(
reason
,
&
retry_cntl
,
_response
,
&
_vb_context
))
{
break
;
}
reason
=
0
;
retry
=
_cb_channel
->
IsNeedRetry
(
&
retry_cntl
,
_vb_context
,
_response
,
&
reason
,
&
error_text
);
_cb_channel
->
UpdateDetectedMasterIfNeeded
(
reason
,
_vb_context
);
if
(
!
retry
)
{
break
;
}
remain_ms
=
retry_cntl
.
timeout_ms
()
-
retry_cntl
.
latency_us
()
/
1000
;
if
(
remain_ms
<=
0
)
{
retry_cntl
.
SetFailed
(
ERPCTIMEDOUT
,
"reach timeout, finish retry"
);
break
;
}
_cntl
->
SetFailed
(
error_text
);
retry_cntl
.
Reset
();
retry_cntl
.
set_timeout_ms
(
remain_ms
);
// TODO: Inherit other fields except of timeout_ms of _cntl.
retry_cntl
.
set_log_id
(
_cntl
->
log_id
());
retry_cntl
.
set_max_retry
(
0
);
}
if
(
_vb_context
.
_server_type
==
REPLICA_SERVER
&&
retry_cntl
.
ErrorCode
()
==
0
)
{
_response
->
RecoverOptCodeForReplicasRead
();
}
// Fetch result from retry_cntl to _cntl. They share the same response.
if
(
retry_cntl
.
Failed
())
{
_cntl
->
SetFailed
(
retry_cntl
.
ErrorText
());
}
_done
->
Run
();
_cntl
->
_error_code
=
retry_cntl
.
ErrorCode
();
_cntl
->
OnRPCEnd
(
butil
::
gettimeofday_us
());
}
CouchbaseChannel
::
CouchbaseChannel
()
{}
...
...
@@ -346,7 +641,29 @@ CouchbaseChannel::~CouchbaseChannel() {
_listener
.
reset
(
nullptr
);
}
int
CouchbaseChannel
::
Init
(
const
char
*
server_addr
,
int
CouchbaseChannel
::
Init
(
const
char
*
listen_url
,
const
ChannelOptions
*
options
)
{
if
(
options
!=
nullptr
)
{
if
(
options
->
protocol
!=
PROTOCOL_UNKNOWN
&&
options
->
protocol
!=
PROTOCOL_MEMCACHE
)
{
LOG
(
FATAL
)
<<
"Failed to init channel due to invalid protocol "
<<
options
->
protocol
.
name
()
<<
'.'
;
return
-
1
;
}
_common_options
=
*
options
;
}
_common_options
.
protocol
=
PROTOCOL_MEMCACHE
;
_detected_vbucket_map
.
reset
(
new
std
::
vector
<
DetectedMaster
>
(
kCouchbaseMaxVBuckets
));
auto
ptr
=
new
CouchbaseServerListener
(
listen_url
,
this
);
if
(
ptr
==
nullptr
)
{
LOG
(
FATAL
)
<<
"Failed to init CouchbaseChannel to "
<<
listen_url
<<
'.'
;
return
-
1
;
}
_listener
.
reset
(
ptr
);
return
0
;
}
int
CouchbaseChannel
::
Init
(
const
char
*
server_addr
,
const
char
*
bucket_name
,
const
ChannelOptions
*
options
)
{
if
(
options
!=
nullptr
)
{
if
(
options
->
protocol
!=
PROTOCOL_UNKNOWN
&&
...
...
@@ -358,7 +675,9 @@ int CouchbaseChannel::Init(const char* server_addr,
_common_options
=
*
options
;
}
_common_options
.
protocol
=
PROTOCOL_MEMCACHE
;
auto
ptr
=
new
CouchbaseServerListener
(
server_addr
,
this
);
_detected_vbucket_map
.
reset
(
new
std
::
vector
<
DetectedMaster
>
(
kCouchbaseMaxVBuckets
));
auto
ptr
=
new
CouchbaseServerListener
(
server_addr
,
bucket_name
,
this
);
if
(
ptr
==
nullptr
)
{
LOG
(
FATAL
)
<<
"Failed to init CouchbaseChannel to "
<<
server_addr
<<
'.'
;
return
-
1
;
...
...
@@ -378,11 +697,11 @@ void CouchbaseChannel::CallMethod(const google::protobuf::MethodDescriptor* meth
ClosureGuard
done_guard
(
done
);
std
::
string
key
;
policy
::
MemcacheBinaryCommand
command
;
// Do not support Flush/Version
if
(
req
->
ParseRequest
(
&
key
,
&
command
)
!=
0
)
{
cntl
->
SetFailed
(
"failed to parse key and command from request"
);
return
;
}
const
CallId
call_id
=
cntl
->
call_id
();
{
butil
::
DoublyBufferedData
<
VBucketServerMap
>::
ScopedPtr
vb_map
;
if
(
_vbucket_map
.
Read
(
&
vb_map
)
!=
0
)
{
...
...
@@ -393,34 +712,65 @@ void CouchbaseChannel::CallMethod(const google::protobuf::MethodDescriptor* meth
cntl
->
SetFailed
(
ENODATA
,
"vbucket map is not initialize"
);
return
;
}
ServerType
type
=
MASTER_SERVER
;
int
index
=
-
1
;
const
size_t
vb_index
=
Hash
(
key
,
vb_map
->
_vbucket
.
size
());
channel
=
SelectMasterChannel
(
vb_map
.
get
(),
vb_index
);
if
(
!
IsInRebalancing
(
vb_map
.
get
())
||
FLAGS_couchbase_disable_retry_during_rebalance
)
{
const
std
::
string
*
server
=
GetMaster
(
vb_map
.
get
(),
vb_index
,
&
index
);
channel
=
GetMappedChannel
(
server
,
vb_map
.
get
());
}
else
{
// Close the default retry policy. CouchbaeChannel decide to how to retry.
cntl
->
set_max_retry
(
0
);
index
=
GetDetectedMaster
(
vb_map
.
get
(),
vb_index
);
if
(
index
>=
0
)
{
type
=
DETECTED_SERVER
;
channel
=
GetMappedChannel
(
&
vb_map
->
_servers
[
index
],
vb_map
.
get
());
}
else
{
const
std
::
string
*
server
=
GetMaster
(
vb_map
.
get
(),
vb_index
,
&
index
);
channel
=
GetMappedChannel
(
server
,
vb_map
.
get
());
}
}
if
(
channel
==
nullptr
)
{
cntl
->
SetFailed
(
ENODATA
,
"failed to get mapped channel"
);
return
;
}
CouchbaseRequest
new_req
;
if
(
!
req
->
BuildNewWithVBucketId
(
&
new_req
,
vb_index
))
{
cntl
->
SetFailed
(
"failed to add vbucket id"
);
CouchbaseDone
*
cb_done
=
new
CouchbaseDone
(
this
,
cntl
,
static_cast
<
CouchbaseResponse
*>
(
response
),
done
);
if
(
!
cb_done
->
_vb_context
.
Init
(
vb_map
.
get
(),
vb_index
,
type
,
index
,
req
,
key
,
command
))
{
cntl
->
SetFailed
(
ENOMEM
,
"failed to init couchbase context"
);
return
;
}
done_guard
.
release
();
channel
->
CallMethod
(
nullptr
,
cntl
,
&
new_req
,
response
,
done
);
channel
->
CallMethod
(
nullptr
,
cntl
,
&
cb_done
->
_vb_context
.
_request
,
response
,
cb_done
);
}
if
(
done
==
nullptr
)
{
Join
(
call_id
);
}
return
;
}
Channel
*
CouchbaseChannel
::
SelectMasterChannel
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
)
{
return
GetMappedChannel
(
GetMaster
(
vb_map
,
vb_index
),
vb_map
);
Channel
*
CouchbaseChannel
::
SelectBackupChannel
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
const
int
reason
,
VBucketContext
*
context
)
{
VBucketStatus
change
=
VBucketStatus
::
NO_CHANGE
;
if
(
vb_map
->
_version
!=
context
->
_version
)
{
change
=
context
->
Update
(
vb_map
,
vb_index
);
}
const
std
::
string
*
server
=
GetNextRetryServer
(
change
,
reason
,
vb_map
,
vb_index
,
context
);
return
server
?
GetMappedChannel
(
server
,
vb_map
)
:
nullptr
;
}
const
CouchbaseChannelMap
&
CouchbaseChannel
::
GetChannelM
ap
()
{
const
VBucketServerMap
*
CouchbaseChannel
::
vbucket_m
ap
()
{
butil
::
DoublyBufferedData
<
VBucketServerMap
>::
ScopedPtr
vbucket_map
;
if
(
_vbucket_map
.
Read
(
&
vbucket_map
)
!=
0
)
{
LOG
(
FATAL
)
<<
"Failed to read vbucket map."
;
LOG
(
ERROR
)
<<
"Failed to read vbucket map."
;
return
nullptr
;
}
return
vbucket_map
->
_channel_map
;
return
vbucket_map
.
get
()
;
}
Channel
*
CouchbaseChannel
::
GetMappedChannel
(
const
std
::
string
*
server
,
...
...
@@ -435,24 +785,219 @@ Channel* CouchbaseChannel::GetMappedChannel(const std::string* server,
return
nullptr
;
}
const
std
::
string
*
CouchbaseChannel
::
GetMaster
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
int
*
index
)
{
if
(
vb_index
<
vb_map
->
_vbucket
.
size
())
{
const
int
i
=
vb_map
->
_vbucket
[
vb_index
][
0
];
if
(
i
>=
0
&&
i
<
static_cast
<
int
>
(
vb_map
->
_servers
.
size
()))
{
if
(
index
!=
nullptr
)
{
*
index
=
i
;
bool
CouchbaseChannel
::
IsNeedRetry
(
const
Controller
*
cntl
,
const
VBucketContext
&
context
,
CouchbaseResponse
*
response
,
int
*
reason
,
std
::
string
*
error_text
)
{
*
reason
=
DEFAULT_DUMMY
;
error_text
->
clear
();
const
int
error_code
=
cntl
->
ErrorCode
();
if
(
error_code
!=
0
)
{
if
(
error_code
==
EHOSTDOWN
||
error_code
==
ELOGOFF
||
error_code
==
EFAILEDSOCKET
||
error_code
==
EEOF
||
error_code
==
ECLOSE
||
error_code
==
ECONNRESET
)
{
*
reason
=
SERVER_DOWN
;
error_text
->
append
(
cntl
->
ErrorText
());
error_text
->
append
(
";"
);
}
else
{
*
reason
=
RPC_FAILED
;
}
}
else
{
CouchbaseResponse
::
Status
status
=
CouchbaseResponse
::
STATUS_SUCCESS
;
const
size_t
vb_index
=
context
.
_vbucket_index
;
if
(
response
->
GetStatus
(
&
status
))
{
if
(
status
!=
CouchbaseResponse
::
STATUS_SUCCESS
)
{
*
reason
=
status
==
CouchbaseResponse
::
STATUS_NOT_MY_VBUCKET
?
RPC_SUCCESS_BUT_WRONG_SERVER
:
RPC_SUCCESS_BUT_RESPONSE_FAULT
;
error_text
->
append
(
CouchbaseResponse
::
status_str
(
status
));
error_text
->
append
(
"(vbucket_id="
+
butil
::
IntToString
(
vb_index
)
+
") latency="
+
butil
::
Int64ToString
(
cntl
->
latency_us
())
+
"us @"
);
error_text
->
append
(
butil
::
endpoint2str
(
cntl
->
remote_side
()).
c_str
());
error_text
->
append
(
";"
);
}
else
{
*
reason
=
RESPONSE_OK
;
}
return
&
vb_map
->
_servers
[
i
];
}
}
return
nullptr
;
if
(
IsInRebalancing
(
vbucket_map
()))
{
return
*
reason
==
SERVER_DOWN
||
*
reason
==
RPC_SUCCESS_BUT_WRONG_SERVER
||
*
reason
==
RPC_SUCCESS_BUT_RESPONSE_FAULT
;
}
else
if
(
!
FLAGS_couchbase_disable_retry_during_active
)
{
return
*
reason
==
RPC_SUCCESS_BUT_WRONG_SERVER
||
(
*
reason
==
SERVER_DOWN
&&
context
.
_request
.
read_replicas
());
}
return
false
;
}
size_t
CouchbaseChannel
::
Hash
(
const
butil
::
StringPiece
&
key
,
const
size_t
vbuckets_num
)
{
size_t
digest
=
butil
::
hash_crc32
(
key
.
data
(),
key
.
size
());
return
digest
&
(
vbuckets_num
-
1
);
bool
CouchbaseChannel
::
DoRetry
(
const
int
reason
,
Controller
*
cntl
,
CouchbaseResponse
*
response
,
VBucketContext
*
vb_ctx
)
{
{
butil
::
DoublyBufferedData
<
VBucketServerMap
>::
ScopedPtr
vb_map
;
if
(
_vbucket_map
.
Read
(
&
vb_map
)
!=
0
)
{
cntl
->
SetFailed
(
ENOMEM
,
"failed to read vbucket map"
);
return
false
;
}
if
(
++
(
vb_ctx
->
_retried_count
)
>=
vb_map
->
_servers
.
size
())
{
cntl
->
SetFailed
(
"Reach the max couchbase retry count"
);
return
false
;
}
const
size_t
vb_index
=
vb_ctx
->
_vbucket_index
;
Channel
*
channel
=
SelectBackupChannel
(
vb_map
.
get
(),
vb_index
,
reason
,
vb_ctx
);
if
(
channel
==
nullptr
)
{
cntl
->
SetFailed
(
ENODATA
,
"no buckup server found"
);
return
false
;
}
const
CouchbaseRequest
*
request
=
&
(
vb_ctx
->
_request
);
if
(
vb_ctx
->
_server_type
==
REPLICA_SERVER
)
{
request
=
vb_ctx
->
GetReplicasReadRequest
();
}
response
->
Clear
();
channel
->
CallMethod
(
nullptr
,
cntl
,
request
,
response
,
DoNothing
());
}
Join
(
cntl
->
call_id
());
return
true
;
}
const
std
::
string
*
CouchbaseChannel
::
GetNextRetryServer
(
const
VBucketStatus
change
,
const
int
reason
,
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
VBucketContext
*
context
)
{
int
curr_index
=
context
->
_server_index
;
const
int
server_num
=
vb_map
->
_servers
.
size
();
if
(
IsInRebalancing
(
vb_map
))
{
// keep current server to retry if it is right server of the vbucket.
if
(
reason
!=
RPC_SUCCESS_BUT_WRONG_SERVER
&&
reason
!=
SERVER_DOWN
)
{
if
(
curr_index
<
server_num
)
{
return
&
(
vb_map
->
_servers
[
curr_index
]);
}
}
int
next_index
=
GetDetectedMaster
(
vb_map
,
vb_index
);
if
(
next_index
>=
0
)
{
context
->
_server_type
=
DETECTED_SERVER
;
context
->
_server_index
=
next_index
;
return
&
(
vb_map
->
_servers
[
next_index
]);
}
int
dummy_index
=
-
1
;
// Retry forward master as first if having forward master. Otherwise,
// probe other servers.
if
(
!
GetForwardMaster
(
vb_map
,
vb_index
,
&
next_index
))
{
next_index
=
(
curr_index
+
1
)
%
server_num
;
}
(
*
_detected_vbucket_map
)[
vb_index
].
_index
.
compare_exchange_strong
(
dummy_index
,
next_index
,
butil
::
memory_order_release
);
context
->
_server_type
=
DETECTED_SERVER
;
context
->
_server_index
=
next_index
;
return
&
(
vb_map
->
_servers
[
next_index
]);
}
else
{
if
(
change
==
FORWARD_FINISH
||
change
==
MASTER_CHANGE_WITHOUT_F
)
{
context
->
_server_type
=
MASTER_SERVER
;
return
GetMaster
(
vb_map
,
vb_index
,
&
context
->
_server_index
);
}
else
{
if
(
reason
==
SERVER_DOWN
&&
context
->
_request
.
read_replicas
())
{
context
->
_server_type
=
REPLICA_SERVER
;
return
GetReplica
(
vb_map
,
vb_index
);
}
if
(
reason
==
RPC_SUCCESS_BUT_WRONG_SERVER
)
{
context
->
_server_type
=
DETECTED_SERVER
;
context
->
_server_index
=
(
curr_index
+
1
)
%
server_num
;
// TODO: need update detect server.
return
&
(
vb_map
->
_servers
[
context
->
_server_index
]);
}
}
}
return
nullptr
;
}
void
CouchbaseChannel
::
UpdateDetectedMasterIfNeeded
(
const
int
reason
,
const
VBucketContext
&
context
)
{
if
(
context
.
_server_type
==
REPLICA_SERVER
)
{
return
;
}
if
(
reason
==
DEFAULT_DUMMY
||
reason
==
RPC_FAILED
)
{
return
;
}
butil
::
DoublyBufferedData
<
VBucketServerMap
>::
ScopedPtr
vb_map
;
if
(
_vbucket_map
.
Read
(
&
vb_map
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Failed to read vbucket map."
;
return
;
}
if
(
!
IsInRebalancing
(
vb_map
.
get
()))
{
return
;
}
const
int
server_num
=
vb_map
->
_servers
.
size
();
int
curr_index
=
context
.
_server_index
;
if
(
curr_index
>=
server_num
)
{
return
;
}
const
size_t
vb_index
=
context
.
_vbucket_index
;
DetectedMaster
&
detect_master
=
(
*
_detected_vbucket_map
)[
vb_index
];
butil
::
atomic
<
bool
>&
is_verified
=
detect_master
.
_verified
;
butil
::
atomic
<
int
>&
index
=
detect_master
.
_index
;
if
(
reason
!=
SERVER_DOWN
&&
reason
!=
RPC_SUCCESS_BUT_WRONG_SERVER
)
{
if
(
context
.
_server_type
==
MASTER_SERVER
)
{
return
;
}
// We detected the right new master for vbucket no matter the
// response status is success or not. Record for following request
// during rebalancing.
if
(
curr_index
!=
index
.
load
(
butil
::
memory_order_acquire
))
{
index
.
store
(
curr_index
,
butil
::
memory_order_relaxed
);
}
if
(
!
is_verified
.
load
(
butil
::
memory_order_acquire
))
{
is_verified
.
store
(
true
,
butil
::
memory_order_relaxed
);
}
}
else
{
// Server is down or it is a wrong server of the vbucket. Go on probing
// other servers.
int
dummy_index
=
-
1
;
int
next_index
=
-
1
;
// Detect forward master as the first if having forwad master,
// otherwise, probe other servers.
if
(
dummy_index
==
index
.
load
(
butil
::
memory_order_acquire
))
{
if
(
!
GetForwardMaster
(
vb_map
.
get
(),
vb_index
,
&
next_index
))
{
next_index
=
(
curr_index
+
1
)
%
server_num
;
}
index
.
compare_exchange_strong
(
dummy_index
,
next_index
,
butil
::
memory_order_release
,
butil
::
memory_order_relaxed
);
if
(
is_verified
.
load
(
butil
::
memory_order_acquire
))
{
is_verified
.
store
(
false
,
butil
::
memory_order_relaxed
);
}
}
else
{
next_index
=
(
curr_index
+
1
)
%
server_num
;
if
(
is_verified
.
load
(
butil
::
memory_order_acquire
))
{
// Verified master server is invalid. Reset to detect again.
if
(
index
.
compare_exchange_strong
(
curr_index
,
-
1
,
butil
::
memory_order_relaxed
,
butil
::
memory_order_relaxed
))
{
is_verified
.
store
(
false
,
butil
::
memory_order_relaxed
);
}
}
else
{
// Probe next servers.
index
.
compare_exchange_strong
(
curr_index
,
next_index
,
butil
::
memory_order_release
,
butil
::
memory_order_relaxed
);
}
}
}
}
int
CouchbaseChannel
::
GetDetectedMaster
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
)
{
butil
::
atomic
<
int
>&
detected_index
=
(
*
_detected_vbucket_map
)[
vb_index
].
_index
;
const
int
server_num
=
vb_map
->
_servers
.
size
();
int
curr_index
=
detected_index
.
load
(
butil
::
memory_order_acquire
);
if
(
curr_index
>=
0
&&
curr_index
<
server_num
)
{
return
curr_index
;
}
if
(
curr_index
>=
server_num
)
{
detected_index
.
compare_exchange_strong
(
curr_index
,
-
1
,
butil
::
memory_order_relaxed
);
}
return
-
1
;
}
bool
CouchbaseChannel
::
UpdateVBucketServerMap
(
...
...
@@ -559,4 +1104,51 @@ int CouchbaseChannel::CheckHealth() {
return
0
;
}
const
std
::
string
*
GetMaster
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
int
*
index
)
{
if
(
vb_index
<
vb_map
->
_vbucket
.
size
())
{
const
int
i
=
vb_map
->
_vbucket
[
vb_index
][
0
];
if
(
i
>=
0
&&
i
<
static_cast
<
int
>
(
vb_map
->
_servers
.
size
()))
{
if
(
index
!=
nullptr
)
{
*
index
=
i
;
}
return
&
vb_map
->
_servers
[
i
];
}
}
return
nullptr
;
}
const
std
::
string
*
GetForwardMaster
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
int
*
index
)
{
if
(
vb_index
<
vb_map
->
_fvbucket
.
size
())
{
const
int
i
=
vb_map
->
_fvbucket
[
vb_index
][
0
];
if
(
i
>=
0
&&
i
<
static_cast
<
int
>
(
vb_map
->
_servers
.
size
()))
{
if
(
index
!=
nullptr
)
{
*
index
=
i
;
}
return
&
vb_map
->
_servers
[
i
];
}
}
if
(
index
!=
nullptr
)
{
*
index
=
-
1
;
}
return
nullptr
;
}
const
std
::
string
*
GetReplica
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
)
{
if
(
vb_index
<
vb_map
->
_vbucket
.
size
())
{
const
int
index
=
vb_map
->
_vbucket
[
vb_index
][
0
];
if
(
index
!=
-
1
)
{
return
&
vb_map
->
_servers
[
index
];
}
}
return
nullptr
;
}
size_t
Hash
(
const
butil
::
StringPiece
&
key
,
const
size_t
vbuckets_num
)
{
size_t
digest
=
butil
::
hash_crc32
(
key
.
data
(),
key
.
size
());
return
digest
&
(
vbuckets_num
-
1
);
}
}
// namespace brpc
This diff is collapsed.
Click to expand it.
src/brpc/couchbase_channel.h
View file @
00dae2e2
...
...
@@ -25,11 +25,59 @@
#include "butil/containers/doubly_buffered_data.h"
namespace
brpc
{
// It is used to detect the new master server of vbuckets when the lastest
// vbucket mapping has not been received during rebalance.
class
DetectedMaster
{
public
:
DetectedMaster
()
:
_verified
(
false
),
_index
(
-
1
)
{}
butil
::
atomic
<
bool
>
_verified
;
butil
::
atomic
<
int
>
_index
;
private
:
DetectedMaster
(
const
DetectedMaster
&
)
=
delete
;
DetectedMaster
&
operator
=
(
const
DetectedMaster
&
)
=
delete
;
};
using
CouchbaseChannelMap
=
std
::
unordered_map
<
std
::
string
,
std
::
unique_ptr
<
Channel
>>
;
using
DetectedVBucketMap
=
std
::
vector
<
DetectedMaster
>
;
// Couchbase has two type of distribution used to map keys to servers.
// One is vbucket distribution and other is ketama distribution.
// This struct describes vbucket distribution of couchbase.
// 'num_replicas': the number of copies that will be stored on servers of one
// vbucket. Each vbucket must have this number of servers
// indexes plus one.
// '_vbucket': A zero-based indexed by vBucketId. The entries in the _vbucket
// are arrays of integers, where each integer is a zero-based
// index into the '_servers'.
// '_fvbucket': It is fast forward map with same struct as _vbucket. It is
// used to provide the final vBubcket-to-server map during the
// statrt of the rebalance.
// '_servers': all servers of a bucket.
// '_channel_map': the memcache channel for each server.
// TODO: support ketama vbucket distribution
struct
VBucketServerMap
{
uint64_t
_version
=
0
;
int
_num_replicas
=
0
;
std
::
vector
<
std
::
vector
<
int
>>
_vbucket
;
std
::
vector
<
std
::
vector
<
int
>>
_fvbucket
;
std
::
vector
<
std
::
string
>
_servers
;
CouchbaseChannelMap
_channel_map
;
};
enum
VBucketStatus
{
FORWARD_CREATE
=
0x00
,
FORWARD_FINISH
=
0x01
,
FORWARD_KEEPING
=
0x02
,
FORWARD_CHANGE
=
0x03
,
MASTER_CHANGE_WITHOUT_F
=
0x04
,
MASTER_KEEPING_WITHOUT_F
=
0x05
,
NO_CHANGE
=
0x06
,
};
class
CouchbaseServerListener
;
class
VBucketContext
;
// A couchbase channel maps different key to sub memcache channel according to
// current vbuckets mapping. It retrieves current vbuckets mapping by maintain
...
...
@@ -40,17 +88,27 @@ class CouchbaseServerListener;
// For async rpc, Should not delete this channel until rpc done.
class
CouchbaseChannel
:
public
ChannelBase
/*non-copyable*/
{
friend
class
CouchbaseServerListener
;
friend
class
VBucketContext
;
friend
class
CouchbaseDone
;
public
:
CouchbaseChannel
();
~
CouchbaseChannel
();
// You MUST initialize a couchbasechannel before using it.
// 'Server_addr': address list of couchbase servers. On these addresses, we
// can get vbucket map.
// can get vbucket map. Like following: "addr1:port1,addr2:port2"
// 'bucket_name': the bucket name of couchbase server to access.
// 'options': is used for each memcache channel of vbucket. The protocol
// should be PROTOCOL_MEMCACHE. If 'options' is null,
// use default options.
int
Init
(
const
char
*
server_addr
,
const
ChannelOptions
*
options
);
int
Init
(
const
char
*
server_addr
,
const
char
*
bucket_name
,
const
ChannelOptions
*
options
);
// 'listen_url': from this url, we can get vbucket map. Usually, it is
// somthing like following:
// "http://host:port/pools/default/bucketsStreaming/bucket_name" or
// "http://host:port/pools/default/buckets/bucket_name"
int
Init
(
const
char
*
listen_url
,
const
ChannelOptions
*
options
);
// TODO: Do not support pipeline mode now.
// Send request to the mapped channel according to the key of request.
...
...
@@ -62,45 +120,38 @@ public:
void
Describe
(
std
::
ostream
&
os
,
const
DescribeOptions
&
options
);
// Couchbase has two type of distribution used to map keys to servers.
// One is vbucket distribution and other is ketama distribution.
// This struct describes vbucket distribution of couchbase.
// 'num_replicas': the number of copies that will be stored on servers of one
// vbucket. Each vbucket must have this number of servers
// indexes plus one.
// '_vbucket': A zero-based indexed by vBucketId. The entries in the _vbucket
// are arrays of integers, where each integer is a zero-based
// index into the '_servers'.
// '_fvbucket': It is fast forward map with same struct as _vbucket. It is
// used to provide the final vBubcket-to-server map during the
// statrt of the rebalance.
// '_servers': all servers of a bucket.
// '_channel_map': the memcache channel for each server.
// TODO: support ketama vbucket distribution
struct
VBucketServerMap
{
uint64_t
_version
=
0
;
int
_num_replicas
=
0
;
std
::
vector
<
std
::
vector
<
int
>>
_vbucket
;
std
::
vector
<
std
::
vector
<
int
>>
_fvbucket
;
std
::
vector
<
std
::
string
>
_servers
;
CouchbaseChannelMap
_channel_map
;
};
private
:
int
CheckHealth
();
Channel
*
SelectMasterChannel
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
);
Channel
*
SelectBackupChannel
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
const
int
reason
,
VBucketContext
*
context
);
Channel
*
GetMappedChannel
(
const
std
::
string
*
server
,
const
VBucketServerMap
*
vb_map
);
const
CouchbaseChannelMap
&
GetChannelMap
();
const
VBucketServerMap
*
vbucket_map
();
bool
IsNeedRetry
(
const
Controller
*
cntl
,
const
VBucketContext
&
context
,
CouchbaseResponse
*
response
,
int
*
reason
,
std
::
string
*
error_text
);
bool
DoRetry
(
const
int
reason
,
Controller
*
cntl
,
CouchbaseResponse
*
response
,
VBucketContext
*
vb_ct
);
int
GetDetectedMaster
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
);
const
std
::
string
*
GetMaster
(
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
int
*
index
=
nullptr
);
void
UpdateDetectedMasterIfNeeded
(
const
int
reason
,
const
VBucketContext
&
context
);
size_t
Hash
(
const
butil
::
StringPiece
&
key
,
const
size_t
vbuckets_num
);
bool
IsInRebalancing
(
const
VBucketServerMap
*
vb_map
)
{
return
!
vb_map
->
_fvbucket
.
empty
();
}
const
std
::
string
*
GetNextRetryServer
(
const
VBucketStatus
change
,
const
int
reason
,
const
VBucketServerMap
*
vb_map
,
const
size_t
vb_index
,
VBucketContext
*
context
);
bool
UpdateVBucketServerMap
(
const
int
num_replicas
,
...
...
@@ -121,10 +172,13 @@ private:
std
::
string
GetAuthentication
()
const
;
// Options for each memcache channel of
vbucket
.
// Options for each memcache channel of
real servers
.
ChannelOptions
_common_options
;
// Listener monitor and update vbucket map
information
.
// Listener monitor and update vbucket map.
std
::
unique_ptr
<
CouchbaseServerListener
>
_listener
;
// We need detect new vbucket map due to current vbucket map is invalid
// during rebalance.
std
::
unique_ptr
<
DetectedVBucketMap
>
_detected_vbucket_map
;
butil
::
DoublyBufferedData
<
VBucketServerMap
>
_vbucket_map
;
};
...
...
This diff is collapsed.
Click to expand it.
src/brpc/global.cpp
View file @
00dae2e2
...
...
@@ -30,6 +30,7 @@
#include "brpc/policy/domain_naming_service.h"
#include "brpc/policy/remote_file_naming_service.h"
#include "brpc/policy/consul_naming_service.h"
#include "brpc/policy/couchbase_naming_service.h"
// Load Balancers
#include "brpc/policy/round_robin_load_balancer.h"
...
...
@@ -119,6 +120,7 @@ struct GlobalExtensions {
DomainNamingService
dns
;
RemoteFileNamingService
rfns
;
ConsulNamingService
cns
;
CouchbaseNamingService
cblns
;
RoundRobinLoadBalancer
rr_lb
;
WeightedRoundRobinLoadBalancer
wrr_lb
;
...
...
@@ -337,6 +339,7 @@ static void GlobalInitializeOrDieImpl() {
NamingServiceExtension
()
->
RegisterOrDie
(
"redis"
,
&
g_ext
->
dns
);
NamingServiceExtension
()
->
RegisterOrDie
(
"remotefile"
,
&
g_ext
->
rfns
);
NamingServiceExtension
()
->
RegisterOrDie
(
"consul"
,
&
g_ext
->
cns
);
NamingServiceExtension
()
->
RegisterOrDie
(
"couchbase_list"
,
&
g_ext
->
cblns
);
// Load Balancers
LoadBalancerExtension
()
->
RegisterOrDie
(
"rr"
,
&
g_ext
->
rr_lb
);
...
...
This diff is collapsed.
Click to expand it.
src/brpc/policy/couchbase_naming_service.cpp
0 → 100644
View file @
00dae2e2
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Cai,Daojin (Caidaojin@qiyi.com)
#include <stdlib.h> // strtol
#include <string> // std::string
#include <set> // std::set
#include "butil/string_splitter.h" // StringSplitter
#include "butil/strings/string_piece.h"
#include "butil/strings/string_split.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/log.h"
#include "brpc/policy/couchbase_naming_service.h"
namespace
brpc
{
namespace
policy
{
// Defined in file_naming_service.cpp
bool
SplitIntoServerAndTag
(
const
butil
::
StringPiece
&
line
,
butil
::
StringPiece
*
server_addr
,
butil
::
StringPiece
*
tag
);
butil
::
Mutex
CouchbaseNamingService
::
_mutex
;
std
::
unordered_map
<
std
::
string
,
std
::
string
>
CouchbaseNamingService
::
servers_map
;
bool
CouchbaseNamingService
::
ParseListenUrl
(
const
butil
::
StringPiece
listen_url
,
std
::
string
*
server
,
std
::
string
*
streaming_uri
,
std
::
string
*
init_uri
)
{
do
{
const
size_t
pos
=
listen_url
.
find
(
"//"
);
if
(
pos
==
listen_url
.
npos
)
{
break
;
}
const
size_t
host_pos
=
listen_url
.
find
(
'/'
,
pos
+
2
);
if
(
host_pos
==
listen_url
.
npos
)
{
break
;
}
butil
::
StringPiece
sub_str
=
listen_url
.
substr
(
pos
+
2
,
host_pos
-
pos
-
2
);
server
->
clear
();
server
->
append
(
sub_str
.
data
(),
sub_str
.
length
());
butil
::
EndPoint
point
;
if
(
butil
::
str2endpoint
(
server
->
c_str
(),
&
point
)
!=
0
)
{
LOG
(
FATAL
)
<<
"Failed to get address and port
\'
"
<<
server
<<
"
\'
."
;
break
;
}
butil
::
StringPiece
uri_sub
=
listen_url
;
uri_sub
.
remove_prefix
(
host_pos
);
size_t
uri_pos
=
uri_sub
.
find
(
"/bucketsStreaming/"
);
if
(
uri_pos
!=
uri_sub
.
npos
)
{
streaming_uri
->
clear
();
streaming_uri
->
append
(
uri_sub
.
data
(),
uri_sub
.
length
());
init_uri
->
clear
();
init_uri
->
append
(
uri_sub
.
data
(),
uri_pos
);
init_uri
->
append
(
"/buckets/"
);
butil
::
StringPiece
bucket_name
=
uri_sub
;
bucket_name
.
remove_prefix
(
uri_pos
+
std
::
strlen
(
"/bucketsStreaming/"
));
init_uri
->
append
(
bucket_name
.
data
(),
bucket_name
.
length
());
return
true
;
}
uri_pos
=
uri_sub
.
find
(
"/buckets/"
);
if
(
uri_pos
!=
uri_sub
.
npos
)
{
init_uri
->
clear
();
init_uri
->
append
(
uri_sub
.
data
(),
uri_sub
.
length
());
streaming_uri
->
clear
();
streaming_uri
->
append
(
uri_sub
.
data
(),
uri_pos
);
streaming_uri
->
append
(
"/bucketsStreaming/"
);
butil
::
StringPiece
bucket_name
=
uri_sub
;
bucket_name
.
remove_prefix
(
uri_pos
+
std
::
strlen
(
"/buckets/"
));
streaming_uri
->
append
(
bucket_name
.
data
(),
bucket_name
.
length
());
return
true
;
}
}
while
(
false
);
LOG
(
FATAL
)
<<
"Failed to parse listen url
\'
"
<<
listen_url
<<
"
\'
."
;
return
false
;
}
bool
CouchbaseNamingService
::
ParseNamingServiceUrl
(
const
butil
::
StringPiece
ns_url
,
std
::
string
*
listen_port
)
{
butil
::
StringPiece
protocol
;
std
::
string
server_list
;
const
size_t
pos
=
ns_url
.
find
(
"//"
);
if
(
pos
!=
ns_url
.
npos
)
{
protocol
=
ns_url
.
substr
(
0
,
pos
);
butil
::
StringPiece
sub
=
ns_url
.
substr
(
pos
+
2
);
server_list
.
append
(
sub
.
data
(),
sub
.
length
());
}
if
(
protocol
!=
"couchbase_list:"
&&
server_list
.
empty
())
{
LOG
(
FATAL
)
<<
"Invalid couchbase naming service "
<<
ns_url
;
return
false
;
}
std
::
vector
<
std
::
string
>
server_array
;
butil
::
SplitString
(
server_list
,
','
,
&
server_array
);
listen_port
->
clear
();
for
(
const
std
::
string
&
addr_port
:
server_array
)
{
butil
::
EndPoint
point
;
if
(
butil
::
str2endpoint
(
addr_port
.
c_str
(),
&
point
)
!=
0
)
{
LOG
(
FATAL
)
<<
"Failed to get endpoint from
\'
"
<<
addr_port
<<
"
\'
of the naming server url
\'
"
<<
ns_url
<<
"
\'
."
;
return
false
;
}
if
(
listen_port
->
empty
())
{
*
listen_port
=
butil
::
IntToString
(
point
.
port
);
}
}
return
true
;
}
int
CouchbaseNamingService
::
GetServers
(
const
char
*
service_name
,
std
::
vector
<
ServerNode
>*
servers
)
{
servers
->
clear
();
// Sort/unique the inserted vector is faster, but may have a different order
// of addresses from the file. To make assertions in tests easier, we use
// set to de-duplicate and keep the order.
std
::
set
<
ServerNode
>
presence
;
std
::
string
line
;
if
(
!
service_name
)
{
LOG
(
FATAL
)
<<
"Param[service_name] is NULL"
;
return
-
1
;
}
std
::
string
new_servers
(
service_name
);
{
BAIDU_SCOPED_LOCK
(
_mutex
);
const
auto
&
iter
=
servers_map
.
find
(
new_servers
);
if
(
iter
!=
servers_map
.
end
())
{
new_servers
=
iter
->
second
;
}
}
RemoveUniqueSuffix
(
new_servers
);
for
(
butil
::
StringSplitter
sp
(
new_servers
.
c_str
(),
','
);
sp
!=
NULL
;
++
sp
)
{
line
.
assign
(
sp
.
field
(),
sp
.
length
());
butil
::
StringPiece
addr
;
butil
::
StringPiece
tag
;
if
(
!
SplitIntoServerAndTag
(
line
,
&
addr
,
&
tag
))
{
continue
;
}
const_cast
<
char
*>
(
addr
.
data
())[
addr
.
size
()]
=
'\0'
;
// safe
butil
::
EndPoint
point
;
if
(
str2endpoint
(
addr
.
data
(),
&
point
)
!=
0
&&
hostname2endpoint
(
addr
.
data
(),
&
point
)
!=
0
)
{
LOG
(
ERROR
)
<<
"Invalid address=`"
<<
addr
<<
'\''
;
continue
;
}
ServerNode
node
;
node
.
addr
=
point
;
tag
.
CopyToString
(
&
node
.
tag
);
if
(
presence
.
insert
(
node
).
second
)
{
servers
->
push_back
(
node
);
}
else
{
RPC_VLOG
<<
"Duplicated server="
<<
node
;
}
}
RPC_VLOG
<<
"Got "
<<
servers
->
size
()
<<
(
servers
->
size
()
>
1
?
" servers"
:
" server"
);
return
0
;
}
void
CouchbaseNamingService
::
Describe
(
std
::
ostream
&
os
,
const
DescribeOptions
&
)
const
{
os
<<
"Couchbase_list"
;
return
;
}
NamingService
*
CouchbaseNamingService
::
New
()
const
{
return
new
CouchbaseNamingService
;
}
void
CouchbaseNamingService
::
Destroy
()
{
delete
this
;
}
void
CouchbaseNamingService
::
ResetCouchbaseListenerServers
(
const
std
::
string
&
service_name
,
std
::
string
&
new_servers
)
{
BAIDU_SCOPED_LOCK
(
_mutex
);
auto
iter
=
servers_map
.
find
(
service_name
);
if
(
iter
!=
servers_map
.
end
())
{
iter
->
second
.
swap
(
new_servers
);
}
else
{
servers_map
.
emplace
(
service_name
,
new_servers
);
}
}
std
::
string
CouchbaseNamingService
::
AddUniqueSuffix
(
const
char
*
name_url
,
const
char
*
unique_id
)
{
std
::
string
couchbase_name_url
;
couchbase_name_url
.
append
(
name_url
);
couchbase_name_url
.
append
(
1
,
'_'
);
couchbase_name_url
.
append
(
unique_id
);
return
std
::
move
(
couchbase_name_url
);
}
void
CouchbaseNamingService
::
RemoveUniqueSuffix
(
std
::
string
&
name_service
)
{
const
size_t
pos
=
name_service
.
find
(
'_'
);
if
(
pos
!=
std
::
string
::
npos
)
{
name_service
.
resize
(
pos
);
}
}
}
// namespace policy
}
// namespace brpc
This diff is collapsed.
Click to expand it.
src/brpc/policy/couchbase_naming_service.h
0 → 100644
View file @
00dae2e2
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Cai,Daojin (caidaojin@qiyi.com)
#ifndef BRPC_POLICY_COUCHBASE_NAMING_SERVICE
#define BRPC_POLICY_COUCHBASE_NAMING_SERVICE
#include <unordered_map>
#include "brpc/periodic_naming_service.h"
namespace
brpc
{
class
CouchbaseServerListener
;
}
namespace
brpc
{
namespace
policy
{
// It is only used for couchbase channel. It updates servers for listen channel
// of CouchbaseServerListener. The naming service format is like
// "couchbase_list://addr1:port,addr:port_****" where "_****" is a unique id for
// each couchbase channel since we can not share naming service and "addr*:port"
// are avalible servers for initializing.
// After initialization, it get the latest server list periodically from
// 'servers_map' by service name as key.
class
CouchbaseNamingService
:
public
PeriodicNamingService
{
friend
brpc
::
CouchbaseServerListener
;
private
:
static
butil
::
Mutex
_mutex
;
// Store the lastest server list for each couchbase channel.
// Key is service name of each couchbase channel and value is the latest
// server list. It is like following:
// key: addr1:port,addr2:port_****
// value: addr1:port,addr2:port,addr3:port
static
std
::
unordered_map
<
std
::
string
,
std
::
string
>
servers_map
;
int
GetServers
(
const
char
*
service_name
,
std
::
vector
<
ServerNode
>*
servers
);
static
bool
ParseNamingServiceUrl
(
butil
::
StringPiece
ns_url
,
std
::
string
*
listen_port
);
static
bool
ParseListenUrl
(
const
butil
::
StringPiece
listen_url
,
std
::
string
*
server_address
,
std
::
string
*
streaming_uri
,
std
::
string
*
init_uri
);
// Clear naming server data when couchbase channel destroyed.
static
void
ClearNamingServiceData
(
const
std
::
string
&
service_name
)
{
BAIDU_SCOPED_LOCK
(
_mutex
);
servers_map
.
erase
(
service_name
);
}
// Called by couchbase listener when vbucekt map changing.
// It set new server list for key 'service_name' in servers_map.
static
void
ResetCouchbaseListenerServers
(
const
std
::
string
&
service_name
,
std
::
string
&
new_servers
);
// For couchbase listeners, we should not share this name service object.
// So we append couchbase listener address to make name_url unique.
// Input: couchbase_list://address1:port1,address2:port2
// Output: couchbase_list://address1:port1,address2:port2_****
static
std
::
string
AddUniqueSuffix
(
const
char
*
name_url
,
const
char
*
unique_id
);
// Reserve handling to AddPrefixBeforeAddress.
void
RemoveUniqueSuffix
(
std
::
string
&
name_service
);
void
Describe
(
std
::
ostream
&
os
,
const
DescribeOptions
&
options
)
const
;
NamingService
*
New
()
const
;
void
Destroy
();
};
}
// namespace policy
}
// namespace brpc
#endif //BRPC_POLICY_COUCHBASE_NAMING_SERVICE
This diff is collapsed.
Click to expand it.
src/brpc/policy/memcache_binary_header.h
View file @
00dae2e2
...
...
@@ -91,7 +91,10 @@ enum MemcacheBinaryCommand {
MC_BINARY_RINCR
=
0x39
,
MC_BINARY_RINCRQ
=
0x3a
,
MC_BINARY_RDECR
=
0x3b
,
MC_BINARY_RDECRQ
=
0x3c
MC_BINARY_RDECRQ
=
0x3c
,
// Replicas read for couchbase
MC_BINARY_REPLICAS_READ
=
0x83
// End Range operations
};
...
...
This diff is collapsed.
Click to expand it.
src/brpc/policy/memcache_binary_protocol.cpp
View file @
00dae2e2
...
...
@@ -64,6 +64,7 @@ static void InitSupportedCommandMap() {
butil
::
bit_array_set
(
supported_cmd_map
,
MC_BINARY_STAT
);
butil
::
bit_array_set
(
supported_cmd_map
,
MC_BINARY_TOUCH
);
butil
::
bit_array_set
(
supported_cmd_map
,
MC_BINARY_SASL_AUTH
);
butil
::
bit_array_set
(
supported_cmd_map
,
MC_BINARY_REPLICAS_READ
);
}
inline
bool
IsSupportedCommand
(
uint8_t
command
)
{
...
...
This diff is collapsed.
Click to expand it.
src/butil/third_party/libvbucket/ketama.c
View file @
00dae2e2
...
...
@@ -4,7 +4,7 @@
/* This library uses the reference MD5 implementation from [RFC1321] */
#define PROTOTYPES 1
#include "butil/third_party/libvbucket/rfc1321/md5
c.c
"
#include "butil/third_party/libvbucket/rfc1321/md5
.h
"
#undef PROTOTYPES
void
hash_md5
(
const
char
*
key
,
size_t
key_length
,
unsigned
char
*
result
)
...
...
This diff is collapsed.
Click to expand it.
src/butil/third_party/libvbucket/rfc1321/md5.h
View file @
00dae2e2
...
...
@@ -24,6 +24,9 @@
*/
/* MD5 context. */
#include "butil/third_party/libvbucket/rfc1321/global.h"
typedef
struct
{
UINT4
state
[
4
];
/* state (ABCD) */
UINT4
count
[
2
];
/* number of bits, modulo 2^64 (lsb first) */
...
...
This diff is collapsed.
Click to expand it.
src/butil/third_party/libvbucket/rfc1321/md5c.c
View file @
00dae2e2
...
...
@@ -23,7 +23,6 @@
documentation and/or software.
*/
#include "butil/third_party/libvbucket/rfc1321/global.h"
#include "butil/third_party/libvbucket/rfc1321/md5.h"
/* Constants for MD5Transform routine.
...
...
This diff is collapsed.
Click to expand it.
src/butil/third_party/libvbucket/vbucket.c
View file @
00dae2e2
...
...
@@ -741,6 +741,10 @@ const char *vbucket_config_get_rest_api_server(VBUCKET_CONFIG_HANDLE vb, int i)
return
vb
->
servers
[
i
].
rest_api_authority
;
}
int
vbucket_config_has_forward_vbuckets
(
VBUCKET_CONFIG_HANDLE
vb
)
{
return
vb
->
fvbuckets
?
1
:
0
;
}
int
vbucket_config_is_config_node
(
VBUCKET_CONFIG_HANDLE
vb
,
int
i
)
{
return
vb
->
servers
[
i
].
config_node
;
}
...
...
@@ -782,6 +786,23 @@ int vbucket_get_replica(VBUCKET_CONFIG_HANDLE vb, int vbucket, int i) {
}
}
int
fvbucket_get_master
(
VBUCKET_CONFIG_HANDLE
vb
,
int
vbucket
)
{
if
(
vb
->
fvbuckets
)
{
return
vb
->
fvbuckets
[
vbucket
].
servers
[
0
];
}
return
-
1
;
}
int
fvbucket_get_replica
(
VBUCKET_CONFIG_HANDLE
vb
,
int
vbucket
,
int
i
)
{
if
(
vb
->
fvbuckets
)
{
int
idx
=
i
+
1
;
if
(
idx
<
vb
->
num_servers
)
{
return
vb
->
fvbuckets
[
vbucket
].
servers
[
idx
];
}
}
return
-
1
;
}
int
vbucket_found_incorrect_master
(
VBUCKET_CONFIG_HANDLE
vb
,
int
vbucket
,
int
wrongserver
)
{
int
mappedServer
=
vb
->
vbuckets
[
vbucket
].
servers
[
0
];
...
...
This diff is collapsed.
Click to expand it.
src/butil/third_party/libvbucket/vbucket.h
View file @
00dae2e2
...
...
@@ -352,6 +352,39 @@ namespace butil {
LIBVBUCKET_PUBLIC_API
int
vbucket_get_replica
(
VBUCKET_CONFIG_HANDLE
h
,
int
id
,
int
n
);
/**
* Check whether including forward vbuckets
*
* @param id the fvbucket identifier
*
* @return true if forward vbuckets included.
*/
LIBVBUCKET_PUBLIC_API
int
vbucket_config_has_forward_vbuckets
(
VBUCKET_CONFIG_HANDLE
h
);
/**
* Get the master server for the given vbucket.
*
* @param h the vbucket config
* @param id the fvbucket identifier
*
* @return the server index
*/
LIBVBUCKET_PUBLIC_API
int
fvbucket_get_master
(
VBUCKET_CONFIG_HANDLE
h
,
int
id
);
/**
* Get a given replica for a forward vbucket.
*
* @param h the vbucket config
* @param id the vbucket id
* @param n the replica number
*
* @return the server ID
*/
LIBVBUCKET_PUBLIC_API
int
fvbucket_get_replica
(
VBUCKET_CONFIG_HANDLE
h
,
int
id
,
int
n
);
/**
* @}
*/
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment