Merge lp:~michihenning/unity-scopes-api/fix-threadpool-race into lp:unity-scopes-api

Proposed by Michi Henning
Status: Merged
Approved by: Paweł Stołowski
Approved revision: 46
Merged at revision: 50
Proposed branch: lp:~michihenning/unity-scopes-api/fix-threadpool-race
Merge into: lp:unity-scopes-api
Diff against target: 372 lines (+198/-11)
7 files modified
include/scopes/internal/ThreadPool.h (+2/-3)
include/scopes/internal/ThreadSafeQueue.h (+27/-3)
src/internal/ThreadPool.cpp (+21/-4)
test/gtest/unity/api/scopes/internal/CMakeLists.txt (+1/-0)
test/gtest/unity/api/scopes/internal/ThreadPool/CMakeLists.txt (+4/-0)
test/gtest/unity/api/scopes/internal/ThreadPool/ThreadPool_test.cpp (+106/-0)
test/gtest/unity/api/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp (+37/-1)
To merge this branch: bzr merge lp:~michihenning/unity-scopes-api/fix-threadpool-race
Reviewer Review Type Date Requested Status
Paweł Stołowski (community) Approve
Review via email: mp+194283@code.launchpad.net

Commit message

Fixed race condition that could cause a segfault if a thread pool was created and immediately destroyed again.
This gets rid of the occasional failures we've been seeing in the Runtime and RuntimeImpl tests.

Description of the change

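Fixed a race condition that could cause a segfault if a thread pool was created and immediately destroyed again: a worker thread could call into an already-destroyed ThreadSafeQueue.
The ThreadPool constructor now waits until all worker threads are running, and the destructor calls destroy() on the queue and then joins the workers, instead of resetting the queue before the join.
This gets rid of the occasional failures we've been seeing in the Runtime and RuntimeImpl tests.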

To post a comment you must log in.
Revision history for this message
Michi Henning (michihenning) wrote :

Could someone review this please?

45. By Michi Henning

Merged trunk.

46. By Michi Henning

Made changes to work with latest trunk.

Added an interlock to ThreadPool: the constructor now waits until all the worker threads have started running, to prevent a worker thread
from calling into an already-destroyed ThreadSafeQueue.

Revision history for this message
Paweł Stołowski (stolowski) wrote :

Looks good and I don't see any random test failures anymore. +1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'include/scopes/internal/ThreadPool.h'
2--- include/scopes/internal/ThreadPool.h 2013-11-01 12:44:48 +0000
3+++ include/scopes/internal/ThreadPool.h 2013-11-12 00:50:34 +0000
4@@ -24,7 +24,6 @@
5 #include <unity/UnityExceptions.h>
6
7 #include <future>
8-#include <thread>
9
10 namespace unity
11 {
12@@ -58,8 +57,8 @@
13 std::unique_ptr<TaskQueue> queue_;
14 std::vector<std::thread> threads_;
15 std::mutex mutex_;
16- std::condition_variable cond_;
17- bool done_;
18+ std::promise<void> threads_ready_;
19+ int num_threads_;
20 };
21
22 template<typename F>
23
24=== modified file 'include/scopes/internal/ThreadSafeQueue.h'
25--- include/scopes/internal/ThreadSafeQueue.h 2013-09-23 06:13:06 +0000
26+++ include/scopes/internal/ThreadSafeQueue.h 2013-11-12 00:50:34 +0000
27@@ -21,11 +21,12 @@
28
29 #include <unity/util/NonCopyable.h>
30
31-#include <cassert>
32+#include <atomic>
33 #include <condition_variable>
34 #include <mutex>
35 #include <queue>
36 #include <stdexcept>
37+#include <thread>
38
39 namespace unity
40 {
41@@ -51,6 +52,7 @@
42 ThreadSafeQueue();
43 ~ThreadSafeQueue() noexcept;
44
45+ void destroy() noexcept;
46 void push(T const& item);
47 void push(T&& item);
48 T wait_and_pop();
49@@ -63,20 +65,38 @@
50 mutable std::mutex mutex_;
51 std::condition_variable cond_;
52 bool done_;
53+ std::atomic<int> num_waiters_;
54 };
55
56 template<typename T>
57 ThreadSafeQueue<T>::ThreadSafeQueue() :
58- done_(false)
59+ done_(false),
60+ num_waiters_(0)
61 {
62 }
63
64 template<typename T>
65 ThreadSafeQueue<T>::~ThreadSafeQueue() noexcept
66 {
67+ destroy();
68+
69+ // Don't destroy the object while there are still threads in wait_and_pop(), otherwise
70+ // a thread that wakes up in wait_and_pop() will try to re-lock the already-destroyed
71+ // mutex.
72+ while (num_waiters_.load() > 0)
73+ std::this_thread::yield(); // LCOV_EXCL_LINE (impossible to reliably hit with a test)
74+}
75+
76+template<typename T>
77+void ThreadSafeQueue<T>::destroy() noexcept
78+{
79 std::lock_guard<std::mutex> lock(mutex_);
80+ if (done_)
81+ {
82+ return;
83+ }
84 done_ = true;
85- cond_.notify_all();
86+ cond_.notify_all(); // Wake up anyone asleep in wait_and_pop()
87 }
88
89 template<typename T>
90@@ -99,13 +119,17 @@
91 T ThreadSafeQueue<T>::wait_and_pop()
92 {
93 std::unique_lock<std::mutex> lock(mutex_);
94+ ++num_waiters_;
95 cond_.wait(lock, [this] { return done_ || queue_.size() != 0; });
96 if (done_)
97 {
98+ lock.unlock();
99+ --num_waiters_;
100 throw std::runtime_error("ThreadSafeQueue: queue destroyed while thread was blocked in wait_and_pop()");
101 }
102 T item = std::move(queue_.front());
103 queue_.pop();
104+ --num_waiters_;
105 return item;
106 }
107
108
109=== modified file 'src/internal/ThreadPool.cpp'
110--- src/internal/ThreadPool.cpp 2013-11-01 12:44:48 +0000
111+++ src/internal/ThreadPool.cpp 2013-11-12 00:50:34 +0000
112@@ -20,6 +20,8 @@
113
114 #include <unity/UnityExceptions.h>
115
116+#include <cassert>
117+
118 using namespace std;
119
120 namespace unity
121@@ -35,6 +37,7 @@
122 {
123
124 ThreadPool::ThreadPool(int size)
125+ : num_threads_(size)
126 {
127 if (size < 1)
128 {
129@@ -45,22 +48,29 @@
130
131 try
132 {
133+ {
134+ lock_guard<mutex> lock(mutex_);
135+ threads_ready_ = std::promise<void>();
136+ }
137 for (int i = 0; i < size; ++i)
138 {
139 threads_.push_back(std::thread(&ThreadPool::run, this));
140 }
141+ auto future = threads_ready_.get_future();
142+ future.wait();
143+ future.get();
144 }
145 catch (std::exception const&) // LCOV_EXCL_LINE
146 {
147- throw unity::ResourceException("ThreadPool(): exception during pool creation"); // LCOV_EXCL_LINE
148+ throw ResourceException("ThreadPool(): exception during pool creation"); // LCOV_EXCL_LINE
149 }
150 }
151
152 ThreadPool::~ThreadPool() noexcept
153 {
154+ queue_->destroy();
155 try
156 {
157- queue_.reset(nullptr);
158 for (size_t i = 0; i < threads_.size(); ++i)
159 {
160 threads_[i].join();
161@@ -74,14 +84,21 @@
162
163 void ThreadPool::run()
164 {
165+ TaskQueue::value_type task;
166 for (;;)
167 {
168- TaskQueue::value_type task;
169 try
170 {
171+ {
172+ lock_guard<mutex> lock(mutex_);
173+ if (--num_threads_ == 0)
174+ {
175+ threads_ready_.set_value();
176+ }
177+ }
178 task = queue_->wait_and_pop();
179 }
180- catch (...)
181+ catch (runtime_error const&)
182 {
183 return; // wait_and_pop() throws if the queue is destroyed while threads are blocked on it.
184 }
185
186=== modified file 'test/gtest/unity/api/scopes/internal/CMakeLists.txt'
187--- test/gtest/unity/api/scopes/internal/CMakeLists.txt 2013-10-28 07:06:08 +0000
188+++ test/gtest/unity/api/scopes/internal/CMakeLists.txt 2013-11-12 00:50:34 +0000
189@@ -8,6 +8,7 @@
190 add_subdirectory(RuntimeConfig)
191 add_subdirectory(RuntimeImpl)
192 add_subdirectory(ScopeLoader)
193+add_subdirectory(ThreadPool)
194 add_subdirectory(ThreadSafeQueue)
195 add_subdirectory(UniqueID)
196 add_subdirectory(zmq_middleware)
197
198=== added directory 'test/gtest/unity/api/scopes/internal/ThreadPool'
199=== added file 'test/gtest/unity/api/scopes/internal/ThreadPool/CMakeLists.txt'
200--- test/gtest/unity/api/scopes/internal/ThreadPool/CMakeLists.txt 1970-01-01 00:00:00 +0000
201+++ test/gtest/unity/api/scopes/internal/ThreadPool/CMakeLists.txt 2013-11-12 00:50:34 +0000
202@@ -0,0 +1,4 @@
203+add_executable(ThreadPool_test ThreadPool_test.cpp)
204+target_link_libraries(ThreadPool_test ${TESTLIBS})
205+
206+add_test(ThreadPool ThreadPool_test)
207
208=== added file 'test/gtest/unity/api/scopes/internal/ThreadPool/ThreadPool_test.cpp'
209--- test/gtest/unity/api/scopes/internal/ThreadPool/ThreadPool_test.cpp 1970-01-01 00:00:00 +0000
210+++ test/gtest/unity/api/scopes/internal/ThreadPool/ThreadPool_test.cpp 2013-11-12 00:50:34 +0000
211@@ -0,0 +1,106 @@
212+/*
213+ * Copyright (C) 2013 Canonical Ltd
214+ *
215+ * This program is free software: you can redistribute it and/or modify
216+ * it under the terms of the GNU General Public License version 3 as
217+ * published by the Free Software Foundation.
218+ *
219+ * This program is distributed in the hope that it will be useful,
220+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
221+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
222+ * GNU General Public License for more details.
223+ *
224+ * You should have received a copy of the GNU General Public License
225+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
226+ *
227+ * Authored by: Michi Henning <michi.henning@canonical.com>
228+ */
229+
230+#include <scopes/internal/ThreadPool.h>
231+
232+#include <gtest/gtest.h>
233+
234+using namespace std;
235+using namespace unity::api::scopes::internal;
236+
237+TEST(ThreadPool, basic)
238+{
239+ // Creation and destruction in quick succession
240+ {
241+ ThreadPool p(1);
242+ }
243+ {
244+ ThreadPool p(5);
245+ }
246+ {
247+ ThreadPool p(20);
248+ }
249+}
250+
251+atomic_int count;
252+
253+void f()
254+{
255+ ++count;
256+ this_thread::sleep_for(chrono::milliseconds(200));
257+}
258+
259+void g()
260+{
261+ ++count;
262+}
263+
264+TEST(ThreadPool, submit)
265+{
266+ {
267+ ThreadPool p(1);
268+ p.submit(f);
269+ p.submit(f);
270+ }
271+
272+ {
273+ ThreadPool p(5);
274+ p.submit(f);
275+ p.submit(f);
276+ }
277+
278+ {
279+ count = 0;
280+ ThreadPool p(20);
281+ p.submit(f);
282+ p.submit(f);
283+ this_thread::sleep_for(chrono::milliseconds(300));
284+ EXPECT_EQ(2, count);
285+ }
286+
287+ {
288+ count = 0;
289+ ThreadPool p(20);
290+ p.submit(g);
291+ p.submit(g);
292+ p.submit(g);
293+ this_thread::sleep_for(chrono::milliseconds(300));
294+ EXPECT_EQ(3, count);
295+ }
296+}
297+
298+TEST(ThreadPool, exception)
299+{
300+ try
301+ {
302+ ThreadPool p(0);
303+ }
304+ catch (unity::InvalidArgumentException const& e)
305+ {
306+ EXPECT_EQ("unity::InvalidArgumentException: ThreadPool(): invalid pool size: 0", e.to_string());
307+ }
308+
309+ try
310+ {
311+ ThreadPool p(-1);
312+ }
313+ catch (unity::InvalidArgumentException const& e)
314+ {
315+ EXPECT_EQ("unity::InvalidArgumentException: ThreadPool(): invalid pool size: -1", e.to_string());
316+ }
317+}
318
319=== modified file 'test/gtest/unity/api/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp'
320--- test/gtest/unity/api/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp 2013-11-01 12:44:48 +0000
321+++ test/gtest/unity/api/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp 2013-11-12 00:50:34 +0000
322@@ -77,13 +77,49 @@
323 unique_ptr<ThreadSafeQueue<string>> q(new ThreadSafeQueue<string>);
324 q->push("fred");
325 auto f = waiter_ready.get_future();
326- auto t = thread(&waiter_thread, q.get());
327+ auto t = thread(waiter_thread, q.get());
328 f.wait();
329 this_thread::sleep_for(chrono::milliseconds(50)); // Make sure child thread has time to call wait_and_pop()
330 q.reset();
331 t.join();
332 }
333
334+atomic_int count;
335+
336+void int_reader_thread(ThreadSafeQueue<int>* q)
337+{
338+ try
339+ {
340+ q->wait_and_pop();
341+ FAIL();
342+ }
343+ catch (std::runtime_error const&)
344+ {
345+ ++count;
346+ }
347+}
348+
349+TEST(ThreadSafeQueue, wait_for_threads)
350+{
351+ ThreadSafeQueue<int> q;
352+ count = 0;
353+ vector<thread> threads;
354+ for (auto i = 0; i < 20; ++i)
355+ {
356+ threads.push_back(thread(int_reader_thread, &q));
357+ }
358+ this_thread::sleep_for(chrono::milliseconds(300));
359+
360+ // Destroy the queue while multiple threads are sleeping in wait_and_pop().
361+ q.destroy();
362+
363+ for (auto& t : threads)
364+ {
365+ t.join();
366+ }
367+ EXPECT_EQ(20, count);
368+}
369+
370 class MoveOnly
371 {
372 public:

Subscribers

People subscribed via source and target branches

to all changes: