Mir

Merge lp:~albaguirre/mir/recycle-compositor-threads-take2 into lp:mir

Proposed by Alberto Aguirre
Status: Merged
Approved by: Alberto Aguirre
Approved revision: no longer in the source branch.
Merged at revision: 1897
Proposed branch: lp:~albaguirre/mir/recycle-compositor-threads-take2
Merge into: lp:mir
Diff against target: 661 lines (+538/-8)
9 files modified
include.private/server/mir/thread/basic_thread_pool.h (+63/-0)
src/server/CMakeLists.txt (+2/-0)
src/server/compositor/multi_threaded_compositor.cpp (+8/-5)
src/server/compositor/multi_threaded_compositor.h (+5/-3)
src/server/thread/CMakeLists.txt (+12/-0)
src/server/thread/basic_thread_pool.cpp (+221/-0)
tests/unit-tests/CMakeLists.txt (+1/-0)
tests/unit-tests/thread/CMakeLists.txt (+5/-0)
tests/unit-tests/thread/test_basic_thread_pool.cpp (+221/-0)
To merge this branch: bzr merge lp:~albaguirre/mir/recycle-compositor-threads-take2
Reviewer Review Type Date Requested Status
PS Jenkins bot (community) continuous-integration Approve
Alexandros Frantzis (community) Approve
Review via email: mp+233609@code.launchpad.net

Commit message

Recycle compositor threads by using a thread pool (LP: #1362841)

Description of the change

Recycle compositor threads by using a thread pool

Adds a basic thread pool. It is placed under server rather than common because it uses mir::terminate_with_current_exception(), which is currently used by server code only.

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Alexandros Frantzis (afrantzis) wrote :

Looks good.

review: Approve
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
Alberto Aguirre (albaguirre) wrote :

^--DemoPrivateProtobuf.client_calls_server had an exception....seems unrelated. Re-approving...

Revision history for this message
PS Jenkins bot (ps-jenkins) :
review: Approve (continuous-integration)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added directory 'include.private/server/mir/thread'
=== added file 'include.private/server/mir/thread/basic_thread_pool.h'
--- include.private/server/mir/thread/basic_thread_pool.h 1970-01-01 00:00:00 +0000
+++ include.private/server/mir/thread/basic_thread_pool.h 2014-09-06 14:29:21 +0000
@@ -0,0 +1,63 @@
1/*
2 * Copyright © 2014 Canonical Ltd.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License version 3 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU Lesser General Public License for more details.
12 *
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
17 */
18
19#ifndef MIR_THREAD_BASIC_THREAD_POOL_H_
20#define MIR_THREAD_BASIC_THREAD_POOL_H_
21
22#include <functional>
23#include <future>
24#include <vector>
25#include <memory>
26#include <mutex>
27
28namespace mir
29{
30namespace thread
31{
32
class WorkerThread;

/// A small pool of worker threads that executes queued tasks and can keep
/// tasks sharing a TaskId on the same worker thread.
///
/// The pool is not copyable.
class BasicThreadPool
{
public:
    /// \param min_threads number of worker threads that shrink() leaves alive.
    ///
    /// Declared explicit so that an int is never silently converted into a
    /// whole thread pool.
    explicit BasicThreadPool(int min_threads);
    ~BasicThreadPool();

    /// Queues \a task on an idle worker thread, creating a new worker when
    /// none is idle.
    /// \return a future that becomes ready once the task has been executed.
    std::future<void> run(std::function<void()> const& task);

    /// Opaque tag used to route related tasks to the same worker thread.
    typedef void const* TaskId;

    /// Queues \a task on the worker thread that was last handed a task with
    /// the same \a id, even if that worker is still busy. When no worker is
    /// associated with \a id an idle worker is used, or a new one is created.
    /// \return a future that becomes ready once the task has been executed.
    std::future<void> run(std::function<void()> const& task, TaskId id);

    /// Destroys idle worker threads until no more than min_threads remain
    /// (busy workers are never removed).
    void shrink();

private:
    BasicThreadPool(BasicThreadPool const&) = delete;
    BasicThreadPool& operator=(BasicThreadPool const&) = delete;

    // Helpers below expect `mutex` to be held by the caller.
    std::future<void> run(WorkerThread* t, std::function<void()> const& task, TaskId id);
    WorkerThread* find_thread_by(TaskId id);
    WorkerThread* find_idle_thread();

    std::mutex mutex;
    int const min_threads;
    std::vector<std::unique_ptr<WorkerThread>> threads;
};
59
60}
61}
62
63#endif
064
=== modified file 'src/server/CMakeLists.txt'
--- src/server/CMakeLists.txt 2014-09-05 03:37:19 +0000
+++ src/server/CMakeLists.txt 2014-09-06 14:29:21 +0000
@@ -13,6 +13,7 @@
13add_subdirectory(scene/)13add_subdirectory(scene/)
14add_subdirectory(frontend/)14add_subdirectory(frontend/)
15add_subdirectory(shell/)15add_subdirectory(shell/)
16add_subdirectory(thread/)
1617
17set(PREFIX "${CMAKE_INSTALL_PREFIX}")18set(PREFIX "${CMAKE_INSTALL_PREFIX}")
18set(EXEC_PREFIX "${CMAKE_INSTALL_PREFIX}")19set(EXEC_PREFIX "${CMAKE_INSTALL_PREFIX}")
@@ -48,6 +49,7 @@
48 $<TARGET_OBJECTS:mirlogger>49 $<TARGET_OBJECTS:mirlogger>
49 $<TARGET_OBJECTS:mirnestedgraphics>50 $<TARGET_OBJECTS:mirnestedgraphics>
50 $<TARGET_OBJECTS:miroffscreengraphics>51 $<TARGET_OBJECTS:miroffscreengraphics>
52 $<TARGET_OBJECTS:mirthread>
51)53)
5254
53set(MIR_SERVER_REFERENCES55set(MIR_SERVER_REFERENCES
5456
=== modified file 'src/server/compositor/multi_threaded_compositor.cpp'
--- src/server/compositor/multi_threaded_compositor.cpp 2014-09-05 03:37:19 +0000
+++ src/server/compositor/multi_threaded_compositor.cpp 2014-09-06 14:29:21 +0000
@@ -195,7 +195,8 @@
195 display_buffer_compositor_factory{db_compositor_factory},195 display_buffer_compositor_factory{db_compositor_factory},
196 report{compositor_report},196 report{compositor_report},
197 state{CompositorState::stopped},197 state{CompositorState::stopped},
198 compose_on_start{compose_on_start}198 compose_on_start{compose_on_start},
199 thread_pool{1}
199{200{
200 observer = std::make_shared<ms::LegacySceneChangeNotification>(201 observer = std::make_shared<ms::LegacySceneChangeNotification>(
201 [this]()202 [this]()
@@ -289,10 +290,12 @@
289 auto thread_functor_raw = new mc::CompositingFunctor{display_buffer_compositor_factory, buffer, report};290 auto thread_functor_raw = new mc::CompositingFunctor{display_buffer_compositor_factory, buffer, report};
290 auto thread_functor = std::unique_ptr<mc::CompositingFunctor>(thread_functor_raw);291 auto thread_functor = std::unique_ptr<mc::CompositingFunctor>(thread_functor_raw);
291292
292 threads.push_back(std::thread{std::ref(*thread_functor)});293 futures.push_back(thread_pool.run(std::ref(*thread_functor), &buffer));
293 thread_functors.push_back(std::move(thread_functor));294 thread_functors.push_back(std::move(thread_functor));
294 });295 });
295296
297 thread_pool.shrink();
298
296 state = CompositorState::started;299 state = CompositorState::started;
297}300}
298301
@@ -307,11 +310,11 @@
307 for (auto& f : thread_functors)310 for (auto& f : thread_functors)
308 f->stop();311 f->stop();
309312
310 for (auto& t : threads)313 for (auto& f : futures)
311 t.join();314 f.wait();
312315
313 thread_functors.clear();316 thread_functors.clear();
314 threads.clear();317 futures.clear();
315318
316 report->stopped();319 report->stopped();
317320
318321
=== modified file 'src/server/compositor/multi_threaded_compositor.h'
--- src/server/compositor/multi_threaded_compositor.h 2014-09-05 03:37:19 +0000
+++ src/server/compositor/multi_threaded_compositor.h 2014-09-06 14:29:21 +0000
@@ -20,11 +20,12 @@
20#define MIR_COMPOSITOR_MULTI_THREADED_COMPOSITOR_H_20#define MIR_COMPOSITOR_MULTI_THREADED_COMPOSITOR_H_
2121
22#include "mir/compositor/compositor.h"22#include "mir/compositor/compositor.h"
23#include "mir/thread/basic_thread_pool.h"
2324
24#include <mutex>25#include <mutex>
25#include <memory>26#include <memory>
26#include <vector>27#include <vector>
27#include <thread>28#include <future>
2829
29namespace mir30namespace mir
30{31{
@@ -76,15 +77,16 @@
76 std::shared_ptr<CompositorReport> const report;77 std::shared_ptr<CompositorReport> const report;
7778
78 std::vector<std::unique_ptr<CompositingFunctor>> thread_functors;79 std::vector<std::unique_ptr<CompositingFunctor>> thread_functors;
79 std::vector<std::thread> threads;80 std::vector<std::future<void>> futures;
8081
81 std::mutex state_guard;82 std::mutex state_guard;
82 CompositorState state;83 CompositorState state;
83 bool compose_on_start;84 bool compose_on_start;
8485
85 void schedule_compositing(int number_composites);86 void schedule_compositing(int number_composites);
86 87
87 std::shared_ptr<mir::scene::Observer> observer;88 std::shared_ptr<mir::scene::Observer> observer;
89 mir::thread::BasicThreadPool thread_pool;
88};90};
8991
90}92}
9193
=== added directory 'src/server/thread'
=== added file 'src/server/thread/CMakeLists.txt'
--- src/server/thread/CMakeLists.txt 1970-01-01 00:00:00 +0000
+++ src/server/thread/CMakeLists.txt 2014-09-06 14:29:21 +0000
@@ -0,0 +1,12 @@
# Sources that make up the server-side threading helpers.
set(
  MIR_THREAD_SRCS

  basic_thread_pool.cpp
)

# Built as an OBJECT library; the server library consumes it through
# $<TARGET_OBJECTS:mirthread>.
add_library(
  mirthread OBJECT

  ${MIR_THREAD_SRCS}
)
12
013
=== added file 'src/server/thread/basic_thread_pool.cpp'
--- src/server/thread/basic_thread_pool.cpp 1970-01-01 00:00:00 +0000
+++ src/server/thread/basic_thread_pool.cpp 2014-09-06 14:29:21 +0000
@@ -0,0 +1,221 @@
1/*
2 * Copyright © 2014 Canonical Ltd.
3 *
4 * This program is free software: you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 3,
6 * as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
17 */
18
19#include "mir/thread/basic_thread_pool.h"
20#include <mir/run_mir.h>
21
22#include <deque>
23#include <algorithm>
24#include <condition_variable>
25
26namespace mt = mir::thread;
27
28namespace
29{
30
31class Worker
32{
33public:
34 Worker() : exiting{false}
35 {
36 }
37
38 ~Worker()
39 {
40 exit();
41 }
42
43 void operator()() noexcept
44 try
45 {
46 std::unique_lock<std::mutex> lock{state_mutex};
47 while (!exiting)
48 {
49 task_available_cv.wait(lock, [&]{ return exiting || !tasks.empty(); });
50
51 if (!exiting)
52 {
53 auto& task = tasks.front();
54 lock.unlock();
55 task();
56 lock.lock();
57 tasks.pop_front();
58 }
59 }
60 }
61 catch(...)
62 {
63 mir::terminate_with_current_exception();
64 }
65
66 void queue_task(std::packaged_task<void()> task)
67 {
68 std::lock_guard<std::mutex> lock{state_mutex};
69 tasks.push_back(std::move(task));
70 task_available_cv.notify_one();
71 }
72
73 void exit()
74 {
75 std::lock_guard<std::mutex> lock{state_mutex};
76 exiting = true;
77 task_available_cv.notify_one();
78 }
79
80 bool is_idle() const
81 {
82 std::lock_guard<std::mutex> lock{state_mutex};
83 return tasks.empty();
84 }
85
86private:
87 std::deque<std::packaged_task<void()>> tasks;
88 bool exiting;
89 std::mutex mutable state_mutex;
90 std::condition_variable task_available_cv;
91};
92
93}
94
95namespace mir
96{
97namespace thread
98{
99class WorkerThread
100{
101public:
102 WorkerThread(mt::BasicThreadPool::TaskId an_id)
103 : thread{std::ref(worker)},
104 id_{an_id}
105 {}
106
107 ~WorkerThread()
108 {
109 worker.exit();
110 if (thread.joinable())
111 thread.join();
112 }
113
114 void queue_task(std::packaged_task<void()> task, mt::BasicThreadPool::TaskId the_id)
115 {
116 worker.queue_task(std::move(task));
117 id_ = the_id;
118 }
119
120 bool is_idle() const
121 {
122 return worker.is_idle();
123 }
124
125 mt::BasicThreadPool::TaskId current_id() const
126 {
127 return id_;
128 }
129
130private:
131 ::Worker worker;
132 std::thread thread;
133 mt::BasicThreadPool::TaskId id_;
134};
135}
136}
137
138mt::BasicThreadPool::BasicThreadPool(int min_threads)
139 : min_threads{min_threads}
140{
141}
142
143mt::BasicThreadPool::~BasicThreadPool() = default;
144
145std::future<void> mt::BasicThreadPool::run(std::function<void()> const& task)
146{
147 std::lock_guard<decltype(mutex)> lock{mutex};
148
149 TaskId const generic_id = nullptr;
150 WorkerThread* no_preferred_thread = nullptr;
151 return run(no_preferred_thread, task, generic_id);
152}
153
154std::future<void> mt::BasicThreadPool::run(std::function<void()> const& task, TaskId id)
155{
156 std::lock_guard<decltype(mutex)> lock{mutex};
157 return run(find_thread_by(id), task, id);
158}
159
160std::future<void> mt::BasicThreadPool::run(WorkerThread* worker_thread,
161 std::function<void()> const& task, TaskId id)
162{
163 // No pre-selected thread to execute task, find an idle thread
164 if (worker_thread == nullptr)
165 worker_thread = find_idle_thread();
166
167 if (worker_thread == nullptr)
168 {
169 // No idle threads available so create a new one
170 std::unique_ptr<WorkerThread> new_worker_thread{new WorkerThread{id}};
171 threads.push_back(std::move(new_worker_thread));
172 worker_thread = threads.back().get();
173 }
174
175 std::packaged_task<void()> a_task{task};
176 auto future = a_task.get_future();
177 worker_thread->queue_task(std::move(a_task), id);
178 return future;
179}
180
181
182void mt::BasicThreadPool::shrink()
183{
184 std::lock_guard<decltype(mutex)> lock{mutex};
185
186 int max_threads_to_remove = threads.size() - min_threads;
187 auto it = std::remove_if(threads.begin(), threads.end(),
188 [&max_threads_to_remove](std::unique_ptr<WorkerThread> const& worker_thread)
189 {
190 bool remove = worker_thread->is_idle() && max_threads_to_remove > 0;
191 if (remove)
192 max_threads_to_remove--;
193 return remove;
194 }
195 );
196 threads.erase(it, threads.end());
197}
198
199mt::WorkerThread* mt::BasicThreadPool::find_thread_by(TaskId id)
200{
201 auto it = std::find_if(threads.begin(), threads.end(),
202 [id](std::unique_ptr<WorkerThread> const& worker_thread)
203 {
204 return worker_thread->current_id() == id;
205 }
206 );
207
208 return it == threads.end() ? nullptr : it->get();
209}
210
211mt::WorkerThread *mt::BasicThreadPool::find_idle_thread()
212{
213 auto it = std::find_if(threads.begin(), threads.end(),
214 [](std::unique_ptr<WorkerThread> const& worker_thread)
215 {
216 return worker_thread->is_idle();
217 }
218 );
219
220 return it == threads.end() ? nullptr : it->get();
221}
0222
=== modified file 'tests/unit-tests/CMakeLists.txt'
--- tests/unit-tests/CMakeLists.txt 2014-09-05 03:37:19 +0000
+++ tests/unit-tests/CMakeLists.txt 2014-09-06 14:29:21 +0000
@@ -36,6 +36,7 @@
36add_subdirectory(scene/)36add_subdirectory(scene/)
37add_subdirectory(draw/)37add_subdirectory(draw/)
38add_subdirectory(examples/)38add_subdirectory(examples/)
39add_subdirectory(thread/)
3940
40link_directories(${LIBRARY_OUTPUT_PATH})41link_directories(${LIBRARY_OUTPUT_PATH})
4142
4243
=== added directory 'tests/unit-tests/thread'
=== added file 'tests/unit-tests/thread/CMakeLists.txt'
--- tests/unit-tests/thread/CMakeLists.txt 1970-01-01 00:00:00 +0000
+++ tests/unit-tests/thread/CMakeLists.txt 2014-09-06 14:29:21 +0000
@@ -0,0 +1,5 @@
# Add the thread pool tests to the shared unit-test source list...
list(
  APPEND UNIT_TEST_SOURCES
  ${CMAKE_CURRENT_SOURCE_DIR}/test_basic_thread_pool.cpp
)

# ...and publish the extended list to the parent directory's scope.
set(UNIT_TEST_SOURCES ${UNIT_TEST_SOURCES} PARENT_SCOPE)
06
=== added file 'tests/unit-tests/thread/test_basic_thread_pool.cpp'
--- tests/unit-tests/thread/test_basic_thread_pool.cpp 1970-01-01 00:00:00 +0000
+++ tests/unit-tests/thread/test_basic_thread_pool.cpp 2014-09-06 14:29:21 +0000
@@ -0,0 +1,221 @@
1/*
2 * Copyright © 2014 Canonical Ltd.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License version 3 as
6 * published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
17 */
18
19#include "mir/thread/basic_thread_pool.h"
20
21#include "mir/thread_name.h"
22#include "mir_test/current_thread_name.h"
23#include "mir_test/signal.h"
24
25#include <memory>
26
27#include <gmock/gmock.h>
28#include <gtest/gtest.h>
29
30namespace mt = mir::test;
31namespace mth = mir::thread;
32
33namespace
34{
35class BasicThreadPool : public testing::Test
36{
37public:
38 BasicThreadPool()
39 : default_task_id{nullptr},
40 default_num_threads{3},
41 expected_name{"test_thread"}
42 {
43 }
44protected:
45 mth::BasicThreadPool::TaskId default_task_id;
46 int default_num_threads;
47 std::string expected_name;
48};
49
50class TestTask
51{
52public:
53 TestTask()
54 : TestTask("")
55 {
56 }
57
58 TestTask(std::string name)
59 : called{false},
60 block{false},
61 name{name}
62 {
63 }
64
65 ~TestTask()
66 {
67 unblock();
68 }
69
70 void operator()() noexcept
71 {
72 called = true;
73 if (!name.empty())
74 mir::set_thread_name(name);
75 else
76 name = mt::current_thread_name();
77
78 if (block)
79 signal.wait();
80 }
81
82 std::string thread_name() const
83 {
84 return name;
85 }
86
87 bool was_called() const
88 {
89 return called;
90 }
91
92 void block_on_execution()
93 {
94 block = true;
95 }
96
97 void unblock()
98 {
99 signal.raise();
100 }
101
102private:
103 bool called;
104 bool block;
105 std::string name;
106 mt::Signal signal;
107};
108}
109
110TEST_F(BasicThreadPool, executes_given_functor)
111{
112 using namespace testing;
113 mth::BasicThreadPool p{default_num_threads};
114
115 TestTask task;
116 auto future = p.run(std::ref(task));
117 future.wait();
118
119 EXPECT_TRUE(task.was_called());
120}
121
122TEST_F(BasicThreadPool, executes_on_preferred_thread)
123{
124 using namespace testing;
125 mth::BasicThreadPool p{default_num_threads};
126
127 TestTask task1{expected_name};
128 task1.block_on_execution();
129 auto future1 = p.run(std::ref(task1), default_task_id);
130
131 // This task should be queued to the thread associated with the given id
132 // even when its busy
133 TestTask task2;
134 auto future2 = p.run(std::ref(task2), default_task_id);
135
136 task1.unblock();
137 future1.wait();
138 future2.wait();
139
140 // Check if the second task executed on the same thread as the first task
141 EXPECT_TRUE(task1.was_called());
142 EXPECT_TRUE(task2.was_called());
143 EXPECT_THAT(task2.thread_name(), Eq(expected_name));
144}
145
146TEST_F(BasicThreadPool, recycles_threads)
147{
148 using namespace testing;
149 mth::BasicThreadPool p{2};
150
151 std::string const thread1_name = "thread1";
152 std::string const thread2_name = "thread2";
153
154 //Create a couple of blocking tasks, so that we force the pool
155 //to assign new threads to each task
156 TestTask task1{thread1_name};
157 task1.block_on_execution();
158 auto future1 = p.run(std::ref(task1));
159
160 TestTask task2{thread2_name};
161 task2.block_on_execution();
162 auto future2 = p.run(std::ref(task2));
163
164 task1.unblock();
165 task2.unblock();
166 future1.wait();
167 future2.wait();
168
169 // Now we should have some idle threads, the next task should
170 // run in any of the two previously created threads
171 TestTask task3;
172 auto future = p.run(std::ref(task3));
173 future.wait();
174
175 EXPECT_TRUE(task1.was_called());
176 EXPECT_TRUE(task2.was_called());
177 EXPECT_TRUE(task3.was_called());
178 EXPECT_THAT(task3.thread_name(), AnyOf(Eq(thread1_name), Eq(thread2_name)));
179}
180
181TEST_F(BasicThreadPool, creates_new_threads)
182{
183 using namespace testing;
184 mth::BasicThreadPool p{1};
185
186 TestTask task1{expected_name};
187 task1.block_on_execution();
188 auto future1 = p.run(std::ref(task1));
189
190 TestTask task2;
191 auto future2 = p.run(std::ref(task2));
192
193 task1.unblock();
194 future1.wait();
195 future2.wait();
196
197 EXPECT_TRUE(task1.was_called());
198 EXPECT_TRUE(task2.was_called());
199 EXPECT_THAT(task2.thread_name(), Ne(expected_name));
200}
201
202TEST_F(BasicThreadPool, can_shrink)
203{
204 using namespace testing;
205 mth::BasicThreadPool p{0};
206
207 TestTask task1{expected_name};
208 auto future = p.run(std::ref(task1));
209 future.wait();
210
211 // This should delete all threads since we specified a minimum of 0 threads for our pool
212 p.shrink();
213
214 TestTask task2;
215 future = p.run(std::ref(task2));
216 future.wait();
217
218 EXPECT_TRUE(task1.was_called());
219 EXPECT_TRUE(task2.was_called());
220 EXPECT_THAT(task2.thread_name(), Ne(expected_name));
221}

Subscribers

People subscribed via source and target branches