Commit 7fb0515e authored by zhujiashun's avatar zhujiashun

replace epoll by kqueue in all places and remove fake epoll

parent 3e87f670
......@@ -28,7 +28,6 @@
#define EPOLLRDHUP 0x2000
#endif
namespace brpc {
static unsigned int check_epollrdhup() {
......
......@@ -129,7 +129,7 @@ void EventDispatcher::Stop() {
epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
#elif defined(OS_MACOSX)
struct kevent kqueue_event;
EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
0, 0, NULL);
kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
#endif
......
......@@ -1712,8 +1712,12 @@ ssize_t Socket::DoWrite(WriteRequest* req) {
break;
case SSL_ERROR_WANT_READ:
// Wait for EPOLLIN to finish renegotiation
// Wait for read event to finish renegotiation
#if defined(OS_LINUX)
if (bthread_fd_wait(fd(), EPOLLIN) == 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_wait(fd(), EVFILT_READ) == 0) {
#endif
need_continue = true;
}
break;
......@@ -1794,8 +1798,12 @@ ssize_t Socket::DoRead(size_t size_hint) {
break;
case SSL_ERROR_WANT_WRITE:
// Wait for EPOLLOUT to finish renegotiation
// Wait for write event to finish renegotiation
#if defined(OS_LINUX)
if (bthread_fd_wait(fd(), EPOLLOUT) == 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_wait(fd(), EVFILT_WRITE) == 0) {
#endif
need_continue = true;
}
break;
......
......@@ -19,6 +19,8 @@
#include "butil/compat.h"
#include <new> // std::nothrow
#include <sys/poll.h> // poll()
#include <sys/types.h> // struct kevent
#include <sys/event.h> // kevent(), kqueue()
#include "butil/atomicops.h"
#include "butil/time.h"
#include "butil/fd_utility.h" // make_non_blocking
......@@ -119,10 +121,14 @@ public:
_start_mutex.unlock();
return -1;
}
#if defined(OS_LINUX)
_epfd = epoll_create(epoll_size);
#elif defined(OS_MACOSX)
_epfd = kqueue();
#endif
_start_mutex.unlock();
if (_epfd < 0) {
PLOG(FATAL) << "Fail to epoll_create";
PLOG(FATAL) << "Fail to epoll_create/kqueue";
return -1;
}
if (bthread_start_background(
......@@ -159,9 +165,16 @@ public:
PLOG(FATAL) << "Fail to create closing_epoll_pipe";
return -1;
}
#if defined(OS_LINUX)
epoll_event evt = { EPOLLOUT, { NULL } };
if (epoll_ctl(saved_epfd, EPOLL_CTL_ADD,
closing_epoll_pipe[1], &evt) < 0) {
#elif defined(OS_MACOSX)
struct kevent kqueue_event;
EV_SET(&kqueue_event, closing_epoll_pipe[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
0, 0, NULL);
if (kevent(saved_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
#endif
PLOG(FATAL) << "Fail to add closing_epoll_pipe into epfd="
<< saved_epfd;
return -1;
......@@ -178,7 +191,7 @@ public:
return 0;
}
int fd_wait(int fd, unsigned epoll_events, const timespec* abstime) {
int fd_wait(int fd, unsigned events, const timespec* abstime) {
butil::atomic<EpollButex*>* p = fd_butexes.get_or_new(fd);
if (NULL == p) {
errno = ENOMEM;
......@@ -212,8 +225,9 @@ public:
// and EPOLL_CTL_ADD shall have release fence.
const int expected_val = butex->load(butil::memory_order_relaxed);
#ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
epoll_event evt = { epoll_events | EPOLLONESHOT, { butex } };
#if defined(OS_LINUX)
# ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
epoll_event evt = { events | EPOLLONESHOT, { butex } };
if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
errno != EEXIST) {
......@@ -221,16 +235,25 @@ public:
return -1;
}
}
#else
# else
epoll_event evt;
evt.events = epoll_events;
evt.events = events;
evt.data.fd = fd;
if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0 &&
errno != EEXIST) {
PLOG(FATAL) << "Fail to add fd=" << fd << " into epfd=" << _epfd;
return -1;
}
#endif
# endif
#elif defined(OS_MACOSX)
struct kevent kqueue_event;
EV_SET(&kqueue_event, fd, events, EV_ADD | EV_ENABLE | EV_ONESHOT,
0, 0, butex);
if (kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL) < 0) {
PLOG(FATAL) << "Fail to add fd=" << fd << " into kqueuefd=" << _epfd;
return -1;
}
#endif
if (butex_wait(butex, expected_val, abstime) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
return -1;
......@@ -260,7 +283,15 @@ public:
butex->fetch_add(1, butil::memory_order_relaxed);
butex_wake_all(butex);
}
#if defined(OS_LINUX)
epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
#elif defined(OS_MACOSX)
struct kevent evt;
EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
kevent(_epfd, &evt, 1, NULL, 0, NULL);
EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
const int rc = close(fd);
pbutex->exchange(butex, butil::memory_order_relaxed);
return rc;
......@@ -278,18 +309,29 @@ private:
void* run() {
const int initial_epfd = _epfd;
const size_t MAX_EVENTS = 32;
#if defined(OS_LINUX)
epoll_event* e = new (std::nothrow) epoll_event[MAX_EVENTS];
#elif defined(OS_MACOSX)
typedef struct kevent KEVENT;
struct kevent* e = new (std::nothrow) KEVENT[MAX_EVENTS];
#endif
if (NULL == e) {
LOG(FATAL) << "Fail to new epoll_event";
return NULL;
}
#ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
DLOG(INFO) << "Use DEL+ADD instead of EPOLLONESHOT+MOD due to kernel bug. Performance will be much lower.";
# endif
#endif
while (!_stop) {
const int epfd = _epfd;
#if defined(OS_LINUX)
const int n = epoll_wait(epfd, e, MAX_EVENTS, -1);
#elif defined(OS_MACOSX)
const int n = kevent(epfd, NULL, 0, e, MAX_EVENTS, NULL);
#endif
if (_stop) {
break;
}
......@@ -311,14 +353,18 @@ private:
break;
}
#ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
#if defined(OS_LINUX)
# ifndef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
for (int i = 0; i < n; ++i) {
epoll_ctl(epfd, EPOLL_CTL_DEL, e[i].data.fd, NULL);
}
# endif
#endif
for (int i = 0; i < n; ++i) {
#ifdef BAIDU_KERNEL_FIXED_EPOLLONESHOT_BUG
EpollButex* butex = static_cast<EpollButex*>(e[i].data.ptr);
#elif defined(OS_MACOSX)
EpollButex* butex = static_cast<EpollButex*>(e[i].udata);
#else
butil::atomic<EpollButex*>* pbutex = fd_butexes.get(e[i].data.fd);
EpollButex* butex = pbutex ?
......@@ -368,6 +414,7 @@ int stop_and_join_epoll_threads() {
return rc;
}
#if defined(OS_LINUX)
short epoll_to_poll_events(uint32_t epoll_events) {
// Most POLL* and EPOLL* are same values.
short poll_events = (epoll_events &
......@@ -378,9 +425,22 @@ short epoll_to_poll_events(uint32_t epoll_events) {
CHECK_EQ((uint32_t)poll_events, epoll_events);
return poll_events;
}
#elif defined(OS_MACOSX)
// Translate a single kqueue filter value into the matching poll(2) event
// bit. Only EVFILT_READ and EVFILT_WRITE are recognized; any other filter
// maps to 0 (caller treats that as invalid).
// TODO: add more values?
short kqueue_to_poll_events(uint32_t kqueue_events) {
    if (kqueue_events == EVFILT_READ) {
        return POLLIN;
    }
    if (kqueue_events == EVFILT_WRITE) {
        return POLLOUT;
    }
    return 0;
}
#endif
// For pthreads.
int pthread_fd_wait(int fd, unsigned epoll_events,
int pthread_fd_wait(int fd, unsigned events,
const timespec* abstime) {
int diff_ms = -1;
if (abstime) {
......@@ -404,8 +464,13 @@ int pthread_fd_wait(int fd, unsigned epoll_events,
}
diff_ms = (abstime_us - now_us + 999L) / 1000L;
}
#if defined(OS_LINUX)
const short poll_events =
bthread::epoll_to_poll_events(events);
#elif defined(OS_MACOSX)
const short poll_events =
bthread::epoll_to_poll_events(epoll_events);
bthread::kqueue_to_poll_events(events);
#endif
if (poll_events == 0) {
errno = EINVAL;
return -1;
......@@ -430,7 +495,7 @@ int pthread_fd_wait(int fd, unsigned epoll_events,
extern "C" {
int bthread_fd_wait(int fd, unsigned epoll_events) {
int bthread_fd_wait(int fd, unsigned events) {
if (fd < 0) {
errno = EINVAL;
return -1;
......@@ -438,15 +503,15 @@ int bthread_fd_wait(int fd, unsigned epoll_events) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (NULL != g && !g->is_current_pthread_task()) {
return bthread::get_epoll_thread(fd).fd_wait(
fd, epoll_events, NULL);
fd, events, NULL);
}
return bthread::pthread_fd_wait(fd, epoll_events, NULL);
return bthread::pthread_fd_wait(fd, events, NULL);
}
int bthread_fd_timedwait(int fd, unsigned epoll_events,
int bthread_fd_timedwait(int fd, unsigned events,
const timespec* abstime) {
if (NULL == abstime) {
return bthread_fd_wait(fd, epoll_events);
return bthread_fd_wait(fd, events);
}
if (fd < 0) {
errno = EINVAL;
......@@ -455,9 +520,9 @@ int bthread_fd_timedwait(int fd, unsigned epoll_events,
bthread::TaskGroup* g = bthread::tls_task_group;
if (NULL != g && !g->is_current_pthread_task()) {
return bthread::get_epoll_thread(fd).fd_wait(
fd, epoll_events, abstime);
fd, events, abstime);
}
return bthread::pthread_fd_wait(fd, epoll_events, abstime);
return bthread::pthread_fd_wait(fd, events, abstime);
}
int bthread_connect(int sockfd, const sockaddr* serv_addr,
......@@ -472,7 +537,11 @@ int bthread_connect(int sockfd, const sockaddr* serv_addr,
if (rc == 0 || errno != EINPROGRESS) {
return rc;
}
#if defined(OS_LINUX)
if (bthread_fd_wait(sockfd, EPOLLOUT) < 0) {
#elif defined(OS_MACOSX)
if (bthread_fd_wait(sockfd, EVFILT_WRITE) < 0) {
#endif
return -1;
}
int err;
......
......@@ -63,7 +63,7 @@ public:
futex_wake_private(&_pending_signal, 10000);
}
private:
// higher 31 bits for signalling, LSB for stopping.
butil::atomic<int> _pending_signal;
};
......
......@@ -56,7 +56,7 @@ extern int bthread_timer_del(bthread_timer_t id);
// current implementation relies on EPOLL_CTL_ADD and EPOLL_CTL_DEL which
// are not scalable, don't use bthread_fd_*wait functions in performance
// critical scenario.
extern int bthread_fd_wait(int fd, unsigned epoll_events);
extern int bthread_fd_wait(int fd, unsigned events);
// Suspend caller thread until the file descriptor `fd' has `events'
// or CLOCK_REALTIME reached `abstime' if abstime is not NULL.
......
......@@ -19,118 +19,35 @@ __BEGIN_DECLS
struct pthread_spinlock_t {
dispatch_semaphore_t sem;
};
// Fake pthread spinlock for macOS, backed by a dispatch semaphore.
// Returns 0 on success; EINVAL when a process-shared lock is requested,
// which a dispatch semaphore cannot provide.
inline int pthread_spin_init(pthread_spinlock_t *__lock, int __pshared) {
    if (__pshared != 0) {
        return EINVAL;
    }
    __lock->sem = dispatch_semaphore_create(1);
    return 0;
}
// No-op destroy: always returns 0.
// TODO(gejun): Not see any destructive API on dispatch_semaphore
inline int pthread_spin_destroy(pthread_spinlock_t *__lock) {
    (void)__lock;
    return 0;
}
// Blocks until the semaphore is acquired. With DISPATCH_TIME_FOREVER the
// wait cannot time out, so this returns 0 once the lock is held.
inline int pthread_spin_lock(pthread_spinlock_t *__lock) {
    return (int)dispatch_semaphore_wait(__lock->sem, DISPATCH_TIME_FOREVER);
}
// Attempts to acquire the lock without blocking.
// NOTE(review): this returns non-zero when the lock IS acquired and 0 when
// it is busy — the opposite of POSIX pthread_spin_trylock (0 on success,
// EBUSY on failure). Verify that all callers expect this convention.
inline int pthread_spin_trylock(pthread_spinlock_t *__lock) {
    return dispatch_semaphore_wait(__lock->sem, DISPATCH_TIME_NOW) == 0;
}
// Releases the lock. Forwards dispatch_semaphore_signal's result (per the
// Dispatch docs, non-zero iff a waiting thread was woken), not a POSIX
// error code.
inline int pthread_spin_unlock(pthread_spinlock_t *__lock) {
    return dispatch_semaphore_signal(__lock->sem);
}
__END_DECLS
// fake epoll (temporary)
// Mirror of the Linux <sys/epoll.h> event flags so epoll-based code
// compiles on macOS; the values follow the Linux definitions.
// Each enumerator is also #defined to itself, mirroring glibc's style so
// that `#ifdef EPOLLIN`-style feature tests keep working.
enum EPOLL_EVENTS
{
EPOLLIN = 0x001,
#define EPOLLIN EPOLLIN
EPOLLPRI = 0x002,
#define EPOLLPRI EPOLLPRI
EPOLLOUT = 0x004,
#define EPOLLOUT EPOLLOUT
EPOLLRDNORM = 0x040,
#define EPOLLRDNORM EPOLLRDNORM
EPOLLRDBAND = 0x080,
#define EPOLLRDBAND EPOLLRDBAND
EPOLLWRNORM = 0x100,
#define EPOLLWRNORM EPOLLWRNORM
EPOLLWRBAND = 0x200,
#define EPOLLWRBAND EPOLLWRBAND
EPOLLMSG = 0x400,
#define EPOLLMSG EPOLLMSG
EPOLLERR = 0x008,
#define EPOLLERR EPOLLERR
EPOLLHUP = 0x010,
#define EPOLLHUP EPOLLHUP
EPOLLRDHUP = 0x2000,
#define EPOLLRDHUP EPOLLRDHUP
// One-shot and edge-triggered flags occupy the top bits, as on Linux.
EPOLLONESHOT = (1 << 30),
#define EPOLLONESHOT EPOLLONESHOT
EPOLLET = (1 << 31)
#define EPOLLET EPOLLET
};
/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */
#define EPOLL_CTL_ADD 1 /* Add a file decriptor to the interface. */
#define EPOLL_CTL_DEL 2 /* Remove a file decriptor from the interface. */
#define EPOLL_CTL_MOD 3 /* Change file decriptor epoll_event structure. */
// Mimics Linux's epoll_data: the user cookie stored with a registration.
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
// Mimics Linux's epoll_event; packed like glibc's x86-64 definition so the
// layout matches.
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __attribute__ ((__packed__));
__BEGIN_DECLS
/* Creates an epoll instance. Returns an fd for the new instance.
The "size" parameter is a hint specifying the number of file
descriptors to be associated with the new instance. The fd
returned by epoll_create() should be closed with close().
NOTE: this fake stub does no work and always returns 0. */
inline int epoll_create (int __size) { return 0;}
/* Same as epoll_create but with an FLAGS parameter. The unused SIZE
parameter has been dropped. (No-op stub on macOS: always returns 0.) */
inline int epoll_create1 (int __flags) {return 0;}
/* Manipulate an epoll instance "epfd". Returns 0 in case of success,
-1 in case of error ( the "errno" variable will contain the
specific error code ) The "op" parameter is one of the EPOLL_CTL_*
constants defined above. The "fd" parameter is the target of the
operation. The "event" parameter describes which events the caller
is interested in and any associated user data.
NOTE: this fake stub registers nothing and unconditionally returns 0. */
inline int epoll_ctl (int __epfd, int __op, int __fd,
struct epoll_event *__event) {return 0;}
/* Wait for events on an epoll instance "epfd". Returns the number of
triggered events returned in "events" buffer. Or -1 in case of
error with the "errno" variable set to the specific error code. The
"events" parameter is a buffer that will contain triggered
events. The "maxevents" is the maximum number of events to be
returned ( usually size of "events" ). The "timeout" parameter
specifies the maximum wait time in milliseconds (-1 == infinite).
This function is a cancellation point and therefore not marked with
__THROW. NOTE: this fake stub never blocks and always reports 0 events. */
inline int epoll_wait (int __epfd, struct epoll_event *__events,
int __maxevents, int __timeout) { return 0;}
__END_DECLS
#elif defined(OS_LINUX)
#include <sys/epoll.h>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment