> 文章列表 > 5.5、线程池同步机制类封装及线程池实现

5.5、线程池同步机制类封装及线程池实现

5.5、线程池同步机制类封装及线程池实现

代码地址

5.5、线程池同步机制类封装及线程池实现

  • 1.线程池
  • 2.代码实现
    • ①锁
      • Ⅰ、locker.h
      • Ⅱ、locker.cpp
    • ②条件变量
      • Ⅰ、cond.h
      • Ⅱ、cond.cpp
    • 信号量
      • Ⅰ、sem.h
      • Ⅱ、sem.cpp
    • ④线程池
      • Ⅰ、threadpool.h
      • Ⅱ、threadpool.cpp

1.线程池

线程池是由服务器预先创建的一组子线程,线程池中的线程数量应该和 CPU 数量差不多。线程池中的所有子线程都运行着相同的代码。当有新的任务到来时,主线程将通过某种方式选择线程池中的某一个子线程来为之服务。相比与动态的创建子线程,选择一个已经存在的子线程的代价显然要小得多。至于主线程选择哪个子线程来为新任务服务,则有多种方式:

  • 主线程使用某种算法来主动选择子线程。最简单、最常用的算法是随机算法和 Round Robin(轮流选取)算法,但更优秀、更智能的算法将使任务在各个工作线程中更均匀地分配,从而减轻服务器的整体压力。
  • 主线程和所有子线程通过一个共享的工作队列来同步,子线程都睡眠在该工作队列上。当有新的任务到来时,主线程将任务添加到工作队列中。这将唤醒正在等待任务的子线程,不过只有一个子线程将获得新任务的”接管权“,它可以从工作队列中取出任务并执行之,而其他子线程将继续睡眠在工作队列上。
    5.5、线程池同步机制类封装及线程池实现

线程池中的线程数量最直接的限制因素是中央处理器(CPU)的处理器(processors/cores)的数量N :如果你的CPU4-cores的,对于CPU密集型的任务(如视频剪辑等消耗CPU计算资源的任务)来说,那线程池中的线程数量最好也设置为4(或者+1防止其他因素造成的线程阻塞);对于IO密集型的任务,一般要多于CPU的核数,因为线程间竞争的不是CPU的计算资源而是IOIO的处理一
般较慢,多于cores数的线程将为CPU争取更多的任务,不至在线程处理IO的过程造成CPU空闲导致资源浪费。

  • 空间换时间,浪费服务器的硬件资源,换取运行效率。
  • 池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源。
  • 当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配。
  • 当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源。

2.代码实现

①锁

Ⅰ、locker.h

#ifndef LOCK_H
#define LOCK_H#include <pthread.h>class Locker {
private:// 互斥锁pthread_mutex_t m_mutex;public:Locker();~Locker();// 加锁bool lock();// 解锁bool unlock();// 获取锁;pthread_mutex_t * get();};#endif

Ⅱ、locker.cpp

#include "locker.h"
#include <exception>// 构造函数,初始化 m_mutex
Locker::Locker() {if (pthread_mutex_init(&m_mutex, NULL) != 0) {throw std::exception();}
}// 析构函数,销毁 m_mutex
Locker::~Locker() {pthread_mutex_destroy(&m_mutex);
}// 给 m_mutex 加锁
bool Locker::lock() {return pthread_mutex_lock(&m_mutex) == 0;
}// 解开 m_mutex 的锁
bool Locker::unlock() {return pthread_mutex_unlock(&m_mutex) == 0;
}// 获取 m_mutex
pthread_mutex_t * Locker::get() {return &m_mutex;
}

②条件变量

Ⅰ、cond.h

#ifndef COND_H
#define COND_H#include <pthread.h>
#include <time.h>class Cond {private:// 条件变量pthread_cond_t m_cond;public:Cond();~Cond();// 阻塞等待唤醒bool wait(pthread_mutex_t *);// 超时等待bool timedwait(pthread_mutex_t *, timespec);// 唤醒单个等待条件bool signal();// 唤醒全部等待条件bool broadcast();};#endif

Ⅱ、cond.cpp

#include "cond.h"
#include <exception>// 构造函数,初始化 m_cond
Cond::Cond() {if (pthread_cond_init(&m_cond, NULL) != 0) {throw std::exception();}
}// 销毁 m_cond
Cond::~Cond() {pthread_cond_destroy(&m_cond);
}// 条件阻塞等待唤醒 m_cond
bool Cond::wait(pthread_mutex_t * mutex) {return pthread_cond_wait(&m_cond, mutex) == 0;
}// 超时等待条件阻塞
bool Cond::timedwait(pthread_mutex_t * mutex, timespec t) {return pthread_cond_timedwait(&m_cond, mutex, &t) == 0;
}// 唤醒等待进程
bool Cond::signal() {return pthread_cond_signal(&m_cond) == 0;
}// 唤醒全部进程
bool Cond::broadcast() {return pthread_cond_broadcast(&m_cond) == 0;
}

③信号量

Ⅰ、sem.h

#ifndef SEM_H
#define SEM_H#include <semaphore.h>class Sem {
private:// 信号量sem_t m_sem;public:Sem();Sem(int);~Sem();// 信号量的加锁,如果为0,就阻塞,调用一个减 1 bool wait();// 调用一次 + 1bool post();};#endif

Ⅱ、sem.cpp

#include "sem.h"
#include <exception>// 构造函数
Sem::Sem() {if (sem_init(&m_sem, 0, 0) != 0) {throw std::exception();}
}// 构造函数带初始值 num
Sem::Sem(int num) {if (sem_init(&m_sem, NULL, num) != 0) {throw std::exception();}
}// 析构函数,销毁信号量 m_sem
Sem::~Sem() {sem_destroy(&m_sem);
}// 阻塞信号量,数值 -1
bool Sem::wait() {return sem_wait(&m_sem) == 0;
}// 增加信号的值,+1
bool Sem::post() {return sem_post(&m_sem) == 0;
}

④线程池

Ⅰ、threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H#include <list>
#include "locker.h"
#include "sem.h"
#include "cond.h"// 线程池
template<class T>
class ThreadPool {private:// 线程的数量int m_thread_number;// 线程数组,大小为m_thread_numberpthread_t * m_threads;// 允许等待的最大数量int m_max_requests;// 请求队列,需要处理的任务std::list<T*> m_workqueue;// 请求队列的互斥锁locker m_queuelocker;// 是否有任务需要处理,信号量的数量sem m_queuestat;// 是否结束进程bool m_stop;public://thread_number是线程池中线程的数量//max_requests是请求队列中最多允许的、等待处理的请求的数量ThreadPool(int thraed_number = 8, int max_requests = 10000);// 析构函数,销毁线程的数据~ThreadPool();// 增加任务进工作队列中bool append(T *);private:// 创建线程之后的运行函数void * worker(void * arg);// 取出队列中任务,不断的运行线程处理任务void run();};#endif

Ⅱ、threadpool.cpp

#include "threadpool.h"
#include <iostream>// 线程池的构造函数
template<class T>
ThreadPool<T>::ThreadPool(int thread_numebr, int max_requests) {// 传入错误的参数if (thread_number <= 0 || max_requests <= 0) {throw std::exception();}// 属性赋值m_thread_number = thread_numebr;m_max_requests = max_requests;m_stop = false;m_queuelocker = locker();m_queuestat = sem();m_workqueue.clear();// 创建线程数组m_threads = new pthread_t[m_thread_number];if (m_threads == nullptr) {throw std::exception();}// 创建线程for (int i = 0; i < m_thread_number; i ++ ) {printf("create the %dth thread\\n", i);// 如果创建失败返回if (pthread_create(m_threads[i], nullptr, worker, this) != 0) {delete[] m_threads;throw std::exception();}// 创建线程分离if (pthread_detach(m_threads[i]) != 0) {delete[] m_threads;throw std::exception();}}
}// 线程池的析构函数,销毁一些变量
template<class T>
ThreadPool<T>::~ThreadPool() {delete[] m_threads;m_stop = true;m_workqueue.clear();
}template<class T>
bool ThreadPool<T>::append(T * requests) {// 操作工作队列时一定要加锁,因为它被所有线程共享。m_queuelocker.lock();if (m_workqueue.size() >= m_max_requests) {m_queuelocker.unlock();return false;}m_workqueue.push_back(requests);m_queuelocker.unlock();m_queuestat.post();return true;}// 创建的线程需要运行的函数
template<class T>
void * ThreadPool<T>::worker(void * arg) {ThreadPool * p = (ThreadPool *) arg;p->run();return p;
}// 线程池处理函数
template<class T>
void ThreadPool<T>::run() {while (!m_stop) {// 信号量不为0,可以运行m_queuestat.wait();m_queuelocker.lock();// 获取队列第一个请求T * requests = m_workqueue.front();// 第一个请求已经取出执行,可以pop掉m_workqueue.pop_front();// 解锁m_queuelocker.unlock();if (requests == nullptr) {continue;}// 请求的处理requests->process();}}