Merge lp:~michihenning/unity-scopes-api/async-create into lp:unity-scopes-api/devel
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Michal Hruby | ||||
Approved revision: | 306 | ||||
Merged at revision: | 309 | ||||
Proposed branch: | lp:~michihenning/unity-scopes-api/async-create | ||||
Merge into: | lp:unity-scopes-api/devel | ||||
Diff against target: |
4879 lines (+2089/-517) 80 files modified
RELEASE_NOTES.txt (+8/-1) debian/libunity-scopes1.symbols (+11/-7) demo/client.cpp (+24/-14) demo/scopes/scope-A/scope-A.cpp (+45/-14) demo/scopes/scope-B/scope-B.cpp (+14/-9) demo/scopes/scope-C/scope-C.cpp (+16/-10) demo/scopes/scope-D/scope-D.cpp (+9/-5) demo/scopes/scope-N/scope-N.cpp (+21/-6) demo/scopes/scope-S/scope-S.cpp (+28/-21) include/unity/scopes/PreviewQueryBase.h (+1/-1) include/unity/scopes/QueryBase.h (+19/-1) include/unity/scopes/internal/ObjectImpl.h (+13/-4) include/unity/scopes/internal/QueryBaseImpl.h (+3/-0) include/unity/scopes/internal/QueryCtrlImpl.h (+9/-6) include/unity/scopes/internal/QueryObject.h (+3/-4) include/unity/scopes/internal/RegistryImpl.h (+1/-1) include/unity/scopes/internal/ReplyImpl.h (+1/-1) include/unity/scopes/internal/ReplyObject.h (+1/-1) include/unity/scopes/internal/RuntimeImpl.h (+9/-4) include/unity/scopes/internal/ScopeImpl.h (+2/-2) include/unity/scopes/internal/SearchReplyImpl.h (+1/-1) include/unity/scopes/internal/TaskWrapper.h (+1/-2) include/unity/scopes/internal/ThreadPool.h (+2/-0) include/unity/scopes/internal/ThreadSafeQueue.h (+51/-21) include/unity/scopes/internal/zmq_middleware/ConnectionPool.h (+7/-5) include/unity/scopes/internal/zmq_middleware/ObjectAdapter.h (+8/-6) include/unity/scopes/internal/zmq_middleware/StopPublisher.h (+85/-0) include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h (+6/-2) src/scopes/QueryBase.cpp (+5/-0) src/scopes/internal/ObjectImpl.cpp (+31/-3) src/scopes/internal/QueryBaseImpl.cpp (+24/-2) src/scopes/internal/QueryCtrlImpl.cpp (+39/-4) src/scopes/internal/QueryObject.cpp (+32/-12) src/scopes/internal/RegistryImpl.cpp (+1/-1) src/scopes/internal/ReplyImpl.cpp (+1/-1) src/scopes/internal/ReplyObject.cpp (+2/-3) src/scopes/internal/RuntimeImpl.cpp (+89/-15) src/scopes/internal/ScopeImpl.cpp (+125/-88) src/scopes/internal/ScopeObject.cpp (+0/-1) src/scopes/internal/SearchReplyImpl.cpp (+4/-2) src/scopes/internal/ThreadPool.cpp (+18/-11) 
src/scopes/internal/smartscopes/SSQueryObject.cpp (+1/-1) src/scopes/internal/zmq_middleware/CMakeLists.txt (+1/-0) src/scopes/internal/zmq_middleware/ConnectionPool.cpp (+19/-11) src/scopes/internal/zmq_middleware/ObjectAdapter.cpp (+55/-78) src/scopes/internal/zmq_middleware/StopPublisher.cpp (+225/-0) src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp (+10/-10) src/scopes/internal/zmq_middleware/ZmqObject.cpp (+67/-44) src/scopes/internal/zmq_middleware/ZmqQuery.cpp (+1/-1) src/scopes/internal/zmq_middleware/ZmqQueryCtrl.cpp (+2/-2) src/scopes/internal/zmq_middleware/ZmqRegistry.cpp (+3/-3) src/scopes/internal/zmq_middleware/ZmqReply.cpp (+2/-2) src/scopes/internal/zmq_middleware/ZmqScope.cpp (+6/-4) src/scopes/internal/zmq_middleware/ZmqStateReceiver.cpp (+1/-1) test/gtest/scopes/CMakeLists.txt (+9/-8) test/gtest/scopes/Invocation/CMakeLists.txt (+8/-0) test/gtest/scopes/Invocation/Invocation_test.cpp (+145/-0) test/gtest/scopes/Invocation/Registry.ini.in (+8/-0) test/gtest/scopes/Invocation/Runtime.ini.in (+5/-0) test/gtest/scopes/Invocation/TestScope.cpp (+94/-0) test/gtest/scopes/Invocation/TestScope.h (+40/-0) test/gtest/scopes/Invocation/Zmq.ini.in (+3/-0) test/gtest/scopes/Registry/Registry_test.cpp (+39/-13) test/gtest/scopes/Runtime/CMakeLists.txt (+1/-1) test/gtest/scopes/Runtime/PusherScope.cpp (+8/-1) test/gtest/scopes/Runtime/PusherScope.h (+5/-0) test/gtest/scopes/Runtime/Runtime_test.cpp (+132/-6) test/gtest/scopes/Runtime/SlowCreateScope.cpp (+101/-0) test/gtest/scopes/Runtime/SlowCreateScope.h (+40/-0) test/gtest/scopes/Runtime/TestScope.cpp (+6/-0) test/gtest/scopes/Runtime/TestScope.h (+5/-0) test/gtest/scopes/ScopeExceptions/ScopeExceptions_test.cpp (+29/-0) test/gtest/scopes/internal/ThreadPool/ThreadPool_test.cpp (+11/-0) test/gtest/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp (+71/-8) test/gtest/scopes/internal/smartscopes/smartscopesproxy/smartscopesproxy_test.cpp (+2/-5) 
test/gtest/scopes/internal/zmq_middleware/CMakeLists.txt (+1/-0) test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp (+21/-24) test/gtest/scopes/internal/zmq_middleware/RegistryI/RegistryI_test.cpp (+2/-2) test/gtest/scopes/internal/zmq_middleware/StopPublisher/CMakeLists.txt (+4/-0) test/gtest/scopes/internal/zmq_middleware/StopPublisher/StopPublisher_test.cpp (+136/-0) |
||||
To merge this branch: | bzr merge lp:~michihenning/unity-scopes-api/async-create | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Michal Hruby (community) | Approve | ||
Paweł Stołowski (community) | Approve | ||
PS Jenkins bot (community) | continuous-integration | Approve | |
Marcus Tomlinson (community) | Approve | ||
Review via email: mp+213489@code.launchpad.net |
Commit message
The scope search, activate, perform_action, and preview methods now return immediately instead of waiting for a response from the server. The returned QueryCtrl is a "fake" one: if cancel() is called on the returned QueryCtrl, the cancellation is remembered and sent once the corresponding create method in the scope has completed.
This change should be transparent to application code (other than that these methods can no longer block).
Also added interlocks to QueryObject, so cancel() and run() see consistent data.
Fixed a number of race conditions during shutdown.
Description of the change
The scope search, activate, perform_action, and preview methods now return immediately instead of waiting for a response from the server. The returned QueryCtrl is a "fake" one: if cancel() is called on the returned QueryCtrl, the cancellation is remembered and sent once the corresponding create method in the scope has completed.
This change should be transparent to application code (other than that these methods can no longer block).
Also added interlocks to QueryObject, so cancel() and run() see consistent data.
Fixed a number of race conditions during shutdown.
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:284
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
Paweł Stołowski (stolowski) wrote : | # |
Impressive stuff! I've to admit I don't fully grasp all the changes yet, need to give it another look tomorrow. Just a few minor nitpicks for now:
8 +Changes in version 0.4.1
devel is currently at 0.4.2, you may want to make it 0.4.3.
437 +class Socket final
I think it would be worth marking it as non-copyable; I know it's an internal class, but still, it makes it harder for us to make silly mistakes.
1939 + * Copyright (C) 2013 Canonical Ltd
2038 + * Copyright (C) 2013 Canonical Ltd
Please also fix it in the header template you use ;)
Michi Henning (michihenning) wrote : | # |
> Impressive stuff! I've to admit I don't fully grasp all the changes yet, need
> to give it another look tomorrow.
Hold off with more review just yet please, this isn't ready for prime time as of now.
> Just a few minor nitpicks for now:
>
> 8 +Changes in version 0.4.1
>
> devel is currently at 0.4.2, you may want to make it 0.4.3.
Will do, thanks!
> 437 +class Socket final
>
> I think it would be worth marking it as non-copyable; I know it's an internal
> class, but still, it makes it harder for us to make silly mistakes.
It's non-copyable already because it defines a move constructor and move assignment operator, which suppresses the default-generated copy constructor and assignment operator.
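The language rule cited here can be checked at compile time. The following is a minimal sketch, assuming a stand-in `Socket` class that only holds an integer descriptor (the real class in the diff wraps a zmq socket and is not reproduced here):

```cpp
#include <cassert>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for the internal Socket class, for illustration only.
class Socket final
{
public:
    explicit Socket(int fd) noexcept : fd_(fd) {}

    // Declaring a move constructor or move assignment operator causes the
    // implicitly declared copy constructor and copy assignment operator
    // to be defined as deleted.
    Socket(Socket&& other) noexcept : fd_(other.fd_) { other.fd_ = -1; }

    Socket& operator=(Socket&& other) noexcept
    {
        if (this != &other)
        {
            fd_ = other.fd_;
            other.fd_ = -1;
        }
        return *this;
    }

    int fd() const noexcept { return fd_; }

private:
    int fd_;
};

static_assert(!std::is_copy_constructible<Socket>::value, "copy constructor is deleted");
static_assert(!std::is_copy_assignable<Socket>::value, "copy assignment is deleted");
static_assert(std::is_move_constructible<Socket>::value, "move constructor is available");
static_assert(std::is_move_assignable<Socket>::value, "move assignment is available");
```

An explicit `= delete` on the copy operations would document the intent more visibly, but it would not change the behaviour.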
> 1939 + * Copyright (C) 2013 Canonical Ltd
> 2038 + * Copyright (C) 2013 Canonical Ltd
>
> Please also fix it in the header template you use ;)
Thanks for that! :-) I don't use a header template, as a rule. Instead, I just copy some similar header and hack it up. Need to get out of the habit of doing that…
PS Jenkins bot (ps-jenkins) wrote : | # |
FAILED: Continuous integration, rev:287
http://
Executed test runs:
FAILURE: http://
FAILURE: http://
FAILURE: http://
Click here to trigger a rebuild:
http://
PS Jenkins bot (ps-jenkins) wrote : | # |
FAILED: Continuous integration, rev:288
http://
Executed test runs:
FAILURE: http://
SUCCESS: http://
deb: http://
FAILURE: http://
Click here to trigger a rebuild:
http://
PS Jenkins bot (ps-jenkins) wrote : | # |
FAILED: Continuous integration, rev:289
http://
Executed test runs:
FAILURE: http://
SUCCESS: http://
deb: http://
FAILURE: http://
Click here to trigger a rebuild:
http://
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:290
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
Michi Henning (michihenning) wrote : | # |
I think this now correctly does the async dispatch for search(), preview(), etc. These methods do not fail now. All errors are instead delivered to the finished() callback. The returned QueryCtrl proxy is a "fake" proxy: if cancel() is called on that proxy before the search method in the target scope has returned the real QueryCtrl, the cancel is remembered by the run time and delivered as soon as the real QueryCtrl arrives.
Michal, could you give this a spin please?
This doesn't yet fix Marcus's problem where a finished() call can overtake a push(). Will work on that next.
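The "remember the cancel and deliver it later" behaviour described above can be sketched roughly as follows. This is not the code from QueryCtrlImpl; `DeferredCtrl`, `set_real()` and the callback type are hypothetical names chosen for the illustration:

```cpp
#include <cassert>
#include <functional>
#include <mutex>

// Hypothetical sketch of a "fake" control object. None of these names are
// taken from the branch; they only illustrate the idea.
class DeferredCtrl
{
public:
    // Called by the application, possibly before the real control exists.
    void cancel()
    {
        std::function<void()> send;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (cancelled_)
            {
                return;              // a second cancel is a no-op
            }
            cancelled_ = true;
            send = real_cancel_;     // empty if the real control has not arrived yet
        }
        if (send)
        {
            send();                  // forward outside the lock
        }
    }

    // Called by the run time once the create call has returned the real control.
    void set_real(std::function<void()> real_cancel)
    {
        bool replay = false;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            real_cancel_ = real_cancel;
            replay = cancelled_;     // a cancel was remembered earlier
        }
        if (replay)
        {
            real_cancel();
        }
    }

private:
    std::mutex mutex_;
    bool cancelled_ = false;
    std::function<void()> real_cancel_;
};
```

Whichever of cancel() and set_real() takes the lock first, the real cancel is sent exactly once: either cancel() finds the callback already installed, or set_real() finds the cancelled flag already set.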
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:294
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
Marcus Tomlinson (marcustomlinson) wrote : | # |
Hi Michi, I noticed an issue with this after merging with my rebinding branch:
2709 + // If a request times out, we must close the corresponding socket, otherwise
2710 + // zmq gets confused: the reply will never be read, so the socket ends up
2711 + // in a bad state.
2712 + s.close();
2713 + throw TimeoutExceptio
I don't think we can just close the socket and leave it in the connection pool. We need to call "pool.remove(
Simply replacing "s.close()" with "pool.remove(
Marcus Tomlinson (marcustomlinson) wrote : | # |
117 + if (!valid())
rather than expecting every scope author to have to do this, could we not do this check internally from QueryObject::run() and SSQueryObject:
Michi Henning (michihenning) wrote : | # |
> 117 + if (!valid())
>
> rather than expecting every scope author to have to do this, could we not do
> this check internally from QueryObject::run() and SSQueryObject:
> before we call run()?
Unfortunately, no. run() and cancel() are called by different threads. That's so I can guarantee that there is always a thread available to deliver a cancel message, even if run() is implemented synchronously and all run threads are currently in use. This also means that the cancel() for a run() invocation can be dispatched *before* run() is called. In the dispatch code for run, I have to do something like this:
unique_lock<mutex> lock(mutex_);
if (not query cancelled already) // Call run() only if the query wasn't cancelled
{
    lock.unlock();
    application
}
The problem is that, between the unlock and the call to run(), the scheduler can suspend my thread, run the thread that processes an incoming cancel, and then run my thread again, meaning that, by the time run() gets into the application code, the query was cancelled already.
I can't call run() with the lock held because that would require the application code to unlock, and we can't rely on it doing that. (This would also create all sorts of deadlock scenarios.) Note that, even if I use an atomic_bool or some such, so the sequence is lock-free, the race remains: in between the test and the call to run(), the query can be cancelled.
The upshot of it all for the scope authors is:
- By the time the run() is called, the query may have been cancelled already. If so, valid() will return false. If it does, don't bother running the query; just return.
- The cancel may arrive at any time: before run() is called or while run() is still in progress, and the cancel() will always be delivered by a thread other than the one that called run(). If cancel() relies on state that is established by run(), the scope developer must use a full fence or mutex to make sure that cancel() operates on current state.
- The scope developer must write cancel() such that it finishes promptly.
- Always check the return value from push() and, if push() returns false, stop pushing and clean up. It is benign to ignore the return value from push(), but all the work done once push() returns false is wasted because the run time will simply throw the results that are pushed on the floor.
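The guidelines above could look roughly like this in a scope. The sketch deliberately avoids the real unity::scopes classes: `SketchQuery` is hypothetical, and the `valid` and `push` callbacks stand in for the run time's valid() and push() calls.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for a scope query; this is not the unity::scopes API.
class SketchQuery
{
public:
    // Invoked by a run thread. Returns the number of results that were pushed.
    int run(std::function<bool()> const& valid,
            std::function<bool(int)> const& push,
            std::vector<int> const& results)
    {
        if (!valid())
        {
            return 0;                // cancelled before run() started: do no work
        }
        int pushed = 0;
        for (int r : results)
        {
            if (cancelled_.load() || !push(r))
            {
                break;               // cancelled or reply closed: stop and clean up
            }
            ++pushed;
        }
        return pushed;
    }

    // Invoked by a different thread than run(); must finish promptly.
    void cancel()
    {
        cancelled_.store(true);      // sequentially consistent by default
    }

private:
    std::atomic<bool> cancelled_{false};
};
```

A single atomic flag is enough here because cancel() shares no other state with run(). If cancel() had to touch state that run() sets up, a mutex around that state would be the safer choice.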
- 295. By Michi Henning
-
Fixed bug that would leave a bad socket in the connection pool after a
timeout exception. Added test for that, and improved test coverage
for ScopeExceptions.cpp.
Michi Henning (michihenning) wrote : | # |
> I don't think we can just close the socket and leave it in the connection
> pool.
Awesome catch, thanks for that!
I fixed it and added a test to make sure it works now.
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:295
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
Marcus Tomlinson (marcustomlinson) wrote : | # |
> > 117 + if (!valid())
> >
> > rather than expecting every scope author to have to do this, could we not do
> > this check internally from QueryObject::run() and SSQueryObject:
> > before we call run()?
>
> Unfortunately, no. run() and cancel() are called by different threads. That's
> so I can guarantee that there is always a thread available to deliver a cancel
> message, even if run() is implemented synchronously and all run threads are
> currently in use. This also means that the cancel() for a run() invocation can
> be dispatched *before* run() is called. In the dispatch code for run, I have
> to do something like this:
>
> unique_lock<mutex> lock(mutex_);
> if (not query cancelled already) // Call run() only if the query wasn't cancelled
> {
>     lock.unlock();
>     application_
> }
>
> The problem is that, between the unlock and the call to run(), the scheduler
> can suspend my thread, run the thread that processes an incoming cancel, and
> then run my thread again, meaning that, by the time run() gets into the
> application code, the query was cancelled already.
Ok I see what you're saying, but consider these 2 code snippets:
Is there really any difference between them?
QueryObject::run()
{
    ...
    if (scope.valid())
    {
        scope.run()
    }
    ...
}
Scope::run()
{
    if (!valid())
    {
        return;
    }
    ...
}
I'm suggesting the first option is a lot cleaner and gives the scope author one less thing to remember. Sorry if I'm still missing the point.
Michi Henning (michihenning) wrote : | # |
> QueryObject::run()
> {
>     ...
>     if (scope.valid())
>     {
>         scope.run()
>     }
>     ...
> }
This is what's effectively happening already. The problem is that between valid() returning true, and scope.run() being executed, the cancel can arrive. So, the point of valid() on the public API is to give a scope author a way to check whether the query was cancelled already before doing a lot of work setting up and running a query that, in the end, will return false from the first call to push().
Maybe it's me who's missing something? :-)
- 296. By Michi Henning
-
Improved documentation.
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:296
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
Marcus Tomlinson (marcustomlinson) wrote : | # |
Well, I've read through this code a few times now with every effort to understand it all. As far as I can see, this looks really good!
Just a few little things I picked up:
366 + void set_proxy(MWProxy const& p); // Allows a derived proxy to replace mw_proxy_ for aynchronous twoway calls.
"aynchronous" should be "asynchronous"
2126 + stopper_.reset(new StopPublisher(
4045 + "failed (endpoint: inproc:
"stoppper"? Was this intended?
3587 + // cancel is correcly forwarded to the scope once the real reply proxy arrives
"correcly" should be "correctly"
- 297. By Michi Henning
-
Fixed typos.
Marcus Tomlinson (marcustomlinson) wrote : | # |
4045 + "failed (endpoint: inproc:
forgot this one ;)
- 298. By Michi Henning
-
Fixed another typo.
PS Jenkins bot (ps-jenkins) wrote : | # |
FAILED: Continuous integration, rev:297
http://
Executed test runs:
FAILURE: http://
FAILURE: http://
FAILURE: http://
Click here to trigger a rebuild:
http://
Marcus Tomlinson (marcustomlinson) wrote : | # |
I'm happy with this.
- 299. By Michi Henning
-
Increased wait time between tests due to "address in use" failures when running on Jenkins.
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:298
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:299
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
Michal Hruby (mhr3) wrote : | # |
179 + if (--num_children_ == 0)
180 + {
181 + upstream_
182 + }
Why is this needed suddenly? Wasn't the point of create_subquery to take care of that?
Michal Hruby (mhr3) wrote : | # |
Also, still managed to get it to hang on shutdown by running the client binary (from demo/ dir), here's the state of the threads:
Id Target Id Frame
11 Thread 0xb4c21410 (LWP 8173) "client" 0xb6b655f4 in __libc_do_syscall
() from /lib/arm-
10 Thread 0xb42ff410 (LWP 8174) "client" 0xb6b655f4 in __libc_do_syscall
() from /lib/arm-
9 Thread 0xb3aff410 (LWP 8175) "client" 0xb6b655f4 in __libc_do_syscall
() from /lib/arm-
8 Thread 0xb30ff410 (LWP 8176) "client" 0xb6b655f4 in __libc_do_syscall
() from /lib/arm-
7 Thread 0xb28ff410 (LWP 8177) "client" 0xb6b655f4 in __libc_do_syscall
() from /lib/arm-
6 Thread 0xb20ff410 (LWP 8178) "client" 0xb6d4f032 in epoll_wait ()
from /lib/arm-
5 Thread 0xb18ff410 (LWP 8179) "client" 0xb6d4f032 in epoll_wait ()
from /lib/arm-
4 Thread 0xb08ff410 (LWP 8181) "client" 0xb6b655f4 in __libc_do_syscall
() from /lib/arm-
3 Thread 0xb00ff410 (LWP 8182) "client" 0xb6d45d22 in poll ()
from /lib/arm-
2 Thread 0xaf8ff410 (LWP 8183) "client" 0xb6d45d22 in poll ()
from /lib/arm-
* 1 Thread 0xb4c59000 (LWP 8172) "client" 0xb6b655f4 in __libc_do_syscall
() from /lib/arm-
Thread 11 (Thread 0xb4c21410 (LWP 8173)):
#0 0xb6b655f4 in __libc_do_syscall ()
from /lib/arm-
#1 0xb6b611d8 in pthread_
from /lib/arm-
#2 0xb6e425f8 in std::condition_
() from /usr/lib/
#3 0xb6f31e92 in wait<unity:
__lock=..., this=0x3b7af0) at /usr/include/
#4 unity::
at /home/phablet/
#5 0xb6f30c42 in unity::
at /home/phablet/
#6 0xb6e44a2c in ?? () from /usr/lib/
Backtrace stopped: previous frame identical to this frame (corrupt stack?)
Thread 10 (Thread 0xb42ff410 (LWP 8174)):
#0 0xb6b655f4 in __libc_do_syscall ()
from /lib/arm-
#1 0xb6b63268 in __lll_lock_wait ()
from /lib/arm-
#2 0xb6b5f8d8 in pthread_mutex_lock ()
from /lib/arm-
#3 0xb6f1944a in __gthread_
at /usr/include/
#4 lock (this=0x3b055c) at /usr/include/
#5 lock_gua...
Michal Hruby (mhr3) wrote : | # |
As a note, I was trying this from the all-in-one rebinding-logic branch, so the line numbers are valid for that one.
Marcus Tomlinson (marcustomlinson) wrote : | # |
> As a note, I was trying this from the all-in-one rebinding-logic branch, so
> the line numbers are valid for that one.
I can confirm that this hang occurs with just this branch on its own (without the serialize-
Marcus Tomlinson (marcustomlinson) wrote : | # |
Despite the fact that I missed that hang issue, I will stick with my Approved stance on this branch. I probably won't be around when this is completed, so Michal's review will take precedence anyway.
Paweł Stołowski (stolowski) wrote : | # |
I'm having issues with running demo/client against real scopes on the phone with this branch (I've replaced libunity-scopes on the phone with the one from this branch, symlinked system-wide Runtime.ini inside demo/ dir): the client receives metadata only, doesn't receive any results and the query finishes with no errors. Interestingly, Dash on the phone seems to be running just fine with this branch.
Michal tested this branch on the phone and desktop and he had the same issue on the phone, but all worked fine on the desktop.
I repeated this test with a pristine devel branch (also on the phone) and it worked just fine, so perhaps there is something wrong in the client code that breaks with the async change, although I couldn't spot anything.
Could you please have a look at the client code and/or try to reproduce?
Michi Henning (michihenning) wrote : | # |
> 179 + if (--num_children_ == 0)
> 180 + {
> 181 + upstream_
> finishes
> 182 + }
>
> Why is this needed suddenly? Wasn't the point of create_subquery to take care
> of that?
No, not really. The sub-query thing takes care of transparently passing cancellation down to the child scopes. However, the aggregator still has to make a decision as to when a query is complete. It isn't necessarily the *first* finished() call that arrives. Rather, if the aggregator sends the query to three child scopes, and wants to consider the query finished once the last of these three scopes finishes, it needs to keep track of the number of finished() calls, which is what the above change is doing. (This was broken all along.)
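The bookkeeping described above amounts to a countdown over the child scopes. A minimal sketch, assuming a hypothetical `FinishCounter` class and an `upstream_finished` callback (only the name `num_children_` appears in the diff excerpt):

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical aggregator-side helper; apart from num_children_, the names
// are illustrative and not taken from the demo scopes.
class FinishCounter
{
public:
    FinishCounter(int num_children, std::function<void()> upstream_finished)
        : num_children_(num_children)
        , upstream_finished_(std::move(upstream_finished))
    {
    }

    // Called once per child scope, possibly from different threads.
    void child_finished()
    {
        // The atomic pre-decrement returns the new value, so exactly one
        // caller observes zero and reports completion upstream.
        if (--num_children_ == 0)
        {
            upstream_finished_();
        }
    }

private:
    std::atomic<int> num_children_;
    std::function<void()> upstream_finished_;
};
```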
Michal Hruby (mhr3) wrote : | # |
> > Why is this needed suddenly? Wasn't the point of create_subquery to take
> care
> > of that?
>
> No, not really. The sub-query thing takes care of transparently passing
> cancellation down to the child scopes.
Right, subsearch doesn't magically make it work, but...
> ... However, the aggregator still has to
> make a decision as to when a query is complete.
...this decision was implicit up until now - subsearch() took a SearchListenerBase instance that kept the upstream ReplyProxy alive, and only when the subsearches finished was the last reference to the upstream ReplyProxy destroyed which sent the finished message.
So something must be broken if this is needed now.
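The implicit mechanism described here, where completion is signalled when the last reference to the upstream reply goes away, can be sketched with std::shared_ptr. `UpstreamReply`, `ChildListener` and `fan_out()` are hypothetical stand-ins for illustration, not the library's classes:

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for the upstream reply: reports completion when destroyed.
class UpstreamReply
{
public:
    explicit UpstreamReply(std::function<void()> on_finished)
        : on_finished_(std::move(on_finished))
    {
    }

    ~UpstreamReply()
    {
        if (on_finished_)
        {
            on_finished_();
        }
    }

    UpstreamReply(UpstreamReply const&) = delete;
    UpstreamReply& operator=(UpstreamReply const&) = delete;

private:
    std::function<void()> on_finished_;
};

// Hypothetical child listener: holding the shared_ptr keeps the upstream reply alive.
struct ChildListener
{
    std::shared_ptr<UpstreamReply> upstream;

    void finished() { upstream.reset(); }   // drop this listener's reference
};

// Creates `children` listeners that all share a single upstream reply.
inline std::vector<ChildListener> fan_out(std::function<void()> on_finished, int children)
{
    auto upstream = std::make_shared<UpstreamReply>(std::move(on_finished));
    std::vector<ChildListener> listeners;
    for (int i = 0; i < children; ++i)
    {
        listeners.push_back(ChildListener{upstream});
    }
    return listeners;   // the local reference is released on return
}
```

With this arrangement no explicit counter is needed, provided that nothing else holds a reference to the upstream reply and nothing sends the finished message early.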
Paweł Stołowski (stolowski) wrote : | # |
> I'm having issues with running demo/client against real scopes on the phone
> with this branch (I've replaced libunity-scopes on the phone with the one from
> this branch, symlinked system-wide Runtime.ini inside demo/ dir): the client
> receives metadata only, doesn't receive any results and the query finishes
> with no errors. Interestingly, Dash on the phone seems to be running just fine
> with this branch.
>
> Michal tested this branch on the phone and desktop and he had the same issue on
> the phone, but all worked fine on the desktop.
>
> I repeated this test with a pristine devel branch (also on the phone) and it
> worked just fine, so perhaps there is something wrong in the client code that
> breaks with the async change, although I couldn't spot anything.
>
> Could you please have a look at the client code and/or try to reproduce?
Some more info about how I tested this:
1) I've compiled a checkout of async-create with dpkg-buildpackage on the phone, then installed the resulting .deb package and rebooted the phone.
2) Inside ~/async-
phablet@
total 696
drwxrwxr-x 4 phablet phablet 4096 Apr 17 12:27 click
-rwxrwxr-x 1 phablet phablet 667320 Apr 17 12:59 client
drwxrwxr-x 3 phablet phablet 4096 Apr 17 12:27 CMakeFiles
-rw-rw-r-- 1 phablet phablet 1570 Apr 17 12:27 cmake_install.cmake
-rw-rw-r-- 1 phablet phablet 351 Apr 17 12:27 CTestTestfile.cmake
-rw-rw-r-- 1 phablet phablet 7477 Apr 17 12:27 Makefile
-rw-rw-r-- 1 phablet phablet 488 Apr 17 12:27 Registry.ini
lrwxrwxrwx 1 phablet phablet 39 Apr 17 13:11 Runtime.ini -> /usr/share/
-rw-rw-r-- 1 phablet phablet 236 Apr 17 12:27 Runtime.ini.bak
drwxrwxr-x 9 phablet phablet 4096 Apr 17 12:27 scopes
drwxrwxr-x 3 phablet phablet 4096 Apr 17 12:59 stand-alone
-rw-rw-r-- 1 phablet phablet 59 Apr 17 12:27 Zmq.ini
phablet@
3) Now, from within ~/async-
./client list
./client clickscope a
./client clickscope ""
for any scope and query, I'm only getting scope's metadata and no results:
./client clickscope ""
Scope metadata:
scope_id: clickscope
art: /usr/share/
icon: /usr/share/
hot_key: clickscope.HotKey
client: created query
client: wait returned
query complete, status: finished
Michi Henning (michihenning) wrote : | # |
> Some more info about how I tested this:
Thanks for that! After struggling with building on the phone some more, I could reproduce. (ccache is your friend when it comes to building on the bloody thing...)
The ultimate cause of the problem was this:
120.5.10 <email address hidden> 20140116 | Receiver(int index_to_save)
120.6.9 <email address hidden> 20140110 | push_result_
This constructor was added at some point. The class originally had a default constructor which, after this change, was redundant, but not removed. Unlike the original constructor, the new constructor failed to initialize the query_complete_ member variable. It turns out that, on the device, the corresponding boolean was initialized to crap (meaning true), causing wait_until_
I haven't had a chance to look at the hang during shutdown yet. Will do that next.
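One way to guard against this class of bug is a default member initializer, which covers every constructor that does not set the member itself. A minimal sketch, assuming a simplified `Receiver`; only `Receiver(int index_to_save)` and `query_complete_` come from the discussion above, and `wait_for_completion()` and the other names are made up for the example:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical, simplified receiver for illustration.
class Receiver
{
public:
    Receiver() = default;
    explicit Receiver(int index_to_save) : index_to_save_(index_to_save) {}

    // Called by the run time when the query completes.
    void finished()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            query_complete_ = true;
        }
        cond_.notify_all();
    }

    // Blocks until finished() has been called.
    void wait_for_completion()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return query_complete_; });
    }

    bool complete()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return query_complete_;
    }

    int index_to_save() const { return index_to_save_; }

private:
    // Default member initializers apply to every constructor that does not
    // override them, so a newly added constructor cannot forget the flag.
    bool query_complete_ = false;
    int index_to_save_ = 0;
    std::mutex mutex_;
    std::condition_variable cond_;
};
```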
- 300. By Michi Henning
-
Fixed broken wait for completion: member variable was not initialized,
causing the client to shut down the run time before the results arrived
from the scope. Fixed the other receiver classes to match.
Paweł Stołowski (stolowski) wrote : | # |
Thanks for spotting and fixing the issue! Now, there is just one conflict in release notes that needs fixing.
Michal Hruby (mhr3) wrote : | # |
+ the shutdown issue and the ReplyProxy issue.
PS Jenkins bot (ps-jenkins) wrote : | # |
FAILED: Continuous integration, rev:300
http://
Executed test runs:
FAILURE: http://
FAILURE: http://
FAILURE: http://
Click here to trigger a rebuild:
http://
- 301. By Michi Henning
-
Merged devel and resolved conflicts. Set version back to 0.4.2
because addition of valid() to QueryBase is ABI compatible.
PS Jenkins bot (ps-jenkins) wrote : | # |
FAILED: Continuous integration, rev:301
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
FAILURE: http://
Click here to trigger a rebuild:
http://
- 302. By Michi Henning
-
Debug changes, unfinished.
- 303. By Michi Henning
-
Fixed incorrect scope of task in ThreadPool::run(): the destructor of the task
was called only once the next task was started, instead of being called
as soon as the task completed.
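The effect of the task variable's scope can be shown with a single-threaded simulation. This is only a sketch of the general pattern, not the ThreadPool code from the branch; `TracedTask` and both drain functions are hypothetical, and the "wait" log entry marks the point where a real worker thread would block on the queue.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical task that logs when it runs and when it is destroyed.
class TracedTask
{
public:
    TracedTask(std::vector<std::string>& log, std::string name)
        : log_(log), name_(std::move(name)) {}
    ~TracedTask() { log_.push_back("destroy " + name_); }
    TracedTask(TracedTask const&) = delete;
    TracedTask& operator=(TracedTask const&) = delete;

    void run() { log_.push_back("run " + name_); }

private:
    std::vector<std::string>& log_;
    std::string name_;
};

using Task = std::unique_ptr<TracedTask>;

// Task variable declared outside the loop: the previous task is destroyed
// only when the next task is assigned, or when the function returns.
inline void drain_wide_scope(std::deque<Task>& queue, std::vector<std::string>& log)
{
    Task task;
    while (true)
    {
        log.push_back("wait");   // a real worker would block here
        if (queue.empty()) break;
        task = std::move(queue.front());
        queue.pop_front();
        task->run();
    }
}

// Task variable local to the iteration: destroyed as soon as the task completes.
inline void drain_narrow_scope(std::deque<Task>& queue, std::vector<std::string>& log)
{
    while (true)
    {
        log.push_back("wait");   // a real worker would block here
        if (queue.empty()) break;
        Task task = std::move(queue.front());
        queue.pop_front();
        task->run();
    }
}
```

In the wide-scope version, anything the task owns stays alive while the worker waits for more work, which matters when a destructor is what releases a resource or sends a message.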
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:303
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
- 304. By Michi Henning
-
Fixed incorrect count of threads in run() method and added a test for throwing tasks.
- 305. By Michi Henning
-
Tightened ThreadPool exception test.
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:304
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:305
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
- 306. By Michi Henning
-
Changed micro version back to 3 after all because there was mention of that in debian/changelog already.
Better safe than sorry.
Michi Henning (michihenning) wrote : | # |
> + the shutdown issue and the ReplyProxy issue.
They are both fixed now.
PS Jenkins bot (ps-jenkins) wrote : | # |
PASSED: Continuous integration, rev:306
http://
Executed test runs:
SUCCESS: http://
SUCCESS: http://
deb: http://
SUCCESS: http://
Click here to trigger a rebuild:
http://
Paweł Stołowski (stolowski) wrote : | # |
> > + the shutdown issue and the ReplyProxy issue.
>
> They are both fixed now.
Awesome, thanks for all this! +1 from me.
Michal Hruby (mhr3) wrote : | # |
+1, seems to work great, although the threading code is getting pretty scary, but I guess we need to live with that.
Preview Diff
1 | === modified file 'RELEASE_NOTES.txt' |
2 | --- RELEASE_NOTES.txt 2014-04-11 12:07:36 +0000 |
3 | +++ RELEASE_NOTES.txt 2014-04-28 05:57:56 +0000 |
4 | @@ -1,8 +1,15 @@ |
5 | Release Notes for unity-scopes-api |
6 | ================================== |
7 | |
8 | -Changes in version 0.4.3 |
9 | +Changes in version 0.4.2 |
10 | ======================== |
11 | + |
12 | + * Made the scope search, activate, perform_action, and preview methods non-blocking. |
13 | + A (fake) QueryCtrl is returned immediately from these methods now. Calling cancel() before |
14 | + the server has finished creating the query remembers the cancel and sends it to the |
15 | + server once the server has returned the real QueryCtrl. This change should be transparent |
16 | + to application code (the only difference being that these methods complete faster now). |
17 | + |
18 | * CannedQuery class can now be converted to and from a scopes:// uri with to_uri() and from_uri() methods. |
19 | These methods replace to_string() and from_string() methods that got removed. |
20 | |
21 | |
22 | === modified file 'debian/libunity-scopes1.symbols' |
23 | --- debian/libunity-scopes1.symbols 2014-04-17 10:31:23 +0000 |
24 | +++ debian/libunity-scopes1.symbols 2014-04-28 05:57:56 +0000 |
25 | @@ -285,6 +285,7 @@ |
26 | (c++)"unity::scopes::Runtime::run_scope(unity::scopes::ScopeBase*, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0.4.2+14.04.20140404.2 |
27 | (c++)"unity::scopes::Runtime::Runtime(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0.4.0+14.04.20140312.1 |
28 | (c++)"unity::scopes::Runtime::~Runtime()@Base" 0.4.0+14.04.20140312.1 |
29 | + (c++)"unity::scopes::Variant::deserialize_json(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0replaceme |
30 | (c++)"unity::scopes::Variant::null()@Base" 0.4.0+14.04.20140312.1 |
31 | (c++)"unity::scopes::Variant::swap(unity::scopes::Variant&)@Base" 0.4.0+14.04.20140312.1 |
32 | (c++)"unity::scopes::Variant::Variant(unity::scopes::Variant&&)@Base" 0.4.0+14.04.20140312.1 |
33 | @@ -454,7 +455,11 @@ |
34 | (c++)"unity::scopes::internal::StateReceiverObject::push_state(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unity::scopes::internal::StateReceiverObject::State const&)@Base" 0.4.2+14.04.20140404.2 |
35 | (c++)"unity::scopes::internal::StateReceiverObject::StateReceiverObject()@Base" 0.4.2+14.04.20140404.2 |
36 | (c++)"unity::scopes::internal::StateReceiverObject::~StateReceiverObject()@Base" 0.4.2+14.04.20140404.2 |
37 | + (c++)"unity::scopes::internal::Executor::exec(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::vector<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, std::map<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::basic_string<char, std::char_traits<char>, std::allocator<char> > const, std::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&, core::posix::StandardStream const&)@Base" 0replaceme |
38 | + (c++)"unity::scopes::internal::Executor::Executor()@Base" 0replaceme |
39 | + (c++)"unity::scopes::internal::Executor::~Executor()@Base" 0replaceme |
40 | (c++)"unity::scopes::internal::ScopeImpl::perform_action(unity::scopes::Result const&, unity::scopes::ActionMetadata const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::shared_ptr<unity::scopes::ActivationListenerBase> const&)@Base" 0.4.0+14.04.20140312.1 |
41 | + (c++)"unity::scopes::internal::ScopeImpl::fwd()@Base" 0replaceme |
42 | (c++)"unity::scopes::internal::ScopeImpl::create(std::shared_ptr<unity::scopes::internal::MWScope> const&, unity::scopes::internal::RuntimeImpl*, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0.4.0+14.04.20140312.1 |
43 | (c++)"unity::scopes::internal::ScopeImpl::search(unity::scopes::CannedQuery const&, unity::scopes::SearchMetadata const&, std::shared_ptr<unity::scopes::SearchListenerBase> const&)@Base" 0.4.0+14.04.20140312.1 |
44 | (c++)"unity::scopes::internal::ScopeImpl::search(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unity::scopes::FilterState const&, unity::scopes::SearchMetadata const&, std::shared_ptr<unity::scopes::SearchListenerBase> const&)@Base" 0.4.0+14.04.20140312.1 |
45 | @@ -575,14 +580,13 @@ |
46 | (c++)"unity::scopes::Runtime::registry() const@Base" 0.4.0+14.04.20140312.1 |
47 | (c++)"unity::scopes::Variant::get_double() const@Base" 0.4.0+14.04.20140312.1 |
48 | (c++)"unity::scopes::Variant::get_string() const@Base" 0.4.0+14.04.20140312.1 |
49 | + (c++)"unity::scopes::Variant::serialize_json() const@Base" 0replaceme |
50 | (c++)"unity::scopes::Variant::which() const@Base" 0.4.0+14.04.20140312.1 |
51 | (c++)"unity::scopes::Variant::get_int() const@Base" 0.4.0+14.04.20140312.1 |
52 | (c++)"unity::scopes::Variant::is_null() const@Base" 0.4.0+14.04.20140312.1 |
53 | (c++)"unity::scopes::Variant::get_bool() const@Base" 0.4.0+14.04.20140312.1 |
54 | (c++)"unity::scopes::Variant::get_dict() const@Base" 0.4.0+14.04.20140312.1 |
55 | (c++)"unity::scopes::Variant::get_array() const@Base" 0.4.0+14.04.20140312.1 |
56 | - (c++)"unity::scopes::Variant::deserialize_json(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0.4.2+14.04.20140404.2 |
57 | - (c++)"unity::scopes::Variant::serialize_json() const@Base" 0.4.2+14.04.20140404.2 |
58 | (c++)"unity::scopes::Variant::operator==(unity::scopes::Variant const&) const@Base" 0.4.0+14.04.20140312.1 |
59 | (c++)"unity::scopes::Variant::operator<(unity::scopes::Variant const&) const@Base" 0.4.0+14.04.20140312.1 |
60 | (c++)"unity::scopes::testing::ScopeMetadataBuilder::operator()() const@Base" 0.4.0+14.04.20140312.1 |
61 | @@ -599,11 +603,11 @@ |
62 | (c++)"unity::scopes::Category::icon() const@Base" 0.4.0+14.04.20140312.1 |
63 | (c++)"unity::scopes::Category::title() const@Base" 0.4.0+14.04.20140312.1 |
64 | (c++)"unity::scopes::Category::serialize() const@Base" 0.4.0+14.04.20140312.1 |
65 | - (c++)"unity::scopes::internal::Executor::exec(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::vector<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, std::map<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::basic_string<char, std::char_traits<char>, std::allocator<char> > const, std::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&, core::posix::StandardStream const&)@Base" 0replaceme |
66 | - (c++)"unity::scopes::internal::Executor::Executor()@Base" 0replaceme |
67 | - (c++)"unity::scopes::internal::Executor::~Executor()@Base" 0replaceme |
68 | + (c++)"unity::scopes::internal::RuntimeImpl::async_pool() const@Base" 0replaceme |
69 | (c++)"unity::scopes::internal::RuntimeImpl::configfile() const@Base" 0.4.2+14.04.20140404.2 |
70 | + (c++)"unity::scopes::internal::RuntimeImpl::future_queue() const@Base" 0replaceme |
71 | (c++)"unity::scopes::internal::RuntimeImpl::reply_reaper() const@Base" 0.4.0+14.04.20140312.1 |
72 | + (c++)"unity::scopes::internal::RuntimeImpl::waiter_thread(std::shared_ptr<unity::scopes::internal::ThreadSafeQueue<std::future<void> > > const&) const@Base" 0replaceme |
73 | (c++)"unity::scopes::internal::RuntimeImpl::proxy_to_string(std::shared_ptr<unity::scopes::Object> const&) const@Base" 0.4.0+14.04.20140312.1 |
74 | (c++)"unity::scopes::internal::RuntimeImpl::string_to_proxy(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) const@Base" 0.4.0+14.04.20140312.1 |
75 | (c++)"unity::scopes::internal::RuntimeImpl::registry_endpoint() const@Base" 0.4.0+14.04.20140312.1 |
76 | @@ -637,10 +641,10 @@ |
77 | (c++)"unity::scopes::internal::RuntimeConfig::registry_configfile() const@Base" 0.4.2+14.04.20140404.2 |
78 | (c++)"unity::scopes::internal::RuntimeConfig::default_middleware_configfile() const@Base" 0.4.2+14.04.20140404.2 |
79 | (c++)"unity::scopes::internal::MiddlewareBase::runtime() const@Base" 0.4.0+14.04.20140312.1 |
80 | - (c++)"unity::scopes::internal::RegistryConfig::click_installdir() const@Base" 0replaceme |
81 | (c++)"unity::scopes::internal::RegistryConfig::endpointdir() const@Base" 0.4.0+14.04.20140312.1 |
82 | (c++)"unity::scopes::internal::RegistryConfig::mw_configfile() const@Base" 0.4.0+14.04.20140312.1 |
83 | (c++)"unity::scopes::internal::RegistryConfig::oem_installdir() const@Base" 0.4.0+14.04.20140312.1 |
84 | + (c++)"unity::scopes::internal::RegistryConfig::click_installdir() const@Base" 0replaceme |
85 | (c++)"unity::scopes::internal::RegistryConfig::scope_installdir() const@Base" 0.4.0+14.04.20140312.1 |
86 | (c++)"unity::scopes::internal::RegistryConfig::scoperunner_path() const@Base" 0.4.0+14.04.20140312.1 |
87 | (c++)"unity::scopes::internal::RegistryConfig::ss_registry_endpoint() const@Base" 0.4.0+14.04.20140312.1 |
88 | @@ -671,7 +675,7 @@ |
89 | (c++)"unity::scopes::internal::ScopeMetadataImpl::invisible() const@Base" 0.4.0+14.04.20140312.1 |
90 | (c++)"unity::scopes::internal::ScopeMetadataImpl::serialize() const@Base" 0.4.0+14.04.20140312.1 |
91 | (c++)"unity::scopes::internal::StateReceiverObject::state_received() const@Base" 0.4.2+14.04.20140404.2 |
92 | - (c++)"unity::scopes::internal::ScopeImpl::fwd() const@Base" 0.4.0+14.04.20140312.1 |
93 | + (c++)"unity::scopes::QueryBase::valid() const@Base" 0replaceme |
94 | (c++)"unity::scopes::ScopeBase::scope_directory() const@Base" 0.4.2+14.04.20140404.2 |
95 | (c++)"typeinfo for unity::scopes::Annotation@Base" 0.4.0+14.04.20140312.1 |
96 | (c++)"typeinfo for unity::scopes::FilterBase@Base" 0.4.0+14.04.20140312.1 |
97 | |
98 | === modified file 'demo/client.cpp' |
99 | --- demo/client.cpp 2014-04-23 11:33:49 +0000 |
100 | +++ demo/client.cpp 2014-04-28 05:57:56 +0000 |
101 | @@ -122,8 +122,9 @@ |
102 | { |
103 | public: |
104 | Receiver(int index_to_save) |
105 | - : index_to_save_(index_to_save), |
106 | - push_result_count_(0) |
107 | + : query_complete_(false), |
108 | + push_result_count_(0), |
109 | + index_to_save_(index_to_save) |
110 | { |
111 | } |
112 | |
113 | @@ -212,23 +213,23 @@ |
114 | return push_result_count_; |
115 | } |
116 | |
117 | - Receiver() : |
118 | - query_complete_(false) |
119 | - { |
120 | - } |
121 | - |
122 | private: |
123 | bool query_complete_; |
124 | + int push_result_count_ = 0; |
125 | int index_to_save_; |
126 | - mutex mutex_; |
127 | - condition_variable condvar_; |
128 | - int push_result_count_ = 0; |
129 | std::shared_ptr<Result> saved_result_; |
130 | + mutex mutex_; |
131 | + condition_variable condvar_; |
132 | }; |
133 | |
134 | class ActivationReceiver : public ActivationListenerBase |
135 | { |
136 | public: |
137 | + ActivationReceiver() |
138 | + : query_complete_(false) |
139 | + { |
140 | + } |
141 | + |
142 | void activated(ActivationResponse const& response) override |
143 | { |
144 | cout << "\tGot activation response: " << response.status() << endl; |
145 | @@ -237,16 +238,19 @@ |
146 | void finished(Reason r, std::string const& error_message) |
147 | { |
148 | cout << "\tActivation finished, reason: " << r << ", error_message: " << error_message << endl; |
149 | + lock_guard<decltype(mutex_)> lock(mutex_); |
150 | + query_complete_ = true; |
151 | condvar_.notify_one(); |
152 | } |
153 | |
154 | void wait_until_finished() |
155 | { |
156 | unique_lock<decltype(mutex_)> lock(mutex_); |
157 | - condvar_.wait(lock); |
158 | + condvar_.wait(lock, [this](){ return query_complete_; }); |
159 | } |
160 | |
161 | private: |
162 | + bool query_complete_; |
163 | mutex mutex_; |
164 | condition_variable condvar_; |
165 | }; |
166 | @@ -254,6 +258,11 @@ |
167 | class PreviewReceiver : public PreviewListenerBase |
168 | { |
169 | public: |
170 | + PreviewReceiver() |
171 | + : query_complete_(false) |
172 | + { |
173 | + } |
174 | + |
175 | void push(ColumnLayoutList const& columns) override |
176 | { |
177 | cout << "\tGot column layouts:" << endl; |
178 | @@ -297,17 +306,20 @@ |
179 | |
180 | void finished(Reason r, std::string const& error_message) override |
181 | { |
182 | + lock_guard<decltype(mutex_)> lock(mutex_); |
183 | cout << "\tPreview finished, reason: " << r << ", error_message: " << error_message << endl; |
184 | + query_complete_ = true; |
185 | condvar_.notify_one(); |
186 | } |
187 | |
188 | void wait_until_finished() |
189 | { |
190 | unique_lock<decltype(mutex_)> lock(mutex_); |
191 | - condvar_.wait(lock); |
192 | + condvar_.wait(lock, [this](){ return query_complete_; }); |
193 | } |
194 | |
195 | private: |
196 | + bool query_complete_; |
197 | mutex mutex_; |
198 | condition_variable condvar_; |
199 | }; |
200 | @@ -434,9 +446,7 @@ |
201 | SearchMetadata metadata("C", "desktop"); |
202 | metadata.set_cardinality(10); |
203 | auto ctrl = meta.proxy()->search(search_string, metadata, reply); // May raise TimeoutException |
204 | - cout << "client: created query" << endl; |
205 | reply->wait_until_finished(); |
206 | - cout << "client: wait returned" << endl; |
207 | |
208 | // handle activation |
209 | if (result_index > 0) |
210 | |
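The `ActivationReceiver` and `PreviewReceiver` changes above replace a bare `condvar_.wait(lock)` with a predicate wait on a `query_complete_` flag that is set under the mutex. A minimal sketch of why that matters, assuming nothing beyond the C++11 standard library: `FinishedFlag` and `demo_wait()` are hypothetical names for illustration and are not part of the demo client or the scopes API. With the flag, a notification that arrives before the waiter starts waiting is not lost, and spurious wakeups are harmless.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the demo's receiver classes (not part of the scopes API).
class FinishedFlag
{
public:
    // Called by the thread that completes the work.
    void finished()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        complete_ = true;        // Record completion while holding the mutex.
        condvar_.notify_one();
    }

    // Blocks until finished() has been called, even if that happened earlier.
    void wait_until_finished()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        condvar_.wait(lock, [this] { return complete_; });
    }

    bool complete()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return complete_;
    }

private:
    bool complete_ = false;
    std::mutex mutex_;
    std::condition_variable condvar_;
};

// Exercises both orderings: notify-before-wait and notify-after-wait.
bool demo_wait()
{
    FinishedFlag early;
    early.finished();             // Notification arrives before anyone waits.
    early.wait_until_finished();  // Returns at once because the flag is already set.

    FinishedFlag late;
    std::thread worker([&late] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        late.finished();
    });
    late.wait_until_finished();   // Blocks until the worker sets the flag.
    worker.join();

    assert(early.complete() && late.complete());
    return early.complete() && late.complete();
}
```

With a bare `wait(lock)` and no flag, the `early` case would block forever, because the only notification was sent before the wait began.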
211 | === modified file 'demo/scopes/scope-A/scope-A.cpp' |
212 | --- demo/scopes/scope-A/scope-A.cpp 2014-02-27 23:20:56 +0000 |
213 | +++ demo/scopes/scope-A/scope-A.cpp 2014-04-28 05:57:56 +0000 |
214 | @@ -31,13 +31,16 @@ |
215 | class MyQuery : public SearchQueryBase |
216 | { |
217 | public: |
218 | - MyQuery(CannedQuery const& query) : |
219 | + MyQuery(string const& scope_id, CannedQuery const& query) : |
220 | + scope_id_(scope_id), |
221 | query_(query) |
222 | { |
223 | + cerr << scope_id_ << ": query instance for \"" << query.query_string() << "\" created" << endl; |
224 | } |
225 | |
226 | ~MyQuery() |
227 | { |
228 | + cerr << scope_id_ << ": query instance for \"" << query_.query_string() << "\" destroyed" << endl; |
229 | } |
230 | |
231 | virtual void cancelled() override |
232 | @@ -46,6 +49,11 @@ |
233 | |
234 | virtual void run(SearchReplyProxy const& reply) override |
235 | { |
236 | + if (!valid()) |
237 | + { |
238 | + return; // Query was cancelled |
239 | + } |
240 | + |
241 | DepartmentList departments({{"news", query_, "News", {{"news-world", query_, "World"}, {"news-europe", query_, "Europe"}}}, |
242 | {"sport", query_, "Sport"}}); |
243 | reply->register_departments(departments); |
244 | @@ -56,7 +64,10 @@ |
245 | filter->add_option("2", "Option 2"); |
246 | filters.push_back(filter); |
247 | FilterState filter_state; // TODO: push real state from query obj |
248 | - reply->push(filters, filter_state); |
249 | + if (!reply->push(filters, filter_state)) |
250 | + { |
251 | + return; // Query was cancelled |
252 | + } |
253 | |
254 | CategoryRenderer rdr; |
255 | auto cat = reply->register_category("cat1", "Category 1", "", rdr); |
256 | @@ -65,30 +76,37 @@ |
257 | res.set_title("scope-A: result 1 for query \"" + query_.query_string() + "\""); |
258 | res.set_art("icon"); |
259 | res.set_dnd_uri("dnd_uri"); |
260 | - reply->push(res); |
261 | + if (!reply->push(res)) |
262 | + { |
263 | + return; // Query was cancelled |
264 | + } |
265 | |
266 | CannedQuery q("scope-A", query_.query_string(), ""); |
267 | Annotation annotation(Annotation::Type::Link); |
268 | annotation.add_link("More...", q); |
269 | reply->register_annotation(annotation); |
270 | |
271 | - cout << "scope-A: query \"" << query_.query_string() << "\" complete" << endl; |
272 | + cerr << "scope-A: query \"" << query_.query_string() << "\" complete" << endl; |
273 | } |
274 | |
275 | private: |
276 | + string scope_id_; |
277 | CannedQuery query_; |
278 | }; |
279 | |
280 | class MyPreview : public PreviewQueryBase |
281 | { |
282 | public: |
283 | - MyPreview(string const& uri) : |
284 | + MyPreview(string const& scope_id, string const& uri) : |
285 | + scope_id_(scope_id), |
286 | uri_(uri) |
287 | { |
288 | + cerr << scope_id_ << ": preview instance for \"" << uri << "\" created" << endl; |
289 | } |
290 | |
291 | ~MyPreview() |
292 | { |
293 | + cerr << scope_id_ << ": preview instance for \"" << uri_ << "\" destroyed" << endl; |
294 | } |
295 | |
296 | virtual void cancelled() override |
297 | @@ -119,21 +137,32 @@ |
298 | layout3col.add_column({"rating"}); |
299 | |
300 | reply->register_layout({layout1col, layout2col, layout3col}); |
301 | - reply->push(widgets); |
302 | - reply->push("author", Variant("Foo")); |
303 | - reply->push("rating", Variant("4 blah")); |
304 | - cout << "scope-A: preview for \"" << uri_ << "\" complete" << endl; |
305 | + if (!reply->push(widgets)) |
306 | + { |
307 | + return; // Query was cancelled |
308 | + } |
309 | + if (!reply->push("author", Variant("Foo"))) |
310 | + { |
311 | + return; // Query was cancelled |
312 | + } |
313 | + if (!reply->push("rating", Variant("4 blah"))) |
314 | + { |
315 | + return; // Query was cancelled |
316 | + } |
317 | + cerr << "scope-A: preview for \"" << uri_ << "\" complete" << endl; |
318 | } |
319 | |
320 | private: |
321 | + string scope_id_; |
322 | string uri_; |
323 | }; |
324 | |
325 | class MyScope : public ScopeBase |
326 | { |
327 | public: |
328 | - virtual int start(string const&, RegistryProxy const&) override |
329 | + virtual int start(string const& scope_id, RegistryProxy const&) override |
330 | { |
331 | + scope_id_ = scope_id; |
332 | return VERSION; |
333 | } |
334 | |
335 | @@ -141,17 +170,19 @@ |
336 | |
337 | virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const&) override |
338 | { |
339 | - SearchQueryBase::UPtr query(new MyQuery(q)); |
340 | - cout << "scope-A: created query: \"" << q.query_string() << "\"" << endl; |
341 | + SearchQueryBase::UPtr query(new MyQuery(scope_id_, q)); |
342 | return query; |
343 | } |
344 | |
345 | virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override |
346 | { |
347 | - PreviewQueryBase::UPtr preview(new MyPreview(result.uri())); |
348 | - cout << "scope-A: created previewer: \"" << result.uri() << "\"" << endl; |
349 | + PreviewQueryBase::UPtr preview(new MyPreview(scope_id_, result.uri())); |
350 | + cerr << "scope-A: created previewer: \"" << result.uri() << "\"" << endl; |
351 | return preview; |
352 | } |
353 | + |
354 | +private: |
355 | + string scope_id_; |
356 | }; |
357 | |
358 | extern "C" |
359 | |
360 | === modified file 'demo/scopes/scope-B/scope-B.cpp' |
361 | --- demo/scopes/scope-B/scope-B.cpp 2014-04-03 12:57:25 +0000 |
362 | +++ demo/scopes/scope-B/scope-B.cpp 2014-04-28 05:57:56 +0000 |
363 | @@ -47,12 +47,12 @@ |
364 | public: |
365 | virtual void push(Category::SCPtr category) override |
366 | { |
367 | - cout << "received category: id=" << category->id() << endl; |
368 | + cerr << scope_id_ << ": received category: id=" << category->id() << endl; |
369 | } |
370 | |
371 | virtual void push(CategorisedResult result) override |
372 | { |
373 | - cout << "received result from " << scope_id_ << ": " << result.uri() << ", " << result.title() << endl; |
374 | + cerr << scope_id_ << ": received result: " << result.uri() << ", " << result.title() << endl; |
375 | try |
376 | { |
377 | result.set_category(upstream_->lookup_category("catB")); |
378 | @@ -60,18 +60,18 @@ |
379 | } |
380 | catch (const unity::InvalidArgumentException &e) |
381 | { |
382 | - cerr << "error pushing result: " << e.what() << endl; |
383 | + cerr << scope_id_ << ": error pushing result: " << e.what() << endl; |
384 | } |
385 | } |
386 | |
387 | virtual void finished(Reason reason, string const& error_message) override |
388 | { |
389 | - cout << "query to " << scope_id_ << " complete, status: " << to_string(reason); |
390 | + cerr << scope_id_ << ": subquery complete, status: " << to_string(reason); |
391 | if (reason == ListenerBase::Error) |
392 | { |
393 | - cout << ": " << error_message; |
394 | + cerr << ": " << error_message; |
395 | } |
396 | - cout << endl; |
397 | + cerr << endl; |
398 | } |
399 | |
400 | Receiver(string const& scope_id, SearchReplyProxy const& upstream) : |
401 | @@ -101,11 +101,16 @@ |
402 | |
403 | virtual void cancelled() |
404 | { |
405 | - cout << "query to " << scope_id_ << " was cancelled" << endl; |
406 | + cerr << scope_id_ << ": query " << query_.query_string() << " was cancelled" << endl; |
407 | } |
408 | |
409 | virtual void run(SearchReplyProxy const& upstream_reply) |
410 | { |
411 | + if (!valid()) |
412 | + { |
413 | + return; // Query was cancelled |
414 | + } |
415 | + |
416 | // note, category id must match categories received from scope C and D, otherwise result pushing will fail. |
417 | try |
418 | { |
419 | @@ -157,13 +162,13 @@ |
420 | virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const&) override |
421 | { |
422 | SearchQueryBase::UPtr query(new MyQuery(scope_id_, q, scope_c_, scope_d_)); |
423 | - cout << "scope-B: created query: \"" << q.query_string() << "\"" << endl; |
424 | + cerr << "scope-B: created query: \"" << q.query_string() << "\"" << endl; |
425 | return query; |
426 | } |
427 | |
428 | virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override |
429 | { |
430 | - cout << "scope-B: preview: \"" << result.uri() << "\"" << endl; |
431 | + cerr << "scope-B: preview: \"" << result.uri() << "\"" << endl; |
432 | return nullptr; |
433 | } |
434 | |
435 | |
436 | === modified file 'demo/scopes/scope-C/scope-C.cpp' |
437 | --- demo/scopes/scope-C/scope-C.cpp 2014-04-11 12:04:41 +0000 |
438 | +++ demo/scopes/scope-C/scope-C.cpp 2014-04-28 05:57:56 +0000 |
439 | @@ -129,16 +129,17 @@ |
440 | class MyQuery : public SearchQueryBase |
441 | { |
442 | public: |
443 | - MyQuery(CannedQuery const& query, Queue& queue) : |
444 | + MyQuery(string const& scope_id, CannedQuery const& query, Queue& queue) : |
445 | + scope_id_(scope_id), |
446 | query_(query), |
447 | queue_(queue) |
448 | { |
449 | - cerr << "My Query created" << endl; |
450 | + cerr << scope_id_ << ": query instance for \"" << query.query_string() << "\" created" << endl; |
451 | } |
452 | |
453 | ~MyQuery() |
454 | { |
455 | - cerr << "My Query destroyed" << endl; |
456 | + cerr << scope_id_ << ": query instance for \"" << query_.query_string() << "\" destroyed" << endl; |
457 | } |
458 | |
459 | virtual void cancelled() override |
460 | @@ -148,16 +149,21 @@ |
461 | // query, the worker thread's next call to push() will return false, |
462 | // causing the worker thread to stop working on this query. |
463 | queue_.remove(this); |
464 | - cerr << "scope-C: \"" + query_.to_uri() + "\" cancelled" << endl; |
465 | + cerr << scope_id_ << ": \"" + query_.to_uri() + "\" cancelled" << endl; |
466 | } |
467 | |
468 | virtual void run(SearchReplyProxy const& reply) override |
469 | { |
470 | + if (!valid()) |
471 | + { |
472 | + return; // Query was cancelled |
473 | + } |
474 | + |
475 | queue_.put(this, query_.query_string(), reply); |
476 | - cerr << "scope-C: run() returning" << endl; |
477 | } |
478 | |
479 | private: |
480 | + string scope_id_; |
481 | CannedQuery query_; |
482 | Queue& queue_; |
483 | }; |
484 | @@ -205,7 +211,7 @@ |
485 | { |
486 | CategorisedResult result(cat); |
487 | result.set_uri("uri"); |
488 | - result.set_title("scope-C: result " + to_string(i) + " for query \"" + query + "\""); |
489 | + result.set_title(scope_id_ + ": result " + to_string(i) + " for query \"" + query + "\""); |
490 | result.set_art("icon"); |
491 | result.set_dnd_uri("dnd_uri"); |
492 | result.set_intercept_activation(); |
493 | @@ -215,24 +221,24 @@ |
494 | } |
495 | sleep(1); |
496 | } |
497 | + cerr << scope_id_ << ": query \"" << query << "\" complete" << endl; |
498 | } |
499 | } |
500 | |
501 | virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const&) override |
502 | { |
503 | - cout << scope_id_ << ": created query: \"" << q.query_string() << "\"" << endl; |
504 | - return SearchQueryBase::UPtr(new MyQuery(q, queue)); |
505 | + return SearchQueryBase::UPtr(new MyQuery(scope_id_, q, queue)); |
506 | } |
507 | |
508 | virtual ActivationQueryBase::UPtr activate(Result const& result, ActionMetadata const& /* hints */) override |
509 | { |
510 | - cout << scope_id_ << ": activate: \"" << result.uri() << "\"" << endl; |
511 | + cerr << scope_id_ << ": activate: \"" << result.uri() << "\"" << endl; |
512 | return ActivationQueryBase::UPtr(new MyActivation()); |
513 | } |
514 | |
515 | virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override |
516 | { |
517 | - cout << "scope-C: preview: \"" << result.uri() << "\"" << endl; |
518 | + cerr << scope_id_ << ": preview: \"" << result.uri() << "\"" << endl; |
519 | return nullptr; |
520 | } |
521 | |
522 | |
523 | === modified file 'demo/scopes/scope-D/scope-D.cpp' |
524 | --- demo/scopes/scope-D/scope-D.cpp 2014-04-03 12:57:25 +0000 |
525 | +++ demo/scopes/scope-D/scope-D.cpp 2014-04-28 05:57:56 +0000 |
526 | @@ -132,12 +132,12 @@ |
527 | query_(query), |
528 | queue_(queue) |
529 | { |
530 | - cerr << "query instance for \"" << scope_id_ << ":" << query.query_string() << "\" created" << endl; |
531 | + cerr << scope_id_ << ": query instance for \"" << query.query_string() << "\" created" << endl; |
532 | } |
533 | |
534 | virtual ~MyQuery() |
535 | { |
536 | - cerr << "query instance for \"" << scope_id_ << ":" << query_.query_string() << "\" destroyed" << endl; |
537 | + cerr << scope_id_ << ": query instance for \"" << query_.query_string() << "\" destroyed" << endl; |
538 | } |
539 | |
540 | virtual void cancelled() override |
541 | @@ -151,11 +151,16 @@ |
542 | // work may still be in progress on a query. Note that cancellations are frequent; |
543 | // not responding to cancelled() correctly causes loss of performance. |
544 | |
545 | - cerr << "query for \"" << scope_id_ << ":" << query_.query_string() << "\" cancelled" << endl; |
546 | + cerr << scope_id_ << ": query for \"" << query_.query_string() << "\" cancelled" << endl; |
547 | } |
548 | |
549 | virtual void run(SearchReplyProxy const& reply) override |
550 | { |
551 | + if (!valid()) |
552 | + { |
553 | + return; // Query was cancelled |
554 | + } |
555 | + |
556 | // The query can do anything it likes with this method, that is, run() can push results |
557 | // directly on the provided reply, or it can save the reply for later use and return from |
558 | // run(). It is OK to push results on the reply from a different thread. |
559 | @@ -226,13 +231,12 @@ |
560 | virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const&) override |
561 | { |
562 | SearchQueryBase::UPtr query(new MyQuery(scope_id_, q, queue_)); |
563 | - cerr << scope_id_ << ": created query: \"" << q.query_string() << "\"" << endl; |
564 | return query; |
565 | } |
566 | |
567 | virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override |
568 | { |
569 | - cout << scope_id_ << ": preview: \"" << result.uri() << "\"" << endl; |
570 | + cerr << scope_id_ << ": preview: \"" << result.uri() << "\"" << endl; |
571 | return nullptr; |
572 | } |
573 | |
574 | |
575 | === modified file 'demo/scopes/scope-N/scope-N.cpp' |
576 | --- demo/scopes/scope-N/scope-N.cpp 2014-02-27 23:20:56 +0000 |
577 | +++ demo/scopes/scope-N/scope-N.cpp 2014-04-28 05:57:56 +0000 |
578 | @@ -32,32 +32,44 @@ |
579 | class MyQuery : public SearchQueryBase |
580 | { |
581 | public: |
582 | - MyQuery() |
583 | + MyQuery(string const& scope_id) |
584 | + : scope_id_(scope_id) |
585 | { |
586 | + cerr << scope_id_ << ": query instance created" << endl; |
587 | } |
588 | |
589 | ~MyQuery() |
590 | { |
591 | + cerr << scope_id_ << ": query instance destroyed" << endl; |
592 | } |
593 | |
594 | virtual void cancelled() override |
595 | { |
596 | - cerr << "scope-no-op: received cancel request" << endl; |
597 | + cerr << scope_id_ << ": received cancel request" << endl; |
598 | } |
599 | |
600 | virtual void run(SearchReplyProxy const&) override |
601 | { |
602 | - cerr << "scope-no-op: received query" << endl; |
603 | + if (!valid()) |
604 | + { |
605 | + return; // Query was cancelled |
606 | + } |
607 | + |
608 | + cerr << scope_id_ << ": received query" << endl; |
609 | this_thread::sleep_for(chrono::seconds(3)); |
610 | - cerr << "scope-no-op: query complete" << endl; |
611 | + cerr << scope_id_ << ": query complete" << endl; |
612 | } |
613 | + |
614 | +private: |
615 | + string scope_id_; |
616 | }; |
617 | |
618 | class MyScope : public ScopeBase |
619 | { |
620 | public: |
621 | - virtual int start(string const&, RegistryProxy const&) override |
622 | + virtual int start(string const& scope_id, RegistryProxy const&) override |
623 | { |
624 | + scope_id_ = scope_id; |
625 | return VERSION; |
626 | } |
627 | |
628 | @@ -65,13 +77,16 @@ |
629 | |
630 | virtual SearchQueryBase::UPtr search(CannedQuery const&, SearchMetadata const&) override |
631 | { |
632 | - return SearchQueryBase::UPtr(new MyQuery); |
633 | + return SearchQueryBase::UPtr(new MyQuery(scope_id_)); |
634 | } |
635 | |
636 | virtual PreviewQueryBase::UPtr preview(Result const&, ActionMetadata const&) override |
637 | { |
638 | return nullptr; |
639 | } |
640 | + |
641 | +private: |
642 | + string scope_id_; |
643 | }; |
644 | |
645 | extern "C" |
646 | |
647 | === modified file 'demo/scopes/scope-S/scope-S.cpp' |
648 | --- demo/scopes/scope-S/scope-S.cpp 2014-03-04 03:15:11 +0000 |
649 | +++ demo/scopes/scope-S/scope-S.cpp 2014-04-28 05:57:56 +0000 |
650 | @@ -36,36 +36,49 @@ |
651 | class MyQuery : public SearchQueryBase |
652 | { |
653 | public: |
654 | - MyQuery(CannedQuery const& query, CategoryRenderer const& renderer) : |
655 | + MyQuery(string const& scope_id, CannedQuery const& query, CategoryRenderer const& renderer) : |
656 | + scope_id_(scope_id), |
657 | query_(query), |
658 | renderer_(renderer) |
659 | { |
660 | - cerr << "MyQuery/" << query.query_string() << " created" << endl; |
661 | + cerr << scope_id_ << ": query instance for \"" << query.query_string() << "\" created" << endl; |
662 | } |
663 | |
664 | ~MyQuery() |
665 | { |
666 | - cerr << "MyQuery/" << query_.query_string() << " destroyed" << endl; |
667 | + cerr << scope_id_ << ": query instance for \"" << query_.query_string() << "\" destroyed" << endl; |
668 | } |
669 | |
670 | virtual void cancelled() override |
671 | { |
672 | - cerr << "MyQuery/" << query_.query_string() << " cancelled" << endl; |
673 | + cerr << scope_id_ << ": \"" + query_.to_uri() + "\" cancelled" << endl; |
674 | } |
675 | |
676 | virtual void run(SearchReplyProxy const& reply) override |
677 | { |
678 | - cerr << "scope-slow: run called for \"" << query_.query_string() << "\"" << endl; |
679 | - this_thread::sleep_for(chrono::seconds(20)); |
680 | + if (!valid()) |
681 | + { |
682 | + return; // Query was cancelled |
683 | + } |
684 | + |
685 | + cerr << scope_id_ << ": run called for \"" << query_.query_string() << "\"" << endl; |
686 | + int const short_secs = 5; |
687 | + cerr << scope_id_ << ": sleeping for " << short_secs << " seconds" << endl; |
688 | + this_thread::sleep_for(chrono::seconds(short_secs)); |
689 | auto cat = reply->register_category("cat1", "Category 1", "", renderer_); |
690 | CategorisedResult result(cat); |
691 | result.set_uri("uri"); |
692 | - result.set_title("scope-slow: result 1 for query \"" + query_.query_string() + "\""); |
693 | + result.set_title(scope_id_ + ": result 1 for query \"" + query_.query_string() + "\""); |
694 | + cerr << scope_id_ << ": pushing result" << endl; |
695 | reply->push(result); |
696 | - cout << "scope-slow: query \"" << query_.query_string() << "\" complete" << endl; |
697 | + int const long_secs = 50; |
698 | + cerr << scope_id_ << ": sleeping for " << long_secs << " seconds" << endl; |
699 | + this_thread::sleep_for(chrono::seconds(long_secs)); |
700 | + cout << scope_id_ << ": query \"" << query_.query_string() << "\" complete" << endl; |
701 | } |
702 | |
703 | private: |
704 | + string scope_id_; |
705 | CannedQuery query_; |
706 | CategoryRenderer renderer_; |
707 | }; |
708 | @@ -73,35 +86,29 @@ |
709 | class MyScope : public ScopeBase |
710 | { |
711 | public: |
712 | - virtual int start(string const&, RegistryProxy const&) override |
713 | + virtual int start(string const& scope_id, RegistryProxy const&) override |
714 | { |
715 | + scope_id_ = scope_id; |
716 | return VERSION; |
717 | } |
718 | |
719 | virtual void stop() override {} |
720 | |
721 | - virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const& hints) override |
722 | + virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const& /* hints */) override |
723 | { |
724 | - SearchQueryBase::UPtr query(new MyQuery(q, renderer_)); |
725 | - cout << "scope-slow: created query: \"" << q.query_string() << "\"" << endl; |
726 | - |
727 | - if (hints.cardinality() > 0) |
728 | - { |
729 | - cerr << "result cardinality: " << hints.cardinality() << endl; |
730 | - } |
731 | - |
732 | - cerr << "locale: " << hints.locale() << endl; |
733 | - |
734 | + SearchQueryBase::UPtr query(new MyQuery(scope_id_, q, renderer_)); |
735 | + cout << scope_id_ << ": created query: \"" << q.query_string() << "\"" << endl; |
736 | return query; |
737 | } |
738 | |
739 | virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override |
740 | { |
741 | - cout << "scope-S: preview: \"" << result.uri() << "\"" << endl; |
742 | + cout << scope_id_ << ": preview: \"" << result.uri() << "\"" << endl; |
743 | return nullptr; |
744 | } |
745 | |
746 | private: |
747 | + string scope_id_; |
748 | CategoryRenderer renderer_; |
749 | }; |
750 | |
751 | |
752 | === modified file 'include/unity/scopes/PreviewQueryBase.h' |
753 | --- include/unity/scopes/PreviewQueryBase.h 2014-03-05 02:42:49 +0000 |
754 | +++ include/unity/scopes/PreviewQueryBase.h 2014-04-28 05:57:56 +0000 |
755 | @@ -76,7 +76,7 @@ |
756 | |
757 | \param reply The proxy on which to push results for the preview. |
758 | */ |
759 | - virtual void run(PreviewReplyProxy const& reply) = 0; // Called by the run time to start this query |
760 | + virtual void run(PreviewReplyProxy const& reply) = 0; // Called by the run time to start this preview |
761 | |
762 | // TODO: Add a method for subpreview request? |
763 | |
764 | |
765 | === modified file 'include/unity/scopes/QueryBase.h' |
766 | --- include/unity/scopes/QueryBase.h 2014-03-10 05:51:39 +0000 |
767 | +++ include/unity/scopes/QueryBase.h 2014-04-28 05:57:56 +0000 |
768 | @@ -72,10 +72,28 @@ |
769 | Your implementation of this method should ensure that the scope stops |
770 | processing the current query as soon as possible. Any calls to a `push()` method |
771 | once a query is cancelled are ignored, so continuing to push after cancellation |
772 | - only wastes CPU cycles. |
773 | + only wastes CPU cycles. (`push()` returns `false` once a query is cancelled or |
774 | + exceeds its cardinality limit.) |
775 | */ |
776 | virtual void cancelled() = 0; // Originator cancelled the query |
777 | |
778 | + /** |
779 | + \brief Check whether this query is still valid. |
780 | + |
781 | + valid() returns false if this query is finished or was cancelled earlier. Note that it is possible |
782 | + that the run time may call SearchQueryBase::run(), ActivationQueryBase::activate(), or PreviewQueryBase::run() |
783 | + \a after cancelled() was called. Your implementation of these methods should check whether the query is still |
784 | + valid and, if not, do nothing. |
785 | + |
786 | + This method is provided mainly for convenience: it can be used in your `run()` or `activate()` implementation to |
787 | + avoid doing a lot of work setting up a query that was cancelled earlier. Note that, because cancellation |
788 | + can happen at any time during query execution, your implementation should always test the return value |
789 | + of `push()`. If `push()` returns `false`, the query was either cancelled or exceeded its cardinality limit. |
790 | + Either way, there is no point in continuing to push more results because, once `push()` returns `false`, |
791 | + the scopes run time discards all subsequent results for the query. |
792 | + */ |
793 | + bool valid() const; |
794 | + |
795 | /// @cond |
796 | virtual ~QueryBase(); |
797 | /// @endcond |
798 | |
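Review note: the `valid()` documentation above describes two separate checks, one before doing any setup work and one on every `push()`. The following is a minimal sketch of that pattern, not code from this branch. `FakeReply` and `FakeQuery` are hypothetical stand-ins invented for illustration; the only behaviour taken from the diff is that `valid()` returns false after cancellation and that `push()` returns `false` once the query is cancelled or its cardinality limit is exceeded.

```cpp
#include <atomic>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a reply proxy (not the real SearchReply).
// push() returns false once the query was cancelled or the limit is reached.
class FakeReply
{
public:
    explicit FakeReply(std::size_t cardinality) : limit_(cardinality) {}

    void cancel() { cancelled_ = true; }

    bool push(std::string const& result)
    {
        if (cancelled_ || accepted_.size() >= limit_)
        {
            return false;               // result is discarded
        }
        accepted_.push_back(result);
        return true;
    }

    std::vector<std::string> const& accepted() const { return accepted_; }

private:
    std::size_t limit_;
    std::atomic_bool cancelled_{false};
    std::vector<std::string> accepted_;
};

// Hypothetical stand-in for a query (not the real SearchQueryBase).
class FakeQuery
{
public:
    void cancelled() { valid_ = false; }    // originator cancelled the query
    bool valid() const { return valid_; }

    // Returns the number of results that push() accepted.
    int run(FakeReply& reply, std::vector<std::string> const& candidates)
    {
        if (!valid())
        {
            return 0;   // run() was called after cancelled(): do nothing
        }
        int pushed = 0;
        for (auto const& c : candidates)
        {
            if (!reply.push(c))
            {
                break;  // cancelled or over the limit: stop producing results
            }
            ++pushed;
        }
        return pushed;
    }

private:
    std::atomic_bool valid_{true};
};
```

The early `valid()` test only saves setup work; the `push()` return value is what catches a cancellation that arrives while `run()` is already executing.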
799 | === modified file 'include/unity/scopes/internal/ObjectImpl.h' |
800 | --- include/unity/scopes/internal/ObjectImpl.h 2014-04-03 12:57:25 +0000 |
801 | +++ include/unity/scopes/internal/ObjectImpl.h 2014-04-28 05:57:56 +0000 |
802 | @@ -22,6 +22,8 @@ |
803 | #include<unity/scopes/internal/MWObjectProxyFwd.h> |
804 | #include<unity/scopes/Object.h> |
805 | |
806 | +#include <mutex> |
807 | + |
808 | namespace unity |
809 | { |
810 | |
811 | @@ -31,7 +33,7 @@ |
812 | namespace internal |
813 | { |
814 | |
815 | -class ObjectImpl : public virtual Object |
816 | +class ObjectImpl : public virtual Object, public virtual std::enable_shared_from_this<ObjectImpl> |
817 | { |
818 | public: |
819 | ObjectImpl(MWProxy const& mw_proxy); |
820 | @@ -48,10 +50,17 @@ |
821 | virtual void ping(); |
822 | |
823 | protected: |
824 | - MWProxy proxy() const; // Non-virtual because we cannot use covariance with incomplete types. |
825 | - // Each derived proxy implements a non-virtual fwd() method |
826 | - // that is called from within each operation to down-cast the MWProxy. |
827 | + MWProxy proxy(); // Non-virtual because we cannot use covariance with incomplete types. |
828 | + // Each derived proxy implements a non-virtual fwd() method |
829 | + // that is called from within each operation to down-cast the MWProxy. |
830 | + |
831 | + void set_proxy(MWProxy const& p); // Allows a derived proxy to replace mw_proxy_ for asynchronous twoway calls. |
832 | + |
833 | MWProxy mw_proxy_; |
834 | + std::mutex proxy_mutex_; // Protects mw_proxy_ |
835 | + |
836 | +private: |
837 | + void check_proxy(); // Throws from operations if mw_proxy_ is null |
838 | }; |
839 | |
840 | } // namespace internal |
841 | |
842 | === modified file 'include/unity/scopes/internal/QueryBaseImpl.h' |
843 | --- include/unity/scopes/internal/QueryBaseImpl.h 2014-03-10 04:17:09 +0000 |
844 | +++ include/unity/scopes/internal/QueryBaseImpl.h 2014-04-28 05:57:56 +0000 |
845 | @@ -63,10 +63,13 @@ |
846 | |
847 | void cancel(); |
848 | void set_metadata(SearchMetadata const& metadata); |
849 | + bool valid() const; |
850 | |
851 | private: |
852 | SearchMetadata::UPtr search_metadata_; |
853 | std::vector<QueryCtrlProxy> subqueries_; |
854 | + bool valid_; |
855 | + mutable std::mutex mutex_; |
856 | }; |
857 | |
858 | } // namespace internal |
859 | |
860 | === modified file 'include/unity/scopes/internal/QueryCtrlImpl.h' |
861 | --- include/unity/scopes/internal/QueryCtrlImpl.h 2014-03-07 02:18:16 +0000 |
862 | +++ include/unity/scopes/internal/QueryCtrlImpl.h 2014-04-28 05:57:56 +0000 |
863 | @@ -33,11 +33,7 @@ |
864 | namespace internal |
865 | { |
866 | |
867 | -// Proxy used by a scope to push results for a query. |
868 | -// To indicate that it has sent the last result, the scope must call finished(). |
869 | -// Subsequent calls to finished() are ignored. |
870 | -// Calls to push() after finished() was called are ignored. |
871 | -// If the proxy goes out of scope before finished was called, it implicitly calls finished(). |
872 | +class ScopeImpl; |
873 | |
874 | class QueryCtrlImpl : public virtual unity::scopes::QueryCtrl, public virtual ObjectImpl |
875 | { |
876 | @@ -47,10 +43,17 @@ |
877 | |
878 | virtual void cancel() override; |
879 | |
880 | + void set_proxy(MWQueryCtrlProxy const& p); |
881 | + |
882 | private: |
883 | - MWQueryCtrlProxy fwd() const; |
884 | + MWQueryCtrlProxy fwd(); |
885 | |
886 | MWReplyProxy reply_proxy_; |
887 | + bool ready_; // True once ObjectImpl::set_proxy() was called |
888 | + bool cancelled_; // True if cancel() is called before set_proxy() was called |
889 | + std::mutex mutex_; // Protects ready_ and cancelled_ |
890 | + |
891 | + friend class ScopeImpl; // Allows ScopeImpl to call set_proxy() |
892 | }; |
893 | |
894 | } // namespace internal |
895 | |
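Review note: the new `ready_` and `cancelled_` members suggest that a `cancel()` arriving before `set_proxy()` is remembered and acted on later. The corresponding `.cpp` hunk is not fully visible here, so the sketch below is an assumption about the general technique (deferred cancellation), not a copy of `QueryCtrlImpl`. `DeferredCancelCtrl` and `set_target()` are hypothetical names, and a `std::function` stands in for the middleware proxy.

```cpp
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical control handle illustrating deferred cancellation.
class DeferredCancelCtrl
{
public:
    // May be called before the real cancellation target is known.
    void cancel()
    {
        std::function<void()> to_call;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!ready_)
            {
                cancelled_ = true;  // remember the request for set_target()
                return;
            }
            to_call = target_;
        }
        to_call();                  // invoke outside the lock
    }

    // Called once asynchronous creation has produced the real target.
    // Assumed to be called at most once, with a callable target.
    void set_target(std::function<void()> target)
    {
        std::function<void()> to_call;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            target_ = std::move(target);
            ready_ = true;
            if (cancelled_)
            {
                to_call = target_;  // an earlier cancel() is still pending
            }
        }
        if (to_call)
        {
            to_call();
        }
    }

private:
    std::function<void()> target_;
    bool ready_ = false;        // true once set_target() was called
    bool cancelled_ = false;    // true if cancel() arrived before set_target()
    std::mutex mutex_;          // protects all members above
};
```

Calling the target outside the lock avoids holding `mutex_` across what would be a remote invocation in the real code.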
896 | === modified file 'include/unity/scopes/internal/QueryObject.h' |
897 | --- include/unity/scopes/internal/QueryObject.h 2014-03-04 06:26:21 +0000 |
898 | +++ include/unity/scopes/internal/QueryObject.h 2014-04-28 05:57:56 +0000 |
899 | @@ -24,8 +24,6 @@ |
900 | #include <unity/scopes/internal/MWQueryCtrlProxyFwd.h> |
901 | #include <unity/scopes/ReplyProxyFwd.h> |
902 | |
903 | -#include <atomic> |
904 | - |
905 | namespace unity |
906 | { |
907 | |
908 | @@ -71,9 +69,10 @@ |
909 | MWReplyProxy reply_; |
910 | std::weak_ptr<unity::scopes::Reply> reply_proxy_; |
911 | MWQueryCtrlProxy const ctrl_; |
912 | - std::atomic_bool pushable_; |
913 | + bool pushable_; |
914 | QueryObjectBase::SPtr self_; |
915 | - std::atomic_int cardinality_; |
916 | + int cardinality_; |
917 | + mutable std::mutex mutex_; |
918 | }; |
919 | |
920 | } // namespace internal |
921 | |
922 | === modified file 'include/unity/scopes/internal/RegistryImpl.h' |
923 | --- include/unity/scopes/internal/RegistryImpl.h 2014-04-03 12:57:25 +0000 |
924 | +++ include/unity/scopes/internal/RegistryImpl.h 2014-04-28 05:57:56 +0000 |
925 | @@ -48,7 +48,7 @@ |
926 | ObjectProxy locate(std::string const& identity); |
927 | |
928 | private: |
929 | - MWRegistryProxy fwd() const; |
930 | + MWRegistryProxy fwd(); |
931 | }; |
932 | |
933 | } // namespace internal |
934 | |
935 | === modified file 'include/unity/scopes/internal/ReplyImpl.h' |
936 | --- include/unity/scopes/internal/ReplyImpl.h 2014-03-07 05:56:22 +0000 |
937 | +++ include/unity/scopes/internal/ReplyImpl.h 2014-04-28 05:57:56 +0000 |
938 | @@ -54,7 +54,7 @@ |
939 | protected: |
940 | bool push(VariantMap const& variant_map); |
941 | |
942 | - MWReplyProxy fwd() const; |
943 | + MWReplyProxy fwd(); |
944 | |
945 | private: |
946 | std::shared_ptr<QueryObjectBase> qo_; |
947 | |
948 | === modified file 'include/unity/scopes/internal/ReplyObject.h' |
949 | --- include/unity/scopes/internal/ReplyObject.h 2014-03-07 01:07:28 +0000 |
950 | +++ include/unity/scopes/internal/ReplyObject.h 2014-04-28 05:57:56 +0000 |
951 | @@ -59,7 +59,7 @@ |
952 | void finished(ListenerBase::Reason reason, std::string const& error_message) noexcept override; |
953 | |
954 | private: |
955 | - ListenerBase::SPtr const listener_base_; |
956 | + ListenerBase::SPtr listener_base_; |
957 | ReapItem::SPtr reap_item_; |
958 | std::atomic_bool finished_; |
959 | std::mutex mutex_; |
960 | |
961 | === modified file 'include/unity/scopes/internal/RuntimeImpl.h' |
962 | --- include/unity/scopes/internal/RuntimeImpl.h 2014-04-03 14:22:02 +0000 |
963 | +++ include/unity/scopes/internal/RuntimeImpl.h 2014-04-28 05:57:56 +0000 |
964 | @@ -22,10 +22,9 @@ |
965 | #include <unity/scopes/internal/MiddlewareBase.h> |
966 | #include <unity/scopes/internal/MiddlewareFactory.h> |
967 | #include <unity/scopes/internal/Reaper.h> |
968 | +#include <unity/scopes/internal/ThreadPool.h> |
969 | #include <unity/scopes/Runtime.h> |
970 | |
971 | -#include <atomic> |
972 | - |
973 | namespace unity |
974 | { |
975 | |
976 | @@ -53,6 +52,8 @@ |
977 | std::string registry_endpointdir() const; |
978 | std::string registry_endpoint() const; |
979 | Reaper::SPtr reply_reaper() const; |
980 | + ThreadPool::SPtr async_pool() const; |
981 | + ThreadSafeQueue<std::future<void>>::SPtr future_queue() const; |
982 | void run_scope(ScopeBase *const scope_base, std::string const &scope_ini_file); |
983 | |
984 | ObjectProxy string_to_proxy(std::string const& s) const; |
985 | @@ -62,8 +63,9 @@ |
986 | |
987 | private: |
988 | RuntimeImpl(std::string const& scope_id, std::string const& configfile); |
989 | + void waiter_thread(ThreadSafeQueue<std::future<void>>::SPtr const& queue) const noexcept; |
990 | |
991 | - std::atomic_bool destroyed_; |
992 | + bool destroyed_; |
993 | std::string scope_id_; |
994 | std::string configfile_; |
995 | MiddlewareFactory::UPtr middleware_factory_; |
996 | @@ -74,7 +76,10 @@ |
997 | mutable std::string registry_endpointdir_; |
998 | mutable std::string registry_endpoint_; |
999 | mutable Reaper::SPtr reply_reaper_; |
1000 | - mutable std::mutex mutex_; // For lazy initialization of reply_reaper_ |
1001 | + mutable ThreadPool::SPtr async_pool_; // Pool of invocation threads for async query creation |
1002 | + mutable ThreadSafeQueue<std::future<void>>::SPtr future_queue_; |
1003 | + mutable std::thread waiter_thread_; |
1004 | + mutable std::mutex mutex_; // For lazy initialization of reply_reaper_, async_pool_, and future_queue_ |
1005 | }; |
1006 | |
1007 | } // namespace internal |
1008 | |
1009 | === modified file 'include/unity/scopes/internal/ScopeImpl.h' |
1010 | --- include/unity/scopes/internal/ScopeImpl.h 2014-03-07 04:19:32 +0000 |
1011 | +++ include/unity/scopes/internal/ScopeImpl.h 2014-04-28 05:57:56 +0000 |
1012 | @@ -66,7 +66,7 @@ |
1013 | |
1014 | QueryCtrlProxy search(CannedQuery const& query, |
1015 | SearchMetadata const& metadata, |
1016 | - SearchListenerBase::SPtr const& reply); // Not remote operation, hence not override |
1017 | + SearchListenerBase::SPtr const& reply); // Not remote, hence not override |
1018 | |
1019 | virtual QueryCtrlProxy activate(Result const& result, |
1020 | ActionMetadata const& metadata, |
1021 | @@ -85,7 +85,7 @@ |
1022 | static ScopeProxy create(MWScopeProxy const& mw_proxy, RuntimeImpl* runtime, std::string const& scope_id); |
1023 | |
1024 | private: |
1025 | - MWScopeProxy fwd() const; |
1026 | + MWScopeProxy fwd(); |
1027 | |
1028 | RuntimeImpl* const runtime_; |
1029 | std::string scope_id_; |
1030 | |
1031 | === modified file 'include/unity/scopes/internal/SearchReplyImpl.h' |
1032 | --- include/unity/scopes/internal/SearchReplyImpl.h 2014-03-07 10:54:15 +0000 |
1033 | +++ include/unity/scopes/internal/SearchReplyImpl.h 2014-04-28 05:57:56 +0000 |
1034 | @@ -47,7 +47,7 @@ |
1035 | class SearchReplyImpl : public virtual unity::scopes::SearchReply, public virtual ReplyImpl |
1036 | { |
1037 | public: |
1038 | - SearchReplyImpl(MWReplyProxy const& mw_proxy, std::shared_ptr<QueryObjectBase>const & qo); |
1039 | + SearchReplyImpl(MWReplyProxy const& mw_proxy, std::shared_ptr<QueryObjectBase>const & qo, int cardinality); |
1040 | virtual ~SearchReplyImpl(); |
1041 | |
1042 | virtual void register_departments(DepartmentList const& departments, std::string current_department_id) override; |
1043 | |
1044 | === modified file 'include/unity/scopes/internal/TaskWrapper.h' |
1045 | --- include/unity/scopes/internal/TaskWrapper.h 2014-01-23 11:28:34 +0000 |
1046 | +++ include/unity/scopes/internal/TaskWrapper.h 2014-04-28 05:57:56 +0000 |
1047 | @@ -31,7 +31,7 @@ |
1048 | { |
1049 | |
1050 | // Simple wrapper for tasks. Allows us to use a std::packaged_task as the functor |
1051 | -// for a task in a thread pool, without having to know the task's return type in advance. |
1052 | +// for a task in a thread pool without having to know the task's return type in advance. |
1053 | |
1054 | class TaskWrapper final |
1055 | { |
1056 | @@ -54,7 +54,6 @@ |
1057 | |
1058 | void operator()() |
1059 | { |
1060 | - |
1061 | task_->call(); |
1062 | } |
1063 | |
1064 | |
1065 | === modified file 'include/unity/scopes/internal/ThreadPool.h' |
1066 | --- include/unity/scopes/internal/ThreadPool.h 2014-01-23 11:28:34 +0000 |
1067 | +++ include/unity/scopes/internal/ThreadPool.h 2014-04-28 05:57:56 +0000 |
1068 | @@ -22,6 +22,7 @@ |
1069 | #include <unity/scopes/internal/ThreadSafeQueue.h> |
1070 | #include <unity/scopes/internal/TaskWrapper.h> |
1071 | #include <unity/UnityExceptions.h> |
1072 | +#include <unity/util/DefinesPtrs.h> |
1073 | |
1074 | #include <future> |
1075 | |
1076 | @@ -42,6 +43,7 @@ |
1077 | { |
1078 | public: |
1079 | NONCOPYABLE(ThreadPool); |
1080 | + UNITY_DEFINES_PTRS(ThreadPool); |
1081 | |
1082 | ThreadPool(int size); |
1083 | ~ThreadPool(); |
1084 | |
1085 | === modified file 'include/unity/scopes/internal/ThreadSafeQueue.h' |
1086 | --- include/unity/scopes/internal/ThreadSafeQueue.h 2014-01-23 11:28:34 +0000 |
1087 | +++ include/unity/scopes/internal/ThreadSafeQueue.h 2014-04-28 05:57:56 +0000 |
1088 | @@ -19,9 +19,9 @@ |
1089 | #ifndef UNITY_SCOPES_INTERNAL_THREADSAFEQUEUE_H |
1090 | #define UNITY_SCOPES_INTERNAL_THREADSAFEQUEUE_H |
1091 | |
1092 | +#include <unity/util/DefinesPtrs.h> |
1093 | #include <unity/util/NonCopyable.h> |
1094 | |
1095 | -#include <atomic> |
1096 | #include <condition_variable> |
1097 | #include <mutex> |
1098 | #include <queue> |
1099 | @@ -45,30 +45,34 @@ |
1100 | { |
1101 | public: |
1102 | NONCOPYABLE(ThreadSafeQueue); |
1103 | + UNITY_DEFINES_PTRS(ThreadSafeQueue); |
1104 | + |
1105 | typedef T value_type; |
1106 | |
1107 | ThreadSafeQueue(); |
1108 | ~ThreadSafeQueue(); |
1109 | |
1110 | void destroy() noexcept; |
1111 | + void wait_for_destroy() noexcept; |
1112 | void push(T const& item); |
1113 | void push(T&& item); |
1114 | T wait_and_pop(); |
1115 | bool try_pop(T& item); |
1116 | bool empty() const noexcept; |
1117 | + void wait_until_empty() const noexcept; |
1118 | size_t size() const noexcept; |
1119 | |
1120 | private: |
1121 | std::queue<T> queue_; |
1122 | mutable std::mutex mutex_; |
1123 | - std::condition_variable cond_; |
1124 | - bool done_; |
1125 | - std::atomic<int> num_waiters_; |
1126 | + mutable std::condition_variable cond_; |
1127 | + bool destroyed_; |
1128 | + int num_waiters_; |
1129 | }; |
1130 | |
1131 | template<typename T> |
1132 | ThreadSafeQueue<T>::ThreadSafeQueue() : |
1133 | - done_(false), |
1134 | + destroyed_(false), |
1135 | num_waiters_(0) |
1136 | { |
1137 | } |
1138 | @@ -77,40 +81,50 @@ |
1139 | ThreadSafeQueue<T>::~ThreadSafeQueue() |
1140 | { |
1141 | destroy(); |
1142 | - |
1143 | - // Don't destroy the object while there are still threads in wait_and_pop(), otherwise |
1144 | - // a thread that wakes up in wait_and_pop() will try to re-lock the already-destroyed |
1145 | - // mutex. |
1146 | - while (num_waiters_.load() > 0) |
1147 | - std::this_thread::yield(); // LCOV_EXCL_LINE (impossible to reliably hit with a test) |
1148 | + wait_for_destroy(); |
1149 | } |
1150 | |
1151 | template<typename T> |
1152 | void ThreadSafeQueue<T>::destroy() noexcept |
1153 | { |
1154 | std::lock_guard<std::mutex> lock(mutex_); |
1155 | - if (done_) |
1156 | + if (destroyed_) |
1157 | { |
1158 | return; |
1159 | } |
1160 | - done_ = true; |
1161 | - cond_.notify_all(); // Wake up anyone asleep in wait_and_pop() |
1162 | + destroyed_ = true; |
1163 | + cond_.notify_all(); // Wake up anyone asleep in wait_and_pop() or wait_for_destroy() |
1164 | +} |
1165 | + |
1166 | +template<typename T> |
1167 | +void ThreadSafeQueue<T>::wait_for_destroy() noexcept |
1168 | +{ |
1169 | + std::unique_lock<std::mutex> lock(mutex_); |
1170 | + cond_.wait(lock, [this] { return destroyed_ && num_waiters_ == 0; }); |
1171 | } |
1172 | |
1173 | template<typename T> |
1174 | void ThreadSafeQueue<T>::push(T const& item) |
1175 | { |
1176 | std::lock_guard<std::mutex> lock(mutex_); |
1177 | + if (destroyed_) |
1178 | + { |
1179 | + throw std::runtime_error("ThreadSafeQueue: cannot push onto destroyed queue"); |
1180 | + } |
1181 | queue_.push(item); |
1182 | - cond_.notify_one(); |
1183 | + cond_.notify_all(); |
1184 | } |
1185 | |
1186 | template<typename T> |
1187 | void ThreadSafeQueue<T>::push(T&& item) |
1188 | { |
1189 | std::lock_guard<std::mutex> lock(mutex_); |
1190 | + if (destroyed_) |
1191 | + { |
1192 | + throw std::runtime_error("ThreadSafeQueue: cannot push onto destroyed queue"); |
1193 | + } |
1194 | queue_.emplace(std::move(item)); |
1195 | - cond_.notify_one(); |
1196 | + cond_.notify_all(); |
1197 | } |
1198 | |
1199 | template<typename T> |
1200 | @@ -118,16 +132,21 @@ |
1201 | { |
1202 | std::unique_lock<std::mutex> lock(mutex_); |
1203 | ++num_waiters_; |
1204 | - cond_.wait(lock, [this] { return done_ || queue_.size() != 0; }); |
1205 | - if (done_) |
1206 | + cond_.wait(lock, [this] { return destroyed_ || queue_.size() != 0; }); |
1207 | + if (destroyed_) |
1208 | { |
1209 | - lock.unlock(); |
1210 | - --num_waiters_; |
1211 | + if (--num_waiters_ == 0) |
1212 | + { |
1213 | + cond_.notify_all(); |
1214 | + } |
1215 | throw std::runtime_error("ThreadSafeQueue: queue destroyed while thread was blocked in wait_and_pop()"); |
1216 | } |
1217 | T item = std::move(queue_.front()); |
1218 | queue_.pop(); |
1219 | - --num_waiters_; |
1220 | + if (--num_waiters_ == 0 || queue_.empty()) |
1221 | + { |
1222 | + cond_.notify_all(); |
1223 | + } |
1224 | return item; |
1225 | } |
1226 | |
1227 | @@ -141,6 +160,10 @@ |
1228 | } |
1229 | item = std::move(queue_.front()); |
1230 | queue_.pop(); |
1231 | + if (queue_.empty()) |
1232 | + { |
1233 | + cond_.notify_all(); |
1234 | + } |
1235 | return true; |
1236 | } |
1237 | |
1238 | @@ -152,6 +175,13 @@ |
1239 | } |
1240 | |
1241 | template<typename T> |
1242 | +void ThreadSafeQueue<T>::wait_until_empty() const noexcept |
1243 | +{ |
1244 | + std::unique_lock<std::mutex> lock(mutex_); |
1245 | + cond_.wait(lock, [this] { return queue_.empty(); }); |
1246 | +} |
1247 | + |
1248 | +template<typename T> |
1249 | size_t ThreadSafeQueue<T>::size() const noexcept |
1250 | { |
1251 | std::lock_guard<std::mutex> lock(mutex_); |
1252 | |
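Review note: the `ThreadSafeQueue` change above replaces the yield loop in the destructor with a condition-variable handshake: `destroy()` wakes all waiters, each waiter decrements `num_waiters_` under the lock, and `wait_for_destroy()` blocks until the count reaches zero. The following is a reduced sketch of that handshake only. `MiniQueue` is a hypothetical name, and `try_pop()`, `wait_until_empty()`, and the rvalue `push()` overload are left out.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>

template<typename T>
class MiniQueue
{
public:
    ~MiniQueue()
    {
        destroy();
        wait_for_destroy();     // no thread may still be inside wait_and_pop()
    }

    void destroy() noexcept
    {
        std::lock_guard<std::mutex> lock(mutex_);
        destroyed_ = true;
        cond_.notify_all();     // wake wait_and_pop() and wait_for_destroy()
    }

    void wait_for_destroy() noexcept
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return destroyed_ && num_waiters_ == 0; });
    }

    void push(T item)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (destroyed_)
        {
            throw std::runtime_error("MiniQueue: cannot push onto destroyed queue");
        }
        queue_.push(std::move(item));
        cond_.notify_all();
    }

    T wait_and_pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        ++num_waiters_;
        cond_.wait(lock, [this] { return destroyed_ || !queue_.empty(); });
        if (--num_waiters_ == 0)
        {
            cond_.notify_all(); // the last waiter releases wait_for_destroy()
        }
        if (destroyed_)
        {
            throw std::runtime_error("MiniQueue: destroyed during wait_and_pop()");
        }
        T item = std::move(queue_.front());
        queue_.pop();
        return item;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool destroyed_ = false;
    int num_waiters_ = 0;       // plain int: only touched with mutex_ held
};
```

Because `num_waiters_` is now read and written only under `mutex_`, it no longer needs to be atomic, which matches the removal of `<atomic>` in the diff.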
1253 | === modified file 'include/unity/scopes/internal/zmq_middleware/ConnectionPool.h' |
1254 | --- include/unity/scopes/internal/zmq_middleware/ConnectionPool.h 2014-01-22 03:29:48 +0000 |
1255 | +++ include/unity/scopes/internal/zmq_middleware/ConnectionPool.h 2014-04-28 05:57:56 +0000 |
1256 | @@ -1,5 +1,5 @@ |
1257 | /* |
1258 | - * Copyright (C) 2013 Canonical Ltd |
1259 | + * Copyright (C) 2014 Canonical Ltd |
1260 | * |
1261 | * This program is free software: you can redistribute it and/or modify |
1262 | * it under the terms of the GNU Lesser General Public License version 3 as |
1263 | @@ -24,7 +24,6 @@ |
1264 | |
1265 | #include <zmqpp/socket.hpp> |
1266 | |
1267 | -#include <mutex> |
1268 | #include <string> |
1269 | #include <unordered_map> |
1270 | |
1271 | @@ -33,6 +32,8 @@ |
1272 | // different threads to crash. |
1273 | // So, we maintain a pool of invocation threads, with each thread keeping its own cache of sockets. |
1274 | // Sockets are indexed by adapter name and created lazily. |
1275 | +// |
1276 | +// WARNING: No locking anywhere here. The pool is intended for use as a thread_local static member only. |
1277 | |
1278 | namespace unity |
1279 | { |
1280 | @@ -57,17 +58,18 @@ |
1281 | void register_socket(std::string const& endpoint, zmqpp::socket socket, RequestMode m); |
1282 | |
1283 | private: |
1284 | - struct Connection |
1285 | + struct SocketData |
1286 | { |
1287 | zmqpp::socket socket; |
1288 | RequestMode mode; |
1289 | }; |
1290 | |
1291 | - typedef std::unordered_map<std::string, Connection> CPool; |
1292 | + typedef std::unordered_map<std::string, SocketData> CPool; |
1293 | + |
1294 | + CPool::value_type create_connection(std::string const& endpoint, RequestMode m, int64_t timeout); |
1295 | |
1296 | zmqpp::context& context_; |
1297 | CPool pool_; |
1298 | - std::mutex mutex_; |
1299 | }; |
1300 | |
1301 | } // namespace zmq_middleware |
1302 | |
1303 | === modified file 'include/unity/scopes/internal/zmq_middleware/ObjectAdapter.h' |
1304 | --- include/unity/scopes/internal/zmq_middleware/ObjectAdapter.h 2014-01-24 00:26:43 +0000 |
1305 | +++ include/unity/scopes/internal/zmq_middleware/ObjectAdapter.h 2014-04-28 05:57:56 +0000 |
1306 | @@ -46,6 +46,7 @@ |
1307 | { |
1308 | |
1309 | class ServantBase; |
1310 | +class StopPublisher; |
1311 | class ZmqMiddleware; |
1312 | |
1313 | class ObjectAdapter final |
1314 | @@ -89,12 +90,12 @@ |
1315 | // The Failed state is reachable from any of the other states and indicates |
1316 | // a fatal error condition. |
1317 | enum AdapterState { Inactive, Activating, Active, Deactivating, Destroyed, Failed }; |
1318 | - void throw_bad_state(AdapterState state) const; |
1319 | + void throw_bad_state(std::string const& label, AdapterState state) const; |
1320 | |
1321 | void run_workers(); |
1322 | - void init_ctrl_socket(); |
1323 | - zmqpp::socket subscribe_to_ctrl_socket(); |
1324 | - void stop_workers() noexcept; |
1325 | + // void init_ctrl_socket(); |
1326 | + // zmqpp::socket subscribe_to_ctrl_socket(); |
1327 | + // void stop_workers() noexcept; |
1328 | |
1329 | void safe_bind(zmqpp::socket& s, std::string const& endpoint); |
1330 | |
1331 | @@ -114,8 +115,9 @@ |
1332 | std::string endpoint_; |
1333 | RequestMode mode_; |
1334 | int pool_size_; |
1335 | - std::unique_ptr<zmqpp::socket> ctrl_; // PUB socket to signal when to deactivate |
1336 | - std::mutex ctrl_mutex_; // Synchronizes access to ctrl_ when sending |
1337 | + // std::unique_ptr<zmqpp::socket> ctrl_; // PUB socket to signal when to deactivate |
1338 | + // std::mutex ctrl_mutex_; // Synchronizes access to ctrl_ when sending |
1339 | + std::unique_ptr<StopPublisher> stopper_; // Used to signal threads when it's time to terminate |
1340 | std::thread broker_; // Connects router with dealer |
1341 | std::vector<std::thread> workers_; // Threads for incoming invocations |
1342 | std::atomic_int num_workers_; // For handshake with parent |
1343 | |
1344 | === added file 'include/unity/scopes/internal/zmq_middleware/StopPublisher.h' |
1345 | --- include/unity/scopes/internal/zmq_middleware/StopPublisher.h 1970-01-01 00:00:00 +0000 |
1346 | +++ include/unity/scopes/internal/zmq_middleware/StopPublisher.h 2014-04-28 05:57:56 +0000 |
1347 | @@ -0,0 +1,85 @@ |
1348 | +/* |
1349 | + * Copyright (C) 2014 Canonical Ltd |
1350 | + * |
1351 | + * This program is free software: you can redistribute it and/or modify |
1352 | + * it under the terms of the GNU Lesser General Public License version 3 as |
1353 | + * published by the Free Software Foundation. |
1354 | + * |
1355 | + * This program is distributed in the hope that it will be useful, |
1356 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
1357 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
1358 | + * GNU Lesser General Public License for more details. |
1359 | + * |
1360 | + * You should have received a copy of the GNU Lesser General Public License |
1361 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
1362 | + * |
1363 | + * Authored by: Michi Henning <michi.henning@canonical.com> |
1364 | + */ |
1365 | + |
1366 | +#ifndef UNITY_SCOPES_INTERNAL_ZMQMIDDLEWARE_STOPPUBLISHER_H |
1367 | +#define UNITY_SCOPES_INTERNAL_ZMQMIDDLEWARE_STOPPUBLISHER_H |
1368 | + |
1369 | +#include <unity/util/NonCopyable.h> |
1370 | + |
1371 | +#include <zmqpp/context.hpp> |
1372 | +#include <zmqpp/socket.hpp> |
1373 | + |
1374 | +#include <condition_variable> |
1375 | +#include <mutex> |
1376 | +#include <thread> |
1377 | + |
1378 | +namespace unity |
1379 | +{ |
1380 | + |
1381 | +namespace scopes |
1382 | +{ |
1383 | + |
1384 | +namespace internal |
1385 | +{ |
1386 | + |
1387 | +namespace zmq_middleware |
1388 | +{ |
1389 | + |
1390 | +// StopPublisher provides a way to use Zmq pub/sub without falling prey |
1391 | +// to the slow joiner syndrome. The provided name is used to create an inproc |
1392 | +// endpoint by prefixing the name with "inproc://". |
1393 | +// Subscribers create a socket by calling subscribe(). The returned |
1394 | +// socket becomes ready for reading (for a zero-length message) once |
1395 | +// some thread calls stop(). Multiple calls to stop() are benign. |
1396 | + |
1397 | +class StopPublisher final |
1398 | +{ |
1399 | +public: |
1400 | + NONCOPYABLE(StopPublisher); |
1401 | + |
1402 | + StopPublisher(zmqpp::context* context, std::string const& inproc_name); |
1403 | + ~StopPublisher(); |
1404 | + |
1405 | + std::string endpoint() const; |
1406 | + zmqpp::socket subscribe(); |
1407 | + void stop(); |
1408 | + |
1409 | +private: |
1410 | + zmqpp::context* context_; |
1411 | + std::string endpoint_; |
1412 | + std::thread thread_; |
1413 | + std::exception_ptr ex_; |
1414 | + |
1415 | + enum State { Starting, Started, Stopping, Stopped, Failed }; |
1416 | + State state_; |
1417 | + |
1418 | + mutable std::mutex m_; |
1419 | + std::condition_variable cond_; |
1420 | + |
1421 | + void stopper_thread() noexcept; |
1422 | +}; |
1423 | + |
1424 | +} // namespace zmq_middleware |
1425 | + |
1426 | +} // namespace internal |
1427 | + |
1428 | +} // namespace scopes |
1429 | + |
1430 | +} // namespace unity |
1431 | + |
1432 | +#endif |
1433 | |
1434 | === modified file 'include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h' |
1435 | --- include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h 2014-02-03 02:53:33 +0000 |
1436 | +++ include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h 2014-04-28 05:57:56 +0000 |
1437 | @@ -21,6 +21,7 @@ |
1438 | |
1439 | #include <unity/scopes/internal/MWObjectProxy.h> |
1440 | #include <scopes/internal/zmq_middleware/capnproto/Message.capnp.h> |
1441 | +#include <unity/scopes/internal/zmq_middleware/ConnectionPool.h> |
1442 | #include <unity/scopes/internal/zmq_middleware/RequestMode.h> |
1443 | #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h> |
1444 | #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxyFwd.h> |
1445 | @@ -71,8 +72,11 @@ |
1446 | |
1447 | protected: |
1448 | capnproto::Request::Builder make_request_(capnp::MessageBuilder& b, std::string const& operation_name) const; |
1449 | - ZmqReceiver invoke_(capnp::MessageBuilder& out_params); |
1450 | - ZmqReceiver invoke_(capnp::MessageBuilder& out_params, int64_t timeout); |
1451 | + |
1452 | + void invoke_oneway_(capnp::MessageBuilder& out_params); |
1453 | + |
1454 | + ZmqReceiver invoke_twoway_(capnp::MessageBuilder& out_params); |
1455 | + ZmqReceiver invoke_twoway_(capnp::MessageBuilder& out_params, int64_t timeout); |
1456 | |
1457 | private: |
1458 | std::string endpoint_; |
1459 | |
1460 | === modified file 'src/scopes/QueryBase.cpp' |
1461 | --- src/scopes/QueryBase.cpp 2014-03-10 04:17:09 +0000 |
1462 | +++ src/scopes/QueryBase.cpp 2014-04-28 05:57:56 +0000 |
1463 | @@ -50,6 +50,11 @@ |
1464 | p->set_metadata(metadata); |
1465 | } |
1466 | |
1467 | +bool QueryBase::valid() const |
1468 | +{ |
1469 | + return p->valid(); |
1470 | +} |
1471 | + |
1472 | } // namespace scopes |
1473 | |
1474 | } // namespace unity |
1475 | |
1476 | === modified file 'src/scopes/internal/ObjectImpl.cpp' |
1477 | --- src/scopes/internal/ObjectImpl.cpp 2014-04-03 12:57:25 +0000 |
1478 | +++ src/scopes/internal/ObjectImpl.cpp 2014-04-28 05:57:56 +0000 |
1479 | @@ -19,8 +19,12 @@ |
1480 | #include <unity/scopes/internal/ObjectImpl.h> |
1481 | |
1482 | #include <unity/scopes/internal/MWObjectProxy.h> |
1483 | +#include <unity/scopes/ScopeExceptions.h> |
1484 | + |
1485 | +#include <cassert> |
1486 | |
1487 | using namespace std; |
1488 | +using namespace unity::scopes; |
1489 | |
1490 | namespace unity |
1491 | { |
1492 | @@ -31,9 +35,10 @@ |
1493 | namespace internal |
1494 | { |
1495 | |
1496 | -ObjectImpl::ObjectImpl(MWProxy const& mw_proxy) : |
1497 | - mw_proxy_(mw_proxy) |
1498 | +ObjectImpl::ObjectImpl(MWProxy const& mw_proxy) |
1499 | { |
1500 | + lock_guard<mutex> lock(proxy_mutex_); |
1501 | + mw_proxy_ = mw_proxy; |
1502 | } |
1503 | |
1504 | ObjectImpl::~ObjectImpl() |
1505 | @@ -42,6 +47,7 @@ |
1506 | |
1507 | string ObjectImpl::identity() |
1508 | { |
1509 | + check_proxy(); |
1510 | return mw_proxy_->identity(); |
1511 | } |
1512 | |
1513 | @@ -52,29 +58,51 @@ |
1514 | |
1515 | string ObjectImpl::endpoint() |
1516 | { |
1517 | + check_proxy(); |
1518 | return mw_proxy_->endpoint(); |
1519 | } |
1520 | |
1521 | int64_t ObjectImpl::timeout() |
1522 | { |
1523 | + check_proxy(); |
1524 | return mw_proxy_->timeout(); |
1525 | } |
1526 | |
1527 | string ObjectImpl::to_string() |
1528 | { |
1529 | + check_proxy(); |
1530 | return mw_proxy_->to_string(); |
1531 | } |
1532 | |
1533 | void ObjectImpl::ping() |
1534 | { |
1535 | + check_proxy(); |
1536 | mw_proxy_->ping(); |
1537 | } |
1538 | |
1539 | -MWProxy ObjectImpl::proxy() const |
1540 | +MWProxy ObjectImpl::proxy() |
1541 | { |
1542 | + lock_guard<mutex> lock(proxy_mutex_); |
1543 | return mw_proxy_; |
1544 | } |
1545 | |
1546 | +void ObjectImpl::set_proxy(MWProxy const& p) |
1547 | +{ |
1548 | + assert(p); |
1549 | + lock_guard<mutex> lock(proxy_mutex_); |
1550 | + assert(!mw_proxy_); |
1551 | + mw_proxy_ = p; |
1552 | +} |
1553 | + |
1554 | +void ObjectImpl::check_proxy() |
1555 | +{ |
1556 | + lock_guard<mutex> lock(proxy_mutex_); |
1557 | + if (!mw_proxy_) |
1558 | + { |
1559 | + throw MiddlewareException("Cannot invoke on null proxy"); |
1560 | + } |
1561 | +} |
1562 | + |
1563 | } // namespace internal |
1564 | |
1565 | } // namespace scopes |
1566 | |
1567 | === modified file 'src/scopes/internal/QueryBaseImpl.cpp' |
1568 | --- src/scopes/internal/QueryBaseImpl.cpp 2014-03-10 04:17:09 +0000 |
1569 | +++ src/scopes/internal/QueryBaseImpl.cpp 2014-04-28 05:57:56 +0000 |
1570 | @@ -37,6 +37,7 @@ |
1571 | { |
1572 | |
1573 | QueryBaseImpl::QueryBaseImpl() |
1574 | + : valid_(true) |
1575 | { |
1576 | } |
1577 | |
1578 | @@ -54,6 +55,8 @@ |
1579 | // This allows cancel() to forward incoming cancellations to subqueries |
1580 | // without intervention from the scope application code. |
1581 | QueryCtrlProxy qcp = scope->search(query_string, *search_metadata_, reply); |
1582 | + |
1583 | + lock_guard<mutex> lock(mutex_); |
1584 | subqueries_.push_back(qcp); |
1585 | return qcp; |
1586 | } |
1587 | @@ -66,6 +69,8 @@ |
1588 | assert(search_metadata_); |
1589 | |
1590 | QueryCtrlProxy qcp = scope->search(query_string, filter_state, *search_metadata_, reply); |
1591 | + |
1592 | + lock_guard<mutex> lock(mutex_); |
1593 | subqueries_.push_back(qcp); |
1594 | return qcp; |
1595 | } |
1596 | @@ -79,6 +84,8 @@ |
1597 | assert(search_metadata_); |
1598 | |
1599 | QueryCtrlProxy qcp = scope->search(query_string, department_id, filter_state, *search_metadata_, reply); |
1600 | + |
1601 | + lock_guard<mutex> lock(mutex_); |
1602 | subqueries_.push_back(qcp); |
1603 | return qcp; |
1604 | } |
1605 | @@ -91,6 +98,8 @@ |
1606 | SearchListenerBase::SPtr const& reply) |
1607 | { |
1608 | QueryCtrlProxy qcp = scope->search(query_string, department_id, filter_state, metadata, reply); |
1609 | + |
1610 | + lock_guard<mutex> lock(mutex_); |
1611 | subqueries_.push_back(qcp); |
1612 | return qcp; |
1613 | } |
1614 | @@ -102,16 +111,29 @@ |
1615 | |
1616 | void QueryBaseImpl::cancel() |
1617 | { |
1618 | + lock_guard<mutex> lock(mutex_); |
1619 | + |
1620 | + if (!valid_) |
1621 | + { |
1622 | + return; |
1623 | + } |
1624 | + valid_ = false; |
1625 | for (auto& ctrl : subqueries_) |
1626 | { |
1627 | ctrl->cancel(); // Forward the cancellation to any subqueries that might be active |
1628 | } |
1629 | // We release the memory for the subquery controls here. That's just a micro-optimization |
1630 | - // because this QueryBase will be destroyed shortly anyway, once the cancelled() method |
1631 | - // of the application returns. (Not deallocating here would work too.) |
1632 | + // because this QueryBase will be destroyed shortly anyway, once the cancelled() (and possibly |
1633 | + // run()) methods of the application return. (Not deallocating here would work too.) |
1634 | vector<QueryCtrlProxy>().swap(subqueries_); |
1635 | } |
1636 | |
1637 | +bool QueryBaseImpl::valid() const |
1638 | +{ |
1639 | + lock_guard<mutex> lock(mutex_); |
1640 | + return valid_; |
1641 | +} |
1642 | + |
1643 | } // namespace internal |
1644 | |
1645 | } // namespace scopes |
1646 | |
1647 | === modified file 'src/scopes/internal/QueryCtrlImpl.cpp' |
1648 | --- src/scopes/internal/QueryCtrlImpl.cpp 2014-03-07 02:01:47 +0000 |
1649 | +++ src/scopes/internal/QueryCtrlImpl.cpp 2014-04-28 05:57:56 +0000 |
1650 | @@ -43,10 +43,13 @@ |
1651 | ObjectImpl(ctrl_proxy), |
1652 | reply_proxy_(reply_proxy) |
1653 | { |
1654 | - assert(ctrl_proxy); |
1655 | - assert(reply_proxy); |
1656 | // We remember the reply proxy so, when the query is cancelled, we can |
1657 | // inform the reply object belonging to this query that the query is finished. |
1658 | + assert(reply_proxy); |
1659 | + |
1660 | + lock_guard<mutex> lock(mutex_); |
1661 | + ready_ = ctrl_proxy != nullptr; |
1662 | + cancelled_ = false; |
1663 | } |
1664 | |
1665 | QueryCtrlImpl::~QueryCtrlImpl() |
1666 | @@ -55,13 +58,24 @@ |
1667 | |
1668 | void QueryCtrlImpl::cancel() |
1669 | { |
1670 | + { |
1671 | + lock_guard<mutex> lock(mutex_); |
1672 | + if (!ready_) |
1673 | + { |
1674 | + // Remember that query was cancelled, so we can call |
1675 | + // cancel() once set_proxy() is called. |
1676 | + cancelled_ = true; |
1677 | + return; |
1678 | + } |
1679 | + } |
1680 | + |
1681 | try |
1682 | { |
1683 | // Forward cancellation down-stream to the query. This does not block. |
1684 | fwd()->cancel(); |
1685 | |
1686 | // Indicate (to ourselves) that this query is complete. Calling via the MWReplyProxy ensures |
1687 | - // the finished() call will be processed by a seperate server-side thread, |
1688 | + // the finished() call will be processed by a separate server-side thread, |
1689 | // so we cannot block here. |
1690 | reply_proxy_->finished(ListenerBase::Cancelled, ""); |
1691 | } |
1692 | @@ -72,7 +86,28 @@ |
1693 | } |
1694 | } |
1695 | |
1696 | -MWQueryCtrlProxy QueryCtrlImpl::fwd() const |
1697 | +void QueryCtrlImpl::set_proxy(MWQueryCtrlProxy const& p) |
1698 | +{ |
1699 | + assert(proxy() == nullptr); |
1700 | + ObjectImpl::set_proxy(p); |
1701 | + |
1702 | + bool need_cancel; |
1703 | + |
1704 | + { |
1705 | + lock_guard<mutex> lock(mutex_); |
1706 | + ready_ = true; |
1707 | + need_cancel = cancelled_; |
1708 | + } // Unlock |
1709 | + |
1710 | + if (need_cancel) |
1711 | + { |
1712 | + // If cancel() was called earlier, do the actual cancellation now |
1713 | + // that we have the middleware proxy. |
1714 | + cancel(); |
1715 | + } |
1716 | +} |
1717 | + |
1718 | +MWQueryCtrlProxy QueryCtrlImpl::fwd() |
1719 | { |
1720 | return dynamic_pointer_cast<MWQueryCtrl>(proxy()); |
1721 | } |
1722 | |
1723 | === modified file 'src/scopes/internal/QueryObject.cpp' |
1724 | --- src/scopes/internal/QueryObject.cpp 2014-03-07 03:49:03 +0000 |
1725 | +++ src/scopes/internal/QueryObject.cpp 2014-04-28 05:57:56 +0000 |
1726 | @@ -77,6 +77,8 @@ |
1727 | |
1728 | void QueryObject::run(MWReplyProxy const& reply, InvokeInfo const& /* info */) noexcept |
1729 | { |
1730 | + unique_lock<mutex> lock(mutex_); |
1731 | + |
1732 | // It is possible for a run() to be dispatched by the middleware *after* the query |
1733 | // was cancelled. This can happen because run() and cancel() are dispatched by different |
1734 | // thread pools. If the scope implementation uses a synchronous run(), a later run() |
1735 | @@ -84,16 +86,18 @@ |
1736 | // completes, by which time the query for the later run() call may have been |
1737 | // cancelled already. |
1738 | // If the query was cancelled by the client before we even receive the |
1739 | - // run invocation, we never forward the run() to the implementation. |
1740 | + // run invocation, we never forward the run() call to the implementation. |
1741 | if (!pushable_) |
1742 | { |
1743 | + self_ = nullptr; |
1744 | + disconnect(); |
1745 | return; |
1746 | } |
1747 | |
1748 | // Create the reply proxy to pass to query_base_ and keep a weak_ptr, which we will need |
1749 | // if cancel() is called later. |
1750 | assert(self_); |
1751 | - auto reply_proxy = make_shared<SearchReplyImpl>(reply, self_); |
1752 | + auto reply_proxy = make_shared<SearchReplyImpl>(reply, self_, cardinality_); |
1753 | assert(reply_proxy); |
1754 | reply_proxy_ = reply_proxy; |
1755 | |
1756 | @@ -102,12 +106,15 @@ |
1757 | self_ = nullptr; |
1758 | disconnect(); |
1759 | |
1760 | - // Synchronous call into scope implementation. |
1761 | - // On return, replies for the query may still be outstanding. |
1762 | try |
1763 | { |
1764 | auto search_query = dynamic_pointer_cast<SearchQueryBase>(query_base_); |
1765 | assert(search_query); |
1766 | + |
1767 | + lock.unlock(); |
1768 | + |
1769 | + // Synchronous call into scope implementation. |
1770 | + // On return, replies for the query may still be outstanding. |
1771 | search_query->run(reply_proxy); |
1772 | } |
1773 | catch (std::exception const& e) |
1774 | @@ -128,15 +135,24 @@ |
1775 | |
1776 | void QueryObject::cancel(InvokeInfo const& /* info */) |
1777 | { |
1778 | - pushable_ = false; |
1779 | - auto rp = reply_proxy_.lock(); |
1780 | - if (rp) |
1781 | { |
1782 | - // Send finished() to up-stream client to tell him the query is done. |
1783 | - // We send via the MWReplyProxy here because that allows passing |
1784 | - // a ListenerBase::Reason (whereas the public ReplyProxy does not). |
1785 | - reply_->finished(ListenerBase::Cancelled, ""); // Oneway, can't block |
1786 | - } |
1787 | + lock_guard<mutex> lock(mutex_); |
1788 | + |
1789 | + if (!pushable_) |
1790 | + { |
1791 | + return; |
1792 | + } |
1793 | + pushable_ = false; |
1794 | + |
1795 | + auto rp = reply_proxy_.lock(); |
1796 | + if (rp) |
1797 | + { |
1798 | + // Send finished() to up-stream client to tell him the query is done. |
1799 | + // We send via the MWReplyProxy here because that allows passing |
1800 | + // a ListenerBase::Reason (whereas the public ReplyProxy does not). |
1801 | + reply_->finished(ListenerBase::Cancelled, ""); // Oneway, can't block |
1802 | + } |
1803 | + } // Release lock |
1804 | |
1805 | // Forward the cancellation to the query base (which in turn will forward it to any subqueries). |
1806 | // The query base also calls the cancelled() callback to inform the application code. |
1807 | @@ -145,11 +161,13 @@ |
1808 | |
1809 | bool QueryObject::pushable(InvokeInfo const& /* info */) const noexcept |
1810 | { |
1811 | + lock_guard<mutex> lock(mutex_); |
1812 | return pushable_; |
1813 | } |
1814 | |
1815 | int QueryObject::cardinality(InvokeInfo const& /* info */) const noexcept |
1816 | { |
1817 | + lock_guard<mutex> lock(mutex_); |
1818 | return cardinality_; |
1819 | } |
1820 | |
1821 | @@ -169,6 +187,8 @@ |
1822 | |
1823 | void QueryObject::set_self(QueryObjectBase::SPtr const& self) noexcept |
1824 | { |
1825 | + lock_guard<mutex> lock(mutex_); |
1826 | + |
1827 | assert(self); |
1828 | assert(!self_); |
1829 | self_ = self; |
1830 | |
1831 | === modified file 'src/scopes/internal/RegistryImpl.cpp' |
1832 | --- src/scopes/internal/RegistryImpl.cpp 2014-04-03 12:57:25 +0000 |
1833 | +++ src/scopes/internal/RegistryImpl.cpp 2014-04-28 05:57:56 +0000 |
1834 | @@ -71,7 +71,7 @@ |
1835 | return matching_entries; |
1836 | } |
1837 | |
1838 | -MWRegistryProxy RegistryImpl::fwd() const |
1839 | +MWRegistryProxy RegistryImpl::fwd() |
1840 | { |
1841 | return dynamic_pointer_cast<MWRegistry>(proxy()); |
1842 | } |
1843 | |
1844 | === modified file 'src/scopes/internal/ReplyImpl.cpp' |
1845 | --- src/scopes/internal/ReplyImpl.cpp 2014-03-07 05:56:22 +0000 |
1846 | +++ src/scopes/internal/ReplyImpl.cpp 2014-04-28 05:57:56 +0000 |
1847 | @@ -143,7 +143,7 @@ |
1848 | } |
1849 | } |
1850 | |
1851 | -MWReplyProxy ReplyImpl::fwd() const |
1852 | +MWReplyProxy ReplyImpl::fwd() |
1853 | { |
1854 | return dynamic_pointer_cast<MWReply>(proxy()); |
1855 | } |
1856 | |
1857 | === modified file 'src/scopes/internal/ReplyObject.cpp' |
1858 | --- src/scopes/internal/ReplyObject.cpp 2014-03-10 10:16:13 +0000 |
1859 | +++ src/scopes/internal/ReplyObject.cpp 2014-04-28 05:57:56 +0000 |
1860 | @@ -50,9 +50,8 @@ |
1861 | assert(receiver_base); |
1862 | assert(runtime); |
1863 | reap_item_ = runtime->reply_reaper()->add([this] { |
1864 | - cerr << "No activity on ReplyObject for scope " << this->origin_proxy_ |
1865 | - << ": ReplyObject destroyed" << endl; |
1866 | - this->disconnect(); |
1867 | + string msg = "No activity on ReplyObject for scope " + this->origin_proxy_ + ": ReplyObject destroyed"; |
1868 | + this->finished(ListenerBase::Error, msg); |
1869 | }); |
1870 | } |
1871 | |
1872 | |
1873 | === modified file 'src/scopes/internal/RuntimeImpl.cpp' |
1874 | --- src/scopes/internal/RuntimeImpl.cpp 2014-04-11 08:22:17 +0000 |
1875 | +++ src/scopes/internal/RuntimeImpl.cpp 2014-04-28 05:57:56 +0000 |
1876 | @@ -51,11 +51,14 @@ |
1877 | namespace internal |
1878 | { |
1879 | |
1880 | -RuntimeImpl::RuntimeImpl(string const& scope_id, string const& configfile) : |
1881 | - destroyed_(false), |
1882 | - scope_id_(scope_id) |
1883 | +RuntimeImpl::RuntimeImpl(string const& scope_id, string const& configfile) |
1884 | { |
1885 | - if (scope_id.empty()) |
1886 | + lock_guard<mutex> lock(mutex_); |
1887 | + |
1888 | + destroyed_ = false; |
1889 | + |
1890 | + scope_id_ = scope_id; |
1891 | + if (scope_id_.empty()) |
1892 | { |
1893 | UniqueID id; |
1894 | scope_id_ = "c-" + id.gen(); |
1895 | @@ -77,6 +80,10 @@ |
1896 | middleware_ = middleware_factory_->create(scope_id_, default_middleware, middleware_configfile); |
1897 | middleware_->start(); |
1898 | |
1899 | + async_pool_ = make_shared<ThreadPool>(1); // TODO: configurable pool size |
1900 | + future_queue_ = make_shared<ThreadSafeQueue<future<void>>>(); |
1901 | + waiter_thread_ = std::thread([this]{ waiter_thread(future_queue_); }); |
1902 | + |
1903 | if (registry_configfile_.empty() || registry_identity_.empty()) |
1904 | { |
1905 | cerr << "Warning: no registry configured" << endl; |
1906 | @@ -124,21 +131,51 @@ |
1907 | |
1908 | void RuntimeImpl::destroy() |
1909 | { |
1910 | - if (!destroyed_.exchange(true)) |
1911 | - { |
1912 | - // TODO: not good enough. Need to wait for the middleware to stop and for the reaper |
1913 | - // to terminate. Otherwise, it's possible that we exit while threads are still running |
1914 | - // with undefined behavior. |
1915 | - registry_ = nullptr; |
1916 | - middleware_->stop(); |
1917 | - middleware_ = nullptr; |
1918 | - middleware_factory_.reset(nullptr); |
1919 | + lock_guard<mutex> lock(mutex_); |
1920 | + |
1921 | + if (destroyed_) |
1922 | + { |
1923 | + return; |
1924 | + } |
1925 | + destroyed_ = true; |
1926 | + |
1927 | + // Stop the reaper. Any ReplyObject instances that may still |
1928 | + // be hanging around will be cleaned up when the server side |
1929 | + // is shut down. |
1930 | + if (reply_reaper_) |
1931 | + { |
1932 | + reply_reaper_->destroy(); |
1933 | reply_reaper_ = nullptr; |
1934 | } |
1935 | + |
1936 | + // No more outgoing invocations. |
1937 | + async_pool_ = nullptr; |
1938 | + |
1939 | + // Wait for any twoway operations that were invoked asynchronously to complete. |
1940 | + if (future_queue_) |
1941 | + { |
1942 | + future_queue_->wait_until_empty(); |
1943 | + future_queue_->destroy(); |
1944 | + future_queue_->wait_for_destroy(); |
1945 | + } |
1946 | + |
1947 | + if (waiter_thread_.joinable()) |
1948 | + { |
1949 | + waiter_thread_.join(); |
1950 | + } |
1951 | + |
1952 | + // Shut down server-side. |
1953 | + middleware_->stop(); |
1954 | + middleware_->wait_for_shutdown(); |
1955 | + middleware_ = nullptr; |
1956 | + middleware_factory_.reset(nullptr); |
1957 | + |
1958 | + registry_ = nullptr; |
1959 | } |
1960 | |
1961 | string RuntimeImpl::scope_id() const |
1962 | { |
1963 | + lock_guard<mutex> lock(mutex_); |
1964 | return scope_id_; |
1965 | } |
1966 | |
1967 | @@ -149,7 +186,9 @@ |
1968 | |
1969 | MiddlewareFactory const* RuntimeImpl::factory() const |
1970 | { |
1971 | - if (destroyed_.load()) |
1972 | + lock_guard<mutex> lock(mutex_); |
1973 | + |
1974 | + if (destroyed_) |
1975 | { |
1976 | throw LogicException("factory(): Cannot obtain factory for already destroyed run time"); |
1977 | } |
1978 | @@ -158,7 +197,9 @@ |
1979 | |
1980 | RegistryProxy RuntimeImpl::registry() const |
1981 | { |
1982 | - if (destroyed_.load()) |
1983 | + lock_guard<mutex> lock(mutex_); |
1984 | + |
1985 | + if (destroyed_) |
1986 | { |
1987 | throw LogicException("registry(): Cannot obtain registry for already destroyed run time"); |
1988 | } |
1989 | @@ -171,21 +212,25 @@ |
1990 | |
1991 | string RuntimeImpl::registry_configfile() const |
1992 | { |
1993 | + lock_guard<mutex> lock(mutex_); |
1994 | return registry_configfile_; |
1995 | } |
1996 | |
1997 | string RuntimeImpl::registry_identity() const |
1998 | { |
1999 | + lock_guard<mutex> lock(mutex_); |
2000 | return registry_identity_; |
2001 | } |
2002 | |
2003 | string RuntimeImpl::registry_endpointdir() const |
2004 | { |
2005 | + lock_guard<mutex> lock(mutex_); |
2006 | return registry_endpointdir_; |
2007 | } |
2008 | |
2009 | string RuntimeImpl::registry_endpoint() const |
2010 | { |
2011 | + lock_guard<mutex> lock(mutex_); |
2012 | return registry_endpoint_; |
2013 | } |
2014 | |
2015 | @@ -200,6 +245,35 @@ |
2016 | return reply_reaper_; |
2017 | } |
2018 | |
2019 | +void RuntimeImpl::waiter_thread(ThreadSafeQueue<std::future<void>>::SPtr const& queue) const noexcept |
2020 | +{ |
2021 | + for (;;) |
2022 | + { |
2023 | + try |
2024 | + { |
2025 | + // Wait on the future from an async invocation. |
2026 | + // wait_and_pop() throws runtime_error when queue is destroyed |
2027 | + queue->wait_and_pop().get(); |
2028 | + } |
2029 | + catch (std::runtime_error const&) |
2030 | + { |
2031 | + break; |
2032 | + } |
2033 | + } |
2034 | +} |
2035 | + |
2036 | +ThreadPool::SPtr RuntimeImpl::async_pool() const |
2037 | +{ |
2038 | + lock_guard<mutex> lock(mutex_); |
2039 | + return async_pool_; |
2040 | +} |
2041 | + |
2042 | +ThreadSafeQueue<future<void>>::SPtr RuntimeImpl::future_queue() const |
2043 | +{ |
2044 | + lock_guard<mutex> lock(mutex_); |
2045 | + return future_queue_; |
2046 | +} |
2047 | + |
2048 | void RuntimeImpl::run_scope(ScopeBase *const scope_base, std::string const& scope_ini_file) |
2049 | { |
2050 | // Retrieve the registry middleware and create a proxy to its state receiver |
2051 | |
2052 | === modified file 'src/scopes/internal/ScopeImpl.cpp' |
2053 | --- src/scopes/internal/ScopeImpl.cpp 2014-03-07 04:19:32 +0000 |
2054 | +++ src/scopes/internal/ScopeImpl.cpp 2014-04-28 05:57:56 +0000 |
2055 | @@ -21,6 +21,7 @@ |
2056 | #include <unity/scopes/ActionMetadata.h> |
2057 | #include <unity/scopes/internal/ActivationReplyObject.h> |
2058 | #include <unity/scopes/internal/MiddlewareBase.h> |
2059 | +#include <unity/scopes/internal/MWQueryCtrl.h> |
2060 | #include <unity/scopes/internal/MWScope.h> |
2061 | #include <unity/scopes/internal/PreviewReplyObject.h> |
2062 | #include <unity/scopes/internal/QueryCtrlImpl.h> |
2063 | @@ -30,7 +31,6 @@ |
2064 | #include <unity/UnityExceptions.h> |
2065 | |
2066 | #include <cassert> |
2067 | -#include <iostream> // TODO: remove this once logging is added |
2068 | |
2069 | using namespace std; |
2070 | |
2071 | @@ -95,30 +95,48 @@ |
2072 | throw unity::InvalidArgumentException("Scope::search(): invalid SearchListenerBase (nullptr)"); |
2073 | } |
2074 | |
2075 | - QueryCtrlProxy ctrl; |
2076 | ReplyObject::SPtr ro(make_shared<ResultReplyObject>(reply, runtime_, to_string(), metadata.cardinality())); |
2077 | - try |
2078 | - { |
2079 | - MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro); |
2080 | - |
2081 | - // Forward the search() method across the bus. This is a |
2082 | - // synchronous twoway interaction with the scope, so it can return |
2083 | - // the QueryCtrlProxy. This may block for some time, for example, if |
2084 | - // the scope is not running and needs to be activated by the registry first. |
2085 | - ctrl = fwd()->search(query, metadata.serialize(), rp); |
2086 | - assert(ctrl); |
2087 | - } |
2088 | - catch (std::exception const& e) |
2089 | + MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro); |
2090 | + |
2091 | + // "Fake" QueryCtrlProxy that doesn't have a real MWQueryCtrlProxy yet. |
2092 | + shared_ptr<QueryCtrlImpl> ctrl = make_shared<QueryCtrlImpl>(nullptr, rp); |
2093 | + |
2094 | + // We pass a shared pointer to the lambda (instead of the this pointer) |
2095 | + // to keep ourselves alive until after the lambda fires. |
2096 | + // Otherwise, if the ScopeProxy for this invocation goes out of scope before the invocation is |
2097 | + // sent in the new thread, the lambda will call into this by-now-destroyed instance. |
2098 | + auto impl = dynamic_pointer_cast<ScopeImpl>(shared_from_this()); |
2099 | + |
2100 | + auto send_search = [impl, query, metadata, rp, ro, ctrl]() -> void |
2101 | { |
2102 | try |
2103 | { |
2104 | - ro->finished(ListenerBase::Error, e.what()); |
2105 | + // Forward the (synchronous) search() method across the bus. |
2106 | + auto real_ctrl = dynamic_pointer_cast<QueryCtrlImpl>(impl->fwd()->search(query, metadata.serialize(), rp)); |
2107 | + assert(real_ctrl); |
2108 | + |
2109 | + // Call has completed now, so we update the MWQueryCtrlProxy for the fake proxy |
2110 | + // with the real proxy that was returned. |
2111 | + auto new_proxy = dynamic_pointer_cast<MWQueryCtrl>(real_ctrl->proxy()); |
2112 | + assert(new_proxy); |
2113 | + ctrl->set_proxy(new_proxy); |
2114 | } |
2115 | - catch (...) |
2116 | + catch (std::exception const& e) |
2117 | { |
2118 | + try |
2119 | + { |
2120 | + ro->finished(ListenerBase::Error, e.what()); |
2121 | + } |
2122 | + catch (...) |
2123 | + { |
2124 | + } |
2125 | } |
2126 | - throw; |
2127 | - } |
2128 | + }; |
2129 | + |
2130 | + // Send the blocking twoway request asynchronously via the async invocation pool. The waiter thread |
2131 | + // waits on the future, so it gets cleaned up. |
2132 | + auto future = runtime_->async_pool()->submit(send_search); |
2133 | + runtime_->future_queue()->push(move(future)); |
2134 | return ctrl; |
2135 | } |
2136 | |
2137 | @@ -131,31 +149,40 @@ |
2138 | throw unity::InvalidArgumentException("Scope::activate(): invalid ActivationListenerBase (nullptr)"); |
2139 | } |
2140 | |
2141 | - QueryCtrlProxy ctrl; |
2142 | ActivationReplyObject::SPtr ro(make_shared<ActivationReplyObject>(reply, runtime_, to_string())); |
2143 | - try |
2144 | - { |
2145 | - MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro); |
2146 | - |
2147 | - // Forward the activate() method across the bus. |
2148 | - ctrl = fwd()->activate(result.p->activation_target(), metadata.serialize(), rp); |
2149 | - assert(ctrl); |
2150 | - } |
2151 | - catch (std::exception const& e) |
2152 | - { |
2153 | - // TODO: log error |
2154 | - cerr << "activate(): " << e.what() << endl; |
2155 | + MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro); |
2156 | + |
2157 | + shared_ptr<QueryCtrlImpl> ctrl = make_shared<QueryCtrlImpl>(nullptr, rp); |
2158 | + |
2159 | + auto impl = dynamic_pointer_cast<ScopeImpl>(shared_from_this()); |
2160 | + |
2161 | + auto send_activate = [impl, result, metadata, rp, ro, ctrl]() -> void |
2162 | + { |
2163 | try |
2164 | { |
2165 | - ro->finished(ListenerBase::Error, e.what()); |
2166 | - throw; |
2167 | + auto real_ctrl = dynamic_pointer_cast<QueryCtrlImpl>(impl->fwd()->activate(result.p->activation_target(), |
2168 | + metadata.serialize(), |
2169 | + rp)); |
2170 | + assert(real_ctrl); |
2171 | + |
2172 | + auto new_proxy = dynamic_pointer_cast<MWQueryCtrl>(real_ctrl->proxy()); |
2173 | + assert(new_proxy); |
2174 | + ctrl->set_proxy(new_proxy); |
2175 | } |
2176 | - catch (...) |
2177 | + catch (std::exception const& e) |
2178 | { |
2179 | - cerr << "activate(): unknown exception" << endl; |
2180 | + try |
2181 | + { |
2182 | + ro->finished(ListenerBase::Error, e.what()); |
2183 | + } |
2184 | + catch (...) |
2185 | + { |
2186 | + } |
2187 | } |
2188 | - throw; |
2189 | - } |
2190 | + }; |
2191 | + |
2192 | + auto future = runtime_->async_pool()->submit(send_activate); |
2193 | + runtime_->future_queue()->push(move(future)); |
2194 | return ctrl; |
2195 | } |
2196 | |
2197 | @@ -170,35 +197,43 @@ |
2198 | throw unity::InvalidArgumentException("Scope::perform_action(): invalid ActivationListenerBase (nullptr)"); |
2199 | } |
2200 | |
2201 | - QueryCtrlProxy ctrl; |
2202 | - try |
2203 | - { |
2204 | - // Create a middleware server-side object that can receive incoming |
2205 | - // push() and finished() messages over the network. |
2206 | - ActivationReplyObject::SPtr ro(make_shared<ActivationReplyObject>(reply, runtime_, to_string())); |
2207 | - MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro); |
2208 | - |
2209 | - // Forward the activate() method across the bus. |
2210 | - ctrl = fwd()->perform_action(result.p->activation_target(), metadata.serialize(), widget_id, action_id, rp); |
2211 | - assert(ctrl); |
2212 | - } |
2213 | - catch (std::exception const& e) |
2214 | - { |
2215 | - // TODO: log error |
2216 | - cerr << "perform_action(): " << e.what() << endl; |
2217 | + ActivationReplyObject::SPtr ro(make_shared<ActivationReplyObject>(reply, runtime_, to_string())); |
2218 | + MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro); |
2219 | + |
2220 | + shared_ptr<QueryCtrlImpl> ctrl = make_shared<QueryCtrlImpl>(nullptr, rp); |
2221 | + |
2222 | + auto impl = dynamic_pointer_cast<ScopeImpl>(shared_from_this()); |
2223 | + |
2224 | + auto send_perform_action = [impl, result, metadata, widget_id, action_id, rp, ro, ctrl]() -> void |
2225 | + { |
2226 | try |
2227 | { |
2228 | - // TODO: if things go wrong, we need to make sure that the reply object |
2229 | - // is disconnected from the middleware, so it gets deallocated. |
2230 | - reply->finished(ListenerBase::Error, e.what()); |
2231 | - throw; |
2232 | + auto real_ctrl = dynamic_pointer_cast<QueryCtrlImpl>(impl->fwd()->perform_action( |
2233 | + result.p->activation_target(), |
2234 | + metadata.serialize(), |
2235 | + widget_id, |
2236 | + action_id, |
2237 | + rp)); |
2238 | + assert(real_ctrl); |
2239 | + |
2240 | + auto new_proxy = dynamic_pointer_cast<MWQueryCtrl>(real_ctrl->proxy()); |
2241 | + assert(new_proxy); |
2242 | + ctrl->set_proxy(new_proxy); |
2243 | } |
2244 | - catch (...) |
2245 | + catch (std::exception const& e) |
2246 | { |
2247 | - cerr << "perform_action(): unknown exception" << endl; |
2248 | + try |
2249 | + { |
2250 | + ro->finished(ListenerBase::Error, e.what()); |
2251 | + } |
2252 | + catch (...) |
2253 | + { |
2254 | + } |
2255 | } |
2256 | - throw; |
2257 | - } |
2258 | + }; |
2259 | + |
2260 | + auto future = runtime_->async_pool()->submit(send_perform_action); |
2261 | + runtime_->future_queue()->push(move(future)); |
2262 | return ctrl; |
2263 | } |
2264 | |
2265 | @@ -211,38 +246,40 @@ |
2266 | throw unity::InvalidArgumentException("Scope::preview(): invalid PreviewListenerBase (nullptr)"); |
2267 | } |
2268 | |
2269 | - QueryCtrlProxy ctrl; |
2270 | PreviewReplyObject::SPtr ro(make_shared<PreviewReplyObject>(reply, runtime_, to_string())); |
2271 | - try |
2272 | - { |
2273 | - // Create a middleware server-side object that can receive incoming |
2274 | - // push() and finished() messages over the network. |
2275 | - MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro); |
2276 | - |
2277 | - // Forward the search() method across the bus. This is a |
2278 | - // synchronous twoway interaction with the scope, so it can return |
2279 | - // the QueryCtrlProxy. Because the Scope implementation has a separate |
2280 | - // thread for search() calls, this is guaranteed not to block for |
2281 | - // any length of time. (No application code other than the QueryBase constructor |
2282 | - // is called by search() on the server side.) |
2283 | - ctrl = fwd()->preview(result.p->activation_target(), hints.serialize(), rp); |
2284 | - assert(ctrl); |
2285 | - } |
2286 | - catch (std::exception const& e) |
2287 | - { |
2288 | - // TODO: log error |
2289 | - cerr << "preview(): " << e.what() << endl; |
2290 | + MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro); |
2291 | + |
2292 | + shared_ptr<QueryCtrlImpl> ctrl = make_shared<QueryCtrlImpl>(nullptr, rp); |
2293 | + |
2294 | + auto impl = dynamic_pointer_cast<ScopeImpl>(shared_from_this()); |
2295 | + |
2296 | + auto send_preview = [impl, result, hints, rp, ro, ctrl]() -> void |
2297 | + { |
2298 | try |
2299 | { |
2300 | - ro->finished(ListenerBase::Error, e.what()); |
2301 | - throw; |
2302 | + auto real_ctrl = dynamic_pointer_cast<QueryCtrlImpl>(impl->fwd()->preview(result.p->activation_target(), |
2303 | + hints.serialize(), |
2304 | + rp)); |
2305 | + assert(real_ctrl); |
2306 | + |
2307 | + auto new_proxy = dynamic_pointer_cast<MWQueryCtrl>(real_ctrl->proxy()); |
2308 | + assert(new_proxy); |
2309 | + ctrl->set_proxy(new_proxy); |
2310 | } |
2311 | - catch (...) |
2312 | + catch (std::exception const& e) |
2313 | { |
2314 | - cerr << "preview(): unknown exception" << endl; |
2315 | + try |
2316 | + { |
2317 | + ro->finished(ListenerBase::Error, e.what()); |
2318 | + } |
2319 | + catch (...) |
2320 | + { |
2321 | + } |
2322 | } |
2323 | - throw; |
2324 | - } |
2325 | + }; |
2326 | + |
2327 | + auto future = runtime_->async_pool()->submit(send_preview); |
2328 | + runtime_->future_queue()->push(move(future)); |
2329 | return ctrl; |
2330 | } |
2331 | |
2332 | @@ -251,7 +288,7 @@ |
2333 | return make_shared<ScopeImpl>(mw_proxy, runtime, scope_id); |
2334 | } |
2335 | |
2336 | -MWScopeProxy ScopeImpl::fwd() const |
2337 | +MWScopeProxy ScopeImpl::fwd() |
2338 | { |
2339 | return dynamic_pointer_cast<MWScope>(proxy()); |
2340 | } |
2341 | |
2342 | === modified file 'src/scopes/internal/ScopeObject.cpp' |
2343 | --- src/scopes/internal/ScopeObject.cpp 2014-03-10 04:17:09 +0000 |
2344 | +++ src/scopes/internal/ScopeObject.cpp 2014-04-28 05:57:56 +0000 |
2345 | @@ -68,7 +68,6 @@ |
2346 | // to be safe, we don't assert, in case someone is running a broken client. |
2347 | |
2348 | // TODO: log error about incoming request containing an invalid reply proxy. |
2349 | - |
2350 | throw LogicException("Scope \"" + runtime_->scope_id() + "\": query() called with null reply proxy"); |
2351 | } |
2352 | |
2353 | |
2354 | === modified file 'src/scopes/internal/SearchReplyImpl.cpp' |
2355 | --- src/scopes/internal/SearchReplyImpl.cpp 2014-03-10 10:34:43 +0000 |
2356 | +++ src/scopes/internal/SearchReplyImpl.cpp 2014-04-28 05:57:56 +0000 |
2357 | @@ -38,11 +38,13 @@ |
2358 | namespace internal |
2359 | { |
2360 | |
2361 | -SearchReplyImpl::SearchReplyImpl(MWReplyProxy const& mw_proxy, std::shared_ptr<QueryObjectBase> const& qo) : |
2362 | +SearchReplyImpl::SearchReplyImpl(MWReplyProxy const& mw_proxy, |
2363 | + std::shared_ptr<QueryObjectBase> const& qo, |
2364 | + int cardinality) : |
2365 | ObjectImpl(mw_proxy), |
2366 | ReplyImpl(mw_proxy, qo), |
2367 | cat_registry_(new CategoryRegistry()), |
2368 | - cardinality_(qo->cardinality({ fwd()->identity(), fwd()->mw_base() })), |
2369 | + cardinality_(cardinality), |
2370 | num_pushes_(0) |
2371 | { |
2372 | } |
2373 | |
2374 | === modified file 'src/scopes/internal/ThreadPool.cpp' |
2375 | --- src/scopes/internal/ThreadPool.cpp 2014-01-23 11:28:34 +0000 |
2376 | +++ src/scopes/internal/ThreadPool.cpp 2014-04-28 05:57:56 +0000 |
2377 | @@ -54,7 +54,6 @@ |
2378 | threads_.push_back(std::thread(&ThreadPool::run, this)); |
2379 | } |
2380 | auto future = threads_ready_.get_future(); |
2381 | - future.wait(); |
2382 | future.get(); |
2383 | } |
2384 | catch (std::exception const&) // LCOV_EXCL_LINE |
2385 | @@ -66,11 +65,18 @@ |
2386 | ThreadPool::~ThreadPool() |
2387 | { |
2388 | queue_->destroy(); |
2389 | + |
2390 | + vector<thread> threads; |
2391 | + { |
2392 | + lock_guard<mutex> lock(mutex_); |
2393 | + threads.swap(threads_); |
2394 | + } |
2395 | + |
2396 | try |
2397 | { |
2398 | - for (size_t i = 0; i < threads_.size(); ++i) |
2399 | + for (size_t i = 0; i < threads.size(); ++i) |
2400 | { |
2401 | - threads_[i].join(); |
2402 | + threads[i].join(); |
2403 | } |
2404 | } |
2405 | catch (...) // LCOV_EXCL_LINE |
2406 | @@ -81,18 +87,19 @@ |
2407 | |
2408 | void ThreadPool::run() |
2409 | { |
2410 | - TaskQueue::value_type task; |
2411 | + { |
2412 | + lock_guard<mutex> lock(mutex_); |
2413 | + if (--num_threads_ == 0) |
2414 | + { |
2415 | + threads_ready_.set_value(); |
2416 | + } |
2417 | + } |
2418 | + |
2419 | for (;;) |
2420 | { |
2421 | + TaskQueue::value_type task; // Task must go out of scope in each iteration, in case it stores shared_ptrs. |
2422 | try |
2423 | { |
2424 | - { |
2425 | - lock_guard<mutex> lock(mutex_); |
2426 | - if (--num_threads_ == 0) |
2427 | - { |
2428 | - threads_ready_.set_value(); |
2429 | - } |
2430 | - } |
2431 | task = queue_->wait_and_pop(); |
2432 | } |
2433 | catch (runtime_error const&) |
2434 | |
2435 | === modified file 'src/scopes/internal/smartscopes/SSQueryObject.cpp' |
2436 | --- src/scopes/internal/smartscopes/SSQueryObject.cpp 2014-03-07 03:49:03 +0000 |
2437 | +++ src/scopes/internal/smartscopes/SSQueryObject.cpp 2014-04-28 05:57:56 +0000 |
2438 | @@ -206,7 +206,7 @@ |
2439 | |
2440 | // Create the reply proxy and keep a weak_ptr, which we will need |
2441 | // if cancel() is called later. |
2442 | - q_reply_proxy = make_shared<SearchReplyImpl>(reply, shared_from_this()); |
2443 | + q_reply_proxy = make_shared<SearchReplyImpl>(reply, shared_from_this(), query->q_cardinality); |
2444 | assert(q_reply_proxy); |
2445 | query->q_reply_proxy = q_reply_proxy; |
2446 | |
2447 | |
2448 | === modified file 'src/scopes/internal/zmq_middleware/CMakeLists.txt' |
2449 | --- src/scopes/internal/zmq_middleware/CMakeLists.txt 2014-04-02 09:24:02 +0000 |
2450 | +++ src/scopes/internal/zmq_middleware/CMakeLists.txt 2014-04-28 05:57:56 +0000 |
2451 | @@ -13,6 +13,7 @@ |
2452 | ${CMAKE_CURRENT_SOURCE_DIR}/RethrowException.cpp |
2453 | ${CMAKE_CURRENT_SOURCE_DIR}/ScopeI.cpp |
2454 | ${CMAKE_CURRENT_SOURCE_DIR}/ServantBase.cpp |
2455 | + ${CMAKE_CURRENT_SOURCE_DIR}/StopPublisher.cpp |
2456 | ${CMAKE_CURRENT_SOURCE_DIR}/StateReceiverI.cpp |
2457 | ${CMAKE_CURRENT_SOURCE_DIR}/Util.cpp |
2458 | ${CMAKE_CURRENT_SOURCE_DIR}/VariantConverter.cpp |
2459 | |
2460 | === modified file 'src/scopes/internal/zmq_middleware/ConnectionPool.cpp' |
2461 | --- src/scopes/internal/zmq_middleware/ConnectionPool.cpp 2014-01-22 03:29:48 +0000 |
2462 | +++ src/scopes/internal/zmq_middleware/ConnectionPool.cpp 2014-04-28 05:57:56 +0000 |
2463 | @@ -36,8 +36,8 @@ |
2464 | namespace zmq_middleware |
2465 | { |
2466 | |
2467 | -ConnectionPool::ConnectionPool(zmqpp::context& context) : |
2468 | - context_(context) |
2469 | +ConnectionPool::ConnectionPool(zmqpp::context& context) |
2470 | + : context_(context) |
2471 | { |
2472 | } |
2473 | |
2474 | @@ -50,9 +50,7 @@ |
2475 | { |
2476 | assert(!endpoint.empty()); |
2477 | |
2478 | - lock_guard<mutex> lock(mutex_); |
2479 | - |
2480 | - // Look for existing connection. If there is one, but with the wrong type, we throw. |
2481 | + // Look for existing connection. |
2482 | auto const& it = pool_.find(endpoint); |
2483 | if (it != pool_.end()) |
2484 | { |
2485 | @@ -66,19 +64,30 @@ |
2486 | } |
2487 | |
2488 | // No existing connection yet, establish one. |
2489 | + auto entry = create_connection(endpoint, m, -1); |
2490 | + return pool_.emplace(move(entry)).first->second.socket; |
2491 | +} |
2492 | + |
2493 | +ConnectionPool::CPool::value_type ConnectionPool::create_connection(std::string const& endpoint, |
2494 | + RequestMode m, |
2495 | + int64_t timeout) |
2496 | +{ |
2497 | + if (timeout == -1) |
2498 | + { |
2499 | + timeout = 0; // Don't linger on close |
2500 | + } |
2501 | zmqpp::socket_type stype = m == RequestMode::Twoway ? zmqpp::socket_type::request : zmqpp::socket_type::push; |
2502 | zmqpp::socket s(context_, stype); |
2503 | - s.set(zmqpp::socket_option::linger, 0); |
2504 | + auto zmqpp_timeout = m == RequestMode::Twoway ? int32_t(timeout) : 0; |
2505 | + s.set(zmqpp::socket_option::linger, zmqpp_timeout); |
2506 | s.connect(endpoint); |
2507 | - return pool_.emplace(endpoint, Connection { move(s), m }).first->second.socket; |
2508 | + return CPool::value_type{ endpoint, SocketData{ move(s), m } }; |
2509 | } |
2510 | |
2511 | void ConnectionPool::remove(std::string const& endpoint) |
2512 | { |
2513 | assert(!endpoint.empty()); |
2514 | |
2515 | - lock_guard<mutex> lock(mutex_); |
2516 | - |
2517 | auto const& it = pool_.find(endpoint); |
2518 | if (it != pool_.end()) |
2519 | { |
2520 | @@ -90,8 +99,7 @@ |
2521 | { |
2522 | assert(!endpoint.empty()); |
2523 | |
2524 | - lock_guard<mutex> lock(mutex_); |
2525 | - pool_.emplace(endpoint, Connection { move(socket), m }); |
2526 | + pool_.emplace(endpoint, SocketData{ move(socket), m }); |
2527 | } |
2528 | |
2529 | } // namespace zmq_middleware |
2530 | |
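Review note on the linger handling in `create_connection()` above. The following is a minimal sketch of just the timeout arithmetic, written without zmqpp so it can be compiled on its own. `linger_for` is a hypothetical helper name that does not appear in the branch, and the local `RequestMode` enum is a stand-in for the real one.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for the middleware's RequestMode (assumption: only these two modes matter here).
enum class RequestMode { Oneway, Twoway };

// Hypothetical helper mirroring the arithmetic in create_connection():
// a timeout of -1 is mapped to 0 (don't linger on close), and only
// twoway sockets use the caller's timeout as the linger value.
inline int32_t linger_for(RequestMode m, int64_t timeout)
{
    if (timeout == -1)
    {
        timeout = 0;  // Don't linger on close
    }
    return m == RequestMode::Twoway ? static_cast<int32_t>(timeout) : 0;
}
```

As far as this hunk shows, `find()` always passes `-1`, so sockets created through it keep the previous linger value of 0. A non-zero linger would only apply to a twoway socket created by a caller that passes an explicit timeout.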
2531 | === modified file 'src/scopes/internal/zmq_middleware/ObjectAdapter.cpp' |
2532 | --- src/scopes/internal/zmq_middleware/ObjectAdapter.cpp 2014-02-20 19:19:25 +0000 |
2533 | +++ src/scopes/internal/zmq_middleware/ObjectAdapter.cpp 2014-04-28 05:57:56 +0000 |
2534 | @@ -19,6 +19,7 @@ |
2535 | #include <unity/scopes/internal/zmq_middleware/ObjectAdapter.h> |
2536 | |
2537 | #include <unity/scopes/internal/zmq_middleware/ServantBase.h> |
2538 | +#include <unity/scopes/internal/zmq_middleware/StopPublisher.h> |
2539 | #include <unity/scopes/internal/zmq_middleware/Util.h> |
2540 | #include <unity/scopes/internal/zmq_middleware/ZmqException.h> |
2541 | #include <unity/scopes/internal/zmq_middleware/ZmqReceiver.h> |
2542 | @@ -134,7 +135,7 @@ |
2543 | lock_guard<mutex> state_lock(state_mutex_, adopt_lock); |
2544 | if (state_ == Destroyed || state_ == Failed) |
2545 | { |
2546 | - throw_bad_state(state_); |
2547 | + throw_bad_state("add()", state_); |
2548 | } |
2549 | } |
2550 | |
2551 | @@ -158,7 +159,7 @@ |
2552 | lock_guard<mutex> state_lock(state_mutex_, adopt_lock); |
2553 | if (state_ == Destroyed || state_ == Failed) |
2554 | { |
2555 | - throw_bad_state(state_); |
2556 | + throw_bad_state("remove()", state_); |
2557 | } |
2558 | } |
2559 | |
2560 | @@ -185,7 +186,7 @@ |
2561 | lock_guard<mutex> state_lock(state_mutex_, adopt_lock); |
2562 | if (state_ == Destroyed || state_ == Failed) |
2563 | { |
2564 | - throw_bad_state(state_); |
2565 | + throw_bad_state("find()", state_); |
2566 | } |
2567 | } |
2568 | |
2569 | @@ -210,7 +211,7 @@ |
2570 | lock_guard<mutex> state_lock(state_mutex_, adopt_lock); |
2571 | if (state_ == Destroyed || state_ == Failed) |
2572 | { |
2573 | - throw_bad_state(state_); |
2574 | + throw_bad_state("add_dflt_servant()", state_); |
2575 | } |
2576 | } |
2577 | |
2578 | @@ -234,7 +235,7 @@ |
2579 | lock_guard<mutex> state_lock(state_mutex_, adopt_lock); |
2580 | if (state_ == Destroyed || state_ == Failed) |
2581 | { |
2582 | - throw_bad_state(state_); |
2583 | + throw_bad_state("remove_dflt_servant()", state_); |
2584 | } |
2585 | } |
2586 | |
2587 | @@ -262,7 +263,7 @@ |
2588 | lock_guard<mutex> state_lock(state_mutex_, adopt_lock); |
2589 | if (state_ == Destroyed || state_ == Failed) |
2590 | { |
2591 | - throw_bad_state(state_); |
2592 | + throw_bad_state("find_dflt_servant()", state_); |
2593 | } |
2594 | } |
2595 | |
2596 | @@ -308,7 +309,7 @@ |
2597 | case Destroyed: |
2598 | case Failed: |
2599 | { |
2600 | - throw_bad_state(state_); |
2601 | + throw_bad_state("activate()", state_); |
2602 | } |
2603 | default: |
2604 | { |
2605 | @@ -329,7 +330,7 @@ |
2606 | } |
2607 | case Failed: |
2608 | { |
2609 | - throw_bad_state(state_); |
2610 | + throw_bad_state("shutdown() [state_ == Failed]", state_); |
2611 | } |
2612 | case Inactive: |
2613 | { |
2614 | @@ -349,7 +350,7 @@ |
2615 | } |
2616 | if (state_ == Failed) |
2617 | { |
2618 | - throw_bad_state(state_); |
2619 | + throw_bad_state("shutdown() [state == Activating]", state_); |
2620 | } |
2621 | // LCOV_EXCL_STOP |
2622 | |
2623 | @@ -358,7 +359,7 @@ |
2624 | case Active: |
2625 | { |
2626 | state_ = Deactivating; |
2627 | - stop_workers(); |
2628 | + stopper_->stop(); |
2629 | state_changed_.notify_all(); |
2630 | break; |
2631 | } |
2632 | @@ -388,11 +389,11 @@ |
2633 | |
2634 | if (state == Failed) |
2635 | { |
2636 | - throw_bad_state(state); |
2637 | + throw_bad_state("wait_for_shutdown()", state); |
2638 | } |
2639 | } |
2640 | |
2641 | -void ObjectAdapter::throw_bad_state(AdapterState state) const |
2642 | +void ObjectAdapter::throw_bad_state(string const& label, AdapterState state) const |
2643 | { |
2644 | string bad_state; |
2645 | switch (state) |
2646 | @@ -420,16 +421,31 @@ |
2647 | break; |
2648 | } |
2649 | } |
2650 | - MiddlewareException e("Object adapter in " + bad_state + " state (adapter: " + name_ + ")"); |
2651 | + MiddlewareException e(label + ": Object adapter in " + bad_state + " state (adapter: " + name_ + ")"); |
2652 | e.remember(exception_); // Remember any exception encountered by the broker thread or a worker thread. |
2653 | throw e; |
2654 | } |
2655 | |
2656 | void ObjectAdapter::run_workers() |
2657 | { |
2658 | + { |
2659 | + lock_guard<mutex> state_lock(state_mutex_); |
2660 | + try |
2661 | + { |
2662 | + // Start the publisher for stop messages. |
2663 | + stopper_.reset(new StopPublisher(mw_.context(), name_ + "-stopper")); |
2664 | + } |
2665 | + catch (...) |
2666 | + { |
2667 | + state_ = Failed; |
2668 | + state_changed_.notify_all(); |
2669 | + throw MiddlewareException("ObjectAdapter::run_workers(): stop thread failure (adapter: " + name_ + ")"); // LCOV_EXCL_LINE |
2670 | + } |
2671 | + } |
2672 | + |
2673 | // Start a broker thread to forward incoming messages to backend workers and |
2674 | // wait for the broker thread to finish its initialization. The broker |
2675 | - // signals after it has connected to the ctrl socket. |
2676 | + // signals after it has connected to the stop socket. |
2677 | { |
2678 | lock_guard<mutex> lock(ready_mutex_); |
2679 | ready_ = promise<void>(); |
2680 | @@ -469,56 +485,13 @@ |
2681 | // LCOV_EXCL_START |
2682 | catch (...) // |
2683 | { |
2684 | - stop_workers(); |
2685 | + stopper_->stop(); |
2686 | throw MiddlewareException("ObjectAdapter::run_workers(): worker thread failure (adapter: " + name_ + ")"); |
2687 | } |
2688 | // LCOV_EXCL_STOP |
2689 | } |
2690 | } |
2691 | |
2692 | -void ObjectAdapter::init_ctrl_socket() |
2693 | -{ |
2694 | - lock_guard<mutex> lock(ctrl_mutex_); |
2695 | - |
2696 | - // PUB socket to let the broker and workers know when it is time to shut down. |
2697 | - // Sending anything means the socket becomes ready for reading, which causes |
2698 | - // the broker and worker threads to finish. |
2699 | - ctrl_.reset(new zmqpp::socket(*mw_.context(), zmqpp::socket_type::publish)); |
2700 | - ctrl_->set(zmqpp::socket_option::linger, 0); |
2701 | - ctrl_->bind("inproc://" + name_ + "_adapter_ctrl"); |
2702 | -} |
2703 | - |
2704 | -zmqpp::socket ObjectAdapter::subscribe_to_ctrl_socket() |
2705 | -{ |
2706 | - zmqpp::socket ctrl(*mw_.context(), zmqpp::socket_type::subscribe); |
2707 | - ctrl.set(zmqpp::socket_option::linger, 0); |
2708 | - ctrl.connect("inproc://" + name_ + "_adapter_ctrl"); // Once a thread can read from here, that's the command to stop. |
2709 | - ctrl.subscribe(""); |
2710 | - return move(ctrl); |
2711 | -} |
2712 | - |
2713 | -void ObjectAdapter::stop_workers() noexcept |
2714 | -{ |
2715 | - lock_guard<mutex> lock(ctrl_mutex_); |
2716 | - |
2717 | - try |
2718 | - { |
2719 | - ctrl_->send("stop"); |
2720 | - } |
2721 | - // LCOV_EXCL_START |
2722 | - catch (std::exception const& e) |
2723 | - { |
2724 | - // TODO: log this instead |
2725 | - cerr << "ObjectAdapter::stop_workers(): " << e.what() << endl; |
2726 | - } |
2727 | - catch (...) |
2728 | - { |
2729 | - // TODO: log this instead |
2730 | - cerr << "ObjectAdapter::stop_workers(): unknown exception" << endl; |
2731 | - } |
2732 | - // LCOV_EXCL_STOP |
2733 | -} |
2734 | - |
2735 | // For the ipc transport, zmq permits more than one server to bind to the same endpoint. |
2736 | // If a server binds to an endpoint while another server is using that endpoint, the |
2737 | // second server silently "steals" the endpoint from the previous server, so all |
2738 | @@ -574,12 +547,8 @@ |
2739 | { |
2740 | try |
2741 | { |
2742 | - // Create the writing end of the ctrl socket. We need to do this before subscribing because, for inproc |
2743 | - // pub-sub sockets, the bind must happen before the connect. |
2744 | - init_ctrl_socket(); |
2745 | - |
2746 | - // Subscribe to ctrl socket. Once this socket becomes readable, that's the command to finish. |
2747 | - auto ctrl = subscribe_to_ctrl_socket(); |
2748 | + // Subscribe to stop socket. Once this socket becomes readable, that's the command to finish. |
2749 | + auto stop = stopper_->subscribe(); |
2750 | |
2751 | // Set up message pump. Router-dealer for twoway adapter, pull-push for oneway adapter. |
2752 | auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::router : zmqpp::socket_type::pull; |
2753 | @@ -592,7 +561,7 @@ |
2754 | |
2755 | try |
2756 | { |
2757 | - poller.add(ctrl); |
2758 | + poller.add(stop); |
2759 | |
2760 | frontend.set(zmqpp::socket_option::linger, 0); |
2761 | // "Safe" bind: prevents two servers from binding to the same endpoint. |
2762 | @@ -625,13 +594,13 @@ |
2763 | { |
2764 | zmqpp::message message; |
2765 | poller.poll(); |
2766 | - if (poller.has_input(ctrl)) |
2767 | + if (poller.has_input(stop)) |
2768 | { |
2769 | - // When the ctrl socket becomes ready, we need to get out of here. |
2770 | + // When the stop socket becomes ready, we need to get out of here. |
2771 | // We stop reading more requests from the router, but continue processing |
2772 | // while there are still replies outstanding. |
2773 | - ctrl.receive(message); |
2774 | - ctrl.close(); |
2775 | + poller.remove(stop); |
2776 | + stop.close(); |
2777 | shutting_down = true; |
2778 | } |
2779 | if (!shutting_down && poller.has_input(frontend)) // Once shutting down, we no longer read incoming messages. |
2780 | @@ -696,9 +665,9 @@ |
2781 | { |
2782 | zmqpp::poller poller; |
2783 | |
2784 | - // Subscribe to ctrl socket. Once this socket becomes readable, that's the command to finish. |
2785 | - auto ctrl = subscribe_to_ctrl_socket(); |
2786 | - poller.add(ctrl); |
2787 | + // Subscribe to stop socket. Once this socket becomes readable, that's the command to finish. |
2788 | + auto stop = stopper_->subscribe(); |
2789 | + poller.add(stop); |
2790 | |
2791 | auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::reply : zmqpp::socket_type::pull; |
2792 | zmqpp::socket s(*mw_.context(), socket_type); |
2793 | @@ -728,11 +697,10 @@ |
2794 | } |
2795 | |
2796 | poller.poll(); |
2797 | - if (poller.has_input(ctrl)) // Parent sent a stop message, so we are supposed to go away. |
2798 | + if (poller.has_input(stop)) // Parent sent a stop message, so we are supposed to go away. |
2799 | { |
2800 | - zmqpp::message message; |
2801 | - ctrl.receive(message); |
2802 | - ctrl.close(); |
2803 | + poller.remove(stop); |
2804 | + stop.close(); |
2805 | finish = true; |
2806 | } |
2807 | |
2808 | @@ -810,7 +778,16 @@ |
2809 | } |
2810 | |
2811 | // Look for a servant with matching id. |
2812 | - auto servant = find_servant(current.id, current.category); |
2813 | + shared_ptr<ServantBase> servant; |
2814 | + try |
2815 | + { |
2816 | + servant = find_servant(current.id, current.category); |
2817 | + } |
2818 | + catch (std::exception const&) |
2819 | + { |
2820 | + // Ignore failure to find servant during destruction phase. |
2821 | + } |
2822 | + |
2823 | if (!servant) |
2824 | { |
2825 | if (mode_ == RequestMode::Twoway) |
2826 | @@ -838,7 +815,7 @@ |
2827 | // LCOV_EXCL_START |
2828 | catch (...) |
2829 | { |
2830 | - stop_workers(); // Fatal error, we need to stop all other workers and the broker. |
2831 | + stopper_->stop(); // Fatal error, we need to stop all other workers and the broker. |
2832 | MiddlewareException e("ObjectAdapter: worker thread failure (adapter: " + name_ + ")"); |
2833 | store_exception(e); |
2834 | // We may not have signaled the parent yet, depending on where things went wrong. |
2835 | |
2836 | === added file 'src/scopes/internal/zmq_middleware/StopPublisher.cpp' |
2837 | --- src/scopes/internal/zmq_middleware/StopPublisher.cpp 1970-01-01 00:00:00 +0000 |
2838 | +++ src/scopes/internal/zmq_middleware/StopPublisher.cpp 2014-04-28 05:57:56 +0000 |
2839 | @@ -0,0 +1,225 @@ |
2840 | +/* |
2841 | + * Copyright (C) 2014 Canonical Ltd |
2842 | + * |
2843 | + * This program is free software: you can redistribute it and/or modify |
2844 | + * it under the terms of the GNU Lesser General Public License version 3 as |
2845 | + * published by the Free Software Foundation. |
2846 | + * |
2847 | + * This program is distributed in the hope that it will be useful, |
2848 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
2849 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
2850 | + * GNU Lesser General Public License for more details. |
2851 | + * |
2852 | + * You should have received a copy of the GNU Lesser General Public License |
2853 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
2854 | + * |
2855 | + * Authored by: Michi Henning <michi.henning@canonical.com> |
2856 | + */ |
2857 | + |
2858 | +#include <unity/scopes/internal/zmq_middleware/StopPublisher.h> |
2859 | + |
2860 | +#include <unity/scopes/ScopeExceptions.h> |
2861 | + |
2862 | +#include <cassert> |
2863 | +#include <iostream> |
2864 | + |
2865 | +using namespace std; |
2866 | + |
2867 | +namespace unity |
2868 | +{ |
2869 | + |
2870 | +namespace scopes |
2871 | +{ |
2872 | + |
2873 | +namespace internal |
2874 | +{ |
2875 | + |
2876 | +namespace zmq_middleware |
2877 | +{ |
2878 | + |
2879 | +StopPublisher::StopPublisher(zmqpp::context* context, string const& inproc_name) |
2880 | + : context_(context) |
2881 | + , endpoint_("inproc://" + inproc_name) |
2882 | + , state_(Starting) |
2883 | +{ |
2884 | + assert(context); |
2885 | + assert(!inproc_name.empty()); |
2886 | + |
2887 | + thread_ = thread(&StopPublisher::stopper_thread, this); |
2888 | + |
2889 | + unique_lock<mutex> lock(m_); |
2890 | + cond_.wait(lock, [this] { return state_ == Started || state_ == Failed; }); |
2891 | + |
2892 | + if (state_ == Failed) |
2893 | + { |
2894 | + if (thread_.joinable()) |
2895 | + { |
2896 | + thread_.join(); |
2897 | + } |
2898 | + try |
2899 | + { |
2900 | + rethrow_exception(ex_); |
2901 | + } |
2902 | + catch (...) |
2903 | + { |
2904 | + throw MiddlewareException("StopPublisher(): publisher thread failed (endpoint: " + endpoint_ + ")"); |
2905 | + } |
2906 | + } |
2907 | +} |
2908 | + |
2909 | +StopPublisher::~StopPublisher() |
2910 | +{ |
2911 | + try |
2912 | + { |
2913 | + stop(); |
2914 | + } |
2915 | + // LCOV_EXCL_START |
2916 | + catch (std::exception const& e) |
2917 | + { |
2918 | + cerr << "~StopPublisher(): exception from stop(): " << e.what() << endl; |
2919 | + // TODO: log this |
2920 | + } |
2921 | + catch (...) |
2922 | + { |
2923 | + cerr << "~StopPublisher(): unknown exception from stop()" << endl; |
2924 | + // TODO: log this |
2925 | + } |
2926 | + // LCOV_EXCL_STOP |
2927 | + if (thread_.joinable()) |
2928 | + { |
2929 | + thread_.join(); |
2930 | + } |
2931 | +} |
2932 | + |
2933 | +string StopPublisher::endpoint() const |
2934 | +{ |
2935 | + lock_guard<mutex> lock(m_); |
2936 | + return endpoint_; |
2937 | +} |
2938 | + |
2939 | +// Return a socket that becomes ready for reading once stop() is called. |
2940 | +// Once the socket becomes readable, it has exactly one zero-length message to be read. |
2941 | + |
2942 | +zmqpp::socket StopPublisher::subscribe() |
2943 | +{ |
2944 | + lock_guard<mutex> lock(m_); |
2945 | + |
2946 | + switch (state_) |
2947 | + { |
2948 | + case Stopping: |
2949 | + case Stopped: |
2950 | + { |
2951 | + throw MiddlewareException( |
2952 | + "StopPublisher::subscribe(): cannot subscribe to stopped publisher " |
2953 | + "(endpoint: " + endpoint_ + ")"); |
2954 | + } |
2955 | + // LCOV_EXCL_START |
2956 | + case Failed: |
2957 | + { |
2958 | + try |
2959 | + { |
2960 | + rethrow_exception(ex_); |
2961 | + } |
2962 | + catch (...) |
2963 | + { |
2964 | + throw MiddlewareException( |
2965 | + "StopPublisher::subscribe(): cannot subscribe to failed publisher " |
2966 | + "(endpoint: " + endpoint_ + ")"); |
2967 | + } |
2968 | + } |
2969 | + // LCOV_EXCL_STOP |
2970 | + default: |
2971 | + { |
2972 | + assert(state_ == Started); |
2973 | + zmqpp::socket s(*context_, zmqpp::socket_type::subscribe); |
2974 | + s.set(zmqpp::socket_option::linger, 0); |
2975 | + s.connect(endpoint_); |
2976 | + s.subscribe(""); |
2977 | + return move(s); |
2978 | + } |
2979 | + } |
2980 | +} |
2981 | + |
2982 | +// Tell the stopper thread to publish the stop message and terminate itself. |
2983 | + |
2984 | +void StopPublisher::stop() |
2985 | +{ |
2986 | + unique_lock<mutex> lock(m_); |
2987 | + switch (state_) |
2988 | + { |
2989 | + case Stopping: |
2990 | + case Stopped: |
2991 | + { |
2992 | + return; // no-op |
2993 | + } |
2994 | + // LCOV_EXCL_START |
2995 | + case Failed: |
2996 | + { |
2997 | + try |
2998 | + { |
2999 | + rethrow_exception(ex_); |
3000 | + } |
3001 | + catch (...) |
3002 | + { |
3003 | + throw MiddlewareException("StopPublisher::stop(): cannot stop failed publisher"); |
3004 | + } |
3005 | + } |
3006 | + // LCOV_EXCL_STOP |
3007 | + default: |
3008 | + { |
3009 | + assert(state_ == Started); |
3010 | + state_ = Stopping; |
3011 | + cond_.notify_all(); |
3012 | + } |
3013 | + } |
3014 | +} |
3015 | + |
3016 | +void StopPublisher::stopper_thread() noexcept |
3017 | +{ |
3018 | + try |
3019 | + { |
3020 | + unique_lock<mutex> lock(m_); |
3021 | + |
3022 | + assert(state_ == Starting); |
3023 | + |
3024 | + // Create the publishing socket. |
3025 | + zmqpp::socket pub_socket(zmqpp::socket(*context_, zmqpp::socket_type::publish)); |
3026 | + // Allow time for stop message to be buffered, so close() won't discard it. |
3027 | + pub_socket.set(zmqpp::socket_option::linger, 5000); |
3028 | + pub_socket.bind(endpoint_); |
3029 | + |
3030 | + // Notify that we are ready. This ensures that subscribers |
3031 | + // cannot subscribe before the pub socket was bound. |
3032 | + state_ = Started; |
3033 | + cond_.notify_all(); |
3034 | + |
3035 | + lock.unlock(); // Allow parent to wake up. |
3036 | + |
3037 | + // Wait until we are told to stop. |
3038 | + lock.lock(); |
3039 | + cond_.wait(lock, [this] { return state_ == Stopping; }); |
3040 | + |
3041 | + // Write the stop message for the subscribers and close. |
3042 | + // Sending an empty string is OK; the receiver will get a zero-length message. |
3043 | + pub_socket.send(""); |
3044 | + pub_socket.close(); |
3045 | + state_ = Stopped; |
3046 | + } |
3047 | + catch (...) |
3048 | + { |
3049 | + lock_guard<mutex> lock(m_); |
3050 | + state_ = Failed; |
3051 | + ex_ = current_exception(); |
3052 | + cond_.notify_all(); |
3053 | + } |
3054 | + |
3055 | + assert(state_ == Stopped || state_ == Failed); |
3056 | +} |
3057 | + |
3058 | +} // namespace zmq_middleware |
3059 | + |
3060 | +} // namespace internal |
3061 | + |
3062 | +} // namespace scopes |
3063 | + |
3064 | +} // namespace unity |
3065 | |
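Review note on the start/stop handshake in `StopPublisher.cpp` above. The sketch below reproduces only the state machine (Starting, Started, Stopping, Stopped) with a mutex and a condition variable. It is not the branch's code: `Stopper` and `wait_stopped()` are hypothetical names, the zmq sockets are replaced by comments, and the Failed state is left out for brevity.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for StopPublisher; no zmq sockets are involved.
class Stopper
{
public:
    enum State { Starting, Started, Stopping, Stopped };

    Stopper()
        : state_(Starting)
    {
        thread_ = std::thread(&Stopper::run, this);
        // Block until the worker has finished its setup, so nobody can
        // "subscribe" before the publishing side exists.
        std::unique_lock<std::mutex> lock(m_);
        cond_.wait(lock, [this] { return state_ == Started; });
    }

    ~Stopper()
    {
        stop();
        if (thread_.joinable())
        {
            thread_.join();
        }
    }

    // Idempotent: calling stop() again after the first call is a no-op.
    void stop()
    {
        std::lock_guard<std::mutex> lock(m_);
        if (state_ == Started)
        {
            state_ = Stopping;
            cond_.notify_all();
        }
    }

    // Test helper (not in the original class): wait until the worker is done.
    void wait_stopped()
    {
        std::unique_lock<std::mutex> lock(m_);
        cond_.wait(lock, [this] { return state_ == Stopped; });
    }

    State state() const
    {
        std::lock_guard<std::mutex> lock(m_);
        return state_;
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(m_);
        // The real thread creates and binds the PUB socket here.
        state_ = Started;
        cond_.notify_all();
        // wait() releases the mutex while blocked, which lets the constructor wake up.
        cond_.wait(lock, [this] { return state_ == Stopping; });
        // The real thread publishes the stop message and closes the socket here.
        state_ = Stopped;
        cond_.notify_all();
    }

    mutable std::mutex m_;
    std::condition_variable cond_;
    State state_;
    std::thread thread_;
};
```

Because `cond_.wait()` releases the mutex while it blocks, the explicit `lock.unlock()` followed by `lock.lock()` in `stopper_thread()` is not required for the parent to wake up. The sketch omits it for that reason.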
3066 | === modified file 'src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp' |
3067 | --- src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp 2014-04-02 09:24:02 +0000 |
3068 | +++ src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp 2014-04-28 05:57:56 +0000 |
3069 | @@ -434,9 +434,9 @@ |
3070 | shared_ptr<QueryCtrlI> qci(make_shared<QueryCtrlI>(ctrl)); |
3071 | auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir()); |
3072 | function<void()> df; |
3073 | - auto proxy = safe_add(df, adapter, "", qci); |
3074 | + auto p = safe_add(df, adapter, "", qci); |
3075 | ctrl->set_disconnect_function(df); |
3076 | - return ZmqQueryCtrlProxy(new ZmqQueryCtrl(this, proxy->endpoint(), proxy->identity(), "QueryCtrl")); |
3077 | + proxy = ZmqQueryCtrlProxy(new ZmqQueryCtrl(this, p->endpoint(), p->identity(), "QueryCtrl")); |
3078 | } |
3079 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
3080 | { |
3081 | @@ -476,9 +476,9 @@ |
3082 | shared_ptr<QueryI> qi(make_shared<QueryI>(query)); |
3083 | auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir()); |
3084 | function<void()> df; |
3085 | - auto proxy = safe_add(df, adapter, "", qi); |
3086 | + auto p = safe_add(df, adapter, "", qi); |
3087 | query->set_disconnect_function(df); |
3088 | - return ZmqQueryProxy(new ZmqQuery(this, proxy->endpoint(), proxy->identity(), "Query")); |
3089 | + proxy = ZmqQueryProxy(new ZmqQuery(this, p->endpoint(), p->identity(), "Query")); |
3090 | } |
3091 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
3092 | { |
3093 | @@ -519,9 +519,9 @@ |
3094 | shared_ptr<RegistryI> ri(make_shared<RegistryI>(registry)); |
3095 | auto adapter = find_adapter(server_name_, runtime()->registry_endpointdir()); |
3096 | function<void()> df; |
3097 | - auto proxy = safe_add(df, adapter, identity, ri); |
3098 | + auto p = safe_add(df, adapter, identity, ri); |
3099 | registry->set_disconnect_function(df); |
3100 | - return ZmqRegistryProxy(new ZmqRegistry(this, proxy->endpoint(), proxy->identity(), "Registry", twoway_timeout_)); |
3101 | + proxy = ZmqRegistryProxy(new ZmqRegistry(this, p->endpoint(), p->identity(), "Registry", twoway_timeout_)); |
3102 | } |
3103 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
3104 | { |
3105 | @@ -542,9 +542,9 @@ |
3106 | shared_ptr<ReplyI> ri(make_shared<ReplyI>(reply)); |
3107 | auto adapter = find_adapter(server_name_ + reply_suffix, config_.public_dir()); |
3108 | function<void()> df; |
3109 | - auto proxy = safe_add(df, adapter, "", ri); |
3110 | + auto p = safe_add(df, adapter, "", ri); |
3111 | reply->set_disconnect_function(df); |
3112 | - return ZmqReplyProxy(new ZmqReply(this, proxy->endpoint(), proxy->identity(), "Reply")); |
3113 | + proxy = ZmqReplyProxy(new ZmqReply(this, p->endpoint(), p->identity(), "Reply")); |
3114 | } |
3115 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
3116 | { |
3117 | @@ -566,9 +566,9 @@ |
3118 | shared_ptr<ScopeI> si(make_shared<ScopeI>(scope)); |
3119 | auto adapter = find_adapter(server_name_, config_.private_dir()); |
3120 | function<void()> df; |
3121 | - auto proxy = safe_add(df, adapter, identity, si); |
3122 | + auto p = safe_add(df, adapter, identity, si); |
3123 | scope->set_disconnect_function(df); |
3124 | - return ZmqScopeProxy(new ZmqScope(this, proxy->endpoint(), proxy->identity(), "Scope", twoway_timeout_)); |
3125 | + proxy = ZmqScopeProxy(new ZmqScope(this, p->endpoint(), p->identity(), "Scope", twoway_timeout_)); |
3126 | } |
3127 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
3128 | { |
3129 | |
3130 | === modified file 'src/scopes/internal/zmq_middleware/ZmqObject.cpp' |
3131 | --- src/scopes/internal/zmq_middleware/ZmqObject.cpp 2014-02-03 07:28:36 +0000 |
3132 | +++ src/scopes/internal/zmq_middleware/ZmqObject.cpp 2014-04-28 05:57:56 +0000 |
3133 | @@ -18,7 +18,6 @@ |
3134 | |
3135 | #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h> |
3136 | |
3137 | -#include <unity/scopes/internal/zmq_middleware/ConnectionPool.h> |
3138 | #include <unity/scopes/internal/zmq_middleware/Util.h> |
3139 | #include <unity/scopes/internal/zmq_middleware/ZmqException.h> |
3140 | #include <unity/scopes/internal/zmq_middleware/ZmqSender.h> |
3141 | @@ -127,7 +126,8 @@ |
3142 | capnp::MallocMessageBuilder request_builder; |
3143 | make_request_(request_builder, "ping"); |
3144 | |
3145 | - auto future = mw_base()->invoke_pool()->submit([&] { this->invoke_(request_builder); }); |
3146 | + auto future = mw_base()->invoke_pool()->submit([&] { this->invoke_twoway_(request_builder); }); |
3147 | + // TODO: dubious, waiter thread in runtimeImpl should do this? |
3148 | future.wait(); |
3149 | } |
3150 | |
3151 | @@ -148,59 +148,82 @@ |
3152 | return request; |
3153 | } |
3154 | |
3155 | +#ifdef ENABLE_IPC_MONITOR |
3156 | void register_monitor_socket (ConnectionPool& pool, zmqpp::context_t const& context) |
3157 | { |
3158 | thread_local static bool monitor_initialized = false; |
3159 | if (!monitor_initialized) { |
3160 | monitor_initialized = true; |
3161 | zmqpp::socket monitor_socket(context, zmqpp::socket_type::publish); |
3162 | + monitor_socket.set(zmqpp::socket_option::linger, 0); |
3163 | monitor_socket.connect(MONITOR_ENDPOINT); |
3164 | - monitor_socket.set(zmqpp::socket_option::linger, 0); |
3165 | pool.register_socket(MONITOR_ENDPOINT, move(monitor_socket), RequestMode::Oneway); |
3166 | } |
3167 | } |
3168 | - |
3169 | -ZmqReceiver ZmqObjectProxy::invoke_(capnp::MessageBuilder& out_params) |
3170 | -{ |
3171 | - return invoke_(out_params, timeout_); |
3172 | -} |
3173 | - |
3174 | -// Get a socket to the endpoint for this proxy and write the request on the wire. |
3175 | -// For a twoway request, poll for the reply with the timeout set for this proxy. |
3176 | -// Return a receiver for the response (whether this is a oneway or twoway request). |
3177 | - |
3178 | -ZmqReceiver ZmqObjectProxy::invoke_(capnp::MessageBuilder& out_params, int64_t timeout) |
3179 | -{ |
3180 | - // Each calling thread gets its own pool because zmq sockets are not thread-safe. |
3181 | - thread_local static ConnectionPool pool(*mw_base()->context()); |
3182 | - |
3183 | - zmqpp::socket& s = pool.find(endpoint_, mode_); |
3184 | - ZmqSender sender(s); |
3185 | - auto segments = out_params.getSegmentsForOutput(); |
3186 | - sender.send(segments); |
3187 | - |
3188 | -#ifdef ENABLE_IPC_MONITOR |
3189 | - if (true) { |
3190 | - register_monitor_socket(pool, *mw_base()->context()); |
3191 | - zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway); |
3192 | - auto word_arr = capnp::messageToFlatArray(segments); |
3193 | - monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word)); |
3194 | - } |
3195 | -#endif |
3196 | - |
3197 | - if (mode_ == RequestMode::Twoway) |
3198 | +#endif |
3199 | + |
3200 | +// Get a socket to the endpoint for this proxy and write the request on the wire. |
3201 | + |
3202 | +void ZmqObjectProxy::invoke_oneway_(capnp::MessageBuilder& out_params) |
3203 | +{ |
3204 | + // Each calling thread gets its own pool because zmq sockets are not thread-safe. |
3205 | + thread_local static ConnectionPool pool(*mw_base()->context()); |
3206 | + |
3207 | + assert(mode_ == RequestMode::Oneway); |
3208 | + zmqpp::socket& s = pool.find(endpoint_, mode_); |
3209 | + ZmqSender sender(s); |
3210 | + auto segments = out_params.getSegmentsForOutput(); |
3211 | + sender.send(segments); |
3212 | + |
3213 | +#ifdef ENABLE_IPC_MONITOR |
3214 | + if (true) { |
3215 | + register_monitor_socket(pool, *mw_base()->context()); |
3216 | + zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway); |
3217 | + auto word_arr = capnp::messageToFlatArray(segments); |
3218 | + monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word)); |
3219 | + } |
3220 | +#endif |
3221 | +} |
3222 | + |
3223 | +ZmqReceiver ZmqObjectProxy::invoke_twoway_(capnp::MessageBuilder& out_params) |
3224 | +{ |
3225 | + return invoke_twoway_(out_params, timeout_); |
3226 | +} |
3227 | + |
3228 | +// Get a socket to the endpoint for this proxy and write the request on the wire. |
3229 | +// Poll for the reply with the given timeout. |
3230 | +// Return a socket for the response or throw if the timeout expires. |
3231 | + |
3232 | +ZmqReceiver ZmqObjectProxy::invoke_twoway_(capnp::MessageBuilder& out_params, int64_t timeout) |
3233 | +{ |
3234 | + // Each calling thread gets its own pool because zmq sockets are not thread-safe. |
3235 | + thread_local static ConnectionPool pool(*mw_base()->context()); |
3236 | + |
3237 | + assert(mode_ == RequestMode::Twoway); |
3238 | + zmqpp::socket& s = pool.find(endpoint_, mode_); |
3239 | + ZmqSender sender(s); |
3240 | + auto segments = out_params.getSegmentsForOutput(); |
3241 | + sender.send(segments); |
3242 | + |
3243 | +#ifdef ENABLE_IPC_MONITOR |
3244 | + if (true) { |
3245 | + register_monitor_socket(pool, *mw_base()->context()); |
3246 | + zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway); |
3247 | + auto word_arr = capnp::messageToFlatArray(segments); |
3248 | + monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word)); |
3249 | + } |
3250 | +#endif |
3251 | + |
3252 | + zmqpp::poller p; |
3253 | + p.add(s); |
3254 | + p.poll(timeout); |
3255 | + if (!p.has_input(s)) |
3256 | { |
3257 | - zmqpp::poller p; |
3258 | - p.add(s); |
3259 | - p.poll(timeout); |
3260 | - if (!p.has_input(s)) |
3261 | - { |
3262 | - // If a request times out, we must close the corresponding socket, otherwise |
3263 | - // zmq gets confused: the reply will never be read, so the socket ends up |
3264 | - // in a bad state. |
3265 | - pool.remove(endpoint_); |
3266 | - throw TimeoutException("Request timed out after " + std::to_string(timeout) + " milliseconds"); |
3267 | - } |
3268 | + // If a request times out, we must trash the corresponding socket, otherwise |
3269 | + // zmq gets confused: the reply will never be read, so the socket ends up |
3270 | + // in a bad state. |
3271 | + pool.remove(endpoint_); |
3272 | + throw TimeoutException("Request timed out after " + std::to_string(timeout) + " milliseconds"); |
3273 | } |
3274 | return ZmqReceiver(s); |
3275 | } |
3276 | |
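Review note on the timeout path in `invoke_twoway_()` above. A zmq REQ socket enforces strict send/receive alternation, so a socket whose reply was never read cannot be reused for the next request. The sketch below shows the "discard on timeout" rule with stand-in types: `FakeSocket`, `Pool`, and the `poll` callback are all hypothetical and replace `zmqpp::socket`, `ConnectionPool`, and `zmqpp::poller`.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for zmqpp::socket and ConnectionPool.
struct FakeSocket
{
    bool awaiting_reply = false;
};
using Pool = std::map<std::string, FakeSocket>;

// poll is supplied by the caller and returns true if a reply arrived
// within the given number of milliseconds.
inline void invoke_twoway(Pool& pool,
                          std::string const& endpoint,
                          int64_t timeout,
                          std::function<bool(FakeSocket&, int64_t)> const& poll)
{
    FakeSocket& s = pool[endpoint];  // Find the pooled socket, or create one.
    s.awaiting_reply = true;         // Request is on the wire.
    if (!poll(s, timeout))
    {
        // The reply will never be read, so the socket is unusable: drop it
        // from the pool so the next call starts with a fresh connection.
        pool.erase(endpoint);
        throw std::runtime_error("Request timed out after " + std::to_string(timeout) + " milliseconds");
    }
    s.awaiting_reply = false;        // Reply consumed, socket can be reused.
}
```

The endpoint string used to exercise this sketch is arbitrary. In the branch the pool is `thread_local`, so each calling thread discards only its own socket.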
3277 | === modified file 'src/scopes/internal/zmq_middleware/ZmqQuery.cpp' |
3278 | --- src/scopes/internal/zmq_middleware/ZmqQuery.cpp 2014-01-24 00:26:43 +0000 |
3279 | +++ src/scopes/internal/zmq_middleware/ZmqQuery.cpp 2014-04-28 05:57:56 +0000 |
3280 | @@ -66,7 +66,7 @@ |
3281 | proxy.setIdentity(rp->identity().c_str()); |
3282 | proxy.setCategory(rp->category().c_str()); |
3283 | |
3284 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3285 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); }); |
3286 | future.wait(); |
3287 | } |
3288 | |
3289 | |
3290 | === modified file 'src/scopes/internal/zmq_middleware/ZmqQueryCtrl.cpp' |
3291 | --- src/scopes/internal/zmq_middleware/ZmqQueryCtrl.cpp 2014-01-24 00:26:43 +0000 |
3292 | +++ src/scopes/internal/zmq_middleware/ZmqQueryCtrl.cpp 2014-04-28 05:57:56 +0000 |
3293 | @@ -58,7 +58,7 @@ |
3294 | capnp::MallocMessageBuilder request_builder; |
3295 | make_request_(request_builder, "cancel"); |
3296 | |
3297 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3298 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); }); |
3299 | future.wait(); |
3300 | } |
3301 | |
3302 | @@ -67,7 +67,7 @@ |
3303 | capnp::MallocMessageBuilder request_builder; |
3304 | make_request_(request_builder, "destroy"); |
3305 | |
3306 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3307 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); }); |
3308 | future.wait(); |
3309 | } |
3310 | |
3311 | |
3312 | === modified file 'src/scopes/internal/zmq_middleware/ZmqRegistry.cpp' |
3313 | --- src/scopes/internal/zmq_middleware/ZmqRegistry.cpp 2014-04-03 12:57:25 +0000 |
3314 | +++ src/scopes/internal/zmq_middleware/ZmqRegistry.cpp 2014-04-28 05:57:56 +0000 |
3315 | @@ -85,7 +85,7 @@ |
3316 | auto in_params = request.initInParams().getAs<capnproto::Registry::GetMetadataRequest>(); |
3317 | in_params.setIdentity(scope_id.c_str()); |
3318 | |
3319 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3320 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); }); |
3321 | auto receiver = future.get(); |
3322 | auto segments = receiver.receive(); |
3323 | capnp::SegmentArrayMessageReader reader(segments); |
3324 | @@ -119,7 +119,7 @@ |
3325 | capnp::MallocMessageBuilder request_builder; |
3326 | make_request_(request_builder, "list"); |
3327 | |
3328 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3329 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); }); |
3330 | auto receiver = future.get(); |
3331 | auto segments = receiver.receive(); |
3332 | capnp::SegmentArrayMessageReader reader(segments); |
3333 | @@ -149,7 +149,7 @@ |
3334 | |
3335 | // locate uses a custom timeout because it needs to potentially fork/exec a scope. |
3336 | int64_t timeout = 1000; // TODO: get timeout from config |
3337 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder, timeout); }); |
3338 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder, timeout); }); |
3339 | auto receiver = future.get(); |
3340 | auto segments = receiver.receive(); |
3341 | capnp::SegmentArrayMessageReader reader(segments); |
3342 | |
3343 | === modified file 'src/scopes/internal/zmq_middleware/ZmqReply.cpp' |
3344 | --- src/scopes/internal/zmq_middleware/ZmqReply.cpp 2014-01-24 00:26:43 +0000 |
3345 | +++ src/scopes/internal/zmq_middleware/ZmqReply.cpp 2014-04-28 05:57:56 +0000 |
3346 | @@ -64,7 +64,7 @@ |
3347 | auto resultBuilder = in_params.getResult(); |
3348 | to_value_dict(result, resultBuilder); |
3349 | |
3350 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3351 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); }); |
3352 | future.wait(); |
3353 | } |
3354 | |
3355 | @@ -99,7 +99,7 @@ |
3356 | } |
3357 | in_params.setReason(r); |
3358 | |
3359 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3360 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); }); |
3361 | future.wait(); |
3362 | } |
3363 | |
3364 | |
3365 | === modified file 'src/scopes/internal/zmq_middleware/ZmqScope.cpp' |
3366 | --- src/scopes/internal/zmq_middleware/ZmqScope.cpp 2014-03-07 02:01:47 +0000 |
3367 | +++ src/scopes/internal/zmq_middleware/ZmqScope.cpp 2014-04-28 05:57:56 +0000 |
3368 | @@ -27,6 +27,8 @@ |
3369 | #include <unity/scopes/Result.h> |
3370 | #include <unity/scopes/CannedQuery.h> |
3371 | |
3372 | +#include <iostream> // TODO: remove this |
3373 | + |
3374 | using namespace std; |
3375 | |
3376 | namespace unity |
3377 | @@ -90,7 +92,7 @@ |
3378 | p.setCategory(reply_proxy->category().c_str()); |
3379 | } |
3380 | |
3381 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3382 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); }); |
3383 | |
3384 | auto receiver = future.get(); |
3385 | auto segments = receiver.receive(); |
3386 | @@ -122,7 +124,7 @@ |
3387 | p.setIdentity(reply_proxy->identity().c_str()); |
3388 | } |
3389 | |
3390 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3391 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); }); |
3392 | |
3393 | auto receiver = future.get(); |
3394 | auto segments = receiver.receive(); |
3395 | @@ -157,7 +159,7 @@ |
3396 | p.setIdentity(reply_proxy->identity().c_str()); |
3397 | } |
3398 | |
3399 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3400 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); }); |
3401 | future.wait(); |
3402 | |
3403 | auto receiver = future.get(); |
3404 | @@ -190,7 +192,7 @@ |
3405 | p.setIdentity(reply_proxy->identity().c_str()); |
3406 | } |
3407 | |
3408 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3409 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); }); |
3410 | |
3411 | auto receiver = future.get(); |
3412 | auto segments = receiver.receive(); |
3413 | |
3414 | === modified file 'src/scopes/internal/zmq_middleware/ZmqStateReceiver.cpp' |
3415 | --- src/scopes/internal/zmq_middleware/ZmqStateReceiver.cpp 2014-04-02 09:24:02 +0000 |
3416 | +++ src/scopes/internal/zmq_middleware/ZmqStateReceiver.cpp 2014-04-28 05:57:56 +0000 |
3417 | @@ -83,7 +83,7 @@ |
3418 | in_params.setState(s); |
3419 | in_params.setSenderId(sender_id); |
3420 | |
3421 | - auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); }); |
3422 | + auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); }); |
3423 | future.wait(); |
3424 | } |
3425 | |
3426 | |
3427 | === modified file 'test/gtest/scopes/CMakeLists.txt' |
3428 | --- test/gtest/scopes/CMakeLists.txt 2014-04-01 16:58:32 +0000 |
3429 | +++ test/gtest/scopes/CMakeLists.txt 2014-04-28 05:57:56 +0000 |
3430 | @@ -1,23 +1,24 @@ |
3431 | add_subdirectory(internal) |
3432 | add_subdirectory(testing) |
3433 | |
3434 | -add_subdirectory(Registry) |
3435 | -add_subdirectory(Runtime) |
3436 | add_subdirectory(Activation) |
3437 | add_subdirectory(ActivationResponse) |
3438 | add_subdirectory(Annotation) |
3439 | add_subdirectory(CannedQuery) |
3440 | -add_subdirectory(Department) |
3441 | -add_subdirectory(Filters) |
3442 | -add_subdirectory(ScopeBase) |
3443 | -add_subdirectory(ScopeExceptions) |
3444 | +add_subdirectory(CategorisedResult) |
3445 | add_subdirectory(Category) |
3446 | -add_subdirectory(CategorisedResult) |
3447 | add_subdirectory(CategoryRenderer) |
3448 | add_subdirectory(ColumnLayout) |
3449 | +add_subdirectory(Department) |
3450 | +add_subdirectory(Filters) |
3451 | +add_subdirectory(Invocation) |
3452 | +add_subdirectory(OptionSelectorFilter) |
3453 | add_subdirectory(PreviewWidget) |
3454 | add_subdirectory(QueryMetadata) |
3455 | -add_subdirectory(OptionSelectorFilter) |
3456 | +add_subdirectory(Registry) |
3457 | +add_subdirectory(Runtime) |
3458 | +add_subdirectory(ScopeBase) |
3459 | +add_subdirectory(ScopeExceptions) |
3460 | add_subdirectory(Variant) |
3461 | add_subdirectory(VariantBuilder) |
3462 | add_subdirectory(Version) |
3463 | |
3464 | === added directory 'test/gtest/scopes/Invocation' |
3465 | === added file 'test/gtest/scopes/Invocation/CMakeLists.txt' |
3466 | --- test/gtest/scopes/Invocation/CMakeLists.txt 1970-01-01 00:00:00 +0000 |
3467 | +++ test/gtest/scopes/Invocation/CMakeLists.txt 2014-04-28 05:57:56 +0000 |
3468 | @@ -0,0 +1,8 @@ |
3469 | +configure_file(Registry.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Registry.ini) |
3470 | +configure_file(Runtime.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Runtime.ini) |
3471 | +configure_file(Zmq.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Zmq.ini) |
3472 | + |
3473 | +add_executable(Invocation_test Invocation_test.cpp TestScope.cpp) |
3474 | +target_link_libraries(Invocation_test ${TESTLIBS}) |
3475 | + |
3476 | +add_test(Invocation Invocation_test) |
3477 | |
3478 | === added file 'test/gtest/scopes/Invocation/Invocation_test.cpp' |
3479 | --- test/gtest/scopes/Invocation/Invocation_test.cpp 1970-01-01 00:00:00 +0000 |
3480 | +++ test/gtest/scopes/Invocation/Invocation_test.cpp 2014-04-28 05:57:56 +0000 |
3481 | @@ -0,0 +1,145 @@ |
3482 | +/* |
3483 | + * Copyright (C) 2014 Canonical Ltd |
3484 | + * |
3485 | + * This program is free software: you can redistribute it and/or modify |
3486 | + * it under the terms of the GNU Lesser General Public License version 3 as |
3487 | + * published by the Free Software Foundation. |
3488 | + * |
3489 | + * This program is distributed in the hope that it will be useful, |
3490 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
3491 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
3492 | + * GNU Lesser General Public License for more details. |
3493 | + * |
3494 | + * You should have received a copy of the GNU Lesser General Public License |
3495 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
3496 | + * |
3497 | + * Authored by: Michi Henning <michi.henning@canonical.com> |
3498 | + */ |
3499 | + |
3500 | +#include <signal.h> |
3501 | +#include <sys/types.h> |
3502 | +#include <unistd.h> |
3503 | + |
3504 | +#include <mutex> |
3505 | + |
3506 | +#include <unity/scopes/ActionMetadata.h> |
3507 | +#include <unity/scopes/CategorisedResult.h> |
3508 | +#include <unity/scopes/internal/MWScope.h> |
3509 | +#include <unity/scopes/internal/RuntimeImpl.h> |
3510 | +#include <unity/scopes/internal/ScopeImpl.h> |
3511 | +#include <unity/scopes/ListenerBase.h> |
3512 | +#include <unity/scopes/QueryCtrl.h> |
3513 | +#include <unity/scopes/Runtime.h> |
3514 | +#include <unity/scopes/SearchMetadata.h> |
3515 | +#include <unity/UnityExceptions.h> |
3516 | + |
3517 | +#include <gtest/gtest.h> |
3518 | + |
3519 | +#include "TestScope.h" |
3520 | + |
3521 | +using namespace std; |
3522 | +using namespace unity::scopes; |
3523 | + |
3524 | +class TestReceiver : public SearchListenerBase |
3525 | +{ |
3526 | +public: |
3527 | + TestReceiver() |
3528 | + : query_complete_(false) |
3529 | + { |
3530 | + } |
3531 | + |
3532 | + virtual void push(CategorisedResult /* result */) override |
3533 | + { |
3534 | + } |
3535 | + |
3536 | + virtual void finished(ListenerBase::Reason reason, string const& error_message) override |
3537 | + { |
3538 | + lock_guard<mutex> lock(mutex_); |
3539 | + reason_ = reason; |
3540 | + error_message_ = error_message; |
3541 | + query_complete_ = true; |
3542 | + cond_.notify_one(); |
3543 | + } |
3544 | + |
3545 | + void wait_until_finished() |
3546 | + { |
3547 | + unique_lock<mutex> lock(mutex_); |
3548 | + cond_.wait(lock, [this] { return this->query_complete_; }); |
3549 | + query_complete_ = false; |
3550 | + } |
3551 | + |
3552 | + ListenerBase::Reason reason() |
3553 | + { |
3554 | + lock_guard<mutex> lock(mutex_); |
3555 | + return reason_; |
3556 | + } |
3557 | + |
3558 | + string error_message() |
3559 | + { |
3560 | + lock_guard<mutex> lock(mutex_); |
3561 | + return error_message_; |
3562 | + } |
3563 | + |
3564 | +private: |
3565 | + bool query_complete_; |
3566 | + ListenerBase::Reason reason_; |
3567 | + string error_message_; |
3568 | + mutex mutex_; |
3569 | + condition_variable cond_; |
3570 | +}; |
3571 | + |
3572 | +// Check that invoking on a scope after a timeout exception from a previous |
3573 | +// invocation works correctly. This tests that a failed socket is removed |
3574 | +// from the connection pool in ZmqObject::invoke_twoway_(). |
3575 | + |
3576 | +TEST(Invocation, timeout) |
3577 | +{ |
3578 | + auto rt = internal::RuntimeImpl::create("", "Runtime.ini"); |
3579 | + auto mw = rt->factory()->create("TestScope", "Zmq", "Zmq.ini"); |
3580 | + mw->start(); |
3581 | + auto proxy = mw->create_scope_proxy("TestScope"); |
3582 | + auto scope = internal::ScopeImpl::create(proxy, rt.get(), "no_such_scope"); |
3583 | + |
3584 | + auto receiver = make_shared<TestReceiver>(); |
3585 | + |
3586 | + // First call must time out. |
3587 | + scope->search("test", SearchMetadata("unused", "unused"), receiver); |
3588 | + receiver->wait_until_finished(); |
3589 | + |
3590 | + EXPECT_EQ(ListenerBase::Error, receiver->reason()); |
3591 | + EXPECT_EQ("unity::scopes::TimeoutException: Request timed out after 300 milliseconds", receiver->error_message()); |
3592 | + |
3593 | + // this_thread::sleep_for(chrono::milliseconds(500)); |
3594 | + |
3595 | + // Second call must succeed |
3596 | + scope->search("test", SearchMetadata("unused", "unused"), receiver); |
3597 | + receiver->wait_until_finished(); |
3598 | + |
3599 | + EXPECT_EQ(ListenerBase::Finished, receiver->reason()); |
3600 | + EXPECT_EQ("", receiver->error_message()); |
3601 | +} |
3602 | + |
3603 | +void scope_thread(Runtime::SPtr const& rt) |
3604 | +{ |
3605 | + TestScope scope; |
3606 | + rt->run_scope(&scope, "/foo"); |
3607 | +} |
3608 | + |
3609 | +int main(int argc, char **argv) |
3610 | +{ |
3611 | + ::testing::InitGoogleTest(&argc, argv); |
3612 | + |
3613 | + Runtime::SPtr srt = move(Runtime::create_scope_runtime("TestScope", "Runtime.ini")); |
3614 | + std::thread scope_t(scope_thread, srt); |
3615 | + |
3616 | + // Give thread some time to bind to its endpoint, to avoid getting ObjectNotExistException |
3617 | + // from a synchronous remote call. |
3618 | + this_thread::sleep_for(chrono::milliseconds(200)); |
3619 | + |
3620 | + auto rc = RUN_ALL_TESTS(); |
3621 | + |
3622 | + srt->destroy(); |
3623 | + scope_t.join(); |
3624 | + |
3625 | + return rc; |
3626 | +} |
3627 | |
3628 | === added file 'test/gtest/scopes/Invocation/Registry.ini.in' |
3629 | --- test/gtest/scopes/Invocation/Registry.ini.in 1970-01-01 00:00:00 +0000 |
3630 | +++ test/gtest/scopes/Invocation/Registry.ini.in 2014-04-28 05:57:56 +0000 |
3631 | @@ -0,0 +1,8 @@ |
3632 | +[Registry] |
3633 | +Middleware = Zmq |
3634 | +Zmq.Endpoint = ipc:///tmp/socket_for_registry |
3635 | +Zmq.EndpointDir = /tmp |
3636 | +Zmq.ConfigFile = Zmq.ini |
3637 | +Scope.InstallDir = /unused |
3638 | +Click.InstallDir = /unused |
3639 | +Scoperunner.Path = /unused |
3640 | |
3641 | === added file 'test/gtest/scopes/Invocation/Runtime.ini.in' |
3642 | --- test/gtest/scopes/Invocation/Runtime.ini.in 1970-01-01 00:00:00 +0000 |
3643 | +++ test/gtest/scopes/Invocation/Runtime.ini.in 2014-04-28 05:57:56 +0000 |
3644 | @@ -0,0 +1,5 @@ |
3645 | +[Runtime] |
3646 | +Registry.Identity = Registry |
3647 | +Registry.ConfigFile = Registry.ini |
3648 | +Default.Middleware = Zmq |
3649 | +Zmq.ConfigFile = Zmq.ini |
3650 | |
3651 | === added file 'test/gtest/scopes/Invocation/TestScope.cpp' |
3652 | --- test/gtest/scopes/Invocation/TestScope.cpp 1970-01-01 00:00:00 +0000 |
3653 | +++ test/gtest/scopes/Invocation/TestScope.cpp 2014-04-28 05:57:56 +0000 |
3654 | @@ -0,0 +1,94 @@ |
3655 | +/* |
3656 | + * Copyright (C) 2014 Canonical Ltd |
3657 | + * |
3658 | + * This program is free software: you can redistribute it and/or modify |
3659 | + * it under the terms of the GNU Lesser General Public License version 3 as |
3660 | + * published by the Free Software Foundation. |
3661 | + * |
3662 | + * This program is distributed in the hope that it will be useful, |
3663 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
3664 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
3665 | + * GNU Lesser General Public License for more details. |
3666 | + * |
3667 | + * You should have received a copy of the GNU Lesser General Public License |
3668 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
3669 | + * |
3670 | + * Authored by: Michi Henning <michi.henning@canonical.com> |
3671 | + */ |
3672 | + |
3673 | +#include "TestScope.h" |
3674 | + |
3675 | +#include <unity/scopes/CategorisedResult.h> |
3676 | +#include <unity/scopes/ScopeBase.h> |
3677 | +#include <unity/scopes/SearchReply.h> |
3678 | + |
3679 | +#include <mutex> |
3680 | +#include <thread> |
3681 | + |
3682 | +using namespace std; |
3683 | +using namespace unity::scopes; |
3684 | + |
3685 | +namespace |
3686 | +{ |
3687 | + |
3688 | +class TestQuery : public SearchQueryBase |
3689 | +{ |
3690 | +public: |
3691 | + TestQuery() |
3692 | + { |
3693 | + } |
3694 | + |
3695 | + virtual void cancelled() override |
3696 | + { |
3697 | + } |
3698 | + |
3699 | + virtual void run(SearchReplyProxy const& reply) override |
3700 | + { |
3701 | + auto cat = reply->register_category("cat1", "Category 1", ""); |
3702 | + CategorisedResult res(cat); |
3703 | + res.set_uri("uri"); |
3704 | + res.set_title("title"); |
3705 | + res.set_art("art"); |
3706 | + res.set_dnd_uri("dnd_uri"); |
3707 | + reply->push(res); |
3708 | + } |
3709 | +}; |
3710 | + |
3711 | +} // namespace |
3712 | + |
3713 | +int TestScope::start(string const&, RegistryProxy const &) |
3714 | +{ |
3715 | + return VERSION; |
3716 | +} |
3717 | + |
3718 | +void TestScope::stop() |
3719 | +{ |
3720 | +} |
3721 | + |
3722 | +void TestScope::run() |
3723 | +{ |
3724 | +} |
3725 | + |
3726 | +namespace |
3727 | +{ |
3728 | + |
3729 | +mutex m; |
3730 | +int count = 0; |
3731 | + |
3732 | +} // namespace |
3733 | + |
3734 | +SearchQueryBase::UPtr TestScope::search(CannedQuery const&, SearchMetadata const &) |
3735 | +{ |
3736 | + lock_guard<mutex> lock(m); |
3737 | + |
3738 | + if (count++ == 0) |
3739 | + { |
3740 | + this_thread::sleep_for(chrono::milliseconds(400)); // Force timeout on first call |
3741 | + } |
3742 | + return SearchQueryBase::UPtr(new TestQuery()); |
3743 | +} |
3744 | + |
3745 | +PreviewQueryBase::UPtr TestScope::preview(Result const&, ActionMetadata const &) |
3746 | +{ |
3747 | + return nullptr; // unused |
3748 | +} |
3749 | |
3750 | === added file 'test/gtest/scopes/Invocation/TestScope.h' |
3751 | --- test/gtest/scopes/Invocation/TestScope.h 1970-01-01 00:00:00 +0000 |
3752 | +++ test/gtest/scopes/Invocation/TestScope.h 2014-04-28 05:57:56 +0000 |
3753 | @@ -0,0 +1,40 @@ |
3754 | +/* |
3755 | + * Copyright (C) 2014 Canonical Ltd |
3756 | + * |
3757 | + * This program is free software: you can redistribute it and/or modify |
3758 | + * it under the terms of the GNU Lesser General Public License version 3 as |
3759 | + * published by the Free Software Foundation. |
3760 | + * |
3761 | + * This program is distributed in the hope that it will be useful, |
3762 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
3763 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
3764 | + * GNU Lesser General Public License for more details. |
3765 | + * |
3766 | + * You should have received a copy of the GNU Lesser General Public License |
3767 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
3768 | + * |
3769 | + * Authored by: Michi Henning <michi.henning@canonical.com> |
3770 | + */ |
3771 | + |
3772 | +#ifndef TEST_TESTSCOPE_H |
3773 | +#define TEST_TESTSCOPE_H |
3774 | + |
3775 | +#include <unity/scopes/ScopeBase.h> |
3776 | + |
3777 | +using namespace std; |
3778 | +using namespace unity::scopes; |
3779 | + |
3780 | +class TestScope : public ScopeBase |
3781 | +{ |
3782 | +public: |
3783 | + virtual int start(string const&, RegistryProxy const &) override; |
3784 | + |
3785 | + virtual void stop() override; |
3786 | + |
3787 | + virtual void run() override; |
3788 | + |
3789 | + virtual SearchQueryBase::UPtr search(CannedQuery const &, SearchMetadata const &) override; |
3790 | + virtual PreviewQueryBase::UPtr preview(Result const&, ActionMetadata const &) override; |
3791 | +}; |
3792 | + |
3793 | +#endif |
3794 | |
3795 | === added file 'test/gtest/scopes/Invocation/Zmq.ini.in' |
3796 | --- test/gtest/scopes/Invocation/Zmq.ini.in 1970-01-01 00:00:00 +0000 |
3797 | +++ test/gtest/scopes/Invocation/Zmq.ini.in 2014-04-28 05:57:56 +0000 |
3798 | @@ -0,0 +1,3 @@ |
3799 | +[Zmq] |
3800 | +EndpointDir.Public = /tmp |
3801 | +EndpointDir.Private = /tmp |
3802 | |
3803 | === modified file 'test/gtest/scopes/Registry/Registry_test.cpp' |
3804 | --- test/gtest/scopes/Registry/Registry_test.cpp 2014-04-08 10:34:19 +0000 |
3805 | +++ test/gtest/scopes/Registry/Registry_test.cpp 2014-04-28 05:57:56 +0000 |
3806 | @@ -24,8 +24,12 @@ |
3807 | #include <boost/filesystem.hpp> |
3808 | #include <boost/system/error_code.hpp> |
3809 | #include <boost/filesystem/operations.hpp> |
3810 | +#include <gtest/gtest.h> |
3811 | + |
3812 | +#include <condition_variable> |
3813 | #include <functional> |
3814 | -#include <gtest/gtest.h> |
3815 | +#include <mutex> |
3816 | + |
3817 | #include <signal.h> |
3818 | #include <unistd.h> |
3819 | |
3820 | @@ -34,13 +38,40 @@ |
3821 | class Receiver : public SearchListenerBase |
3822 | { |
3823 | public: |
3824 | + Receiver() |
3825 | + : done_(false) |
3826 | + , finished_ok_(false) |
3827 | + { |
3828 | + } |
3829 | + |
3830 | virtual void push(CategorisedResult /* result */) override |
3831 | { |
3832 | } |
3833 | |
3834 | - virtual void finished(ListenerBase::Reason /* reason */, std::string const& /* error_message */) override |
3835 | - { |
3836 | - } |
3837 | + virtual void finished(ListenerBase::Reason reason, std::string const& /* error_message */) override |
3838 | + { |
3839 | + std::lock_guard<std::mutex> lock(mutex_); |
3840 | + |
3841 | + EXPECT_EQ(Finished, reason); |
3842 | + finished_ok_ = reason == Finished; |
3843 | + done_ = true; |
3844 | + cond_.notify_all(); |
3845 | + } |
3846 | + |
3847 | + bool wait_until_finished() |
3848 | + { |
3849 | + std::unique_lock<std::mutex> lock(mutex_); |
3850 | + auto now = std::chrono::steady_clock::now(); |
3851 | + auto expiry_time = now + std::chrono::seconds(5); |
3852 | + EXPECT_TRUE(cond_.wait_until(lock, expiry_time, [this]{ return done_; })) << "finished message not delivered"; |
3853 | + return finished_ok_; |
3854 | + } |
3855 | + |
3856 | +private: |
3857 | + bool done_; |
3858 | + bool finished_ok_; |
3859 | + std::mutex mutex_; |
3860 | + std::condition_variable cond_; |
3861 | }; |
3862 | |
3863 | TEST(Registry, metadata) |
3864 | @@ -75,18 +106,13 @@ |
3865 | |
3866 | auto sp = meta.proxy(); |
3867 | |
3868 | - SearchListenerBase::SPtr reply(new Receiver); |
3869 | + auto receiver = std::make_shared<Receiver>(); |
3870 | + SearchListenerBase::SPtr reply(receiver); |
3871 | SearchMetadata metadata("C", "desktop"); |
3872 | |
3873 | // search would fail if testscopeB can't be executed |
3874 | - try |
3875 | - { |
3876 | - auto ctrl = sp->search("foo", metadata, reply); |
3877 | - } |
3878 | - catch (...) |
3879 | - { |
3880 | - FAIL(); |
3881 | - } |
3882 | + auto ctrl = sp->search("foo", metadata, reply); |
3883 | + EXPECT_TRUE(receiver->wait_until_finished()); |
3884 | } |
3885 | |
3886 | bool wait_for_registry() |
3887 | |
3888 | === modified file 'test/gtest/scopes/Runtime/CMakeLists.txt' |
3889 | --- test/gtest/scopes/Runtime/CMakeLists.txt 2014-02-19 04:18:26 +0000 |
3890 | +++ test/gtest/scopes/Runtime/CMakeLists.txt 2014-04-28 05:57:56 +0000 |
3891 | @@ -2,7 +2,7 @@ |
3892 | configure_file(Runtime.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Runtime.ini) |
3893 | configure_file(Zmq.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Zmq.ini) |
3894 | |
3895 | -add_executable(Runtime_test Runtime_test.cpp TestScope.cpp PusherScope.cpp) |
3896 | +add_executable(Runtime_test Runtime_test.cpp TestScope.cpp PusherScope.cpp SlowCreateScope.cpp) |
3897 | target_link_libraries(Runtime_test ${TESTLIBS}) |
3898 | |
3899 | add_test(Runtime Runtime_test) |
3900 | |
3901 | === modified file 'test/gtest/scopes/Runtime/PusherScope.cpp' |
3902 | --- test/gtest/scopes/Runtime/PusherScope.cpp 2014-03-07 01:07:28 +0000 |
3903 | +++ test/gtest/scopes/Runtime/PusherScope.cpp 2014-04-28 05:57:56 +0000 |
3904 | @@ -23,11 +23,16 @@ |
3905 | #include <unity/scopes/ScopeBase.h> |
3906 | #include <unity/scopes/SearchReply.h> |
3907 | |
3908 | +#include <atomic> |
3909 | + |
3910 | #include <gtest/gtest.h> |
3911 | |
3912 | using namespace std; |
3913 | using namespace unity::scopes; |
3914 | |
3915 | +namespace |
3916 | +{ |
3917 | + |
3918 | class PusherQuery : public SearchQueryBase |
3919 | { |
3920 | public: |
3921 | @@ -59,9 +64,11 @@ |
3922 | } |
3923 | |
3924 | private: |
3925 | - int cardinality_; |
3926 | + atomic_int cardinality_; |
3927 | }; |
3928 | |
3929 | +} // namespace |
3930 | + |
3931 | int PusherScope::start(string const&, RegistryProxy const &) |
3932 | { |
3933 | return VERSION; |
3934 | |
3935 | === modified file 'test/gtest/scopes/Runtime/PusherScope.h' |
3936 | --- test/gtest/scopes/Runtime/PusherScope.h 2014-02-27 23:20:56 +0000 |
3937 | +++ test/gtest/scopes/Runtime/PusherScope.h 2014-04-28 05:57:56 +0000 |
3938 | @@ -16,6 +16,9 @@ |
3939 | * Authored by: Michi Henning <michi.henning@canonical.com> |
3940 | */ |
3941 | |
3942 | +#ifndef TEST_PUSHERSCOPE_H |
3943 | +#define TEST_PUSHERSCOPE_H |
3944 | + |
3945 | #include <unity/scopes/ScopeBase.h> |
3946 | |
3947 | using namespace std; |
3948 | @@ -33,3 +36,5 @@ |
3949 | virtual SearchQueryBase::UPtr search(CannedQuery const &, SearchMetadata const &) override; |
3950 | virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const& metadata) override; |
3951 | }; |
3952 | + |
3953 | +#endif |
3954 | |
3955 | === modified file 'test/gtest/scopes/Runtime/Runtime_test.cpp' |
3956 | --- test/gtest/scopes/Runtime/Runtime_test.cpp 2014-04-03 14:22:02 +0000 |
3957 | +++ test/gtest/scopes/Runtime/Runtime_test.cpp 2014-04-28 05:57:56 +0000 |
3958 | @@ -1,5 +1,5 @@ |
3959 | /* |
3960 | -* Copyright (C) 2013 Canonical Ltd |
3961 | + * Copyright (C) 2013 Canonical Ltd |
3962 | * |
3963 | * This program is free software: you can redistribute it and/or modify |
3964 | * it under the terms of the GNU Lesser General Public License version 3 as |
3965 | @@ -22,20 +22,22 @@ |
3966 | |
3967 | #include <mutex> |
3968 | |
3969 | +#include <unity/scopes/ActionMetadata.h> |
3970 | #include <unity/scopes/CategorisedResult.h> |
3971 | +#include <unity/scopes/internal/MWScope.h> |
3972 | +#include <unity/scopes/internal/RuntimeImpl.h> |
3973 | +#include <unity/scopes/internal/ScopeImpl.h> |
3974 | #include <unity/scopes/ListenerBase.h> |
3975 | +#include <unity/scopes/QueryCtrl.h> |
3976 | #include <unity/scopes/Runtime.h> |
3977 | -#include <unity/scopes/internal/RuntimeImpl.h> |
3978 | -#include <unity/scopes/internal/MWScope.h> |
3979 | -#include <unity/scopes/internal/ScopeImpl.h> |
3980 | -#include <unity/scopes/ActionMetadata.h> |
3981 | #include <unity/scopes/SearchMetadata.h> |
3982 | #include <unity/UnityExceptions.h> |
3983 | |
3984 | #include <gtest/gtest.h> |
3985 | |
3986 | +#include "PusherScope.h" |
3987 | +#include "SlowCreateScope.h" |
3988 | #include "TestScope.h" |
3989 | -#include "PusherScope.h" |
3990 | |
3991 | using namespace std; |
3992 | using namespace unity::scopes; |
3993 | @@ -82,6 +84,7 @@ |
3994 | count_++; |
3995 | last_result_ = std::make_shared<Result>(result); |
3996 | } |
3997 | + |
3998 | virtual void push(Annotation annotation) override |
3999 | { |
4000 | EXPECT_EQ(1u, annotation.links().size()); |
4001 | @@ -92,6 +95,7 @@ |
4002 | EXPECT_EQ("dep1", query.department_id()); |
4003 | annotation_count_++; |
4004 | } |
4005 | + |
4006 | virtual void finished(ListenerBase::Reason reason, string const& error_message) override |
4007 | { |
4008 | EXPECT_EQ(Finished, reason); |
4009 | @@ -104,15 +108,18 @@ |
4010 | query_complete_ = true; |
4011 | cond_.notify_one(); |
4012 | } |
4013 | + |
4014 | void wait_until_finished() |
4015 | { |
4016 | unique_lock<mutex> lock(mutex_); |
4017 | cond_.wait(lock, [this] { return this->query_complete_; }); |
4018 | } |
4019 | + |
4020 | std::shared_ptr<Result> last_result() |
4021 | { |
4022 | return last_result_; |
4023 | } |
4024 | + |
4025 | private: |
4026 | bool query_complete_; |
4027 | mutex mutex_; |
4028 | @@ -225,6 +232,47 @@ |
4029 | receiver->wait_until_finished(); |
4030 | } |
4031 | |
4032 | +TEST(Runtime, consecutive_search) |
4033 | +{ |
4034 | + auto rt = internal::RuntimeImpl::create("", "Runtime.ini"); |
4035 | + auto mw = rt->factory()->create("TestScope", "Zmq", "Zmq.ini"); |
4036 | + mw->start(); |
4037 | + |
4038 | + auto proxy = mw->create_scope_proxy("TestScope"); |
4039 | + auto scope = internal::ScopeImpl::create(proxy, rt.get(), "TestScope"); |
4040 | + |
4041 | + auto receiver = make_shared<Receiver>(); |
4042 | + auto ctrl = scope->search("test", SearchMetadata("en", "phone"), receiver); |
4043 | + receiver->wait_until_finished(); |
4044 | + |
4045 | + std::vector<std::shared_ptr<Receiver>> replies; |
4046 | + |
4047 | + const int num_searches = 100; |
4048 | + for (int i = 0; i < num_searches; ++i) |
4049 | + { |
4050 | + replies.push_back(std::make_shared<Receiver>()); |
4051 | + scope->search("test", SearchMetadata("en", "phone"), replies.back()); |
4052 | + } |
4053 | + |
4054 | + for (int i = 0; i < num_searches; ++i) |
4055 | + { |
4056 | + replies[i]->wait_until_finished(); |
4057 | + } |
4058 | + |
4059 | + // Do it again, to test that re-use of previously-created connections works. |
4060 | + replies.clear(); |
4061 | + for (int i = 0; i < num_searches; ++i) |
4062 | + { |
4063 | + replies.push_back(std::make_shared<Receiver>()); |
4064 | + scope->search("test", SearchMetadata("en", "phone"), replies.back()); |
4065 | + } |
4066 | + |
4067 | + for (int i = 0; i < num_searches; ++i) |
4068 | + { |
4069 | + replies[i]->wait_until_finished(); |
4070 | + } |
4071 | +} |
4072 | + |
4073 | TEST(Runtime, preview) |
4074 | { |
4075 | // connect to scope and run a query |
4076 | @@ -272,6 +320,68 @@ |
4077 | receiver->wait_until_finished(); |
4078 | } |
4079 | |
4080 | +class CancelReceiver : public SearchListenerBase |
4081 | +{ |
4082 | +public: |
4083 | + CancelReceiver() |
4084 | + : query_complete_(false) |
4085 | + { |
4086 | + } |
4087 | + |
4088 | + virtual void push(CategorisedResult /* result */) override |
4089 | + { |
4090 | + FAIL(); |
4091 | + } |
4092 | + |
4093 | + virtual void finished(ListenerBase::Reason reason, string const& error_message) override |
4094 | + { |
4095 | + EXPECT_EQ(Cancelled, reason); |
4096 | + EXPECT_EQ("", error_message); |
4097 | + // Signal that the query has completed. |
4098 | + unique_lock<mutex> lock(mutex_); |
4099 | + query_complete_ = true; |
4100 | + cond_.notify_one(); |
4101 | + } |
4102 | + |
4103 | + void wait_until_finished() |
4104 | + { |
4105 | + unique_lock<mutex> lock(mutex_); |
4106 | + cond_.wait(lock, [this] { return this->query_complete_; }); |
4107 | + } |
4108 | + |
4109 | +private: |
4110 | + bool query_complete_; |
4111 | + mutex mutex_; |
4112 | + condition_variable cond_; |
4113 | +}; |
4114 | + |
4115 | +TEST(Runtime, early_cancel) |
4116 | +{ |
4117 | + auto rt = internal::RuntimeImpl::create("", "Runtime.ini"); |
4118 | + auto mw = rt->factory()->create("SlowCreateScope", "Zmq", "Zmq.ini"); |
4119 | + mw->start(); |
4120 | + auto proxy = mw->create_scope_proxy("SlowCreateScope"); |
4121 | + auto scope = internal::ScopeImpl::create(proxy, rt.get(), "SlowCreateScope"); |
4122 | + |
4123 | + // Check that, if a cancel is sent before search() returns on the server side, the |
4124 | + // cancel is correctly forwarded to the scope once the real reply proxy arrives |
4125 | + // over the wire. |
4126 | + auto receiver = make_shared<CancelReceiver>(); |
4127 | + auto ctrl = scope->search("test", SearchMetadata("unused", "unused"), receiver); |
4128 | + // Allow some time for the search message to get there. |
4129 | + this_thread::sleep_for(chrono::milliseconds(100)); |
4130 | + // search() in the scope doesn't return for some time, so the cancel() that follows |
4131 | + // is sent to the "fake" QueryCtrlProxy. |
4132 | + ctrl->cancel(); |
4133 | + receiver->wait_until_finished(); |
4134 | + // The receiver receives its cancel from the client-side run time instead of the |
4135 | + // scope because the run time short-cuts sending the cancel locally instead |
4136 | + // of waiting for the cancel message from the scope. Allow some time for the |
4137 | + // cancel to reach the scope before shutting down the run time, so the scope |
4138 | + // can test that it received the cancel. |
4139 | + this_thread::sleep_for(chrono::milliseconds(200)); |
4140 | +} |
4141 | + |
4142 | void scope_thread(Runtime::SPtr const& rt) |
4143 | { |
4144 | TestScope scope; |
4145 | @@ -284,6 +394,12 @@ |
4146 | rt->run_scope(&scope, "/foo"); |
4147 | } |
4148 | |
4149 | +void slow_create_thread(Runtime::SPtr const& rt) |
4150 | +{ |
4151 | + SlowCreateScope scope; |
4152 | + rt->run_scope(&scope, "/foo"); |
4153 | +} |
4154 | + |
4155 | int main(int argc, char **argv) |
4156 | { |
4157 | ::testing::InitGoogleTest(&argc, argv); |
4158 | @@ -294,6 +410,13 @@ |
4159 | Runtime::SPtr prt = move(Runtime::create_scope_runtime("PusherScope", "Runtime.ini")); |
4160 | std::thread pusher_t(pusher_thread, prt); |
4161 | |
4162 | + Runtime::SPtr scrt = move(Runtime::create_scope_runtime("SlowCreateScope", "Runtime.ini")); |
4163 | + std::thread slow_create_t(slow_create_thread, scrt); |
4164 | + |
4165 | + // Give threads some time to bind to their endpoints, to avoid getting ObjectNotExistException |
4166 | + // from a synchronous remote call. |
4167 | + this_thread::sleep_for(chrono::milliseconds(200)); |
4168 | + |
4169 | auto rc = RUN_ALL_TESTS(); |
4170 | |
4171 | srt->destroy(); |
4172 | @@ -302,5 +425,8 @@ |
4173 | prt->destroy(); |
4174 | pusher_t.join(); |
4175 | |
4176 | + scrt->destroy(); |
4177 | + slow_create_t.join(); |
4178 | + |
4179 | return rc; |
4180 | } |
4181 | |
4182 | === added file 'test/gtest/scopes/Runtime/SlowCreateScope.cpp' |
4183 | --- test/gtest/scopes/Runtime/SlowCreateScope.cpp 1970-01-01 00:00:00 +0000 |
4184 | +++ test/gtest/scopes/Runtime/SlowCreateScope.cpp 2014-04-28 05:57:56 +0000 |
4185 | @@ -0,0 +1,101 @@ |
4186 | +/* |
4187 | + * Copyright (C) 2013 Canonical Ltd |
4188 | + * |
4189 | + * This program is free software: you can redistribute it and/or modify |
4190 | + * it under the terms of the GNU Lesser General Public License version 3 as |
4191 | + * published by the Free Software Foundation. |
4192 | + * |
4193 | + * This program is distributed in the hope that it will be useful, |
4194 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
4195 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
4196 | + * GNU Lesser General Public License for more details. |
4197 | + * |
4198 | + * You should have received a copy of the GNU Lesser General Public License |
4199 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
4200 | + * |
4201 | + * Authored by: Michi Henning <michi.henning@canonical.com> |
4202 | + */ |
4203 | + |
4204 | +#include <unity/scopes/ScopeBase.h> |
4205 | + |
4206 | +#include "SlowCreateScope.h" |
4207 | + |
4208 | +#include <condition_variable> |
4209 | +#include <mutex> |
4210 | +#include <thread> |
4211 | + |
4212 | +#include <gtest/gtest.h> |
4213 | + |
4214 | +using namespace std; |
4215 | +using namespace unity::scopes; |
4216 | + |
4217 | +namespace |
4218 | +{ |
4219 | + |
4220 | +class TestQuery : public SearchQueryBase |
4221 | +{ |
4222 | +public: |
4223 | + TestQuery() |
4224 | + { |
4225 | + lock_guard<mutex> lock(mutex_); |
4226 | + cancelled_ = false; |
4227 | + } |
4228 | + |
4229 | + virtual void cancelled() override |
4230 | + { |
4231 | + lock_guard<mutex> lock(mutex_); |
4232 | + cancelled_ = true; |
4233 | + cond_.notify_all(); |
4234 | + } |
4235 | + |
4236 | + virtual void run(SearchReplyProxy const&) override |
4237 | + { |
4238 | + if (!valid()) |
4239 | + { |
4240 | + return; // Query was cancelled already |
4241 | + } |
4242 | + // We time out the wait because, otherwise, the scope's object adapter can't shut down |
4243 | + // and the test hangs forever. |
4244 | + // If this fails, the cancelled method wasn't called after five seconds. |
4245 | + auto stop_time = chrono::steady_clock::now() + chrono::seconds(5); |
4246 | + unique_lock<mutex> lock(mutex_); |
4247 | + EXPECT_TRUE(cond_.wait_until(lock, stop_time, [this]{ return cancelled_;})); |
4248 | + } |
4249 | + |
4250 | +private: |
4251 | + mutex mutex_; |
4252 | + condition_variable cond_; |
4253 | + bool cancelled_; |
4254 | +}; |
4255 | + |
4256 | +} // namespace |
4257 | + |
4258 | +int SlowCreateScope::start(string const&, RegistryProxy const &) |
4259 | +{ |
4260 | + return VERSION; |
4261 | +} |
4262 | + |
4263 | +void SlowCreateScope::stop() |
4264 | +{ |
4265 | +} |
4266 | + |
4267 | +void SlowCreateScope::run() |
4268 | +{ |
4269 | +} |
4270 | + |
4271 | +SearchQueryBase::UPtr SlowCreateScope::search(CannedQuery const&, SearchMetadata const &) |
4272 | +{ |
4273 | + // Sleep for a while. This allows the client to call cancel() before this function |
4274 | + // returns, while the test still holds a fake QueryCtrl returned by the async |
4275 | + // invocation. When this method returns, the client-side run time calls |
4276 | + // the real cancel, which triggers the cancelled() callback above, which, in turn, |
4277 | + // causes TestQuery::run() to complete. |
4278 | + this_thread::sleep_for(chrono::milliseconds(200)); |
4279 | + |
4280 | + return SearchQueryBase::UPtr(new TestQuery()); |
4281 | +} |
4282 | + |
4283 | +PreviewQueryBase::UPtr SlowCreateScope::preview(Result const&, ActionMetadata const &) |
4284 | +{ |
4285 | + return nullptr; // Not called |
4286 | +} |
4287 | |
4288 | === added file 'test/gtest/scopes/Runtime/SlowCreateScope.h' |
4289 | --- test/gtest/scopes/Runtime/SlowCreateScope.h 1970-01-01 00:00:00 +0000 |
4290 | +++ test/gtest/scopes/Runtime/SlowCreateScope.h 2014-04-28 05:57:56 +0000 |
4291 | @@ -0,0 +1,40 @@ |
4292 | +/* |
4293 | + * Copyright (C) 2013 Canonical Ltd |
4294 | + * |
4295 | + * This program is free software: you can redistribute it and/or modify |
4296 | + * it under the terms of the GNU Lesser General Public License version 3 as |
4297 | + * published by the Free Software Foundation. |
4298 | + * |
4299 | + * This program is distributed in the hope that it will be useful, |
4300 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
4301 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
4302 | + * GNU Lesser General Public License for more details. |
4303 | + * |
4304 | + * You should have received a copy of the GNU Lesser General Public License |
4305 | + * along with this program. If not, see <http://www.gnu.org/licenses/>. |
4306 | + * |
4307 | + * Authored by: Michi Henning <michi.henning@canonical.com> |
4308 | + */ |
4309 | + |
4310 | +#ifndef TEST_SLOWCREATESCOPE_H |
4311 | +#define TEST_SLOWCREATESCOPE_H |
4312 | + |
4313 | +#include <unity/scopes/ScopeBase.h> |
4314 | + |
4315 | +using namespace std; |
4316 | +using namespace unity::scopes; |
4317 | + |
4318 | +class SlowCreateScope : public ScopeBase |
4319 | +{ |
4320 | +public: |
4321 | + virtual int start(string const&, RegistryProxy const &) override; |
4322 | + |
4323 | + virtual void stop() override; |
4324 | + |
4325 | + virtual void run() override; |
4326 | + |
4327 | + virtual SearchQueryBase::UPtr search(CannedQuery const &, SearchMetadata const &) override; |
4328 | + virtual PreviewQueryBase::UPtr preview(Result const&, ActionMetadata const &) override; |
4329 | +}; |
4330 | + |
4331 | +#endif |
4332 | |
4333 | === modified file 'test/gtest/scopes/Runtime/TestScope.cpp' |
4334 | --- test/gtest/scopes/Runtime/TestScope.cpp 2014-03-07 01:07:28 +0000 |
4335 | +++ test/gtest/scopes/Runtime/TestScope.cpp 2014-04-28 05:57:56 +0000 |
4336 | @@ -30,6 +30,9 @@ |
4337 | using namespace std; |
4338 | using namespace unity::scopes; |
4339 | |
4340 | +namespace |
4341 | +{ |
4342 | + |
4343 | class TestQuery : public SearchQueryBase |
4344 | { |
4345 | public: |
4346 | @@ -41,6 +44,7 @@ |
4347 | virtual void cancelled() override |
4348 | { |
4349 | } |
4350 | + |
4351 | virtual void run(SearchReplyProxy const& reply) override |
4352 | { |
4353 | Department dep("news", query_, "News"); |
4354 | @@ -65,6 +69,8 @@ |
4355 | CannedQuery query_; |
4356 | }; |
4357 | |
4358 | +} // namespace |
4359 | + |
4360 | class TestPreview : public PreviewQueryBase |
4361 | { |
4362 | public: |
4363 | |
4364 | === modified file 'test/gtest/scopes/Runtime/TestScope.h' |
4365 | --- test/gtest/scopes/Runtime/TestScope.h 2014-02-27 23:20:56 +0000 |
4366 | +++ test/gtest/scopes/Runtime/TestScope.h 2014-04-28 05:57:56 +0000 |
4367 | @@ -16,6 +16,9 @@ |
4368 | * Authored by: James Henstridge <james.henstridge@canonical.com> |
4369 | */ |
4370 | |
4371 | +#ifndef TEST_TESTSCOPE_H |
4372 | +#define TEST_TESTSCOPE_H |
4373 | + |
4374 | #include <unity/scopes/ScopeBase.h> |
4375 | |
4376 | using namespace std; |
4377 | @@ -33,3 +36,5 @@ |
4378 | virtual SearchQueryBase::UPtr search(CannedQuery const &, SearchMetadata const &) override; |
4379 | virtual PreviewQueryBase::UPtr preview(Result const&, ActionMetadata const &) override; |
4380 | }; |
4381 | + |
4382 | +#endif |
4383 | |
4384 | === modified file 'test/gtest/scopes/ScopeExceptions/ScopeExceptions_test.cpp' |
4385 | --- test/gtest/scopes/ScopeExceptions/ScopeExceptions_test.cpp 2014-01-09 09:47:56 +0000 |
4386 | +++ test/gtest/scopes/ScopeExceptions/ScopeExceptions_test.cpp 2014-04-28 05:57:56 +0000 |
4387 | @@ -60,3 +60,32 @@ |
4388 | EXPECT_EQ(e.name(), e2.name()); |
4389 | } |
4390 | } |
4391 | + |
4392 | +TEST(ObjectNotExistException, state) |
4393 | +{ |
4394 | + { |
4395 | + ObjectNotExistException e("some error", "some id"); |
4396 | + EXPECT_STREQ("unity::scopes::ObjectNotExistException: some error (id = some id)", e.what()); |
4397 | + EXPECT_EQ("unity::scopes::ObjectNotExistException", e.name()); |
4398 | + EXPECT_EQ("some id", e.id()); |
4399 | + EXPECT_THROW(rethrow_exception(e.self()), ObjectNotExistException); |
4400 | + ObjectNotExistException e2("blah", "some id"); |
4401 | + e2 = e; |
4402 | + EXPECT_EQ(e.reason(), e2.reason()); |
4403 | + EXPECT_EQ(e.name(), e2.name()); |
4404 | + } |
4405 | +} |
4406 | + |
4407 | +TEST(TimeoutException, state) |
4408 | +{ |
4409 | + { |
4410 | + TimeoutException e("some error"); |
4411 | + EXPECT_STREQ("unity::scopes::TimeoutException: some error", e.what()); |
4412 | + EXPECT_EQ("unity::scopes::TimeoutException", e.name()); |
4413 | + EXPECT_THROW(rethrow_exception(e.self()), TimeoutException); |
4414 | + TimeoutException e2("blah"); |
4415 | + e2 = e; |
4416 | + EXPECT_EQ(e.reason(), e2.reason()); |
4417 | + EXPECT_EQ(e.name(), e2.name()); |
4418 | + } |
4419 | +} |
4420 | |
4421 | === modified file 'test/gtest/scopes/internal/ThreadPool/ThreadPool_test.cpp' |
4422 | --- test/gtest/scopes/internal/ThreadPool/ThreadPool_test.cpp 2014-01-24 03:18:25 +0000 |
4423 | +++ test/gtest/scopes/internal/ThreadPool/ThreadPool_test.cpp 2014-04-28 05:57:56 +0000 |
4424 | @@ -104,3 +104,14 @@ |
4425 | EXPECT_STREQ("unity::InvalidArgumentException: ThreadPool(): invalid pool size: -1", e.what()); |
4426 | } |
4427 | } |
4428 | + |
4429 | +TEST(ThreadPool, throwing_task) |
4430 | +{ |
4431 | + ThreadPool p(1); |
4432 | + auto thrower = [](){ throw std::logic_error("some error"); }; |
4433 | + auto thrower2 = [](){ throw 99; }; |
4434 | + auto f = p.submit(thrower); |
4435 | + auto f2 = p.submit(thrower2); |
4436 | + EXPECT_THROW(f.get(), std::logic_error); |
4437 | + EXPECT_THROW(f2.get(), int); |
4438 | +} |
4439 | |
4440 | === modified file 'test/gtest/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp' |
4441 | --- test/gtest/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp 2014-02-25 12:08:41 +0000 |
4442 | +++ test/gtest/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp 2014-04-28 05:57:56 +0000 |
4443 | @@ -74,14 +74,26 @@ |
4444 | |
4445 | TEST(ThreadSafeQueue, exception) |
4446 | { |
4447 | - unique_ptr<ThreadSafeQueue<string>> q(new ThreadSafeQueue<string>); |
4448 | - q->push("fred"); |
4449 | - auto f = waiter_ready.get_future(); |
4450 | - auto t = thread(waiter_thread, q.get()); |
4451 | - f.wait(); |
4452 | - this_thread::sleep_for(chrono::milliseconds(50)); // Make sure child thread has time to call wait_and_pop() |
4453 | - q.reset(); |
4454 | - t.join(); |
4455 | + { |
4456 | + unique_ptr<ThreadSafeQueue<string>> q(new ThreadSafeQueue<string>); |
4457 | + q->push("fred"); |
4458 | + auto f = waiter_ready.get_future(); |
4459 | + auto t = thread(waiter_thread, q.get()); |
4460 | + f.wait(); |
4461 | + this_thread::sleep_for(chrono::milliseconds(50)); // Make sure child thread has time to call wait_and_pop() |
4462 | + q->destroy(); |
4463 | + t.join(); |
4464 | + |
4465 | + try |
4466 | + { |
4467 | + q->push("fred"); |
4468 | + FAIL(); |
4469 | + } |
4470 | + catch (std::runtime_error const& e) |
4471 | + { |
4472 | + EXPECT_STREQ("ThreadSafeQueue: cannot push onto destroyed queue", e.what()); |
4473 | + } |
4474 | + } |
4475 | } |
4476 | |
4477 | atomic_int count; |
4478 | @@ -112,6 +124,7 @@ |
4479 | |
4480 | // Destroy the queue while multiple threads are sleeping in wait_and_pop(). |
4481 | q.destroy(); |
4482 | + q.wait_for_destroy(); |
4483 | |
4484 | for (auto& t : threads) |
4485 | { |
4486 | @@ -120,6 +133,45 @@ |
4487 | EXPECT_EQ(20, count); |
4488 | } |
4489 | |
4490 | +void destroy_thread(ThreadSafeQueue<int>* q) |
4491 | +{ |
4492 | + this_thread::sleep_for(chrono::milliseconds(100)); |
4493 | + EXPECT_EQ(0u, q->size()); |
4494 | + q->destroy(); |
4495 | +} |
4496 | + |
4497 | +TEST(ThreadSafeQueue, destroy_while_empty) |
4498 | +{ |
4499 | + ThreadSafeQueue<int> q; |
4500 | + thread t(destroy_thread, &q); |
4501 | + q.wait_for_destroy(); |
4502 | + EXPECT_EQ(0u, q.size()); |
4503 | + t.join(); |
4504 | +} |
4505 | + |
4506 | +void wait_for_destroy_thread(ThreadSafeQueue<int>* q) |
4507 | +{ |
4508 | + q->wait_for_destroy(); |
4509 | +} |
4510 | + |
4511 | +TEST(ThreadSafeQueue, destroy_while_waiting_for_destroy) |
4512 | +{ |
4513 | + ThreadSafeQueue<int> q; |
4514 | + q.push(42); |
4515 | + thread t(wait_for_destroy_thread, &q); |
4516 | + this_thread::sleep_for(chrono::milliseconds(100)); |
4517 | + q.destroy(); |
4518 | + t.join(); |
4519 | + |
4520 | + // Call destroy() and wait_for_destroy() again, to make sure they do nothing. |
4521 | + q.destroy(); |
4522 | + q.wait_for_destroy(); |
4523 | + q.destroy(); |
4524 | + q.destroy(); |
4525 | + q.wait_for_destroy(); |
4526 | + q.wait_for_destroy(); |
4527 | +} |
4528 | + |
4529 | class MoveOnly |
4530 | { |
4531 | public: |
4532 | @@ -156,4 +208,15 @@ |
4533 | EXPECT_EQ("world", m.val()); |
4534 | m = q.wait_and_pop(); |
4535 | EXPECT_EQ("again", m.val()); |
4536 | + |
4537 | + q.destroy(); |
4538 | + try |
4539 | + { |
4540 | + q.push(move(MoveOnly("no_push"))); |
4541 | + FAIL(); |
4542 | + } |
4543 | + catch (std::runtime_error const& e) |
4544 | + { |
4545 | + EXPECT_STREQ("ThreadSafeQueue: cannot push onto destroyed queue", e.what()); |
4546 | + } |
4547 | } |
4548 | |
4549 | === modified file 'test/gtest/scopes/internal/smartscopes/smartscopesproxy/smartscopesproxy_test.cpp' |
4550 | --- test/gtest/scopes/internal/smartscopes/smartscopesproxy/smartscopesproxy_test.cpp 2014-04-01 15:26:50 +0000 |
4551 | +++ test/gtest/scopes/internal/smartscopes/smartscopesproxy/smartscopesproxy_test.cpp 2014-04-28 05:57:56 +0000 |
4552 | @@ -77,11 +77,8 @@ |
4553 | |
4554 | ~smartscopesproxytest() |
4555 | { |
4556 | - scope_mw_->stop(); |
4557 | - scope_mw_->wait_for_shutdown(); |
4558 | - |
4559 | - reg_mw_->stop(); |
4560 | - reg_mw_->wait_for_shutdown(); |
4561 | + scope_rt_->destroy(); |
4562 | + reg_rt_->destroy(); |
4563 | } |
4564 | |
4565 | protected: |
4566 | |
4567 | === modified file 'test/gtest/scopes/internal/zmq_middleware/CMakeLists.txt' |
4568 | --- test/gtest/scopes/internal/zmq_middleware/CMakeLists.txt 2014-01-28 06:34:03 +0000 |
4569 | +++ test/gtest/scopes/internal/zmq_middleware/CMakeLists.txt 2014-04-28 05:57:56 +0000 |
4570 | @@ -1,6 +1,7 @@ |
4571 | add_subdirectory(ObjectAdapter) |
4572 | add_subdirectory(RegistryI) |
4573 | add_subdirectory(ServantBase) |
4574 | +add_subdirectory(StopPublisher) |
4575 | add_subdirectory(Util) |
4576 | add_subdirectory(VariantConverter) |
4577 | add_subdirectory(ZmqMiddleware) |
4578 | |
4579 | === modified file 'test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp' |
4580 | --- test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp 2014-01-24 11:16:14 +0000 |
4581 | +++ test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp 2014-04-28 05:57:56 +0000 |
4582 | @@ -44,7 +44,7 @@ |
4583 | // an occastional "address in use" exception because the new socket |
4584 | // tries to bind while the old one is still in the process of destroying itself. |
4585 | |
4586 | -void wait(int millisec = 20) |
4587 | +void wait(int millisec = 200) |
4588 | { |
4589 | this_thread::sleep_for(chrono::milliseconds(millisec)); |
4590 | } |
4591 | @@ -176,7 +176,7 @@ |
4592 | } |
4593 | catch (MiddlewareException const& e) |
4594 | { |
4595 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Destroyed " |
4596 | + EXPECT_STREQ("unity::scopes::MiddlewareException: activate(): Object adapter in Destroyed " |
4597 | "state (adapter: testscope)", |
4598 | e.what()); |
4599 | } |
4600 | @@ -194,25 +194,22 @@ |
4601 | } |
4602 | catch (MiddlewareException const& e) |
4603 | { |
4604 | - EXPECT_STREQ("unity::scopes::MiddlewareException: ObjectAdapter::run_workers(): broker thread " |
4605 | + EXPECT_STREQ("unity::scopes::MiddlewareException: ObjectAdapter::run_workers(): stop thread " |
4606 | "failure (adapter: testscope):\n" |
4607 | - " unity::scopes::MiddlewareException: ObjectAdapter: broker thread failure " |
4608 | - "(adapter: testscope):\n" |
4609 | + " unity::scopes::MiddlewareException: StopPublisher(): publisher thread " |
4610 | + "failed (endpoint: inproc://testscope-stopper):\n" |
4611 | " Address already in use", |
4612 | e.what()); |
4613 | } |
4614 | + |
4615 | try |
4616 | { |
4617 | b.shutdown(); |
4618 | } |
4619 | catch (MiddlewareException const& e) |
4620 | { |
4621 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope)\n" |
4622 | - " Exception history:\n" |
4623 | - " Exception #1:\n" |
4624 | - " unity::scopes::MiddlewareException: ObjectAdapter: broker thread failure " |
4625 | - "(adapter: testscope):\n" |
4626 | - " Address already in use", |
4627 | + EXPECT_STREQ("unity::scopes::MiddlewareException: shutdown() [state_ == Failed]: " |
4628 | + "Object adapter in Failed state (adapter: testscope)", |
4629 | e.what()); |
4630 | } |
4631 | try |
4632 | @@ -221,13 +218,9 @@ |
4633 | } |
4634 | catch (MiddlewareException const& e) |
4635 | { |
4636 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope)\n" |
4637 | - " Exception history:\n" |
4638 | - " Exception #1:\n" |
4639 | - " unity::scopes::MiddlewareException: ObjectAdapter: broker thread failure " |
4640 | - "(adapter: testscope):\n" |
4641 | - " Address already in use", |
4642 | - e.what()); |
4643 | + EXPECT_STREQ("unity::scopes::MiddlewareException: wait_for_shutdown(): " |
4644 | + "Object adapter in Failed state (adapter: testscope)", |
4645 | + e.what()); |
4646 | } |
4647 | } |
4648 | } |
4649 | @@ -907,7 +900,8 @@ |
4650 | } |
4651 | catch (MiddlewareException const& e) |
4652 | { |
4653 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Destroyed state (adapter: testscope)", |
4654 | + EXPECT_STREQ("unity::scopes::MiddlewareException: remove(): " |
4655 | + "Object adapter in Destroyed state (adapter: testscope)", |
4656 | e.what()); |
4657 | } |
4658 | }; |
4659 | @@ -1005,7 +999,7 @@ |
4660 | } |
4661 | catch (MiddlewareException const& e) |
4662 | { |
4663 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Destroyed " |
4664 | + EXPECT_STREQ("unity::scopes::MiddlewareException: add(): Object adapter in Destroyed " |
4665 | "state (adapter: testscope)", |
4666 | e.what()); |
4667 | } |
4668 | @@ -1016,7 +1010,7 @@ |
4669 | } |
4670 | catch (MiddlewareException const& e) |
4671 | { |
4672 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Destroyed " |
4673 | + EXPECT_STREQ("unity::scopes::MiddlewareException: find(): Object adapter in Destroyed " |
4674 | "state (adapter: testscope)", |
4675 | e.what()); |
4676 | } |
4677 | @@ -1212,7 +1206,8 @@ |
4678 | } |
4679 | catch (MiddlewareException const& e) |
4680 | { |
4681 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope2)", |
4682 | + EXPECT_STREQ("unity::scopes::MiddlewareException: add_dflt_servant(): " |
4683 | + "Object adapter in Failed state (adapter: testscope2)", |
4684 | e.what()); |
4685 | } |
4686 | |
4687 | @@ -1224,7 +1219,8 @@ |
4688 | } |
4689 | catch (MiddlewareException const& e) |
4690 | { |
4691 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope2)", |
4692 | + EXPECT_STREQ("unity::scopes::MiddlewareException: remove_dflt_servant(): " |
4693 | + "Object adapter in Failed state (adapter: testscope2)", |
4694 | e.what()); |
4695 | } |
4696 | |
4697 | @@ -1236,7 +1232,8 @@ |
4698 | } |
4699 | catch (MiddlewareException const& e) |
4700 | { |
4701 | - EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope2)", |
4702 | + EXPECT_STREQ("unity::scopes::MiddlewareException: find_dflt_servant(): " |
4703 | + "Object adapter in Failed state (adapter: testscope2)", |
4704 | e.what()); |
4705 | } |
4706 | } |
4707 | |
4708 | === modified file 'test/gtest/scopes/internal/zmq_middleware/RegistryI/RegistryI_test.cpp' |
4709 | --- test/gtest/scopes/internal/zmq_middleware/RegistryI/RegistryI_test.cpp 2014-04-14 09:31:25 +0000 |
4710 | +++ test/gtest/scopes/internal/zmq_middleware/RegistryI/RegistryI_test.cpp 2014-04-28 05:57:56 +0000 |
4711 | @@ -404,7 +404,7 @@ |
4712 | ///! TODO: HACK: |
4713 | /// we have to start scope-C and scope-D before starting scope-B here, as B aggregates C and D. |
4714 | /// (When re-binding logic is introduced, this will be unnecessary) |
4715 | - scope_ids = {"scope-A", "scope-C", "scope-D", "scope-B", "scope-N", "scope-S"}; |
4716 | + scope_ids = { {"scope-A", "scope-C", "scope-D", "scope-B", "scope-N", "scope-S"} }; |
4717 | for (auto& scope_id : scope_ids) |
4718 | { |
4719 | proxies[scope_id] = ScopeImpl::create(mw->create_scope_proxy(scope_id), mw->runtime(), scope_id); |
4720 | @@ -509,7 +509,7 @@ |
4721 | // test locating all scopes |
4722 | TEST_F(RegistryTest, locate_all) |
4723 | { |
4724 | - // locate all scopes (hense starting all scope processes) |
4725 | + // locate all scopes (hence starting all scope processes) |
4726 | for (auto const& scope_id : scope_ids) |
4727 | { |
4728 | EXPECT_EQ(proxies[scope_id], reg->locate(scope_id)); |
4729 | |
4730 | === added directory 'test/gtest/scopes/internal/zmq_middleware/StopPublisher' |
4731 | === added file 'test/gtest/scopes/internal/zmq_middleware/StopPublisher/CMakeLists.txt' |
4732 | --- test/gtest/scopes/internal/zmq_middleware/StopPublisher/CMakeLists.txt 1970-01-01 00:00:00 +0000 |
4733 | +++ test/gtest/scopes/internal/zmq_middleware/StopPublisher/CMakeLists.txt 2014-04-28 05:57:56 +0000 |
4734 | @@ -0,0 +1,4 @@ |
4735 | +add_executable(StopPublisher_test StopPublisher_test.cpp) |
4736 | +target_link_libraries(StopPublisher_test ${LIBS} ${TESTLIBS}) |
4737 | + |
4738 | +add_test(StopPublisher StopPublisher_test) |
4739 | |
4740 | === added file 'test/gtest/scopes/internal/zmq_middleware/StopPublisher/StopPublisher_test.cpp' |
4741 | --- test/gtest/scopes/internal/zmq_middleware/StopPublisher/StopPublisher_test.cpp 1970-01-01 00:00:00 +0000 |
4742 | +++ test/gtest/scopes/internal/zmq_middleware/StopPublisher/StopPublisher_test.cpp 2014-04-28 05:57:56 +0000 |
4743 | @@ -0,0 +1,136 @@ |
4744 | +/* |
4745 | + * Copyright (C) 2014 Canonical Ltd |
4746 | + * |
4747 | + * This program is free software: you can redistribute it and/or modify |
4748 | + * it under the terms of the GNU Lesser General Public License version 3 as |
4749 | + * published by the Free Software Foundation. |
4750 | + * |
4751 | + * This program is distributed in the hope that it will be useful, |
4752 | + * but WITHOUT ANY WARRANTY; without even the implied warranty of |
4753 | + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
4754 | + * GNU Lesser General Public License for more details. |
4755 | + * |
4756 | + * You should have received a copy of the GNU Lesser General Public License |
4757 | + * along with this program. If not, see <http://www.gnu.org/lzmqnses/>. |
4758 | + * |
4759 | + * Authored by: Michi Henning <michi.henning@canonical.com> |
4760 | + */ |
4761 | + |
4762 | +#include <unity/scopes/internal/zmq_middleware/StopPublisher.h> |
4763 | +#include <unity/scopes/ScopeExceptions.h> |
4764 | +#include <zmqpp/poller.hpp> |
4765 | + |
4766 | +#include <gtest/gtest.h> |
4767 | + |
4768 | +using namespace std; |
4769 | +using namespace unity::scopes; |
4770 | +using namespace unity::scopes::internal::zmq_middleware; |
4771 | + |
4772 | +TEST(StopPublisher, basic) |
4773 | +{ |
4774 | + { |
4775 | + zmqpp::context c; |
4776 | + StopPublisher p(&c, "x"); |
4777 | + EXPECT_EQ("inproc://x", p.endpoint()); |
4778 | + } |
4779 | + |
4780 | + { |
4781 | + zmqpp::context c; |
4782 | + StopPublisher p(&c, "x"); |
4783 | + p.stop(); // no-op if stop() is called several times. |
4784 | + p.stop(); |
4785 | + EXPECT_EQ("inproc://x", p.endpoint()); |
4786 | + } |
4787 | +} |
4788 | + |
4789 | +TEST(StopPublisher, exceptions) |
4790 | +{ |
4791 | + { |
4792 | + zmqpp::context c; |
4793 | + StopPublisher p(&c, "x"); |
4794 | + p.stop(); |
4795 | + try |
4796 | + { |
4797 | + p.subscribe(); |
4798 | + FAIL(); |
4799 | + } |
4800 | + catch (MiddlewareException const& e) |
4801 | + { |
4802 | + EXPECT_STREQ("unity::scopes::MiddlewareException: StopPublisher::subscribe(): " |
4803 | + "cannot subscribe to stopped publisher (endpoint: inproc://x)", |
4804 | + e.what()); |
4805 | + } |
4806 | + } |
4807 | + |
4808 | + { |
4809 | + zmqpp::context c; |
4810 | + StopPublisher p(&c, "x"); |
4811 | + try |
4812 | + { |
4813 | + StopPublisher q(&c, "x"); // Second publisher at the same endpoint must fail |
4814 | + FAIL(); |
4815 | + } |
4816 | + catch (MiddlewareException const& e) |
4817 | + { |
4818 | + EXPECT_STREQ("unity::scopes::MiddlewareException: StopPublisher(): publisher thread " |
4819 | + "failed (endpoint: inproc://x):\n" |
4820 | + " Address already in use", |
4821 | + e.what()); |
4822 | + } |
4823 | + } |
4824 | +} |
4825 | + |
4826 | +TEST(StopPublisher, subscribe) |
4827 | +{ |
4828 | + { |
4829 | + zmqpp::context c; |
4830 | + StopPublisher p(&c, "x"); |
4831 | + auto socket = p.subscribe(); |
4832 | + zmqpp::poller poller; |
4833 | + poller.add(socket); |
4834 | + poller.poll(100); |
4835 | + EXPECT_FALSE(poller.has_input(socket)); |
4836 | + p.stop(); |
4837 | + poller.poll(100); |
4838 | + EXPECT_TRUE(poller.has_input(socket)); |
4839 | + } |
4840 | +} |
4841 | + |
4842 | +void subscriber_thread(StopPublisher* p) |
4843 | +{ |
4844 | + auto socket = p->subscribe(); |
4845 | + zmqpp::poller poller; |
4846 | + poller.add(socket); |
4847 | + poller.poll(); // Blocks indefinitely until socket is ready |
4848 | +} |
4849 | + |
4850 | +void publisher_thread(unique_ptr<StopPublisher>) |
4851 | +{ |
4852 | + // Sleep for a while, so all the subscribers get a chance to block in poll(). |
4853 | + this_thread::sleep_for(chrono::milliseconds(500)); |
4854 | + // Destructor of unique_ptr in-param calls stop() |
4855 | +} |
4856 | + |
4857 | +// Test that subscriber threads terminate correctly even if the destructor of the |
4858 | +// publisher is called from a different thread. |
4859 | + |
4860 | +TEST(StopPublisher, threading) |
4861 | +{ |
4862 | + zmqpp::context c; |
4863 | + unique_ptr<StopPublisher> p(new StopPublisher(&c, "x")); |
4864 | + const int num_subscribers = 5; |
4865 | + vector<thread> subscribers; |
4866 | + for (int i = 0; i < num_subscribers; ++i) |
4867 | + { |
4868 | + subscribers.emplace_back(thread(subscriber_thread, p.get())); |
4869 | + } |
4870 | + |
4871 | + thread publisher(publisher_thread, move(p)); // Moves publisher to publisher thread |
4872 | + |
4873 | + for (auto& s : subscribers) |
4874 | + { |
4875 | + s.join(); |
4876 | + } |
4877 | + |
4878 | + publisher.join(); |
4879 | +} |
FAILED: Continuous integration, rev:283
http://jenkins.qa.ubuntu.com/job/unity-scopes-api-devel-ci/321/
Executed test runs:
FAILURE: http://jenkins.qa.ubuntu.com/job/unity-scopes-api-devel-trusty-amd64-ci/322/console
FAILURE: http://jenkins.qa.ubuntu.com/job/unity-scopes-api-devel-trusty-armhf-ci/321/console
FAILURE: http://jenkins.qa.ubuntu.com/job/unity-scopes-api-devel-trusty-i386-ci/321/console
Click here to trigger a rebuild:
http://s-jenkins.ubuntu-ci:8080/job/unity-scopes-api-devel-ci/321/rebuild