5.5、线程池同步机制类封装及线程池实现
代码地址
5.5、线程池同步机制类封装及线程池实现
- 1.线程池
- 2.代码实现
-
- ①锁
-
- Ⅰ、locker.h
- Ⅱ、locker.cpp
- ②条件变量
-
- Ⅰ、cond.h
- Ⅱ、cond.cpp
- ③信号量
-
- Ⅰ、sem.h
- Ⅱ、sem.cpp
- ④线程池
-
- Ⅰ、threadpool.h
- Ⅱ、threadpool.cpp
1.线程池
线程池是由服务器预先创建的一组子线程,线程池中的线程数量应该和 CPU
数量差不多。线程池中的所有子线程都运行着相同的代码。当有新的任务到来时,主线程将通过某种方式选择线程池中的某一个子线程来为之服务。相比与动态的创建子线程,选择一个已经存在的子线程的代价显然要小得多。至于主线程选择哪个子线程来为新任务服务,则有多种方式:
- 主线程使用某种算法来主动选择子线程。最简单、最常用的算法是随机算法和
Round Robin(轮流选取)
算法,但更优秀、更智能的算法将使任务在各个工作线程中更均匀地分配,从而减轻服务器的整体压力。 - 主线程和所有子线程通过一个共享的工作队列来同步,子线程都睡眠在该工作队列上。当有新的任务到来时,主线程将任务添加到工作队列中。这将唤醒正在等待任务的子线程,不过只有一个子线程将获得新任务的”接管权“,它可以从工作队列中取出任务并执行之,而其他子线程将继续睡眠在工作队列上。
线程池中的线程数量最直接的限制因素是中央处理器
(CPU)
的处理器(processors/cores)
的数量N
:如果你的CPU
是4-cores
的,对于CPU密集型
的任务(如视频剪辑等消耗CPU
计算资源的任务)来说,那线程池中的线程数量最好也设置为4
(或者+1
防止其他因素造成的线程阻塞);对于IO密集型
的任务,一般要多于CPU
的核数,因为线程间竞争的不是CPU
的计算资源而是IO
,IO
的处理一
般较慢,多于cores
数的线程将为CPU
争取更多的任务,不至在线程处理IO
的过程造成CPU
空闲导致资源浪费。
- 空间换时间,浪费服务器的硬件资源,换取运行效率。
- 池是一组资源的集合,这组资源在服务器启动之初就被完全创建好并初始化,这称为静态资源。
- 当服务器进入正式运行阶段,开始处理客户请求的时候,如果它需要相关的资源,可以直接从池中获取,无需动态分配。
- 当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用释放资源。
2.代码实现
①锁
Ⅰ、locker.h
#ifndef LOCK_H
#define LOCK_H#include <pthread.h>class Locker {
private:// 互斥锁pthread_mutex_t m_mutex;public:Locker();~Locker();// 加锁bool lock();// 解锁bool unlock();// 获取锁;pthread_mutex_t * get();};#endif
Ⅱ、locker.cpp
#include "locker.h"
#include <exception>// 构造函数,初始化 m_mutex
Locker::Locker() {if (pthread_mutex_init(&m_mutex, NULL) != 0) {throw std::exception();}
}// 析构函数,销毁 m_mutex
Locker::~Locker() {pthread_mutex_destroy(&m_mutex);
}// 给 m_mutex 加锁
bool Locker::lock() {return pthread_mutex_lock(&m_mutex) == 0;
}// 解开 m_mutex 的锁
bool Locker::unlock() {return pthread_mutex_unlock(&m_mutex) == 0;
}// 获取 m_mutex
pthread_mutex_t * Locker::get() {return &m_mutex;
}
②条件变量
Ⅰ、cond.h
#ifndef COND_H
#define COND_H#include <pthread.h>
#include <time.h>class Cond {private:// 条件变量pthread_cond_t m_cond;public:Cond();~Cond();// 阻塞等待唤醒bool wait(pthread_mutex_t *);// 超时等待bool timedwait(pthread_mutex_t *, timespec);// 唤醒单个等待条件bool signal();// 唤醒全部等待条件bool broadcast();};#endif
Ⅱ、cond.cpp
#include "cond.h"
#include <exception>// 构造函数,初始化 m_cond
Cond::Cond() {if (pthread_cond_init(&m_cond, NULL) != 0) {throw std::exception();}
}// 销毁 m_cond
Cond::~Cond() {pthread_cond_destroy(&m_cond);
}// 条件阻塞等待唤醒 m_cond
bool Cond::wait(pthread_mutex_t * mutex) {return pthread_cond_wait(&m_cond, mutex) == 0;
}// 超时等待条件阻塞
bool Cond::timedwait(pthread_mutex_t * mutex, timespec t) {return pthread_cond_timedwait(&m_cond, mutex, &t) == 0;
}// 唤醒等待进程
bool Cond::signal() {return pthread_cond_signal(&m_cond) == 0;
}// 唤醒全部进程
bool Cond::broadcast() {return pthread_cond_broadcast(&m_cond) == 0;
}
③信号量
Ⅰ、sem.h
#ifndef SEM_H
#define SEM_H#include <semaphore.h>class Sem {
private:// 信号量sem_t m_sem;public:Sem();Sem(int);~Sem();// 信号量的加锁,如果为0,就阻塞,调用一个减 1 bool wait();// 调用一次 + 1bool post();};#endif
Ⅱ、sem.cpp
#include "sem.h"
#include <exception>// 构造函数
Sem::Sem() {if (sem_init(&m_sem, 0, 0) != 0) {throw std::exception();}
}// 构造函数带初始值 num
Sem::Sem(int num) {if (sem_init(&m_sem, NULL, num) != 0) {throw std::exception();}
}// 析构函数,销毁信号量 m_sem
Sem::~Sem() {sem_destroy(&m_sem);
}// 阻塞信号量,数值 -1
bool Sem::wait() {return sem_wait(&m_sem) == 0;
}// 增加信号的值,+1
bool Sem::post() {return sem_post(&m_sem) == 0;
}
④线程池
Ⅰ、threadpool.h
#ifndef THREADPOOL_H
#define THREADPOOL_H#include <list>
#include "locker.h"
#include "sem.h"
#include "cond.h"// 线程池
template<class T>
class ThreadPool {private:// 线程的数量int m_thread_number;// 线程数组,大小为m_thread_numberpthread_t * m_threads;// 允许等待的最大数量int m_max_requests;// 请求队列,需要处理的任务std::list<T*> m_workqueue;// 请求队列的互斥锁locker m_queuelocker;// 是否有任务需要处理,信号量的数量sem m_queuestat;// 是否结束进程bool m_stop;public://thread_number是线程池中线程的数量//max_requests是请求队列中最多允许的、等待处理的请求的数量ThreadPool(int thraed_number = 8, int max_requests = 10000);// 析构函数,销毁线程的数据~ThreadPool();// 增加任务进工作队列中bool append(T *);private:// 创建线程之后的运行函数void * worker(void * arg);// 取出队列中任务,不断的运行线程处理任务void run();};#endif
Ⅱ、threadpool.cpp
#include "threadpool.h"
#include <iostream>// 线程池的构造函数
template<class T>
ThreadPool<T>::ThreadPool(int thread_numebr, int max_requests) {// 传入错误的参数if (thread_number <= 0 || max_requests <= 0) {throw std::exception();}// 属性赋值m_thread_number = thread_numebr;m_max_requests = max_requests;m_stop = false;m_queuelocker = locker();m_queuestat = sem();m_workqueue.clear();// 创建线程数组m_threads = new pthread_t[m_thread_number];if (m_threads == nullptr) {throw std::exception();}// 创建线程for (int i = 0; i < m_thread_number; i ++ ) {printf("create the %dth thread\\n", i);// 如果创建失败返回if (pthread_create(m_threads[i], nullptr, worker, this) != 0) {delete[] m_threads;throw std::exception();}// 创建线程分离if (pthread_detach(m_threads[i]) != 0) {delete[] m_threads;throw std::exception();}}
}// 线程池的析构函数,销毁一些变量
template<class T>
ThreadPool<T>::~ThreadPool() {delete[] m_threads;m_stop = true;m_workqueue.clear();
}template<class T>
bool ThreadPool<T>::append(T * requests) {// 操作工作队列时一定要加锁,因为它被所有线程共享。m_queuelocker.lock();if (m_workqueue.size() >= m_max_requests) {m_queuelocker.unlock();return false;}m_workqueue.push_back(requests);m_queuelocker.unlock();m_queuestat.post();return true;}// 创建的线程需要运行的函数
template<class T>
void * ThreadPool<T>::worker(void * arg) {ThreadPool * p = (ThreadPool *) arg;p->run();return p;
}// 线程池处理函数
template<class T>
void ThreadPool<T>::run() {while (!m_stop) {// 信号量不为0,可以运行m_queuestat.wait();m_queuelocker.lock();// 获取队列第一个请求T * requests = m_workqueue.front();// 第一个请求已经取出执行,可以pop掉m_workqueue.pop_front();// 解锁m_queuelocker.unlock();if (requests == nullptr) {continue;}// 请求的处理requests->process();}}