Merge lp:~albaguirre/unity-system-compositor/add-worker-thread into lp:unity-system-compositor

Proposed by Alberto Aguirre
Status: Merged
Approved by: Alberto Aguirre
Approved revision: 175
Merged at revision: 174
Proposed branch: lp:~albaguirre/unity-system-compositor/add-worker-thread
Merge into: lp:unity-system-compositor
Diff against target: 387 lines (+253/-20)
5 files modified
src/CMakeLists.txt (+1/-0)
src/dbus_screen.cpp (+49/-20)
src/dbus_screen.h (+7/-0)
src/worker_thread.cpp (+141/-0)
src/worker_thread.h (+55/-0)
To merge this branch: bzr merge lp:~albaguirre/unity-system-compositor/add-worker-thread
Reviewer Review Type Date Requested Status
Andreas Pokorny (community) Approve
PS Jenkins bot (community) continuous-integration Approve
Unity System Compositor Development Team Pending
Review via email: mp+234759@code.launchpad.net

Commit message

Add worker thread to handle incoming DBus requests.

The worker thread can optionally coalesce multiple queued tasks (LP: #1369790)
DBusScreen uses worker thread to serialize requests that were previously dispatched without any particular order (LP: #1368786)

Description of the change

Add worker thread to handle incoming DBus requests.

The worker thread can optionally coalesce multiple queued tasks (LP: #1369790)
DBusScreen uses worker thread to serialize requests that were previously dispatched without any particular order (LP: #1368786)

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
175. By Alberto Aguirre

Small cleanup

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Andreas Pokorny (andreas-pokorny) wrote :

lgtm

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/CMakeLists.txt'
2--- src/CMakeLists.txt 2014-08-15 02:16:33 +0000
3+++ src/CMakeLists.txt 2014-09-16 15:04:17 +0000
4@@ -26,6 +26,7 @@
5 session_switcher.cpp
6 surface_coordinator.cpp
7 system_compositor.cpp
8+ worker_thread.cpp
9 )
10
11 qt5_generate_dbus_interface(dbus_screen.h com.canonical.Unity.Screen.xml)
12
13=== modified file 'src/dbus_screen.cpp'
14--- src/dbus_screen.cpp 2014-08-14 16:06:33 +0000
15+++ src/dbus_screen.cpp 2014-09-16 15:04:17 +0000
16@@ -18,6 +18,7 @@
17 #include "dbus_screen_adaptor.h"
18 #include "dbus_screen_observer.h"
19 #include "power_state_change_reason.h"
20+#include "worker_thread.h"
21
22 #include <atomic>
23 #include <memory>
24@@ -29,6 +30,8 @@
25 #include <QDBusServiceWatcher>
26 #include <QDebug>
27
28+namespace
29+{
30 bool is_valid_reason(int raw_reason)
31 {
32 auto reason = static_cast<PowerStateChangeReason>(raw_reason);
33@@ -43,12 +46,19 @@
34 return false;
35 }
36
37+enum DBusHandlerTaskId
38+{
39+ set_power_mode
40+};
41+}
42+
43
44 DBusScreen::DBusScreen(DBusScreenObserver& observer, QObject *parent)
45 : QObject(parent),
46 dbus_adaptor{new DBusScreenAdaptor(this)},
47 service_watcher{new QDBusServiceWatcher()},
48- observer{&observer}
49+ observer{&observer},
50+ worker_thread{new usc::WorkerThread("USC/DBusHandler")}
51 {
52 QDBusConnection bus = QDBusConnection::systemBus();
53 bus.registerObject("/com/canonical/Unity/Screen", this);
54@@ -90,9 +100,9 @@
55 }
56
57 //This call may block - avoid blocking this dbus handling thread
58- std::thread{[this, newPowerMode, reason]{
59+ worker_thread->queue_task([this, newPowerMode, reason]{
60 observer->set_screen_power_mode(newPowerMode, static_cast<PowerStateChangeReason>(reason));
61- }}.detach();
62+ }, DBusHandlerTaskId::set_power_mode);
63
64 return true;
65 }
66@@ -119,43 +129,62 @@
67 static std::atomic<uint32_t> request_id{0};
68
69 int id = request_id.fetch_add(1);
70-
71- //This call may block - avoid blocking this dbus handling thread
72- std::thread{[this]{observer->keep_display_on(true);}}.detach();
73-
74 auto const& caller = message().service();
75- auto& caller_requests = display_requests[caller.toStdString()];
76-
77- if (caller_requests.size() == 0)
78- service_watcher->addWatchedService(caller);
79-
80- caller_requests.insert(id);
81- std::cerr << "keepDisplayOn request id:" << id;
82- std::cerr << " requested by \"" << caller.toStdString() << "\"" << std::endl;
83+
84+ worker_thread->queue_task([this, id, caller]{
85+ std::lock_guard<decltype(guard)> lock{guard};
86+
87+ //Check that the owner of the request is still valid
88+ auto system_bus_if = QDBusConnection::systemBus().interface();
89+ QDBusReply<QString> reply = system_bus_if->serviceOwner(caller);
90+ if (!reply.isValid())
91+ return;
92+
93+ auto& caller_requests = display_requests[caller.toStdString()];
94+
95+ if (caller_requests.size() == 0)
96+ service_watcher->addWatchedService(caller);
97+
98+ caller_requests.insert(id);
99+
100+ //This call may block so it needs to be executed outside the thread
101+ //that received the dbus call
102+ observer->keep_display_on(true);
103+
104+ std::cout << "keepDisplayOn request id:" << id;
105+ std::cout << " requested by \"" << caller.toStdString() << "\"" << std::endl;
106+ });
107 return id;
108 }
109
110 void DBusScreen::removeDisplayOnRequest(int cookie)
111 {
112+ std::lock_guard<decltype(guard)> lock{guard};
113 auto const& requestor = message().service();
114
115 auto it = display_requests.find(requestor.toStdString());
116 if (it == display_requests.end())
117 return;
118
119- std::cerr << "removeDisplayOnRequest id:" << cookie;
120- std::cerr << " requested by \"" << requestor.toStdString() << "\"" << std::endl;
121+ std::cout << "removeDisplayOnRequest id:" << cookie;
122+ std::cout << " requested by \"" << requestor.toStdString() << "\"" << std::endl;
123
124 auto caller_requests = it->second;
125 caller_requests.erase(cookie);
126 if (caller_requests.size() == 0)
127- remove_display_on_requestor(requestor);
128+ remove_requestor(requestor, lock);
129 }
130
131 void DBusScreen::remove_display_on_requestor(QString const& requestor)
132 {
133- std::cerr << "remove_display_on_requestor \"" << requestor.toStdString() << "\"";
134- std::cerr << std::endl;
135+ std::lock_guard<decltype(guard)> lock{guard};
136+ remove_requestor(requestor, lock);
137+}
138+
139+void DBusScreen::remove_requestor(QString const& requestor, std::lock_guard<std::mutex> const&)
140+{
141+ std::cout << "remove_display_on_requestor \"" << requestor.toStdString() << "\"";
142+ std::cout << std::endl;
143
144 display_requests.erase(requestor.toStdString());
145 service_watcher->removeWatchedService(requestor);
146
147=== modified file 'src/dbus_screen.h'
148--- src/dbus_screen.h 2014-08-14 16:06:33 +0000
149+++ src/dbus_screen.h 2014-09-16 15:04:17 +0000
150@@ -20,6 +20,7 @@
151 #include <mir_toolkit/common.h>
152
153 #include <memory>
154+#include <mutex>
155 #include <unordered_map>
156 #include <unordered_set>
157
158@@ -27,6 +28,8 @@
159 #include <QtCore>
160 #include <QDBusContext>
161
162+namespace usc {class WorkerThread;}
163+
164 class DBusScreenAdaptor;
165 class DBusScreenObserver;
166 class QDBusInterface;
167@@ -59,10 +62,14 @@
168 void remove_display_on_requestor(QString const& requestor);
169
170 private:
171+ void remove_requestor(QString const& requestor, std::lock_guard<std::mutex> const& lock);
172+
173+ std::mutex guard;
174 std::unique_ptr<DBusScreenAdaptor> dbus_adaptor;
175 std::unique_ptr<QDBusServiceWatcher> service_watcher;
176 std::unordered_map<std::string, std::unordered_set<int>> display_requests;
177 DBusScreenObserver* const observer;
178+ std::unique_ptr<usc::WorkerThread> worker_thread;
179 };
180
181 #endif /* DBUS_SCREEN_H_ */
182
183=== added file 'src/worker_thread.cpp'
184--- src/worker_thread.cpp 1970-01-01 00:00:00 +0000
185+++ src/worker_thread.cpp 2014-09-16 15:04:17 +0000
186@@ -0,0 +1,141 @@
187+/*
188+ * Copyright © 2014 Canonical Ltd.
189+ *
190+ * This program is free software: you can redistribute it and/or modify it
191+ * under the terms of the GNU General Public License version 3,
192+ * as published by the Free Software Foundation.
193+ *
194+ * This program is distributed in the hope that it will be useful,
195+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
196+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
197+ * GNU General Public License for more details.
198+ *
199+ * You should have received a copy of the GNU General Public License
200+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
201+ *
202+ * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
203+ */
204+
205+#include "worker_thread.h"
206+#include <mir/run_mir.h>
207+
208+#include <algorithm>
209+#include <condition_variable>
210+#include <deque>
211+#include <map>
212+
213+#ifndef _GNU_SOURCE
214+#define _GNU_SOURCE
215+#endif
216+#include <pthread.h>
217+
218+namespace usc
219+{
220+
221+class Worker
222+{
223+public:
224+ Worker(std::string name)
225+ : name{name.substr(0, 15)}, exiting{false}
226+ {
227+ }
228+
229+ ~Worker()
230+ {
231+ exit();
232+ }
233+
234+ void operator()() noexcept
235+ try
236+ {
237+ pthread_setname_np(pthread_self(), name.c_str());
238+
239+ std::unique_lock<std::mutex> lock{state_mutex};
240+ while (!exiting)
241+ {
242+ task_available_cv.wait(lock, [&]{ return exiting || !tasks.empty(); });
243+
244+ if (!exiting)
245+ {
246+ auto& task = tasks.front();
247+
248+ lock.unlock();
249+ task();
250+ lock.lock();
251+ tasks.pop_front();
252+ }
253+ }
254+ }
255+ catch(...)
256+ {
257+ mir::terminate_with_current_exception();
258+ }
259+
260+ void queue_task(std::function<void()> task)
261+ {
262+ std::lock_guard<std::mutex> lock{state_mutex};
263+ tasks.push_back(std::move(task));
264+ task_available_cv.notify_one();
265+ }
266+
267+ void queue_task(std::function<void()> task, int id)
268+ {
269+ std::lock_guard<std::mutex> lock{state_mutex};
270+ coalesced_tasks[id] = task;
271+ tasks.push_back([this, id]{ run_coalesced_task(id);});
272+ task_available_cv.notify_one();
273+ }
274+
275+ void run_coalesced_task(int id)
276+ {
277+ std::unique_lock<std::mutex> lock{state_mutex};
278+ auto it = coalesced_tasks.find(id);
279+ if (it != coalesced_tasks.end())
280+ {
281+ auto task = it->second;
282+ coalesced_tasks.erase(it);
283+ lock.unlock();
284+ task();
285+ }
286+ }
287+
288+ void exit()
289+ {
290+ std::lock_guard<std::mutex> lock{state_mutex};
291+ exiting = true;
292+ task_available_cv.notify_one();
293+ }
294+
295+private:
296+ std::mutex mutable state_mutex;
297+ std::string name;
298+ bool exiting;
299+ std::map<int, std::function<void()>> coalesced_tasks;
300+ std::deque<std::function<void()>> tasks;
301+ std::condition_variable task_available_cv;
302+};
303+
304+}
305+
306+usc::WorkerThread::WorkerThread(std::string name)
307+ : worker{new Worker(name)},
308+ thread{std::ref(*worker)}
309+{
310+}
311+
312+usc::WorkerThread::~WorkerThread()
313+{
314+ worker->exit();
315+ if (thread.joinable())
316+ thread.join();
317+}
318+
319+void usc::WorkerThread::queue_task(std::function<void()> task)
320+{
321+ worker->queue_task(std::move(task));
322+}
323+
324+void usc::WorkerThread::queue_task(std::function<void()> task, int id)
325+{
326+ worker->queue_task(std::move(task), id);
327+}
328
329=== added file 'src/worker_thread.h'
330--- src/worker_thread.h 1970-01-01 00:00:00 +0000
331+++ src/worker_thread.h 2014-09-16 15:04:17 +0000
332@@ -0,0 +1,55 @@
333+/*
334+ * Copyright © 2014 Canonical Ltd.
335+ *
336+ * This program is free software: you can redistribute it and/or modify it
337+ * under the terms of the GNU General Public License version 3,
338+ * as published by the Free Software Foundation.
339+ *
340+ * This program is distributed in the hope that it will be useful,
341+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
342+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
343+ * GNU General Public License for more details.
344+ *
345+ * You should have received a copy of the GNU General Public License
346+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
347+ *
348+ * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
349+ */
350+
351+#ifndef USC_WORKER_THREAD_H_
352+#define USC_WORKER_THREAD_H_
353+
354+#include <memory>
355+#include <string>
356+#include <thread>
357+
358+namespace usc
359+{
360+class Worker;
361+class WorkerThread
362+{
363+public:
364+ WorkerThread(std::string name);
365+ ~WorkerThread();
366+
367+ /**
368+ * Queues a task to be executed on the worker thread
369+ */
370+ void queue_task(std::function<void()> task);
371+
372+ /**
373+ * Queues and coalesces tasks that share the same id
374+ */
375+ void queue_task(std::function<void()> task, int id);
376+
377+private:
378+ WorkerThread(WorkerThread const&) = delete;
379+ WorkerThread& operator=(WorkerThread const&) = delete;
380+
381+ std::unique_ptr<Worker> worker;
382+ std::thread thread;
383+};
384+
385+}
386+
387+#endif

Subscribers

People subscribed via source and target branches