See More

// @Author Lin Ya // @Email [email protected] #include "EventLoop.h" #include #include #include #include "Util.h" #include "base/Logging.h" using namespace std; __thread EventLoop* t_loopInThisThread = 0; int createEventfd() { int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0) { LOG << "Failed in eventfd"; abort(); } return evtfd; } EventLoop::EventLoop() : looping_(false), poller_(new Epoll()), wakeupFd_(createEventfd()), quit_(false), eventHandling_(false), callingPendingFunctors_(false), threadId_(CurrentThread::tid()), pwakeupChannel_(new Channel(this, wakeupFd_)) { if (t_loopInThisThread) { // LOG << "Another EventLoop " << t_loopInThisThread << " exists in this // thread " << threadId_; } else { t_loopInThisThread = this; } // pwakeupChannel_->setEvents(EPOLLIN | EPOLLET | EPOLLONESHOT); pwakeupChannel_->setEvents(EPOLLIN | EPOLLET); pwakeupChannel_->setReadHandler(bind(&EventLoop::handleRead, this)); pwakeupChannel_->setConnHandler(bind(&EventLoop::handleConn, this)); poller_->epoll_add(pwakeupChannel_, 0); } void EventLoop::handleConn() { // poller_->epoll_mod(wakeupFd_, pwakeupChannel_, (EPOLLIN | EPOLLET | // EPOLLONESHOT), 0); updatePoller(pwakeupChannel_, 0); } EventLoop::~EventLoop() { // wakeupChannel_->disableAll(); // wakeupChannel_->remove(); close(wakeupFd_); t_loopInThisThread = NULL; } void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = writen(wakeupFd_, (char*)(&one), sizeof one); if (n != sizeof one) { LOG << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } } void EventLoop::handleRead() { uint64_t one = 1; ssize_t n = readn(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; } // pwakeupChannel_->setEvents(EPOLLIN | EPOLLET | EPOLLONESHOT); pwakeupChannel_->setEvents(EPOLLIN | EPOLLET); } void EventLoop::runInLoop(Functor&& cb) { if (isInLoopThread()) cb(); else queueInLoop(std::move(cb)); } void EventLoop::queueInLoop(Functor&& cb) { { MutexLockGuard lock(mutex_); pendingFunctors_.emplace_back(std::move(cb)); } if (!isInLoopThread() || callingPendingFunctors_) wakeup(); } void EventLoop::loop() { assert(!looping_); assert(isInLoopThread()); looping_ = true; quit_ = false; // LOG_TRACE << "EventLoop " << this << " start looping"; std::vector ret; while (!quit_) { // cout << "doing" << endl; ret.clear(); ret = poller_->poll(); eventHandling_ = true; for (auto& it : ret) it->handleEvents(); eventHandling_ = false; doPendingFunctors(); poller_->handleExpired(); } looping_ = false; } void EventLoop::doPendingFunctors() { std::vector functors; callingPendingFunctors_ = true; { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i = 0; i < functors.size(); ++i) functors[i](); callingPendingFunctors_ = false; } void EventLoop::quit() { quit_ = true; if (!isInLoopThread()) { wakeup(); } }