Mir

Merge lp:~mir-team/mir/add-dispatchable-interface into lp:mir

Proposed by Chris Halse Rogers
Status: Merged
Approved by: Chris Halse Rogers
Approved revision: no longer in the source branch.
Merged at revision: 2269
Proposed branch: lp:~mir-team/mir/add-dispatchable-interface
Merge into: lp:mir
Diff against target: 2044 lines (+1169/-266)
33 files modified
common-ABI-sha1sums (+2/-0)
include/common/mir/dispatch/dispatchable.h (+78/-0)
include/common/mir/dispatch/simple_dispatch_thread.h (+48/-0)
platform-ABI-sha1sums (+2/-0)
server-ABI-sha1sums (+2/-0)
src/client/mir_connection.cpp (+5/-1)
src/client/mir_connection.h (+7/-0)
src/client/rpc/mir_protobuf_rpc_channel.cpp (+16/-0)
src/client/rpc/mir_protobuf_rpc_channel.h (+9/-1)
src/client/rpc/stream_socket_transport.cpp (+30/-111)
src/client/rpc/stream_socket_transport.h (+3/-4)
src/client/rpc/stream_transport.h (+4/-5)
src/common/CMakeLists.txt (+2/-0)
src/common/dispatch/CMakeLists.txt (+22/-0)
src/common/dispatch/simple_dispatch_thread.cpp (+158/-0)
src/common/symbols.map (+6/-2)
tests/CMakeLists.txt (+1/-0)
tests/include/mir_test/fd_utils.h (+69/-0)
tests/include/mir_test/pipe.h (+8/-4)
tests/include/mir_test/test_dispatchable.h (+60/-0)
tests/include/mir_test/test_protobuf_client.h (+5/-0)
tests/integration-tests/client/test_screencast.cpp (+7/-0)
tests/mir_test/CMakeLists.txt (+2/-0)
tests/mir_test/fd_utils.cpp (+40/-0)
tests/mir_test/pipe.cpp (+21/-18)
tests/mir_test/test_dispatchable.cpp (+98/-0)
tests/mir_test_doubles/test_protobuf_client.cpp (+6/-1)
tests/unit-tests/CMakeLists.txt (+1/-0)
tests/unit-tests/client/test_mir_connection.cpp (+13/-1)
tests/unit-tests/client/test_protobuf_rpc_channel.cpp (+4/-0)
tests/unit-tests/client/test_stream_transport.cpp (+250/-118)
tests/unit-tests/dispatch/CMakeLists.txt (+5/-0)
tests/unit-tests/dispatch/test_simple_dispatch_thread.cpp (+185/-0)
To merge this branch: bzr merge lp:~mir-team/mir/add-dispatchable-interface
Reviewer Review Type Date Requested Status
PS Jenkins bot (community) continuous-integration Approve
Daniel van Vugt Abstain
Robert Carr (community) Approve
Andreas Pokorny (community) Needs Information
Alexandros Frantzis (community) Approve
Cemil Azizoglu (community) Approve
Review via email: mp+246372@code.launchpad.net

Commit message

Add a Dispatchable interface, and transition StreamSocketTransport.
(LP: #1397375)

The Dispatchable interface is useful for anything that can provide a monitorable file descriptor.

Description of the change

A rough guide for the reviewer:

This whole enchilada is necessary because we want to expose a fd based wait/dispatch event model to clients: https://bugs.launchpad.net/bugs/1397375 (This is good for GTK, XMir, and makes a plymouth backend much more feasible).

In turn, this means that all event generators in the client library need to provide some mechanism we can aggregate into the client dispatch fd; epoll is a very nice interface for this, so in the end the event generators will provide an fd that the client library can add to an epoll fd and pass that epoll fd out to the client to wait on.

The meat of this MP is the mir::dispatch::Dispatchable interface, and the implementation of this in mir::client::rpc::StreamSocketTransport.

SimpleDispatchThread is split out of StreamSocketTransport to do the newly-necessary event dispatch for MirConnection.

The next step in this series is to turn the AndroidInputDispatcher into a Dispatchable.

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Alexandros Frantzis (afrantzis) wrote :

A first pass...

This MP introduces another event loop implementation. We are already using the GLib event loop API in the server, perhaps we should also do so in the client. I guess the counter-argument is that we don't want to impose extra dependencies on the client. I am OK with having this custom implementation as long as it doesn't get too complex (in which case we can switch to GLib or other library).

34 +enum fd_event : uint32_t {
41 +using fd_events = uint32_t;

Types should use PascalCase.

76 + virtual fd_events relevant_events() const = 0;

Just noting that this is not used at the moment. Not really an issue since its utility is evident, but still it would be good to use it in wait_for_events_forever() since it's there.

review: Needs Fixing
Revision history for this message
Chris Halse Rogers (raof) wrote :

Yeah, it does introduce another event loop. I think that it's sufficiently simple now that it's not really worth replacing with g_main_loop, and although subsequent MPs will extend it, I think they're extending it in ways not supported by g_main_loop anyway.

Revision history for this message
Chris Halse Rogers (raof) wrote :

(In particular, in later MPs SimpleDispatchThread becomes ThreadedDispatcer which dispatches from multiple threads, which is not supported for GMainContext)

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Robert Carr (robertcarr) wrote :

I think some of the tests could be made easier to read without much effort.

test_simple_dispatch_thread:

Since there are no expectations you can use a StubDispatchable and clean up the test body a little. Likewise you can move at least the dispatcher and the signal and uint64_t dummy to the fixture.

test_stream_transport:

+ // A valid fd is >= 0, and we know that stdin, stdout, and stderr aren't correct.

fnctl(fd, O_GETFD) is used elsewhere.

+ uint64_t dummy{0xdeadbeef};

Can be moved to fixture or

1084:
+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));

Perhaps move to fixture...write_to_fd()

1138: + std::this_thread::sleep_for(std::chrono::seconds{1});

:(

1095: + auto observer = std::make_shared<NiceMock<MockObserver>>();

Stub

review: Needs Fixing
Revision history for this message
Robert Carr (robertcarr) wrote :

I guess the needs fixing are please fix the stub v. mocks and fixtures in order to clear up test bodies. Will finish review tomorrow :)

Revision history for this message
Cemil Azizoglu (cemil-azizoglu) wrote :

Thanks for the guide, it was useful.

Some nits, otherwise all good.

63 + * Dispatch should no longer be called.
Indentation

65 + * specify in \ref event (eg: readable | remote_closed) then dispatch
you mean "specified", I think.

534 +# Copyright © 2014 Canonical Ltd.
s/2014/2015

548 +# Authored by: Robert Carr <email address hidden>
Correct attribution?

562 + * Copyright © 2014 Canonical Ltd.
s/2014/2015

review: Approve
Revision history for this message
Alexandros Frantzis (afrantzis) wrote :

693 + shutdown_fd = mir::Fd{pipefds[1]};
<snip>
713 + ::close(shutdown_fd);

This can lead to nasty double close bug: after the call to explicitly close the shutdown_fd, some other thread in the process may open a file and get the fd number that was just closed. When the shutdown_fd destructor is finally called the newly created file will be closed.

1709 + EXPECT_FALSE(dispatched_more_than_enough->wait_for(std::chrono::seconds{1}));
1741 + EXPECT_FALSE(dispatched_closed->wait_for(std::chrono::seconds{1}));

It's difficult (and often not possible) to test that an event will not occur, without waiting for some amount of time. I am a bit torn about this. On the one hand, unit tests should be super-fast. On the other hand one could argue that with the ability to run tests in parallel, small delays don't matter.

> 1095: + auto observer = std::make_shared<NiceMock<MockObserver>>();
>
> Stub

+1

review: Needs Fixing
Revision history for this message
Chris Halse Rogers (raof) wrote :

> 1709 +
> EXPECT_FALSE(dispatched_more_than_enough->wait_for(std::chrono::seconds{1}));
> 1741 + EXPECT_FALSE(dispatched_closed->wait_for(std::chrono::seconds{1}));
>
> It's difficult (and often not possible) to test that an event will not occur,
> without waiting for some amount of time. I am a bit torn about this. On the
> one hand, unit tests should be super-fast. On the other hand one could argue
> that with the ability to run tests in parallel, small delays don't matter.

Right. We do ideally want to avoid long tests, but the testsuite still takes ~5 seconds for a full run, which is close enough to instant for me.

Revision history for this message
Chris Halse Rogers (raof) wrote :

In case you're wondering about the long stream of commits Launchpad sees, that's an unfortunate artefact of git-remote-bzr; as far as I can tell I can't merge from trunk, I need to rebase on it. It shouldn't be visible in the final bzr log, and I won't use git-remote-bzr for any further branches.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Alexandros Frantzis (afrantzis) wrote :

Looks good.

review: Approve
Revision history for this message
Andreas Pokorny (andreas-pokorny) wrote :

side note:
+ 41 FdEvents could be made type safer with:

http://bazaar.launchpad.net/~andreas-pokorny/mir/evdev-input-platform/view/head:/src/include/common/mir/flags.h

hmm it only lacks toggling and disabling bits..

Why is that in include/, instead of src/include?

review: Needs Information
Revision history for this message
Robert Carr (robertcarr) wrote :

I still think it maybe worth trying to clean up the tests a little anyway don't want to block this on that though....:)

review: Approve
Revision history for this message
Daniel van Vugt (vanvugt) wrote :

Adding to the public API:
1 === added directory 'include/common/mir/dispatch'
2 === added file 'include/common/mir/dispatch/dispatchable.h'
3 --- include/common/mir/dispatch/dispatchable.h 1970-01-01 00:00:00 +0000
4 +++ include/common/mir/dispatch/dispatchable.h 2015-01-21 01:20:26 +0000
also means we need to update the sha1sums.

review: Needs Fixing
Revision history for this message
Chris Halse Rogers (raof) wrote :

@Andreas: I've looked at using mir::Flags<FdEvent>, but it becomes a bit ugly - particularly, you can't easily check against (md::FdEvent::readable | md::FdEvent::writable). Sadly, I think that a better thing would end up using the C pre-processor.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
Daniel van Vugt (vanvugt) :
review: Abstain
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'common-ABI-sha1sums'
2--- common-ABI-sha1sums 2015-01-27 19:46:30 +0000
3+++ common-ABI-sha1sums 2015-01-27 23:19:58 +0000
4@@ -1,4 +1,6 @@
5 3329ada91412ded2f127aee9a92f065e57b81cb2 include/common/mir/cached_ptr.h
6+d20a846dfa0e46fb276b4c181a6a5a56e7a884da include/common/mir/dispatch/dispatchable.h
7+7bdc0e58dc228ac655a618208bf133c6a499ca13 include/common/mir/dispatch/simple_dispatch_thread.h
8 6473b6b17e72053a4ca3f7dcf8e2d81156fbfb85 include/common/mir/events/event_builders.h
9 82ff9499ef62739379616e02164dc98f9914c329 include/common/mir/fd.h
10 fe0275d9c64e7a2d99990382d04b1fe956d7b7e4 include/common/mir/frontend/surface_id.h
11
12=== added directory 'include/common/mir/dispatch'
13=== added file 'include/common/mir/dispatch/dispatchable.h'
14--- include/common/mir/dispatch/dispatchable.h 1970-01-01 00:00:00 +0000
15+++ include/common/mir/dispatch/dispatchable.h 2015-01-27 23:19:58 +0000
16@@ -0,0 +1,78 @@
17+/*
18+ * Copyright © 2015 Canonical Ltd.
19+ *
20+ * This program is free software: you can redistribute it and/or modify it
21+ * under the terms of the GNU Lesser General Public License version 3,
22+ * as published by the Free Software Foundation.
23+ *
24+ * This program is distributed in the hope that it will be useful,
25+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
26+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27+ * GNU Lesser General Public License for more details.
28+ *
29+ * You should have received a copy of the GNU Lesser General Public License
30+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
31+ *
32+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
33+ */
34+
35+#ifndef MIR_DISPATCH_DISPATCHABLE_H_
36+#define MIR_DISPATCH_DISPATCHABLE_H_
37+
38+#include "mir/fd.h"
39+
40+namespace mir
41+{
42+namespace dispatch
43+{
44+
45+enum FdEvent : uint32_t {
46+ readable = 1<<0,
47+ writable = 1<<1,
48+ remote_closed = 1<<2,
49+ error = 1<<3
50+};
51+
52+using FdEvents = uint32_t;
53+
54+class Dispatchable
55+{
56+public:
57+ Dispatchable() = default;
58+ virtual ~Dispatchable() = default;
59+
60+ Dispatchable& operator=(Dispatchable const&) = delete;
61+ Dispatchable(Dispatchable const&) = delete;
62+
63+ /**
64+ * \brief Get a poll()able file descriptor
65+ * \return A file descriptor usable with poll() or equivalent function calls.
66+ * relevant_events() contains the set of event types to watch for.
67+ */
68+ virtual Fd watch_fd() const = 0;
69+
70+ /**
71+ * \brief Dispatch one pending event
72+ * \param [in] event The set of events current on the file-descriptor
73+ * \returns False iff no more events will be produced by this Dispatchable.
74+ * Dispatch should no longer be called.
75+ * \note This will dispatch at most one event. If there are multiple events
76+ * specified in \ref event (eg: readable | remote_closed) then dispatch
77+ * will process only one.
78+ * \note It is harmless to call dispatch() with an event that does not contain
79+ * any of the events from relevant_events(). The function will do
80+ * nothing in such a case.
81+ * \note An implementation of dispatch() MUST handle FdEvent::error,
82+ * if only to return false and terminate further event dispatch.
83+ */
84+ virtual bool dispatch(FdEvents events) = 0;
85+
86+ /**
87+ * \brief The set of file-descriptor events this Dispatchable handles.
88+ */
89+ virtual FdEvents relevant_events() const = 0;
90+};
91+}
92+}
93+
94+#endif // MIR_DISPATCH_DISPATCHABLE_H_
95
96=== added file 'include/common/mir/dispatch/simple_dispatch_thread.h'
97--- include/common/mir/dispatch/simple_dispatch_thread.h 1970-01-01 00:00:00 +0000
98+++ include/common/mir/dispatch/simple_dispatch_thread.h 2015-01-27 23:19:58 +0000
99@@ -0,0 +1,48 @@
100+/*
101+ * Copyright © 2015 Canonical Ltd.
102+ *
103+ * This program is free software: you can redistribute it and/or modify it
104+ * under the terms of the GNU Lesser General Public License version 3,
105+ * as published by the Free Software Foundation.
106+ *
107+ * This program is distributed in the hope that it will be useful,
108+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
109+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
110+ * GNU Lesser General Public License for more details.
111+ *
112+ * You should have received a copy of the GNU Lesser General Public License
113+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
114+ *
115+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
116+ */
117+
118+#ifndef MIR_DISPATCH_SIMPLE_DISPATCH_THREAD_H_
119+#define MIR_DISPATCH_SIMPLE_DISPATCH_THREAD_H_
120+
121+#include <memory>
122+#include <thread>
123+#include "mir/fd.h"
124+
125+namespace mir
126+{
127+namespace dispatch
128+{
129+class Dispatchable;
130+
131+
132+class SimpleDispatchThread
133+{
134+public:
135+ SimpleDispatchThread(std::shared_ptr<Dispatchable> const& dispatchee);
136+ ~SimpleDispatchThread() noexcept;
137+
138+private:
139+ Fd shutdown_fd;
140+ std::thread eventloop;
141+};
142+
143+}
144+}
145+
146+
147+#endif // MIR_DISPATCH_SIMPLE_DISPATCH_THREAD_H_
148
149=== modified file 'platform-ABI-sha1sums'
150--- platform-ABI-sha1sums 2015-01-27 19:46:30 +0000
151+++ platform-ABI-sha1sums 2015-01-27 23:19:58 +0000
152@@ -1,4 +1,6 @@
153 3329ada91412ded2f127aee9a92f065e57b81cb2 include/common/mir/cached_ptr.h
154+d20a846dfa0e46fb276b4c181a6a5a56e7a884da include/common/mir/dispatch/dispatchable.h
155+7bdc0e58dc228ac655a618208bf133c6a499ca13 include/common/mir/dispatch/simple_dispatch_thread.h
156 6473b6b17e72053a4ca3f7dcf8e2d81156fbfb85 include/common/mir/events/event_builders.h
157 82ff9499ef62739379616e02164dc98f9914c329 include/common/mir/fd.h
158 fe0275d9c64e7a2d99990382d04b1fe956d7b7e4 include/common/mir/frontend/surface_id.h
159
160=== modified file 'server-ABI-sha1sums'
161--- server-ABI-sha1sums 2015-01-27 19:46:30 +0000
162+++ server-ABI-sha1sums 2015-01-27 23:19:58 +0000
163@@ -1,4 +1,6 @@
164 3329ada91412ded2f127aee9a92f065e57b81cb2 include/common/mir/cached_ptr.h
165+d20a846dfa0e46fb276b4c181a6a5a56e7a884da include/common/mir/dispatch/dispatchable.h
166+7bdc0e58dc228ac655a618208bf133c6a499ca13 include/common/mir/dispatch/simple_dispatch_thread.h
167 6473b6b17e72053a4ca3f7dcf8e2d81156fbfb85 include/common/mir/events/event_builders.h
168 82ff9499ef62739379616e02164dc98f9914c329 include/common/mir/fd.h
169 fe0275d9c64e7a2d99990382d04b1fe956d7b7e4 include/common/mir/frontend/surface_id.h
170
171=== modified file 'src/client/mir_connection.cpp'
172--- src/client/mir_connection.cpp 2015-01-26 23:09:56 +0000
173+++ src/client/mir_connection.cpp 2015-01-27 23:19:58 +0000
174@@ -26,6 +26,8 @@
175 #include "mir/client_platform.h"
176 #include "mir/client_platform_factory.h"
177 #include "rpc/mir_basic_rpc_channel.h"
178+#include "mir/dispatch/dispatchable.h"
179+#include "mir/dispatch/simple_dispatch_thread.h"
180 #include "connection_configuration.h"
181 #include "display_configuration.h"
182 #include "connection_surface_map.h"
183@@ -41,6 +43,7 @@
184 #include <boost/exception/diagnostic_information.hpp>
185
186 namespace mcl = mir::client;
187+namespace md = mir::dispatch;
188 namespace mircv = mir::input::receiver;
189 namespace gp = google::protobuf;
190
191@@ -109,7 +112,8 @@
192 display_configuration(conf.the_display_configuration()),
193 lifecycle_control(conf.the_lifecycle_control()),
194 surface_map(conf.the_surface_map()),
195- event_handler_register(conf.the_event_handler_register())
196+ event_handler_register(conf.the_event_handler_register()),
197+ eventloop{new md::SimpleDispatchThread{std::dynamic_pointer_cast<md::Dispatchable>(channel)}}
198 {
199 connect_result.set_error("connect not called");
200 {
201
202=== modified file 'src/client/mir_connection.h'
203--- src/client/mir_connection.h 2015-01-20 22:23:20 +0000
204+++ src/client/mir_connection.h 2015-01-27 23:19:58 +0000
205@@ -69,6 +69,11 @@
206 {
207 class Logger;
208 }
209+
210+namespace dispatch
211+{
212+class SimpleDispatchThread;
213+}
214 }
215
216 struct MirConnection : mir::client::ClientContext
217@@ -192,6 +197,8 @@
218
219 std::shared_ptr<mir::client::EventHandlerRegister> const event_handler_register;
220
221+ std::unique_ptr<mir::dispatch::SimpleDispatchThread> const eventloop;
222+
223 std::vector<int> extra_platform_data;
224
225 std::shared_ptr<mir::client::ClientBufferStreamFactory> buffer_stream_factory;
226
227=== modified file 'src/client/rpc/mir_protobuf_rpc_channel.cpp'
228--- src/client/rpc/mir_protobuf_rpc_channel.cpp 2015-01-26 23:09:56 +0000
229+++ src/client/rpc/mir_protobuf_rpc_channel.cpp 2015-01-27 23:19:58 +0000
230@@ -40,6 +40,7 @@
231
232 namespace mcl = mir::client;
233 namespace mclr = mir::client::rpc;
234+namespace md = mir::dispatch;
235
236 namespace
237 {
238@@ -354,3 +355,18 @@
239 {
240 notify_disconnected();
241 }
242+
243+mir::Fd mir::client::rpc::MirProtobufRpcChannel::watch_fd() const
244+{
245+ return transport->watch_fd();
246+}
247+
248+bool mir::client::rpc::MirProtobufRpcChannel::dispatch(md::FdEvents events)
249+{
250+ return transport->dispatch(events);
251+}
252+
253+md::FdEvents mclr::MirProtobufRpcChannel::relevant_events() const
254+{
255+ return transport->relevant_events();
256+}
257
258=== modified file 'src/client/rpc/mir_protobuf_rpc_channel.h'
259--- src/client/rpc/mir_protobuf_rpc_channel.h 2015-01-14 06:39:13 +0000
260+++ src/client/rpc/mir_protobuf_rpc_channel.h 2015-01-27 23:19:58 +0000
261@@ -21,6 +21,7 @@
262
263 #include "mir_basic_rpc_channel.h"
264 #include "stream_transport.h"
265+#include "mir/dispatch/dispatchable.h"
266
267 #include <google/protobuf/service.h>
268 #include <google/protobuf/descriptor.h>
269@@ -52,7 +53,8 @@
270
271 class MirProtobufRpcChannel :
272 public MirBasicRpcChannel,
273- public StreamTransport::Observer
274+ public StreamTransport::Observer,
275+ public dispatch::Dispatchable
276 {
277 public:
278 MirProtobufRpcChannel(std::unique_ptr<StreamTransport> transport,
279@@ -64,8 +66,14 @@
280
281 ~MirProtobufRpcChannel() = default;
282
283+ // StreamTransport::Observer
284 void on_data_available() override;
285 void on_disconnected() override;
286+
287+ // Dispatchable
288+ Fd watch_fd() const override;
289+ bool dispatch(mir::dispatch::FdEvents events) override;
290+ mir::dispatch::FdEvents relevant_events() const override;
291 private:
292 virtual void CallMethod(const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController*,
293 const google::protobuf::Message* parameters, google::protobuf::Message* response,
294
295=== modified file 'src/client/rpc/stream_socket_transport.cpp'
296--- src/client/rpc/stream_socket_transport.cpp 2015-01-14 06:39:13 +0000
297+++ src/client/rpc/stream_socket_transport.cpp 2015-01-27 23:19:58 +0000
298@@ -23,7 +23,6 @@
299
300 #include <system_error>
301
302-#include <signal.h>
303 #include <errno.h>
304 #include <sys/epoll.h>
305 #include <sys/types.h>
306@@ -34,11 +33,11 @@
307 #include <boost/throw_exception.hpp>
308
309 namespace mclr = mir::client::rpc;
310+namespace md = mir::dispatch;
311
312 mclr::StreamSocketTransport::StreamSocketTransport(mir::Fd const& fd)
313 : socket_fd{fd}
314 {
315- init();
316 }
317
318 mclr::StreamSocketTransport::StreamSocketTransport(std::string const& socket_path)
319@@ -46,17 +45,6 @@
320 {
321 }
322
323-mclr::StreamSocketTransport::~StreamSocketTransport()
324-{
325- int dummy{0};
326- send(shutdown_fd, &dummy, sizeof(dummy), MSG_NOSIGNAL);
327- if (io_service_thread.joinable())
328- {
329- io_service_thread.join();
330- }
331- close(shutdown_fd);
332-}
333-
334 void mclr::StreamSocketTransport::register_observer(std::shared_ptr<Observer> const& observer)
335 {
336 std::lock_guard<decltype(observer_mutex)> lock(observer_mutex);
337@@ -176,110 +164,41 @@
338 mir::send_fds(socket_fd, fds);
339 }
340
341-void mclr::StreamSocketTransport::init()
342+mir::Fd mclr::StreamSocketTransport::watch_fd() const
343 {
344- // We use sockets rather than a pipe so that we can control
345- // EPIPE behaviour; we don't want SIGPIPE when the IO loop terminates.
346- int socket_fds[2];
347- socketpair(AF_UNIX, SOCK_STREAM, 0, socket_fds);
348- this->shutdown_fd = mir::Fd{socket_fds[1]};
349+ return socket_fd;
350+}
351
352- auto shutdown_fd = mir::Fd{socket_fds[0]};
353- io_service_thread = std::thread([this, shutdown_fd]
354+bool mclr::StreamSocketTransport::dispatch(md::FdEvents events)
355+{
356+ if (events & (md::FdEvent::remote_closed | md::FdEvent::error))
357 {
358- // Our IO threads must not receive any signals
359- sigset_t all_signals;
360- sigfillset(&all_signals);
361-
362- if (auto error = pthread_sigmask(SIG_BLOCK, &all_signals, NULL))
363- BOOST_THROW_EXCEPTION(
364- boost::enable_error_info(
365- std::runtime_error("Failed to block signals on IO thread")) << boost::errinfo_errno(error));
366-
367- mir::set_thread_name("Client IO loop");
368-
369- int epoll_fd = epoll_create1(0);
370-
371- epoll_event event;
372- // Make valgrind happy, harder
373- memset(&event, 0, sizeof(event));
374-
375- event.events = EPOLLIN | EPOLLRDHUP;
376- event.data.fd = socket_fd;
377- epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &event);
378-
379- event.events = EPOLLIN | EPOLLRDHUP;
380- event.data.fd = shutdown_fd;
381- epoll_ctl(epoll_fd, EPOLL_CTL_ADD, shutdown_fd, &event);
382-
383- bool shutdown_requested{false};
384- while (!shutdown_requested)
385+ if (events & md::FdEvent::readable)
386 {
387- epoll_event event;
388- epoll_wait(epoll_fd, &event, 1, -1);
389- if (event.data.fd == socket_fd)
390- {
391- if (event.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
392- {
393- if (event.events & EPOLLIN)
394- {
395- // If the remote end shut down cleanly it's possible there's some more
396- // data left to read, or that reads will now return 0 (EOF)
397- //
398- // If there's more data left to read, notify of this before disconnect.
399- int dummy;
400- if (recv(socket_fd, &dummy, sizeof(dummy), MSG_PEEK | MSG_NOSIGNAL) > 0)
401- {
402- try
403- {
404- notify_data_available();
405- }
406- catch(...)
407- {
408- //It's quite likely that notify_data_available() will lead to
409- //an exception being thrown; after all, the remote has closed
410- //the connection.
411- //
412- //This doesn't matter; we're already shutting down.
413- }
414- }
415- }
416- notify_disconnected();
417- shutdown_requested = true;
418- }
419- else if (event.events & EPOLLIN)
420- {
421- try
422- {
423- notify_data_available();
424- }
425- catch (socket_disconnected_error &err)
426- {
427- // We've already notified of disconnection.
428- shutdown_requested = true;
429- }
430- // These need not be fatal.
431- catch (fd_reception_error &err)
432- {
433- }
434- catch (socket_error &err)
435- {
436- }
437- catch (...)
438- {
439- // We've no idea what the problem is, so clean up as best we can.
440- notify_disconnected();
441- shutdown_requested = true;
442- }
443- }
444- }
445- if (event.data.fd == shutdown_fd)
446- {
447- shutdown_requested = true;
448+ // If the remote end shut down cleanly it's possible there's some more
449+ // data left to read, or that reads will now return 0 (EOF)
450+ //
451+ // If there's more data left to read, notify of this before disconnect.
452+ int dummy;
453+ if (recv(socket_fd, &dummy, sizeof(dummy), MSG_PEEK | MSG_NOSIGNAL) > 0)
454+ {
455+ notify_data_available();
456+ return true;
457 }
458 }
459- ::close(epoll_fd);
460- });
461+ notify_disconnected();
462+ return false;
463+ }
464+ else if (events & md::FdEvent::readable)
465+ {
466+ notify_data_available();
467+ }
468+ return true;
469+}
470+
471+md::FdEvents mclr::StreamSocketTransport::relevant_events() const
472+{
473+ return md::FdEvent::readable | md::FdEvent::remote_closed;
474 }
475
476 mir::Fd mclr::StreamSocketTransport::open_socket(std::string const& path)
477
478=== modified file 'src/client/rpc/stream_socket_transport.h'
479--- src/client/rpc/stream_socket_transport.h 2015-01-14 06:39:13 +0000
480+++ src/client/rpc/stream_socket_transport.h 2015-01-27 23:19:58 +0000
481@@ -38,22 +38,21 @@
482 public:
483 StreamSocketTransport(Fd const& fd);
484 StreamSocketTransport(std::string const& socket_path);
485- ~StreamSocketTransport() override;
486
487 void register_observer(std::shared_ptr<Observer> const& observer) override;
488 void receive_data(void* buffer, size_t bytes_requested) override;
489 void receive_data(void* buffer, size_t bytes_requested, std::vector<Fd>& fds) override;
490 void send_message(std::vector<uint8_t> const& buffer, std::vector<mir::Fd> const& fds) override;
491
492+ Fd watch_fd() const override;
493+ bool dispatch(mir::dispatch::FdEvents event) override;
494+ mir::dispatch::FdEvents relevant_events() const override;
495 private:
496- void init();
497 Fd open_socket(std::string const& path);
498 void notify_data_available();
499 void notify_disconnected();
500
501- std::thread io_service_thread;
502 Fd const socket_fd;
503- Fd shutdown_fd;
504
505 std::mutex observer_mutex;
506 std::vector<std::shared_ptr<Observer>> observers;
507
508=== modified file 'src/client/rpc/stream_transport.h'
509--- src/client/rpc/stream_transport.h 2015-01-14 06:39:13 +0000
510+++ src/client/rpc/stream_transport.h 2015-01-27 23:19:58 +0000
511@@ -25,6 +25,7 @@
512 #include <stdint.h>
513
514 #include "mir/fd.h"
515+#include "mir/dispatch/dispatchable.h"
516
517 namespace mir
518 {
519@@ -64,7 +65,7 @@
520 * from different threads. Multiple threads calling the same
521 * function need synchronisation.
522 */
523-class StreamTransport
524+class StreamTransport : public dispatch::Dispatchable
525 {
526 public:
527 /**
528@@ -80,8 +81,8 @@
529
530 /**
531 * \brief Observer of IO status
532- * \note The Transport may call Observer members from arbitrary threads.
533- * The Observer implementation is responsible for any synchronisation.
534+ * \note The Transport will only call Observers in response to dispatch(),
535+ * and on the thread calling dispatch().
536 */
537 class Observer
538 {
539@@ -106,8 +107,6 @@
540 /**
541 * \brief Register an IO observer
542 * \param [in] observer
543- * \note There is no guarantee which thread will call into the observer.
544- * Synchronisation is the responsibility of the caller.
545 */
546 virtual void register_observer(std::shared_ptr<Observer> const& observer) = 0;
547
548
549=== modified file 'src/common/CMakeLists.txt'
550--- src/common/CMakeLists.txt 2015-01-22 23:12:46 +0000
551+++ src/common/CMakeLists.txt 2015-01-27 23:19:58 +0000
552@@ -17,6 +17,8 @@
553 add_subdirectory(sharedlibrary)
554 add_subdirectory(thread)
555 add_subdirectory(time)
556+add_subdirectory(dispatch)
557+
558 list(APPEND MIR_COMMON_SOURCES
559 $<TARGET_OBJECTS:mirtime>
560 ${CMAKE_CURRENT_SOURCE_DIR}/log.cpp
561
562=== added directory 'src/common/dispatch'
563=== added file 'src/common/dispatch/CMakeLists.txt'
564--- src/common/dispatch/CMakeLists.txt 1970-01-01 00:00:00 +0000
565+++ src/common/dispatch/CMakeLists.txt 2015-01-27 23:19:58 +0000
566@@ -0,0 +1,22 @@
567+# Copyright © 2015 Canonical Ltd.
568+#
569+# This program is free software: you can redistribute it and/or modify
570+# it under the terms of the GNU Lesser General Public License version 3 as
571+# published by the Free Software Foundation.
572+#
573+# This program is distributed in the hope that it will be useful,
574+# but WITHOUT ANY WARRANTY; without even the implied warranty of
575+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
576+# GNU Lesser General Public License for more details.
577+#
578+# You should have received a copy of the GNU Lesser General Public License
579+# along with this program. If not, see <http://www.gnu.org/licenses/>.
580+#
581+# Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
582+
583+list(
584+ APPEND MIR_COMMON_SOURCES
585+ ${CMAKE_CURRENT_SOURCE_DIR}/simple_dispatch_thread.cpp
586+)
587+
588+set(MIR_COMMON_SOURCES ${MIR_COMMON_SOURCES} PARENT_SCOPE)
589
590=== added file 'src/common/dispatch/simple_dispatch_thread.cpp'
591--- src/common/dispatch/simple_dispatch_thread.cpp 1970-01-01 00:00:00 +0000
592+++ src/common/dispatch/simple_dispatch_thread.cpp 2015-01-27 23:19:58 +0000
593@@ -0,0 +1,158 @@
594+/*
595+ * Copyright © 2015 Canonical Ltd.
596+ *
597+ * This program is free software: you can redistribute it and/or modify it
598+ * under the terms of the GNU Lesser General Public License version 3,
599+ * as published by the Free Software Foundation.
600+ *
601+ * This program is distributed in the hope that it will be useful,
602+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
603+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
604+ * GNU Lesser General Public License for more details.
605+ *
606+ * You should have received a copy of the GNU Lesser General Public License
607+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
608+ *
609+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
610+ */
611+
612+#include "mir/dispatch/simple_dispatch_thread.h"
613+#include "mir/dispatch/dispatchable.h"
614+
615+#include <sys/epoll.h>
616+#include <unistd.h>
617+#include <system_error>
618+#include <signal.h>
619+#include <boost/exception/all.hpp>
620+
621+namespace md = mir::dispatch;
622+
623+namespace
624+{
625+md::FdEvents epoll_to_fd_event(epoll_event const& event)
626+{
627+ md::FdEvents val{0};
628+ if (event.events & EPOLLIN)
629+ {
630+ val |= md::FdEvent::readable;
631+ }
632+ if (event.events & EPOLLOUT)
633+ {
634+ val = md::FdEvent::writable;
635+ }
636+ if (event.events & (EPOLLHUP | EPOLLRDHUP))
637+ {
638+ val |= md::FdEvent::remote_closed;
639+ }
640+ if (event.events & EPOLLERR)
641+ {
642+ val = md::FdEvent::error;
643+ }
644+ return val;
645+}
646+
647+int fd_event_to_epoll(md::FdEvents const& event)
648+{
649+ int epoll_value{0};
650+ if (event & md::FdEvent::readable)
651+ {
652+ epoll_value |= EPOLLIN;
653+ }
654+ if (event & md::FdEvent::writable)
655+ {
656+ epoll_value |= EPOLLOUT;
657+ }
658+ if (event & md::FdEvent::remote_closed)
659+ {
660+ epoll_value |= EPOLLRDHUP | EPOLLHUP;
661+ }
662+ if (event & md::FdEvent::error)
663+ {
664+ epoll_value |= EPOLLERR;
665+ }
666+ return epoll_value;
667+}
668+
669+void wait_for_events_forever(std::shared_ptr<md::Dispatchable> const& dispatchee, mir::Fd shutdown_fd)
670+{
671+ auto epoll_fd = mir::Fd{epoll_create1(0)};
672+ if (epoll_fd == mir::Fd::invalid)
673+ {
674+ BOOST_THROW_EXCEPTION((std::system_error{errno,
675+ std::system_category(),
676+ "Failed to create epoll IO monitor"}));
677+ }
678+ epoll_event event;
679+ memset(&event, 0, sizeof(event));
680+
681+ enum fd_names : uint32_t {
682+ shutdown,
683+ dispatchee_fd
684+ };
685+
686+ // We only care when the shutdown pipe has been closed
687+ event.data.u32 = fd_names::shutdown;
688+ event.events = EPOLLRDHUP;
689+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, shutdown_fd, &event);
690+
691+ // Ask the dispatchee what it events it's interested in...
692+ event.data.u32 = fd_names::dispatchee_fd;
693+ event.events = fd_event_to_epoll(dispatchee->relevant_events());
694+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dispatchee->watch_fd(), &event);
695+
696+ for (;;)
697+ {
698+ epoll_wait(epoll_fd, &event, 1, -1);
699+ if (event.data.u32 == fd_names::dispatchee_fd)
700+ {
701+ if (!dispatchee->dispatch(epoll_to_fd_event(event)))
702+ {
703+ // No need to keep looping, the Dispatchable's not going to produce any more events.
704+ return;
705+ }
706+ }
707+ else if (event.data.u32 == fd_names::shutdown)
708+ {
709+ // The only thing we do with the shutdown fd is to close it.
710+ return;
711+ }
712+ }
713+}
714+
715+}
716+
717+md::SimpleDispatchThread::SimpleDispatchThread(std::shared_ptr<md::Dispatchable> const& dispatchee)
718+{
719+ int pipefds[2];
720+ if (pipe(pipefds) < 0)
721+ {
722+ BOOST_THROW_EXCEPTION((std::system_error{errno,
723+ std::system_category(),
724+ "Failed to create shutdown pipe for IO thread"}));
725+ }
726+ shutdown_fd = mir::Fd{pipefds[1]};
727+ mir::Fd const terminate_fd = mir::Fd{pipefds[0]};
728+ eventloop = std::thread{[dispatchee, terminate_fd]()
729+ {
730+ // Our IO threads must not receive any signals
731+ sigset_t all_signals;
732+ sigfillset(&all_signals);
733+
734+ if (auto error = pthread_sigmask(SIG_BLOCK, &all_signals, NULL))
735+ BOOST_THROW_EXCEPTION((
736+ std::system_error{error,
737+ std::system_category(),
738+ "Failed to block signals on IO thread"}));
739+
740+ wait_for_events_forever(dispatchee, terminate_fd);
741+ }};
742+}
743+
744+md::SimpleDispatchThread::~SimpleDispatchThread() noexcept
745+{
746+ shutdown_fd = mir::Fd{};
747+ if (eventloop.joinable())
748+ {
749+ eventloop.join();
750+ }
751+}
752
753=== modified file 'src/common/symbols.map'
754--- src/common/symbols.map 2015-01-26 19:12:43 +0000
755+++ src/common/symbols.map 2015-01-27 23:19:58 +0000
756@@ -216,6 +216,10 @@
757 MIR_COMMON_3.2 {
758 global:
759 extern "C++" {
760- mir::events::*
761+ mir::dispatch::SimpleDispatchThread::SimpleDispatchThread*;
762+ mir::dispatch::SimpleDispatchThread::?SimpleDispatchThread*;
763+ typeinfo?for?mir::dispatch::SimpleDispatchThread;
764+ vtable?for?mir::dispatch::SimpleDispatchThread;
765+ mir::events::*
766 };
767-} MIR_COMMON_3;
768+} MIR_COMMON_3.1;
769
770=== modified file 'tests/CMakeLists.txt'
771--- tests/CMakeLists.txt 2015-01-22 13:01:46 +0000
772+++ tests/CMakeLists.txt 2015-01-27 23:19:58 +0000
773@@ -44,6 +44,7 @@
774 ${PROJECT_SOURCE_DIR}/include/platform
775 ${PROJECT_SOURCE_DIR}/include/server
776 ${PROJECT_SOURCE_DIR}/include/client
777+ ${PROJECT_SOURCE_DIR}/include/common
778 ${PROJECT_SOURCE_DIR}/src/platforms/mesa/include
779 )
780
781
782=== added file 'tests/include/mir_test/fd_utils.h'
783--- tests/include/mir_test/fd_utils.h 1970-01-01 00:00:00 +0000
784+++ tests/include/mir_test/fd_utils.h 2015-01-27 23:19:58 +0000
785@@ -0,0 +1,69 @@
786+/*
787+ * Copyright © 2015 Canonical Ltd.
788+ *
789+ * This program is free software: you can redistribute it and/or modify
790+ * it under the terms of the GNU General Public License version 3 as
791+ * published by the Free Software Foundation.
792+ *
793+ * This program is distributed in the hope that it will be useful,
794+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
795+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
796+ * GNU General Public License for more details.
797+ *
798+ * You should have received a copy of the GNU General Public License
799+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
800+ *
801+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
802+ */
803+
804+#ifndef MIR_TEST_FD_UTILS_H_
805+#define MIR_TEST_FD_UTILS_H_
806+
807+#include "mir/fd.h"
808+#include <chrono>
809+
810+#include <poll.h>
811+
812+#include <gtest/gtest.h>
813+
814+namespace mir
815+{
816+namespace test
817+{
818+::testing::AssertionResult std_call_succeeded(int retval);
819+
820+::testing::AssertionResult fd_is_readable(mir::Fd const& fd);
821+
822+template<typename Period, typename Rep>
823+::testing::AssertionResult fd_becomes_readable(mir::Fd const& fd,
824+ std::chrono::duration<Period, Rep> timeout)
825+{
826+ int timeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
827+
828+ pollfd readable;
829+ readable.events = POLLIN;
830+ readable.fd = fd;
831+
832+ auto result = std_call_succeeded(poll(&readable, 1, timeout_ms));
833+ if (result)
834+ {
835+ if (readable.revents & POLLERR)
836+ {
837+ return ::testing::AssertionFailure() << "error condition on fd";
838+ }
839+ if (readable.revents & POLLNVAL)
840+ {
841+ return ::testing::AssertionFailure() << "fd is invalid";
842+ }
843+ if (!(readable.revents & POLLIN))
844+ {
845+ return ::testing::AssertionFailure() << "fd is not readable";
846+ }
847+ return ::testing::AssertionSuccess();
848+ }
849+ return result;
850+}
851+}
852+}
853+
854+#endif // MIR_TEST_FD_UTILS_H_
855
856=== modified file 'tests/include/mir_test/pipe.h'
857--- tests/include/mir_test/pipe.h 2013-07-03 09:54:10 +0000
858+++ tests/include/mir_test/pipe.h 2015-01-27 23:19:58 +0000
859@@ -19,6 +19,8 @@
860 #ifndef MIR_TEST_PIPE_H_
861 #define MIR_TEST_PIPE_H_
862
863+#include "mir/fd.h"
864+
865 namespace mir
866 {
867 namespace test
868@@ -28,16 +30,18 @@
869 {
870 public:
871 Pipe();
872- ~Pipe();
873+ Pipe(int flags);
874+ ~Pipe() = default;
875
876- int read_fd() const;
877- int write_fd() const;
878+ Fd read_fd() const;
879+ Fd write_fd() const;
880
881 private:
882 Pipe(Pipe const&) = delete;
883 Pipe& operator=(Pipe const&) = delete;
884
885- int pipefd[2];
886+ Fd reader;
887+ Fd writer;
888 };
889
890 }
891
892=== added file 'tests/include/mir_test/test_dispatchable.h'
893--- tests/include/mir_test/test_dispatchable.h 1970-01-01 00:00:00 +0000
894+++ tests/include/mir_test/test_dispatchable.h 2015-01-27 23:19:58 +0000
895@@ -0,0 +1,60 @@
896+/*
897+ * Copyright © 2015 Canonical Ltd.
898+ *
899+ * This program is free software: you can redistribute it and/or modify
900+ * it under the terms of the GNU General Public License version 3 as
901+ * published by the Free Software Foundation.
902+ *
903+ * This program is distributed in the hope that it will be useful,
904+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
905+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
906+ * GNU General Public License for more details.
907+ *
908+ * You should have received a copy of the GNU General Public License
909+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
910+ *
911+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
912+ */
913+
914+
915+#ifndef MIR_TEST_TEST_DISPATCHABLE_H_
916+#define MIR_TEST_TEST_DISPATCHABLE_H_
917+
918+#include "mir_test/pipe.h"
919+#include "mir/dispatch/dispatchable.h"
920+
921+#include <functional>
922+
923+namespace mir
924+{
925+namespace test
926+{
927+
928+class TestDispatchable : public dispatch::Dispatchable
929+{
930+public:
931+ TestDispatchable(std::function<bool(dispatch::FdEvents)> const& target,
932+ dispatch::FdEvents relevant_events);
933+ TestDispatchable(std::function<bool(dispatch::FdEvents)> const& target);
934+ TestDispatchable(std::function<void()> const& target);
935+
936+ mir::Fd watch_fd() const override;
937+ bool dispatch(dispatch::FdEvents events) override;
938+ dispatch::FdEvents relevant_events() const override;
939+
940+ void trigger();
941+ void untrigger();
942+ void hangup();
943+
944+private:
945+ mir::Fd read_fd;
946+ mir::Fd write_fd;
947+ std::function<bool(dispatch::FdEvents)> const target;
948+ dispatch::FdEvents const eventmask;
949+};
950+
951+}
952+}
953+
954+
955+#endif // MIR_TEST_TEST_DISPATCHABLE_H_
956
957=== modified file 'tests/include/mir_test/test_protobuf_client.h'
958--- tests/include/mir_test/test_protobuf_client.h 2015-01-14 06:39:13 +0000
959+++ tests/include/mir_test/test_protobuf_client.h 2015-01-27 23:19:58 +0000
960@@ -30,6 +30,10 @@
961
962 namespace mir
963 {
964+namespace dispatch
965+{
966+class SimpleDispatchThread;
967+}
968 namespace test
969 {
970 namespace doubles
971@@ -42,6 +46,7 @@
972
973 std::shared_ptr<doubles::MockRpcReport> rpc_report;
974 std::shared_ptr<google::protobuf::RpcChannel> channel;
975+ std::shared_ptr<dispatch::SimpleDispatchThread> eventloop;
976 mir::protobuf::DisplayServer::Stub display_server;
977 mir::protobuf::ConnectParameters connect_parameters;
978 mir::protobuf::SurfaceParameters surface_parameters;
979
980=== modified file 'tests/integration-tests/client/test_screencast.cpp'
981--- tests/integration-tests/client/test_screencast.cpp 2015-01-20 22:23:20 +0000
982+++ tests/integration-tests/client/test_screencast.cpp 2015-01-27 23:19:58 +0000
983@@ -18,6 +18,8 @@
984
985 #include "mir_protobuf.pb.h"
986 #include "src/client/default_connection_configuration.h"
987+#include "mir/dispatch/simple_dispatch_thread.h"
988+#include "mir/dispatch/dispatchable.h"
989
990 #include "mir/frontend/connector.h"
991 #include "mir_test/test_protobuf_server.h"
992@@ -31,6 +33,7 @@
993
994 namespace mcl = mir::client;
995 namespace mt = mir::test;
996+namespace md = mir::dispatch;
997
998 namespace
999 {
1000@@ -74,6 +77,9 @@
1001 mcl::DefaultConnectionConfiguration{test_socket}.the_rpc_channel();
1002 protobuf_server =
1003 std::make_shared<mir::protobuf::DisplayServer::Stub>(rpc_channel.get());
1004+ eventloop =
1005+ std::make_shared<md::SimpleDispatchThread>(
1006+ std::dynamic_pointer_cast<md::Dispatchable>(rpc_channel));
1007 }
1008
1009 char const* const test_socket = "./test_socket_screencast";
1010@@ -81,6 +87,7 @@
1011 std::shared_ptr<mt::TestProtobufServer> test_server;
1012 std::shared_ptr<google::protobuf::RpcChannel> rpc_channel;
1013 std::shared_ptr<mir::protobuf::DisplayServer> protobuf_server;
1014+ std::shared_ptr<mir::dispatch::SimpleDispatchThread> eventloop;
1015 };
1016
1017 }
1018
1019=== modified file 'tests/mir_test/CMakeLists.txt'
1020--- tests/mir_test/CMakeLists.txt 2015-01-14 06:39:13 +0000
1021+++ tests/mir_test/CMakeLists.txt 2015-01-27 23:19:58 +0000
1022@@ -11,6 +11,8 @@
1023 wait_object.cpp
1024 current_thread_name.cpp
1025 validity_matchers.cpp
1026+ fd_utils.cpp
1027+ test_dispatchable.cpp
1028 )
1029
1030 target_link_libraries(mir-test mirprotobuf)
1031
1032=== added file 'tests/mir_test/fd_utils.cpp'
1033--- tests/mir_test/fd_utils.cpp 1970-01-01 00:00:00 +0000
1034+++ tests/mir_test/fd_utils.cpp 2015-01-27 23:19:58 +0000
1035@@ -0,0 +1,40 @@
1036+/*
1037+ * Copyright © 2015 Canonical Ltd.
1038+ *
1039+ * This program is free software: you can redistribute it and/or modify
1040+ * it under the terms of the GNU General Public License version 3 as
1041+ * published by the Free Software Foundation.
1042+ *
1043+ * This program is distributed in the hope that it will be useful,
1044+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1045+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1046+ * GNU General Public License for more details.
1047+ *
1048+ * You should have received a copy of the GNU General Public License
1049+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
1050+ *
1051+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
1052+ */
1053+
1054+#include "mir_test/fd_utils.h"
1055+
1056+::testing::AssertionResult mir::test::std_call_succeeded(int retval)
1057+{
1058+ if (retval >= 0)
1059+ {
1060+ return ::testing::AssertionSuccess();
1061+ }
1062+ else
1063+ {
1064+ return ::testing::AssertionFailure() << "errno: "
1065+ << errno
1066+ << " ["
1067+ << strerror(errno)
1068+ << "]";
1069+ }
1070+}
1071+
1072+::testing::AssertionResult mir::test::fd_is_readable(mir::Fd const& fd)
1073+{
1074+ return fd_becomes_readable(fd, std::chrono::seconds{0});
1075+}
1076
1077=== modified file 'tests/mir_test/pipe.cpp'
1078--- tests/mir_test/pipe.cpp 2015-01-14 06:39:13 +0000
1079+++ tests/mir_test/pipe.cpp 2015-01-27 23:19:58 +0000
1080@@ -24,32 +24,35 @@
1081 #include <system_error>
1082
1083 #include <unistd.h>
1084+#include <fcntl.h>
1085
1086 namespace mt = mir::test;
1087
1088 mt::Pipe::Pipe()
1089-{
1090- if (pipe(pipefd))
1091+ : Pipe(0)
1092+{
1093+}
1094+
1095+mt::Pipe::Pipe(int flags)
1096+{
1097+ int pipefd[2];
1098+ if (pipe2(pipefd, flags))
1099 {
1100 BOOST_THROW_EXCEPTION(
1101 boost::enable_error_info(std::system_error(errno,
1102 std::system_category(),
1103 "Failed to create pipe")));
1104 }
1105-}
1106-
1107-mt::Pipe::~Pipe()
1108-{
1109- close(pipefd[0]);
1110- close(pipefd[1]);
1111-}
1112-
1113-int mt::Pipe::read_fd() const
1114-{
1115- return pipefd[0];
1116-}
1117-
1118-int mt::Pipe::write_fd() const
1119-{
1120- return pipefd[1];
1121+ reader = mir::Fd{pipefd[0]};
1122+ writer = mir::Fd{pipefd[1]};
1123+}
1124+
1125+mir::Fd mt::Pipe::read_fd() const
1126+{
1127+ return reader;
1128+}
1129+
1130+mir::Fd mt::Pipe::write_fd() const
1131+{
1132+ return writer;
1133 }
1134
1135=== added file 'tests/mir_test/test_dispatchable.cpp'
1136--- tests/mir_test/test_dispatchable.cpp 1970-01-01 00:00:00 +0000
1137+++ tests/mir_test/test_dispatchable.cpp 2015-01-27 23:19:58 +0000
1138@@ -0,0 +1,98 @@
1139+/*
1140+ * Copyright © 2015 Canonical Ltd.
1141+ *
1142+ * This program is free software: you can redistribute it and/or modify
1143+ * it under the terms of the GNU General Public License version 3 as
1144+ * published by the Free Software Foundation.
1145+ *
1146+ * This program is distributed in the hope that it will be useful,
1147+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1148+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1149+ * GNU General Public License for more details.
1150+ *
1151+ * You should have received a copy of the GNU General Public License
1152+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
1153+ *
1154+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
1155+ */
1156+
1157+#include <fcntl.h>
1158+
1159+#include "mir_test/pipe.h"
1160+#include "mir_test/test_dispatchable.h"
1161+
1162+#include <gtest/gtest.h>
1163+#include <gmock/gmock.h>
1164+
1165+namespace mt = mir::test;
1166+namespace md = mir::dispatch;
1167+
1168+mt::TestDispatchable::TestDispatchable(std::function<bool(md::FdEvents)> const& target,
1169+ md::FdEvents relevant_events)
1170+ : target{target},
1171+ eventmask{relevant_events}
1172+{
1173+ // Need to use O_NONBLOCK here to ensure concurrent dispatch doesn't block indefinitely
1174+ // in read()
1175+ mt::Pipe pipe{O_NONBLOCK};
1176+ read_fd = pipe.read_fd();
1177+ write_fd = pipe.write_fd();
1178+}
1179+
1180+mt::TestDispatchable::TestDispatchable(std::function<bool(md::FdEvents)> const& target)
1181+ : TestDispatchable(target, md::FdEvent::readable)
1182+{
1183+}
1184+
1185+mt::TestDispatchable::TestDispatchable(std::function<void()> const& target)
1186+ : TestDispatchable([target](md::FdEvents) { target(); return true; })
1187+{
1188+}
1189+
1190+mir::Fd mt::TestDispatchable::watch_fd() const
1191+{
1192+ return read_fd;
1193+}
1194+
1195+bool mt::TestDispatchable::dispatch(md::FdEvents events)
1196+{
1197+ auto continue_dispatch = target(events);
1198+ if (!(events & md::FdEvent::remote_closed))
1199+ {
1200+ // There's no way to untrigger remote hangup :)
1201+ untrigger();
1202+ }
1203+ return continue_dispatch;
1204+}
1205+
1206+md::FdEvents mt::TestDispatchable::relevant_events() const
1207+{
1208+ return eventmask;
1209+}
1210+
1211+void mt::TestDispatchable::trigger()
1212+{
1213+ using namespace testing;
1214+ char dummy{0};
1215+ EXPECT_THAT(::write(write_fd, &dummy, sizeof(dummy)), Eq(sizeof(dummy)));
1216+}
1217+
1218+void mt::TestDispatchable::untrigger()
1219+{
1220+ using namespace testing;
1221+ char dummy{0};
1222+ auto val = ::read(read_fd, &dummy, sizeof(dummy));
1223+ if (val < 0)
1224+ {
1225+ EXPECT_THAT(errno, Eq(EAGAIN));
1226+ }
1227+ else
1228+ {
1229+ EXPECT_THAT(val, Eq(sizeof(dummy)));
1230+ }
1231+}
1232+
1233+void mt::TestDispatchable::hangup()
1234+{
1235+ write_fd = mir::Fd{};
1236+}
1237
1238=== modified file 'tests/mir_test_doubles/test_protobuf_client.cpp'
1239--- tests/mir_test_doubles/test_protobuf_client.cpp 2015-01-21 00:49:28 +0000
1240+++ tests/mir_test_doubles/test_protobuf_client.cpp 2015-01-27 23:19:58 +0000
1241@@ -27,22 +27,27 @@
1242 #include "src/client/lifecycle_control.h"
1243 #include "src/client/rpc/make_rpc_channel.h"
1244 #include "src/client/rpc/mir_basic_rpc_channel.h"
1245+#include "mir/dispatch/dispatchable.h"
1246+#include "mir/dispatch/simple_dispatch_thread.h"
1247
1248 #include <thread>
1249
1250 namespace mtd = mir::test::doubles;
1251+namespace mclr = mir::client::rpc;
1252+namespace md = mir::dispatch;
1253
1254 mir::test::TestProtobufClient::TestProtobufClient(
1255 std::string socket_file,
1256 int timeout_ms) :
1257 rpc_report(std::make_shared<testing::NiceMock<doubles::MockRpcReport>>()),
1258- channel(mir::client::rpc::make_rpc_channel(
1259+ channel(mclr::make_rpc_channel(
1260 socket_file,
1261 std::make_shared<mir::client::ConnectionSurfaceMap>(),
1262 std::make_shared<mir::client::DisplayConfiguration>(),
1263 rpc_report,
1264 std::make_shared<mir::client::LifecycleControl>(),
1265 std::make_shared<mtd::NullClientEventSink>())),
1266+ eventloop{std::make_shared<md::SimpleDispatchThread>(std::dynamic_pointer_cast<md::Dispatchable>(channel))},
1267 display_server(channel.get(), ::google::protobuf::Service::STUB_DOESNT_OWN_CHANNEL),
1268 maxwait(timeout_ms),
1269 connect_done_called(false),
1270
1271=== modified file 'tests/unit-tests/CMakeLists.txt'
1272--- tests/unit-tests/CMakeLists.txt 2015-01-27 04:48:26 +0000
1273+++ tests/unit-tests/CMakeLists.txt 2015-01-27 23:19:58 +0000
1274@@ -63,6 +63,7 @@
1275 add_subdirectory(scene/)
1276 add_subdirectory(examples/)
1277 add_subdirectory(thread/)
1278+add_subdirectory(dispatch/)
1279
1280 link_directories(${LIBRARY_OUTPUT_PATH})
1281
1282
1283=== modified file 'tests/unit-tests/client/test_mir_connection.cpp'
1284--- tests/unit-tests/client/test_mir_connection.cpp 2015-01-26 23:09:56 +0000
1285+++ tests/unit-tests/client/test_mir_connection.cpp 2015-01-27 23:19:58 +0000
1286@@ -27,6 +27,7 @@
1287 #include "src/client/display_configuration.h"
1288 #include "src/client/mir_surface.h"
1289 #include "mir/client_buffer_factory.h"
1290+#include "mir/dispatch/dispatchable.h"
1291
1292 #include "src/server/frontend/resource_cache.h" /* needed by test_server.h */
1293 #include "mir_test/test_protobuf_server.h"
1294@@ -44,12 +45,19 @@
1295 namespace mp = mir::protobuf;
1296 namespace geom = mir::geometry;
1297 namespace mtd = mir::test::doubles;
1298+namespace md = mir::dispatch;
1299
1300 namespace
1301 {
1302
1303-struct MockRpcChannel : public mir::client::rpc::MirBasicRpcChannel
1304+struct MockRpcChannel : public mir::client::rpc::MirBasicRpcChannel,
1305+ public mir::dispatch::Dispatchable
1306 {
1307+ MockRpcChannel()
1308+ {
1309+ ON_CALL(*this, watch_fd()).WillByDefault(testing::Return(mir::Fd{}));
1310+ }
1311+
1312 void CallMethod(const google::protobuf::MethodDescriptor* method,
1313 google::protobuf::RpcController*,
1314 const google::protobuf::Message* parameters,
1315@@ -77,6 +85,10 @@
1316 MOCK_METHOD1(drm_auth_magic, void(const mp::DRMMagic*));
1317 MOCK_METHOD2(connect, void(mp::ConnectParameters const*,mp::Connection*));
1318 MOCK_METHOD1(configure_display_sent, void(mp::DisplayConfiguration const*));
1319+
1320+ MOCK_CONST_METHOD0(watch_fd, mir::Fd());
1321+ MOCK_METHOD1(dispatch, bool(md::FdEvents));
1322+ MOCK_CONST_METHOD0(relevant_events, md::FdEvents());
1323 };
1324
1325 struct MockClientPlatform : public mcl::ClientPlatform
1326
1327=== modified file 'tests/unit-tests/client/test_protobuf_rpc_channel.cpp'
1328--- tests/unit-tests/client/test_protobuf_rpc_channel.cpp 2015-01-14 06:39:13 +0000
1329+++ tests/unit-tests/client/test_protobuf_rpc_channel.cpp 2015-01-27 23:19:58 +0000
1330@@ -41,6 +41,7 @@
1331 namespace mcl = mir::client;
1332 namespace mclr = mir::client::rpc;
1333 namespace mtd = mir::test::doubles;
1334+namespace md = mir::dispatch;
1335
1336 namespace
1337 {
1338@@ -108,6 +109,9 @@
1339 MOCK_METHOD2(receive_data, void(void*, size_t));
1340 MOCK_METHOD3(receive_data, void(void*, size_t, std::vector<mir::Fd>&));
1341 MOCK_METHOD2(send_message, void(std::vector<uint8_t> const&, std::vector<mir::Fd> const&));
1342+ MOCK_CONST_METHOD0(watch_fd, mir::Fd());
1343+ MOCK_METHOD1(dispatch, bool(md::FdEvents));
1344+ MOCK_CONST_METHOD0(relevant_events, md::FdEvents());
1345
1346 // Transport interface
1347 void register_observer_default(std::shared_ptr<Observer> const& observer)
1348
1349=== modified file 'tests/unit-tests/client/test_stream_transport.cpp'
1350--- tests/unit-tests/client/test_stream_transport.cpp 2015-01-14 06:39:13 +0000
1351+++ tests/unit-tests/client/test_stream_transport.cpp 2015-01-27 23:19:58 +0000
1352@@ -22,6 +22,7 @@
1353
1354 #include "mir_test/auto_unblock_thread.h"
1355 #include "mir_test/signal.h"
1356+#include "mir_test/fd_utils.h"
1357 #include "mir/raii.h"
1358
1359 #include <sys/socket.h>
1360@@ -43,6 +44,8 @@
1361 #include <gmock/gmock.h>
1362
1363 namespace mclr = mir::client::rpc;
1364+namespace mt = mir::test;
1365+namespace md = mir::dispatch;
1366
1367 namespace
1368 {
1369@@ -71,12 +74,6 @@
1370 transport = std::make_shared<TransportMechanism>(transport_fd);
1371 }
1372
1373- virtual ~StreamTransportTest()
1374- {
1375- // We don't care about errors, so unconditionally close the test fd.
1376- close(test_fd);
1377- }
1378-
1379 mir::Fd transport_fd;
1380 mir::Fd test_fd;
1381 std::shared_ptr<TransportMechanism> transport;
1382@@ -85,51 +82,199 @@
1383 typedef ::testing::Types<mclr::StreamSocketTransport> Transports;
1384 TYPED_TEST_CASE(StreamTransportTest, Transports);
1385
1386+TYPED_TEST(StreamTransportTest, ReturnsValidWatchFd)
1387+{
1388+ // A valid fd is >= 0, and we know that stdin, stdout, and stderr aren't correct.
1389+ EXPECT_GE(this->transport->watch_fd(), 3);
1390+}
1391+
1392+TYPED_TEST(StreamTransportTest, WatchFdIsPollable)
1393+{
1394+ pollfd socket_readable;
1395+ socket_readable.events = POLLIN;
1396+ socket_readable.fd = this->transport->watch_fd();
1397+
1398+ ASSERT_TRUE(mt::std_call_succeeded(poll(&socket_readable, 1, 0)));
1399+
1400+ EXPECT_FALSE(socket_readable.revents & POLLERR);
1401+ EXPECT_FALSE(socket_readable.revents & POLLNVAL);
1402+}
1403+
1404+TYPED_TEST(StreamTransportTest, WatchFdNotifiesReadableWhenDataPending)
1405+{
1406+ uint64_t dummy{0xdeadbeef};
1407+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1408+
1409+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1410+}
1411+
1412+TYPED_TEST(StreamTransportTest, WatchFdRemainsUnreadableUntilEventPending)
1413+{
1414+ EXPECT_FALSE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1415+
1416+ uint64_t dummy{0xdeadbeef};
1417+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1418+
1419+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1420+}
1421+
1422+TYPED_TEST(StreamTransportTest, WatchFdIsNoLongerReadableAfterEventProcessing)
1423+{
1424+ using namespace testing;
1425+
1426+ uint64_t dummy{0xdeadbeef};
1427+
1428+ auto observer = std::make_shared<NiceMock<MockObserver>>();
1429+
1430+ ON_CALL(*observer, on_data_available())
1431+ .WillByDefault(Invoke([dummy, this]()
1432+ {
1433+ decltype(dummy) buffer;
1434+ this->transport->receive_data(&buffer, sizeof(dummy));
1435+ }));
1436+
1437+ this->transport->register_observer(observer);
1438+
1439+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1440+
1441+ ASSERT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1442+
1443+ this->transport->dispatch(md::FdEvent::readable);
1444+
1445+ EXPECT_FALSE(mt::fd_is_readable(this->test_fd));
1446+}
1447+
1448+TYPED_TEST(StreamTransportTest, NoEventsDispatchedUntilDispatchCalled)
1449+{
1450+ using namespace testing;
1451+
1452+ auto observer = std::make_shared<NiceMock<MockObserver>>();
1453+ bool data_available{false};
1454+ bool disconnected{false};
1455+
1456+ uint64_t dummy{0xdeadbeef};
1457+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1458+ ::close(this->test_fd);
1459+
1460+ ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([this, dummy, &data_available]()
1461+ {
1462+ decltype(dummy) buffer;
1463+ this->transport->receive_data(&buffer, sizeof(buffer));
1464+ data_available = true;
1465+ }));
1466+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1467+ { disconnected = true; }));
1468+
1469+ this->transport->register_observer(observer);
1470+
1471+ std::this_thread::sleep_for(std::chrono::seconds{1});
1472+ EXPECT_FALSE(data_available);
1473+ EXPECT_FALSE(disconnected);
1474+
1475+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1476+ while (mt::fd_is_readable(this->transport->watch_fd()) &&
1477+ this->transport->dispatch(md::FdEvent::readable | md::FdEvent::remote_closed))
1478+ ;
1479+
1480+ EXPECT_TRUE(data_available);
1481+ EXPECT_TRUE(disconnected);
1482+}
1483+
1484+TYPED_TEST(StreamTransportTest, DispatchesSingleEventAtATime)
1485+{
1486+ using namespace testing;
1487+
1488+ auto observer = std::make_shared<NiceMock<MockObserver>>();
1489+ bool data_available{false};
1490+ bool disconnected{false};
1491+
1492+ uint64_t dummy{0xdeadbeef};
1493+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1494+ ::close(this->test_fd);
1495+
1496+ ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([this, dummy, &data_available]()
1497+ {
1498+ decltype(dummy) buffer;
1499+ this->transport->receive_data(&buffer, sizeof(buffer));
1500+ data_available = true;
1501+ }));
1502+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1503+ { disconnected = true; }));
1504+
1505+ this->transport->register_observer(observer);
1506+
1507+ EXPECT_FALSE(data_available);
1508+ EXPECT_FALSE(disconnected);
1509+
1510+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1511+
1512+ this->transport->dispatch(md::FdEvent::readable | md::FdEvent::remote_closed);
1513+
1514+ EXPECT_TRUE(data_available xor disconnected);
1515+
1516+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1517+
1518+ this->transport->dispatch(md::FdEvent::readable | md::FdEvent::remote_closed);
1519+
1520+ EXPECT_TRUE(data_available);
1521+ EXPECT_TRUE(disconnected);
1522+}
1523+
1524 TYPED_TEST(StreamTransportTest, NoticesRemoteDisconnect)
1525 {
1526 using namespace testing;
1527 auto observer = std::make_shared<NiceMock<MockObserver>>();
1528- auto done = std::make_shared<mir::test::Signal>();
1529+ bool disconnected{false};
1530
1531- ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([done]()
1532- { done->raise(); }));
1533+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1534+ { disconnected = true; }));
1535
1536 this->transport->register_observer(observer);
1537
1538 close(this->test_fd);
1539
1540- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1541+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1542+ while (mt::fd_is_readable(this->transport->watch_fd()) &&
1543+ this->transport->dispatch(md::FdEvent::remote_closed))
1544+ ;
1545+
1546+ EXPECT_TRUE(disconnected);
1547 }
1548
1549-TYPED_TEST(StreamTransportTest, NoticesRemoteDisconnectWhileReadingInIOLoop)
1550+TYPED_TEST(StreamTransportTest, NoticesRemoteDisconnectWhileReading)
1551 {
1552 using namespace testing;
1553 auto observer = std::make_shared<NiceMock<MockObserver>>();
1554- auto done = std::make_shared<mir::test::Signal>();
1555- bool data_notified{false};
1556- bool finished_read{false};
1557-
1558- ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([done]()
1559- { done->raise(); }));
1560- ON_CALL(*observer, on_data_available())
1561- .WillByDefault(Invoke([this, &data_notified, &finished_read]()
1562- {
1563- data_notified = true;
1564- char buffer[8];
1565- this->transport->receive_data(buffer, sizeof(buffer));
1566- finished_read = true;
1567- }));
1568-
1569+ bool disconnected{false};
1570+ bool receive_error_detected{false};
1571+
1572+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1573+ { disconnected = true; }));
1574 this->transport->register_observer(observer);
1575
1576- uint32_t dummy{0xdeadbeef};
1577- EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1578-
1579- close(this->test_fd);
1580-
1581- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1582- EXPECT_TRUE(data_notified);
1583- EXPECT_FALSE(finished_read);
1584+ mir::test::AutoJoinThread closer{[this]()
1585+ {
1586+ std::this_thread::sleep_for(std::chrono::seconds{1});
1587+ ::close(this->test_fd);
1588+ }};
1589+
1590+ try
1591+ {
1592+ char buffer[8];
1593+ this->transport->receive_data(buffer, sizeof(buffer));
1594+ }
1595+ catch (std::runtime_error)
1596+ {
1597+ receive_error_detected = true;
1598+ }
1599+
1600+ // There should now be a disconnect event pending...
1601+ EXPECT_TRUE(mt::fd_is_readable(this->transport->watch_fd()));
1602+
1603+ this->transport->dispatch(md::FdEvent::remote_closed);
1604+
1605+ EXPECT_TRUE(disconnected);
1606+ EXPECT_TRUE(receive_error_detected);
1607 }
1608
1609 TYPED_TEST(StreamTransportTest, NotifiesOnDataAvailable)
1610@@ -137,39 +282,21 @@
1611 using namespace testing;
1612
1613 auto observer = std::make_shared<NiceMock<MockObserver>>();
1614- auto done = std::make_shared<mir::test::Signal>();
1615-
1616- ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([done]()
1617- { done->raise(); }));
1618-
1619- this->transport->register_observer(observer);
1620-
1621- uint64_t dummy{0xdeadbeef};
1622- EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1623-
1624- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1625-}
1626-
1627-TYPED_TEST(StreamTransportTest, DoesntNotifyUntilDataAvailable)
1628-{
1629- using namespace testing;
1630-
1631- auto observer = std::make_shared<NiceMock<MockObserver>>();
1632- auto done = std::make_shared<mir::test::Signal>();
1633-
1634- ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([done]()
1635- { done->raise(); }));
1636-
1637- this->transport->register_observer(observer);
1638-
1639- std::this_thread::sleep_for(std::chrono::seconds{1});
1640-
1641- EXPECT_FALSE(done->raised());
1642-
1643- uint64_t dummy{0xdeadbeef};
1644- EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1645-
1646- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1647+ bool notified_data_available{false};
1648+
1649+ ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([&notified_data_available]()
1650+ { notified_data_available = true; }));
1651+
1652+ this->transport->register_observer(observer);
1653+
1654+ uint64_t dummy{0xdeadbeef};
1655+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1656+
1657+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1658+
1659+ this->transport->dispatch(md::FdEvent::readable);
1660+
1661+ EXPECT_TRUE(notified_data_available);
1662 }
1663
1664 TYPED_TEST(StreamTransportTest, KeepsNotifyingOfAvailableDataUntilAllIsRead)
1665@@ -177,98 +304,101 @@
1666 using namespace testing;
1667
1668 auto observer = std::make_shared<NiceMock<MockObserver>>();
1669- auto done = std::make_shared<mir::test::Signal>();
1670
1671 std::array<uint8_t, sizeof(int) * 256> data;
1672 data.fill(0);
1673- std::atomic<size_t> bytes_left{data.size()};
1674+ size_t bytes_left{data.size()};
1675
1676 ON_CALL(*observer, on_data_available())
1677- .WillByDefault(Invoke([done, &bytes_left, this]()
1678+ .WillByDefault(Invoke([&bytes_left, this]()
1679 {
1680 int dummy;
1681 this->transport->receive_data(&dummy, sizeof(dummy));
1682- bytes_left.fetch_sub(sizeof(dummy));
1683- if (bytes_left.load() == 0)
1684- {
1685- done->raise();
1686- }
1687+ bytes_left -= sizeof(dummy);
1688 }));
1689
1690 this->transport->register_observer(observer);
1691
1692 EXPECT_EQ(data.size(), write(this->test_fd, data.data(), data.size()));
1693
1694- EXPECT_TRUE(done->wait_for(std::chrono::seconds{5}));
1695- EXPECT_EQ(0, bytes_left.load());
1696+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1697+ while (mt::fd_is_readable(this->transport->watch_fd()) &&
1698+ this->transport->dispatch(md::FdEvent::readable))
1699+ ;
1700+
1701+ EXPECT_EQ(0, bytes_left);
1702 }
1703
1704 TYPED_TEST(StreamTransportTest, StopsNotifyingOnceAllDataIsRead)
1705 {
1706 using namespace testing;
1707- int const buffer_size{256};
1708
1709 auto observer = std::make_shared<NiceMock<MockObserver>>();
1710- auto done = std::make_shared<mir::test::Signal>();
1711+
1712+ std::array<uint8_t, sizeof(int) * 256> data;
1713+ data.fill(0);
1714+ size_t bytes_left{data.size()};
1715
1716 ON_CALL(*observer, on_data_available())
1717- .WillByDefault(Invoke([this, done]()
1718+ .WillByDefault(Invoke([&bytes_left, this]()
1719 {
1720- if (done->raised())
1721- {
1722- FAIL() << "on_data_available called without new data available";
1723- }
1724- uint8_t dummy_buffer[buffer_size];
1725- this->transport->receive_data(dummy_buffer, sizeof(dummy_buffer));
1726- done->raise();
1727+ int dummy;
1728+ this->transport->receive_data(&dummy, sizeof(dummy));
1729+ bytes_left -= sizeof(dummy);
1730 }));
1731+
1732 this->transport->register_observer(observer);
1733
1734- EXPECT_FALSE(done->raised());
1735- uint8_t dummy_buffer[buffer_size];
1736- memset(dummy_buffer, 0xab, sizeof(dummy_buffer));
1737- EXPECT_EQ(sizeof(dummy_buffer), write(this->test_fd, dummy_buffer, sizeof(dummy_buffer)));
1738-
1739- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1740-
1741- std::this_thread::sleep_for(std::chrono::seconds{1});
1742+ EXPECT_EQ(data.size(), write(this->test_fd, data.data(), data.size()));
1743+
1744+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1745+ while (bytes_left > 0)
1746+ {
1747+ this->transport->dispatch(md::FdEvent::readable);
1748+ }
1749+
1750+ EXPECT_FALSE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1751 }
1752
1753 TYPED_TEST(StreamTransportTest, DoesntSendDataAvailableNotificationOnDisconnect)
1754 {
1755 using namespace testing;
1756- int const buffer_size{256};
1757
1758 auto observer = std::make_shared<NiceMock<MockObserver>>();
1759- auto read_done = std::make_shared<mir::test::Signal>();
1760- auto disconnect_done = std::make_shared<mir::test::Signal>();
1761- std::atomic<int> notify_count{0};
1762-
1763+ int notify_count{0};
1764+ bool disconnected{false};
1765+
1766+ uint64_t dummy{0xdeedfaac};
1767+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1768+
1769+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1770+ { disconnected = true; }));
1771 ON_CALL(*observer, on_data_available())
1772- .WillByDefault(Invoke([this, read_done, &notify_count]()
1773+ .WillByDefault(Invoke([dummy, &notify_count, this]()
1774 {
1775 notify_count++;
1776- uint8_t dummy_buffer[buffer_size];
1777- this->transport->receive_data(dummy_buffer, sizeof(dummy_buffer));
1778- read_done->raise();
1779+
1780+ decltype(dummy) buffer;
1781+ this->transport->receive_data(&buffer, sizeof(buffer));
1782 }));
1783- ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([this, disconnect_done]()
1784- { disconnect_done->raise(); }));
1785
1786 this->transport->register_observer(observer);
1787
1788- EXPECT_FALSE(read_done->raised());
1789- uint8_t dummy_buffer[buffer_size];
1790- memset(dummy_buffer, 0xab, sizeof(dummy_buffer));
1791- EXPECT_EQ(sizeof(dummy_buffer), write(this->test_fd, dummy_buffer, sizeof(dummy_buffer)));
1792-
1793- EXPECT_TRUE(read_done->wait_for(std::chrono::seconds{1}));
1794- EXPECT_EQ(1, notify_count);
1795+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1796+ while (mt::fd_is_readable(this->transport->watch_fd()))
1797+ {
1798+ this->transport->dispatch(md::FdEvent::readable);
1799+ }
1800
1801 ::close(this->test_fd);
1802- EXPECT_TRUE(disconnect_done->wait_for(std::chrono::seconds{1}));
1803+
1804+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1805+ while (mt::fd_is_readable(this->transport->watch_fd()) &&
1806+ this->transport->dispatch(md::FdEvent::remote_closed | md::FdEvent::readable))
1807+ ;
1808
1809 EXPECT_EQ(1, notify_count);
1810+ EXPECT_TRUE(disconnected);
1811 }
1812
1813 TYPED_TEST(StreamTransportTest, ReadsCorrectData)
1814@@ -276,23 +406,25 @@
1815 using namespace testing;
1816
1817 auto observer = std::make_shared<NiceMock<MockObserver>>();
1818- auto done = std::make_shared<mir::test::Signal>();
1819
1820 std::string expected{"I am the very model of a modern major general"};
1821 std::vector<char> received(expected.size());
1822
1823 ON_CALL(*observer, on_data_available())
1824- .WillByDefault(Invoke([done, &received, this]()
1825+ .WillByDefault(Invoke([&received, this]()
1826 {
1827 this->transport->receive_data(received.data(), received.size());
1828- done->raise();
1829 }));
1830
1831 this->transport->register_observer(observer);
1832
1833 EXPECT_EQ(expected.size(), write(this->test_fd, expected.data(), expected.size()));
1834
1835- ASSERT_TRUE(done->wait_for(std::chrono::seconds{1}));
1836+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1837+ while (mt::fd_is_readable(this->transport->watch_fd()) &&
1838+ this->transport->dispatch(md::FdEvent::readable))
1839+ ;
1840+
1841 EXPECT_EQ(0, memcmp(expected.data(), received.data(), expected.size()));
1842 }
1843
1844
1845=== added directory 'tests/unit-tests/dispatch'
1846=== added file 'tests/unit-tests/dispatch/CMakeLists.txt'
1847--- tests/unit-tests/dispatch/CMakeLists.txt 1970-01-01 00:00:00 +0000
1848+++ tests/unit-tests/dispatch/CMakeLists.txt 2015-01-27 23:19:58 +0000
1849@@ -0,0 +1,5 @@
1850+list(APPEND UNIT_TEST_SOURCES
1851+ ${CMAKE_CURRENT_SOURCE_DIR}/test_simple_dispatch_thread.cpp
1852+)
1853+
1854+set(UNIT_TEST_SOURCES ${UNIT_TEST_SOURCES} PARENT_SCOPE)
1855
1856=== added file 'tests/unit-tests/dispatch/test_simple_dispatch_thread.cpp'
1857--- tests/unit-tests/dispatch/test_simple_dispatch_thread.cpp 1970-01-01 00:00:00 +0000
1858+++ tests/unit-tests/dispatch/test_simple_dispatch_thread.cpp 2015-01-27 23:19:58 +0000
1859@@ -0,0 +1,185 @@
1860+/*
1861+ * Copyright © 2015 Canonical Ltd.
1862+ *
1863+ * This program is free software: you can redistribute it and/or modify
1864+ * it under the terms of the GNU General Public License version 3 as
1865+ * published by the Free Software Foundation.
1866+ *
1867+ * This program is distributed in the hope that it will be useful,
1868+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1869+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1870+ * GNU General Public License for more details.
1871+ *
1872+ * You should have received a copy of the GNU General Public License
1873+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
1874+ *
1875+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
1876+ */
1877+
1878+#include "mir/dispatch/simple_dispatch_thread.h"
1879+#include "mir/dispatch/dispatchable.h"
1880+#include "mir/fd.h"
1881+#include "mir_test/pipe.h"
1882+#include "mir_test/signal.h"
1883+#include "mir_test/test_dispatchable.h"
1884+
1885+#include <fcntl.h>
1886+
1887+#include <atomic>
1888+
1889+#include <gtest/gtest.h>
1890+#include <gmock/gmock.h>
1891+
1892+namespace md = mir::dispatch;
1893+namespace mt = mir::test;
1894+
1895+namespace
1896+{
1897+class SimpleDispatchThreadTest : public ::testing::Test
1898+{
1899+public:
1900+ SimpleDispatchThreadTest()
1901+ {
1902+ mt::Pipe pipe{O_NONBLOCK};
1903+ watch_fd = pipe.read_fd();
1904+ test_fd = pipe.write_fd();
1905+ }
1906+
1907+ mir::Fd watch_fd;
1908+ mir::Fd test_fd;
1909+};
1910+
1911+class MockDispatchable : public md::Dispatchable
1912+{
1913+public:
1914+ MOCK_CONST_METHOD0(watch_fd, mir::Fd());
1915+ MOCK_METHOD1(dispatch, bool(md::FdEvents));
1916+ MOCK_CONST_METHOD0(relevant_events, md::FdEvents());
1917+};
1918+
1919+}
1920+
1921+TEST_F(SimpleDispatchThreadTest, calls_dispatch_when_fd_is_readable)
1922+{
1923+ using namespace testing;
1924+
1925+ auto dispatched = std::make_shared<mt::Signal>();
1926+ auto dispatchable = std::make_shared<mt::TestDispatchable>([dispatched]() { dispatched->raise(); });
1927+
1928+ md::SimpleDispatchThread dispatcher{dispatchable};
1929+
1930+ dispatchable->trigger();
1931+
1932+ EXPECT_TRUE(dispatched->wait_for(std::chrono::seconds{1}));
1933+}
1934+
1935+TEST_F(SimpleDispatchThreadTest, stops_calling_dispatch_once_fd_is_not_readable)
1936+{
1937+ using namespace testing;
1938+
1939+ std::atomic<int> dispatch_count{0};
1940+ auto dispatchable = std::make_shared<mt::TestDispatchable>([&dispatch_count]() { ++dispatch_count; });
1941+
1942+ md::SimpleDispatchThread dispatcher{dispatchable};
1943+
1944+ dispatchable->trigger();
1945+
1946+ std::this_thread::sleep_for(std::chrono::seconds{1});
1947+
1948+ EXPECT_THAT(dispatch_count, Eq(1));
1949+}
1950+
1951+TEST_F(SimpleDispatchThreadTest, passes_dispatch_events_through)
1952+{
1953+ using namespace testing;
1954+
1955+ auto dispatched_with_only_readable = std::make_shared<mt::Signal>();
1956+ auto dispatched_with_hangup = std::make_shared<mt::Signal>();
1957+ auto delegate = [dispatched_with_only_readable, dispatched_with_hangup](md::FdEvents events)
1958+ {
1959+ if (events == md::FdEvent::readable)
1960+ {
1961+ dispatched_with_only_readable->raise();
1962+ }
1963+ if (events & md::FdEvent::remote_closed)
1964+ {
1965+ dispatched_with_hangup->raise();
1966+ return false;
1967+ }
1968+ return true;
1969+ };
1970+ auto dispatchable = std::make_shared<mt::TestDispatchable>(delegate, md::FdEvent::readable | md::FdEvent::remote_closed);
1971+
1972+ md::SimpleDispatchThread dispatcher{dispatchable};
1973+
1974+ dispatchable->trigger();
1975+ EXPECT_TRUE(dispatched_with_only_readable->wait_for(std::chrono::seconds{1}));
1976+
1977+ dispatchable->hangup();
1978+ EXPECT_TRUE(dispatched_with_hangup->wait_for(std::chrono::seconds{1}));
1979+}
1980+
1981+TEST_F(SimpleDispatchThreadTest, doesnt_call_dispatch_after_first_false_return)
1982+{
1983+ using namespace testing;
1984+
1985+ int constexpr expected_count{10};
1986+ auto dispatched_more_than_enough = std::make_shared<mt::Signal>();
1987+
1988+ auto delegate = [dispatched_more_than_enough](md::FdEvents)
1989+ {
1990+ static std::atomic<int> dispatch_count{0};
1991+
1992+ if (++dispatch_count == expected_count)
1993+ {
1994+ return false;
1995+ }
1996+ if (dispatch_count > expected_count)
1997+ {
1998+ dispatched_more_than_enough->raise();
1999+ }
2000+ return true;
2001+ };
2002+ auto dispatchable = std::make_shared<mt::TestDispatchable>(delegate);
2003+
2004+ md::SimpleDispatchThread dispatcher{dispatchable};
2005+
2006+ for (int i = 0; i < expected_count + 1; ++i)
2007+ {
2008+ dispatchable->trigger();
2009+ }
2010+
2011+ EXPECT_FALSE(dispatched_more_than_enough->wait_for(std::chrono::seconds{1}));
2012+}
2013+
2014+TEST_F(SimpleDispatchThreadTest, only_calls_dispatch_with_remote_closed_when_relevant)
2015+{
2016+ using namespace testing;
2017+
2018+ auto dispatchable = std::make_shared<NiceMock<MockDispatchable>>();
2019+ ON_CALL(*dispatchable, watch_fd()).WillByDefault(Return(test_fd));
2020+ ON_CALL(*dispatchable, relevant_events()).WillByDefault(Return(md::FdEvent::writable));
2021+ auto dispatched_writable = std::make_shared<mt::Signal>();
2022+ auto dispatched_closed = std::make_shared<mt::Signal>();
2023+
2024+ ON_CALL(*dispatchable, dispatch(_)).WillByDefault(Invoke([=](md::FdEvents events)
2025+ {
2026+ if (events & md::FdEvent::writable)
2027+ {
2028+ dispatched_writable->raise();
2029+ }
2030+ if (events & md::FdEvent::remote_closed)
2031+ {
2032+ dispatched_closed->raise();
2033+ }
2034+ return true;
2035+ }));
2036+
2037+ md::SimpleDispatchThread dispatcher{dispatchable};
2038+
2039+ EXPECT_TRUE(dispatched_writable->wait_for(std::chrono::seconds{1}));
2040+
2041+ // Make the fd remote-closed...
2042+ watch_fd = mir::Fd{};
2043+ EXPECT_FALSE(dispatched_closed->wait_for(std::chrono::seconds{1}));
2044+}

Subscribers

People subscribed via source and target branches