Merge lp:~albaguirre/unity-system-compositor/add-worker-thread into lp:unity-system-compositor

Proposed by Alberto Aguirre
Status: Merged
Approved by: Alberto Aguirre
Approved revision: 175
Merged at revision: 174
Proposed branch: lp:~albaguirre/unity-system-compositor/add-worker-thread
Merge into: lp:unity-system-compositor
Diff against target: 387 lines (+253/-20)
5 files modified
src/CMakeLists.txt (+1/-0)
src/dbus_screen.cpp (+49/-20)
src/dbus_screen.h (+7/-0)
src/worker_thread.cpp (+141/-0)
src/worker_thread.h (+55/-0)
To merge this branch: bzr merge lp:~albaguirre/unity-system-compositor/add-worker-thread
Reviewer Review Type Date Requested Status
Andreas Pokorny (community) Approve
PS Jenkins bot (community) continuous-integration Approve
Unity System Compositor Development Team Pending
Review via email: mp+234759@code.launchpad.net

Commit message

Add worker thread to handle incoming DBus requests.

The worker thread can optionally coalesce multiple queued tasks (LP: #1369790)
DBusScreen uses worker thread to serialize requests that were previously dispatched without any particular order (LP: #1368786)

Description of the change

Add worker thread to handle incoming DBus requests.

The worker thread can optionally coalesce multiple queued tasks (LP: #1369790)
DBusScreen uses worker thread to serialize requests that were previously dispatched without any particular order (LP: #1368786)

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
175. By Alberto Aguirre

Small cleanup

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Andreas Pokorny (andreas-pokorny) wrote :

lgtm

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/CMakeLists.txt'
--- src/CMakeLists.txt 2014-08-15 02:16:33 +0000
+++ src/CMakeLists.txt 2014-09-16 15:04:17 +0000
@@ -26,6 +26,7 @@
26 session_switcher.cpp26 session_switcher.cpp
27 surface_coordinator.cpp27 surface_coordinator.cpp
28 system_compositor.cpp28 system_compositor.cpp
29 worker_thread.cpp
29)30)
3031
31qt5_generate_dbus_interface(dbus_screen.h com.canonical.Unity.Screen.xml)32qt5_generate_dbus_interface(dbus_screen.h com.canonical.Unity.Screen.xml)
3233
=== modified file 'src/dbus_screen.cpp'
--- src/dbus_screen.cpp 2014-08-14 16:06:33 +0000
+++ src/dbus_screen.cpp 2014-09-16 15:04:17 +0000
@@ -18,6 +18,7 @@
18#include "dbus_screen_adaptor.h"18#include "dbus_screen_adaptor.h"
19#include "dbus_screen_observer.h"19#include "dbus_screen_observer.h"
20#include "power_state_change_reason.h"20#include "power_state_change_reason.h"
21#include "worker_thread.h"
2122
22#include <atomic>23#include <atomic>
23#include <memory>24#include <memory>
@@ -29,6 +30,8 @@
29#include <QDBusServiceWatcher>30#include <QDBusServiceWatcher>
30#include <QDebug>31#include <QDebug>
3132
33namespace
34{
32bool is_valid_reason(int raw_reason)35bool is_valid_reason(int raw_reason)
33{36{
34 auto reason = static_cast<PowerStateChangeReason>(raw_reason);37 auto reason = static_cast<PowerStateChangeReason>(raw_reason);
@@ -43,12 +46,19 @@
43 return false;46 return false;
44}47}
4548
49enum DBusHandlerTaskId
50{
51 set_power_mode
52};
53}
54
4655
47DBusScreen::DBusScreen(DBusScreenObserver& observer, QObject *parent)56DBusScreen::DBusScreen(DBusScreenObserver& observer, QObject *parent)
48 : QObject(parent),57 : QObject(parent),
49 dbus_adaptor{new DBusScreenAdaptor(this)},58 dbus_adaptor{new DBusScreenAdaptor(this)},
50 service_watcher{new QDBusServiceWatcher()},59 service_watcher{new QDBusServiceWatcher()},
51 observer{&observer}60 observer{&observer},
61 worker_thread{new usc::WorkerThread("USC/DBusHandler")}
52{62{
53 QDBusConnection bus = QDBusConnection::systemBus();63 QDBusConnection bus = QDBusConnection::systemBus();
54 bus.registerObject("/com/canonical/Unity/Screen", this);64 bus.registerObject("/com/canonical/Unity/Screen", this);
@@ -90,9 +100,9 @@
90 }100 }
91101
92 //This call may block - avoid blocking this dbus handling thread102 //This call may block - avoid blocking this dbus handling thread
93 std::thread{[this, newPowerMode, reason]{103 worker_thread->queue_task([this, newPowerMode, reason]{
94 observer->set_screen_power_mode(newPowerMode, static_cast<PowerStateChangeReason>(reason));104 observer->set_screen_power_mode(newPowerMode, static_cast<PowerStateChangeReason>(reason));
95 }}.detach();105 }, DBusHandlerTaskId::set_power_mode);
96106
97 return true;107 return true;
98}108}
@@ -119,43 +129,62 @@
119 static std::atomic<uint32_t> request_id{0};129 static std::atomic<uint32_t> request_id{0};
120130
121 int id = request_id.fetch_add(1);131 int id = request_id.fetch_add(1);
122
123 //This call may block - avoid blocking this dbus handling thread
124 std::thread{[this]{observer->keep_display_on(true);}}.detach();
125
126 auto const& caller = message().service();132 auto const& caller = message().service();
127 auto& caller_requests = display_requests[caller.toStdString()];133
128134 worker_thread->queue_task([this, id, caller]{
129 if (caller_requests.size() == 0)135 std::lock_guard<decltype(guard)> lock{guard};
130 service_watcher->addWatchedService(caller);136
131137 //Check that the owner of the request is still valid
132 caller_requests.insert(id);138 auto system_bus_if = QDBusConnection::systemBus().interface();
133 std::cerr << "keepDisplayOn request id:" << id;139 QDBusReply<QString> reply = system_bus_if->serviceOwner(caller);
134 std::cerr << " requested by \"" << caller.toStdString() << "\"" << std::endl;140 if (!reply.isValid())
141 return;
142
143 auto& caller_requests = display_requests[caller.toStdString()];
144
145 if (caller_requests.size() == 0)
146 service_watcher->addWatchedService(caller);
147
148 caller_requests.insert(id);
149
150 //This call may block so it needs to be executed outside the thread
151 //that received the dbus call
152 observer->keep_display_on(true);
153
154 std::cout << "keepDisplayOn request id:" << id;
155 std::cout << " requested by \"" << caller.toStdString() << "\"" << std::endl;
156 });
135 return id;157 return id;
136}158}
137159
138void DBusScreen::removeDisplayOnRequest(int cookie)160void DBusScreen::removeDisplayOnRequest(int cookie)
139{161{
162 std::lock_guard<decltype(guard)> lock{guard};
140 auto const& requestor = message().service();163 auto const& requestor = message().service();
141164
142 auto it = display_requests.find(requestor.toStdString());165 auto it = display_requests.find(requestor.toStdString());
143 if (it == display_requests.end())166 if (it == display_requests.end())
144 return;167 return;
145168
146 std::cerr << "removeDisplayOnRequest id:" << cookie;169 std::cout << "removeDisplayOnRequest id:" << cookie;
147 std::cerr << " requested by \"" << requestor.toStdString() << "\"" << std::endl;170 std::cout << " requested by \"" << requestor.toStdString() << "\"" << std::endl;
148171
149 auto caller_requests = it->second;172 auto caller_requests = it->second;
150 caller_requests.erase(cookie);173 caller_requests.erase(cookie);
151 if (caller_requests.size() == 0)174 if (caller_requests.size() == 0)
152 remove_display_on_requestor(requestor);175 remove_requestor(requestor, lock);
153}176}
154177
155void DBusScreen::remove_display_on_requestor(QString const& requestor)178void DBusScreen::remove_display_on_requestor(QString const& requestor)
156{179{
157 std::cerr << "remove_display_on_requestor \"" << requestor.toStdString() << "\"";180 std::lock_guard<decltype(guard)> lock{guard};
158 std::cerr << std::endl;181 remove_requestor(requestor, lock);
182}
183
184void DBusScreen::remove_requestor(QString const& requestor, std::lock_guard<std::mutex> const&)
185{
186 std::cout << "remove_display_on_requestor \"" << requestor.toStdString() << "\"";
187 std::cout << std::endl;
159188
160 display_requests.erase(requestor.toStdString());189 display_requests.erase(requestor.toStdString());
161 service_watcher->removeWatchedService(requestor);190 service_watcher->removeWatchedService(requestor);
162191
=== modified file 'src/dbus_screen.h'
--- src/dbus_screen.h 2014-08-14 16:06:33 +0000
+++ src/dbus_screen.h 2014-09-16 15:04:17 +0000
@@ -20,6 +20,7 @@
20#include <mir_toolkit/common.h>20#include <mir_toolkit/common.h>
2121
22#include <memory>22#include <memory>
23#include <mutex>
23#include <unordered_map>24#include <unordered_map>
24#include <unordered_set>25#include <unordered_set>
2526
@@ -27,6 +28,8 @@
27#include <QtCore>28#include <QtCore>
28#include <QDBusContext>29#include <QDBusContext>
2930
31namespace usc {class WorkerThread;}
32
30class DBusScreenAdaptor;33class DBusScreenAdaptor;
31class DBusScreenObserver;34class DBusScreenObserver;
32class QDBusInterface;35class QDBusInterface;
@@ -59,10 +62,14 @@
59 void remove_display_on_requestor(QString const& requestor);62 void remove_display_on_requestor(QString const& requestor);
6063
61private:64private:
65 void remove_requestor(QString const& requestor, std::lock_guard<std::mutex> const& lock);
66
67 std::mutex guard;
62 std::unique_ptr<DBusScreenAdaptor> dbus_adaptor;68 std::unique_ptr<DBusScreenAdaptor> dbus_adaptor;
63 std::unique_ptr<QDBusServiceWatcher> service_watcher;69 std::unique_ptr<QDBusServiceWatcher> service_watcher;
64 std::unordered_map<std::string, std::unordered_set<int>> display_requests;70 std::unordered_map<std::string, std::unordered_set<int>> display_requests;
65 DBusScreenObserver* const observer;71 DBusScreenObserver* const observer;
72 std::unique_ptr<usc::WorkerThread> worker_thread;
66};73};
6774
68#endif /* DBUS_SCREEN_H_ */75#endif /* DBUS_SCREEN_H_ */
6976
=== added file 'src/worker_thread.cpp'
--- src/worker_thread.cpp 1970-01-01 00:00:00 +0000
+++ src/worker_thread.cpp 2014-09-16 15:04:17 +0000
@@ -0,0 +1,141 @@
1/*
2 * Copyright © 2014 Canonical Ltd.
3 *
4 * This program is free software: you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 3,
6 * as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
17 */
18
19#include "worker_thread.h"
20#include <mir/run_mir.h>
21
22#include <algorithm>
23#include <condition_variable>
24#include <deque>
25#include <map>
26
27#ifndef _GNU_SOURCE
28#define _GNU_SOURCE
29#endif
30#include <pthread.h>
31
32namespace usc
33{
34
35class Worker
36{
37public:
38 Worker(std::string name)
39 : name{name.substr(0, 15)}, exiting{false}
40 {
41 }
42
43 ~Worker()
44 {
45 exit();
46 }
47
48 void operator()() noexcept
49 try
50 {
51 pthread_setname_np(pthread_self(), name.c_str());
52
53 std::unique_lock<std::mutex> lock{state_mutex};
54 while (!exiting)
55 {
56 task_available_cv.wait(lock, [&]{ return exiting || !tasks.empty(); });
57
58 if (!exiting)
59 {
60 auto& task = tasks.front();
61
62 lock.unlock();
63 task();
64 lock.lock();
65 tasks.pop_front();
66 }
67 }
68 }
69 catch(...)
70 {
71 mir::terminate_with_current_exception();
72 }
73
74 void queue_task(std::function<void()> task)
75 {
76 std::lock_guard<std::mutex> lock{state_mutex};
77 tasks.push_back(std::move(task));
78 task_available_cv.notify_one();
79 }
80
81 void queue_task(std::function<void()> task, int id)
82 {
83 std::lock_guard<std::mutex> lock{state_mutex};
84 coalesced_tasks[id] = task;
85 tasks.push_back([this, id]{ run_coalesced_task(id);});
86 task_available_cv.notify_one();
87 }
88
89 void run_coalesced_task(int id)
90 {
91 std::unique_lock<std::mutex> lock{state_mutex};
92 auto it = coalesced_tasks.find(id);
93 if (it != coalesced_tasks.end())
94 {
95 auto task = it->second;
96 coalesced_tasks.erase(it);
97 lock.unlock();
98 task();
99 }
100 }
101
102 void exit()
103 {
104 std::lock_guard<std::mutex> lock{state_mutex};
105 exiting = true;
106 task_available_cv.notify_one();
107 }
108
109private:
110 std::mutex mutable state_mutex;
111 std::string name;
112 bool exiting;
113 std::map<int, std::function<void()>> coalesced_tasks;
114 std::deque<std::function<void()>> tasks;
115 std::condition_variable task_available_cv;
116};
117
118}
119
120usc::WorkerThread::WorkerThread(std::string name)
121 : worker{new Worker(name)},
122 thread{std::ref(*worker)}
123{
124}
125
126usc::WorkerThread::~WorkerThread()
127{
128 worker->exit();
129 if (thread.joinable())
130 thread.join();
131}
132
133void usc::WorkerThread::queue_task(std::function<void()> task)
134{
135 worker->queue_task(std::move(task));
136}
137
138void usc::WorkerThread::queue_task(std::function<void()> task, int id)
139{
140 worker->queue_task(std::move(task), id);
141}
0142
=== added file 'src/worker_thread.h'
--- src/worker_thread.h 1970-01-01 00:00:00 +0000
+++ src/worker_thread.h 2014-09-16 15:04:17 +0000
@@ -0,0 +1,55 @@
1/*
2 * Copyright © 2014 Canonical Ltd.
3 *
4 * This program is free software: you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 3,
6 * as published by the Free Software Foundation.
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15 *
16 * Authored by: Alberto Aguirre <alberto.aguirre@canonical.com>
17 */
18
19#ifndef USC_WORKER_THREAD_H_
20#define USC_WORKER_THREAD_H_
21
22#include <memory>
23#include <string>
24#include <thread>
25
26namespace usc
27{
28class Worker;
29class WorkerThread
30{
31public:
32 WorkerThread(std::string name);
33 ~WorkerThread();
34
35 /**
36 * Queues a task to be executed on the worker thread
37 */
38 void queue_task(std::function<void()> task);
39
40 /**
41 * Queues and coalesces tasks that share the same id
42 */
43 void queue_task(std::function<void()> task, int id);
44
45private:
46 WorkerThread(WorkerThread const&) = delete;
47 WorkerThread& operator=(WorkerThread const&) = delete;
48
49 std::unique_ptr<Worker> worker;
50 std::thread thread;
51};
52
53}
54
55#endif

Subscribers

People subscribed via source and target branches