Mir

Merge lp:~raof/mir/manual-connection-dispatch into lp:mir

Proposed by Chris Halse Rogers
Status: Work in progress
Proposed branch: lp:~raof/mir/manual-connection-dispatch
Merge into: lp:mir
Diff against target: 1750 lines (+912/-252)
22 files modified
src/client/mir_connection.cpp (+18/-2)
src/client/mir_connection.h (+12/-0)
src/client/rpc/CMakeLists.txt (+1/-0)
src/client/rpc/dispatchable.h (+55/-0)
src/client/rpc/mir_protobuf_rpc_channel.cpp (+10/-0)
src/client/rpc/mir_protobuf_rpc_channel.h (+8/-1)
src/client/rpc/simple_rpc_thread.cpp (+103/-0)
src/client/rpc/simple_rpc_thread.h (+50/-0)
src/client/rpc/stream_socket_transport.cpp (+47/-116)
src/client/rpc/stream_socket_transport.h (+4/-4)
src/client/rpc/stream_transport.h (+4/-5)
tests/include/mir_test/fd_utils.h (+69/-0)
tests/include/mir_test/test_protobuf_client.h (+8/-0)
tests/integration-tests/client/test_screencast.cpp (+6/-0)
tests/mir_test/CMakeLists.txt (+1/-0)
tests/mir_test/fd_utils.cpp (+40/-0)
tests/mir_test_doubles/test_protobuf_client.cpp (+5/-1)
tests/unit-tests/client/CMakeLists.txt (+1/-0)
tests/unit-tests/client/test_mir_connection.cpp (+107/-4)
tests/unit-tests/client/test_protobuf_rpc_channel.cpp (+2/-0)
tests/unit-tests/client/test_simple_rpc_thread.cpp (+105/-0)
tests/unit-tests/client/test_stream_transport.cpp (+256/-119)
To merge this branch: bzr merge lp:~raof/mir/manual-connection-dispatch
Reviewer Review Type Date Requested Status
Mir development team Pending
Review via email: mp+243613@code.launchpad.net

Description of the change

First stage of client-eventloop driven client library dispatch.

To post a comment you must log in.

Unmerged revisions

1746. By Chris Halse Rogers

Test that MirConnection::watch_fd() returns a pollable fd in manual dispatch mode

1745. By Chris Halse Rogers

Move mt::fd_utils implementation out of header file

1744. By Chris Halse Rogers

Test that MirConnection::watch_fd() returns an invalid fd when in automatic dispatch mode

1743. By Chris Halse Rogers

Merge trunk

1742. By Chris Halse Rogers

Add an option to MirConnection selecting between automatic and manual dispatch.

1741. By Chris Halse Rogers

Add a simple eventloop thread implementation

1740. By Chris Halse Rogers

Fix more docs; Observers are now called from the dispatch() thread

1739. By Chris Halse Rogers

Test that we dispatch a single event for a single ::dispatch()

1738. By Chris Halse Rogers

Simplify StreamSocketTransport::dispatch.

We don't actually need to do all that exception-catching, particularly since we don't do
anything but eat the exceptions anyway.

1737. By Chris Halse Rogers

Move dispatch tests to the top of the file.

These are more primitive than the observer & data read/write ones

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/client/mir_connection.cpp'
2--- src/client/mir_connection.cpp 2014-11-27 09:00:52 +0000
3+++ src/client/mir_connection.cpp 2014-12-04 01:50:56 +0000
4@@ -22,6 +22,8 @@
5 #include "client_platform.h"
6 #include "client_platform_factory.h"
7 #include "rpc/mir_basic_rpc_channel.h"
8+#include "rpc/dispatchable.h"
9+#include "rpc/simple_rpc_thread.h"
10 #include "connection_configuration.h"
11 #include "display_configuration.h"
12 #include "connection_surface_map.h"
13@@ -92,8 +94,13 @@
14 {
15 }
16
17+MirConnection::MirConnection(mir::client::ConnectionConfiguration& conf) :
18+ MirConnection(conf, DispatchType::automatic)
19+{
20+}
21+
22 MirConnection::MirConnection(
23- mir::client::ConnectionConfiguration& conf) :
24+ mir::client::ConnectionConfiguration& conf, DispatchType dispatch) :
25 deregisterer{this},
26 platform_library{conf.the_platform_library()},
27 channel(conf.the_rpc_channel()),
28@@ -105,7 +112,11 @@
29 display_configuration(conf.the_display_configuration()),
30 lifecycle_control(conf.the_lifecycle_control()),
31 surface_map(conf.the_surface_map()),
32- event_handler_register(conf.the_event_handler_register())
33+ event_handler_register(conf.the_event_handler_register()),
34+ eventloop{dispatch == DispatchType::automatic ?
35+ new mcl::rpc::SimpleRpcThread{std::dynamic_pointer_cast<mcl::rpc::Dispatchable>(channel)} :
36+ nullptr
37+ }
38 {
39 connect_result.set_error("connect not called");
40 {
41@@ -333,6 +344,11 @@
42 drm_auth_magic_wait_handle.result_received();
43 }
44
45+int MirConnection::watch_fd() const
46+{
47+ return eventloop ? -1 : std::dynamic_pointer_cast<mcl::rpc::Dispatchable>(channel)->watch_fd();
48+}
49+
50 MirWaitHandle* MirConnection::drm_auth_magic(unsigned int magic,
51 mir_drm_auth_magic_callback callback,
52 void* context)
53
54=== modified file 'src/client/mir_connection.h'
55--- src/client/mir_connection.h 2014-11-27 09:00:52 +0000
56+++ src/client/mir_connection.h 2014-12-04 01:50:56 +0000
57@@ -52,6 +52,7 @@
58 namespace rpc
59 {
60 class MirBasicRpcChannel;
61+class SimpleRpcThread;
62 }
63 }
64
65@@ -69,12 +70,19 @@
66 }
67 }
68
69+enum class DispatchType
70+{
71+ automatic,
72+ manual
73+};
74+
75 struct MirConnection : mir::client::ClientContext
76 {
77 public:
78 MirConnection(std::string const& error_message);
79
80 MirConnection(mir::client::ConnectionConfiguration& conf);
81+ MirConnection(mir::client::ConnectionConfiguration &conf, DispatchType dispatch);
82 ~MirConnection() noexcept;
83
84 MirConnection(MirConnection const &) = delete;
85@@ -100,6 +108,8 @@
86
87 MirWaitHandle* disconnect();
88
89+ int watch_fd() const;
90+
91 MirWaitHandle* drm_auth_magic(unsigned int magic,
92 mir_drm_auth_magic_callback callback,
93 void* context);
94@@ -183,6 +193,8 @@
95
96 std::shared_ptr<mir::client::EventHandlerRegister> const event_handler_register;
97
98+ std::unique_ptr<mir::client::rpc::SimpleRpcThread> const eventloop;
99+
100 std::vector<int> extra_platform_data;
101
102 struct SurfaceRelease;
103
104=== modified file 'src/client/rpc/CMakeLists.txt'
105--- src/client/rpc/CMakeLists.txt 2014-11-27 09:00:52 +0000
106+++ src/client/rpc/CMakeLists.txt 2014-12-04 01:50:56 +0000
107@@ -5,4 +5,5 @@
108 mir_protobuf_rpc_channel.cpp
109 make_socket_rpc_channel.cpp
110 stream_socket_transport.cpp
111+ simple_rpc_thread.cpp
112 )
113
114=== added file 'src/client/rpc/dispatchable.h'
115--- src/client/rpc/dispatchable.h 1970-01-01 00:00:00 +0000
116+++ src/client/rpc/dispatchable.h 2014-12-04 01:50:56 +0000
117@@ -0,0 +1,55 @@
118+/*
119+ * Copyright © 2014 Canonical Ltd.
120+ *
121+ * This program is free software: you can redistribute it and/or modify it
122+ * under the terms of the GNU Lesser General Public License version 3,
123+ * as published by the Free Software Foundation.
124+ *
125+ * This program is distributed in the hope that it will be useful,
126+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
127+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
128+ * GNU Lesser General Public License for more details.
129+ *
130+ * You should have received a copy of the GNU Lesser General Public License
131+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
132+ *
133+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
134+ */
135+
136+#ifndef MIR_CLIENT_RPC_DISPATCHABLE_H_
137+#define MIR_CLIENT_RPC_DISPATCHABLE_H_
138+
139+#include "mir/fd.h"
140+
141+namespace mir
142+{
143+namespace client
144+{
145+namespace rpc
146+{
147+class Dispatchable
148+{
149+public:
150+ Dispatchable() = default;
151+ virtual ~Dispatchable() = default;
152+
153+ Dispatchable& operator=(Dispatchable const&) = delete;
154+ Dispatchable(Dispatchable const&) = delete;
155+
156+ /**
157+ * \brief Get a poll()able file descriptor
158+ * \return A file descriptor usable with poll() or equivalent function calls that
159+ * becomes readable when there are dispatchable events
160+ */
161+ virtual Fd watch_fd() const = 0;
162+
163+ /**
164+ * \brief Dispatch one pending event
165+ */
166+ virtual void dispatch() = 0;
167+};
168+}
169+}
170+}
171+
172+#endif // MIR_CLIENT_RPC_DISPATCHABLE_H_
173
174=== modified file 'src/client/rpc/mir_protobuf_rpc_channel.cpp'
175--- src/client/rpc/mir_protobuf_rpc_channel.cpp 2014-11-27 09:00:52 +0000
176+++ src/client/rpc/mir_protobuf_rpc_channel.cpp 2014-12-04 01:50:56 +0000
177@@ -338,3 +338,13 @@
178 {
179 notify_disconnected();
180 }
181+
182+mir::Fd mir::client::rpc::MirProtobufRpcChannel::watch_fd() const
183+{
184+ return transport->watch_fd();
185+}
186+
187+void mir::client::rpc::MirProtobufRpcChannel::dispatch()
188+{
189+ transport->dispatch();
190+}
191
192=== modified file 'src/client/rpc/mir_protobuf_rpc_channel.h'
193--- src/client/rpc/mir_protobuf_rpc_channel.h 2014-11-27 09:00:52 +0000
194+++ src/client/rpc/mir_protobuf_rpc_channel.h 2014-12-04 01:50:56 +0000
195@@ -21,6 +21,7 @@
196
197 #include "mir_basic_rpc_channel.h"
198 #include "stream_transport.h"
199+#include "dispatchable.h"
200
201 #include <google/protobuf/service.h>
202 #include <google/protobuf/descriptor.h>
203@@ -52,7 +53,8 @@
204
205 class MirProtobufRpcChannel :
206 public MirBasicRpcChannel,
207- public StreamTransport::Observer
208+ public StreamTransport::Observer,
209+ public Dispatchable
210 {
211 public:
212 MirProtobufRpcChannel(std::unique_ptr<StreamTransport> transport,
213@@ -64,8 +66,13 @@
214
215 ~MirProtobufRpcChannel() = default;
216
217+ // StreamTransport::Observer
218 void on_data_available() override;
219 void on_disconnected() override;
220+
221+ // Dispatchable
222+ Fd watch_fd() const override;
223+ void dispatch() override;
224 private:
225 virtual void CallMethod(const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController*,
226 const google::protobuf::Message* parameters, google::protobuf::Message* response,
227
228=== added file 'src/client/rpc/simple_rpc_thread.cpp'
229--- src/client/rpc/simple_rpc_thread.cpp 1970-01-01 00:00:00 +0000
230+++ src/client/rpc/simple_rpc_thread.cpp 2014-12-04 01:50:56 +0000
231@@ -0,0 +1,103 @@
232+/*
233+ * Copyright © 2014 Canonical Ltd.
234+ *
235+ * This program is free software: you can redistribute it and/or modify it
236+ * under the terms of the GNU Lesser General Public License version 3,
237+ * as published by the Free Software Foundation.
238+ *
239+ * This program is distributed in the hope that it will be useful,
240+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
241+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
242+ * GNU Lesser General Public License for more details.
243+ *
244+ * You should have received a copy of the GNU Lesser General Public License
245+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
246+ *
247+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
248+ */
249+
250+#include "simple_rpc_thread.h"
251+#include "dispatchable.h"
252+
253+#include <sys/epoll.h>
254+#include <unistd.h>
255+#include <system_error>
256+#include <signal.h>
257+#include <boost/exception/all.hpp>
258+
259+namespace mclr = mir::client::rpc;
260+
261+namespace
262+{
263+void wait_for_events_forever(std::shared_ptr<mclr::Dispatchable> const& dispatchee, mir::Fd shutdown_fd)
264+{
265+ auto epoll_fd = mir::Fd{epoll_create1(0)};
266+ if (epoll_fd == mir::Fd::invalid)
267+ {
268+ BOOST_THROW_EXCEPTION((std::system_error{errno,
269+ std::system_category(),
270+ "Failed to create epoll IO monitor"}));
271+ }
272+ epoll_event event;
273+ memset(&event, 0, sizeof(event));
274+
275+ // We only care when the shutdown pipe has been closed
276+ event.events = EPOLLRDHUP;
277+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, shutdown_fd, &event);
278+
279+ // We want readability or closure for the dispatchee.
280+ event.events = EPOLLIN | EPOLLRDHUP;
281+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dispatchee->watch_fd(), &event);
282+
283+ for (;;)
284+ {
285+ epoll_wait(epoll_fd, &event, 1, -1);
286+ if (event.events & EPOLLIN)
287+ {
288+ dispatchee->dispatch();
289+ }
290+ else if (event.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
291+ {
292+ // On hangup we go away.
293+ return;
294+ }
295+ }
296+}
297+
298+}
299+
300+mclr::SimpleRpcThread::SimpleRpcThread(std::shared_ptr<mclr::Dispatchable> const& dispatchee)
301+{
302+ int pipefds[2];
303+ if (pipe(pipefds) < 0)
304+ {
305+ BOOST_THROW_EXCEPTION((std::system_error{errno,
306+ std::system_category(),
307+ "Failed to create shutdown pipe for IO thread"}));
308+ }
309+ shutdown_fd = mir::Fd{pipefds[1]};
310+ mir::Fd const terminate_fd = mir::Fd{pipefds[0]};
311+ eventloop = std::thread{[dispatchee, terminate_fd]()
312+ {
313+ // Our IO threads must not receive any signals
314+ sigset_t all_signals;
315+ sigfillset(&all_signals);
316+
317+ if (auto error = pthread_sigmask(SIG_BLOCK, &all_signals, NULL))
318+ BOOST_THROW_EXCEPTION((
319+ std::system_error{error,
320+ std::system_category(),
321+ "Failed to block signals on IO thread"}));
322+
323+ wait_for_events_forever(dispatchee, terminate_fd);
324+ }};
325+}
326+
327+mclr::SimpleRpcThread::~SimpleRpcThread() noexcept
328+{
329+ ::close(shutdown_fd);
330+ if (eventloop.joinable())
331+ {
332+ eventloop.join();
333+ }
334+}
335
336=== added file 'src/client/rpc/simple_rpc_thread.h'
337--- src/client/rpc/simple_rpc_thread.h 1970-01-01 00:00:00 +0000
338+++ src/client/rpc/simple_rpc_thread.h 2014-12-04 01:50:56 +0000
339@@ -0,0 +1,50 @@
340+/*
341+ * Copyright © 2014 Canonical Ltd.
342+ *
343+ * This program is free software: you can redistribute it and/or modify it
344+ * under the terms of the GNU Lesser General Public License version 3,
345+ * as published by the Free Software Foundation.
346+ *
347+ * This program is distributed in the hope that it will be useful,
348+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
349+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
350+ * GNU Lesser General Public License for more details.
351+ *
352+ * You should have received a copy of the GNU Lesser General Public License
353+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
354+ *
355+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
356+ */
357+
358+#ifndef MIR_CLIENT_RPC_SIMPLE_RPC_THREAD_H_
359+#define MIR_CLIENT_RPC_SIMPLE_RPC_THREAD_H_
360+
361+#include <memory>
362+#include <thread>
363+#include "mir/fd.h"
364+
365+namespace mir
366+{
367+namespace client
368+{
369+namespace rpc
370+{
371+class Dispatchable;
372+
373+class SimpleRpcThread
374+{
375+public:
376+ SimpleRpcThread(std::shared_ptr<Dispatchable> const& dispatchee);
377+ ~SimpleRpcThread() noexcept;
378+
379+private:
380+ Fd shutdown_fd;
381+ std::thread eventloop;
382+};
383+
384+}
385+}
386+}
387+
388+
389+#endif // MIR_CLIENT_RPC_SIMPLE_RPC_THREAD_H_
390
391=== modified file 'src/client/rpc/stream_socket_transport.cpp'
392--- src/client/rpc/stream_socket_transport.cpp 2014-11-27 09:00:52 +0000
393+++ src/client/rpc/stream_socket_transport.cpp 2014-12-04 01:50:56 +0000
394@@ -23,7 +23,6 @@
395
396 #include <system_error>
397
398-#include <signal.h>
399 #include <errno.h>
400 #include <sys/epoll.h>
401 #include <sys/types.h>
402@@ -35,10 +34,23 @@
403
404 namespace mclr = mir::client::rpc;
405
406-mclr::StreamSocketTransport::StreamSocketTransport(mir::Fd const& fd)
407- : socket_fd{fd}
408+mclr::StreamSocketTransport::StreamSocketTransport(Fd const& fd)
409+ : socket_fd{fd},
410+ epoll_fd{epoll_create1(EPOLL_CLOEXEC)}
411 {
412- init();
413+ if (epoll_fd < 0)
414+ {
415+ BOOST_THROW_EXCEPTION((std::system_error{errno,
416+ std::system_category(),
417+ "Failed to create epoll monitor for IO"}));
418+ }
419+ epoll_event event;
420+ // Make valgrind happy, harder
421+ memset(&event, 0, sizeof(event));
422+
423+ event.events = EPOLLIN | EPOLLRDHUP;
424+ event.data.fd = socket_fd;
425+ epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &event);
426 }
427
428 mclr::StreamSocketTransport::StreamSocketTransport(std::string const& socket_path)
429@@ -46,17 +58,6 @@
430 {
431 }
432
433-mclr::StreamSocketTransport::~StreamSocketTransport()
434-{
435- int dummy{0};
436- send(shutdown_fd, &dummy, sizeof(dummy), MSG_NOSIGNAL);
437- if (io_service_thread.joinable())
438- {
439- io_service_thread.join();
440- }
441- close(shutdown_fd);
442-}
443-
444 void mclr::StreamSocketTransport::register_observer(std::shared_ptr<Observer> const& observer)
445 {
446 std::lock_guard<decltype(observer_mutex)> lock(observer_mutex);
447@@ -176,110 +177,40 @@
448 mir::send_fds(socket_fd, fds);
449 }
450
451-void mclr::StreamSocketTransport::init()
452+mir::Fd mclr::StreamSocketTransport::watch_fd() const
453 {
454- // We use sockets rather than a pipe so that we can control
455- // EPIPE behaviour; we don't want SIGPIPE when the IO loop terminates.
456- int socket_fds[2];
457- socketpair(AF_UNIX, SOCK_STREAM, 0, socket_fds);
458- this->shutdown_fd = mir::Fd{socket_fds[1]};
459+ return epoll_fd;
460+}
461
462- auto shutdown_fd = mir::Fd{socket_fds[0]};
463- io_service_thread = std::thread([this, shutdown_fd]
464+void mclr::StreamSocketTransport::dispatch()
465+{
466+ epoll_event event;
467+ epoll_wait(epoll_fd, &event, 1, 0);
468+ if (event.data.fd == socket_fd)
469 {
470- // Our IO threads must not receive any signals
471- sigset_t all_signals;
472- sigfillset(&all_signals);
473-
474- if (auto error = pthread_sigmask(SIG_BLOCK, &all_signals, NULL))
475- BOOST_THROW_EXCEPTION(
476- boost::enable_error_info(
477- std::runtime_error("Failed to block signals on IO thread")) << boost::errinfo_errno(error));
478-
479- mir::set_thread_name("Client IO loop");
480-
481- int epoll_fd = epoll_create1(0);
482-
483- epoll_event event;
484- // Make valgrind happy, harder
485- memset(&event, 0, sizeof(event));
486-
487- event.events = EPOLLIN | EPOLLRDHUP;
488- event.data.fd = socket_fd;
489- epoll_ctl(epoll_fd, EPOLL_CTL_ADD, socket_fd, &event);
490-
491- event.events = EPOLLIN | EPOLLRDHUP;
492- event.data.fd = shutdown_fd;
493- epoll_ctl(epoll_fd, EPOLL_CTL_ADD, shutdown_fd, &event);
494-
495- bool shutdown_requested{false};
496- while (!shutdown_requested)
497- {
498- epoll_event event;
499- epoll_wait(epoll_fd, &event, 1, -1);
500- if (event.data.fd == socket_fd)
501- {
502- if (event.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
503- {
504- if (event.events & EPOLLIN)
505- {
506- // If the remote end shut down cleanly it's possible there's some more
507- // data left to read, or that reads will now return 0 (EOF)
508- //
509- // If there's more data left to read, notify of this before disconnect.
510- int dummy;
511- if (recv(socket_fd, &dummy, sizeof(dummy), MSG_PEEK | MSG_NOSIGNAL) > 0)
512- {
513- try
514- {
515- notify_data_available();
516- }
517- catch(...)
518- {
519- //It's quite likely that notify_data_available() will lead to
520- //an exception being thrown; after all, the remote has closed
521- //the connection.
522- //
523- //This doesn't matter; we're already shutting down.
524- }
525- }
526- }
527- notify_disconnected();
528- shutdown_requested = true;
529- }
530- else if (event.events & EPOLLIN)
531- {
532- try
533- {
534- notify_data_available();
535- }
536- catch (socket_disconnected_error &err)
537- {
538- // We've already notified of disconnection.
539- shutdown_requested = true;
540- }
541- // These need not be fatal.
542- catch (fd_reception_error &err)
543- {
544- }
545- catch (socket_error &err)
546- {
547- }
548- catch (...)
549- {
550- // We've no idea what the problem is, so clean up as best we can.
551- notify_disconnected();
552- shutdown_requested = true;
553- }
554- }
555- }
556- if (event.data.fd == shutdown_fd)
557- {
558- shutdown_requested = true;
559- }
560- }
561- ::close(epoll_fd);
562- });
563+ if (event.events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR))
564+ {
565+ if (event.events & EPOLLIN)
566+ {
567+ // If the remote end shut down cleanly it's possible there's some more
568+ // data left to read, or that reads will now return 0 (EOF)
569+ //
570+ // If there's more data left to read, notify of this before disconnect.
571+ int dummy;
572+ if (recv(socket_fd, &dummy, sizeof(dummy), MSG_PEEK | MSG_NOSIGNAL) > 0)
573+ {
574+ notify_data_available();
575+ return;
576+ }
577+ }
578+ notify_disconnected();
579+ epoll_ctl(epoll_fd, EPOLL_CTL_DEL, socket_fd, nullptr);
580+ }
581+ else if (event.events & EPOLLIN)
582+ {
583+ notify_data_available();
584+ }
585+ }
586 }
587
588 mir::Fd mclr::StreamSocketTransport::open_socket(std::string const& path)
589
590=== modified file 'src/client/rpc/stream_socket_transport.h'
591--- src/client/rpc/stream_socket_transport.h 2014-11-27 09:00:52 +0000
592+++ src/client/rpc/stream_socket_transport.h 2014-12-04 01:50:56 +0000
593@@ -38,22 +38,22 @@
594 public:
595 StreamSocketTransport(Fd const& fd);
596 StreamSocketTransport(std::string const& socket_path);
597- ~StreamSocketTransport() override;
598
599 void register_observer(std::shared_ptr<Observer> const& observer) override;
600 void receive_data(void* buffer, size_t bytes_requested) override;
601 void receive_data(void* buffer, size_t bytes_requested, std::vector<Fd>& fds) override;
602 void send_message(std::vector<uint8_t> const& buffer, std::vector<mir::Fd> const& fds) override;
603
604+ Fd watch_fd() const override;
605+ void dispatch() override;
606+
607 private:
608- void init();
609 Fd open_socket(std::string const& path);
610 void notify_data_available();
611 void notify_disconnected();
612
613- std::thread io_service_thread;
614 Fd const socket_fd;
615- Fd shutdown_fd;
616+ Fd const epoll_fd;
617
618 std::mutex observer_mutex;
619 std::vector<std::shared_ptr<Observer>> observers;
620
621=== modified file 'src/client/rpc/stream_transport.h'
622--- src/client/rpc/stream_transport.h 2014-11-27 09:00:52 +0000
623+++ src/client/rpc/stream_transport.h 2014-12-04 01:50:56 +0000
624@@ -25,6 +25,7 @@
625 #include <stdint.h>
626
627 #include "mir/fd.h"
628+#include "dispatchable.h"
629
630 namespace mir
631 {
632@@ -64,7 +65,7 @@
633 * from different threads. Multiple threads calling the same
634 * function need synchronisation.
635 */
636-class StreamTransport
637+class StreamTransport : public Dispatchable
638 {
639 public:
640 /**
641@@ -80,8 +81,8 @@
642
643 /**
644 * \brief Observer of IO status
645- * \note The Transport may call Observer members from arbitrary threads.
646- * The Observer implementation is responsible for any synchronisation.
647+ * \note The Transport will only call Observers in response to dispatch(),
648+ * and on the thread calling dispatch().
649 */
650 class Observer
651 {
652@@ -106,8 +107,6 @@
653 /**
654 * \brief Register an IO observer
655 * \param [in] observer
656- * \note There is no guarantee which thread will call into the observer.
657- * Synchronisation is the responsibility of the caller.
658 */
659 virtual void register_observer(std::shared_ptr<Observer> const& observer) = 0;
660
661
662=== added file 'tests/include/mir_test/fd_utils.h'
663--- tests/include/mir_test/fd_utils.h 1970-01-01 00:00:00 +0000
664+++ tests/include/mir_test/fd_utils.h 2014-12-04 01:50:56 +0000
665@@ -0,0 +1,69 @@
666+/*
667+ * Copyright © 2014 Canonical Ltd.
668+ *
669+ * This program is free software: you can redistribute it and/or modify
670+ * it under the terms of the GNU General Public License version 3 as
671+ * published by the Free Software Foundation.
672+ *
673+ * This program is distributed in the hope that it will be useful,
674+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
675+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
676+ * GNU General Public License for more details.
677+ *
678+ * You should have received a copy of the GNU General Public License
679+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
680+ *
681+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
682+ */
683+
684+#ifndef MIR_TEST_FD_UTILS_H_
685+#define MIR_TEST_FD_UTILS_H_
686+
687+#include "mir/fd.h"
688+#include <chrono>
689+
690+#include <poll.h>
691+
692+#include <gtest/gtest.h>
693+
694+namespace mir
695+{
696+namespace test
697+{
698+::testing::AssertionResult std_call_succeeded(int retval);
699+
700+::testing::AssertionResult fd_is_readable(mir::Fd const& fd);
701+
702+template<typename Period, typename Rep>
703+::testing::AssertionResult fd_becomes_readable(mir::Fd const& fd,
704+ std::chrono::duration<Period, Rep> timeout)
705+{
706+ int timeout_ms = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
707+
708+ pollfd readable;
709+ readable.events = POLLIN;
710+ readable.fd = fd;
711+
712+ auto result = std_call_succeeded(poll(&readable, 1, timeout_ms));
713+ if (result)
714+ {
715+ if (readable.revents & POLLERR)
716+ {
717+ return ::testing::AssertionFailure() << "error condition on fd";
718+ }
719+ if (readable.revents & POLLNVAL)
720+ {
721+ return ::testing::AssertionFailure() << "fd is invalid";
722+ }
723+ if (!(readable.revents & POLLIN))
724+ {
725+ return ::testing::AssertionFailure() << "fd is not readable";
726+ }
727+ return ::testing::AssertionSuccess();
728+ }
729+ return result;
730+}
731+}
732+}
733+
734+#endif // MIR_TEST_FD_UTILS_H_
735
736=== modified file 'tests/include/mir_test/test_protobuf_client.h'
737--- tests/include/mir_test/test_protobuf_client.h 2014-11-27 09:00:52 +0000
738+++ tests/include/mir_test/test_protobuf_client.h 2014-12-04 01:50:56 +0000
739@@ -30,6 +30,13 @@
740
741 namespace mir
742 {
743+namespace client
744+{
745+namespace rpc
746+{
747+class SimpleRpcThread;
748+}
749+}
750 namespace test
751 {
752 namespace doubles
753@@ -42,6 +49,7 @@
754
755 std::shared_ptr<doubles::MockRpcReport> rpc_report;
756 std::shared_ptr<google::protobuf::RpcChannel> channel;
757+ std::shared_ptr<client::rpc::SimpleRpcThread> eventloop;
758 mir::protobuf::DisplayServer::Stub display_server;
759 mir::protobuf::ConnectParameters connect_parameters;
760 mir::protobuf::SurfaceParameters surface_parameters;
761
762=== modified file 'tests/integration-tests/client/test_screencast.cpp'
763--- tests/integration-tests/client/test_screencast.cpp 2014-11-27 09:00:52 +0000
764+++ tests/integration-tests/client/test_screencast.cpp 2014-12-04 01:50:56 +0000
765@@ -18,6 +18,8 @@
766
767 #include "mir_protobuf.pb.h"
768 #include "src/client/default_connection_configuration.h"
769+#include "src/client/rpc/simple_rpc_thread.h"
770+#include "src/client/rpc/dispatchable.h"
771
772 #include "mir/frontend/connector.h"
773 #include "mir_test/test_protobuf_server.h"
774@@ -74,6 +76,9 @@
775 mcl::DefaultConnectionConfiguration{test_socket}.the_rpc_channel();
776 protobuf_server =
777 std::make_shared<mir::protobuf::DisplayServer::Stub>(rpc_channel.get());
778+ eventloop =
779+ std::make_shared<mir::client::rpc::SimpleRpcThread>(
780+ std::dynamic_pointer_cast<mir::client::rpc::Dispatchable>(rpc_channel));
781 }
782
783 char const* const test_socket = "./test_socket_screencast";
784@@ -81,6 +86,7 @@
785 std::shared_ptr<mt::TestProtobufServer> test_server;
786 std::shared_ptr<google::protobuf::RpcChannel> rpc_channel;
787 std::shared_ptr<mir::protobuf::DisplayServer> protobuf_server;
788+ std::shared_ptr<mir::client::rpc::SimpleRpcThread> eventloop;
789 };
790
791 }
792
793=== modified file 'tests/mir_test/CMakeLists.txt'
794--- tests/mir_test/CMakeLists.txt 2014-11-27 09:00:52 +0000
795+++ tests/mir_test/CMakeLists.txt 2014-12-04 01:50:56 +0000
796@@ -11,6 +11,7 @@
797 wait_object.cpp
798 current_thread_name.cpp
799 validity_matchers.cpp
800+ fd_utils.cpp
801 )
802
803 target_link_libraries(mir-test mirprotobuf)
804
805=== added file 'tests/mir_test/fd_utils.cpp'
806--- tests/mir_test/fd_utils.cpp 1970-01-01 00:00:00 +0000
807+++ tests/mir_test/fd_utils.cpp 2014-12-04 01:50:56 +0000
808@@ -0,0 +1,40 @@
809+/*
810+ * Copyright © 2014 Canonical Ltd.
811+ *
812+ * This program is free software: you can redistribute it and/or modify
813+ * it under the terms of the GNU General Public License version 3 as
814+ * published by the Free Software Foundation.
815+ *
816+ * This program is distributed in the hope that it will be useful,
817+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
818+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
819+ * GNU General Public License for more details.
820+ *
821+ * You should have received a copy of the GNU General Public License
822+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
823+ *
824+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
825+ */
826+
827+#include "mir_test/fd_utils.h"
828+
829+::testing::AssertionResult mir::test::std_call_succeeded(int retval)
830+{
831+ if (retval >= 0)
832+ {
833+ return ::testing::AssertionSuccess();
834+ }
835+ else
836+ {
837+ return ::testing::AssertionFailure() << "errno: "
838+ << errno
839+ << " ["
840+ << strerror(errno)
841+ << "]";
842+ }
843+}
844+
845+::testing::AssertionResult mir::test::fd_is_readable(mir::Fd const& fd)
846+{
847+ return fd_becomes_readable(fd, std::chrono::seconds{0});
848+}
849
850=== modified file 'tests/mir_test_doubles/test_protobuf_client.cpp'
851--- tests/mir_test_doubles/test_protobuf_client.cpp 2014-11-27 09:00:52 +0000
852+++ tests/mir_test_doubles/test_protobuf_client.cpp 2014-12-04 01:50:56 +0000
853@@ -25,22 +25,26 @@
854 #include "src/client/lifecycle_control.h"
855 #include "src/client/rpc/make_rpc_channel.h"
856 #include "src/client/rpc/mir_basic_rpc_channel.h"
857+#include "src/client/rpc/dispatchable.h"
858+#include "src/client/rpc/simple_rpc_thread.h"
859
860 #include <thread>
861
862 namespace mtd = mir::test::doubles;
863+namespace mclr = mir::client::rpc;
864
865 mir::test::TestProtobufClient::TestProtobufClient(
866 std::string socket_file,
867 int timeout_ms) :
868 rpc_report(std::make_shared<testing::NiceMock<doubles::MockRpcReport>>()),
869- channel(mir::client::rpc::make_rpc_channel(
870+ channel(mclr::make_rpc_channel(
871 socket_file,
872 std::make_shared<mir::client::ConnectionSurfaceMap>(),
873 std::make_shared<mir::client::DisplayConfiguration>(),
874 rpc_report,
875 std::make_shared<mir::client::LifecycleControl>(),
876 std::make_shared<mtd::NullClientEventSink>())),
877+ eventloop{std::make_shared<mclr::SimpleRpcThread>(std::dynamic_pointer_cast<mclr::Dispatchable>(channel))},
878 display_server(channel.get(), ::google::protobuf::Service::STUB_DOESNT_OWN_CHANNEL),
879 maxwait(timeout_ms),
880 connect_done_called(false),
881
882=== modified file 'tests/unit-tests/client/CMakeLists.txt'
883--- tests/unit-tests/client/CMakeLists.txt 2014-11-27 09:00:52 +0000
884+++ tests/unit-tests/client/CMakeLists.txt 2014-12-04 01:50:56 +0000
885@@ -13,6 +13,7 @@
886 ${CMAKE_CURRENT_SOURCE_DIR}/test_mir_prompt_session.cpp
887 ${CMAKE_CURRENT_SOURCE_DIR}/test_event_distributor.cpp
888 ${CMAKE_CURRENT_SOURCE_DIR}/test_periodic_perf_report.cpp
889+ ${CMAKE_CURRENT_SOURCE_DIR}/test_simple_rpc_thread.cpp
890 )
891
892 if(MIR_TEST_PLATFORM STREQUAL "android")
893
894=== modified file 'tests/unit-tests/client/test_mir_connection.cpp'
895--- tests/unit-tests/client/test_mir_connection.cpp 2014-11-27 09:00:52 +0000
896+++ tests/unit-tests/client/test_mir_connection.cpp 2014-12-04 01:50:56 +0000
897@@ -25,10 +25,14 @@
898 #include "src/client/display_configuration.h"
899 #include "src/client/mir_surface.h"
900 #include "src/client/client_buffer_factory.h"
901+#include "src/client/rpc/dispatchable.h"
902
903 #include "src/server/frontend/resource_cache.h" /* needed by test_server.h */
904 #include "mir_test/test_protobuf_server.h"
905 #include "mir_test/stub_server_tool.h"
906+#include "mir_test/pipe.h"
907+#include "mir_test/signal.h"
908+#include "mir_test/fd_utils.h"
909 #include "mir_test_doubles/stub_client_buffer_factory.h"
910
911 #include "mir_protobuf.pb.h"
912@@ -46,8 +50,14 @@
913 namespace
914 {
915
916-struct MockRpcChannel : public mir::client::rpc::MirBasicRpcChannel
917+struct MockRpcChannel : public mir::client::rpc::MirBasicRpcChannel,
918+ public mir::client::rpc::Dispatchable
919 {
920+ MockRpcChannel()
921+ {
922+ ON_CALL(*this, watch_fd()).WillByDefault(testing::Return(mir::Fd{}));
923+ }
924+
925 void CallMethod(const google::protobuf::MethodDescriptor* method,
926 google::protobuf::RpcController*,
927 const google::protobuf::Message* parameters,
928@@ -75,6 +85,9 @@
929 MOCK_METHOD1(drm_auth_magic, void(const mp::DRMMagic*));
930 MOCK_METHOD2(connect, void(mp::ConnectParameters const*,mp::Connection*));
931 MOCK_METHOD1(configure_display_sent, void(mp::DisplayConfiguration const*));
932+
933+ MOCK_CONST_METHOD0(watch_fd, mir::Fd());
934+ MOCK_METHOD0(dispatch, void());
935 };
936
937 struct MockClientPlatform : public mcl::ClientPlatform
938@@ -166,21 +179,21 @@
939 MirConnectionTest()
940 : mock_platform{std::make_shared<testing::NiceMock<MockClientPlatform>>()},
941 mock_channel{std::make_shared<testing::NiceMock<MockRpcChannel>>()},
942- conf{mock_platform, mock_channel},
943- connection{std::make_shared<MirConnection>(conf)}
944+ conf{mock_platform, mock_channel}
945 {
946 }
947
948 std::shared_ptr<testing::NiceMock<MockClientPlatform>> const mock_platform;
949 std::shared_ptr<testing::NiceMock<MockRpcChannel>> const mock_channel;
950 TestConnectionConfiguration conf;
951- std::shared_ptr<MirConnection> const connection;
952 };
953
954 TEST_F(MirConnectionTest, returns_correct_egl_native_display)
955 {
956 using namespace testing;
957
958+ auto connection = std::make_shared<MirConnection>(conf);
959+
960 EGLNativeDisplayType native_display_raw = reinterpret_cast<EGLNativeDisplayType>(0xabcdef);
961 auto native_display = std::make_shared<EGLNativeDisplayType>();
962 *native_display = native_display_raw;
963@@ -206,6 +219,8 @@
964 {
965 using namespace testing;
966
967+ auto connection = std::make_shared<MirConnection>(conf);
968+
969 unsigned int const drm_magic{0x10111213};
970
971 EXPECT_CALL(*mock_channel, drm_auth_magic(has_drm_magic(drm_magic)))
972@@ -276,6 +291,8 @@
973 {
974 using namespace testing;
975
976+ auto connection = std::make_shared<MirConnection>(conf);
977+
978 EXPECT_CALL(*mock_channel, connect(_,_))
979 .WillOnce(Invoke(fill_display_configuration));
980
981@@ -315,6 +332,8 @@
982 {
983 using namespace testing;
984
985+ auto connection = std::make_shared<MirConnection>(conf);
986+
987 EXPECT_CALL(*mock_channel, connect(_,_))
988 .WillOnce(Invoke(fill_display_configuration));
989
990@@ -360,6 +379,8 @@
991 {
992 using namespace testing;
993
994+ auto connection = std::make_shared<MirConnection>(conf);
995+
996 EXPECT_CALL(*mock_channel, connect(_,_))
997 .WillOnce(Invoke(fill_display_configuration));
998
999@@ -381,6 +402,8 @@
1000 {
1001 using namespace testing;
1002
1003+ auto connection = std::make_shared<MirConnection>(conf);
1004+
1005 EXPECT_CALL(*mock_channel, connect(_,_))
1006 .WillOnce(Invoke(fill_display_configuration));
1007
1008@@ -418,6 +441,8 @@
1009 {
1010 using namespace testing;
1011
1012+ auto connection = std::make_shared<MirConnection>(conf);
1013+
1014 EXPECT_CALL(*mock_channel, connect(_,_))
1015 .WillOnce(Invoke(fill_surface_pixel_formats));
1016 MirWaitHandle* wait_handle = connection->connect("MirClientSurfaceTest",
1017@@ -441,6 +466,8 @@
1018 {
1019 using namespace testing;
1020
1021+ auto connection = std::make_shared<MirConnection>(conf);
1022+
1023 EXPECT_CALL(*mock_channel, connect(_,_))
1024 .WillOnce(Invoke(fill_display_configuration));
1025
1026@@ -504,6 +531,8 @@
1027 {
1028 using namespace testing;
1029
1030+ auto connection = std::make_shared<MirConnection>(conf);
1031+
1032 MirSurfaceParameters params;
1033 params.name = __PRETTY_FUNCTION__;
1034
1035@@ -542,6 +571,8 @@
1036 {
1037 using namespace testing;
1038
1039+ auto connection = std::make_shared<MirConnection>(conf);
1040+
1041 MirSurfaceParameters params;
1042 params.name = __PRETTY_FUNCTION__;
1043
1044@@ -593,6 +624,8 @@
1045 std::vector<int> const initial_data{0x66, 0x67, 0x68};
1046 std::vector<int> const extra_data{0x11, 0x12, 0x13};
1047
1048+ auto connection = std::make_shared<MirConnection>(conf);
1049+
1050 EXPECT_CALL(*mock_channel, connect(_,_))
1051 .WillOnce(FillPlatformDataWith(initial_data));
1052
1053@@ -622,3 +655,73 @@
1054 for (size_t i = 0; i < extra_data.size(); i++)
1055 EXPECT_EQ(extra_data[i], pkg.data[i + initial_data.size()]) << " i=" << i;
1056 }
1057+
1058+TEST_F(MirConnectionTest, dispatch_works_with_automatic_dispatch)
1059+{
1060+ using namespace testing;
1061+
1062+ auto channel = std::dynamic_pointer_cast<MockRpcChannel>(conf.the_rpc_channel());
1063+ mir::test::Pipe mock_epoll;
1064+ auto dispatched = std::make_shared<mir::test::Signal>();
1065+
1066+ ON_CALL(*channel, watch_fd()).WillByDefault(Return(mir::Fd{mock_epoll.read_fd()}));
1067+ ON_CALL(*channel, dispatch())
1068+ .WillByDefault(Invoke([dispatched]() { dispatched->raise(); }));
1069+
1070+ auto connection = std::make_shared<MirConnection>(conf, DispatchType::automatic);
1071+
1072+ int dummy{0};
1073+ EXPECT_EQ(sizeof(dummy), write(mock_epoll.write_fd(), &dummy, sizeof(dummy)));
1074+
1075+ EXPECT_TRUE(dispatched->wait_for(std::chrono::seconds{1}));
1076+}
1077+
1078+TEST_F(MirConnectionTest, manual_dispatch_is_not_automatically_dispatched)
1079+{
1080+ using namespace testing;
1081+
1082+ auto channel = std::dynamic_pointer_cast<MockRpcChannel>(conf.the_rpc_channel());
1083+
1084+ mir::test::Pipe mock_epoll;
1085+ int dummy{0};
1086+ EXPECT_EQ(sizeof(dummy), write(mock_epoll.write_fd(), &dummy, sizeof(dummy)));
1087+
1088+ auto dispatched = std::make_shared<mir::test::Signal>();
1089+
1090+ EXPECT_CALL(*channel, watch_fd()).Times(0);
1091+ ON_CALL(*channel, dispatch())
1092+ .WillByDefault(Invoke([dispatched]() { dispatched->raise(); }));
1093+
1094+ auto connection = std::make_shared<MirConnection>(conf, DispatchType::manual);
1095+
1096+ EXPECT_FALSE(dispatched->wait_for(std::chrono::seconds{1}));
1097+}
1098+
1099+TEST_F(MirConnectionTest, returns_invalid_watch_fd_when_using_automatic_dispatch)
1100+{
1101+ using namespace testing;
1102+
1103+ auto connection = std::make_shared<MirConnection>(conf, DispatchType::automatic);
1104+
1105+ EXPECT_THAT(connection->watch_fd(), Lt(0));
1106+}
1107+
1108+TEST_F(MirConnectionTest, returns_pollable_watch_fd_when_using_manual_dispatch)
1109+{
1110+ using namespace testing;
1111+
1112+ // MirConnection will need a valid fd
1113+ auto channel = std::dynamic_pointer_cast<MockRpcChannel>(conf.the_rpc_channel());
1114+
1115+ mir::test::Pipe mock_epoll;
1116+ ON_CALL(*channel, watch_fd()).WillByDefault(Return(mir::Fd{mock_epoll.read_fd()}));
1117+
1118+ auto connection = std::make_shared<MirConnection>(conf, DispatchType::manual);
1119+ // Any non-negative descriptor is valid; only a negative value means "no fd".
1120+ EXPECT_THAT(connection->watch_fd(), Ge(0));
1121+
1122+ pollfd fd_readable;
1123+ fd_readable.events = POLLIN;
1124+ fd_readable.fd = connection->watch_fd();
1125+ EXPECT_TRUE(mir::test::std_call_succeeded(poll(&fd_readable, 1, 0)));
1126+}
1127
1128=== modified file 'tests/unit-tests/client/test_protobuf_rpc_channel.cpp'
1129--- tests/unit-tests/client/test_protobuf_rpc_channel.cpp 2014-11-27 09:00:52 +0000
1130+++ tests/unit-tests/client/test_protobuf_rpc_channel.cpp 2014-12-04 01:50:56 +0000
1131@@ -108,6 +108,8 @@
1132 MOCK_METHOD2(receive_data, void(void*, size_t));
1133 MOCK_METHOD3(receive_data, void(void*, size_t, std::vector<mir::Fd>&));
1134 MOCK_METHOD2(send_message, void(std::vector<uint8_t> const&, std::vector<mir::Fd> const&));
1135+ MOCK_CONST_METHOD0(watch_fd, mir::Fd());
1136+ MOCK_METHOD0(dispatch, void());
1137
1138 // Transport interface
1139 void register_observer_default(std::shared_ptr<Observer> const& observer)
1140
1141=== added file 'tests/unit-tests/client/test_simple_rpc_thread.cpp'
1142--- tests/unit-tests/client/test_simple_rpc_thread.cpp 1970-01-01 00:00:00 +0000
1143+++ tests/unit-tests/client/test_simple_rpc_thread.cpp 2014-12-04 01:50:56 +0000
1144@@ -0,0 +1,105 @@
1145+/*
1146+ * Copyright © 2014 Canonical Ltd.
1147+ *
1148+ * This program is free software: you can redistribute it and/or modify
1149+ * it under the terms of the GNU General Public License version 3 as
1150+ * published by the Free Software Foundation.
1151+ *
1152+ * This program is distributed in the hope that it will be useful,
1153+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1154+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1155+ * GNU General Public License for more details.
1156+ *
1157+ * You should have received a copy of the GNU General Public License
1158+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
1159+ *
1160+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
1161+ */
1162+
1163+#include "src/client/rpc/simple_rpc_thread.h"
1164+#include "src/client/rpc/dispatchable.h"
1165+#include "mir/fd.h"
1166+#include "mir_test/pipe.h"
1167+#include "mir_test/signal.h"
1168+
1169+#include <fcntl.h>
1170+
1171+#include <atomic>
1172+
1173+#include <gtest/gtest.h>
1174+#include <gmock/gmock.h>
1175+
1176+namespace mclr = mir::client::rpc;
1177+namespace mt = mir::test;
1178+
1179+namespace
1180+{
1181+class SimpleRPCThreadTest : public ::testing::Test
1182+{
1183+public:
1184+ SimpleRPCThreadTest()
1185+ {
1186+ watch_fd = mir::Fd{pipe.read_fd()};
1187+ test_fd = mir::Fd{pipe.write_fd()};
1188+ fcntl(watch_fd, F_SETFL, O_NONBLOCK);
1189+ }
1190+
1191+ mir::Fd watch_fd;
1192+ mir::Fd test_fd;
1193+private:
1194+ mt::Pipe pipe;
1195+};
1196+
1197+class MockDispatchable : public mclr::Dispatchable
1198+{
1199+public:
1200+ MOCK_CONST_METHOD0(watch_fd, mir::Fd());
1201+ MOCK_METHOD0(dispatch, void());
1202+};
1203+
1204+}
1205+
1206+TEST_F(SimpleRPCThreadTest, CallsDispatchWhenFdIsReadable)
1207+{
1208+ using namespace testing;
1209+
1210+ auto dispatchable = std::make_shared<NiceMock<MockDispatchable>>();
1211+ ON_CALL(*dispatchable, watch_fd()).WillByDefault(Return(watch_fd));
1212+
1213+ auto dispatched = std::make_shared<mt::Signal>();
1214+ ON_CALL(*dispatchable, dispatch()).WillByDefault(Invoke([dispatched]() { dispatched->raise(); }));
1215+
1216+ mclr::SimpleRpcThread dispatcher{dispatchable};
1217+
1218+ uint64_t dummy{0xdeadbeef};
1219+ EXPECT_EQ(sizeof(dummy), write(test_fd, &dummy, sizeof(dummy)));
1220+
1221+ EXPECT_TRUE(dispatched->wait_for(std::chrono::seconds{1}));
1222+}
1223+
1224+TEST_F(SimpleRPCThreadTest, StopsCallingDispatchOnceFdIsNotReadable)
1225+{
1226+ using namespace testing;
1227+
1228+ uint64_t dummy{0xdeadbeef};
1229+ std::atomic<int> dispatch_count{0};
1230+
1231+ auto dispatchable = std::make_shared<NiceMock<MockDispatchable>>();
1232+ ON_CALL(*dispatchable, watch_fd()).WillByDefault(Return(watch_fd));
1233+
1234+ auto dispatched = std::make_shared<mt::Signal>();
1235+ ON_CALL(*dispatchable, dispatch()).WillByDefault(Invoke([this, &dispatch_count]()
1236+ {
1237+ decltype(dummy) buffer;
1238+ dispatch_count++;
1239+ read(this->watch_fd, &buffer, sizeof(buffer));
1240+ }));
1241+
1242+ mclr::SimpleRpcThread dispatcher{dispatchable};
1243+
1244+ EXPECT_EQ(sizeof(dummy), write(test_fd, &dummy, sizeof(dummy)));
1245+
1246+ std::this_thread::sleep_for(std::chrono::seconds{1});
1247+
1248+ EXPECT_EQ(1, dispatch_count);
1249+}
1250
1251=== modified file 'tests/unit-tests/client/test_stream_transport.cpp'
1252--- tests/unit-tests/client/test_stream_transport.cpp 2014-11-27 09:00:52 +0000
1253+++ tests/unit-tests/client/test_stream_transport.cpp 2014-12-04 01:50:56 +0000
1254@@ -22,6 +22,7 @@
1255
1256 #include "mir_test/auto_unblock_thread.h"
1257 #include "mir_test/signal.h"
1258+#include "mir_test/fd_utils.h"
1259 #include "mir/raii.h"
1260
1261 #include <sys/socket.h>
1262@@ -43,6 +44,7 @@
1263 #include <gmock/gmock.h>
1264
1265 namespace mclr = mir::client::rpc;
1266+namespace mt = mir::test;
1267
1268 namespace
1269 {
1270@@ -71,12 +73,6 @@
1271 transport = std::make_shared<TransportMechanism>(transport_fd);
1272 }
1273
1274- virtual ~StreamTransportTest()
1275- {
1276- // We don't care about errors, so unconditionally close the test fd.
1277- close(test_fd);
1278- }
1279-
1280 mir::Fd transport_fd;
1281 mir::Fd test_fd;
1282 std::shared_ptr<TransportMechanism> transport;
1283@@ -85,51 +81,201 @@
1284 typedef ::testing::Types<mclr::StreamSocketTransport> Transports;
1285 TYPED_TEST_CASE(StreamTransportTest, Transports);
1286
1287+TYPED_TEST(StreamTransportTest, ReturnsValidWatchFd)
1288+{
1289+ // A valid fd is >= 0, and we know that stdin, stdout, and stderr aren't correct.
1290+ EXPECT_GE(this->transport->watch_fd(), 3);
1291+}
1292+
1293+TYPED_TEST(StreamTransportTest, WatchFdIsPollable)
1294+{
1295+ pollfd socket_readable;
1296+ socket_readable.events = POLLIN;
1297+ socket_readable.fd = this->transport->watch_fd();
1298+
1299+ ASSERT_TRUE(mt::std_call_succeeded(poll(&socket_readable, 1, 0)));
1300+
1301+ EXPECT_FALSE(socket_readable.revents & POLLERR);
1302+ EXPECT_FALSE(socket_readable.revents & POLLNVAL);
1303+}
1304+
1305+TYPED_TEST(StreamTransportTest, WatchFdNotifiesReadableWhenDataPending)
1306+{
1307+ uint64_t dummy{0xdeadbeef};
1308+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1309+
1310+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1311+}
1312+
1313+TYPED_TEST(StreamTransportTest, WatchFdRemainsUnreadableUntilEventPending)
1314+{
1315+ EXPECT_FALSE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1316+
1317+ uint64_t dummy{0xdeadbeef};
1318+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1319+
1320+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1321+}
1322+
1323+TYPED_TEST(StreamTransportTest, WatchFdIsNoLongerReadableAfterEventProcessing)
1324+{
1325+ using namespace testing;
1326+
1327+ uint64_t dummy{0xdeadbeef};
1328+
1329+ auto observer = std::make_shared<NiceMock<MockObserver>>();
1330+
1331+ ON_CALL(*observer, on_data_available())
1332+ .WillByDefault(Invoke([dummy, this]()
1333+ {
1334+ decltype(dummy) buffer;
1335+ this->transport->receive_data(&buffer, sizeof(dummy));
1336+ }));
1337+
1338+ this->transport->register_observer(observer);
1339+
1340+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1341+
1342+ ASSERT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1343+
1344+ this->transport->dispatch();
1345+ // The observer read back everything that was written, so check the transport's watch fd, not the peer test_fd.
1346+ EXPECT_FALSE(mt::fd_is_readable(this->transport->watch_fd()));
1347+}
1348+
1349+TYPED_TEST(StreamTransportTest, NoEventsDispatchedUntilDispatchCalled)
1350+{
1351+ using namespace testing;
1352+
1353+ auto observer = std::make_shared<NiceMock<MockObserver>>();
1354+ bool data_available{false};
1355+ bool disconnected{false};
1356+
1357+ uint64_t dummy{0xdeadbeef};
1358+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1359+ ::close(this->test_fd);
1360+
1361+ ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([this, dummy, &data_available]()
1362+ {
1363+ decltype(dummy) buffer;
1364+ this->transport->receive_data(&buffer, sizeof(buffer));
1365+ data_available = true;
1366+ }));
1367+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1368+ { disconnected = true; }));
1369+
1370+ this->transport->register_observer(observer);
1371+
1372+ std::this_thread::sleep_for(std::chrono::seconds{1});
1373+ EXPECT_FALSE(data_available);
1374+ EXPECT_FALSE(disconnected);
1375+
1376+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1377+ while (mt::fd_is_readable(this->transport->watch_fd()))
1378+ {
1379+ this->transport->dispatch();
1380+ }
1381+
1382+ EXPECT_TRUE(data_available);
1383+ EXPECT_TRUE(disconnected);
1384+}
1385+
1386+TYPED_TEST(StreamTransportTest, DispatchesSingleEventAtATime)
1387+{
1388+ using namespace testing;
1389+
1390+ auto observer = std::make_shared<NiceMock<MockObserver>>();
1391+ bool data_available{false};
1392+ bool disconnected{false};
1393+
1394+ uint64_t dummy{0xdeadbeef};
1395+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1396+ ::close(this->test_fd);
1397+
1398+ ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([this, dummy, &data_available]()
1399+ {
1400+ decltype(dummy) buffer;
1401+ this->transport->receive_data(&buffer, sizeof(buffer));
1402+ data_available = true;
1403+ }));
1404+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1405+ { disconnected = true; }));
1406+
1407+ this->transport->register_observer(observer);
1408+
1409+ EXPECT_FALSE(data_available);
1410+ EXPECT_FALSE(disconnected);
1411+
1412+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1413+
1414+ this->transport->dispatch();
1415+
1416+ EXPECT_TRUE(data_available xor disconnected);
1417+
1418+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1419+
1420+ this->transport->dispatch();
1421+
1422+ EXPECT_TRUE(data_available);
1423+ EXPECT_TRUE(disconnected);
1424+}
1425+
1426 TYPED_TEST(StreamTransportTest, NoticesRemoteDisconnect)
1427 {
1428 using namespace testing;
1429 auto observer = std::make_shared<NiceMock<MockObserver>>();
1430- auto done = std::make_shared<mir::test::Signal>();
1431+ bool disconnected{false};
1432
1433- ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([done]()
1434- { done->raise(); }));
1435+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1436+ { disconnected = true; }));
1437
1438 this->transport->register_observer(observer);
1439
1440 close(this->test_fd);
1441
1442- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1443+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1444+ while (mt::fd_is_readable(this->transport->watch_fd()))
1445+ {
1446+ this->transport->dispatch();
1447+ }
1448+
1449+ EXPECT_TRUE(disconnected);
1450 }
1451
1452-TYPED_TEST(StreamTransportTest, NoticesRemoteDisconnectWhileReadingInIOLoop)
1453+TYPED_TEST(StreamTransportTest, NoticesRemoteDisconnectWhileReading)
1454 {
1455 using namespace testing;
1456 auto observer = std::make_shared<NiceMock<MockObserver>>();
1457- auto done = std::make_shared<mir::test::Signal>();
1458- bool data_notified{false};
1459- bool finished_read{false};
1460-
1461- ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([done]()
1462- { done->raise(); }));
1463- ON_CALL(*observer, on_data_available())
1464- .WillByDefault(Invoke([this, &data_notified, &finished_read]()
1465- {
1466- data_notified = true;
1467- char buffer[8];
1468- this->transport->receive_data(buffer, sizeof(buffer));
1469- finished_read = true;
1470- }));
1471-
1472+ bool disconnected{false};
1473+ bool receive_error_detected{false};
1474+
1475+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1476+ { disconnected = true; }));
1477 this->transport->register_observer(observer);
1478
1479- uint32_t dummy{0xdeadbeef};
1480- EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1481-
1482- close(this->test_fd);
1483-
1484- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1485- EXPECT_TRUE(data_notified);
1486- EXPECT_FALSE(finished_read);
1487+ mir::test::AutoJoinThread closer{[this]()
1488+ {
1489+ std::this_thread::sleep_for(std::chrono::seconds{1});
1490+ ::close(this->test_fd);
1491+ }};
1492+ // No data is written in this test; the read is expected to fail once the closer thread closes test_fd.
1493+ try
1494+ {
1495+ char buffer[8];
1496+ this->transport->receive_data(buffer, sizeof(buffer));
1497+ }
1498+ catch (std::runtime_error const&)
1499+ {
1500+ receive_error_detected = true;
1501+ }
1502+
1503+ // There should now be a disconnect event pending...
1504+ EXPECT_TRUE(mt::fd_is_readable(this->transport->watch_fd()));
1505+
1506+ this->transport->dispatch();
1507+
1508+ EXPECT_TRUE(disconnected);
1509+ EXPECT_TRUE(receive_error_detected);
1510 }
1511
1512 TYPED_TEST(StreamTransportTest, NotifiesOnDataAvailable)
1513@@ -137,39 +283,21 @@
1514 using namespace testing;
1515
1516 auto observer = std::make_shared<NiceMock<MockObserver>>();
1517- auto done = std::make_shared<mir::test::Signal>();
1518-
1519- ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([done]()
1520- { done->raise(); }));
1521-
1522- this->transport->register_observer(observer);
1523-
1524- uint64_t dummy{0xdeadbeef};
1525- EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1526-
1527- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1528-}
1529-
1530-TYPED_TEST(StreamTransportTest, DoesntNotifyUntilDataAvailable)
1531-{
1532- using namespace testing;
1533-
1534- auto observer = std::make_shared<NiceMock<MockObserver>>();
1535- auto done = std::make_shared<mir::test::Signal>();
1536-
1537- ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([done]()
1538- { done->raise(); }));
1539-
1540- this->transport->register_observer(observer);
1541-
1542- std::this_thread::sleep_for(std::chrono::seconds{1});
1543-
1544- EXPECT_FALSE(done->raised());
1545-
1546- uint64_t dummy{0xdeadbeef};
1547- EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1548-
1549- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1550+ bool notified_data_available{false};
1551+
1552+ ON_CALL(*observer, on_data_available()).WillByDefault(Invoke([&notified_data_available]()
1553+ { notified_data_available = true; }));
1554+
1555+ this->transport->register_observer(observer);
1556+
1557+ uint64_t dummy{0xdeadbeef};
1558+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1559+
1560+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1561+
1562+ this->transport->dispatch();
1563+
1564+ EXPECT_TRUE(notified_data_available);
1565 }
1566
1567 TYPED_TEST(StreamTransportTest, KeepsNotifyingOfAvailableDataUntilAllIsRead)
1568@@ -177,98 +305,104 @@
1569 using namespace testing;
1570
1571 auto observer = std::make_shared<NiceMock<MockObserver>>();
1572- auto done = std::make_shared<mir::test::Signal>();
1573
1574 std::array<uint8_t, sizeof(int) * 256> data;
1575 data.fill(0);
1576- std::atomic<size_t> bytes_left{data.size()};
1577+ size_t bytes_left{data.size()};
1578
1579 ON_CALL(*observer, on_data_available())
1580- .WillByDefault(Invoke([done, &bytes_left, this]()
1581+ .WillByDefault(Invoke([&bytes_left, this]()
1582 {
1583 int dummy;
1584 this->transport->receive_data(&dummy, sizeof(dummy));
1585- bytes_left.fetch_sub(sizeof(dummy));
1586- if (bytes_left.load() == 0)
1587- {
1588- done->raise();
1589- }
1590+ bytes_left -= sizeof(dummy);
1591 }));
1592
1593 this->transport->register_observer(observer);
1594
1595 EXPECT_EQ(data.size(), write(this->test_fd, data.data(), data.size()));
1596
1597- EXPECT_TRUE(done->wait_for(std::chrono::seconds{5}));
1598- EXPECT_EQ(0, bytes_left.load());
1599+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1600+ while (mt::fd_is_readable(this->transport->watch_fd()))
1601+ {
1602+ this->transport->dispatch();
1603+ }
1604+
1605+ EXPECT_EQ(0, bytes_left);
1606 }
1607
1608 TYPED_TEST(StreamTransportTest, StopsNotifyingOnceAllDataIsRead)
1609 {
1610 using namespace testing;
1611- int const buffer_size{256};
1612
1613 auto observer = std::make_shared<NiceMock<MockObserver>>();
1614- auto done = std::make_shared<mir::test::Signal>();
1615+
1616+ std::array<uint8_t, sizeof(int) * 256> data;
1617+ data.fill(0);
1618+ size_t bytes_left{data.size()};
1619
1620 ON_CALL(*observer, on_data_available())
1621- .WillByDefault(Invoke([this, done]()
1622+ .WillByDefault(Invoke([&bytes_left, this]()
1623 {
1624- if (done->raised())
1625- {
1626- FAIL() << "on_data_available called without new data available";
1627- }
1628- uint8_t dummy_buffer[buffer_size];
1629- this->transport->receive_data(dummy_buffer, sizeof(dummy_buffer));
1630- done->raise();
1631+ int dummy;
1632+ this->transport->receive_data(&dummy, sizeof(dummy));
1633+ bytes_left -= sizeof(dummy);
1634 }));
1635+
1636 this->transport->register_observer(observer);
1637
1638- EXPECT_FALSE(done->raised());
1639- uint8_t dummy_buffer[buffer_size];
1640- memset(dummy_buffer, 0xab, sizeof(dummy_buffer));
1641- EXPECT_EQ(sizeof(dummy_buffer), write(this->test_fd, dummy_buffer, sizeof(dummy_buffer)));
1642-
1643- EXPECT_TRUE(done->wait_for(std::chrono::seconds{1}));
1644-
1645- std::this_thread::sleep_for(std::chrono::seconds{1});
1646+ EXPECT_EQ(data.size(), write(this->test_fd, data.data(), data.size()));
1647+
1648+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1649+ while (bytes_left > 0)
1650+ {
1651+ this->transport->dispatch();
1652+ }
1653+
1654+ EXPECT_FALSE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1655 }
1656
1657 TYPED_TEST(StreamTransportTest, DoesntSendDataAvailableNotificationOnDisconnect)
1658 {
1659 using namespace testing;
1660- int const buffer_size{256};
1661
1662 auto observer = std::make_shared<NiceMock<MockObserver>>();
1663- auto read_done = std::make_shared<mir::test::Signal>();
1664- auto disconnect_done = std::make_shared<mir::test::Signal>();
1665- std::atomic<int> notify_count{0};
1666-
1667+ int notify_count{0};
1668+ bool disconnected{false};
1669+
1670+ uint64_t dummy{0xdeedfaac};
1671+ EXPECT_EQ(sizeof(dummy), write(this->test_fd, &dummy, sizeof(dummy)));
1672+
1673+ ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([&disconnected]()
1674+ { disconnected = true; }));
1675 ON_CALL(*observer, on_data_available())
1676- .WillByDefault(Invoke([this, read_done, &notify_count]()
1677+ .WillByDefault(Invoke([dummy, &notify_count, this]()
1678 {
1679 notify_count++;
1680- uint8_t dummy_buffer[buffer_size];
1681- this->transport->receive_data(dummy_buffer, sizeof(dummy_buffer));
1682- read_done->raise();
1683+
1684+ decltype(dummy) buffer;
1685+ this->transport->receive_data(&buffer, sizeof(buffer));
1686 }));
1687- ON_CALL(*observer, on_disconnected()).WillByDefault(Invoke([this, disconnect_done]()
1688- { disconnect_done->raise(); }));
1689
1690 this->transport->register_observer(observer);
1691
1692- EXPECT_FALSE(read_done->raised());
1693- uint8_t dummy_buffer[buffer_size];
1694- memset(dummy_buffer, 0xab, sizeof(dummy_buffer));
1695- EXPECT_EQ(sizeof(dummy_buffer), write(this->test_fd, dummy_buffer, sizeof(dummy_buffer)));
1696-
1697- EXPECT_TRUE(read_done->wait_for(std::chrono::seconds{1}));
1698- EXPECT_EQ(1, notify_count);
1699+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1700+ while (mt::fd_is_readable(this->transport->watch_fd()))
1701+ {
1702+ this->transport->dispatch();
1703+ }
1704
1705 ::close(this->test_fd);
1706- EXPECT_TRUE(disconnect_done->wait_for(std::chrono::seconds{1}));
1707-
1708+
1709+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1710+ while (mt::fd_is_readable(this->transport->watch_fd()))
1711+ {
1712+ this->transport->dispatch();
1713+ }
1714+
1715+ EXPECT_FALSE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1716 EXPECT_EQ(1, notify_count);
1717+ EXPECT_TRUE(disconnected);
1718 }
1719
1720 TYPED_TEST(StreamTransportTest, ReadsCorrectData)
1721@@ -276,23 +410,26 @@
1722 using namespace testing;
1723
1724 auto observer = std::make_shared<NiceMock<MockObserver>>();
1725- auto done = std::make_shared<mir::test::Signal>();
1726
1727 std::string expected{"I am the very model of a modern major general"};
1728 std::vector<char> received(expected.size());
1729
1730 ON_CALL(*observer, on_data_available())
1731- .WillByDefault(Invoke([done, &received, this]()
1732+ .WillByDefault(Invoke([&received, this]()
1733 {
1734 this->transport->receive_data(received.data(), received.size());
1735- done->raise();
1736 }));
1737
1738 this->transport->register_observer(observer);
1739
1740 EXPECT_EQ(expected.size(), write(this->test_fd, expected.data(), expected.size()));
1741
1742- ASSERT_TRUE(done->wait_for(std::chrono::seconds{1}));
1743+ EXPECT_TRUE(mt::fd_becomes_readable(this->transport->watch_fd(), std::chrono::seconds{1}));
1744+ while (mt::fd_is_readable(this->transport->watch_fd()))
1745+ {
1746+ this->transport->dispatch();
1747+ }
1748+
1749 EXPECT_EQ(0, memcmp(expected.data(), received.data(), expected.size()));
1750 }
1751

Subscribers

People subscribed via source and target branches