muduo网络库学习笔记(四) 通过eventfd实现的事件通知机制

muduo网络库学习笔记(四) 通过eventfd实现的事件通知机制


上篇文章为EventLoop添加了一个定时器Fd,为EventLoop增加了3个接口:runAfter()、runAt()、runEvery()、这三个接口用于处理定时任务和周期任务. 底层通过封装TimerFd实现。

	TimerId runAt(const TimeStamp& time, const NetCallBacks::TimerCallBack& cb);
TimerId runAfter(double delay, const NetCallBacks::TimerCallBack& cb);
TimerId runEvery(double interval, const NetCallBacks::TimerCallBack& cb);

今天为EventLoop添加另一个Fd:EventFd, 用于实现线程间的事件通知机制.本文会先介绍eventfd的使用,然后给出muduo中EventLoop对eventfd的封装.

eventfd的使用

eventfd系统函数

eventfd  - 事件通知文件描述符

#include <sys/eventfd.h>

int eventfd(unsigned int initval ,int flags );

创建一个能被用户应用程序用于时间等待唤醒机制的eventfd对象.

initval :

eventfd()创建一个可用作事件的“eventfd对象”用户空间应用程序和内核等待/通知机制通知用户空间应用程序的事件。该对象包含一个由内核维护的无符号64位整型(uint64_t)计数器。此计数器的初始值通过initval指定。一般设0.

flags

以下标志中按位OR运算以更改eventfd()的行为,(文件中常用的这两个flags肯定都懂意思吧,就不翻译了,第三个信号量的不管它.):

   EFD_CLOEXEC (since Linux 2.6.27)
Set the close-on-exec (FD_CLOEXEC) flag on the new file
descriptor. See the description of the O_CLOEXEC flag in
open(2) for reasons why this may be useful. EFD_NONBLOCK (since Linux 2.6.27)
Set the O_NONBLOCK file status flag on the new open file
description. Using this flag saves extra calls to fcntl(2) to
achieve the same result. EFD_SEMAPHORE (since Linux 2.6.30)
Provide semaphore-like semantics for reads from the new file
descriptor. See below.

read(2)

成功读取返回一个8byte的整数。read(2)如果提供的缓冲区的大小小于8个字节返回错误EINVAL

write (2)

将缓冲区写入的8字节整形值加到内核计数器上。可以写入的最大值

是计数器中是最大的无符号64位值减1(即0xfffffffffffffffe)。

返回值:

On success, eventfd() returns a new eventfd file descriptor. On error, -1 is returned and errno is set to indicate the error.

使用示例

#include <iostream>
#include <assert.h>
#include <poll.h>
#include <signal.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <string.h>
#include <thread> static int s_efd = 0; int createEventfd()
{
int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); std::cout << "createEventfd() fd : " << evtfd << std::endl; if (evtfd < 0)
{
std::cout << "Failed in eventfd\n";
abort();
} return evtfd;
} void testThread()
{
int timeout = 0;
while(timeout < 3) {
sleep(1);
timeout++;
} uint64_t one = 1;
ssize_t n = write(s_efd, &one, sizeof one);
if(n != sizeof one)
{
std::cout << " writes " << n << " bytes instead of 8\n";
}
} int main()
{
s_efd = createEventfd(); fd_set rdset;
FD_ZERO(&rdset);
FD_SET(s_efd, &rdset); struct timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0; std::thread t(testThread); while(1)
{
if(select(s_efd + 1, &rdset, NULL, NULL, &timeout) == 0)
{
std::cout << "timeout\n";
timeout.tv_sec = 1;
timeout.tv_usec = 0;
FD_SET(s_efd, &rdset);
continue;
} uint64_t one = 0; ssize_t n = read(s_efd, &one, sizeof one);
if(n != sizeof one)
{
std::cout << " read " << n << " bytes instead of 8\n";
} std::cout << " wakeup !\n"; break;
} t.join();
close(s_efd); return 0;
}
./test.out
createEventfd() fd : 3
timeout
timeout
timeout
wakeup !

eventfd 单纯的使用文件描述符实现的线程间的通知机制,可以很好的融入select、poll、epoll的I/O复用机制中.

EventLoop对eventfd的封装

所增加的接口及成员:

	typedef std::function<void()> Functor;
void runInLoop(const Functor& cb);
void wakeup(); //是写m_wakeupFd 通知poll 处理读事件.
void queueInLoop(const Functor& cb);
private:
//used to waked up
void handleRead();
void doPendingFunctors(); int m_wakeupFd;
std::unique_ptr<Channel> p_wakeupChannel;
mutable MutexLock m_mutex;
bool m_callingPendingFunctors; /* atomic */
std::vector<Functor> m_pendingFunctors; // @GuardedBy mutex_

工作时序

(runInLoop() -> quueInLoop())/queueInLoop() -> wakeup() -> poll() -> handleRead() -> doPendingFunctors()

runInLoop()

如果用户在当前IO线程调用这个函数, 回调会同步进行; 如果用户在其他线程调用runInLoop(),cb会被加入队列, IO线程会被唤醒来调用这个Functor.

void EventLoop::runInLoop(const Functor&  cb)
{
if(isInloopThread())
cb();
else
queueInLoop(cb);
}

queueInLoop()

会将回调添加到容器,同时通过wakeup()唤醒poll()调用容器内的回调.

void EventLoop::queueInLoop(const Functor& cb)
{
LOG_TRACE << "EventLoop::queueInLoop()";
{
MutexLockGuard lock(m_mutex);
m_pendingFunctors.push_back(std::move(cb));
} if(!isInloopThread())
{
wakeup();
}
}

内部实现,

wakeup()

写已注册到poll的eventfd 通知poll 处理读事件.

//  m_wakeupFd(createEventfd()),
// p_wakeupChannel(new Channel(this, m_wakeupFd)),
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(m_wakeupFd, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}

handleRead()

poll回调读事件,处理eventfd.

void EventLoop::handleRead() //handle wakeup Fd
{
LOG_TRACE << "EventLoop::handleRead() handle wakeup Fd";
uint64_t one = 1;
ssize_t n = sockets::read(m_wakeupFd, &one, sizeof one);
if(n != sizeof one)
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << "bytes instead of 8";
}
doPendingFunctors();
}

doPendingFunctors()

处理挂起的事件.

void EventLoop::doPendingFunctors()
{
LOG_TRACE << "EventLoop::doPendingFunctors()";
std::vector<Functor> functors;
m_callingPendingFunctors = true; {
MutexLockGuard lock(m_mutex);
functors.swap(m_pendingFunctors);
} for(size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
} m_callingPendingFunctors = false; }

总结

本文主要介绍了muduo中EventLoop通过 通过封装一层eventfd实现的runInLoop()函数,使得其他线程想往EventLoop所在的I/O线程注册任务成为可能.

下篇文章会写Connector和Acceptor,链接器和监听器 实现第一条链接。

上一篇:Java多线程系列--“基础篇”06之 线程让步


下一篇:免费公共DNS服务器IP地址大全(2017年6月24日)