Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
B
brpc
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
brpc
Commits
675197a0
Commit
675197a0
authored
Mar 09, 2018
by
root
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix for ci comments
parent
4eb18301
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
50 additions
and
48 deletions
+50
-48
weighted_round_robin_load_balancer.cpp
src/brpc/policy/weighted_round_robin_load_balancer.cpp
+18
-14
weighted_round_robin_load_balancer.h
src/brpc/policy/weighted_round_robin_load_balancer.h
+10
-10
brpc_load_balancer_unittest.cpp
test/brpc_load_balancer_unittest.cpp
+22
-24
No files found.
src/brpc/policy/weighted_round_robin_load_balancer.cpp
View file @
675197a0
...
...
@@ -23,13 +23,16 @@
namespace
{
const
std
::
vector
<
uint32_t
>
prime_stride
=
{
2
,
3
,
5
,
11
,
17
,
29
,
47
,
71
,
107
,
137
,
163
,
251
,
307
,
379
,
569
,
683
,
857
,
1289
,
1543
,
1949
,
2617
,
2927
,
3407
,
4391
,
6599
,
9901
,
14867
,
22303
,
33457
,
50207
,
75323
,
112997
,
169501
,
254257
,
381389
,
572087
};
bool
IsCoprime
(
uint32_t
num1
,
uint32_t
num2
)
{
uint32_t
temp
;
const
std
::
vector
<
uint64_t
>
prime_stride
=
{
2
,
3
,
5
,
11
,
17
,
29
,
47
,
71
,
107
,
137
,
163
,
251
,
307
,
379
,
569
,
683
,
857
,
1289
,
1543
,
1949
,
2617
,
2927
,
3407
,
4391
,
6599
,
9901
,
14867
,
22303
,
33457
,
50207
,
75323
,
112997
,
169501
,
254257
,
381389
,
572087
,
849083
,
1273637
,
1910471
,
2865727
,
4298629
,
6447943
,
9671923
,
14507903
,
21761863
,
32642861
,
48964297
,
73446469
,
110169743
,
165254623
,
247881989
,
371822987
,
557734537
,
836601847
,
1254902827
,
1882354259
,
2823531397
,
4235297173
,
6352945771
,
9529418671
};
bool
IsCoprime
(
uint64_t
num1
,
uint64_t
num2
)
{
uint64_t
temp
;
if
(
num1
<
num2
)
{
temp
=
num1
;
num1
=
num2
;
...
...
@@ -48,7 +51,7 @@ bool IsCoprime(uint32_t num1, uint32_t num2) {
}
// Get a reasonable stride according to weights configured of servers.
uint
32_t
GetStride
(
const
uint32_t
weight_sum
,
const
uint32
_t
num
)
{
uint
64_t
GetStride
(
const
uint64_t
weight_sum
,
const
size
_t
num
)
{
if
(
weight_sum
==
1
)
{
return
1
;
}
...
...
@@ -72,8 +75,9 @@ bool WeightedRoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) {
if
(
bg
.
server_list
.
capacity
()
<
128
)
{
bg
.
server_list
.
reserve
(
128
);
}
int
weight
=
0
;
if
(
butil
::
StringToInt
(
id
.
tag
,
&
weight
)
&&
weight
>
0
)
{
uint32_t
weight
=
0
;
if
(
butil
::
StringToUint
(
id
.
tag
,
&
weight
)
&&
weight
>
0
)
{
bool
insert_server
=
bg
.
server_map
.
emplace
(
id
.
id
,
bg
.
server_list
.
size
()).
second
;
if
(
insert_server
)
{
...
...
@@ -167,7 +171,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
tls
.
remain_server
.
id
!=
s
->
server_list
[
tls
.
position
].
id
)
{
tls
.
remain_server
.
weight
=
0
;
}
for
(
uint
32
_t
i
=
0
;
i
!=
tls
.
stride
;
++
i
)
{
for
(
uint
64
_t
i
=
0
;
i
!=
tls
.
stride
;
++
i
)
{
SocketId
server_id
=
GetServerInNextStride
(
s
->
server_list
,
tls
);
if
(
!
ExcludedServers
::
IsExcluded
(
in
.
excluded
,
server_id
)
&&
Socket
::
Address
(
server_id
,
out
->
ptr
)
==
0
...
...
@@ -178,10 +182,10 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
return
EHOSTDOWN
;
}
int64_t
WeightedRoundRobinLoadBalancer
::
GetServerInNextStride
(
SocketId
WeightedRoundRobinLoadBalancer
::
GetServerInNextStride
(
const
std
::
vector
<
Server
>&
server_list
,
TLS
&
tls
)
{
SocketId
final_server
=
0
;
in
t
stride
=
tls
.
stride
;
uint64_
t
stride
=
tls
.
stride
;
if
(
tls
.
remain_server
.
weight
>
0
)
{
final_server
=
tls
.
remain_server
.
id
;
if
(
tls
.
remain_server
.
weight
>
stride
)
{
...
...
@@ -195,7 +199,7 @@ int64_t WeightedRoundRobinLoadBalancer::GetServerInNextStride(
}
}
while
(
stride
>
0
)
{
in
t
configured_weight
=
server_list
[
tls
.
position
].
weight
;
uint32_
t
configured_weight
=
server_list
[
tls
.
position
].
weight
;
final_server
=
server_list
[
tls
.
position
].
id
;
if
(
configured_weight
>
stride
)
{
tls
.
remain_server
.
id
=
final_server
;
...
...
src/brpc/policy/weighted_round_robin_load_balancer.h
View file @
675197a0
...
...
@@ -40,24 +40,24 @@ public:
private
:
struct
Server
{
Server
(
SocketId
s_id
=
0
,
in
t
s_w
=
0
)
:
id
(
s_id
),
weight
(
s_w
)
{}
Server
(
SocketId
s_id
=
0
,
uint32_
t
s_w
=
0
)
:
id
(
s_id
),
weight
(
s_w
)
{}
SocketId
id
;
in
t
weight
;
uint32_
t
weight
;
};
struct
Servers
{
// The value is configured weight for each server.
std
::
vector
<
Server
>
server_list
;
// The value is the index of the server in "server_list".
std
::
map
<
SocketId
,
size_t
>
server_map
;
uint
32
_t
weight_sum
=
0
;
uint
64
_t
weight_sum
=
0
;
};
struct
TLS
{
uint32
_t
position
=
0
;
uint
32
_t
stride
=
0
;
size
_t
position
=
0
;
uint
64
_t
stride
=
0
;
Server
remain_server
;
// If server list changed, we need caculate a new stride.
bool
IsNeededCaculateNewStride
(
const
uint
32
_t
curr_weight_sum
,
const
uint32
_t
curr_servers_num
)
{
bool
IsNeededCaculateNewStride
(
const
uint
64
_t
curr_weight_sum
,
const
size
_t
curr_servers_num
)
{
if
(
curr_weight_sum
!=
weight_sum
||
curr_servers_num
!=
servers_num
)
{
weight_sum
=
curr_weight_sum
;
...
...
@@ -67,14 +67,14 @@ private:
return
false
;
}
private
:
uint
32
_t
weight_sum
=
0
;
uint32
_t
servers_num
=
0
;
uint
64
_t
weight_sum
=
0
;
size
_t
servers_num
=
0
;
};
static
bool
Add
(
Servers
&
bg
,
const
ServerId
&
id
);
static
bool
Remove
(
Servers
&
bg
,
const
ServerId
&
id
);
static
size_t
BatchAdd
(
Servers
&
bg
,
const
std
::
vector
<
ServerId
>&
servers
);
static
size_t
BatchRemove
(
Servers
&
bg
,
const
std
::
vector
<
ServerId
>&
servers
);
static
int64_t
GetServerInNextStride
(
const
std
::
vector
<
Server
>&
server_list
,
static
SocketId
GetServerInNextStride
(
const
std
::
vector
<
Server
>&
server_list
,
TLS
&
tls
);
butil
::
DoublyBufferedData
<
Servers
,
TLS
>
_db_servers
;
...
...
test/brpc_load_balancer_unittest.cpp
View file @
675197a0
...
...
@@ -389,7 +389,7 @@ TEST_F(LoadBalancerTest, fairness) {
id
.
tag
=
"100"
;
}
else
if
(
4
==
round
)
{
if
(
i
%
50
==
0
)
{
id
.
tag
=
std
::
to_string
(
i
/
50
*
100
+
butil
::
fast_rand_less_than
(
40
)
+
80
);
id
.
tag
=
std
::
to_string
(
i
*
2
+
butil
::
fast_rand_less_than
(
40
)
+
80
);
}
else
{
id
.
tag
=
std
::
to_string
(
butil
::
fast_rand_less_than
(
40
)
+
80
);
}
...
...
@@ -437,13 +437,6 @@ TEST_F(LoadBalancerTest, fairness) {
size_t
count_sum
=
0
;
size_t
count_squared_sum
=
0
;
std
::
cout
<<
lb_name
<<
':'
<<
'\n'
;
if
(
3
==
round
||
4
==
round
)
{
std
::
cout
<<
"configured weight: "
<<
std
::
endl
;
std
::
ostringstream
os
;
brpc
::
DescribeOptions
opt
;
lb
->
Describe
(
os
,
opt
);
std
::
cout
<<
os
.
str
()
<<
std
::
endl
;
}
if
(
round
!=
3
&&
round
!=
4
)
{
for
(
size_t
i
=
0
;
i
<
ids
.
size
();
++
i
)
{
...
...
@@ -458,22 +451,27 @@ TEST_F(LoadBalancerTest, fairness) {
<<
": average="
<<
count_sum
/
ids
.
size
()
<<
" deviation="
<<
sqrt
(
count_squared_sum
*
ids
.
size
()
-
count_sum
*
count_sum
)
/
ids
.
size
()
<<
std
::
endl
;
}
else
{
// for weighted round robin load balancer
double
scaling_count_sum
=
0.0
;
double
scaling_count_squared_sum
=
0.0
;
for
(
size_t
i
=
0
;
i
<
ids
.
size
();
++
i
)
{
size_t
count
=
total_count
[
ids
[
i
].
id
];
ASSERT_NE
(
0ul
,
count
)
<<
"i="
<<
i
;
std
::
cout
<<
i
<<
'='
<<
count
<<
' '
;
double
scaling_count
=
static_cast
<
double
>
(
count
)
/
std
::
stoi
(
ids
[
i
].
tag
);
scaling_count_sum
+=
scaling_count
;
scaling_count_squared_sum
+=
scaling_count
*
scaling_count
;
}
std
::
cout
<<
'\n'
<<
": scaling average="
<<
scaling_count_sum
/
ids
.
size
()
<<
" scaling deviation="
<<
sqrt
(
scaling_count_squared_sum
*
ids
.
size
()
-
scaling_count_sum
*
scaling_count_sum
)
/
ids
.
size
()
<<
std
::
endl
;
}
}
else
{
// for weighted round robin load balancer
std
::
cout
<<
"configured weight: "
<<
std
::
endl
;
std
::
ostringstream
os
;
brpc
::
DescribeOptions
opt
;
lb
->
Describe
(
os
,
opt
);
std
::
cout
<<
os
.
str
()
<<
std
::
endl
;
double
scaling_count_sum
=
0.0
;
double
scaling_count_squared_sum
=
0.0
;
for
(
size_t
i
=
0
;
i
<
ids
.
size
();
++
i
)
{
size_t
count
=
total_count
[
ids
[
i
].
id
];
ASSERT_NE
(
0ul
,
count
)
<<
"i="
<<
i
;
std
::
cout
<<
i
<<
'='
<<
count
<<
' '
;
double
scaling_count
=
static_cast
<
double
>
(
count
)
/
std
::
stoi
(
ids
[
i
].
tag
);
scaling_count_sum
+=
scaling_count
;
scaling_count_squared_sum
+=
scaling_count
*
scaling_count
;
}
std
::
cout
<<
'\n'
<<
": scaling average="
<<
scaling_count_sum
/
ids
.
size
()
<<
" scaling deviation="
<<
sqrt
(
scaling_count_squared_sum
*
ids
.
size
()
-
scaling_count_sum
*
scaling_count_sum
)
/
ids
.
size
()
<<
std
::
endl
;
}
for
(
size_t
i
=
0
;
i
<
ids
.
size
();
++
i
)
{
ASSERT_EQ
(
0
,
brpc
::
Socket
::
SetFailed
(
ids
[
i
].
id
));
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment