Commit 44c9698a authored by zhujiashun's avatar zhujiashun

health_check_using_rpc: change the position of Channel::Init

parent 69477c32
...@@ -565,7 +565,8 @@ int Channel::Weight() { ...@@ -565,7 +565,8 @@ int Channel::Weight() {
int Channel::CheckHealth() { int Channel::CheckHealth() {
if (_lb == NULL) { if (_lb == NULL) {
SocketUniquePtr ptr; SocketUniquePtr ptr;
if (Socket::Address(_server_id, &ptr) == 0) { if (Socket::Address(_server_id, &ptr) == 0 && !ptr->IsLogOff() &&
!ptr->IsHealthCheckingUsingRPC()) {
return 0; return 0;
} }
return -1; return -1;
......
...@@ -146,6 +146,7 @@ private: ...@@ -146,6 +146,7 @@ private:
class Channel : public ChannelBase { class Channel : public ChannelBase {
friend class Controller; friend class Controller;
friend class SelectiveChannel; friend class SelectiveChannel;
friend class HealthCheckTask;
public: public:
Channel(ProfilerLinker = ProfilerLinker()); Channel(ProfilerLinker = ProfilerLinker());
~Channel(); ~Channel();
...@@ -155,7 +156,6 @@ public: ...@@ -155,7 +156,6 @@ public:
int Init(butil::EndPoint server_addr_and_port, const ChannelOptions* options); int Init(butil::EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options); int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options); int Init(const char* server_addr, int port, const ChannelOptions* options);
int Init(SocketId id, const ChannelOptions* options);
// Connect this channel to a group of servers whose addresses can be // Connect this channel to a group of servers whose addresses can be
// accessed via `naming_service_url' according to its protocol. Use the // accessed via `naming_service_url' according to its protocol. Use the
...@@ -215,6 +215,10 @@ protected: ...@@ -215,6 +215,10 @@ protected:
const char* raw_server_address, const char* raw_server_address,
const ChannelOptions* options); const ChannelOptions* options);
// Init a channel from a known SocketId. Currently it is
// used only by health check using rpc.
int Init(SocketId id, const ChannelOptions* options);
butil::EndPoint _server_address; butil::EndPoint _server_address;
SocketId _server_id; SocketId _server_id;
Protocol::SerializeRequest _serialize_request; Protocol::SerializeRequest _serialize_request;
......
...@@ -706,7 +706,7 @@ TEST_F(HttpTest, read_long_body_progressively) { ...@@ -706,7 +706,7 @@ TEST_F(HttpTest, read_long_body_progressively) {
last_read = current_read; last_read = current_read;
} }
// Read something in past N seconds. // Read something in past N seconds.
ASSERT_GT(last_read, 100000); ASSERT_GT(last_read, (size_t)100000);
} }
// the socket still holds a ref. // the socket still holds a ref.
ASSERT_FALSE(reader->destroyed()); ASSERT_FALSE(reader->destroyed());
...@@ -794,7 +794,7 @@ TEST_F(HttpTest, read_progressively_after_cntl_destroys) { ...@@ -794,7 +794,7 @@ TEST_F(HttpTest, read_progressively_after_cntl_destroys) {
last_read = current_read; last_read = current_read;
} }
// Read something in past N seconds. // Read something in past N seconds.
ASSERT_GT(last_read, 100000); ASSERT_GT(last_read, (size_t)100000);
ASSERT_FALSE(reader->destroyed()); ASSERT_FALSE(reader->destroyed());
} }
// Wait for recycling of the main socket. // Wait for recycling of the main socket.
...@@ -843,7 +843,7 @@ TEST_F(HttpTest, read_progressively_after_long_delay) { ...@@ -843,7 +843,7 @@ TEST_F(HttpTest, read_progressively_after_long_delay) {
last_read = current_read; last_read = current_read;
} }
// Read something in past N seconds. // Read something in past N seconds.
ASSERT_GT(last_read, 100000); ASSERT_GT(last_read, (size_t)100000);
} }
ASSERT_FALSE(reader->destroyed()); ASSERT_FALSE(reader->destroyed());
} }
...@@ -883,7 +883,7 @@ TEST_F(HttpTest, skip_progressive_reading) { ...@@ -883,7 +883,7 @@ TEST_F(HttpTest, skip_progressive_reading) {
ASSERT_EQ(0, svc.last_errno()); ASSERT_EQ(0, svc.last_errno());
LOG(INFO) << "Server still wrote " << new_written_bytes - old_written_bytes; LOG(INFO) << "Server still wrote " << new_written_bytes - old_written_bytes;
// The server side still wrote things. // The server side still wrote things.
ASSERT_GT(new_written_bytes - old_written_bytes, 100000); ASSERT_GT(new_written_bytes - old_written_bytes, (size_t)100000);
} }
class AlwaysFailRead : public brpc::ProgressiveReader { class AlwaysFailRead : public brpc::ProgressiveReader {
...@@ -954,7 +954,7 @@ TEST_F(HttpTest, broken_socket_stops_progressive_reading) { ...@@ -954,7 +954,7 @@ TEST_F(HttpTest, broken_socket_stops_progressive_reading) {
last_read = current_read; last_read = current_read;
} }
// Read something in past N seconds. // Read something in past N seconds.
ASSERT_GT(last_read, 100000); ASSERT_GT(last_read, (size_t)100000);
} }
// the socket still holds a ref. // the socket still holds a ref.
ASSERT_FALSE(reader->destroyed()); ASSERT_FALSE(reader->destroyed());
......
...@@ -53,7 +53,7 @@ TEST_F(NamingServiceFilterTest, sanity) { ...@@ -53,7 +53,7 @@ TEST_F(NamingServiceFilterTest, sanity) {
ASSERT_EQ(0, butil::hostname2endpoint("10.128.0.1:1234", &ep)); ASSERT_EQ(0, butil::hostname2endpoint("10.128.0.1:1234", &ep));
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
brpc::SocketUniquePtr tmp_sock; brpc::SocketUniquePtr tmp_sock;
brpc::LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL }; brpc::LoadBalancer::SelectIn sel_in = { 0, false, false, 0, NULL, false };
brpc::LoadBalancer::SelectOut sel_out(&tmp_sock); brpc::LoadBalancer::SelectOut sel_out(&tmp_sock);
ASSERT_EQ(0, channel._lb->SelectServer(sel_in, &sel_out)); ASSERT_EQ(0, channel._lb->SelectServer(sel_in, &sel_out));
ASSERT_EQ(ep, tmp_sock->remote_side()); ASSERT_EQ(ep, tmp_sock->remote_side());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment