Commit 30f356ab authored by gejun

Patch svn r34985 r34988

Change-Id: I0940722f6d037f6783f82fec75f06a15d369885c
parent b8cd3c98
@@ -50,8 +50,6 @@ std::lock_guard<typename std::remove_reference<T>::type> get_lock_guard();
namespace std {
#if !defined(BASE_CXX11_ENABLED)
template <typename Mutex> class lock_guard;
template <typename Mutex> class unique_lock;
// Do not acquire ownership of the mutex
struct defer_lock_t {};
@@ -65,7 +63,89 @@ static const try_to_lock_t try_to_lock = {};
struct adopt_lock_t {};
static const adopt_lock_t adopt_lock = {};
#endif
template <typename Mutex> class lock_guard {
public:
explicit lock_guard(Mutex & mutex) : _pmutex(&mutex) { _pmutex->lock(); }
~lock_guard() { _pmutex->unlock(); }
private:
DISALLOW_COPY_AND_ASSIGN(lock_guard);
Mutex* _pmutex;
};
template <typename Mutex> class unique_lock {
DISALLOW_COPY_AND_ASSIGN(unique_lock);
public:
typedef Mutex mutex_type;
unique_lock() : _mutex(NULL), _owns_lock(false) {}
explicit unique_lock(mutex_type& mutex)
: _mutex(&mutex), _owns_lock(true) {
mutex.lock();
}
unique_lock(mutex_type& mutex, defer_lock_t)
: _mutex(&mutex), _owns_lock(false)
{}
unique_lock(mutex_type& mutex, try_to_lock_t)
: _mutex(&mutex), _owns_lock(mutex.try_lock())
{}
unique_lock(mutex_type& mutex, adopt_lock_t)
: _mutex(&mutex), _owns_lock(true)
{}
~unique_lock() {
if (_owns_lock) {
_mutex->unlock();
}
}
void lock() {
if (_owns_lock) {
CHECK(false) << "Detected deadlock issue";
return;
}
_owns_lock = true;
_mutex->lock();
}
bool try_lock() {
if (_owns_lock) {
CHECK(false) << "Detected deadlock issue";
return false;
}
_owns_lock = _mutex->try_lock();
return _owns_lock;
}
void unlock() {
if (!_owns_lock) {
CHECK(false) << "Invalid operation";
return;
}
_mutex->unlock();
_owns_lock = false;
}
void swap(unique_lock& rhs) {
std::swap(_mutex, rhs._mutex);
std::swap(_owns_lock, rhs._owns_lock);
}
mutex_type* release() {
mutex_type* saved_mutex = _mutex;
_mutex = NULL;
_owns_lock = false;
return saved_mutex;
}
mutex_type* mutex() { return _mutex; }
bool owns_lock() const { return _owns_lock; }
operator bool() const { return owns_lock(); }
private:
mutex_type* _mutex;
bool _owns_lock;
};
#endif // !defined(BASE_CXX11_ENABLED)
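The block above backports std::lock_guard and std::unique_lock for toolchains without C++11, keeping the standard interface (defer_lock, try_to_lock, adopt_lock). A minimal usage sketch, assuming the definitions above are in scope; MyMutex is a hypothetical type providing lock()/unlock()/try_lock():
struct MyMutex {
    void lock()     { /* acquire */ }
    void unlock()   { /* release */ }
    bool try_lock() { return true; /* acquired */ }
};
void example(MyMutex& m) {
    {
        std::lock_guard<MyMutex> g(m);                   // locks now, unlocks at scope exit
    }
    std::unique_lock<MyMutex> lck(m, std::defer_lock);   // associated but not locked
    lck.lock();                                          // explicit lock
    lck.unlock();                                        // explicit unlock
    std::unique_lock<MyMutex> tl(m, std::try_to_lock);   // non-blocking attempt
    if (tl.owns_lock()) {
        // critical section; released by ~unique_lock()
    }
}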
#if defined(OS_POSIX)
@@ -133,8 +213,8 @@ public:
typedef pthread_mutex_t mutex_type;
unique_lock() : _mutex(NULL), _owns_lock(false) {}
explicit unique_lock(mutex_type& mutex)
: _mutex(&mutex), _owns_lock(false) {
lock();
: _mutex(&mutex), _owns_lock(true) {
pthread_mutex_lock(_mutex);
}
unique_lock(mutex_type& mutex, defer_lock_t)
: _mutex(&mutex), _owns_lock(false)
@@ -148,7 +228,7 @@ public:
~unique_lock() {
if (_owns_lock) {
unlock();
pthread_mutex_unlock(_mutex);
}
}
@@ -158,19 +238,16 @@ public:
return;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return;
}
const int rc = pthread_mutex_lock(_mutex);
if (rc) {
LOG(FATAL) << "Fail to lock pthread_mutex=" << _mutex << ", " << berror(rc);
return;
}
_owns_lock = true;
#else
_owns_lock = true;
pthread_mutex_lock(_mutex);
#endif // NDEBUG
_owns_lock = true;
}
bool try_lock() {
@@ -178,12 +255,6 @@ public:
CHECK(false) << "Detected deadlock issue";
return false;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return false;
}
#endif
_owns_lock = !pthread_mutex_trylock(_mutex);
return _owns_lock;
}
@@ -193,12 +264,6 @@ public:
CHECK(false) << "Invalid operation";
return;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return;
}
#endif
pthread_mutex_unlock(_mutex);
_owns_lock = false;
}
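The unique_lock<pthread_mutex_t> specialization above gives raw POSIX mutexes the same RAII treatment, and this commit makes its locking constructor call pthread_mutex_lock directly instead of going through lock(). A usage sketch under that specialization (assumes <pthread.h> and the header above):
#include <pthread.h>
static pthread_mutex_t g_mu = PTHREAD_MUTEX_INITIALIZER;
void touch_shared_state() {
    std::unique_lock<pthread_mutex_t> lck(g_mu);  // pthread_mutex_lock()
    // ... critical section ...
}                                                 // pthread_mutex_unlock() in ~unique_lock()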
@@ -230,13 +295,13 @@ public:
typedef pthread_spinlock_t mutex_type;
unique_lock() : _mutex(NULL), _owns_lock(false) {}
explicit unique_lock(mutex_type& mutex)
: _mutex(&mutex), _owns_lock(false) {
lock();
: _mutex(&mutex), _owns_lock(true) {
pthread_spin_lock(_mutex);
}
~unique_lock() {
if (_owns_lock) {
unlock();
pthread_spin_unlock(_mutex);
}
}
unique_lock(mutex_type& mutex, defer_lock_t)
@@ -255,20 +320,16 @@ public:
return;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return;
}
const int rc = pthread_spin_lock(_mutex);
if (rc) {
LOG(FATAL) << "Fail to lock pthread_spinlock=" << _mutex << ", " << berror(rc);
_owns_lock = false;
return;
}
_owns_lock = true;
#else
_owns_lock = true;
pthread_spin_lock(_mutex);
#endif // NDEBUG
_owns_lock = true;
}
bool try_lock() {
@@ -276,12 +337,6 @@ public:
CHECK(false) << "Detected deadlock issue";
return false;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return false;
}
#endif
_owns_lock = !pthread_spin_trylock(_mutex);
return _owns_lock;
}
@@ -291,12 +346,6 @@ public:
CHECK(false) << "Invalid operation";
return;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return;
}
#endif
pthread_spin_unlock(_mutex);
_owns_lock = false;
}
@@ -324,106 +373,6 @@ private:
#endif // defined(OS_POSIX)
template<> class lock_guard<base::Mutex> {
public:
explicit lock_guard(base::Mutex & mutex) : _pmutex(&mutex) { _pmutex->lock(); }
~lock_guard() { _pmutex->unlock(); }
private:
DISALLOW_COPY_AND_ASSIGN(lock_guard);
base::Mutex* _pmutex;
};
template<> class unique_lock<base::Mutex> {
DISALLOW_COPY_AND_ASSIGN(unique_lock);
public:
typedef base::Mutex mutex_type;
unique_lock() : _mutex(NULL), _owns_lock(false) {}
explicit unique_lock(mutex_type& mutex)
: _mutex(&mutex), _owns_lock(false) {
lock();
}
unique_lock(mutex_type& mutex, defer_lock_t)
: _mutex(&mutex), _owns_lock(false)
{}
unique_lock(mutex_type& mutex, try_to_lock_t)
: _mutex(&mutex), _owns_lock(mutex.try_lock())
{}
unique_lock(mutex_type& mutex, adopt_lock_t)
: _mutex(&mutex), _owns_lock(true)
{}
~unique_lock() {
if (_owns_lock) {
unlock();
}
}
void lock() {
if (_owns_lock) {
CHECK(false) << "Detected deadlock issue";
return;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return;
}
#endif
_mutex->lock();
_owns_lock = true;
}
bool try_lock() {
if (_owns_lock) {
CHECK(false) << "Detected deadlock issue";
return false;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return false;
}
#endif
_owns_lock = _mutex->try_lock();
return _owns_lock;
}
void unlock() {
if (!_owns_lock) {
CHECK(false) << "Invalid operation";
return;
}
#if !defined(NDEBUG)
if (!_mutex) {
CHECK(false) << "_mutex is NULL";
return;
}
#endif
_mutex->unlock();
_owns_lock = false;
}
void swap(unique_lock& rhs) {
std::swap(_mutex, rhs._mutex);
std::swap(_owns_lock, rhs._owns_lock);
}
mutex_type* release() {
mutex_type* saved_mutex = _mutex;
_mutex = NULL;
_owns_lock = false;
return saved_mutex;
}
mutex_type* mutex() { return _mutex; }
bool owns_lock() const { return _owns_lock; }
operator bool() const { return owns_lock(); }
private:
mutex_type* _mutex;
bool _owns_lock;
};
} // namespace std
namespace base {
@@ -433,8 +382,8 @@ template <typename Mutex1, typename Mutex2>
void double_lock(std::unique_lock<Mutex1> &lck1, std::unique_lock<Mutex2> &lck2) {
DCHECK(!lck1.owns_lock());
DCHECK(!lck2.owns_lock());
const void* ptr1 = static_cast<void*>(lck1.mutex());
const void* ptr2 = static_cast<void*>(lck2.mutex());
volatile void* const ptr1 = lck1.mutex();
volatile void* const ptr2 = lck2.mutex();
DCHECK_NE(ptr1, ptr2);
if (ptr1 < ptr2) {
lck1.lock();
......
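double_lock above acquires two unique_locks in a globally consistent order (by mutex address), so two threads locking the same pair of mutexes in opposite textual order cannot deadlock. A usage sketch, assuming base::Mutex and the specializations shown earlier in this diff:
static base::Mutex g_m1;
static base::Mutex g_m2;
void move_between_guarded_structures() {
    std::unique_lock<base::Mutex> lck1(g_m1, std::defer_lock);
    std::unique_lock<base::Mutex> lck2(g_m2, std::defer_lock);
    base::double_lock(lck1, lck2);   // locks the lower-address mutex first
    // ... both mutexes held here ...
}                                    // both released by the destructors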
@@ -275,7 +275,6 @@ struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest {
WriteRequest* next;
bthread_id_t id_wait;
Socket* socket;
bool is_debug;
uint32_t pipelined_count() const {
return (_pc_and_udmsg >> 48) & 0xFFFF;
@@ -379,10 +378,60 @@ public:
static const uint64_t AUTH_FLAG = (1ul << 32);
Socket::Socket(Forbidden)
// must be even because Address() relies on evenness of version
: _versioned_ref(0)
, _shared_part(NULL)
, _nevent(0)
, _keytable_pool(NULL)
, _fd(-1)
, _tos(0)
, _reset_fd_real_us(-1)
, _on_edge_triggered_events(NULL)
, _user(NULL)
, _conn(NULL)
, _app_connect(NULL)
, _this_id(0)
, _preferred_index(-1)
, _hc_count(0)
, _last_msg_size(0)
, _avg_msg_size(0)
, _last_readtime_us(0)
, _parsing_context(NULL)
, _correlation_id(0)
, _health_check_interval_s(-1)
, _ninprocess(1)
, _auth_flag_error(0)
, _auth_id(INVALID_BTHREAD_ID)
, _auth_context(NULL)
, _ssl_state(SSL_UNKNOWN)
, _ssl_ctx(NULL)
, _ssl_session(NULL)
, _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN)
, _controller_released_socket(false)
, _overcrowded(false)
, _fail_me_at_server_stop(false)
, _logoff_flag(false)
, _recycle_flag(false)
, _error_code(0)
, _pipeline_q(NULL)
, _last_writetime_us(0)
, _unwritten_bytes(0)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
{
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
_epollout_butex = bthread::butex_create_checked<base::atomic<int> >();
}
Socket::~Socket() {
pthread_mutex_destroy(&_id_wait_list_mutex);
bthread::butex_destroy(_epollout_butex);
}
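This commit moves Socket's constructor and destructor out of socket_inl.h and switches the EPOLLOUT butex from in-place construction over an aligned member buffer (butex_construct and _epollout_butex_memory, removed further below in this diff) to heap allocation via butex_create_checked, paired with butex_destroy. A minimal sketch of that allocate/destroy pairing, using the bthread calls shown in this commit; the Waiter class itself is hypothetical:
class Waiter {
public:
    Waiter() : _butex(bthread::butex_create_checked<base::atomic<int> >()) {
        _butex->store(0, base::memory_order_relaxed);  // initial value, illustrative only
    }
    ~Waiter() {
        bthread::butex_destroy(_butex);                // frees the heap-allocated butex
    }
private:
    DISALLOW_COPY_AND_ASSIGN(Waiter);
    base::atomic<int>* _butex;                         // no aligned member buffer needed
};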
void Socket::ReturnSuccessfulWriteRequest(Socket::WriteRequest* p) {
if (p->is_debug) {
LOG(WARNING) << "[DEBUG] ReturnSuccessfulWriteRequest, req=" << p << " SocketId=" << id();
}
DCHECK(p->data.empty());
AddOutputMessages(1);
const bthread_id_t id_wait = p->id_wait;
@@ -394,9 +443,6 @@ void Socket::ReturnSuccessfulWriteRequest(Socket::WriteRequest* p) {
void Socket::ReturnFailedWriteRequest(Socket::WriteRequest* p, int error_code,
const std::string& error_text) {
if (p->is_debug) {
LOG(WARNING) << "[DEBUG] ReturnFailedWriteRequest, req=" << p << " SocketId=" << id();
}
if (!p->reset_pipelined_count_and_user_message()) {
CancelUnwrittenBytes(p->data.size());
}
@@ -1371,7 +1417,6 @@ int Socket::Write(base::IOBuf* data, const WriteOptions* options_in) {
req->id_wait = opt.id_wait;
req->set_pipelined_count_and_user_message(
opt.pipelined_count, DUMMY_USER_MESSAGE);
req->is_debug = false;
return StartWrite(req, opt);
}
@@ -1407,10 +1452,6 @@ int Socket::Write(SocketMessagePtr<>& msg, const WriteOptions* options_in) {
req->next = WriteRequest::UNCONNECTED;
req->id_wait = opt.id_wait;
req->set_pipelined_count_and_user_message(opt.pipelined_count, msg.release());
req->is_debug = false;
if (VLOG_IS_ON(RPC_VLOG_LEVEL)) {
req->is_debug = dynamic_cast<policy::RtmpCreateStreamMessage*>(req->user_message());
}
return StartWrite(req, opt);
}
@@ -1465,16 +1506,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
base::IOBuf* data_arr[1] = { &req->data };
nw = _conn->CutMessageIntoFileDescriptor(fd(), data_arr, 1);
} else {
if (req->is_debug) {
LOG(WARNING) << "[DEBUG] Before write, data.size=" << req->data.size()
<< " req=" << req << " SocketId=" << id();
}
nw = req->data.cut_into_file_descriptor(fd());
if (req->is_debug) {
LOG(WARNING) << "[DEBUG] After write, data.size=" << req->data.size()
<< " nw=" << nw << " errno=" << errno
<< " req=" << req << " SocketId=" << id();
}
}
if (nw < 0) {
// RTMP may return EOVERCROWDED
@@ -1491,16 +1523,10 @@
}
if (IsWriteComplete(req, true, NULL)) {
ReturnSuccessfulWriteRequest(req);
if (req->is_debug) {
LOG(WARNING) << "[DEBUG] Written in-place, req=" << req << " SocketId=" << id();
}
return 0;
}
KEEPWRITE_IN_BACKGROUND:
if (req->is_debug) {
LOG(WARNING) << "[DEBUG] launch KeepWrite, req=" << req << " SocketId=" << id();
}
ReAddress(&ptr_for_keep_write);
req->socket = ptr_for_keep_write.release();
if (bthread_start_background(&th, &BTHREAD_ATTR_NORMAL,
@@ -1511,10 +1537,6 @@ KEEPWRITE_IN_BACKGROUND:
return 0;
FAIL_TO_WRITE:
if (req->is_debug) {
LOG(WARNING) << "[DEBUG] Fail in-place, errno=" << saved_errno
<< " req=" << req << " SocketId=" << id();
}
// `SetFailed' before `ReturnFailedWriteRequest' (which calls the
// `on_reset' callback inside the id object) so that we immediately
// know this socket has failed inside the `on_reset' callback
@@ -1529,9 +1551,6 @@ void* Socket::KeepWrite(void* void_arg) {
s_vars->nkeepwrite << 1;
WriteRequest* req = static_cast<WriteRequest*>(void_arg);
SocketUniquePtr s(req->socket);
if (req->is_debug) {
LOG(WARNING) << "[DEBUG] Start KeepWrite, req=" << req << " SocketId=" << s->id();
}
// When an error occurs, spin until there are no more requests instead of
// returning directly; otherwise _write_head is permanently non-NULL, which
@@ -1609,27 +1628,16 @@ ssize_t Socket::DoWrite(WriteRequest* req) {
// Group base::IOBuf in the list into a batch array.
base::IOBuf* data_list[DATA_LIST_MAX];
size_t ndata = 0;
WriteRequest* debug_req = NULL;
for (WriteRequest* p = req; p != NULL && ndata < DATA_LIST_MAX;
p = p->next) {
if (p->is_debug) {
debug_req = p;
}
data_list[ndata++] = &p->data;
}
// Write IOBuf in the batch array into the fd.
if (_conn) {
return _conn->CutMessageIntoFileDescriptor(fd(), data_list, ndata);
} else {
if (debug_req) {
LOG(WARNING) << "[DEBUG] Before cut, req=" << debug_req << " SocketId=" << id();
}
ssize_t nw = base::IOBuf::cut_multiple_into_file_descriptor(
fd(), data_list, ndata);
if (debug_req) {
LOG(WARNING) << "[DEBUG] After cut, nw=" << nw
<< " errno=" << errno << " req=" << debug_req << " SocketId=" << id();
}
return nw;
}
} else if (ssl_state() == SSL_UNKNOWN) {
@@ -2326,7 +2334,7 @@ int Socket::GetShortSocket(Socket* main_socket,
void Socket::GetStat(SocketStat* s) const {
BAIDU_CASSERT(offsetof(Socket, _preferred_index) >= 64, different_cacheline);
//BAIDU_CASSERT(sizeof(WriteRequest) == 64, sizeof_write_request_is_64);
BAIDU_CASSERT(sizeof(WriteRequest) == 64, sizeof_write_request_is_64);
SharedPart* sp = GetSharedPart();
if (sp != NULL && sp->extended_stat != NULL) {
......
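With the bool is_debug field dropped from WriteRequest, the struct presumably fits exactly one 64-byte cache line again, which is why the sizeof assertion above can be re-enabled. A hedged illustration of that kind of compile-time size check in pre-C++11 style; the struct layout is invented for the example and is not the real WriteRequest:
struct DemoWriteRequest {                      // LP64 layout assumed
    void*              data_placeholder[4];   // 32 bytes
    unsigned long long pc_and_udmsg;           // 8
    void*              next;                   // 8
    unsigned long long id_wait;                // 8
    void*              socket;                 // 8  -> 64 bytes total
};
// A negative array size breaks the build if the size drifts.
typedef char demo_write_request_is_64_bytes[sizeof(DemoWriteRequest) == 64 ? 1 : -1];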
@@ -16,7 +16,7 @@
#include "base/macros.h" // DISALLOW_COPY_AND_ASSIGN
#include "base/endpoint.h" // base::EndPoint
#include "base/resource_pool.h" // base::ResourceId
#include "bthread/butex.h" // BUTEX_MEMORY_SIZE
#include "bthread/butex.h" // butex_create_checked
#include "brpc/authenticator.h" // Authenticator
#include "brpc/details/ssl_helper.h" // SSLState
#include "brpc/stream.h" // StreamId
@@ -704,7 +704,6 @@ private:
// Butex to wait for EPOLLOUT event
base::atomic<int>* _epollout_butex;
char _epollout_butex_memory[BUTEX_MEMORY_SIZE] __attribute__((__aligned__(sizeof(int))));
// Storing data that are not flushed into `fd' yet.
base::atomic<WriteRequest*> _write_head;
......
@@ -52,63 +52,6 @@ inline SocketOptions::SocketOptions()
, initial_parsing_context(NULL)
{}
static const uint64_t INITIAL_VREF = 0;
static const uint32_t EOF_FLAG = (1 << 31);
inline Socket::Socket(Forbidden)
// must be even because Address() relies on evenness of version
: _versioned_ref(0)
, _shared_part(NULL)
, _nevent(0)
, _keytable_pool(NULL)
, _fd(-1)
, _tos(0)
, _reset_fd_real_us(-1)
, _on_edge_triggered_events(NULL)
, _user(NULL)
, _conn(NULL)
, _app_connect(NULL)
, _this_id(0)
, _preferred_index(-1)
, _hc_count(0)
, _last_msg_size(0)
, _avg_msg_size(0)
, _last_readtime_us(0)
, _parsing_context(NULL)
, _correlation_id(0)
, _health_check_interval_s(-1)
, _ninprocess(1)
, _auth_flag_error(0)
, _auth_id(INVALID_BTHREAD_ID)
, _auth_context(NULL)
, _ssl_state(SSL_UNKNOWN)
, _ssl_ctx(NULL)
, _ssl_session(NULL)
, _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN)
, _controller_released_socket(false)
, _overcrowded(false)
, _fail_me_at_server_stop(false)
, _logoff_flag(false)
, _recycle_flag(false)
, _error_code(0)
, _pipeline_q(NULL)
, _last_writetime_us(0)
, _unwritten_bytes(0)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
{
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
_epollout_butex = (base::atomic<int>*)
::bthread::butex_construct(_epollout_butex_memory);
}
inline Socket::~Socket() {
pthread_mutex_destroy(&_id_wait_list_mutex);
::bthread::butex_destruct(_epollout_butex_memory);
}
inline int Socket::Dereference() {
const SocketId id = _this_id;
const uint64_t vref = _versioned_ref.fetch_sub(
@@ -294,6 +237,8 @@ inline bool Socket::IsLogOff() const {
return _logoff_flag.load(base::memory_order_relaxed);
}
static const uint32_t EOF_FLAG = (1 << 31);
inline void Socket::PostponeEOF() {
if (CreatedByConnect()) { // not needed at server-side
_ninprocess.fetch_add(1, base::memory_order_relaxed);
......