Mir

Merge lp:~albaguirre/mir/recycle-compositor-threads-take2 into lp:mir

Proposed by Alberto Aguirre
Status: Merged
Approved by: Alberto Aguirre
Approved revision: no longer in the source branch.
Merged at revision: 1897
Proposed branch: lp:~albaguirre/mir/recycle-compositor-threads-take2
Merge into: lp:mir
Diff against target: 661 lines (+538/-8)
9 files modified
include.private/server/mir/thread/basic_thread_pool.h (+63/-0)
src/server/CMakeLists.txt (+2/-0)
src/server/compositor/multi_threaded_compositor.cpp (+8/-5)
src/server/compositor/multi_threaded_compositor.h (+5/-3)
src/server/thread/CMakeLists.txt (+12/-0)
src/server/thread/basic_thread_pool.cpp (+221/-0)
tests/unit-tests/CMakeLists.txt (+1/-0)
tests/unit-tests/thread/CMakeLists.txt (+5/-0)
tests/unit-tests/thread/test_basic_thread_pool.cpp (+221/-0)
To merge this branch: bzr merge lp:~albaguirre/mir/recycle-compositor-threads-take2
Reviewer Review Type Date Requested Status
PS Jenkins bot (community) continuous-integration Approve
Alexandros Frantzis (community) Approve
Review via email: mp+233609@code.launchpad.net

Commit message

Recycle compositor threads by using a thread pool (LP: #1362841)

Description of the change

Recycle compositor threads by using a thread pool

Adds a basic thread pool, placed under server instead of common because I wanted to use mir::terminate_with_current_exception(), and it is currently in use by server code only.

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Alexandros Frantzis (afrantzis) wrote :

Looks good.

review: Approve
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
Alberto Aguirre (albaguirre) wrote :

^-- DemoPrivateProtobuf.client_calls_server had an exception... seems unrelated. Re-approving...

Revision history for this message
PS Jenkins bot (ps-jenkins) :
review: Approve (continuous-integration)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added directory 'include.private/server/mir/thread'
2=== added file 'include.private/server/mir/thread/basic_thread_pool.h'
3--- include.private/server/mir/thread/basic_thread_pool.h 1970-01-01 00:00:00 +0000
4+++ include.private/server/mir/thread/basic_thread_pool.h 2014-09-06 14:29:21 +0000
5@@ -0,0 +1,63 @@
6+/*
7+ * Copyright © 2014 Canonical Ltd.
8+ *
9+ * This program is free software: you can redistribute it and/or modify
10+ * it under the terms of the GNU Lesser General Public License version 3 as
11+ * published by the Free Software Foundation.
12+ *
13+ * This program is distributed in the hope that it will be useful,
14+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
15+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+ * GNU Lesser General Public License for more details.
17+ *
18+ * You should have received a copy of the GNU Lesser General Public License
19+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
20+ *
21+ * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
22+ */
23+
24+#ifndef MIR_THREAD_BASIC_THREAD_POOL_H_
25+#define MIR_THREAD_BASIC_THREAD_POOL_H_
26+
27+#include <functional>
28+#include <future>
29+#include <vector>
30+#include <memory>
31+#include <mutex>
32+
33+namespace mir
34+{
35+namespace thread
36+{
37+
38+class WorkerThread;
39+class BasicThreadPool
40+{
41+public:
42+ BasicThreadPool(int min_threads);
43+ ~BasicThreadPool();
44+
45+ std::future<void> run(std::function<void()> const& task);
46+
47+ typedef void const* TaskId;
48+ std::future<void> run(std::function<void()> const& task, TaskId id);
49+
50+ void shrink();
51+
52+private:
53+ BasicThreadPool(BasicThreadPool const&) = delete;
54+ BasicThreadPool& operator=(BasicThreadPool const&) = delete;
55+
56+ std::future<void> run(WorkerThread* t, std::function<void()> const& task, TaskId id);
57+ WorkerThread *find_thread_by(TaskId id);
58+ WorkerThread *find_idle_thread();
59+
60+ std::mutex mutex;
61+ int const min_threads;
62+ std::vector<std::unique_ptr<WorkerThread>> threads;
63+};
64+
65+}
66+}
67+
68+#endif
69
70=== modified file 'src/server/CMakeLists.txt'
71--- src/server/CMakeLists.txt 2014-09-05 03:37:19 +0000
72+++ src/server/CMakeLists.txt 2014-09-06 14:29:21 +0000
73@@ -13,6 +13,7 @@
74 add_subdirectory(scene/)
75 add_subdirectory(frontend/)
76 add_subdirectory(shell/)
77+add_subdirectory(thread/)
78
79 set(PREFIX "${CMAKE_INSTALL_PREFIX}")
80 set(EXEC_PREFIX "${CMAKE_INSTALL_PREFIX}")
81@@ -48,6 +49,7 @@
82 $<TARGET_OBJECTS:mirlogger>
83 $<TARGET_OBJECTS:mirnestedgraphics>
84 $<TARGET_OBJECTS:miroffscreengraphics>
85+ $<TARGET_OBJECTS:mirthread>
86 )
87
88 set(MIR_SERVER_REFERENCES
89
90=== modified file 'src/server/compositor/multi_threaded_compositor.cpp'
91--- src/server/compositor/multi_threaded_compositor.cpp 2014-09-05 03:37:19 +0000
92+++ src/server/compositor/multi_threaded_compositor.cpp 2014-09-06 14:29:21 +0000
93@@ -195,7 +195,8 @@
94 display_buffer_compositor_factory{db_compositor_factory},
95 report{compositor_report},
96 state{CompositorState::stopped},
97- compose_on_start{compose_on_start}
98+ compose_on_start{compose_on_start},
99+ thread_pool{1}
100 {
101 observer = std::make_shared<ms::LegacySceneChangeNotification>(
102 [this]()
103@@ -289,10 +290,12 @@
104 auto thread_functor_raw = new mc::CompositingFunctor{display_buffer_compositor_factory, buffer, report};
105 auto thread_functor = std::unique_ptr<mc::CompositingFunctor>(thread_functor_raw);
106
107- threads.push_back(std::thread{std::ref(*thread_functor)});
108+ futures.push_back(thread_pool.run(std::ref(*thread_functor), &buffer));
109 thread_functors.push_back(std::move(thread_functor));
110 });
111
112+ thread_pool.shrink();
113+
114 state = CompositorState::started;
115 }
116
117@@ -307,11 +310,11 @@
118 for (auto& f : thread_functors)
119 f->stop();
120
121- for (auto& t : threads)
122- t.join();
123+ for (auto& f : futures)
124+ f.wait();
125
126 thread_functors.clear();
127- threads.clear();
128+ futures.clear();
129
130 report->stopped();
131
132
133=== modified file 'src/server/compositor/multi_threaded_compositor.h'
134--- src/server/compositor/multi_threaded_compositor.h 2014-09-05 03:37:19 +0000
135+++ src/server/compositor/multi_threaded_compositor.h 2014-09-06 14:29:21 +0000
136@@ -20,11 +20,12 @@
137 #define MIR_COMPOSITOR_MULTI_THREADED_COMPOSITOR_H_
138
139 #include "mir/compositor/compositor.h"
140+#include "mir/thread/basic_thread_pool.h"
141
142 #include <mutex>
143 #include <memory>
144 #include <vector>
145-#include <thread>
146+#include <future>
147
148 namespace mir
149 {
150@@ -76,15 +77,16 @@
151 std::shared_ptr<CompositorReport> const report;
152
153 std::vector<std::unique_ptr<CompositingFunctor>> thread_functors;
154- std::vector<std::thread> threads;
155+ std::vector<std::future<void>> futures;
156
157 std::mutex state_guard;
158 CompositorState state;
159 bool compose_on_start;
160
161 void schedule_compositing(int number_composites);
162-
163+
164 std::shared_ptr<mir::scene::Observer> observer;
165+ mir::thread::BasicThreadPool thread_pool;
166 };
167
168 }
169
170=== added directory 'src/server/thread'
171=== added file 'src/server/thread/CMakeLists.txt'
172--- src/server/thread/CMakeLists.txt 1970-01-01 00:00:00 +0000
173+++ src/server/thread/CMakeLists.txt 2014-09-06 14:29:21 +0000
174@@ -0,0 +1,12 @@
175+set(
176+ MIR_THREAD_SRCS
177+
178+ basic_thread_pool.cpp
179+)
180+
181+ADD_LIBRARY(
182+ mirthread OBJECT
183+
184+ ${MIR_THREAD_SRCS}
185+)
186+
187
188=== added file 'src/server/thread/basic_thread_pool.cpp'
189--- src/server/thread/basic_thread_pool.cpp 1970-01-01 00:00:00 +0000
190+++ src/server/thread/basic_thread_pool.cpp 2014-09-06 14:29:21 +0000
191@@ -0,0 +1,221 @@
192+/*
193+ * Copyright © 2014 Canonical Ltd.
194+ *
195+ * This program is free software: you can redistribute it and/or modify it
196+ * under the terms of the GNU General Public License version 3,
197+ * as published by the Free Software Foundation.
198+ *
199+ * This program is distributed in the hope that it will be useful,
200+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
201+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
202+ * GNU General Public License for more details.
203+ *
204+ * You should have received a copy of the GNU General Public License
205+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
206+ *
207+ * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
208+ */
209+
210+#include "mir/thread/basic_thread_pool.h"
211+#include <mir/run_mir.h>
212+
213+#include <deque>
214+#include <algorithm>
215+#include <condition_variable>
216+
217+namespace mt = mir::thread;
218+
219+namespace
220+{
221+
222+class Worker
223+{
224+public:
225+ Worker() : exiting{false}
226+ {
227+ }
228+
229+ ~Worker()
230+ {
231+ exit();
232+ }
233+
234+ void operator()() noexcept
235+ try
236+ {
237+ std::unique_lock<std::mutex> lock{state_mutex};
238+ while (!exiting)
239+ {
240+ task_available_cv.wait(lock, [&]{ return exiting || !tasks.empty(); });
241+
242+ if (!exiting)
243+ {
244+ auto& task = tasks.front();
245+ lock.unlock();
246+ task();
247+ lock.lock();
248+ tasks.pop_front();
249+ }
250+ }
251+ }
252+ catch(...)
253+ {
254+ mir::terminate_with_current_exception();
255+ }
256+
257+ void queue_task(std::packaged_task<void()> task)
258+ {
259+ std::lock_guard<std::mutex> lock{state_mutex};
260+ tasks.push_back(std::move(task));
261+ task_available_cv.notify_one();
262+ }
263+
264+ void exit()
265+ {
266+ std::lock_guard<std::mutex> lock{state_mutex};
267+ exiting = true;
268+ task_available_cv.notify_one();
269+ }
270+
271+ bool is_idle() const
272+ {
273+ std::lock_guard<std::mutex> lock{state_mutex};
274+ return tasks.empty();
275+ }
276+
277+private:
278+ std::deque<std::packaged_task<void()>> tasks;
279+ bool exiting;
280+ std::mutex mutable state_mutex;
281+ std::condition_variable task_available_cv;
282+};
283+
284+}
285+
286+namespace mir
287+{
288+namespace thread
289+{
290+class WorkerThread
291+{
292+public:
293+ WorkerThread(mt::BasicThreadPool::TaskId an_id)
294+ : thread{std::ref(worker)},
295+ id_{an_id}
296+ {}
297+
298+ ~WorkerThread()
299+ {
300+ worker.exit();
301+ if (thread.joinable())
302+ thread.join();
303+ }
304+
305+ void queue_task(std::packaged_task<void()> task, mt::BasicThreadPool::TaskId the_id)
306+ {
307+ worker.queue_task(std::move(task));
308+ id_ = the_id;
309+ }
310+
311+ bool is_idle() const
312+ {
313+ return worker.is_idle();
314+ }
315+
316+ mt::BasicThreadPool::TaskId current_id() const
317+ {
318+ return id_;
319+ }
320+
321+private:
322+ ::Worker worker;
323+ std::thread thread;
324+ mt::BasicThreadPool::TaskId id_;
325+};
326+}
327+}
328+
329+mt::BasicThreadPool::BasicThreadPool(int min_threads)
330+ : min_threads{min_threads}
331+{
332+}
333+
334+mt::BasicThreadPool::~BasicThreadPool() = default;
335+
336+std::future<void> mt::BasicThreadPool::run(std::function<void()> const& task)
337+{
338+ std::lock_guard<decltype(mutex)> lock{mutex};
339+
340+ TaskId const generic_id = nullptr;
341+ WorkerThread* no_preferred_thread = nullptr;
342+ return run(no_preferred_thread, task, generic_id);
343+}
344+
345+std::future<void> mt::BasicThreadPool::run(std::function<void()> const& task, TaskId id)
346+{
347+ std::lock_guard<decltype(mutex)> lock{mutex};
348+ return run(find_thread_by(id), task, id);
349+}
350+
351+std::future<void> mt::BasicThreadPool::run(WorkerThread* worker_thread,
352+ std::function<void()> const& task, TaskId id)
353+{
354+ // No pre-selected thread to execute task, find an idle thread
355+ if (worker_thread == nullptr)
356+ worker_thread = find_idle_thread();
357+
358+ if (worker_thread == nullptr)
359+ {
360+ // No idle threads available so create a new one
361+ std::unique_ptr<WorkerThread> new_worker_thread{new WorkerThread{id}};
362+ threads.push_back(std::move(new_worker_thread));
363+ worker_thread = threads.back().get();
364+ }
365+
366+ std::packaged_task<void()> a_task{task};
367+ auto future = a_task.get_future();
368+ worker_thread->queue_task(std::move(a_task), id);
369+ return future;
370+}
371+
372+
373+void mt::BasicThreadPool::shrink()
374+{
375+ std::lock_guard<decltype(mutex)> lock{mutex};
376+
377+ int max_threads_to_remove = threads.size() - min_threads;
378+ auto it = std::remove_if(threads.begin(), threads.end(),
379+ [&max_threads_to_remove](std::unique_ptr<WorkerThread> const& worker_thread)
380+ {
381+ bool remove = worker_thread->is_idle() && max_threads_to_remove > 0;
382+ if (remove)
383+ max_threads_to_remove--;
384+ return remove;
385+ }
386+ );
387+ threads.erase(it, threads.end());
388+}
389+
390+mt::WorkerThread* mt::BasicThreadPool::find_thread_by(TaskId id)
391+{
392+ auto it = std::find_if(threads.begin(), threads.end(),
393+ [id](std::unique_ptr<WorkerThread> const& worker_thread)
394+ {
395+ return worker_thread->current_id() == id;
396+ }
397+ );
398+
399+ return it == threads.end() ? nullptr : it->get();
400+}
401+
402+mt::WorkerThread *mt::BasicThreadPool::find_idle_thread()
403+{
404+ auto it = std::find_if(threads.begin(), threads.end(),
405+ [](std::unique_ptr<WorkerThread> const& worker_thread)
406+ {
407+ return worker_thread->is_idle();
408+ }
409+ );
410+
411+ return it == threads.end() ? nullptr : it->get();
412+}
413
414=== modified file 'tests/unit-tests/CMakeLists.txt'
415--- tests/unit-tests/CMakeLists.txt 2014-09-05 03:37:19 +0000
416+++ tests/unit-tests/CMakeLists.txt 2014-09-06 14:29:21 +0000
417@@ -36,6 +36,7 @@
418 add_subdirectory(scene/)
419 add_subdirectory(draw/)
420 add_subdirectory(examples/)
421+add_subdirectory(thread/)
422
423 link_directories(${LIBRARY_OUTPUT_PATH})
424
425
426=== added directory 'tests/unit-tests/thread'
427=== added file 'tests/unit-tests/thread/CMakeLists.txt'
428--- tests/unit-tests/thread/CMakeLists.txt 1970-01-01 00:00:00 +0000
429+++ tests/unit-tests/thread/CMakeLists.txt 2014-09-06 14:29:21 +0000
430@@ -0,0 +1,5 @@
431+list(APPEND UNIT_TEST_SOURCES
432+ ${CMAKE_CURRENT_SOURCE_DIR}/test_basic_thread_pool.cpp
433+)
434+
435+set(UNIT_TEST_SOURCES ${UNIT_TEST_SOURCES} PARENT_SCOPE)
436
437=== added file 'tests/unit-tests/thread/test_basic_thread_pool.cpp'
438--- tests/unit-tests/thread/test_basic_thread_pool.cpp 1970-01-01 00:00:00 +0000
439+++ tests/unit-tests/thread/test_basic_thread_pool.cpp 2014-09-06 14:29:21 +0000
440@@ -0,0 +1,221 @@
441+/*
442+ * Copyright © 2014 Canonical Ltd.
443+ *
444+ * This program is free software: you can redistribute it and/or modify
445+ * it under the terms of the GNU General Public License version 3 as
446+ * published by the Free Software Foundation.
447+ *
448+ * This program is distributed in the hope that it will be useful,
449+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
450+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
451+ * GNU General Public License for more details.
452+ *
453+ * You should have received a copy of the GNU General Public License
454+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
455+ *
456+ * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
457+ */
458+
459+#include "mir/thread/basic_thread_pool.h"
460+
461+#include "mir/thread_name.h"
462+#include "mir_test/current_thread_name.h"
463+#include "mir_test/signal.h"
464+
465+#include <memory>
466+
467+#include <gmock/gmock.h>
468+#include <gtest/gtest.h>
469+
470+namespace mt = mir::test;
471+namespace mth = mir::thread;
472+
473+namespace
474+{
475+class BasicThreadPool : public testing::Test
476+{
477+public:
478+ BasicThreadPool()
479+ : default_task_id{nullptr},
480+ default_num_threads{3},
481+ expected_name{"test_thread"}
482+ {
483+ }
484+protected:
485+ mth::BasicThreadPool::TaskId default_task_id;
486+ int default_num_threads;
487+ std::string expected_name;
488+};
489+
490+class TestTask
491+{
492+public:
493+ TestTask()
494+ : TestTask("")
495+ {
496+ }
497+
498+ TestTask(std::string name)
499+ : called{false},
500+ block{false},
501+ name{name}
502+ {
503+ }
504+
505+ ~TestTask()
506+ {
507+ unblock();
508+ }
509+
510+ void operator()() noexcept
511+ {
512+ called = true;
513+ if (!name.empty())
514+ mir::set_thread_name(name);
515+ else
516+ name = mt::current_thread_name();
517+
518+ if (block)
519+ signal.wait();
520+ }
521+
522+ std::string thread_name() const
523+ {
524+ return name;
525+ }
526+
527+ bool was_called() const
528+ {
529+ return called;
530+ }
531+
532+ void block_on_execution()
533+ {
534+ block = true;
535+ }
536+
537+ void unblock()
538+ {
539+ signal.raise();
540+ }
541+
542+private:
543+ bool called;
544+ bool block;
545+ std::string name;
546+ mt::Signal signal;
547+};
548+}
549+
550+TEST_F(BasicThreadPool, executes_given_functor)
551+{
552+ using namespace testing;
553+ mth::BasicThreadPool p{default_num_threads};
554+
555+ TestTask task;
556+ auto future = p.run(std::ref(task));
557+ future.wait();
558+
559+ EXPECT_TRUE(task.was_called());
560+}
561+
562+TEST_F(BasicThreadPool, executes_on_preferred_thread)
563+{
564+ using namespace testing;
565+ mth::BasicThreadPool p{default_num_threads};
566+
567+ TestTask task1{expected_name};
568+ task1.block_on_execution();
569+ auto future1 = p.run(std::ref(task1), default_task_id);
570+
571+ // This task should be queued to the thread associated with the given id
572+ // even when its busy
573+ TestTask task2;
574+ auto future2 = p.run(std::ref(task2), default_task_id);
575+
576+ task1.unblock();
577+ future1.wait();
578+ future2.wait();
579+
580+ // Check if the second task executed on the same thread as the first task
581+ EXPECT_TRUE(task1.was_called());
582+ EXPECT_TRUE(task2.was_called());
583+ EXPECT_THAT(task2.thread_name(), Eq(expected_name));
584+}
585+
586+TEST_F(BasicThreadPool, recycles_threads)
587+{
588+ using namespace testing;
589+ mth::BasicThreadPool p{2};
590+
591+ std::string const thread1_name = "thread1";
592+ std::string const thread2_name = "thread2";
593+
594+ //Create a couple of blocking tasks, so that we force the pool
595+ //to assign new threads to each task
596+ TestTask task1{thread1_name};
597+ task1.block_on_execution();
598+ auto future1 = p.run(std::ref(task1));
599+
600+ TestTask task2{thread2_name};
601+ task2.block_on_execution();
602+ auto future2 = p.run(std::ref(task2));
603+
604+ task1.unblock();
605+ task2.unblock();
606+ future1.wait();
607+ future2.wait();
608+
609+ // Now we should have some idle threads, the next task should
610+ // run in any of the two previously created threads
611+ TestTask task3;
612+ auto future = p.run(std::ref(task3));
613+ future.wait();
614+
615+ EXPECT_TRUE(task1.was_called());
616+ EXPECT_TRUE(task2.was_called());
617+ EXPECT_TRUE(task3.was_called());
618+ EXPECT_THAT(task3.thread_name(), AnyOf(Eq(thread1_name), Eq(thread2_name)));
619+}
620+
621+TEST_F(BasicThreadPool, creates_new_threads)
622+{
623+ using namespace testing;
624+ mth::BasicThreadPool p{1};
625+
626+ TestTask task1{expected_name};
627+ task1.block_on_execution();
628+ auto future1 = p.run(std::ref(task1));
629+
630+ TestTask task2;
631+ auto future2 = p.run(std::ref(task2));
632+
633+ task1.unblock();
634+ future1.wait();
635+ future2.wait();
636+
637+ EXPECT_TRUE(task1.was_called());
638+ EXPECT_TRUE(task2.was_called());
639+ EXPECT_THAT(task2.thread_name(), Ne(expected_name));
640+}
641+
642+TEST_F(BasicThreadPool, can_shrink)
643+{
644+ using namespace testing;
645+ mth::BasicThreadPool p{0};
646+
647+ TestTask task1{expected_name};
648+ auto future = p.run(std::ref(task1));
649+ future.wait();
650+
651+ // This should delete all threads since we specified a minimum of 0 threads for our pool
652+ p.shrink();
653+
654+ TestTask task2;
655+ future = p.run(std::ref(task2));
656+ future.wait();
657+
658+ EXPECT_TRUE(task1.was_called());
659+ EXPECT_TRUE(task2.was_called());
660+ EXPECT_THAT(task2.thread_name(), Ne(expected_name));
661+}

Subscribers

People subscribed via source and target branches