[C++11]std::condition_variable notes
condition_variable 类是同步原语,能用于阻塞一个线程,或同时阻塞多个线程,直至另一线程修改共享变量(条件)并通知 condition_variable 。
用人话来说,condition_variable,也就是条件变量,是线程间通信的一种方式。
线程之间在很多时候需要通信,比如经典的生产者消费者问题
一个比较naive的方案是,用mutex来保护一个flag,然后另一线程不停得check这个flag的状态是否改变。以及在这个方案上的改进:让另一个线程check之后,可以先睡一段时间。
但是这两种方法都不够好。第一种不好的原因当然是不停得check,肯定会耗费大量的资源。而第二种,由于没办法准确估计要休眠的时间,因此不够实际。
这个时候我们可以考虑使用条件变量。
条件变量是可以用在如下场景: 一个或者多个线程在等某个条件的成立,而这个条件由另外的线程所控制。当该条件成立时,控制该条件的线程会主动通知这些线程,将这些线程唤醒。
如下是一个最简单的例子:
1 std::mutex mut;
2 std::queue<data_chunk> data_queue; // 1
3 std::condition_variable data_cond;
4
5 void data_preparation_thread()
6 {
7 while(more_data_to_prepare())
8 {
9 data_chunk const data=prepare_data();
10 std::lock_guard<std::mutex> lk(mut);
11 data_queue.push(data); // 2
12 data_cond.notify_one(); // 3
13 }
14 }
15
16 void data_processing_thread()
17 {
18 while(true)
19 {
20 std::unique_lock<std::mutex> lk(mut); // 4
21 data_cond.wait(
22 lk,[]{return !data_queue.empty();}); // 5
23 data_chunk data=data_queue.front();
24 data_queue.pop();
25 lk.unlock(); // 6
26 process(data);
27 if(is_last_chunk(data))
28 break;
29 }
30 }
接下来是一个较为复杂的例子,一个线程安全的队列的实现,
1 #include <queue>
2 #include <memory>
3 #include <mutex>
4 #include <condition_variable>
5
6 template<typename T>
7 class threadsafe_queue
8 {
9 private:
10 mutable std::mutex mut; // 1 互斥量必须是可变的
11 std::queue<T> data_queue;
12 std::condition_variable data_cond;
13 public:
14 threadsafe_queue()
15 {}
16 threadsafe_queue(threadsafe_queue const& other)
17 {
18 std::lock_guard<std::mutex> lk(other.mut);
19 data_queue=other.data_queue;
20 }
21
22 void push(T new_value)
23 {
24 std::lock_guard<std::mutex> lk(mut);
25 data_queue.push(new_value);
26 data_cond.notify_one();
27 }
28
29 void wait_and_pop(T& value)
30 {
31 std::unique_lock<std::mutex> lk(mut);
32 data_cond.wait(lk,[this]{return !data_queue.empty();});
33 value=data_queue.front();
34 data_queue.pop();
35 }
36
37 std::shared_ptr<T> wait_and_pop()
38 {
39 std::unique_lock<std::mutex> lk(mut);
40 data_cond.wait(lk,[this]{return !data_queue.empty();});
41 std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
42 data_queue.pop();
43 return res;
44 }
45
46 bool try_pop(T& value)
47 {
48 std::lock_guard<std::mutex> lk(mut);
49 if(data_queue.empty())
50 return false;
51 value=data_queue.front();
52 data_queue.pop();
53 return true;
54 }
55
56 std::shared_ptr<T> try_pop()
57 {
58 std::lock_guard<std::mutex> lk(mut);
59 if(data_queue.empty())
60 return std::shared_ptr<T>();
61 std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
62 data_queue.pop();
63 return res;
64 }
65
66 bool empty() const
67 {
68 std::lock_guard<std::mutex> lk(mut);
69 return data_queue.empty();
70 }
71 };
以及,虽然现在,这样线程安全的数据结构应该早就有人封装好了,我们直接使用就行了,比如之前用过的intel tbb的concurrent_queue,不过如果对内部实现一无所知,大概没办法感觉到这种精巧之处吧。。
参考资料:
以及借鉴了 c++并发编程第四章中格式化后的代码