muduo源码分析线程池

文章目录


线程池的实现原理:
在并发程序设计中,由于线程的反复创建于销毁是非常消耗时间的,在存在大量的线程的创建于销毁的程序中,我们可以事先创建出一部分线程,然后管理这些线程去处理我们的任务,这样可以节省一大部分反复创建与销毁的时间开销,线程池的好处这里不多说了,看一下muduo 网络库对线程池的处理

构造函数

ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),
  	//线程锁
    notEmpty_(mutex_),
    notFull_(mutex_),
    name_(nameArg),
    //线程池处理的最大的任务数
    maxQueueSize_(0),
    //标识线程池是否正在运行
    running_(false)
{
}

析构函数

ThreadPool::~ThreadPool()
{
  //如果线程池正在运行,停止线程池
  if (running_)
  {
    stop();
  }
}

start 线程池的启动函数

void ThreadPool::start(int numThreads)
{
	//判断线程池是否为空
  assert(threads_.empty());
	//线程池的运行状态设置为true
  running_ = true;
	//设置线程池的线程的个数
  threads_.reserve(numThreads);
  //创建numsThreads 个线程,并启动这些线程
  //这些线程在没有任务的时候进入睡眠状态,在Thread类中可以看出
  for (int i = 0; i < numThreads; ++i)
  {
    char id[32];
    snprintf(id, sizeof id, "%d", i+1);
    threads_.emplace_back(new muduo::Thread(
          std::bind(&ThreadPool::runInThread, this), name_+id));
	//启动线程,在Thread类中,启动一个线程
    threads_[i]->start();
  }
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();
  }
}

stop 停止线程池

//用于回收所有的线程
void ThreadPool::stop()
{
  {
  MutexLockGuard lock(mutex_);
  running_ = false;
  //唤醒所有的线程
  notEmpty_.notifyAll();
  }
  for (auto& thr : threads_)
  {
    thr->join();
  }
}

run 线程池中添加一个任务

void ThreadPool::run(Task task)
{
  if (threads_.empty())
  {
  //如果没有子线程,就在主线程中执行该任务
    task();
  }
  else
  {
  	//对线程池队列加锁
    MutexLockGuard lock(mutex_);
    //如果任务队列满了,即线程池中的任务数达到了上限,就调用wait阻塞,直到可以添加任务位置
    while (isFull())
    {
      notFull_.wait();
    }
    assert(!isFull());

	//将任务放入队列
    queue_.push_back(std::move(task));
	//唤醒一个线程取执行任务
    notEmpty_.notify();
  }
}

take 线程池中的线程从任务队列取一个线程

ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  //如果任务队列为空,并且线程池处于运行状态
  //则线程池中的线程wait 进入睡眠状态
  while (queue_.empty() && running_)
  {
  	//当前的子线程处于睡眠的状态
    notEmpty_.wait();
  }
  Task task;
  //任务队列不为空的时候
  if (!queue_.empty())
  {
  	//获取队列首部的任务
    task = queue_.front();
	//出队列一个任务
    queue_.pop_front();
    if (maxQueueSize_ > 0)
    {
    	//此时任务队列肯定不满,可以添加新的任务
      notFull_.notify();
    }
  }
  //将获得的任务返回执行
  return task;
}

runInThread 线程池中的线程的执行的区域

void ThreadPool::runInThread()
{
  try
  {
    if (threadInitCallback_)
    {
      threadInitCallback_();
    }
    while (running_)
    {
    	//Task task(take());	返回一个任务
      Task task(take());
      if (task)
      {
      //执行任务队列获得的任务
        task();
      }
    }
  }
  catch (const Exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
    abort();
  }
  catch (const std::exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    abort();
  }
  catch (...)
  {
    fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
    throw; // rethrow
  }
}
上一篇:muduo学习一:简介


下一篇:muduo 网络库reactor 模式下 事件循环器的实现