C++并发编程实战笔记(一)线程概念与基本控制
tags: C++ Concurrency
写在前面
在C++ 中实现多线程还是很容易的, 不像C的pthreads接口, 下面来总结一下C++多线程的一些基本操作, 包括线程的创建, 合并, 分离, 获取ID等操作, 主要参考了C++并发编程实战(第二版)的第一二章, 这本书应该是C++并发必看的经典了.
另外参考:
std::thread;
一些有用的程序
用于辅助
#include <iostream>
#include <cassert> // 断言
#include <chrono> // 计时
#include <thread> // 线程using namespace std;
using namespace std::chrono; // 计时
using namespace std::literals; // 秒数字面量, C++14
睡眠
测试多线程, 不加睡眠系统实在是太容易假死了.
this_thread::sleep_for(1s); // 睡眠1s
计时
auto start = system_clock::now();
// ... 待计时的程序
auto end = system_clock::now();
auto duration = duration_cast<microseconds>(end - start);
cout << "Time spent: "<< double(duration.count()) * microseconds::period::num /microseconds::period::den<< "s" << endl;
线程基础
头文件
thread
查看硬件支持
我的是8核CPU.
#include <iostream>
#include <thread>int main(int argc, char const *argv[]) {// static methodstd::cout << std::thread::hardware_concurrency(); // 8return 0;
}
如果是1核, 那就只能实现并发而不能实现并行了.
创建与合并(join)
构造函数: 直接传入函数名(函数指针), 以及对应的参数(如果有), 需要注意线程的join(), 否则主线程不会等待子线程结束.
void fun() { cout << "Hello t1!\\n"; }void t1() {thread t1(&fun); // 传入函数指针if (t1.joinable()) cout << "t1 is joinable\\n", t1.join();// 令主线程等待子线程// t1 is joinable// Hello t1!
}
其他创建方法:
- 传入函数对象: 临时对象, 即右值
- 传入函数对象: 具名对象, 即左值
- 传入lambda表达式
void t2() {thread t2([] { cout << "Hello t2!\\n"; }); // lambda 表达式t2.join(); // Hello t2!
}struct Foo {void operator()() const { cout << "Hello t3!\\n"; }
};void t3() { // 传入临时函数对象// 二义性(烦人的分析机制), 参见Effective STL,// 即`只要C++语句有可能被解释成函数声明, 编译器就肯定将其解释为函数声明`// thread t3((foo())); // 由于存在函数指针二义性, 这里必须用圆括号包裹thread t3{Foo()}; // 同理, 这里用一致性初始化{}, 推荐这种方法t3.join(); // Hello t3!
}struct Foo1 {void operator()() const { cout << "Hello t4!\\n"; }
};void t4() {Foo1 f;thread t4(f);t4.join(); // Hello t4!
}void t5() {auto t5 = thread([] { cout << "Hello t5!\\n"; });t5.join(); // Hello t5!
}
事实上使用join()方法等待线程是一刀切
式的, 即要么不等待, 要么一直等待, 之后会采用期值(future)或者条件变量(condition_variable)来做.
并且线程只能被join一次.
int main() {//thread t1([] { cout << "AA\\n"; });t1.join(); //"AA"cout << t1.joinable() << endl; // 0t1.join(); // libc++abi: terminating with uncaught exception of type// std::__1::system_error: thread::join failed: Invalid argument
}
线程分离: detach
分离的线程不受主线程(即main函数)的管理, 而是由C++runtime库管理(成为daemon守护/后台进程).
但是分离线程之后就无法等待线程结束了
void t1() {thread t([] {cout << "detached thread\\n";this_thread::sleep_for(1s);});t.detach();assert(!t.joinable());this_thread::sleep_for(1s);cout << "Main thread\\n";
}int main(int argc, char const* argv[]) {auto start = system_clock::now();t1();auto end = system_clock::now();auto duration = duration_cast<microseconds>(end - start);cout << "Time spent: "<< double(duration.count()) * microseconds::period::num /microseconds::period::den<< "s" << endl;// detached thread// Main thread// Time spent: 1.00508sreturn 0;
}
可见主线程和分离的线程(几乎)同时结束. 耗时1s.
上面代码中, 如果用join而不是detach, 那么用时就是2s, 大家可以测试一下.
获取id
两种获取方法:
- 直接对thread对象调用成员函数
.get_id()
; - 通过在对应线程中(即传入线程的函数中)调用
this_thread::get_id()
.
int main() {cout << "null thread id: " << thread().get_id() << endl;cout << "null thread id: " << thread::id() << endl; // static functhread t1([] {cout << "Hello t1!\\n";cout << "t1 thread id(use this_thread::get_id): "<< this_thread::get_id() << endl;});cout << "main thread id: " << this_thread::get_id() << endl;cout << "t1 id(use t1.get_id): " << t1.get_id() << endl;t1.join();// null thread id: 0x0// null thread id: 0x0// main thread id: 0x1046d0580// t1 id(use t1.get_id): 0x16bc43000// Hello t1!// t1 thread id(use this_thread::get_id): 0x16bc43000
}
线程实战
参数传递的小问题
case 1: 常量引用
线程具有内部存储空间, 参数会按照默认方式先复制到该处, 新创建的线程才能直接访问它们.
然后, 这些副本被当成临时变量, 以右值形式传给新线程上的函数或者可调用对象.
即便函数的相关参数是引用, 上述过程依然会发生.
void oops() {//auto f = [](int i, string const& s) { cout << i << s << endl; };char buf[1024]; // 局部变量(自动变量 )snprintf(buf, 10, "%i", 100);// thread t(f, 3, buf); // buf 可能已销毁// 直接传入 buf 可能会出现安全性问题, 原因是参数传递本意是将 buf 隐式转换为// String, 再将其作为函数参数, 但转换不一定能及时开始(由于 thread// 的工作机制, 其构造函数需要原样复制所有传入的参数)thread t(f, 3, string(buf)); // 这样可以解决, 直接在传入之前进行构造t.detach();
}
自动变量: 代码块内声明或者定义的局部变量, 位于程序的栈区.
case 2: 非常量引用
// 传入一个非常量引用
class Widget {};
void oops_again() {auto f = [](int id, Widget& w) {};Widget w1;// 此时传入的 w1 是右值形式, move-only 型别, 因为非常量引用不能向其传递右值// thread t(f, 10, w1);thread t(f, 10, std::ref(w1));t.join();
}
针对非常量引用, 由于这种形参不能接受右值变量, 所以一定要加上std::ref
修饰(配接器)
case 3: 成员函数
class X {
public:void do_something() { cout << "do_something\\n"; }
};
void t2() {X my_x;// 向某个类的成员函数设定为线程函数, 需要传入函数指针, 指向该成员函数thread t(&X::do_something, &my_x);// 若考虑到对象指针, 成员函数的第一个形参实际上是其第二个实参// 向线程函数 传入的第三个参数就是成员函数的第一个参数
}
针对成员函数的参数传递, 需要考虑形参的顺序(将成员的地址作为成员函数的第一个参数, 然后才传入成员函数的参数)
case 4: 智能指针的控制权转移
void process(unique_ptr<X>){} // X 定义在 case 3
void t3(){unique_ptr<X> p(new X);p->do_something();thread t(process, std::move(p));// 通过 move 移交智能指针所指对象的控制权
}
通过 std::move() 移交控制权
移动语义支持
通过移动语义, thread可以实现控制权移交.
void f() { cout << "f()\\n"; }
void g() { cout << "g()\\n"; }void test1() {thread t1(f); // t1:ft1.join();thread t2 = move(t1); // t2:ft1 = thread(g); // t1:gt1.join();thread t3;t3 = move(t2); // t1:g t2:∅ t3:f// 运行f的线程归属权转移到t1, 该线程最初由t1启动, 但是在转移时,// t1已经关联到g的线程, 因此terminate()会被调用, 终止程序.t1 = move(t3); // 终止整个程序// f()// g()
}void f3(thread t) {}
void g3() {// 线程归属权可以转移到函数内部, 函数能够接收thread实例作为按右值传递的参数.f3(thread(f));thread t(f);f3(std::move(t));
}
std::move() 仅仅将左值强制类型转换为右值, 但是不进行其他操作, 真正移交控制权的时刻是 t2 的move构造调用时(初始化)
并行版的accumulate
template <typename Iterator, typename T>
struct accmuluate_block {void operator()(Iterator first, Iterator last, T& result) {result = accumulate(first, last, result);}
};template <typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init) {// 设置常量unsigned long const length = distance(first, last);if (!length) return init; // 如果计算区间为空, 返回初值unsigned long const min_per_thread = 25; // 每一个线程计算的数量unsigned long const max_threads = // 最大线程数(length + min_per_thread - 1) / min_per_thread;unsigned long const hardeare_threads = thread::hardware_concurrency(); // 8unsigned long const num_threads = // 实际线程数min(hardeare_threads != 0 ? hardeare_threads : 2, max_threads);unsigned long const block_size = length / num_threads;// 存放计算结果,vector<T> results(num_threads);// 设置线程存储vector<thread> threads(num_threads - 1);Iterator block_start = first;for (unsigned long i{}; i < num_threads - 1; ++i) {Iterator block_end = block_start;advance(block_end, block_size);threads[i] = thread(accmuluate_block<Iterator, T>(), block_start,block_end, ref(results[i])); // 这里使用ref适配器block_start = block_end;}accmuluate_block<Iterator, T>()(block_start, last,results[num_threads - 1]);for (auto& entry : threads) entry.join();// 汇总每一个线程分块的结果, 累加得到最终结果, 所以结果需要满足结合律// (double/float不满足, 所以可能与串行版accumulate结果有出入)return accumulate(results.begin(), results.end(), init);
}vector<int> get_vec() { // 生成测试数据vector<int> v;for (int i{}; i < 10000000; ++i) v.emplace_back(i);return v;
}void t1() {auto v = get_vec();auto start = system_clock::now();int ans = parallel_accumulate(v.begin(), v.end(), 0);// int ans = accumulate(v.begin(), v.end(), 0);auto end = system_clock::now();auto duration = duration_cast<microseconds>(end - start);cout << "Time spent: "<< double(duration.count()) * microseconds::period::num /microseconds::period::den<< "s" << endl;cout << ans;// with parallel:// Time spent: 0.014697s// -2014260032// without parallel:// Time spent: 0.083763s// -2014260032
}
确实是快了将近8倍…