Mir

Merge lp:~mir-team/mir/add-multiplexing-dispatchable into lp:mir

Proposed by Chris Halse Rogers on 2015-01-16
Status: Merged
Approved by: Chris Halse Rogers on 2015-02-18
Approved revision: 2310
Merged at revision: 2326
Proposed branch: lp:~mir-team/mir/add-multiplexing-dispatchable
Merge into: lp:mir
Prerequisite: lp:~mir-team/mir/add-dispatchable-interface
Diff against target: 1390 lines (+1216/-46)
14 files modified
benchmarks/CMakeLists.txt (+8/-0)
benchmarks/benchmark_multiplexing_dispatchable.cpp (+122/-0)
include/common/mir/dispatch/multiplexing_dispatchable.h (+106/-0)
src/common/dispatch/CMakeLists.txt (+2/-0)
src/common/dispatch/multiplexing_dispatchable.cpp (+300/-0)
src/common/dispatch/simple_dispatch_thread.cpp (+3/-45)
src/common/dispatch/utils.cpp (+65/-0)
src/common/dispatch/utils.h (+38/-0)
src/common/input/android/android_input_receiver_thread.cpp (+1/-1)
src/common/symbols.map (+11/-0)
tests/unit-tests/CMakeLists.txt (+1/-0)
tests/unit-tests/dispatch/CMakeLists.txt (+2/-0)
tests/unit-tests/dispatch/test_dispatch_utils.cpp (+70/-0)
tests/unit-tests/dispatch/test_multiplexing_dispatchable.cpp (+487/-0)
To merge this branch: bzr merge lp:~mir-team/mir/add-multiplexing-dispatchable
Reviewer                         Review Type             Date Requested  Status
Daniel van Vugt                  -                       -               Abstain on 2015-02-17
Alexandros Frantzis (community)  -                       -               Approve on 2015-02-13
PS Jenkins bot                   continuous-integration  -               Approve on 2015-02-12
Chris Halse Rogers               -                       -               Abstain on 2015-02-11
Alan Griffiths                   -                       2015-01-16      Abstain on 2015-01-28
Robert Carr (community)          -                       -               Approve on 2015-01-27
Review via email: mp+246674@code.launchpad.net

Commit Message

Add MultiplexingDispatchable.

This does exactly what it says: it multiplexes multiple Dispatchables into a single Dispatchable. Each dispatch() of the MultiplexingDispatchable dispatches to a child Dispatchable with active events.

Description of the Change

The second part of eventloop integration. This is pretty obvious; it's necessary because we want to present a single fd to wait on for the client regardless of the internal implementation, and we currently have one RPC fd + one input fd per surface with input.

Note that there's potentially a small performance improvement available if we specialise adding a MultiplexingDispatchable to a different MultiDisp, but I'm not sure if that will come up.
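
The single-fd requirement in the description relies on a property of epoll: an epoll instance is itself a pollable file descriptor, and it reads as ready whenever any watched child has a pending event. The following is a minimal sketch of that property only, not the code in this branch; `readable_now`, `watch` and `single_fd_reflects_children` are illustrative names, and the children here are plain pipes rather than Dispatchables.

```cpp
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>

// True if `fd` has a pending readable event right now (zero timeout, never blocks).
bool readable_now(int fd)
{
    pollfd p{fd, POLLIN, 0};
    return poll(&p, 1, 0) > 0 && (p.revents & POLLIN) != 0;
}

// Registers `child` with the epoll instance `epfd` for readability.
bool watch(int epfd, int child)
{
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = child;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, child, &ev) == 0;
}

// Puts two pipes behind one epoll fd and checks that polling the epoll fd
// alone reveals activity on either of them.
bool single_fd_reflects_children()
{
    int a[2], b[2];
    if (pipe(a) < 0) return false;
    if (pipe(b) < 0) { close(a[0]); close(a[1]); return false; }
    int const epfd = epoll_create1(EPOLL_CLOEXEC);

    bool ok = epfd >= 0 && watch(epfd, a[0]) && watch(epfd, b[0]);
    ok = ok && !readable_now(epfd);          // nothing pending yet
    char const byte = 0;
    ok = ok && write(b[1], &byte, 1) == 1;   // make the second child readable
    ok = ok && readable_now(epfd);           // the single fd now shows it

    if (epfd >= 0) close(epfd);
    close(a[0]); close(a[1]); close(b[0]); close(b[1]);
    return ok;
}
```

A caller only ever needs to hand `epfd` to its own event loop; which child became ready is discovered later, when the loop calls back in.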

2232. By Daniel van Vugt on 2015-01-19

Add client API access to set surfaces "hidden".

Approved by PS Jenkins bot, Alberto Aguirre, Cemil Azizoglu, Alan Griffiths.

2233. By Kevin DuBois on 2015-01-19

android: make the helper class HwcLayerList more helpful by adding a common hwc list function to update the framebuffer.

Approved by PS Jenkins bot, Alexandros Frantzis, Robert Carr.

2234. By Alan Griffiths on 2015-01-19

tests: fix logging option for some acceptance and integration tests
(LP: #1408231). Fixes: https://bugs.launchpad.net/bugs/1408231.

Approved by PS Jenkins bot, Daniel van Vugt, Kevin DuBois, Robert Carr, Cemil Azizoglu.

2235. By Daniel van Vugt on 2015-01-19

Modifier flags: Avoid casting to impossible (unnamed) enum values. That would only confuse debuggers and future maintainers.

Approved by Cemil Azizoglu, PS Jenkins bot.

2236. By Daniel van Vugt on 2015-01-20

demo-shell: Allow mouse resizing (Alt+middlebutton) to control any edge
or corner.

Approved by Alexandros Frantzis, PS Jenkins bot.

2237. By Alan Griffiths on 2015-01-20

Bump server ABI.

Approved by Daniel van Vugt, PS Jenkins bot.

2238. By Alan Griffiths on 2015-01-20

shell: remove FocusSetter from public interface.

Approved by PS Jenkins bot, Alexandros Frantzis.

2239. By Daniel van Vugt on 2015-01-20

Document server ABI bump to 29.

Approved by Alan Griffiths, PS Jenkins bot.

2240. By Chris Halse Rogers on 2015-01-21

Add the start of a pollable fd interface to StreamTransport

2241. By Chris Halse Rogers on 2015-01-21

Test that watch_fd() is pollable

2242. By Chris Halse Rogers on 2015-01-21

Wire up watch_fd() to the actual epoller

2243. By Chris Halse Rogers on 2015-01-21

Add dispatch() method to StreamTransport

2244. By Chris Halse Rogers on 2015-01-21

Use mir::Fd in StreamTransport

2245. By Chris Halse Rogers on 2015-01-21

Remove superfluous ~StreamTransportTest

2246. By Chris Halse Rogers on 2015-01-21

Switch StreamSocketTransport over to a dispatch() driven interface.

Only the specific tests have been updated; at this point all client code will fail.

2247. By Chris Halse Rogers on 2015-01-21

Drop the ‘more events pending’ return value from StreamTransport::dispatch().

Anyone polling watch_fd() will do this with equal efficiency.

2248. By Chris Halse Rogers on 2015-01-21

Move dispatch tests to the top of the file.

These are more primitive than the observer & data read/write ones

2249. By Chris Halse Rogers on 2015-01-21

Simplify StreamSocketTransport::dispatch.

We don't actually need to do all that exception-catching, particularly since
we don't do anything but eat the exceptions anyway.

2250. By Chris Halse Rogers on 2015-01-21

Test that we dispatch a single event for a single ::dispatch()

2251. By Chris Halse Rogers on 2015-01-21

Fix more docs; Observers are now called from the dispatch() thread

2252. By Chris Halse Rogers on 2015-01-21

Factor out fd_is_readable and fd_becomes_readable.

These are broadly useful; now live in mir_test

2253. By Chris Halse Rogers on 2015-01-21

Add a simple eventloop thread implementation

2254. By Chris Halse Rogers on 2015-01-21

Promote Dispatchable to a top-level namespace

2255. By Chris Halse Rogers on 2015-01-21

Move dispatch-y things to mir::dispatch namespace

2256. By Chris Halse Rogers on 2015-01-21

Also rename the SimpleRPCThread tests

2257. By Chris Halse Rogers on 2015-01-21

(Prepare to) Pass the set of events through to Dispatchable::dispatch.

It can be useful for a Dispatchable to know why it's been dispatched.
Specifically, with this we'll be able to implement StreamSocketTransport
without an additional epoll fd.

2258. By Chris Halse Rogers on 2015-01-21

Make SimpleDispatchThread pass the relevant fd_event to its dispatchee

2259. By Chris Halse Rogers on 2015-01-21

Simplify StreamSocketTransport with the new dispatch(fd_events) interface.

We no longer need an epoll fd simply to monitor when our socket is closed.

2260. By Chris Halse Rogers on 2015-01-21

Return false from Dispatchable::dispatch() to indicate no further events.

Once the remote end has closed the connection a fd will remain in state
fd_event::remote_closed indefinitely. Likewise, a file descriptor at EOF
will indefinitely be readable.

At the same time it's unsafe to ::close the relevant fd. For a start,
there's no guarantee that someone hasn't dup()ed it, or kept a reference
to the mir::Fd that will now close some random file descriptor when it
loses scope. Even if neither of those are true, close()ing a file descriptor
when some other thread is in select(), poll(), or epoll_wait() invokes
undefined behaviour.

Let the Dispatchable return false from dispatch to signal that it should
no longer be called, and punt that problem to whatever is dispatching it.
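
As an illustration of the contract described in this commit message, here is a small sketch of a dispatch loop that stops calling a dispatchee once it returns false, without closing the fd itself. `MiniDispatchable`, `PipeReader` and `run_until_done` are hypothetical stand-ins written for this example; they are not Mir's Dispatchable interface or SimpleDispatchThread.

```cpp
#include <poll.h>
#include <unistd.h>
#include <cassert>

// Hypothetical stand-in for the interface discussed above.
struct MiniDispatchable
{
    virtual ~MiniDispatchable() = default;
    virtual int watch_fd() const = 0;
    virtual bool dispatch() = 0;   // false means "do not call me again"
};

// Consumes one byte per dispatch and reports false at EOF or on error.
class PipeReader : public MiniDispatchable
{
public:
    explicit PipeReader(int fd) : fd_(fd) {}
    int watch_fd() const override { return fd_; }
    bool dispatch() override
    {
        char c;
        ssize_t const n = read(fd_, &c, 1);
        if (n == 1) { ++bytes; return true; }
        return false;   // an fd at EOF stays "readable" forever, so opt out
    }
    int bytes{0};
private:
    int fd_;
};

// Dispatches until the dispatchee opts out; returns the number of dispatch() calls.
// The fd is deliberately left open: its owner decides when closing is safe.
int run_until_done(MiniDispatchable& d)
{
    int calls = 0;
    for (;;)
    {
        pollfd p{d.watch_fd(), POLLIN, 0};
        if (poll(&p, 1, -1) <= 0) break;
        ++calls;
        if (!d.dispatch()) break;
    }
    return calls;
}
```

With three bytes written to a pipe and the write end closed, this loop would make four calls: three that consume data and one that sees EOF and returns false.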

2261. By Chris Halse Rogers on 2015-01-21

Reword watch_fd() comment

2262. By Chris Halse Rogers on 2015-01-21

socket_fd can be const again

2263. By Chris Halse Rogers on 2015-01-21

Consume the result of read() in SimpleDispatchThread tests

2264. By Chris Halse Rogers on 2015-01-21

Use (and test) Dispatchable::relevant_events()

2265. By Chris Halse Rogers on 2015-01-21

Slightly better docs for Dispatchable

2266. By Chris Halse Rogers on 2015-01-21

CamelCase fd_event and fd_events

2267. By Chris Halse Rogers on 2015-01-21

Fixup test_naming_policy

2268. By Chris Halse Rogers on 2015-01-21

Move mt::fd_utils implementation out of header file

2269. By Chris Halse Rogers on 2015-01-21

Port mir::test::Pipe to mir::Fd

2270. By Chris Halse Rogers on 2015-01-21

Augment mt::Pipe to accept pipe2() flags

2271. By Chris Halse Rogers on 2015-01-21

Add a TestDispatchable to mir::test

2272. By Chris Halse Rogers on 2015-01-21

Simplify SimpleDispatchThread tests with mt::TestDispatchable

2273. By Chris Halse Rogers on 2015-01-21

Fix all the copyrights

2274. By Chris Halse Rogers on 2015-01-21

SimpleDispatchThread: Correctly close shutdown mir::Fd

2275. By Chris Halse Rogers on 2015-01-21

Dispatchable: Doc fixup

Chris Halse Rogers (raof) wrote :

For what it's worth, ThreadSanitizer gives this code a clean bill of health.

2276. By Chris Halse Rogers on 2015-01-22

Add new public headers to SHA1sums

Alan Griffiths (alan-griffiths) wrote :

It isn't clear to me how this fits into the system - I would have expected the code to live in libmirclient, not libmircommon. Why am I wrong?

review: Needs Information
Robert Carr (robertcarr) wrote :

Looks good to me. Chris and I discussed the choice of the GC implementation some on IRC...

If anyone else is as confused as I initially was I've archived the conversation for reading:
http://pastebin.ubuntu.com/9889667/

Some "nonblocking" test comments from the first pass:

+ auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });

This looks like a common enough usage throughout multiple tests that perhaps this could be the default TestDispatchable constructor (with a ::dispatched member variable).

+TEST(DispatchUtilsTest, conversion_functions_roundtrip)

Roundtrip?

Robert Carr (robertcarr) :
review: Approve
2277. By Chris Halse Rogers on 2015-01-27

Drop accidental flag.h detritus from ABI SHAsums

2278. By Chris Halse Rogers on 2015-01-27

Add an epoll-based MultiplexingDispatchable

2279. By Chris Halse Rogers on 2015-01-27

Add ability to remove a Dispatchable from a MultiDispatchable

2280. By Chris Halse Rogers on 2015-01-27

Improve error message for duplicate watches

2281. By Chris Halse Rogers on 2015-01-27

Make add_watch failures not change the state of the MultiDispatchable

2282. By Chris Halse Rogers on 2015-01-27

Make MultiDispatchable threadsafe.

We rely on the threadsafety of epoll_ctl/epoll_wait, and use the EPOLLONESHOT
mode to ensure that each time a watched file descriptor becomes readable it is
dispatched from exactly one thread waiting in epoll_wait.
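
The EPOLLONESHOT behaviour this commit relies on can be sketched in isolation. This is an assumption-laden illustration rather than the branch's implementation: `arm_oneshot`, `take_one` and `oneshot_fires_once_until_rearmed` are made-up names, and a single thread stands in for the competing waiters.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>

// (Re)arms `fd` for exactly one readable notification.
// `op` is EPOLL_CTL_ADD for the first registration, EPOLL_CTL_MOD to rearm.
bool arm_oneshot(int epfd, int fd, int op)
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLONESHOT;
    ev.data.fd = fd;
    return epoll_ctl(epfd, op, fd, &ev) == 0;
}

// Non-blocking wait for at most one event; returns the number of events fetched.
int take_one(int epfd)
{
    epoll_event ev{};
    return epoll_wait(epfd, &ev, 1, 0);
}

// Shows that a still-readable fd is reported once, then stays silent until rearmed.
bool oneshot_fires_once_until_rearmed()
{
    int fds[2];
    if (pipe(fds) < 0) return false;
    int const epfd = epoll_create1(EPOLL_CLOEXEC);
    char const byte = 0;

    bool ok = epfd >= 0 && write(fds[1], &byte, 1) == 1;
    ok = ok && arm_oneshot(epfd, fds[0], EPOLL_CTL_ADD);
    ok = ok && take_one(epfd) == 1;   // the first waiter claims the event
    ok = ok && take_one(epfd) == 0;   // byte is unread, but the watch is disarmed
    ok = ok && arm_oneshot(epfd, fds[0], EPOLL_CTL_MOD);
    ok = ok && take_one(epfd) == 1;   // rearmed, so it is reported again

    if (epfd >= 0) close(epfd);
    close(fds[0]);
    close(fds[1]);
    return ok;
}
```

The second `take_one` returning 0 is the property that matters for thread safety: whichever thread received the event owns it until the watch is rearmed.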

2283. By Chris Halse Rogers on 2015-01-27

Add optional concurrent dispatch mode to MultiDispatch

2284. By Chris Halse Rogers on 2015-01-27

Factor out epoll_to_fd_event and fd_event_to_epoll.

Also test them, fixing the embarrassing bugs in the existing implementation.
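
For readers wondering what a round-trip test of such conversion functions looks like, here is a hedged sketch. The flag values, the `writable` and `error` names, and the plain-integer signatures are assumptions made for this example; Mir's actual FdEvent enumerators and function signatures may differ.

```cpp
#include <sys/epoll.h>
#include <cassert>
#include <cstdint>

namespace sketch
{
// Hypothetical flag values, chosen only for this illustration.
using FdEvents = uint32_t;
constexpr FdEvents readable      = 1u << 0;
constexpr FdEvents writable      = 1u << 1;
constexpr FdEvents remote_closed = 1u << 2;
constexpr FdEvents error         = 1u << 3;

// Maps the abstract event set onto epoll's bit flags.
uint32_t fd_event_to_epoll(FdEvents events)
{
    uint32_t out = 0;
    if (events & readable)      out |= EPOLLIN;
    if (events & writable)      out |= EPOLLOUT;
    if (events & remote_closed) out |= EPOLLHUP;
    if (events & error)         out |= EPOLLERR;
    return out;
}

// Inverse mapping; epoll bits with no counterpart are ignored.
FdEvents epoll_to_fd_event(uint32_t epoll_bits)
{
    FdEvents out = 0;
    if (epoll_bits & EPOLLIN)  out |= readable;
    if (epoll_bits & EPOLLOUT) out |= writable;
    if (epoll_bits & EPOLLHUP) out |= remote_closed;
    if (epoll_bits & EPOLLERR) out |= error;
    return out;
}
}
```

A round-trip test then feeds every combination of the four flags through both functions and checks that the original set comes back unchanged.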

2285. By Chris Halse Rogers on 2015-01-27

Add some doxygen to MultiplexingDispatchable

2286. By Chris Halse Rogers on 2015-01-27

Interface for adding/removing a raw {fd, std::function<void()>} pair

2287. By Chris Halse Rogers on 2015-01-27

Test (and fix) that remove_watch is threadsafe

2288. By Chris Halse Rogers on 2015-01-27

More threadsafety testing (and a fix) for MultiplexingDispatchable

2289. By Chris Halse Rogers on 2015-01-27

Test that Dispatchables that return false from dispatch() are removed

2290. By Chris Halse Rogers on 2015-01-27

Oh, yeah. It's 2015, that's right

2291. By Chris Halse Rogers on 2015-01-27

Add new public headers to the SHA1sums

Chris Halse Rogers (raof) wrote :

@Alan: It could be moved to libmirclient, this is true. It's in libmircommon because (a) it's not client-specific code at all, and could be used in the server in future, and (b) because it evolved from code in mircommon.

Alan Griffiths (alan-griffiths) wrote :

> @Alan: It could be moved to libmirclient, this is true. It's in libmircommon
> because (a) it's not client-specific code at all, and could be used in the
> server in future, and (b) because it evolved from code in mircommon.

we envisage three types of downstream:

1. Clients - use libmirclient + libmircommon
2. Servers - use libmirserver + libmirclient + libmirplatform + libmircommon
3. Platforms - use libmirplatform + libmircommon

Unless we see the code being used by platforms then I'd rather see it in libmirclient to minimize those dependencies.

2292. By Chris Halse Rogers on 2015-01-27

Merge prerequisite branch

2293. By Chris Halse Rogers on 2015-01-28

Fix epoll event request for subsequent dispatches

2294. By Chris Halse Rogers on 2015-01-28

Speed up a particularly slow unit-test

Chris Halse Rogers (raof) wrote :

Oh, yeah. That's right!

@Alan: The reason why this is in mircommon is because AndroidInputReceiver is in mircommon and I use MultiplexingDispatchable there in the next step of the merge proposals.

Alan Griffiths (alan-griffiths) wrote :

OK, I'm not convinced we have mir::input in the correct library but this MP didn't cause that.

The next concern I have is whether this code is sufficiently generally applicable to meet the needs of different toolkits. It would be a shame if they each implemented something similar. But I'm not sufficiently informed about that to block.

review: Abstain
Robert Carr (robertcarr) wrote :

InputReceiver was only in mircommon for the qtmir pseudo surface of days past.

Chris Halse Rogers (raof) wrote :

@Robert - so there's a later cleanup available by way of moving that stuff into mirclient. Sweet.

@Alan - I'm not sure what you mean? This code isn't visible to toolkits; it's purely an implementation detail. It's sufficient to meet the needs of *my* code in subsequent MPs. It'd also be fine for obvious extensions of the client-fd API, such as multiple dispatch queues.

Chris Halse Rogers (raof) wrote :

Hm. And if you mean “implemented something similar” then they mostly already do - each toolkit has its own “add this fd to the set of fds my event loop watches” type API. The GLib version of that specifically is not suitable for this use because you can't get a file descriptor *out* of it, so (unless I wanted to export a GSource from the Mir client API) it's not usable for this purpose.

Alexandros Frantzis (afrantzis) wrote :

Here is a scenario in which the code seems to not work correctly:

Assume there is one thread in dispatch() (*in_current_generation == 1) and MultiplexingDispatchable currently watches 2 fds, fd1/dispatchee1 and fd2/dispatchee2.

After a call to remove_watch(fd1), the gc_queue contains info to remove dispatchee1, and in_current_generation is pointing to a new generation counter with value 0 (see lines 499-).

Now a call to remove_watch(fd2) comes, finds *in_current_generation == 0 and frees dispatchee2 immediately, while the thread is still in dispatch().

If I understood the intention of the code correctly, then dispatchee2 shouldn't have been freed immediately (i.e. while the thread is running). Have I missed something?

review: Needs Information
Robert Carr (robertcarr) wrote :

I still wouldn't be unhappy to see the difficult to understand GC system replaced with a read/write mutex based implementation as discussed in the IRC log ;)

2295. By Chris Halse Rogers on 2015-02-04

Merge trunk

2296. By Chris Halse Rogers on 2015-02-04

Fix race in MultiplexingDispatchable::remove_watch()

We tried to optimise the case where there are no threads in dispatch() by
immediately deleting the relevant Dispatchable.

Sadly, this was racy in the presence of multiple calls to remove_watch():
after the first call to remove_watch() we can no longer determine whether
there are any threads in dispatch() using the Dispatchable we want to remove.

Add a test, and remove the optimisation to fix it.

2297. By Chris Halse Rogers on 2015-02-04

Better document the assumptions pull_from_gc_queue makes

Chris Halse Rogers (raof) wrote :

Thanks for the catch, Alexandros!

I've written a test which triggers that bug, and fixed it.

Bah trying to add a fastpath :)

Chris Halse Rogers (raof) wrote :

Hm, actually, that test is insufficient. Let me fix it.

review: Needs Fixing
2298. By Chris Halse Rogers on 2015-02-05

Fix multiple-removal threadsafety test, and associated code.

Chris Halse Rogers (raof) wrote :

Fixed the test, and also the code.

review: Abstain
Robert Carr (robertcarr) wrote :

SHA1SUMS STRIKE AGAIN

2299. By Chris Halse Rogers on 2015-02-06

Merge trunk

Alexandros Frantzis (afrantzis) wrote :

> I still wouldn't be unhappy to see the difficult to understand GC system replaced with a
> read/write mutex based implementation as discussed in the IRC log ;)

This sounds sensible. I haven't thought through the details, but it seems that a read/write lock would make the code *much* simpler. There is going to be a performance hit of course, but I am not convinced it is a problem (uncontended locks are cheap anyway). If we find that it is a bottleneck, we can then look into more complex solutions, such as the lockless-main-path/GC one proposed here.

That being said, the new code seems correct, although I think it's on the verge of being "so complicated that there are no obvious deficiencies".

review: Abstain
Chris Halse Rogers (raof) wrote :

Ok! Good news for those who think the atomics+GC system is too complex. I've done some benchmarks for worst-case overhead - these spin up 8 threads (one per logical CPU on my system) and concurrently dispatch a dispatchable that does nothing but increment an atomic counter until it reaches the limit.

Three different implementations - the current atomic one, RecursiveReadWriteMutex, and pthread_rwlock:

Funky atomics+GC:

 ~/Canonical/Mir/mir/add-multiplexing-dispatchable/build  perf stat --repeat=10 bin/benchmark_multiplexing_dispatchable 8 10000000
Dispatching 10000000 times took 9488484241ns
Dispatching 10000000 times took 9396650870ns
Dispatching 10000000 times took 9430338483ns
Dispatching 10000000 times took 9422382181ns
Dispatching 10000000 times took 9316097769ns
Dispatching 10000000 times took 9048800767ns
Dispatching 10000000 times took 9230631179ns
Dispatching 10000000 times took 9059141983ns
Dispatching 10000000 times took 9442825755ns
Dispatching 10000000 times took 9030773329ns

 Performance counter stats for 'bin/benchmark_multiplexing_dispatchable 8 10000000' (10 runs):

      56429.679021 task-clock (msec) # 6.075 CPUs utilized ( +- 0.79% )
         1,090,378 context-switches # 0.019 M/sec ( +- 0.94% )
               266 cpu-migrations # 0.005 K/sec ( +- 13.22% )
               176 page-faults # 0.003 K/sec ( +- 0.54% )
   166,455,416,913 cycles # 2.950 GHz ( +- 0.79% )
   <not supported> stalled-cycles-frontend
   <not supported> stalled-cycles-backend
    55,263,597,398 instructions # 0.33 insns per cycle ( +- 0.73% )
    13,009,159,774 branches # 230.538 M/sec ( +- 0.88% )
       122,711,777 branch-misses # 0.94% of all branches ( +- 0.53% )

       9.288753925 seconds time elapsed ( +- 0.62% )

RecursiveReadWriteMutex:

 ~/Canonical/Mir/mir/add-multiplexing-dispatchable/build  perf stat -B --repeat 10 bin/benchmark_multiplexing_dispatchable 8 10000000
Dispatching 10000000 times took 17776611842ns
Dispatching 10000000 times took 17162422952ns
Dispatching 10000000 times took 16978416490ns
Dispatching 10000000 times took 16816637546ns
Dispatching 10000000 times took 17111542565ns
Dispatching 10000000 times took 16939661008ns
Dispatching 10000000 times took 17518474808ns
Dispatching 10000000 times took 16914539739ns
Dispatching 10000000 times took 17174350069ns
Dispatching 10000000 times took 17391618505ns

 Performance counter stats for 'bin/benchmark_multiplexing_dispatchable 8 10000000' (10 runs):

     108793.953081 task-clock (msec) # 6.332 CPUs utilized ( +- 0.62% )
         4,336,580 context-switches # 0.040 M/sec ( +- 0.91% )
             1,617 cpu-migrations # 0.015 K/sec ( +- 22.08% )
               183 page-faults # 0.002 K/sec ...


review: Needs Fixing
2300. By Chris Halse Rogers on 2015-02-11

Add benchmark for md::MultiplexingDispatchable

2301. By Chris Halse Rogers on 2015-02-11

Use a pthread read-write lock in md::MultiDispatchable rather than atomics+GC.

It turns out that pthread_rwlock is just as fast and is significantly
more obviously correct.

The existing mir::RecursiveReadWriteMutex is half as fast and has a
lot of behaviours we don't need here; recursive locking and being
able to take both a read-lock and a write-lock in the same thread.

2302. By Chris Halse Rogers on 2015-02-11

Make the benchmark of md::MultiplexingDispatchable a bit more specific.

Make the dispatch count thread local, rather than process-global; this significantly
reduces the time spent bouncing CPU cachelines in dispatch(), and so makes the runtime
more directly due to md::MultiplexingDispatchable overhead.
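
The effect described here, a shared counter bouncing between CPUs versus a per-thread one, can be sketched as follows. This is an illustration of the general technique, not the benchmark from this branch; `count_shared` and `count_thread_local` are names invented for the example, and no timing figures are implied.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Shared counter: every increment from every thread hits the same cacheline.
uint64_t count_shared(int threads, uint64_t per_thread)
{
    std::atomic<uint64_t> total{0};
    std::vector<std::thread> pool;
    for (int i = 0; i < threads; ++i)
    {
        pool.emplace_back([&total, per_thread]
        {
            for (uint64_t n = 0; n < per_thread; ++n) ++total;
        });
    }
    for (auto& t : pool) t.join();
    return total.load();
}

// Per-thread counter: increments stay thread-local and the shared total is
// touched only once per thread, at the end.
uint64_t count_thread_local(int threads, uint64_t per_thread)
{
    std::atomic<uint64_t> total{0};
    std::vector<std::thread> pool;
    for (int i = 0; i < threads; ++i)
    {
        pool.emplace_back([&total, per_thread]
        {
            static thread_local uint64_t local = 0;   // fresh zero in each new thread
            for (uint64_t n = 0; n < per_thread; ++n) ++local;
            total += local;
        });
    }
    for (auto& t : pool) t.join();
    return total.load();
}
```

Both functions produce the same total; the difference is only in how often the threads contend for the shared variable, which is the overhead the commit removes from the measurement.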

2303. By Chris Halse Rogers on 2015-02-11

Add a test for, and fix, concurrent automatic removals in md::MultiplexingDispatchable

2304. By Chris Halse Rogers on 2015-02-11

Add copyright header to MultiplexingDispatchable benchmark

Chris Halse Rogers (raof) wrote :

There we go. Now with read-write lock.

review: Abstain
2305. By Chris Halse Rogers on 2015-02-11

Fix typo in comment

2306. By Chris Halse Rogers on 2015-02-11

Mark MultiplexingDispatchable as final.

It's not possible to usefully derive from it, so make it a compile
error to try.

Might also help compilers generate better code. Who knows!

2307. By Chris Halse Rogers on 2015-02-12

Oops. RAII correctly. Thanks, ThreadSanitizer!

2308. By Chris Halse Rogers on 2015-02-12

Check pipe() and write() returns in MultiplexingDispatchable benchmark

2309. By Chris Halse Rogers on 2015-02-12

Make ~ReadLock() and ~WriteLock() noexcept.

While pthread_rwlock_unlock can technically fail, it can only fail in cases where
the constructor would have thrown anyway, so we gain nothing by error-checking
in the destructor.
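
A rough sketch of what RAII guards over a pthread read-write lock might look like, with throwing constructors and noexcept destructors as this commit message describes. The class names are borrowed from the message, but the bodies below are written for illustration and are not taken from the branch.

```cpp
#include <pthread.h>
#include <cassert>
#include <cerrno>
#include <system_error>

// Holds a shared (read) lock for the lifetime of the object.
class ReadLock
{
public:
    explicit ReadLock(pthread_rwlock_t& rwlock) : lock(rwlock)
    {
        int const err = pthread_rwlock_rdlock(&lock);
        if (err != 0)
            throw std::system_error{err, std::system_category(), "Failed to take read lock"};
    }
    // Unlock can only fail where the constructor would already have thrown.
    ~ReadLock() noexcept { pthread_rwlock_unlock(&lock); }
    ReadLock(ReadLock const&) = delete;
    ReadLock& operator=(ReadLock const&) = delete;
private:
    pthread_rwlock_t& lock;
};

// Holds an exclusive (write) lock for the lifetime of the object.
class WriteLock
{
public:
    explicit WriteLock(pthread_rwlock_t& rwlock) : lock(rwlock)
    {
        int const err = pthread_rwlock_wrlock(&lock);
        if (err != 0)
            throw std::system_error{err, std::system_category(), "Failed to take write lock"};
    }
    ~WriteLock() noexcept { pthread_rwlock_unlock(&lock); }
    WriteLock(WriteLock const&) = delete;
    WriteLock& operator=(WriteLock const&) = delete;
private:
    pthread_rwlock_t& lock;
};
```

In this arrangement dispatch() would presumably take a ReadLock, so many threads can dispatch at once, while adding or removing a watch would take a WriteLock and wait for them to drain.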

2310. By Chris Halse Rogers on 2015-02-12

Merge trunk

Alexandros Frantzis (afrantzis) wrote :

Looks good.

review: Approve
Daniel van Vugt (vanvugt) wrote :

(1) This will limit our performance to only being able to handle one event per timeslice/user-kernel switch:
    459 + auto result = epoll_wait(epoll_fd, &event, 1, 0);

Much higher performance can be achieved in theory if you pass an array of N events, and loop through them afterwards. Then our theoretical performance upper limit becomes N events per timeslice instead of one. Rather important for the multiplexing case...

For an example see: man epoll

review: Needs Fixing
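
To make the difference under discussion concrete, here is a small sketch comparing a one-event epoll_wait with a batched one. It illustrates the reviewer's suggestion only; it is not code from the branch, and `add_ready_pipe` and `fetch_ready` are hypothetical helpers.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <cassert>

// Creates a pipe with one byte pending and registers its read end with `epfd`.
// The caller owns both ends returned in `fds`.
bool add_ready_pipe(int epfd, int (&fds)[2])
{
    if (pipe(fds) < 0) return false;
    char const byte = 0;
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fds[0];
    return write(fds[1], &byte, 1) == 1 &&
           epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) == 0;
}

// One syscall, at most `capacity` events; returns how many were fetched.
int fetch_ready(int epfd, epoll_event* events, int capacity)
{
    return epoll_wait(epfd, events, capacity, 0);
}
```

With three ready pipes, a capacity of 1 yields one event per call, whereas a capacity of 8 yields all three in a single call. The caller must then process the batch on the calling thread, which is the trade-off raised in the replies that follow.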
Daniel van Vugt (vanvugt) wrote :

And yes, epoll_wait is that dumb. Looking at the source, it's just implemented as a light wrapper around a syscall. So if we don't do the batching in our own code it won't happen. And we'll be forced to switch to/from the kernel once per event... :P

Chris Halse Rogers (raof) wrote :

Why do you think the epoll call is going to relinquish our timeslice? It's got a timeout of 0 - ie: return immediately without sleeping.

While it's true that we could achieve higher single-thread throughput by acquiring all available events with a single call, this would be either at the cost of multi-thread throughput or significantly more complex code. If we batch then we have a single thread sequentially processing each member of the batch - which somewhat defeats the purpose of making this thread safe, which I specifically use to provide the existing threaded behaviour in later branches.

While syscalls are relatively expensive, they certainly don't terminate your timeslice. Check out the benchmark_multiplexing_dispatchable - on my machine each event dispatch takes <500ns, or ~1,500 CPU cycles.

Chris Halse Rogers (raof) wrote :

Or, to put it another way - when I switched the benchmark to using a thread-local counter for the test dispatchable it reduced the runtime of the benchmark by 20%. The time spent bouncing an atomic int from CPU to CPU was a significant fraction of the total runtime.

Daniel van Vugt (vanvugt) wrote :

Yeah realistically we won't relinquish the time slice (assuming the kernel is sane). But bouncing into and out of kernel space will cost us. This is why the glibc documentation (and even asio's implementation) uses an array of events. And bouncing between multiple threads costs even more.

If you have multiple events ready and there's a chance you can handle most of them in one time slice (very likely else we need to fix our code), then the array approach will be dramatically more efficient.

You can decide separately if more than one thread is still required. But keep in mind that in a single time slice we can do billions of simple operations. If you intentionally divide up responsibility to a new thread each time, you limit yourself to only thousands of operations. That's potentially a million times slower for the simplest of ops.

It might sound like we're splitting hairs but this is the stuff that will help Mir to catch up to Wayland's performance. And we aim to overtake.

Daniel van Vugt (vanvugt) wrote :

Correction:

You can decide separately if more than one thread is still required. But keep in mind that in a single time slice we can do millions of simple operations without any context switching. If you intentionally divide up responsibility to a different thread each event, you might not use any more CPU time, but you will take significantly greater real time.

Threads are the problem we need to kill in rewriting our sockets code and catching up to Wayland.

Chris Halse Rogers (raof) wrote :

1) Bouncing in and out of kernel space costs us, but not all that much.
2) If there are multiple events on a single Dispatchable then passing in an array of events will still only pick up that one Dispatchable
3) If there are multiple Dispatchables ready then this is *exactly* the time that we need to process them on multiple threads (for our threaded input dispatch model).
4) If you're doing eventloop dispatch then you *still* have to dispatch a single event at a time, because that's what toolkits want.

Threads are absolutely not the primary problem with our RPC code, and batching epoll calls will provide no meaningful improvement to the performance of this code.

Daniel van Vugt (vanvugt) wrote :

I think we might be getting ahead of ourselves.

So long as the Mir server socket code doesn't use this, it's not a problem. It's only our server-side socket handling that will need a more efficient approach than epoll_wait with a single event.

Chris Halse Rogers (raof) wrote :

So, can I assume that's not needs-fixing, then?

Daniel van Vugt (vanvugt) wrote :

If you mean "we won't ever use this for the server side socket logic" then I'm happy to abstain.

review: Needs Information
Chris Halse Rogers (raof) wrote :

I mean, we don't currently use this for server side socket logic, and
if we do we can revisit the batching.

At some point I can also optimise the (actually pretty common case) of
adding a watch on a MultiplexingDispatchable.

Chris Halse Rogers (raof) wrote :

And once we get to the server side logic we can work out whether
threading client requests is a benefit, too.

Daniel van Vugt (vanvugt) :
review: Abstain

Preview Diff

1=== modified file 'benchmarks/CMakeLists.txt'
2--- benchmarks/CMakeLists.txt 2014-11-02 11:35:32 +0000
3+++ benchmarks/CMakeLists.txt 2015-02-12 05:55:48 +0000
4@@ -10,3 +10,11 @@
5 add_subdirectory(frame-uniformity)
6 add_dependencies(benchmarks frame_uniformity_test_client)
7 endif ()
8+
9+add_executable(benchmark_multiplexing_dispatchable
10+ benchmark_multiplexing_dispatchable.cpp
11+)
12+
13+target_link_libraries(benchmark_multiplexing_dispatchable
14+ mircommon
15+)
16
17=== added file 'benchmarks/benchmark_multiplexing_dispatchable.cpp'
18--- benchmarks/benchmark_multiplexing_dispatchable.cpp 1970-01-01 00:00:00 +0000
19+++ benchmarks/benchmark_multiplexing_dispatchable.cpp 2015-02-12 05:55:48 +0000
20@@ -0,0 +1,122 @@
21+/*
22+ * Copyright © 2015 Canonical Ltd.
23+ *
24+ * This program is free software: you can redistribute it and/or modify
25+ * it under the terms of the GNU General Public License version 3 as
26+ * published by the Free Software Foundation.
27+ *
28+ * This program is distributed in the hope that it will be useful,
29+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
30+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31+ * GNU General Public License for more details.
32+ *
33+ * You should have received a copy of the GNU General Public License
34+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
35+ *
36+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
37+ */
38+
39+#include "mir/dispatch/multiplexing_dispatchable.h"
40+#include "mir/dispatch/simple_dispatch_thread.h"
41+
42+#include <iostream>
43+#include <atomic>
44+#include <vector>
45+#include <memory>
46+#include <chrono>
47+#include <poll.h>
48+#include <unistd.h>
49+
50+namespace md = mir::dispatch;
51+
52+class TestDispatchable : public md::Dispatchable
53+{
54+public:
55+ TestDispatchable(uint64_t limit)
56+ : dispatch_limit{limit}
57+ {
58+ int pipefds[2];
59+ if (pipe(pipefds) < 0)
60+ {
61+ throw std::system_error{errno, std::system_category(), "Failed to create pipe"};
62+ }
63+
64+ read_fd = mir::Fd{pipefds[0]};
65+ write_fd = mir::Fd{pipefds[1]};
66+
67+ char dummy{0};
68+ if (::write(write_fd, &dummy, sizeof(dummy)) != sizeof(dummy))
69+ {
70+ throw std::system_error{errno, std::system_category(), "Failed to mark dispatchable"};
71+ }
72+ }
73+
74+ mir::Fd watch_fd() const override
75+ {
76+ return read_fd;
77+ }
78+ bool dispatch(md::FdEvents) override
79+ {
80+ ++dispatch_count;
81+ return (dispatch_count < dispatch_limit);
82+ }
83+ md::FdEvents relevant_events() const override
84+ {
85+ return md::FdEvent::readable;
86+ }
87+
88+private:
89+ static thread_local uint64_t dispatch_count;
90+ uint64_t const dispatch_limit;
91+ mir::Fd read_fd, write_fd;
92+};
93+
94+thread_local uint64_t TestDispatchable::dispatch_count = 0;
95+
96+bool fd_is_readable(int fd)
97+{
98+ struct pollfd poller {
99+ fd,
100+ POLLIN,
101+ 0
102+ };
103+ return poll(&poller, 1, 0);
104+}
105+
106+int main(int argc, char** argv)
107+{
108+ if (argc != 3)
109+ {
110+ std::cout<<"Usage: "<<argv[0]<<" <number of threads> <dispatch count>"<<std::endl;
111+ exit(1);
112+ }
113+
114+ int const thread_count = std::atoi(argv[1]);
115+ uint64_t const dispatch_count = std::atoll(argv[2]);
116+
117+ auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
118+ dispatcher->add_watch(std::make_shared<TestDispatchable>(dispatch_count / thread_count), md::DispatchReentrancy::reentrant);
119+
120+ auto start = std::chrono::steady_clock::now();
121+
122+ std::vector<std::thread> thread_loops;
123+ for (int i = 0; i < thread_count; ++i)
124+ {
125+ thread_loops.emplace_back([](md::Dispatchable& dispatch)
126+ {
127+ while(fd_is_readable(dispatch.watch_fd()))
128+ {
129+ dispatch.dispatch(md::FdEvent::readable);
130+ }
131+ }, std::ref(*dispatcher));
132+ }
133+
134+ for (auto& thread : thread_loops)
135+ {
136+ thread.join();
137+ }
138+
139+ auto duration = std::chrono::steady_clock::now() - start;
140+ std::cout<<"Dispatching "<<dispatch_count<<" times took "<<std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count()<<"ns"<<std::endl;
141+ exit(0);
142+}
143
144=== added file 'include/common/mir/dispatch/multiplexing_dispatchable.h'
145--- include/common/mir/dispatch/multiplexing_dispatchable.h 1970-01-01 00:00:00 +0000
146+++ include/common/mir/dispatch/multiplexing_dispatchable.h 2015-02-12 05:55:48 +0000
147@@ -0,0 +1,106 @@
148+/*
149+ * Copyright © 2015 Canonical Ltd.
150+ *
151+ * This program is free software: you can redistribute it and/or modify it
152+ * under the terms of the GNU Lesser General Public License version 3,
153+ * as published by the Free Software Foundation.
154+ *
155+ * This program is distributed in the hope that it will be useful,
156+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
157+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
158+ * GNU Lesser General Public License for more details.
159+ *
160+ * You should have received a copy of the GNU Lesser General Public License
161+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
162+ *
163+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
164+ */
165+
166+#ifndef MIR_DISPATCH_MULTIPLEXING_DISPATCHABLE_H_
167+#define MIR_DISPATCH_MULTIPLEXING_DISPATCHABLE_H_
168+
169+#include "mir/dispatch/dispatchable.h"
170+
171+#include <functional>
172+#include <initializer_list>
173+#include <list>
174+#include <mutex>
175+#include <tuple>
176+#include <atomic>
177+
178+#include <pthread.h>
179+
180+namespace mir
181+{
182+namespace dispatch
183+{
184+/**
185+ * \brief How concurrent dispatch should be handled
186+ */
187+enum class DispatchReentrancy
188+{
189+ sequential, /**< The dispatch function is guaranteed not to be called
190+ * while a thread is currently running it.
191+ */
192+ reentrant /**< The dispatch function may be called on multiple threads
193+ * simultaneously
194+ */
195+};
196+
197+/**
198+ * \brief An adaptor that combines multiple Dispatchables into a single Dispatchable
199+ * \note Instances are fully thread-safe.
200+ */
201+class MultiplexingDispatchable final : public Dispatchable
202+{
203+public:
204+ MultiplexingDispatchable();
205+ MultiplexingDispatchable(std::initializer_list<std::shared_ptr<Dispatchable>> dispatchees);
206+ virtual ~MultiplexingDispatchable() noexcept;
207+
208+ MultiplexingDispatchable& operator=(MultiplexingDispatchable const&) = delete;
209+ MultiplexingDispatchable(MultiplexingDispatchable const&) = delete;
210+
211+ Fd watch_fd() const override;
212+ bool dispatch(FdEvents events) override;
213+ FdEvents relevant_events() const override;
214+
215+ /**
216+ * \brief Add a dispatchable to the adaptor
217+ * \param [in] dispatchee Dispatchable to add. The Dispatchable's dispatch()
218+ * function will not be called reentrantly.
219+ */
220+ void add_watch(std::shared_ptr<Dispatchable> const& dispatchee);
221+ /**
222+ * \brief Add a dispatchable to the adaptor, specifying the reentrancy of dispatch()
223+ */
224+ void add_watch(std::shared_ptr<Dispatchable> const& dispatchee, DispatchReentrancy reentrancy);
225+
226+ /**
227+ * \brief Add a simple callback to the adaptor
228+ * \param [in] fd File descriptor to monitor for readability
229+ * \param [in] callback Callback to fire when \ref fd becomes readable.
230+ * This callback is not called reentrantly.
231+ */
232+ void add_watch(Fd const& fd, std::function<void()> const& callback);
233+
234+ /**
235+ * \brief Remove a watch from the dispatchable
236+ * \param [in] dispatchee Dispatchable to remove
237+ */
238+ void remove_watch(std::shared_ptr<Dispatchable> const& dispatchee);
239+
240+ /**
241+ * \brief Remove a watch by file-descriptor
242+ * \param [in] fd File descriptor of watch to remove.
243+ */
244+ void remove_watch(Fd const& fd);
245+private:
246+ pthread_rwlock_t lifetime_mutex;
247+ std::list<std::pair<std::shared_ptr<Dispatchable>, bool>> dispatchee_holder;
248+
249+ Fd epoll_fd;
250+};
251+}
252+}
253+#endif // MIR_DISPATCH_MULTIPLEXING_DISPATCHABLE_H_
254
255=== modified file 'src/common/dispatch/CMakeLists.txt'
256--- src/common/dispatch/CMakeLists.txt 2015-01-21 01:00:57 +0000
257+++ src/common/dispatch/CMakeLists.txt 2015-02-12 05:55:48 +0000
258@@ -17,6 +17,8 @@
259 list(
260 APPEND MIR_COMMON_SOURCES
261 ${CMAKE_CURRENT_SOURCE_DIR}/simple_dispatch_thread.cpp
262+ ${CMAKE_CURRENT_SOURCE_DIR}/multiplexing_dispatchable.cpp
263+ ${CMAKE_CURRENT_SOURCE_DIR}/utils.cpp
264 )
265
266 set(MIR_COMMON_SOURCES ${MIR_COMMON_SOURCES} PARENT_SCOPE)
267
268=== added file 'src/common/dispatch/multiplexing_dispatchable.cpp'
269--- src/common/dispatch/multiplexing_dispatchable.cpp 1970-01-01 00:00:00 +0000
270+++ src/common/dispatch/multiplexing_dispatchable.cpp 2015-02-12 05:55:48 +0000
271@@ -0,0 +1,300 @@
272+/*
273+ * Copyright © 2015 Canonical Ltd.
274+ *
275+ * This program is free software: you can redistribute it and/or modify it
276+ * under the terms of the GNU Lesser General Public License version 3,
277+ * as published by the Free Software Foundation.
278+ *
279+ * This program is distributed in the hope that it will be useful,
280+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
281+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
282+ * GNU Lesser General Public License for more details.
283+ *
284+ * You should have received a copy of the GNU Lesser General Public License
285+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
286+ *
287+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
288+ */
289+
290+#include "mir/dispatch/multiplexing_dispatchable.h"
291+#include "utils.h"
292+#include "mir/raii.h"
293+
294+#include <boost/throw_exception.hpp>
295+
296+#include <sys/epoll.h>
297+#include <poll.h>
298+#include <limits.h>
299+#include <unistd.h>
300+#include <string.h>
301+#include <system_error>
302+#include <algorithm>
303+
304+namespace md = mir::dispatch;
305+
306+namespace
307+{
308+class DispatchableAdaptor : public md::Dispatchable
309+{
310+public:
311+ DispatchableAdaptor(mir::Fd const& fd, std::function<void()> const& callback)
312+ : fd{fd},
313+ handler{callback}
314+ {
315+ }
316+
317+ mir::Fd watch_fd() const override
318+ {
319+ return fd;
320+ }
321+
322+ bool dispatch(md::FdEvents events) override
323+ {
324+ if (events & md::FdEvent::error)
325+ {
326+ return false;
327+ }
328+ handler();
329+ return true;
330+ }
331+
332+ md::FdEvents relevant_events() const override
333+ {
334+ return md::FdEvent::readable;
335+ }
336+private:
337+ mir::Fd const fd;
338+ std::function<void()> const handler;
339+};
340+
341+class ReadLock
342+{
343+public:
344+ ReadLock(pthread_rwlock_t& lock)
345+ : mutex{&lock}
346+ {
347+ auto err = pthread_rwlock_rdlock(mutex);
348+ if (err != 0)
349+ {
350+ BOOST_THROW_EXCEPTION((std::system_error{err,
351+ std::system_category(),
352+ "Failed to acquire read lock"}));
353+ }
354+ }
355+
356+ ~ReadLock() noexcept
357+ {
358+ pthread_rwlock_unlock(mutex);
359+ }
360+private:
361+ pthread_rwlock_t* mutex;
362+};
363+
364+class WriteLock
365+{
366+public:
367+ WriteLock(pthread_rwlock_t& lock)
368+ : mutex{&lock}
369+ {
370+ auto err = pthread_rwlock_wrlock(mutex);
371+ if (err != 0)
372+ {
373+ BOOST_THROW_EXCEPTION((std::system_error{err,
374+ std::system_category(),
375+ "Failed to acquire write lock"}));
376+ }
377+ }
378+
379+ ~WriteLock() noexcept
380+ {
381+ pthread_rwlock_unlock(mutex);
382+ }
383+private:
384+ pthread_rwlock_t* mutex;
385+};
386+}
387+
388+md::MultiplexingDispatchable::MultiplexingDispatchable()
389+ : epoll_fd{mir::Fd{::epoll_create1(EPOLL_CLOEXEC)}}
390+{
391+ if (epoll_fd == mir::Fd::invalid)
392+ {
393+ BOOST_THROW_EXCEPTION((std::system_error{errno,
394+ std::system_category(),
395+ "Failed to create epoll monitor"}));
396+ }
397+
398+ pthread_rwlockattr_t attr;
399+ int err;
400+ err = pthread_rwlockattr_init(&attr);
401+ if (err != 0)
402+ {
403+ BOOST_THROW_EXCEPTION((std::system_error{err,
404+ std::system_category(),
405+ "Failed to init pthread attrs"}));
406+ }
407+ // Set writer preference; otherwise remove_watch could block indefinitely
408+ err = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
409+ if (err != 0)
410+ {
411+ BOOST_THROW_EXCEPTION((std::system_error{err,
412+ std::system_category(),
413+ "Failed to set preferred rw-lock mode"}));
414+ }
415+ err = pthread_rwlock_init(&lifetime_mutex, &attr);
416+ if (err != 0)
417+ {
418+ BOOST_THROW_EXCEPTION((std::system_error{err,
419+ std::system_category(),
420+ "Failed to init rw-lock"}));
421+ }
422+
423+ pthread_rwlockattr_destroy(&attr);
424+}
425+
426+md::MultiplexingDispatchable::~MultiplexingDispatchable() noexcept
427+{
428+ pthread_rwlock_destroy(&lifetime_mutex);
429+}
430+
431+md::MultiplexingDispatchable::MultiplexingDispatchable(std::initializer_list<std::shared_ptr<Dispatchable>> dispatchees)
432+ : MultiplexingDispatchable()
433+{
434+ for (auto& target : dispatchees)
435+ {
436+ add_watch(target);
437+ }
438+}
439+
440+mir::Fd md::MultiplexingDispatchable::watch_fd() const
441+{
442+ return epoll_fd;
443+}
444+
445+bool md::MultiplexingDispatchable::dispatch(md::FdEvents events)
446+{
447+ if (events & md::FdEvent::error)
448+ {
449+ return false;
450+ }
451+
452+ std::shared_ptr<md::Dispatchable> source;
453+ bool rearm_source{false};
454+ epoll_event event;
455+
456+ {
457+ ReadLock lock{lifetime_mutex};
458+
459+ auto result = epoll_wait(epoll_fd, &event, 1, 0);
460+
461+ if (result < 0)
462+ {
463+ BOOST_THROW_EXCEPTION((std::system_error{errno,
464+ std::system_category(),
465+ "Failed to wait on fds"}));
466+ }
467+
468+ if (result == 0)
469+ {
470+ // Some other thread must have stolen the event we were woken for;
471+ // that's ok, just return.
472+ return true;
473+ }
474+
475+ auto event_source = reinterpret_cast<decltype(dispatchee_holder)::pointer>(event.data.ptr);
476+
477+ source = event_source->first;
478+ rearm_source = event_source->second;
479+ }
480+
481+ if (!source->dispatch(epoll_to_fd_event(event)))
482+ {
483+ remove_watch(source);
484+ }
485+ else if (rearm_source)
486+ {
487+ event.events = fd_event_to_epoll(source->relevant_events()) | EPOLLONESHOT;
488+ epoll_ctl(epoll_fd, EPOLL_CTL_MOD, source->watch_fd(), &event);
489+ }
490+
491+ return true;
492+}
493+
494+md::FdEvents md::MultiplexingDispatchable::relevant_events() const
495+{
496+ return md::FdEvent::readable;
497+}
498+
499+void md::MultiplexingDispatchable::add_watch(std::shared_ptr<md::Dispatchable> const& dispatchee)
500+{
501+ add_watch(dispatchee, DispatchReentrancy::sequential);
502+}
503+
504+void md::MultiplexingDispatchable::add_watch(std::shared_ptr<md::Dispatchable> const& dispatchee,
505+ DispatchReentrancy reentrancy)
506+{
507+ decltype(dispatchee_holder)::iterator new_holder;
508+ {
509+ WriteLock lock{lifetime_mutex};
510+ new_holder = dispatchee_holder.emplace(dispatchee_holder.begin(),
511+ dispatchee,
512+ reentrancy == DispatchReentrancy::sequential);
513+ }
514+
515+ epoll_event e;
516+ ::memset(&e, 0, sizeof(e));
517+
518+ e.events = fd_event_to_epoll(dispatchee->relevant_events());
519+ if (reentrancy == DispatchReentrancy::sequential)
520+ {
521+ e.events |= EPOLLONESHOT;
522+ }
523+ e.data.ptr = static_cast<void*>(&(*new_holder));
524+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dispatchee->watch_fd(), &e) < 0)
525+ {
526+ WriteLock lock{lifetime_mutex};
527+ dispatchee_holder.erase(new_holder);
528+ if (errno == EEXIST)
529+ {
530+ BOOST_THROW_EXCEPTION((std::logic_error{"Attempted to monitor the same fd twice"}));
531+ }
532+ BOOST_THROW_EXCEPTION((std::system_error{errno,
533+ std::system_category(),
534+ "Failed to monitor fd"}));
535+ }
536+}
537+
538+void md::MultiplexingDispatchable::add_watch(Fd const& fd, std::function<void()> const& callback)
539+{
540+ add_watch(std::make_shared<DispatchableAdaptor>(fd, callback));
541+}
542+
543+void md::MultiplexingDispatchable::remove_watch(std::shared_ptr<Dispatchable> const& dispatchee)
544+{
545+ remove_watch(dispatchee->watch_fd());
546+}
547+
548+void md::MultiplexingDispatchable::remove_watch(Fd const& fd)
549+{
550+ if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr))
551+ {
552+ if (errno == ENOENT)
553+ {
554+ // If reentrant dispatch returns false we can try to remove the same dispatchable twice.
555+ //
556+ // The reference-counting on mir::Fd should prevent the fd being closed, and
557+ // hence the handle being reused, before we've processed all such removals,
558+ // so this should not be racy with new Dispatchable creation + add_watch.
559+ return;
560+ }
561+ BOOST_THROW_EXCEPTION((std::system_error{errno,
562+ std::system_category(),
563+ "Failed to remove fd monitor"}));
564+ }
565+
566+ WriteLock lock{lifetime_mutex};
567+ dispatchee_holder.remove_if([&fd](std::pair<std::shared_ptr<Dispatchable>,bool> const& candidate)
568+ {
569+ return candidate.first->watch_fd() == fd;
570+ });
571+}
572
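Note on the sequential dispatch path above: add_watch() registers sequential dispatchees with EPOLLONESHOT, and dispatch() re-arms them with EPOLL_CTL_MOD once the child's dispatch() has returned. The following is a minimal standalone sketch of that kernel behaviour only, assuming Linux and using a plain pipe in place of a Dispatchable. The names OneShotResult, poll_once and run_oneshot_demo are hypothetical and are not part of this branch.

```cpp
#include <sys/epoll.h>
#include <unistd.h>

// Results of three non-blocking polls; -1 means the step was never reached.
struct OneShotResult
{
    int first_poll;     // right after data is written to the pipe
    int disarmed_poll;  // after the one-shot event has already been consumed
    int rearmed_poll;   // after EPOLL_CTL_MOD has re-armed the fd
};

// Poll with a zero timeout, as MultiplexingDispatchable::dispatch() does.
static int poll_once(int epoll_fd)
{
    epoll_event ev{};
    return epoll_wait(epoll_fd, &ev, 1, 0);
}

OneShotResult run_oneshot_demo()
{
    OneShotResult result{-1, -1, -1};

    int pipe_fds[2];
    if (pipe(pipe_fds) != 0)
    {
        return result;
    }

    int const epoll_fd = epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd >= 0)
    {
        epoll_event ev{};
        ev.events = EPOLLIN | EPOLLONESHOT;
        ev.data.fd = pipe_fds[0];

        char const byte{0};
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fds[0], &ev) == 0 &&
            write(pipe_fds[1], &byte, 1) == 1)
        {
            // The first poll reports the readable pipe and disarms the watch.
            result.first_poll = poll_once(epoll_fd);

            // The byte is still unread, but the watch is disarmed, so a second
            // thread polling here would not be handed the same source.
            result.disarmed_poll = poll_once(epoll_fd);

            // Re-arm, as dispatch() does when rearm_source is true.
            ev.events = EPOLLIN | EPOLLONESHOT;
            if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, pipe_fds[0], &ev) == 0)
            {
                result.rearmed_poll = poll_once(epoll_fd);
            }
        }
        close(epoll_fd);
    }

    close(pipe_fds[0]);
    close(pipe_fds[1]);
    return result;
}
```

Calling run_oneshot_demo() on a Linux system should show the event reported once, suppressed while the watch is disarmed, and reported again after re-arming. Reentrant dispatchees skip EPOLLONESHOT, so several threads can be handed the same source concurrently.
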
573=== modified file 'src/common/dispatch/simple_dispatch_thread.cpp'
574--- src/common/dispatch/simple_dispatch_thread.cpp 2015-01-21 01:14:19 +0000
575+++ src/common/dispatch/simple_dispatch_thread.cpp 2015-02-12 05:55:48 +0000
576@@ -18,6 +18,7 @@
577
578 #include "mir/dispatch/simple_dispatch_thread.h"
579 #include "mir/dispatch/dispatchable.h"
580+#include "utils.h"
581
582 #include <sys/epoll.h>
583 #include <unistd.h>
584@@ -29,49 +30,6 @@
585
586 namespace
587 {
588-md::FdEvents epoll_to_fd_event(epoll_event const& event)
589-{
590- md::FdEvents val{0};
591- if (event.events & EPOLLIN)
592- {
593- val |= md::FdEvent::readable;
594- }
595- if (event.events & EPOLLOUT)
596- {
597- val = md::FdEvent::writable;
598- }
599- if (event.events & (EPOLLHUP | EPOLLRDHUP))
600- {
601- val |= md::FdEvent::remote_closed;
602- }
603- if (event.events & EPOLLERR)
604- {
605- val = md::FdEvent::error;
606- }
607- return val;
608-}
609-
610-int fd_event_to_epoll(md::FdEvents const& event)
611-{
612- int epoll_value{0};
613- if (event & md::FdEvent::readable)
614- {
615- epoll_value |= EPOLLIN;
616- }
617- if (event & md::FdEvent::writable)
618- {
619- epoll_value |= EPOLLOUT;
620- }
621- if (event & md::FdEvent::remote_closed)
622- {
623- epoll_value |= EPOLLRDHUP | EPOLLHUP;
624- }
625- if (event & md::FdEvent::error)
626- {
627- epoll_value |= EPOLLERR;
628- }
629- return epoll_value;
630-}
631
632 void wait_for_events_forever(std::shared_ptr<md::Dispatchable> const& dispatchee, mir::Fd shutdown_fd)
633 {
634@@ -97,7 +55,7 @@
635
636 // Ask the dispatchee what it events it's interested in...
637 event.data.u32 = fd_names::dispatchee_fd;
638- event.events = fd_event_to_epoll(dispatchee->relevant_events());
639+ event.events = md::fd_event_to_epoll(dispatchee->relevant_events());
640 epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dispatchee->watch_fd(), &event);
641
642 for (;;)
643@@ -105,7 +63,7 @@
644 epoll_wait(epoll_fd, &event, 1, -1);
645 if (event.data.u32 == fd_names::dispatchee_fd)
646 {
647- if (!dispatchee->dispatch(epoll_to_fd_event(event)))
648+ if (!dispatchee->dispatch(md::epoll_to_fd_event(event)))
649 {
650 // No need to keep looping, the Dispatchable's not going to produce any more events.
651 return;
652
653=== added file 'src/common/dispatch/utils.cpp'
654--- src/common/dispatch/utils.cpp 1970-01-01 00:00:00 +0000
655+++ src/common/dispatch/utils.cpp 2015-02-12 05:55:48 +0000
656@@ -0,0 +1,65 @@
657+/*
658+ * Copyright © 2015 Canonical Ltd.
659+ *
660+ * This program is free software: you can redistribute it and/or modify it
661+ * under the terms of the GNU Lesser General Public License version 3,
662+ * as published by the Free Software Foundation.
663+ *
664+ * This program is distributed in the hope that it will be useful,
665+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
666+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
667+ * GNU Lesser General Public License for more details.
668+ *
669+ * You should have received a copy of the GNU Lesser General Public License
670+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
671+ *
672+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
673+ */
674+
675+#include "utils.h"
676+
677+namespace md = mir::dispatch;
678+
679+md::FdEvents md::epoll_to_fd_event(epoll_event const& event)
680+{
681+ FdEvents val{0};
682+ if (event.events & EPOLLIN)
683+ {
684+ val |= FdEvent::readable;
685+ }
686+ if (event.events & EPOLLOUT)
687+ {
688+ val |= FdEvent::writable;
689+ }
690+ if (event.events & (EPOLLHUP | EPOLLRDHUP))
691+ {
692+ val |= FdEvent::remote_closed;
693+ }
694+ if (event.events & EPOLLERR)
695+ {
696+ val |= FdEvent::error;
697+ }
698+ return val;
699+}
700+
701+int md::fd_event_to_epoll(FdEvents const& event)
702+{
703+ int epoll_value{0};
704+ if (event & FdEvent::readable)
705+ {
706+ epoll_value |= EPOLLIN;
707+ }
708+ if (event & FdEvent::writable)
709+ {
710+ epoll_value |= EPOLLOUT;
711+ }
712+ if (event & FdEvent::remote_closed)
713+ {
714+ epoll_value |= EPOLLRDHUP | EPOLLHUP;
715+ }
716+ if (event & FdEvent::error)
717+ {
718+ epoll_value |= EPOLLERR;
719+ }
720+ return epoll_value;
721+}
722
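Note on the conversion helpers above: the version moved into utils.cpp accumulates every flag with |=, whereas the lines removed from simple_dispatch_thread.cpp assigned with = for the writable and error cases, which discards bits set earlier. The sketch below contrasts the two shapes. It is an illustration under assumptions: the Events type and its bit values are hypothetical stand-ins, since the real FdEvents definition lives in dispatchable.h and is not shown in this diff.

```cpp
#include <sys/epoll.h>

#include <cstdint>

// Hypothetical stand-in for mir::dispatch::FdEvents (real values not shown here).
using Events = std::uint32_t;
constexpr Events readable      = 1u << 0;
constexpr Events writable      = 1u << 1;
constexpr Events remote_closed = 1u << 2;
constexpr Events error         = 1u << 3;

// Same shape as the new utils.cpp helper: every matching flag is OR-ed in.
Events to_events_accumulating(std::uint32_t epoll_bits)
{
    Events val{0};
    if (epoll_bits & EPOLLIN)                  val |= readable;
    if (epoll_bits & EPOLLOUT)                 val |= writable;
    if (epoll_bits & (EPOLLHUP | EPOLLRDHUP))  val |= remote_closed;
    if (epoll_bits & EPOLLERR)                 val |= error;
    return val;
}

// Same shape as the removed helper: writable and error overwrite earlier bits.
Events to_events_overwriting(std::uint32_t epoll_bits)
{
    Events val{0};
    if (epoll_bits & EPOLLIN)                  val |= readable;
    if (epoll_bits & EPOLLOUT)                 val = writable;
    if (epoll_bits & (EPOLLHUP | EPOLLRDHUP))  val |= remote_closed;
    if (epoll_bits & EPOLLERR)                 val = error;
    return val;
}
```

With EPOLLIN | EPOLLOUT as input, the accumulating form keeps both readable and writable, while the overwriting form loses readable. That is the combination checked by epoll_to_fd_event_works_for_combinations in test_dispatch_utils.cpp.
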
723=== added file 'src/common/dispatch/utils.h'
724--- src/common/dispatch/utils.h 1970-01-01 00:00:00 +0000
725+++ src/common/dispatch/utils.h 2015-02-12 05:55:48 +0000
726@@ -0,0 +1,38 @@
727+/*
728+ * Copyright © 2015 Canonical Ltd.
729+ *
730+ * This program is free software: you can redistribute it and/or modify it
731+ * under the terms of the GNU Lesser General Public License version 3,
732+ * as published by the Free Software Foundation.
733+ *
734+ * This program is distributed in the hope that it will be useful,
735+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
736+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
737+ * GNU Lesser General Public License for more details.
738+ *
739+ * You should have received a copy of the GNU Lesser General Public License
740+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
741+ *
742+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
743+ */
744+
745+#ifndef MIR_DISPATCH_UTILS_H_
746+#define MIR_DISPATCH_UTILS_H_
747+
748+#include "mir/dispatch/dispatchable.h"
749+#include <sys/epoll.h>
750+
751+namespace mir
752+{
753+namespace dispatch
754+{
755+
756+FdEvents epoll_to_fd_event(epoll_event const& event);
757+
758+int fd_event_to_epoll(FdEvents const& event);
759+
760+}
761+}
762+
763+
764+#endif // MIR_DISPATCH_UTILS_H_
765
766=== modified file 'src/common/input/android/android_input_receiver_thread.cpp'
767--- src/common/input/android/android_input_receiver_thread.cpp 2015-02-04 08:45:38 +0000
768+++ src/common/input/android/android_input_receiver_thread.cpp 2015-02-12 05:55:48 +0000
769@@ -26,7 +26,7 @@
770 namespace mircva = mir::input::receiver::android;
771
772 mircva::InputReceiverThread::InputReceiverThread(std::shared_ptr<mircva::InputReceiver> const& receiver,
773- std::function<void(MirEvent*)> const& event_handling_callback)
774+ std::function<void(MirEvent*)> const& event_handling_callback)
775 : receiver(receiver),
776 handler(event_handling_callback),
777 running(false)
778
779=== modified file 'src/common/symbols.map'
780--- src/common/symbols.map 2015-02-04 16:26:41 +0000
781+++ src/common/symbols.map 2015-02-12 05:55:48 +0000
782@@ -221,6 +221,17 @@
783 mir::dispatch::SimpleDispatchThread::?SimpleDispatchThread*;
784 typeinfo?for?mir::dispatch::SimpleDispatchThread;
785 vtable?for?mir::dispatch::SimpleDispatchThread;
786+
787+ mir::dispatch::MultiplexingDispatchable::MultiplexingDispatchable*;
788+ mir::dispatch::MultiplexingDispatchable::?MultiplexingDispatchable*;
789+ mir::dispatch::MultiplexingDispatchable::watch_fd*;
790+ mir::dispatch::MultiplexingDispatchable::dispatch*;
791+ mir::dispatch::MultiplexingDispatchable::relevant_events*;
792+ mir::dispatch::MultiplexingDispatchable::add_watch*;
793+
794+ typeinfo?for?mir::dispatch::MultiplexingDispatchable;
795+ vtable?for?mir::dispatch::MultiplexingDispatchable;
796+
797 mir::events::*
798 };
799 } MIR_COMMON_3.1;
800
801=== modified file 'tests/unit-tests/CMakeLists.txt'
802--- tests/unit-tests/CMakeLists.txt 2015-02-05 14:25:35 +0000
803+++ tests/unit-tests/CMakeLists.txt 2015-02-12 05:55:48 +0000
804@@ -88,6 +88,7 @@
805 mirclientrpc
806 mirclientlttngstatic
807 demo-shell
808+ mircommon
809
810 mir-test
811 mir-test-framework
812
813=== modified file 'tests/unit-tests/dispatch/CMakeLists.txt'
814--- tests/unit-tests/dispatch/CMakeLists.txt 2015-01-21 00:55:43 +0000
815+++ tests/unit-tests/dispatch/CMakeLists.txt 2015-02-12 05:55:48 +0000
816@@ -1,5 +1,7 @@
817 list(APPEND UNIT_TEST_SOURCES
818 ${CMAKE_CURRENT_SOURCE_DIR}/test_simple_dispatch_thread.cpp
819+ ${CMAKE_CURRENT_SOURCE_DIR}/test_multiplexing_dispatchable.cpp
820+ ${CMAKE_CURRENT_SOURCE_DIR}/test_dispatch_utils.cpp
821 )
822
823 set(UNIT_TEST_SOURCES ${UNIT_TEST_SOURCES} PARENT_SCOPE)
824
825=== added file 'tests/unit-tests/dispatch/test_dispatch_utils.cpp'
826--- tests/unit-tests/dispatch/test_dispatch_utils.cpp 1970-01-01 00:00:00 +0000
827+++ tests/unit-tests/dispatch/test_dispatch_utils.cpp 2015-02-12 05:55:48 +0000
828@@ -0,0 +1,70 @@
829+/*
830+ * Copyright © 2015 Canonical Ltd.
831+ *
832+ * This program is free software: you can redistribute it and/or modify
833+ * it under the terms of the GNU General Public License version 3 as
834+ * published by the Free Software Foundation.
835+ *
836+ * This program is distributed in the hope that it will be useful,
837+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
838+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
839+ * GNU General Public License for more details.
840+ *
841+ * You should have received a copy of the GNU General Public License
842+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
843+ *
844+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
845+ */
846+
847+#include "src/common/dispatch/utils.h"
848+
849+#include <gtest/gtest.h>
850+#include <gmock/gmock.h>
851+
852+namespace md = mir::dispatch;
853+
854+TEST(DispatchUtilsTest, conversion_functions_roundtrip)
855+{
856+ using namespace testing;
857+
858+ epoll_event dummy;
859+ for (int event : {EPOLLIN, EPOLLOUT, EPOLLERR})
860+ {
861+ dummy.events = event;
862+ EXPECT_THAT(md::fd_event_to_epoll(md::epoll_to_fd_event(dummy)), Eq(event));
863+ }
864+
865+ // HUP is special; FdEvent doesn't differentiate between remote hangup and local hangup
866+ dummy.events = EPOLLHUP;
867+ EXPECT_THAT(md::fd_event_to_epoll(md::epoll_to_fd_event(dummy)), Eq(EPOLLHUP | EPOLLRDHUP));
868+ dummy.events = EPOLLRDHUP;
869+ EXPECT_THAT(md::fd_event_to_epoll(md::epoll_to_fd_event(dummy)), Eq(EPOLLHUP | EPOLLRDHUP));
870+}
871+
872+TEST(DispatchUtilsTest, epoll_to_fd_event_works_for_combinations)
873+{
874+ using namespace testing;
875+
876+ epoll_event dummy;
877+
878+ dummy.events = EPOLLIN | EPOLLOUT;
879+ EXPECT_THAT(md::epoll_to_fd_event(dummy), Eq(md::FdEvent::readable | md::FdEvent::writable));
880+
881+ dummy.events = EPOLLERR | EPOLLOUT;
882+ EXPECT_THAT(md::epoll_to_fd_event(dummy), Eq(md::FdEvent::error | md::FdEvent::writable));
883+
884+ dummy.events = EPOLLIN | EPOLLRDHUP;
885+ EXPECT_THAT(md::epoll_to_fd_event(dummy), Eq(md::FdEvent::readable | md::FdEvent::remote_closed));
886+}
887+
888+TEST(DispatchUtilsTest, fd_event_to_epoll_works_for_combinations)
889+{
890+ using namespace testing;
891+
892+ EXPECT_THAT(md::fd_event_to_epoll(md::FdEvent::readable | md::FdEvent::writable),
893+ Eq(EPOLLIN | EPOLLOUT));
894+ EXPECT_THAT(md::fd_event_to_epoll(md::FdEvent::error | md::FdEvent::writable),
895+ Eq(EPOLLERR | EPOLLOUT));
896+ EXPECT_THAT(md::fd_event_to_epoll(md::FdEvent::readable | md::FdEvent::remote_closed),
897+ Eq(EPOLLIN | EPOLLHUP | EPOLLRDHUP));
898+}
899
900=== added file 'tests/unit-tests/dispatch/test_multiplexing_dispatchable.cpp'
901--- tests/unit-tests/dispatch/test_multiplexing_dispatchable.cpp 1970-01-01 00:00:00 +0000
902+++ tests/unit-tests/dispatch/test_multiplexing_dispatchable.cpp 2015-02-12 05:55:48 +0000
903@@ -0,0 +1,487 @@
904+/*
905+ * Copyright © 2015 Canonical Ltd.
906+ *
907+ * This program is free software: you can redistribute it and/or modify
908+ * it under the terms of the GNU General Public License version 3 as
909+ * published by the Free Software Foundation.
910+ *
911+ * This program is distributed in the hope that it will be useful,
912+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
913+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
914+ * GNU General Public License for more details.
915+ *
916+ * You should have received a copy of the GNU General Public License
917+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
918+ *
919+ * Authored by: Christopher James Halse Rogers <christopher.halse.rogers@canonical.com>
920+ */
921+
922+#include "mir/dispatch/multiplexing_dispatchable.h"
923+#include "mir/dispatch/simple_dispatch_thread.h"
924+#include "mir/fd.h"
925+#include "mir_test/pipe.h"
926+#include "mir_test/signal.h"
927+#include "mir_test/fd_utils.h"
928+#include "mir_test/test_dispatchable.h"
929+#include "mir_test/auto_unblock_thread.h"
930+
931+#include <fcntl.h>
932+
933+#include <atomic>
934+#include <thread>
935+
936+#include <gtest/gtest.h>
937+#include <gmock/gmock.h>
938+
939+namespace md = mir::dispatch;
940+namespace mt = mir::test;
941+
942+TEST(MultiplexingDispatchableTest, dispatches_dispatchee_when_appropriate)
943+{
944+ bool dispatched{false};
945+ auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
946+ md::MultiplexingDispatchable dispatcher{dispatchee};
947+
948+ dispatchee->trigger();
949+
950+ ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
951+ dispatcher.dispatch(md::FdEvent::readable);
952+
953+ EXPECT_TRUE(dispatched);
954+}
955+
956+TEST(MultiplexingDispatchableTest, calls_correct_dispatchee_when_fd_becomes_readable)
957+{
958+ bool a_dispatched{false};
959+ auto dispatchee_a = std::make_shared<mt::TestDispatchable>([&a_dispatched]() { a_dispatched = true; });
960+
961+ bool b_dispatched{false};
962+ auto dispatchee_b = std::make_shared<mt::TestDispatchable>([&b_dispatched]() { b_dispatched = true; });
963+
964+ md::MultiplexingDispatchable dispatcher{dispatchee_a, dispatchee_b};
965+
966+ dispatchee_a->trigger();
967+
968+ ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
969+ dispatcher.dispatch(md::FdEvent::readable);
970+
971+ EXPECT_TRUE(a_dispatched);
972+ EXPECT_FALSE(b_dispatched);
973+
974+ ASSERT_FALSE(mt::fd_is_readable(dispatcher.watch_fd()));
975+ a_dispatched = false;
976+
977+ dispatchee_b->trigger();
978+
979+ ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
980+ dispatcher.dispatch(md::FdEvent::readable);
981+
982+ EXPECT_FALSE(a_dispatched);
983+ EXPECT_TRUE(b_dispatched);
984+}
985+
986+TEST(MultiplexingDispatchableTest, keeps_dispatching_until_fd_is_unreadable)
987+{
988+ bool dispatched{false};
989+ auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
990+ md::MultiplexingDispatchable dispatcher{dispatchee};
991+
992+ int const trigger_count{10};
993+
994+ for (int i = 0; i < trigger_count; ++i)
995+ {
996+ dispatchee->trigger();
997+ }
998+
999+ for (int i = 0; i < trigger_count; ++i)
1000+ {
1001+ ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
1002+ dispatcher.dispatch(md::FdEvent::readable);
1003+
1004+ EXPECT_TRUE(dispatched);
1005+ dispatched = false;
1006+ }
1007+
1008+ EXPECT_FALSE(mt::fd_is_readable(dispatcher.watch_fd()));
1009+}
1010+
1011+TEST(MultiplexingDispatchableTest, dispatching_without_pending_event_is_harmless)
1012+{
1013+ bool dispatched{false};
1014+ auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
1015+ md::MultiplexingDispatchable dispatcher{dispatchee};
1016+
1017+ dispatcher.dispatch(md::FdEvent::readable);
1018+
1019+ EXPECT_FALSE(dispatched);
1020+}
1021+
1022+TEST(MultiplexingDispatchableTest, keeps_dispatchees_alive)
1023+{
1024+ bool dispatched{false};
1025+ auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
1026+ dispatchee->trigger();
1027+
1028+ md::MultiplexingDispatchable dispatcher;
1029+ dispatcher.add_watch(dispatchee);
1030+ dispatchee.reset();
1031+
1032+ ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
1033+ dispatcher.dispatch(md::FdEvent::readable);
1034+
1035+ EXPECT_TRUE(dispatched);
1036+}
1037+
1038+TEST(MultiplexingDispatchableTest, removed_dispatchables_are_no_longer_dispatched)
1039+{
1040+ using namespace testing;
1041+
1042+ mt::Pipe pipe;
1043+
1044+ bool dispatched{false};
1045+ auto dispatchable = std::make_shared<mt::TestDispatchable>([&dispatched]() { dispatched = true; });
1046+
1047+ md::MultiplexingDispatchable dispatcher;
1048+ dispatcher.add_watch(dispatchable);
1049+ dispatcher.remove_watch(dispatchable);
1050+
1051+ while (mt::fd_is_readable(dispatcher.watch_fd()))
1052+ {
1053+ dispatcher.dispatch(md::FdEvent::readable);
1054+ }
1055+
1056+ dispatchable->trigger();
1057+
1058+ EXPECT_FALSE(mt::fd_is_readable(dispatcher.watch_fd()));
1059+ dispatcher.dispatch(md::FdEvent::readable);
1060+
1061+ EXPECT_FALSE(dispatched);
1062+}
1063+
1064+TEST(MultiplexingDispatchableTest, adding_same_fd_twice_is_an_error)
1065+{
1066+ using namespace testing;
1067+
1068+ auto dispatchable = std::make_shared<mt::TestDispatchable>([](){});
1069+
1070+ md::MultiplexingDispatchable dispatcher;
1071+ dispatcher.add_watch(dispatchable);
1072+
1073+ EXPECT_THROW(dispatcher.add_watch(dispatchable),
1074+ std::logic_error);
1075+}
1076+
1077+TEST(MultiplexingDispatchableTest, dispatcher_does_not_hold_reference_after_failing_to_add_dispatchee)
1078+{
1079+ using namespace testing;
1080+
1081+ auto dispatchable = std::make_shared<mt::TestDispatchable>([](){});
1082+
1083+ md::MultiplexingDispatchable dispatcher;
1084+ dispatcher.add_watch(dispatchable);
1085+
1086+ auto const dispatchable_refcount = dispatchable.use_count();
1087+
1088+ // This should not increase refcount
1089+ EXPECT_THROW(dispatcher.add_watch(dispatchable),
1090+ std::logic_error);
1091+ EXPECT_THAT(dispatchable.use_count(), Eq(dispatchable_refcount));
1092+}
1093+
1094+TEST(MultiplexingDispatchableTest, individual_dispatchee_is_not_concurrent)
1095+{
1096+ using namespace testing;
1097+
1098+ auto second_dispatch = std::make_shared<mt::Signal>();
1099+ auto dispatchee = std::make_shared<mt::TestDispatchable>([second_dispatch]()
1100+ {
1101+ static std::atomic<int> canary{0};
1102+ static std::atomic<int> total_count{0};
1103+
1104+ ++canary;
1105+ EXPECT_THAT(canary, Eq(1));
1106+ if (++total_count == 2)
1107+ {
1108+ second_dispatch->raise();
1109+ }
1110+ else
1111+ {
1112+ std::this_thread::sleep_for(std::chrono::seconds{1});
1113+ }
1114+ --canary;
1115+ });
1116+
1117+ dispatchee->trigger();
1118+ dispatchee->trigger();
1119+
1120+ auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
1121+ dispatcher->add_watch(dispatchee);
1122+
1123+ md::SimpleDispatchThread first_loop{dispatcher};
1124+ md::SimpleDispatchThread second_loop{dispatcher};
1125+
1126+ EXPECT_TRUE(second_dispatch->wait_for(std::chrono::seconds{5}));
1127+}
1128+
1129+TEST(MultiplexingDispatchableTest, reentrant_dispatchee_is_dispatched_concurrently)
1130+{
1131+ using namespace testing;
1132+
1133+ std::atomic<int> count{0};
1134+
1135+ auto dispatchee = std::make_shared<mt::TestDispatchable>([&count]()
1136+ {
1137+ ++count;
1138+ std::this_thread::sleep_for(std::chrono::seconds{1});
1139+ EXPECT_THAT(count, Gt(1));
1140+ });
1141+
1142+ dispatchee->trigger();
1143+ dispatchee->trigger();
1144+
1145+ md::MultiplexingDispatchable dispatcher;
1146+ dispatcher.add_watch(dispatchee, md::DispatchReentrancy::reentrant);
1147+
1148+ std::thread first{[&dispatcher]() { dispatcher.dispatch(md::FdEvent::readable); }};
1149+ std::thread second{[&dispatcher]() { dispatcher.dispatch(md::FdEvent::readable); }};
1150+
1151+ first.join();
1152+ second.join();
1153+}
1154+
1155+TEST(MultiplexingDispatchableTest, raw_callback_is_dispatched)
1156+{
1157+ using namespace testing;
1158+
1159+ bool dispatched{false};
1160+ auto dispatchee = [&dispatched]() { dispatched = true; };
1161+ mt::Pipe fd_source;
1162+
1163+ md::MultiplexingDispatchable dispatcher;
1164+ dispatcher.add_watch(fd_source.read_fd(), dispatchee);
1165+
1166+ char buffer{0};
1167+ ASSERT_THAT(::write(fd_source.write_fd(), &buffer, sizeof(buffer)), Eq(sizeof(buffer)));
1168+
1169+ EXPECT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
1170+ dispatcher.dispatch(md::FdEvent::readable);
1171+
1172+ EXPECT_TRUE(dispatched);
1173+}
1174+
1175+TEST(MultiplexingDispatchableTest, raw_callback_can_be_removed)
1176+{
1177+ using namespace testing;
1178+
1179+ bool dispatched{false};
1180+ auto dispatchee = [&dispatched]() { dispatched = true; };
1181+ mt::Pipe fd_source;
1182+
1183+ md::MultiplexingDispatchable dispatcher;
1184+ dispatcher.add_watch(fd_source.read_fd(), dispatchee);
1185+ dispatcher.remove_watch(fd_source.read_fd());
1186+
1187+ while (mt::fd_is_readable(dispatcher.watch_fd()))
1188+ {
1189+ dispatcher.dispatch(md::FdEvent::readable);
1190+ }
1191+
1192+ char buffer{0};
1193+ ASSERT_THAT(::write(fd_source.write_fd(), &buffer, sizeof(buffer)), Eq(sizeof(buffer)));
1194+
1195+ EXPECT_FALSE(mt::fd_is_readable(dispatcher.watch_fd()));
1196+ dispatcher.dispatch(md::FdEvent::readable);
1197+
1198+ EXPECT_FALSE(dispatched);
1199+}
1200+
1201+TEST(MultiplexingDispatchableTest, removal_is_threadsafe)
1202+{
1203+ using namespace testing;
1204+
1205+ auto canary_killed = std::make_shared<mt::Signal>();
1206+ auto canary = std::shared_ptr<int>(new int, [canary_killed](int* victim) { delete victim; canary_killed->raise(); });
1207+ auto in_dispatch = std::make_shared<mt::Signal>();
1208+
1209+ auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
1210+
1211+ auto dispatchee = std::make_shared<mt::TestDispatchable>([canary, in_dispatch]()
1212+ {
1213+ in_dispatch->raise();
1214+ std::this_thread::sleep_for(std::chrono::seconds{1});
1215+ EXPECT_THAT(canary.use_count(), Gt(0));
1216+ });
1217+ dispatcher->add_watch(dispatchee);
1218+
1219+ dispatchee->trigger();
1220+
1221+ md::SimpleDispatchThread eventloop{dispatcher};
1222+
1223+ EXPECT_TRUE(in_dispatch->wait_for(std::chrono::seconds{1}));
1224+
1225+ dispatcher->remove_watch(dispatchee);
1226+ dispatchee.reset();
1227+ canary.reset();
1228+
1229+ EXPECT_TRUE(canary_killed->wait_for(std::chrono::seconds{2}));
1230+}
1231+
1232+TEST(MultiplexingDispatchableTest, destruction_is_threadsafe)
1233+{
1234+ using namespace testing;
1235+
1236+ auto canary_killed = std::make_shared<mt::Signal>();
1237+ auto canary = std::shared_ptr<int>(new int, [canary_killed](int* victim) { delete victim; canary_killed->raise(); });
1238+ auto in_dispatch = std::make_shared<mt::Signal>();
1239+
1240+ auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
1241+
1242+ auto dispatchee = std::make_shared<mt::TestDispatchable>([canary, in_dispatch]()
1243+ {
1244+ in_dispatch->raise();
1245+ std::this_thread::sleep_for(std::chrono::seconds{1});
1246+ EXPECT_THAT(canary.use_count(), Gt(0));
1247+ });
1248+ dispatcher->add_watch(dispatchee);
1249+
1250+ dispatchee->trigger();
1251+
1252+ mt::AutoJoinThread dispatch_thread{[dispatcher]() { dispatcher->dispatch(md::FdEvent::readable); }};
1253+
1254+ EXPECT_TRUE(in_dispatch->wait_for(std::chrono::seconds{1}));
1255+
1256+ dispatcher->remove_watch(dispatchee);
1257+ dispatcher.reset();
1258+ dispatchee.reset();
1259+ canary.reset();
1260+
1261+ EXPECT_TRUE(canary_killed->wait_for(std::chrono::seconds{2}));
1262+}
1263+
1264+TEST(MultiplexingDispatchableTest, stress_test_threading)
1265+{
1266+ using namespace testing;
1267+
1268+ int const dispatchee_count{20};
1269+
1270+ auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
1271+
1272+ std::vector<std::shared_ptr<md::SimpleDispatchThread>> eventloops;
1273+ for (int i = 0 ; i < dispatchee_count + 5 ; ++i)
1274+ {
1275+ eventloops.push_back(std::make_shared<md::SimpleDispatchThread>(dispatcher));
1276+ }
1277+
1278+ std::vector<std::shared_ptr<mt::Signal>> canary_tomb;
1279+ std::vector<std::shared_ptr<mt::TestDispatchable>> dispatchees;
1280+ for (int i = 0 ; i < dispatchee_count ; ++i)
1281+ {
1282+ canary_tomb.push_back(std::make_shared<mt::Signal>());
1283+ auto current_canary = canary_tomb.back();
1284+ auto canary = std::shared_ptr<int>(new int, [current_canary](int* victim) { delete victim; current_canary->raise(); });
1285+ auto dispatchee = std::make_shared<mt::TestDispatchable>([canary]()
1286+ {
1287+ std::this_thread::sleep_for(std::chrono::seconds{1});
1288+ EXPECT_THAT(canary.use_count(), Gt(0));
1289+ });
1290+ dispatcher->add_watch(dispatchee, md::DispatchReentrancy::reentrant);
1291+
1292+ dispatchee->trigger();
1293+ }
1294+
1295+ for (auto& dispatchee : dispatchees)
1296+ {
1297+ dispatchee->trigger();
1298+ dispatcher->remove_watch(dispatchee);
1299+ }
1300+
1301+ dispatchees.clear();
1302+ dispatcher.reset();
1303+ eventloops.clear();
1304+
1305+ for (auto headstone : canary_tomb)
1306+ {
1307+ // Use assert so as to not block for *ages* on failure
1308+ ASSERT_TRUE(headstone->wait_for(std::chrono::seconds{2}));
1309+ }
1310+}
1311+
1312+TEST(MultiplexingDispatchableTest, removes_dispatchable_that_returns_false_from_dispatch)
1313+{
1314+ bool dispatched{false};
1315+ auto dispatchee = std::make_shared<mt::TestDispatchable>([&dispatched](md::FdEvents)
1316+ {
1317+ dispatched = true;
1318+ return false;
1319+ });
1320+ md::MultiplexingDispatchable dispatcher{dispatchee};
1321+
1322+ dispatchee->trigger();
1323+ dispatchee->trigger();
1324+
1325+ ASSERT_TRUE(mt::fd_is_readable(dispatcher.watch_fd()));
1326+ dispatcher.dispatch(md::FdEvent::readable);
1327+
1328+ EXPECT_TRUE(dispatched);
1329+
1330+ dispatched = false;
1331+ while (mt::fd_is_readable(dispatcher.watch_fd()))
1332+ {
1333+ dispatcher.dispatch(md::FdEvent::readable);
1334+ }
1335+
1336+ EXPECT_FALSE(dispatched);
1337+}
1338+
1339+TEST(MultiplexingDispatchableTest, multiple_removals_are_threadsafe)
1340+{
1341+ using namespace testing;
1342+
1343+ auto canary_killed = std::make_shared<mt::Signal>();
1344+ auto canary = std::shared_ptr<int>(new int, [canary_killed](int* victim) { delete victim; canary_killed->raise(); });
1345+ auto in_dispatch = std::make_shared<mt::Signal>();
1346+ auto unblock_dispatchee = std::make_shared<mt::Signal>();
1347+
1348+ auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
1349+
1350+ auto first_dispatchee = std::make_shared<mt::TestDispatchable>([canary, in_dispatch, unblock_dispatchee]()
1351+ {
1352+ in_dispatch->raise();
1353+ EXPECT_TRUE(unblock_dispatchee->wait_for(std::chrono::seconds{5}));
1354+ EXPECT_THAT(canary.use_count(), Gt(0));
1355+ });
1356+ auto dummy_dispatchee = std::make_shared<mt::TestDispatchable>([](){});
1357+ dispatcher->add_watch(first_dispatchee);
1358+ dispatcher->add_watch(dummy_dispatchee);
1359+
1360+ first_dispatchee->trigger();
1361+
1362+ md::SimpleDispatchThread eventloop_one{dispatcher};
1363+ md::SimpleDispatchThread eventloop_two{dispatcher};
1364+
1365+ EXPECT_TRUE(in_dispatch->wait_for(std::chrono::seconds{1}));
1366+
1367+ dispatcher->remove_watch(dummy_dispatchee);
1368+ dispatcher->remove_watch(first_dispatchee);
1369+ dispatcher.reset();
1370+ first_dispatchee.reset();
1371+ dummy_dispatchee.reset();
1372+ canary.reset();
1373+
1374+ unblock_dispatchee->raise();
1375+
1376+ EXPECT_TRUE(canary_killed->wait_for(std::chrono::seconds{2}));
1377+}
1378+
1379+TEST(MultiplexingDispatchableTest, automatic_removals_are_threadsafe)
1380+{
1381+ auto dispatcher = std::make_shared<md::MultiplexingDispatchable>();
1382+
1383+ auto dispatchee = std::make_shared<mt::TestDispatchable>([](md::FdEvents) { return false; });
1384+
1385+ dispatcher->add_watch(dispatchee, md::DispatchReentrancy::reentrant);
1386+
1387+ md::SimpleDispatchThread one{dispatcher}, two{dispatcher}, three{dispatcher}, four{dispatcher};
1388+
1389+ dispatchee->trigger();
1390+}
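
Note on the canary idiom used by removal_is_threadsafe, destruction_is_threadsafe, stress_test_threading and multiple_removals_are_threadsafe above: each test hands the dispatchee a shared_ptr whose deleter raises a signal, so the test can observe when the last owner has let go. The following is a minimal standalone sketch of that idea using only the standard library. The Signal class here is a hypothetical stand-in for mt::Signal (its real implementation is not shown in this diff), and make_canary and canary_outlives_worker are illustrative names, not part of Mir.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for mt::Signal: a one-shot, thread-safe flag.
class Signal
{
public:
    void raise()
    {
        std::lock_guard<std::mutex> lock{mutex};
        raised = true;
        cv.notify_all();
    }

    // Returns true if raise() was called before the timeout expired.
    template<typename Rep, typename Period>
    bool wait_for(std::chrono::duration<Rep, Period> timeout)
    {
        std::unique_lock<std::mutex> lock{mutex};
        return cv.wait_for(lock, timeout, [this]() { return raised; });
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    bool raised{false};
};

// Builds a shared_ptr whose deleter raises `killed` once the last owner is gone.
std::shared_ptr<int> make_canary(std::shared_ptr<Signal> killed)
{
    assert(killed);
    return std::shared_ptr<int>(new int{0}, [killed](int* victim)
    {
        delete victim;
        killed->raise();
    });
}

// Returns true if the canary stays alive while a worker thread still owns a
// copy, and dies once that worker has finished.
bool canary_outlives_worker()
{
    auto killed = std::make_shared<Signal>();
    auto started = std::make_shared<Signal>();
    auto release = std::make_shared<Signal>();
    auto canary = make_canary(killed);

    std::thread worker{[canary, started, release]() mutable
    {
        // Take ownership locally so the copy is released when this body returns.
        auto local = std::move(canary);
        started->raise();
        release->wait_for(std::chrono::seconds{5});
    }};

    started->wait_for(std::chrono::seconds{5});
    canary.reset();   // The test thread drops its reference, as the tests above do.

    // The worker still holds a copy, so the deleter must not have run yet.
    bool const alive_while_busy = !killed->wait_for(std::chrono::milliseconds{50});

    release->raise();
    bool const died_afterwards = killed->wait_for(std::chrono::seconds{5});

    worker.join();
    return alive_while_busy && died_afterwards;
}
```

In the tests above the same mechanism is pointed at MultiplexingDispatchable instead of a plain thread: if the dispatcher kept a reference to the dispatchee after remove_watch() and the dispatch returning, canary_killed would presumably never be raised and the final wait_for would time out.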
