Merge lp:~thomas-voss/dbus-cpp/fix-invalid-reads-in-executor into lp:dbus-cpp

Proposed by Thomas Voß
Status: Merged
Approved by: Thomas Voß
Approved revision: 83
Merged at revision: 78
Proposed branch: lp:~thomas-voss/dbus-cpp/fix-invalid-reads-in-executor
Merge into: lp:dbus-cpp
Diff against target: 499 lines (+321/-49)
5 files modified
src/core/dbus/asio/executor.cpp (+34/-20)
src/core/dbus/pending_call_impl.h (+114/-20)
tests/CMakeLists.txt (+27/-7)
tests/async_execution_load_test.cpp (+146/-0)
tests/executor_test.cpp (+0/-2)
To merge this branch: bzr merge lp:~thomas-voss/dbus-cpp/fix-invalid-reads-in-executor
Reviewer                      Review Type              Date Requested   Status
PS Jenkins bot                continuous-integration                    Approve
Alberto Aguirre (community)                                             Approve
Ubuntu Phablet Team                                                     Pending
Review via email: mp+236266@code.launchpad.net

Commit message

Always unref pending calls, add load test to ensure correct behavior.
Make sure that timeouts are never notified after destruction.
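
The timeout guarantee can be illustrated with a minimal sketch that is independent of Boost.Asio and libdbus. A queued handler captures a std::weak_ptr instead of a std::shared_ptr, so it neither extends the object's lifetime nor touches it after destruction. FakeLoop, Timer, arm and fire_before_and_after_destruction are hypothetical names used only for illustration; they are not part of dbus-cpp.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event loop: it only stores handlers until fired.
struct FakeLoop
{
    std::vector<std::function<void()>> pending;

    void run_all()
    {
        auto handlers = std::move(pending);
        pending.clear();
        for (auto& h : handlers) h();
    }
};

// Hypothetical timeout object (assumption: the real one wraps a libdbus timeout).
struct Timer : std::enable_shared_from_this<Timer>
{
    explicit Timer(int& counter) : fired(counter) {}

    void arm(FakeLoop& loop)
    {
        // Capture a weak reference so the handler does not keep us alive.
        std::weak_ptr<Timer> wp{shared_from_this()};
        loop.pending.push_back([wp]()
        {
            // Only notify if the object still exists.
            if (auto sp = wp.lock())
                sp->on_timeout();
        });
    }

    void on_timeout() { ++fired; }

    int& fired;
};

// Usage: the handler fires while the timer lives and is skipped afterwards.
int fire_before_and_after_destruction()
{
    int fired = 0;
    FakeLoop loop;

    auto timer = std::make_shared<Timer>(fired);
    timer->arm(loop);
    loop.run_all();   // timer alive: on_timeout() runs
    assert(fired == 1);

    timer->arm(loop);
    timer.reset();    // destroy the timer with a handler still queued
    loop.run_all();   // handler finds an expired weak_ptr and does nothing

    return fired;
}
```

The trade-off is that a handler may silently do nothing; that is the intended behavior here, since the object it would have notified no longer exists.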

Description of the change

Always unref pending calls, add load test to ensure correct behavior.
Make sure that timeouts are never notified after destruction.
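
The "always unref" point can be sketched as a scope guard: the reference is released in a destructor, so every exit path releases it, including an exception. Handle, handle_unref, UnrefOnExit and process are hypothetical stand-ins for a ref-counted libdbus object and its handling code; no libdbus call is used, and this is only a sketch of the idea rather than the code in this branch.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical ref-counted handle standing in for a libdbus object.
struct Handle
{
    int refs = 1;
};

void handle_unref(Handle* h)
{
    if (!h) return;
    assert(h->refs > 0);
    --h->refs;
}

// Releases one reference on scope exit, on every path including exceptions.
struct UnrefOnExit
{
    explicit UnrefOnExit(Handle* h) : handle(h) {}
    ~UnrefOnExit() { handle_unref(handle); }

    UnrefOnExit(const UnrefOnExit&) = delete;
    UnrefOnExit& operator=(const UnrefOnExit&) = delete;

    Handle* handle;
};

// Consumes the caller's reference; may throw after taking ownership.
void process(Handle* h, bool fail)
{
    UnrefOnExit guard{h};

    if (fail)
        throw std::runtime_error{"processing failed"};
}
```

Calling process(&h, true) throws, yet h.refs still drops to 0, which is the property the load test is meant to exercise under concurrency.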

PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
79. By Thomas Voß

Further harden PendingCallImpl.

80. By Thomas Voß

Make sure that the ExecutionCounter does not go out of scope.

81. By Thomas Voß

Pass by value, not by reference.

PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Alberto Aguirre (albaguirre) wrote :

Looks good.

135 + std::lock_guard<std::mutex> lg{wrapper->pending_call->guard};

One small nit:

148 + void notify_locked(const Message::Ptr& msg)

Maybe just take a std::lock_guard const& to enforce the locked constraint?
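
A minimal sketch of that suggestion, assuming a much reduced class: the *_locked function takes an unused reference to the caller's std::lock_guard, so it cannot be called without constructing a guard first. Pending, last and notify_and_read_back are hypothetical names for illustration and do not reflect the actual PendingCall interface.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Hypothetical reduction of a pending call: a mutex-protected message slot.
class Pending
{
public:
    // Public entry point: acquires the lock, then delegates.
    void notify(const std::string& msg)
    {
        std::lock_guard<std::mutex> lg{guard};
        notify_locked(lg, msg);
    }

    std::string last()
    {
        std::lock_guard<std::mutex> lg{guard};
        return message;
    }

private:
    // The unused lock_guard parameter documents, and checks at compile time,
    // that the caller holds *a* lock. It cannot prove it is the right mutex.
    void notify_locked(const std::lock_guard<std::mutex>&, const std::string& msg)
    {
        message = msg;
    }

    std::mutex guard;
    std::string message;
};

// Usage: store a message through the locking entry point and read it back.
bool notify_and_read_back()
{
    Pending p;
    p.notify("reply");
    assert(p.last() == "reply");
    return p.last() == "reply";
}
```

The parameter costs nothing at runtime, but it is a convention rather than a proof: a guard for an unrelated mutex would also satisfy the signature.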

466 + auto ec = std::make_shared<CountingEventCollector>(5000);
467 +
468 + std::thread t1{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 1000, ec);}};
469 + std::thread t2{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 1000, ec);}};
470 + std::thread t3{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 1000, ec);}};
471 + std::thread t4{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 1000, ec);}};
472 + std::thread t5{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 1000, ec);}};

I guess CI is too slow...maybe 100 per thread should be enough?
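
One way to make such tuning less error-prone is to derive the expected total from the load parameters instead of hard-coding both numbers. The following is only a sketch under that assumption; Load, run_load and check_default_load are hypothetical names, and the increment stands in for the asynchronous D-Bus call and its reply callback.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical load parameters; the review suggests 100 calls per thread.
struct Load
{
    int threads;
    int calls_per_thread;

    int expected() const { return threads * calls_per_thread; }
};

// Runs the load with a trivial stand-in for the asynchronous call
// and returns how many "replies" were counted.
int run_load(const Load& load)
{
    std::atomic<int> counter{0};
    std::vector<std::thread> workers;

    for (int t = 0; t < load.threads; ++t)
    {
        workers.emplace_back([&counter, &load]()
        {
            for (int i = 0; i < load.calls_per_thread; ++i)
                ++counter; // stand-in for ec->update() in a reply callback
        });
    }

    // Join before counter and load go out of scope.
    for (auto& w : workers)
        w.join();

    return counter.load();
}

// Usage: five threads with 100 calls each must yield the derived total.
void check_default_load()
{
    const Load load{5, 100};
    assert(run_load(load) == load.expected());
}
```

With this shape, lowering the per-thread count for a slow CI machine is a one-number change and the expected total follows automatically.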

Alberto Aguirre (albaguirre) :
review: Approve
82. By Thomas Voß

Adjust test timeout.

PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
83. By Thomas Voß

Lower the default number of remote queries to make Jenkins happy.

PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Alberto Aguirre (albaguirre) wrote :

428 + std::uint64_t expected;
429 + std::atomic<std::uint64_t> counter;

Can you make these int (mainly the std::atomic<std::uint64_t> to std::atomic<int>) so that it compiles on ppc?
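
A sketch of the requested change on a reduced counter. The review does not say why the 64-bit atomic fails to build; one plausible reason, stated here purely as an assumption, is that 32-bit PowerPC has no native 64-bit atomic instructions, so the compiler needs extra library support for them. Counter and reaches_expected_after are hypothetical names, not the test's actual types.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical reduction of the test's event counter using plain int.
struct Counter
{
    explicit Counter(int expected) : expected{expected}, counter{0} {}

    // Returns true on the update that reaches the expected count.
    bool update()
    {
        return ++counter == expected;
    }

    int expected;
    std::atomic<int> counter;
};

// Usage: only the n-th update reports that the expected count was reached.
bool reaches_expected_after(int n)
{
    Counter c{n};
    bool reached = false;

    for (int i = 0; i < n; ++i)
    {
        reached = c.update();
        assert(reached == (i == n - 1));
    }

    return reached;
}
```

An int is more than wide enough for the few hundred events counted in this test, so nothing is lost by dropping the 64-bit type.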

84. By Thomas Voß

std::*64 -> int for powerpc compatibility.

Preview Diff

1=== modified file 'src/core/dbus/asio/executor.cpp'
2--- src/core/dbus/asio/executor.cpp 2014-06-23 10:19:48 +0000
3+++ src/core/dbus/asio/executor.cpp 2014-10-02 20:47:37 +0000
4@@ -122,14 +122,20 @@
5 return;
6 }
7
8+ // We do not keep ourselves alive to prevent from races during destruction.
9+ std::weak_ptr<Timeout<UnderlyingTimeoutType>> wp{this->shared_from_this()};
10+
11 timer.expires_from_now(
12 boost::posix_time::milliseconds(
13 traits::Timeout<UnderlyingTimeoutType>::get_timeout_interval(
14 timeout)));
15- timer.async_wait(
16- std::bind(&Timeout::on_timeout,
17- Timeout<UnderlyingTimeoutType>::shared_from_this(),
18- std::placeholders::_1));
19+ timer.async_wait([wp](const boost::system::error_code& ec)
20+ {
21+ auto sp = wp.lock();
22+
23+ if (sp)
24+ sp->on_timeout(ec);
25+ });
26 }
27
28 void cancel()
29@@ -184,27 +190,35 @@
30
31 void restart()
32 {
33+ // We do not keep ourselves alive to prevent from races during destruction.
34+ std::weak_ptr<Watch<UnderlyingWatchType>> wp{this->shared_from_this()};
35+
36 if (traits::Watch<UnderlyingWatchType>::is_watch_monitoring_fd_for_readable(watch))
37 {
38- stream_descriptor.async_read_some(
39- boost::asio::null_buffers(),
40- std::bind(
41- &Watch::on_stream_descriptor_event,
42- Watch<UnderlyingWatchType>::shared_from_this(),
43- traits::Watch<UnderlyingWatchType>::readable_event(),
44- std::placeholders::_1,
45- std::placeholders::_2));
46+ stream_descriptor.async_read_some(boost::asio::null_buffers(), [wp](boost::system::error_code ec, std::size_t bytes_transferred)
47+ {
48+ auto sp = wp.lock();
49+
50+ if (sp)
51+ sp->on_stream_descriptor_event(
52+ traits::Watch<UnderlyingWatchType>::readable_event(),
53+ ec,
54+ bytes_transferred);
55+ });
56 }
57+
58 if (traits::Watch<UnderlyingWatchType>::is_watch_monitoring_fd_for_writable(watch))
59 {
60- stream_descriptor.async_write_some(
61- boost::asio::null_buffers(),
62- std::bind(
63- &Watch::on_stream_descriptor_event,
64- Watch<UnderlyingWatchType>::shared_from_this(),
65- traits::Watch<UnderlyingWatchType>::writeable_event(),
66- std::placeholders::_1,
67- std::placeholders::_2));
68+ stream_descriptor.async_write_some(boost::asio::null_buffers(), [wp](boost::system::error_code ec, std::size_t bytes_transferred)
69+ {
70+ auto sp = wp.lock();
71+
72+ if (sp)
73+ sp->on_stream_descriptor_event(
74+ traits::Watch<UnderlyingWatchType>::writeable_event(),
75+ ec,
76+ bytes_transferred);
77+ });
78 }
79 }
80
81
82=== modified file 'src/core/dbus/pending_call_impl.h'
83--- src/core/dbus/pending_call_impl.h 2014-03-05 09:08:15 +0000
84+++ src/core/dbus/pending_call_impl.h 2014-10-02 20:47:37 +0000
85@@ -26,6 +26,17 @@
86
87 #include <mutex>
88
89+namespace
90+{
91+// Better save than sorry. We wrap common error handling here.
92+bool is_pending_call_completed(DBusPendingCall* pending_call)
93+{
94+ if (not pending_call)
95+ return false;
96+
97+ return dbus_pending_call_get_completed(pending_call) == TRUE;
98+}
99+}
100 namespace core
101 {
102 namespace dbus
103@@ -45,18 +56,44 @@
104 {
105 auto wrapper = static_cast<Wrapper*>(cookie);
106
107- auto message = dbus_pending_call_steal_reply(call);
108+ if (not wrapper)
109+ return;
110
111- if (message)
112+ // We tie cleanup of the wrapper to the scope of the callback.
113+ // With that, even an exception being thrown would _not_ result
114+ // in an instance being leaked.
115+ struct Scope
116 {
117- wrapper->pending_call->notify(Message::from_raw_message(message));
118- }
119+ ~Scope()
120+ {
121+ // We take over ownership of the wrapper and destroy on scope exit.
122+ delete wrapper;
123+ // We always unref this message, even if notify_locked or
124+ // from_raw_message throws.
125+ if (message) dbus_message_unref(message);
126+ }
127+
128+ Wrapper* wrapper;
129+ // We only need this later on, when we have stolen the reply
130+ // from the pending call.
131+ DBusMessage* message;
132+ } scope{wrapper, nullptr};
133+
134+ // We synchronize to avoid races on construction.
135+ std::lock_guard<std::mutex> lg{wrapper->pending_call->guard};
136+ // And we only steal the reply if the call actually completed.
137+ if (not is_pending_call_completed(call))
138+ return;
139+
140+ scope.message = dbus_pending_call_steal_reply(call);
141+
142+ if (scope.message)
143+ wrapper->pending_call->notify_locked(Message::from_raw_message(scope.message));
144 }
145
146- void notify(const Message::Ptr& msg)
147+ // Announce incoming reply and invoke the callback if set.
148+ void notify_locked(const Message::Ptr& msg)
149 {
150- std::lock_guard<std::mutex> lg(guard);
151-
152 message = msg;
153
154 if (callback)
155@@ -71,36 +108,93 @@
156 public:
157 inline static core::dbus::PendingCall::Ptr create(DBusPendingCall* call)
158 {
159+ if (not call) throw std::runtime_error
160+ {
161+ "core::dbus::PendingCall cannot be constructed for null object."
162+ };
163+
164 auto result = std::shared_ptr<core::dbus::impl::PendingCall>
165 {
166 new core::dbus::impl::PendingCall{call}
167 };
168
169+ // Our scope contains two objects that are dynamically created:
170+ // * The actual PendingCall implementation (managed by a shared pointer)
171+ // * A helper object of type Wrapper for passing around the pending call instance.
172+ // The latter one needs some manual memory mgmt. until we are sure that it got handed
173+ // over to libdbus correctly.
174+ struct Scope
175+ {
176+ // Whenever we go out of scope, we unref the call (we do not need it anymore)
177+ // and we potentially (if armed) delete the wrapper.
178+ ~Scope()
179+ {
180+ dbus_pending_call_unref(call);
181+ if (armed) delete wrapper;
182+ }
183+
184+ // Disarms the scope for handling the wrapper instance.
185+ // Think about it like giving up ownership as we handed over
186+ // to libdbus.
187+ void disarm_for_wrapper()
188+ {
189+ armed = false;
190+ }
191+
192+ // The raw call instance.
193+ DBusPendingCall* call;
194+ // True if the wrapper instance is still owned by the scope.
195+ bool armed;
196+ // The wrapper instance.
197+ Wrapper* wrapper;
198+ } scope
199+ {
200+ // The raw call that we got handed from the caller.
201+ call,
202+ // Yes, we still own the Wrapper instance.
203+ true, new Wrapper{result}
204+ };
205+
206+ // We synchronize to avoid races on construction.
207+ std::lock_guard<std::mutex> lg{result->guard};
208+
209 if (FALSE == dbus_pending_call_set_notify(
210- result->pending_call,
211- PendingCall::on_pending_call_completed,
212- new Wrapper{result},
213- [](void* data)
214- {
215- delete static_cast<Wrapper*>(data);
216- }))
217+ // We dispatch to the static on_pending_call_completed when
218+ // the call completes.
219+ result->pending_call, PendingCall::on_pending_call_completed,
220+ // We pass in our helper object and do not specify a deleter.
221+ // Please refer to the source-code comments in on_pending_call_completed.
222+ scope.wrapper, nullptr))
223 {
224 throw std::runtime_error("Error setting up pending call notification.");
225 }
226
227 // And here comes the beauty of libdbus, and its racy architecture:
228- {
229- std::lock_guard<std::mutex> lg(result->guard);
230- if (TRUE == dbus_pending_call_get_completed(call))
231+ {
232+ if (is_pending_call_completed(call))
233 {
234 // We took too long while setting up the pending call notification.
235 // For that we now have to inject the message here.
236 auto msg = dbus_pending_call_steal_reply(call);
237- result->message = Message::from_raw_message(msg);
238+
239+ if (msg)
240+ {
241+ result->message = Message::from_raw_message(msg);
242+ // We decrease the reference count as Message::from_raw_message
243+ // always refs the object that it is passed.
244+ dbus_message_unref(msg);
245+ }
246+ }
247+ else
248+ {
249+ // We need the wrapper to be alive, so we disarm the scope.
250+ // Please note that this is the only "good" path through this
251+ // mess of setup and notification functions.
252+ scope.disarm_for_wrapper();
253 }
254 }
255
256- return std::dynamic_pointer_cast<core::dbus::PendingCall>(result);
257+ return result;
258 }
259
260 void cancel()
261@@ -122,7 +216,7 @@
262 PendingCall(DBusPendingCall* call)
263 : pending_call(call)
264 {
265- }
266+ }
267 };
268 }
269 }
270
271=== modified file 'tests/CMakeLists.txt'
272--- tests/CMakeLists.txt 2014-01-27 12:22:43 +0000
273+++ tests/CMakeLists.txt 2014-10-02 20:47:37 +0000
274@@ -14,13 +14,14 @@
275 #
276 # Authored by: Thomas Voss <thomas.voss@canonical.com>
277
278-set (OLD_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
279-# Don't treat warnings as errors in 3rd_party/{gmock,cucumber-cpp}
280-string (REPLACE " -Werror " " " CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
281-find_package(Gtest REQUIRED)
282-include_directories(${GMOCK_INCLUDE_DIR} ${GTEST_INCLUDE_DIR})
283-set (CMAKE_CXX_FLAGS ${OLD_CMAKE_CXX_FLAGS})
284-
285+# Build with system gmock and embedded gtest
286+set (GMOCK_INCLUDE_DIR "/usr/include/gmock/include" CACHE PATH "gmock source include directory")
287+set (GMOCK_SOURCE_DIR "/usr/src/gmock" CACHE PATH "gmock source directory")
288+set (GTEST_INCLUDE_DIR "${GMOCK_SOURCE_DIR}/gtest/include" CACHE PATH "gtest source include directory")
289+
290+add_subdirectory(${GMOCK_SOURCE_DIR} "${CMAKE_CURRENT_BINARY_DIR}/gmock")
291+
292+set(GTEST_BOTH_LIBRARIES gmock gtest gtest_main)
293 find_package(Threads)
294
295 add_definitions(-DCORE_DBUS_ENABLE_GOOGLE_TEST_FIXTURE)
296@@ -41,6 +42,11 @@
297 )
298
299 add_executable(
300+ async_execution_load_test
301+ async_execution_load_test.cpp
302+ )
303+
304+add_executable(
305 bus_test
306 bus_test.cpp
307 )
308@@ -111,6 +117,19 @@
309 )
310
311 target_link_libraries(
312+ async_execution_load_test
313+
314+ dbus-cpp
315+ dbus-cppc-helper
316+
317+ ${CMAKE_THREAD_LIBS_INIT}
318+ ${Boost_LIBRARIES}
319+ ${DBUS_LIBRARIES}
320+ ${GTEST_BOTH_LIBRARIES}
321+ ${PROCESS_CPP_LIBRARIES}
322+)
323+
324+target_link_libraries(
325 bus_test
326
327 dbus-cpp
328@@ -279,6 +298,7 @@
329 ${GTEST_BOTH_LIBRARIES}
330 )
331
332+add_test(async_execution_load_test ${CMAKE_CURRENT_BINARY_DIR}/async_execution_load_test)
333 add_test(bus_test ${CMAKE_CURRENT_BINARY_DIR}/bus_test)
334 add_test(cache_test ${CMAKE_CURRENT_BINARY_DIR}/cache_test)
335 add_test(dbus_test ${CMAKE_CURRENT_BINARY_DIR}/dbus_test)
336
337=== added file 'tests/async_execution_load_test.cpp'
338--- tests/async_execution_load_test.cpp 1970-01-01 00:00:00 +0000
339+++ tests/async_execution_load_test.cpp 2014-10-02 20:47:37 +0000
340@@ -0,0 +1,146 @@
341+/*
342+ * Copyright © 2014 Canonical Ltd.
343+ *
344+ * This program is free software: you can redistribute it and/or modify it
345+ * under the terms of the GNU Lesser General Public License version 3,
346+ * as published by the Free Software Foundation.
347+ *
348+ * This program is distributed in the hope that it will be useful,
349+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
350+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
351+ * GNU Lesser General Public License for more details.
352+ *
353+ * You should have received a copy of the GNU Lesser General Public License
354+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
355+ *
356+ * Authored by: Thomas Voß <thomas.voss@canonical.com>
357+ */
358+
359+#include <core/dbus/fixture.h>
360+#include <core/dbus/macros.h>
361+#include <core/dbus/object.h>
362+#include <core/dbus/service.h>
363+
364+#include <core/dbus/asio/executor.h>
365+
366+#include <core/dbus/types/stl/string.h>
367+#include <core/dbus/types/stl/vector.h>
368+
369+#include <core/testing/fork_and_run.h>
370+
371+#include <gmock/gmock.h>
372+
373+#include "test_data.h"
374+
375+namespace
376+{
377+// Our fixture for starting up private system- and session-bus instances.
378+struct AsyncExecutionLoadTest : public core::dbus::testing::Fixture
379+{
380+};
381+
382+auto session_bus_config_file =
383+ core::dbus::testing::Fixture::default_session_bus_config_file() =
384+ core::testing::session_bus_configuration_file();
385+
386+auto system_bus_config_file =
387+ core::dbus::testing::Fixture::default_system_bus_config_file() =
388+ core::testing::system_bus_configuration_file();
389+
390+struct DBus
391+{
392+ static const std::string& name()
393+ {
394+ static const std::string s{DBUS_SERVICE_DBUS};
395+ return s;
396+ }
397+
398+ DBUS_CPP_METHOD_WITH_TIMEOUT_DEF(ListNames, DBus, 25000)
399+};
400+
401+struct CountingEventCollector
402+{
403+ CountingEventCollector(int expected)
404+ : expected{expected},
405+ counter{0}
406+ {
407+ }
408+
409+ void update()
410+ {
411+ if (++counter == expected)
412+ wait_condition.notify_all();
413+ }
414+
415+ ::testing::AssertionResult wait_for(const std::chrono::milliseconds& ms)
416+ {
417+ std::unique_lock<std::mutex> ul(guard);
418+
419+ auto result = wait_condition.wait_for(ul, ms, [this]() { return counter == expected; });
420+
421+ if (result)
422+ return ::testing::AssertionSuccess();
423+
424+ return ::testing::AssertionFailure() << "Current count of "
425+ << counter << " does not match " << expected;
426+ }
427+
428+ int expected;
429+ std::atomic<int> counter;
430+
431+ std::mutex guard;
432+ std::condition_variable wait_condition;
433+};
434+
435+void invoke_list_names_n_times_and_update_event_collector(
436+ // The object referring to the bus daemon
437+ const core::dbus::Object::Ptr& dbus,
438+ // Number of iterations
439+ std::size_t n,
440+ // The event collector instance that should be updated
441+ const std::shared_ptr<CountingEventCollector>& ec)
442+{
443+ for (unsigned int i = 0; i < n; i++)
444+ {
445+ dbus->invoke_method_asynchronously_with_callback<DBus::ListNames, std::vector<std::string>>([ec](const core::dbus::Result<std::vector<std::string>>& vs)
446+ {
447+ if (not vs.is_error())
448+ ec->update();
449+ });
450+ }
451+}
452+}
453+
454+TEST_F(AsyncExecutionLoadTest, RepeatedlyInvokingAnAsyncFunctionWorks)
455+{
456+ using namespace ::testing;
457+
458+ auto bus = session_bus();
459+ bus->install_executor(core::dbus::asio::make_executor(bus));
460+
461+ std::thread worker{[bus]() { bus->run(); }};
462+
463+ auto service = core::dbus::Service::use_service(bus, DBus::name());
464+ auto dbus = service->object_for_path(core::dbus::types::ObjectPath{DBUS_PATH_DBUS});
465+
466+ auto ec = std::make_shared<CountingEventCollector>(500);
467+
468+ std::thread t1{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 100, ec);}};
469+ std::thread t2{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 100, ec);}};
470+ std::thread t3{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 100, ec);}};
471+ std::thread t4{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 100, ec);}};
472+ std::thread t5{[dbus, ec]() {invoke_list_names_n_times_and_update_event_collector(dbus, 100, ec);}};
473+
474+ EXPECT_TRUE(ec->wait_for(std::chrono::seconds{60}));
475+
476+ bus->stop();
477+
478+ if (t1.joinable()) t1.join();
479+ if (t2.joinable()) t2.join();
480+ if (t3.joinable()) t3.join();
481+ if (t4.joinable()) t4.join();
482+ if (t5.joinable()) t5.join();
483+
484+ if (worker.joinable())
485+ worker.join();
486+}
487
488=== modified file 'tests/executor_test.cpp'
489--- tests/executor_test.cpp 2014-06-23 10:19:48 +0000
490+++ tests/executor_test.cpp 2014-10-02 20:47:37 +0000
491@@ -263,8 +263,6 @@
492 EXPECT_EQ(core::testing::ForkAndRunResult::empty, core::testing::fork_and_run(service, client));
493 }
494
495-
496-
497 /*TEST(Bus, TimeoutThrowsForNullDBusWatch)
498 {
499 boost::asio::io_service io_service;
