Merge lp:~michihenning/unity-scopes-api/async-create into lp:unity-scopes-api/devel

Proposed by Michi Henning
Status: Merged
Approved by: Michal Hruby
Approved revision: 306
Merged at revision: 309
Proposed branch: lp:~michihenning/unity-scopes-api/async-create
Merge into: lp:unity-scopes-api/devel
Diff against target: 4879 lines (+2089/-517)
80 files modified
RELEASE_NOTES.txt (+8/-1)
debian/libunity-scopes1.symbols (+11/-7)
demo/client.cpp (+24/-14)
demo/scopes/scope-A/scope-A.cpp (+45/-14)
demo/scopes/scope-B/scope-B.cpp (+14/-9)
demo/scopes/scope-C/scope-C.cpp (+16/-10)
demo/scopes/scope-D/scope-D.cpp (+9/-5)
demo/scopes/scope-N/scope-N.cpp (+21/-6)
demo/scopes/scope-S/scope-S.cpp (+28/-21)
include/unity/scopes/PreviewQueryBase.h (+1/-1)
include/unity/scopes/QueryBase.h (+19/-1)
include/unity/scopes/internal/ObjectImpl.h (+13/-4)
include/unity/scopes/internal/QueryBaseImpl.h (+3/-0)
include/unity/scopes/internal/QueryCtrlImpl.h (+9/-6)
include/unity/scopes/internal/QueryObject.h (+3/-4)
include/unity/scopes/internal/RegistryImpl.h (+1/-1)
include/unity/scopes/internal/ReplyImpl.h (+1/-1)
include/unity/scopes/internal/ReplyObject.h (+1/-1)
include/unity/scopes/internal/RuntimeImpl.h (+9/-4)
include/unity/scopes/internal/ScopeImpl.h (+2/-2)
include/unity/scopes/internal/SearchReplyImpl.h (+1/-1)
include/unity/scopes/internal/TaskWrapper.h (+1/-2)
include/unity/scopes/internal/ThreadPool.h (+2/-0)
include/unity/scopes/internal/ThreadSafeQueue.h (+51/-21)
include/unity/scopes/internal/zmq_middleware/ConnectionPool.h (+7/-5)
include/unity/scopes/internal/zmq_middleware/ObjectAdapter.h (+8/-6)
include/unity/scopes/internal/zmq_middleware/StopPublisher.h (+85/-0)
include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h (+6/-2)
src/scopes/QueryBase.cpp (+5/-0)
src/scopes/internal/ObjectImpl.cpp (+31/-3)
src/scopes/internal/QueryBaseImpl.cpp (+24/-2)
src/scopes/internal/QueryCtrlImpl.cpp (+39/-4)
src/scopes/internal/QueryObject.cpp (+32/-12)
src/scopes/internal/RegistryImpl.cpp (+1/-1)
src/scopes/internal/ReplyImpl.cpp (+1/-1)
src/scopes/internal/ReplyObject.cpp (+2/-3)
src/scopes/internal/RuntimeImpl.cpp (+89/-15)
src/scopes/internal/ScopeImpl.cpp (+125/-88)
src/scopes/internal/ScopeObject.cpp (+0/-1)
src/scopes/internal/SearchReplyImpl.cpp (+4/-2)
src/scopes/internal/ThreadPool.cpp (+18/-11)
src/scopes/internal/smartscopes/SSQueryObject.cpp (+1/-1)
src/scopes/internal/zmq_middleware/CMakeLists.txt (+1/-0)
src/scopes/internal/zmq_middleware/ConnectionPool.cpp (+19/-11)
src/scopes/internal/zmq_middleware/ObjectAdapter.cpp (+55/-78)
src/scopes/internal/zmq_middleware/StopPublisher.cpp (+225/-0)
src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp (+10/-10)
src/scopes/internal/zmq_middleware/ZmqObject.cpp (+67/-44)
src/scopes/internal/zmq_middleware/ZmqQuery.cpp (+1/-1)
src/scopes/internal/zmq_middleware/ZmqQueryCtrl.cpp (+2/-2)
src/scopes/internal/zmq_middleware/ZmqRegistry.cpp (+3/-3)
src/scopes/internal/zmq_middleware/ZmqReply.cpp (+2/-2)
src/scopes/internal/zmq_middleware/ZmqScope.cpp (+6/-4)
src/scopes/internal/zmq_middleware/ZmqStateReceiver.cpp (+1/-1)
test/gtest/scopes/CMakeLists.txt (+9/-8)
test/gtest/scopes/Invocation/CMakeLists.txt (+8/-0)
test/gtest/scopes/Invocation/Invocation_test.cpp (+145/-0)
test/gtest/scopes/Invocation/Registry.ini.in (+8/-0)
test/gtest/scopes/Invocation/Runtime.ini.in (+5/-0)
test/gtest/scopes/Invocation/TestScope.cpp (+94/-0)
test/gtest/scopes/Invocation/TestScope.h (+40/-0)
test/gtest/scopes/Invocation/Zmq.ini.in (+3/-0)
test/gtest/scopes/Registry/Registry_test.cpp (+39/-13)
test/gtest/scopes/Runtime/CMakeLists.txt (+1/-1)
test/gtest/scopes/Runtime/PusherScope.cpp (+8/-1)
test/gtest/scopes/Runtime/PusherScope.h (+5/-0)
test/gtest/scopes/Runtime/Runtime_test.cpp (+132/-6)
test/gtest/scopes/Runtime/SlowCreateScope.cpp (+101/-0)
test/gtest/scopes/Runtime/SlowCreateScope.h (+40/-0)
test/gtest/scopes/Runtime/TestScope.cpp (+6/-0)
test/gtest/scopes/Runtime/TestScope.h (+5/-0)
test/gtest/scopes/ScopeExceptions/ScopeExceptions_test.cpp (+29/-0)
test/gtest/scopes/internal/ThreadPool/ThreadPool_test.cpp (+11/-0)
test/gtest/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp (+71/-8)
test/gtest/scopes/internal/smartscopes/smartscopesproxy/smartscopesproxy_test.cpp (+2/-5)
test/gtest/scopes/internal/zmq_middleware/CMakeLists.txt (+1/-0)
test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp (+21/-24)
test/gtest/scopes/internal/zmq_middleware/RegistryI/RegistryI_test.cpp (+2/-2)
test/gtest/scopes/internal/zmq_middleware/StopPublisher/CMakeLists.txt (+4/-0)
test/gtest/scopes/internal/zmq_middleware/StopPublisher/StopPublisher_test.cpp (+136/-0)
To merge this branch: bzr merge lp:~michihenning/unity-scopes-api/async-create
Reviewer Review Type Date Requested Status
Michal Hruby (community) Approve
Paweł Stołowski (community) Approve
PS Jenkins bot (community) continuous-integration Approve
Marcus Tomlinson (community) Approve
Review via email: mp+213489@code.launchpad.net

Commit message

The scope search, activate, perform_action, and preview methods now return immediately instead of waiting for a response from the server. The returned QueryCtrl is a "fake" one: if cancel() is called on the returned QueryCtrl, the cancellation is remembered and sent once the corresponding create method in the scope has completed.

This change should be transparent to application code (other than that these methods can no longer block).

Also added interlocks to QueryObject, so cancel() and run() see consistent data.

Fixed a number of race conditions during shutdown.

Description of the change

The scope search, activate, perform_action, and preview methods now return immediately instead of waiting for a response from the server. The returned QueryCtrl is a "fake" one: if cancel() is called on the returned QueryCtrl, the cancellation is remembered and sent once the corresponding create method in the scope has completed.

This change should be transparent to application code (other than that these methods can no longer block).

Also added interlocks to QueryObject, so cancel() and run() see consistent data.

Fixed a number of race conditions during shutdown.

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Paweł Stołowski (stolowski) wrote :

Impressive stuff! I've to admit I don't fully grasp all the changes yet, need to give it another look tomorrow. Just a few minor nitpicks for now:

8 +Changes in version 0.4.1

devel is currently at 0.4.2, you may want to make it 0.4.3.

437 +class Socket final

I think it would be worth marking it as non-copyable; I know it's an internal class, but still, it makes it harder for us to make silly mistakes.

1939 + * Copyright (C) 2013 Canonical Ltd
2038 + * Copyright (C) 2013 Canonical Ltd

Please also fix it in the header template you use ;)

review: Needs Fixing
Revision history for this message
Michi Henning (michihenning) wrote :

> Impressive stuff! I've to admit I don't fully grasp all the changes yet, need
> to give it another look tomorrow.

Hold off with more review just yet please, this isn't ready for prime time as of now.

> Just a few minor nitpicks for now:
>
> 8 +Changes in version 0.4.1
>
> devel is currently at 0.4.2, you may want to make it 0.4.3.

Will do, thanks!

> 437 +class Socket final
>
> I think it would be worth marking it as non-copyable; I know it's internal
> class, but still, makes it harder for us to do silly mistakes.

It's non-copyable already because it defines a move constructor and move assignment operator, which suppresses the default-generated copy constructor and assignment operator.

> 1939 + * Copyright (C) 2013 Canonical Ltd
> 2038 + * Copyright (C) 2013 Canonical Ltd
>
> Please also fix it in the header template you use ;)

Thanks for that! :-) I don't use a header template, as a rule. Instead, I just copy some similar header and hack it up. Need to get out of the habit of doing that…

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Michi Henning (michihenning) wrote :

I think this now correctly does the async dispatch for search(), preview(), etc. These methods do not fail now. All errors are instead delivered to the finished() callback. The returned QueryCtrl proxy is a "fake" proxy: if cancel() is called on that proxy before the search method in the target scope has returned the real QueryCtrl, the cancel is remembered by the run time and delivered as soon as the real QueryCtrl arrives.

Michal, could you give this a spin please?

This doesn't yet fix Marcus's problem where a finished() call can overtake a push(). Will work on that next.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Marcus Tomlinson (marcustomlinson) wrote :

Hi Michi, I noticed an issue with this after merging with my rebinding branch:

2709 + // If a request times out, we must close the corresponding socket, otherwise
2710 + // zmq gets confused: the reply will never be read, so the socket ends up
2711 + // in a bad state.
2712 + s.close();
2713 + throw TimeoutException("Request timed out after " + std::to_string(timeout) + " milliseconds");

I don't think we can just close the socket and leave it in the connection pool. We need to call "pool.remove(endpoint_)"; otherwise, we attempt to talk to a closed socket the next time around, which is what I was seeing during rebind: "error: Socket operation on non-socket".

Simply replacing "s.close()" with "pool.remove(endpoint_)" should be sufficient, as removing a socket from the connection pool deletes it, hence closing the socket.

review: Needs Fixing
Revision history for this message
Marcus Tomlinson (marcustomlinson) wrote :

117 + if (!valid())

rather than expecting every scope author to have to do this, could we not do this check internally from QueryObject::run() and SSQueryObject::run_query() before we call run()?

review: Needs Fixing
Revision history for this message
Michi Henning (michihenning) wrote :

> 117 + if (!valid())
>
> rather than expecting every scope author to have to do this, could we not do
> this check internally from QueryObject::run() and SSQueryObject::run_query()
> before we call run()?

Unfortunately, no. run() and cancel() are called by different threads. That's so I can guarantee that there is always a thread available to deliver a cancel message, even if run() is implemented synchronously and all run threads are currently in use. This also means that the cancel() for a run() invocation can be dispatched *before* run() is called. In the dispatch code for run, I have to do something like this:

unique_lock<mutex> lock(mutex_);
if (not query cancelled already) // Call run() only if the query wasn't cancelled
{
    lock.unlock();
    application_code->run();
}

The problem is that, between the unlock and the call to run(), the scheduler can suspend my thread, run the thread that processes an incoming cancel, and then run my thread again, meaning that, by the time run() gets into the application code, the query was cancelled already.

I can't call run() with the lock held because that would require the application code to unlock, and we can't rely on it doing that. (This would also create all sorts of deadlock scenarios.) Note that, even if I use an atomic_bool or some such, so the sequence is lock-free, the race remains: in between the test and the call to run(), the query can be cancelled.

The upshot of it all for the scope authors is:

- By the time the run() is called, the query may have been cancelled already. If so, valid() will return false. If it does, don't bother running the query; just return.

- The cancel may arrive at any time: before run() is called or while run() is still in progress, and the cancel() will always be delivered by a thread other than the one that called run(). If cancel() relies on state that is established by run(), the scope developer must use a full fence or mutex to make sure that cancel() operates on current state.

- The scope developer must write cancel() such that it finishes promptly.

- Always check the return value from push() and, if push() returns false, stop pushing and clean up. It is benign to ignore the return value from push(), but all the work done once push() returns false is wasted because the run time will simply throw the results that are pushed on the floor.

295. By Michi Henning

Fixed bug that would leave a bad socket in the connection pool after a
timeout exception. Added test for that, and improved test coverage
for ScopeExceptions.cpp.

Revision history for this message
Michi Henning (michihenning) wrote :

> I don't think we can just close the socket and leave it in the connection
> pool.

Awesome catch, thanks for that!

I fixed it and added a test to make sure it works now.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Marcus Tomlinson (marcustomlinson) wrote :

> > 117 + if (!valid())
> >
> > rather than expecting every scope author to have to do this, could we not do
> > this check internally from QueryObject::run() and SSQueryObject::run_query()
> > before we call run()?
>
> Unfortunately, no. run() and cancel() are called by different threads. That's
> so I can guarantee that there is always a thread available to deliver a cancel
> message, even if run() is implemented synchronously and all run threads are
> currently in use. This also means that the cancel() for a run() invocation can
> be dispatched *before* run() is called. In the dispatch code for run, I have
> to do something like this:
>
> unique_lock<mutex> lock(mutex_);
> if (not query cancelled already) // Call run() only if the query wasn't
> cancelled
> {
> lock.unlock();
> application_code->run();
> }
>
> The problem is that, between the unlock and the call to run(), the scheduler
> can suspend my thread, run the thread that processes an incoming cancel, and
> then run my thread again, meaning that, by the time run() gets into the
> application code, the query was cancelled already.

Ok I see what you're saying, but consider these 2 code snippets:
Is there really any difference between them?

QueryObject::run()
{
  ...
  if (scope.valid())
  {
    scope.run()
  }
  ...
}

Scope::run()
{
  if (!valid())
  {
    return;
  }
  ...
}

I'm suggesting the first option is a lot cleaner and gives the scope author one less thing to remember. Sorry if I'm still missing the point.

Revision history for this message
Michi Henning (michihenning) wrote :

> QueryObject::run()
> {
> ...
> if (scope.valid())
> {
> scope.run()
> }
> ...
> }

This is what's effectively happening already. The problem is that between valid() returning true, and scope.run() being executed, the cancel can arrive. So, the point of valid() on the public API is to give a scope author a way to check whether the query was cancelled already before doing a lot of work setting up and running a query that, in the end, will return false from the first call to push().

Maybe it's me who's missing something? :-)

296. By Michi Henning

Improved documentation.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Marcus Tomlinson (marcustomlinson) wrote :

Well, I've read through this code a few times now with every effort to understand it all. As far as I can see, this looks really good!

Just a few little things I picked up:

366 + void set_proxy(MWProxy const& p); // Allows a derived proxy to replace mw_proxy_ for aynchronous twoway calls.

"aynchronous" should be "asynchronous"

2126 + stopper_.reset(new StopPublisher(mw_.context(), name_ + "-stoppper"));
4045 + "failed (endpoint: inproc://testscope-stoppper):\n"

"stoppper"? Was this intended?

3587 + // cancel is correcly forwarded to the scope once the real reply proxy arrives

"correcly" should be "correctly"

review: Needs Fixing
297. By Michi Henning

Fixed typos.

Revision history for this message
Marcus Tomlinson (marcustomlinson) wrote :

4045 + "failed (endpoint: inproc://testscope-stoppper):\n"

forgot this one ;)

review: Needs Fixing
298. By Michi Henning

Fixed another typo.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
Revision history for this message
Marcus Tomlinson (marcustomlinson) wrote :

I'm happy with this.

review: Approve
299. By Michi Henning

Increased wait time between tests due to "address in use" failures when running on Jenkins.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Michal Hruby (mhr3) wrote :

179 + if (--num_children_ == 0)
180 + {
181 + upstream_->finished(); // Send finished once last child finishes
182 + }

Why is this needed suddenly? Wasn't the point of create_subquery to take care of that?

review: Needs Information
Revision history for this message
Michal Hruby (mhr3) wrote :
Download full text (19.3 KiB)

Also, still managed to get it to hang on shutdown by running the client binary (from demo/ dir), here's the state of the threads:

  Id Target Id Frame
  11 Thread 0xb4c21410 (LWP 8173) "client" 0xb6b655f4 in __libc_do_syscall
    () from /lib/arm-linux-gnueabihf/libpthread.so.0
  10 Thread 0xb42ff410 (LWP 8174) "client" 0xb6b655f4 in __libc_do_syscall
    () from /lib/arm-linux-gnueabihf/libpthread.so.0
  9 Thread 0xb3aff410 (LWP 8175) "client" 0xb6b655f4 in __libc_do_syscall
    () from /lib/arm-linux-gnueabihf/libpthread.so.0
  8 Thread 0xb30ff410 (LWP 8176) "client" 0xb6b655f4 in __libc_do_syscall
    () from /lib/arm-linux-gnueabihf/libpthread.so.0
  7 Thread 0xb28ff410 (LWP 8177) "client" 0xb6b655f4 in __libc_do_syscall
    () from /lib/arm-linux-gnueabihf/libpthread.so.0
  6 Thread 0xb20ff410 (LWP 8178) "client" 0xb6d4f032 in epoll_wait ()
   from /lib/arm-linux-gnueabihf/libc.so.6
  5 Thread 0xb18ff410 (LWP 8179) "client" 0xb6d4f032 in epoll_wait ()
   from /lib/arm-linux-gnueabihf/libc.so.6
  4 Thread 0xb08ff410 (LWP 8181) "client" 0xb6b655f4 in __libc_do_syscall
    () from /lib/arm-linux-gnueabihf/libpthread.so.0
  3 Thread 0xb00ff410 (LWP 8182) "client" 0xb6d45d22 in poll ()
   from /lib/arm-linux-gnueabihf/libc.so.6
  2 Thread 0xaf8ff410 (LWP 8183) "client" 0xb6d45d22 in poll ()
   from /lib/arm-linux-gnueabihf/libc.so.6
* 1 Thread 0xb4c59000 (LWP 8172) "client" 0xb6b655f4 in __libc_do_syscall
    () from /lib/arm-linux-gnueabihf/libpthread.so.0

Thread 11 (Thread 0xb4c21410 (LWP 8173)):
#0 0xb6b655f4 in __libc_do_syscall ()
   from /lib/arm-linux-gnueabihf/libpthread.so.0
#1 0xb6b611d8 in pthread_cond_wait@@GLIBC_2.4 ()
   from /lib/arm-linux-gnueabihf/libpthread.so.0
#2 0xb6e425f8 in std::condition_variable::wait(std::unique_lock<std::mutex>&)
    () from /usr/lib/arm-linux-gnueabihf/libstdc++.so.6
#3 0xb6f31e92 in wait<unity::scopes::internal::ThreadSafeQueue<T>::wait_and_pop() [with T = unity::scopes::internal::TaskWrapper]::__lambda1> (__p=...,
    __lock=..., this=0x3b7af0) at /usr/include/c++/4.8/condition_variable:93
#4 unity::scopes::internal::ThreadSafeQueue<unity::scopes::internal::TaskWrapper>::wait_and_pop (this=0x3b7ab0)
    at /home/phablet/build-area/unity-scopes-api-0.4.2+14.04.20140404.2/include/unity/scopes/internal/ThreadSafeQueue.h:135
#5 0xb6f30c42 in unity::scopes::internal::ThreadPool::run (this=0x3b7a00)
    at /home/phablet/build-area/unity-scopes-api-0.4.2+14.04.20140404.2/src/scopes/internal/ThreadPool.cpp:102
#6 0xb6e44a2c in ?? () from /usr/lib/arm-linux-gnueabihf/libstdc++.so.6
Backtrace stopped: previous frame identical to this frame (corrupt stack?)

Thread 10 (Thread 0xb42ff410 (LWP 8174)):
#0 0xb6b655f4 in __libc_do_syscall ()
   from /lib/arm-linux-gnueabihf/libpthread.so.0
#1 0xb6b63268 in __lll_lock_wait ()
   from /lib/arm-linux-gnueabihf/libpthread.so.0
#2 0xb6b5f8d8 in pthread_mutex_lock ()
   from /lib/arm-linux-gnueabihf/libpthread.so.0
#3 0xb6f1944a in __gthread_mutex_lock (__mutex=0x3b055c)
    at /usr/include/arm-linux-gnueabihf/c++/4.8/bits/gthr-default.h:748
#4 lock (this=0x3b055c) at /usr/include/c++/4.8/mutex:134
#5 lock_gua...

Revision history for this message
Michal Hruby (mhr3) wrote :

As a note, I was trying this from the all-in-one rebinding-logic branch, so the line numbers are valid for that one.

Revision history for this message
Marcus Tomlinson (marcustomlinson) wrote :

> As a note, I was trying this from the all-in-one rebinding-logic branch, so
> the line numbers are valid for that one.

I can confirm that this hang occurs with just this branch on its own (without the serialize-oneway-invoke and rebinding_logic changes) :(

review: Needs Fixing
Revision history for this message
Marcus Tomlinson (marcustomlinson) wrote :

Despite the fact that I missed that hang issue, I will stick with my Approved stance on this branch. I probably won't be around when this is completed, so Michal's review will take preference anyway.

review: Approve
Revision history for this message
Paweł Stołowski (stolowski) wrote :

I'm having issues with running demo/client against real scopes on the phone with this branch (I've replaced libunity-scopes on the phone with the one from this branch, symlinked system-wide Runtime.ini inside demo/ dir): the client receives metadata only, doesn't receive any results and the query finishes with no errors. Interestingly, Dash on the phone seems to be running just fine with this branch.

Michal tested this branch on the phone and desktop and he had the same issue on the phone, but all worked fine on the desktop.

I repeated this test with pristine devel branch (also on the phone) and it worked just fine, so perhaps there is something wrong in the client code that breaks with async change, although I couldn't spot anything.

Could you please have a look at the client code and/or try to reproduce?

review: Needs Information
Revision history for this message
Michi Henning (michihenning) wrote :

> 179 + if (--num_children_ == 0)
> 180 + {
> 181 + upstream_->finished(); // Send finished once last child
> finishes
> 182 + }
>
> Why is this needed suddenly? Wasn't the point of create_subquery to take care
> of that?

No, not really. The sub-query thing takes care of transparently passing cancellation down to the child scopes. However, the aggregator still has to make a decision as to when a query is complete. It isn't necessarily the *first* finished() call that arrives. Rather, if the aggregator sends the query to three child scopes, and wants to consider the query finished once the last of these three scopes finishes, it needs to keep track of the number of finished() calls, which is what the above change is doing. (This was broken all along.)

Revision history for this message
Michal Hruby (mhr3) wrote :

> > Why is this needed suddenly? Wasn't the point of create_subquery to take
> care
> > of that?
>
> No, not really. The sub-query thing takes care of transparently passing
> cancellation down to the child scopes.

Right, subsearch doesn't magically make it work, but...

> ... However, the aggregator still has to
> make a decision as to when a query is complete.

...this decision was implicit up until now - subsearch() took a SearchListenerBase instance that kept the upstream ReplyProxy alive, and only when the subsearches finished was the last reference to the upstream ReplyProxy destroyed which sent the finished message.

So something must be broken if this is needed now.

Revision history for this message
Paweł Stołowski (stolowski) wrote :

> I'm having issues with running demo/client against real scopes on the phone
> with this branch (I've replaced libunity-scopes on the phone with the one from
> this branch, symlinked system-wide Runtime.ini inside demo/ dir): the client
> receives metadata only, doesn't receive any results and the query finishes
> with no errors. Interestingly, Dash on the phone seems to be running just fine
> with this branch.
>
> Michal tested this branch on the phone and desktop and he had same issue on
> the phone, but all worked fine on the desktop.
>
> I repeated this test with pristine devel branch (also on the phone) and it
> worked just fine, so perhaps there is something wrong in the client code that
> breaks with async change, although I couldn't spot anything.
>
> Could you please have a look at the client code or/and try to reproduce?

Some more info about how I tested this:

1) I've compiled a checkout of async-create with dpkg-buildpackage on the phone, then installed the resulting .deb package and rebooted the phone.

2) Inside ~/async-create/obj-arm-linux-gnueabihf/demo (where the source code & build output reside), I've created a symlink to system-wide Runtime.ini.

phablet@ubuntu-phablet:~/async-create/obj-arm-linux-gnueabihf/demo$ ls -l
total 696
drwxrwxr-x 4 phablet phablet 4096 Apr 17 12:27 click
-rwxrwxr-x 1 phablet phablet 667320 Apr 17 12:59 client
drwxrwxr-x 3 phablet phablet 4096 Apr 17 12:27 CMakeFiles
-rw-rw-r-- 1 phablet phablet 1570 Apr 17 12:27 cmake_install.cmake
-rw-rw-r-- 1 phablet phablet 351 Apr 17 12:27 CTestTestfile.cmake
-rw-rw-r-- 1 phablet phablet 7477 Apr 17 12:27 Makefile
-rw-rw-r-- 1 phablet phablet 488 Apr 17 12:27 Registry.ini
lrwxrwxrwx 1 phablet phablet 39 Apr 17 13:11 Runtime.ini -> /usr/share/unity-scopes-api/Runtime.ini
-rw-rw-r-- 1 phablet phablet 236 Apr 17 12:27 Runtime.ini.bak
drwxrwxr-x 9 phablet phablet 4096 Apr 17 12:27 scopes
drwxrwxr-x 3 phablet phablet 4096 Apr 17 12:59 stand-alone
-rw-rw-r-- 1 phablet phablet 59 Apr 17 12:27 Zmq.ini
phablet@ubuntu-phablet:~/async-create/obj-arm-linux-gnueabihf/demo$

3) Now, from within ~/async-create/obj-arm-linux-gnueabihf/demo I run ./client against phone's scopes, e.g.
./client list
./client clickscope a
./client clickscope ""

for any scope and query, I'm only getting scope's metadata and no results:

./client clickscope ""
Scope metadata:
        scope_id: clickscope
        display_name: Apps
        description: Scope for searching the click app store
        art: /usr/share/unity/scopes/clickscope//clickscope-screenshot.jpg
        icon: /usr/share/unity/scopes/clickscope//apps-scope.svg
        search_hint: clickscope.SearchHint
        hot_key: clickscope.HotKey
client: created query
client: wait returned
query complete, status: finished

Revision history for this message
Michi Henning (michihenning) wrote :

> Some more info about how I tested this:

Thanks for that! After struggling with building on the phone some more, I could reproduce. (ccache is your friend when it comes to building on the bloody thing...)

The ultimate cause of the problem was this:

120.5.10 <email address hidden> 20140116 | Receiver(int index_to_save)
                                                 | : index_to_save_(index_to_save),
120.6.9 <email address hidden> 20140110 | push_result_count_(0)
                                                 | {
                                                 | }

This constructor was added at some point. The class originally had a default constructor which, after this change, was redundant, but not removed. Unlike the original constructor, the new constructor failed to initialize the query_complete_ member variable. It turns out that, on the device, the corresponding boolean was initialized to crap (meaning true), causing wait_until_finished() to return immediately. In turn, that caused the client to shut down the run time before the results trickled in from the scope so, unsurprisingly, those results were lost. The finished() call that still seemed to arrive was in fact local and triggered by the shutdown of the run time which, when all remaining ReplyObjects are removed from the object adapter, calls finished(), so it doesn't leave any outstanding unfinished queries behind.

I haven't had a chance to look at the hang during shutdown yet. Will do that next.

300. By Michi Henning

Fixed broken wait for completion: member variable was not initialized,
causing the client to shut down the run time before the results arrived
from the scope. Fixed the other receiver classes to match.

Revision history for this message
Paweł Stołowski (stolowski) wrote :

Thanks for spotting and fixing the issue! Now, there is just one conflict in release notes that needs fixing.

review: Needs Fixing
Revision history for this message
Michal Hruby (mhr3) wrote :

+ the shutdown issue and the ReplyProxy issue.

review: Needs Fixing
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
301. By Michi Henning

Merged devel and resolved conflicts. Set version back to 0.4.2
because addition of valid() to QueryBase is ABI compatible.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
302. By Michi Henning

Debug changes, unfinished.

303. By Michi Henning

Fixed incorrect scope of task in ThreadPool::run(): the destructor of the task
was called only once the next task was started, instead of being called
as soon as the task completed.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
304. By Michi Henning

Fixed incorrect count of threads in run() method and added a test for throwing tasks.

305. By Michi Henning

Tightened ThreadPool exception test.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
306. By Michi Henning

Changed micro version back to 3 after all because there was mention of that in debian/changelog already.
Better safe than sorry.

Revision history for this message
Michi Henning (michihenning) wrote :

> + the shutdown issue and the ReplyProxy issue.

They are both fixed now.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Paweł Stołowski (stolowski) wrote :

> > + the shutdown issue and the ReplyProxy issue.
>
> They are both fixed now.

Awesome, thanks for all this! +1 from me.

review: Approve
Revision history for this message
Michal Hruby (mhr3) wrote :

+1, seems to work great, although the threading code is getting pretty scary, but I guess we need to live with that.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'RELEASE_NOTES.txt'
2--- RELEASE_NOTES.txt 2014-04-11 12:07:36 +0000
3+++ RELEASE_NOTES.txt 2014-04-28 05:57:56 +0000
4@@ -1,8 +1,15 @@
5 Release Notes for unity-scopes-api
6 ==================================
7
8-Changes in version 0.4.3
9+Changes in version 0.4.2
10 ========================
11+
12+ * Made the scope search, activate, perform_action, and preview methods non-blocking.
13+ A (fake) QueryCtrl is returned immediately from these methods now. Calling cancel() before
14+ the server has finished creating the query remembers the cancel and sends it to the
15+ server once the server has returned the real QueryCtrl. This change should be transparent
16+ to application code (the only difference being that these methods complete faster now).
17+
18 * CannedQuery class can now be converted to and from a scopes:// uri with to_uri() and from_uri() methods.
19 These methods replace to_string() and from_string() methods that got removed.
20
21
22=== modified file 'debian/libunity-scopes1.symbols'
23--- debian/libunity-scopes1.symbols 2014-04-17 10:31:23 +0000
24+++ debian/libunity-scopes1.symbols 2014-04-28 05:57:56 +0000
25@@ -285,6 +285,7 @@
26 (c++)"unity::scopes::Runtime::run_scope(unity::scopes::ScopeBase*, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0.4.2+14.04.20140404.2
27 (c++)"unity::scopes::Runtime::Runtime(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0.4.0+14.04.20140312.1
28 (c++)"unity::scopes::Runtime::~Runtime()@Base" 0.4.0+14.04.20140312.1
29+ (c++)"unity::scopes::Variant::deserialize_json(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0replaceme
30 (c++)"unity::scopes::Variant::null()@Base" 0.4.0+14.04.20140312.1
31 (c++)"unity::scopes::Variant::swap(unity::scopes::Variant&)@Base" 0.4.0+14.04.20140312.1
32 (c++)"unity::scopes::Variant::Variant(unity::scopes::Variant&&)@Base" 0.4.0+14.04.20140312.1
33@@ -454,7 +455,11 @@
34 (c++)"unity::scopes::internal::StateReceiverObject::push_state(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unity::scopes::internal::StateReceiverObject::State const&)@Base" 0.4.2+14.04.20140404.2
35 (c++)"unity::scopes::internal::StateReceiverObject::StateReceiverObject()@Base" 0.4.2+14.04.20140404.2
36 (c++)"unity::scopes::internal::StateReceiverObject::~StateReceiverObject()@Base" 0.4.2+14.04.20140404.2
37+ (c++)"unity::scopes::internal::Executor::exec(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::vector<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, std::map<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::basic_string<char, std::char_traits<char>, std::allocator<char> > const, std::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&, core::posix::StandardStream const&)@Base" 0replaceme
38+ (c++)"unity::scopes::internal::Executor::Executor()@Base" 0replaceme
39+ (c++)"unity::scopes::internal::Executor::~Executor()@Base" 0replaceme
40 (c++)"unity::scopes::internal::ScopeImpl::perform_action(unity::scopes::Result const&, unity::scopes::ActionMetadata const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::shared_ptr<unity::scopes::ActivationListenerBase> const&)@Base" 0.4.0+14.04.20140312.1
41+ (c++)"unity::scopes::internal::ScopeImpl::fwd()@Base" 0replaceme
42 (c++)"unity::scopes::internal::ScopeImpl::create(std::shared_ptr<unity::scopes::internal::MWScope> const&, unity::scopes::internal::RuntimeImpl*, std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0.4.0+14.04.20140312.1
43 (c++)"unity::scopes::internal::ScopeImpl::search(unity::scopes::CannedQuery const&, unity::scopes::SearchMetadata const&, std::shared_ptr<unity::scopes::SearchListenerBase> const&)@Base" 0.4.0+14.04.20140312.1
44 (c++)"unity::scopes::internal::ScopeImpl::search(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unity::scopes::FilterState const&, unity::scopes::SearchMetadata const&, std::shared_ptr<unity::scopes::SearchListenerBase> const&)@Base" 0.4.0+14.04.20140312.1
45@@ -575,14 +580,13 @@
46 (c++)"unity::scopes::Runtime::registry() const@Base" 0.4.0+14.04.20140312.1
47 (c++)"unity::scopes::Variant::get_double() const@Base" 0.4.0+14.04.20140312.1
48 (c++)"unity::scopes::Variant::get_string() const@Base" 0.4.0+14.04.20140312.1
49+ (c++)"unity::scopes::Variant::serialize_json() const@Base" 0replaceme
50 (c++)"unity::scopes::Variant::which() const@Base" 0.4.0+14.04.20140312.1
51 (c++)"unity::scopes::Variant::get_int() const@Base" 0.4.0+14.04.20140312.1
52 (c++)"unity::scopes::Variant::is_null() const@Base" 0.4.0+14.04.20140312.1
53 (c++)"unity::scopes::Variant::get_bool() const@Base" 0.4.0+14.04.20140312.1
54 (c++)"unity::scopes::Variant::get_dict() const@Base" 0.4.0+14.04.20140312.1
55 (c++)"unity::scopes::Variant::get_array() const@Base" 0.4.0+14.04.20140312.1
56- (c++)"unity::scopes::Variant::deserialize_json(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)@Base" 0.4.2+14.04.20140404.2
57- (c++)"unity::scopes::Variant::serialize_json() const@Base" 0.4.2+14.04.20140404.2
58 (c++)"unity::scopes::Variant::operator==(unity::scopes::Variant const&) const@Base" 0.4.0+14.04.20140312.1
59 (c++)"unity::scopes::Variant::operator<(unity::scopes::Variant const&) const@Base" 0.4.0+14.04.20140312.1
60 (c++)"unity::scopes::testing::ScopeMetadataBuilder::operator()() const@Base" 0.4.0+14.04.20140312.1
61@@ -599,11 +603,11 @@
62 (c++)"unity::scopes::Category::icon() const@Base" 0.4.0+14.04.20140312.1
63 (c++)"unity::scopes::Category::title() const@Base" 0.4.0+14.04.20140312.1
64 (c++)"unity::scopes::Category::serialize() const@Base" 0.4.0+14.04.20140312.1
65- (c++)"unity::scopes::internal::Executor::exec(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::vector<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::allocator<std::basic_string<char, std::char_traits<char>, std::allocator<char> > > > const&, std::map<std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::less<std::basic_string<char, std::char_traits<char>, std::allocator<char> > >, std::allocator<std::pair<std::basic_string<char, std::char_traits<char>, std::allocator<char> > const, std::basic_string<char, std::char_traits<char>, std::allocator<char> > > > > const&, core::posix::StandardStream const&)@Base" 0replaceme
66- (c++)"unity::scopes::internal::Executor::Executor()@Base" 0replaceme
67- (c++)"unity::scopes::internal::Executor::~Executor()@Base" 0replaceme
68+ (c++)"unity::scopes::internal::RuntimeImpl::async_pool() const@Base" 0replaceme
69 (c++)"unity::scopes::internal::RuntimeImpl::configfile() const@Base" 0.4.2+14.04.20140404.2
70+ (c++)"unity::scopes::internal::RuntimeImpl::future_queue() const@Base" 0replaceme
71 (c++)"unity::scopes::internal::RuntimeImpl::reply_reaper() const@Base" 0.4.0+14.04.20140312.1
72+ (c++)"unity::scopes::internal::RuntimeImpl::waiter_thread(std::shared_ptr<unity::scopes::internal::ThreadSafeQueue<std::future<void> > > const&) const@Base" 0replaceme
73 (c++)"unity::scopes::internal::RuntimeImpl::proxy_to_string(std::shared_ptr<unity::scopes::Object> const&) const@Base" 0.4.0+14.04.20140312.1
74 (c++)"unity::scopes::internal::RuntimeImpl::string_to_proxy(std::basic_string<char, std::char_traits<char>, std::allocator<char> > const&) const@Base" 0.4.0+14.04.20140312.1
75 (c++)"unity::scopes::internal::RuntimeImpl::registry_endpoint() const@Base" 0.4.0+14.04.20140312.1
76@@ -637,10 +641,10 @@
77 (c++)"unity::scopes::internal::RuntimeConfig::registry_configfile() const@Base" 0.4.2+14.04.20140404.2
78 (c++)"unity::scopes::internal::RuntimeConfig::default_middleware_configfile() const@Base" 0.4.2+14.04.20140404.2
79 (c++)"unity::scopes::internal::MiddlewareBase::runtime() const@Base" 0.4.0+14.04.20140312.1
80- (c++)"unity::scopes::internal::RegistryConfig::click_installdir() const@Base" 0replaceme
81 (c++)"unity::scopes::internal::RegistryConfig::endpointdir() const@Base" 0.4.0+14.04.20140312.1
82 (c++)"unity::scopes::internal::RegistryConfig::mw_configfile() const@Base" 0.4.0+14.04.20140312.1
83 (c++)"unity::scopes::internal::RegistryConfig::oem_installdir() const@Base" 0.4.0+14.04.20140312.1
84+ (c++)"unity::scopes::internal::RegistryConfig::click_installdir() const@Base" 0replaceme
85 (c++)"unity::scopes::internal::RegistryConfig::scope_installdir() const@Base" 0.4.0+14.04.20140312.1
86 (c++)"unity::scopes::internal::RegistryConfig::scoperunner_path() const@Base" 0.4.0+14.04.20140312.1
87 (c++)"unity::scopes::internal::RegistryConfig::ss_registry_endpoint() const@Base" 0.4.0+14.04.20140312.1
88@@ -671,7 +675,7 @@
89 (c++)"unity::scopes::internal::ScopeMetadataImpl::invisible() const@Base" 0.4.0+14.04.20140312.1
90 (c++)"unity::scopes::internal::ScopeMetadataImpl::serialize() const@Base" 0.4.0+14.04.20140312.1
91 (c++)"unity::scopes::internal::StateReceiverObject::state_received() const@Base" 0.4.2+14.04.20140404.2
92- (c++)"unity::scopes::internal::ScopeImpl::fwd() const@Base" 0.4.0+14.04.20140312.1
93+ (c++)"unity::scopes::QueryBase::valid() const@Base" 0replaceme
94 (c++)"unity::scopes::ScopeBase::scope_directory() const@Base" 0.4.2+14.04.20140404.2
95 (c++)"typeinfo for unity::scopes::Annotation@Base" 0.4.0+14.04.20140312.1
96 (c++)"typeinfo for unity::scopes::FilterBase@Base" 0.4.0+14.04.20140312.1
97
98=== modified file 'demo/client.cpp'
99--- demo/client.cpp 2014-04-23 11:33:49 +0000
100+++ demo/client.cpp 2014-04-28 05:57:56 +0000
101@@ -122,8 +122,9 @@
102 {
103 public:
104 Receiver(int index_to_save)
105- : index_to_save_(index_to_save),
106- push_result_count_(0)
107+ : query_complete_(false),
108+ push_result_count_(0),
109+ index_to_save_(index_to_save)
110 {
111 }
112
113@@ -212,23 +213,23 @@
114 return push_result_count_;
115 }
116
117- Receiver() :
118- query_complete_(false)
119- {
120- }
121-
122 private:
123 bool query_complete_;
124+ int push_result_count_ = 0;
125 int index_to_save_;
126- mutex mutex_;
127- condition_variable condvar_;
128- int push_result_count_ = 0;
129 std::shared_ptr<Result> saved_result_;
130+ mutex mutex_;
131+ condition_variable condvar_;
132 };
133
134 class ActivationReceiver : public ActivationListenerBase
135 {
136 public:
137+ ActivationReceiver()
138+ : query_complete_(false)
139+ {
140+ }
141+
142 void activated(ActivationResponse const& response) override
143 {
144 cout << "\tGot activation response: " << response.status() << endl;
145@@ -237,16 +238,19 @@
146 void finished(Reason r, std::string const& error_message)
147 {
148 cout << "\tActivation finished, reason: " << r << ", error_message: " << error_message << endl;
149+ lock_guard<decltype(mutex_)> lock(mutex_);
150+ query_complete_ = true;
151 condvar_.notify_one();
152 }
153
154 void wait_until_finished()
155 {
156 unique_lock<decltype(mutex_)> lock(mutex_);
157- condvar_.wait(lock);
158+ condvar_.wait(lock, [this](){ return query_complete_; });
159 }
160
161 private:
162+ bool query_complete_;
163 mutex mutex_;
164 condition_variable condvar_;
165 };
166@@ -254,6 +258,11 @@
167 class PreviewReceiver : public PreviewListenerBase
168 {
169 public:
170+ PreviewReceiver()
171+ : query_complete_(false)
172+ {
173+ }
174+
175 void push(ColumnLayoutList const& columns) override
176 {
177 cout << "\tGot column layouts:" << endl;
178@@ -297,17 +306,20 @@
179
180 void finished(Reason r, std::string const& error_message) override
181 {
182+ lock_guard<decltype(mutex_)> lock(mutex_);
183 cout << "\tPreview finished, reason: " << r << ", error_message: " << error_message << endl;
184+ query_complete_ = true;
185 condvar_.notify_one();
186 }
187
188 void wait_until_finished()
189 {
190 unique_lock<decltype(mutex_)> lock(mutex_);
191- condvar_.wait(lock);
192+ condvar_.wait(lock, [this](){ return query_complete_; });
193 }
194
195 private:
196+ bool query_complete_;
197 mutex mutex_;
198 condition_variable condvar_;
199 };
200@@ -434,9 +446,7 @@
201 SearchMetadata metadata("C", "desktop");
202 metadata.set_cardinality(10);
203 auto ctrl = meta.proxy()->search(search_string, metadata, reply); // May raise TimeoutException
204- cout << "client: created query" << endl;
205 reply->wait_until_finished();
206- cout << "client: wait returned" << endl;
207
208 // handle activation
209 if (result_index > 0)
210
211=== modified file 'demo/scopes/scope-A/scope-A.cpp'
212--- demo/scopes/scope-A/scope-A.cpp 2014-02-27 23:20:56 +0000
213+++ demo/scopes/scope-A/scope-A.cpp 2014-04-28 05:57:56 +0000
214@@ -31,13 +31,16 @@
215 class MyQuery : public SearchQueryBase
216 {
217 public:
218- MyQuery(CannedQuery const& query) :
219+ MyQuery(string const& scope_id, CannedQuery const& query) :
220+ scope_id_(scope_id),
221 query_(query)
222 {
223+ cerr << scope_id_ << ": query instance for \"" << query.query_string() << "\" created" << endl;
224 }
225
226 ~MyQuery()
227 {
228+ cerr << scope_id_ << ": query instance for \"" << query_.query_string() << "\" destroyed" << endl;
229 }
230
231 virtual void cancelled() override
232@@ -46,6 +49,11 @@
233
234 virtual void run(SearchReplyProxy const& reply) override
235 {
236+ if (!valid())
237+ {
238+ return; // Query was cancelled
239+ }
240+
241 DepartmentList departments({{"news", query_, "News", {{"news-world", query_, "World"}, {"news-europe", query_, "Europe"}}},
242 {"sport", query_, "Sport"}});
243 reply->register_departments(departments);
244@@ -56,7 +64,10 @@
245 filter->add_option("2", "Option 2");
246 filters.push_back(filter);
247 FilterState filter_state; // TODO: push real state from query obj
248- reply->push(filters, filter_state);
249+ if (!reply->push(filters, filter_state))
250+ {
251+ return; // Query was cancelled
252+ }
253
254 CategoryRenderer rdr;
255 auto cat = reply->register_category("cat1", "Category 1", "", rdr);
256@@ -65,30 +76,37 @@
257 res.set_title("scope-A: result 1 for query \"" + query_.query_string() + "\"");
258 res.set_art("icon");
259 res.set_dnd_uri("dnd_uri");
260- reply->push(res);
261+ if (!reply->push(res))
262+ {
263+ return; // Query was cancelled
264+ }
265
266 CannedQuery q("scope-A", query_.query_string(), "");
267 Annotation annotation(Annotation::Type::Link);
268 annotation.add_link("More...", q);
269 reply->register_annotation(annotation);
270
271- cout << "scope-A: query \"" << query_.query_string() << "\" complete" << endl;
272+ cerr << "scope-A: query \"" << query_.query_string() << "\" complete" << endl;
273 }
274
275 private:
276+ string scope_id_;
277 CannedQuery query_;
278 };
279
280 class MyPreview : public PreviewQueryBase
281 {
282 public:
283- MyPreview(string const& uri) :
284+ MyPreview(string const& scope_id, string const& uri) :
285+ scope_id_(scope_id),
286 uri_(uri)
287 {
288+ cerr << scope_id_ << ": preview instance for \"" << uri << "\" created" << endl;
289 }
290
291 ~MyPreview()
292 {
293+ cerr << scope_id_ << ": preview instance for \"" << uri_ << "\" destroyed" << endl;
294 }
295
296 virtual void cancelled() override
297@@ -119,21 +137,32 @@
298 layout3col.add_column({"rating"});
299
300 reply->register_layout({layout1col, layout2col, layout3col});
301- reply->push(widgets);
302- reply->push("author", Variant("Foo"));
303- reply->push("rating", Variant("4 blah"));
304- cout << "scope-A: preview for \"" << uri_ << "\" complete" << endl;
305+ if (!reply->push(widgets))
306+ {
307+ return; // Query was cancelled
308+ }
309+ if (!reply->push("author", Variant("Foo")))
310+ {
311+ return; // Query was cancelled
312+ }
313+ if (!reply->push("rating", Variant("4 blah")))
314+ {
315+ return; // Query was cancelled
316+ }
317+ cerr << "scope-A: preview for \"" << uri_ << "\" complete" << endl;
318 }
319
320 private:
321+ string scope_id_;
322 string uri_;
323 };
324
325 class MyScope : public ScopeBase
326 {
327 public:
328- virtual int start(string const&, RegistryProxy const&) override
329+ virtual int start(string const& scope_id, RegistryProxy const&) override
330 {
331+ scope_id_ = scope_id;
332 return VERSION;
333 }
334
335@@ -141,17 +170,19 @@
336
337 virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const&) override
338 {
339- SearchQueryBase::UPtr query(new MyQuery(q));
340- cout << "scope-A: created query: \"" << q.query_string() << "\"" << endl;
341+ SearchQueryBase::UPtr query(new MyQuery(scope_id_, q));
342 return query;
343 }
344
345 virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override
346 {
347- PreviewQueryBase::UPtr preview(new MyPreview(result.uri()));
348- cout << "scope-A: created previewer: \"" << result.uri() << "\"" << endl;
349+ PreviewQueryBase::UPtr preview(new MyPreview(scope_id_, result.uri()));
350+ cerr << "scope-A: created previewer: \"" << result.uri() << "\"" << endl;
351 return preview;
352 }
353+
354+private:
355+ string scope_id_;
356 };
357
358 extern "C"
359
360=== modified file 'demo/scopes/scope-B/scope-B.cpp'
361--- demo/scopes/scope-B/scope-B.cpp 2014-04-03 12:57:25 +0000
362+++ demo/scopes/scope-B/scope-B.cpp 2014-04-28 05:57:56 +0000
363@@ -47,12 +47,12 @@
364 public:
365 virtual void push(Category::SCPtr category) override
366 {
367- cout << "received category: id=" << category->id() << endl;
368+ cerr << scope_id_ << ": received category: id=" << category->id() << endl;
369 }
370
371 virtual void push(CategorisedResult result) override
372 {
373- cout << "received result from " << scope_id_ << ": " << result.uri() << ", " << result.title() << endl;
374+ cerr << scope_id_ << ": received result: " << result.uri() << ", " << result.title() << endl;
375 try
376 {
377 result.set_category(upstream_->lookup_category("catB"));
378@@ -60,18 +60,18 @@
379 }
380 catch (const unity::InvalidArgumentException &e)
381 {
382- cerr << "error pushing result: " << e.what() << endl;
383+ cerr << scope_id_ << ": error pushing result: " << e.what() << endl;
384 }
385 }
386
387 virtual void finished(Reason reason, string const& error_message) override
388 {
389- cout << "query to " << scope_id_ << " complete, status: " << to_string(reason);
390+ cerr << scope_id_ << ": subquery complete, status: " << to_string(reason);
391 if (reason == ListenerBase::Error)
392 {
393- cout << ": " << error_message;
394+ cerr << ": " << error_message;
395 }
396- cout << endl;
397+ cerr << endl;
398 }
399
400 Receiver(string const& scope_id, SearchReplyProxy const& upstream) :
401@@ -101,11 +101,16 @@
402
403 virtual void cancelled()
404 {
405- cout << "query to " << scope_id_ << " was cancelled" << endl;
406+ cerr << scope_id_ << ": query " << query_.query_string() << " was cancelled" << endl;
407 }
408
409 virtual void run(SearchReplyProxy const& upstream_reply)
410 {
411+ if (!valid())
412+ {
413+ return; // Query was cancelled
414+ }
415+
416 // note, category id must mach categories received from scope C and D, otherwise result pushing will fail.
417 try
418 {
419@@ -157,13 +162,13 @@
420 virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const&) override
421 {
422 SearchQueryBase::UPtr query(new MyQuery(scope_id_, q, scope_c_, scope_d_));
423- cout << "scope-B: created query: \"" << q.query_string() << "\"" << endl;
424+ cerr << "scope-B: created query: \"" << q.query_string() << "\"" << endl;
425 return query;
426 }
427
428 virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override
429 {
430- cout << "scope-B: preview: \"" << result.uri() << "\"" << endl;
431+ cerr << "scope-B: preview: \"" << result.uri() << "\"" << endl;
432 return nullptr;
433 }
434
435
436=== modified file 'demo/scopes/scope-C/scope-C.cpp'
437--- demo/scopes/scope-C/scope-C.cpp 2014-04-11 12:04:41 +0000
438+++ demo/scopes/scope-C/scope-C.cpp 2014-04-28 05:57:56 +0000
439@@ -129,16 +129,17 @@
440 class MyQuery : public SearchQueryBase
441 {
442 public:
443- MyQuery(CannedQuery const& query, Queue& queue) :
444+ MyQuery(string const& scope_id, CannedQuery const& query, Queue& queue) :
445+ scope_id_(scope_id),
446 query_(query),
447 queue_(queue)
448 {
449- cerr << "My Query created" << endl;
450+ cerr << scope_id_ << ": query instance for \"" << query.query_string() << "\" created" << endl;
451 }
452
453 ~MyQuery()
454 {
455- cerr << "My Query destroyed" << endl;
456+ cerr << scope_id_ << ": query instance for \"" << query_.query_string() << "\" destroyed" << endl;
457 }
458
459 virtual void cancelled() override
460@@ -148,16 +149,21 @@
461 // query, the worker thread's next call to push() will return false,
462 // causing the worker thread to stop working on this query.
463 queue_.remove(this);
464- cerr << "scope-C: \"" + query_.to_uri() + "\" cancelled" << endl;
465+ cerr << scope_id_ << ": \"" + query_.to_uri() + "\" cancelled" << endl;
466 }
467
468 virtual void run(SearchReplyProxy const& reply) override
469 {
470+ if (!valid())
471+ {
472+ return; // Query was cancelled
473+ }
474+
475 queue_.put(this, query_.query_string(), reply);
476- cerr << "scope-C: run() returning" << endl;
477 }
478
479 private:
480+ string scope_id_;
481 CannedQuery query_;
482 Queue& queue_;
483 };
484@@ -205,7 +211,7 @@
485 {
486 CategorisedResult result(cat);
487 result.set_uri("uri");
488- result.set_title("scope-C: result " + to_string(i) + " for query \"" + query + "\"");
489+ result.set_title(scope_id_ + ": result " + to_string(i) + " for query \"" + query + "\"");
490 result.set_art("icon");
491 result.set_dnd_uri("dnd_uri");
492 result.set_intercept_activation();
493@@ -215,24 +221,24 @@
494 }
495 sleep(1);
496 }
497+ cerr << scope_id_ << ": query \"" << query << "\" complete" << endl;
498 }
499 }
500
501 virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const&) override
502 {
503- cout << scope_id_ << ": created query: \"" << q.query_string() << "\"" << endl;
504- return SearchQueryBase::UPtr(new MyQuery(q, queue));
505+ return SearchQueryBase::UPtr(new MyQuery(scope_id_, q, queue));
506 }
507
508 virtual ActivationQueryBase::UPtr activate(Result const& result, ActionMetadata const& /* hints */) override
509 {
510- cout << scope_id_ << ": activate: \"" << result.uri() << "\"" << endl;
511+ cerr << scope_id_ << ": activate: \"" << result.uri() << "\"" << endl;
512 return ActivationQueryBase::UPtr(new MyActivation());
513 }
514
515 virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override
516 {
517- cout << "scope-C: preview: \"" << result.uri() << "\"" << endl;
518+ cerr << scope_id_ << ": preview: \"" << result.uri() << "\"" << endl;
519 return nullptr;
520 }
521
522
523=== modified file 'demo/scopes/scope-D/scope-D.cpp'
524--- demo/scopes/scope-D/scope-D.cpp 2014-04-03 12:57:25 +0000
525+++ demo/scopes/scope-D/scope-D.cpp 2014-04-28 05:57:56 +0000
526@@ -132,12 +132,12 @@
527 query_(query),
528 queue_(queue)
529 {
530- cerr << "query instance for \"" << scope_id_ << ":" << query.query_string() << "\" created" << endl;
531+ cerr << scope_id_ << ": query instance for \"" << query.query_string() << "\" created" << endl;
532 }
533
534 virtual ~MyQuery()
535 {
536- cerr << "query instance for \"" << scope_id_ << ":" << query_.query_string() << "\" destroyed" << endl;
537+ cerr << scope_id_ << ": query instance for \"" << query_.query_string() << "\" destroyed" << endl;
538 }
539
540 virtual void cancelled() override
541@@ -151,11 +151,16 @@
542 // work may still be in progress on a query. Note that cancellations are frequent;
543 // not responding to cancelled() correctly causes loss of performance.
544
545- cerr << "query for \"" << scope_id_ << ":" << query_.query_string() << "\" cancelled" << endl;
546+ cerr << scope_id_ << ": query for \"" << query_.query_string() << "\" cancelled" << endl;
547 }
548
549 virtual void run(SearchReplyProxy const& reply) override
550 {
551+ if (!valid())
552+ {
553+ return; // Query was cancelled
554+ }
555+
556 // The query can do anything it likes with this method, that is, run() can push results
557 // directly on the provided reply, or it can save the reply for later use and return from
558 // run(). It is OK to push results on the reply from a different thread.
559@@ -226,13 +231,12 @@
560 virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const&) override
561 {
562 SearchQueryBase::UPtr query(new MyQuery(scope_id_, q, queue_));
563- cerr << scope_id_ << ": created query: \"" << q.query_string() << "\"" << endl;
564 return query;
565 }
566
567 virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override
568 {
569- cout << scope_id_ << ": preview: \"" << result.uri() << "\"" << endl;
570+ cerr << scope_id_ << ": preview: \"" << result.uri() << "\"" << endl;
571 return nullptr;
572 }
573
574
575=== modified file 'demo/scopes/scope-N/scope-N.cpp'
576--- demo/scopes/scope-N/scope-N.cpp 2014-02-27 23:20:56 +0000
577+++ demo/scopes/scope-N/scope-N.cpp 2014-04-28 05:57:56 +0000
578@@ -32,32 +32,44 @@
579 class MyQuery : public SearchQueryBase
580 {
581 public:
582- MyQuery()
583+ MyQuery(string const& scope_id)
584+ : scope_id_(scope_id)
585 {
586+ cerr << scope_id_ << ": query instance created" << endl;
587 }
588
589 ~MyQuery()
590 {
591+ cerr << scope_id_ << ": query instance destroyed" << endl;
592 }
593
594 virtual void cancelled() override
595 {
596- cerr << "scope-no-op: received cancel request" << endl;
597+ cerr << scope_id_ << ": received cancel request" << endl;
598 }
599
600 virtual void run(SearchReplyProxy const&) override
601 {
602- cerr << "scope-no-op: received query" << endl;
603+ if (!valid())
604+ {
605+ return; // Query was cancelled
606+ }
607+
608+ cerr << scope_id_ << ": received query" << endl;
609 this_thread::sleep_for(chrono::seconds(3));
610- cerr << "scope-no-op: query complete" << endl;
611+ cerr << scope_id_ << ": query complete" << endl;
612 }
613+
614+private:
615+ string scope_id_;
616 };
617
618 class MyScope : public ScopeBase
619 {
620 public:
621- virtual int start(string const&, RegistryProxy const&) override
622+ virtual int start(string const& scope_id, RegistryProxy const&) override
623 {
624+ scope_id_ = scope_id;
625 return VERSION;
626 }
627
628@@ -65,13 +77,16 @@
629
630 virtual SearchQueryBase::UPtr search(CannedQuery const&, SearchMetadata const&) override
631 {
632- return SearchQueryBase::UPtr(new MyQuery);
633+ return SearchQueryBase::UPtr(new MyQuery(scope_id_));
634 }
635
636 virtual PreviewQueryBase::UPtr preview(Result const&, ActionMetadata const&) override
637 {
638 return nullptr;
639 }
640+
641+private:
642+ string scope_id_;
643 };
644
645 extern "C"
646
647=== modified file 'demo/scopes/scope-S/scope-S.cpp'
648--- demo/scopes/scope-S/scope-S.cpp 2014-03-04 03:15:11 +0000
649+++ demo/scopes/scope-S/scope-S.cpp 2014-04-28 05:57:56 +0000
650@@ -36,36 +36,49 @@
651 class MyQuery : public SearchQueryBase
652 {
653 public:
654- MyQuery(CannedQuery const& query, CategoryRenderer const& renderer) :
655+ MyQuery(string const& scope_id, CannedQuery const& query, CategoryRenderer const& renderer) :
656+ scope_id_(scope_id),
657 query_(query),
658 renderer_(renderer)
659 {
660- cerr << "MyQuery/" << query.query_string() << " created" << endl;
661+ cerr << scope_id_ << ": query instance for \"" << query.query_string() << "\" created" << endl;
662 }
663
664 ~MyQuery()
665 {
666- cerr << "MyQuery/" << query_.query_string() << " destroyed" << endl;
667+ cerr << scope_id_ << ": query instance for \"" << query_.query_string() << "\" destroyed" << endl;
668 }
669
670 virtual void cancelled() override
671 {
672- cerr << "MyQuery/" << query_.query_string() << " cancelled" << endl;
673+ cerr << scope_id_ << ": \"" + query_.to_uri() + "\" cancelled" << endl;
674 }
675
676 virtual void run(SearchReplyProxy const& reply) override
677 {
678- cerr << "scope-slow: run called for \"" << query_.query_string() << "\"" << endl;
679- this_thread::sleep_for(chrono::seconds(20));
680+ if (!valid())
681+ {
682+ return; // Query was cancelled
683+ }
684+
685+ cerr << scope_id_ << ": run called for \"" << query_.query_string() << "\"" << endl;
686+ int const short_secs = 5;
687+ cerr << scope_id_ << ": sleeping for " << short_secs << " seconds" << endl;
688+ this_thread::sleep_for(chrono::seconds(short_secs));
689 auto cat = reply->register_category("cat1", "Category 1", "", renderer_);
690 CategorisedResult result(cat);
691 result.set_uri("uri");
692- result.set_title("scope-slow: result 1 for query \"" + query_.query_string() + "\"");
693+ result.set_title(scope_id_ + ": result 1 for query \"" + query_.query_string() + "\"");
694+ cerr << scope_id_ << ": pushing result" << endl;
695 reply->push(result);
696- cout << "scope-slow: query \"" << query_.query_string() << "\" complete" << endl;
697+ int const long_secs = 50;
698+ cerr << scope_id_ << ": sleeping for " << long_secs << " seconds" << endl;
699+ this_thread::sleep_for(chrono::seconds(long_secs));
700+ cout << scope_id_ << ": query \"" << query_.query_string() << "\" complete" << endl;
701 }
702
703 private:
704+ string scope_id_;
705 CannedQuery query_;
706 CategoryRenderer renderer_;
707 };
708@@ -73,35 +86,29 @@
709 class MyScope : public ScopeBase
710 {
711 public:
712- virtual int start(string const&, RegistryProxy const&) override
713+ virtual int start(string const& scope_id, RegistryProxy const&) override
714 {
715+ scope_id_ = scope_id;
716 return VERSION;
717 }
718
719 virtual void stop() override {}
720
721- virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const& hints) override
722+ virtual SearchQueryBase::UPtr search(CannedQuery const& q, SearchMetadata const& /* hints */) override
723 {
724- SearchQueryBase::UPtr query(new MyQuery(q, renderer_));
725- cout << "scope-slow: created query: \"" << q.query_string() << "\"" << endl;
726-
727- if (hints.cardinality() > 0)
728- {
729- cerr << "result cardinality: " << hints.cardinality() << endl;
730- }
731-
732- cerr << "locale: " << hints.locale() << endl;
733-
734+ SearchQueryBase::UPtr query(new MyQuery(scope_id_, q, renderer_));
735+ cout << scope_id_ << ": created query: \"" << q.query_string() << "\"" << endl;
736 return query;
737 }
738
739 virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const&) override
740 {
741- cout << "scope-S: preview: \"" << result.uri() << "\"" << endl;
742+ cout << scope_id_ << ": preview: \"" << result.uri() << "\"" << endl;
743 return nullptr;
744 }
745
746 private:
747+ string scope_id_;
748 CategoryRenderer renderer_;
749 };
750
751
752=== modified file 'include/unity/scopes/PreviewQueryBase.h'
753--- include/unity/scopes/PreviewQueryBase.h 2014-03-05 02:42:49 +0000
754+++ include/unity/scopes/PreviewQueryBase.h 2014-04-28 05:57:56 +0000
755@@ -76,7 +76,7 @@
756
757 \param reply The proxy on which to push results for the preview.
758 */
759- virtual void run(PreviewReplyProxy const& reply) = 0; // Called by the run time to start this query
760+ virtual void run(PreviewReplyProxy const& reply) = 0; // Called by the run time to start this preview
761
762 // TODO: Add a method for subpreview request?
763
764
765=== modified file 'include/unity/scopes/QueryBase.h'
766--- include/unity/scopes/QueryBase.h 2014-03-10 05:51:39 +0000
767+++ include/unity/scopes/QueryBase.h 2014-04-28 05:57:56 +0000
768@@ -72,10 +72,28 @@
769 Your implementation of this method should ensure that the scope stops
770 processing the current query as soon as possible. Any calls to a `push()` method
771 once a query is cancelled are ignored, so continuing to push after cancellation
772- only wastes CPU cycles.
773+ only wastes CPU cycles. (`push()` returns `false` once a query is cancelled or
774+ exceeds its cardinality limit.)
775 */
776 virtual void cancelled() = 0; // Originator cancelled the query
777
778+ /**
779+ \brief Check whether this query is still valid.
780+
781+ valid() returns false if this query is finished or was cancelled earlier. Note that it is possible
782+ that the run time may call SearchQueryBase::run(), ActivationQueryBase::activate(), or PreviewQueryBase::run()
783+ \a after cancelled() was called. Your implementation of these methods should check whether the query is still
784+ valid and, if not, do nothing.
785+
786+ This method is provided mainly for convenience: it can be used in your `run()` or `activate()` implementation to
787+ avoid doing a lot of work setting up a query that was cancelled earlier. Note that, because cancellation
788+ can happen at any time during query execution, your implementation should always test the return value
789+ of `push()`. If `push()` returns `false`, the query was either cancelled or exceeded its cardinality limit.
790+ Either way, there is no point in continuing to push more results because, once `push()` returns `false`,
791+ the scopes run time discards all subsequent results for the query.
792+ */
793+ bool valid() const;
794+
795 /// @cond
796 virtual ~QueryBase();
797 /// @endcond
798
799=== modified file 'include/unity/scopes/internal/ObjectImpl.h'
800--- include/unity/scopes/internal/ObjectImpl.h 2014-04-03 12:57:25 +0000
801+++ include/unity/scopes/internal/ObjectImpl.h 2014-04-28 05:57:56 +0000
802@@ -22,6 +22,8 @@
803 #include<unity/scopes/internal/MWObjectProxyFwd.h>
804 #include<unity/scopes/Object.h>
805
806+#include <mutex>
807+
808 namespace unity
809 {
810
811@@ -31,7 +33,7 @@
812 namespace internal
813 {
814
815-class ObjectImpl : public virtual Object
816+class ObjectImpl : public virtual Object, public virtual std::enable_shared_from_this<ObjectImpl>
817 {
818 public:
819 ObjectImpl(MWProxy const& mw_proxy);
820@@ -48,10 +50,17 @@
821 virtual void ping();
822
823 protected:
824- MWProxy proxy() const; // Non-virtual because we cannot use covariance with incomplete types.
825- // Each derived proxy implements a non-virtual fwd() method
826- // that is called from within each operation to down-cast the MWProxy.
827+ MWProxy proxy(); // Non-virtual because we cannot use covariance with incomplete types.
828+ // Each derived proxy implements a non-virtual fwd() method
829+ // that is called from within each operation to down-cast the MWProxy.
830+
831+ void set_proxy(MWProxy const& p); // Allows a derived proxy to replace mw_proxy_ for asynchronous twoway calls.
832+
833 MWProxy mw_proxy_;
834+ std::mutex proxy_mutex_; // Protects mw_proxy_
835+
836+private:
837+ void check_proxy(); // Throws from operations if mw_proxy_ is null
838 };
839
840 } // namespace internal
841
842=== modified file 'include/unity/scopes/internal/QueryBaseImpl.h'
843--- include/unity/scopes/internal/QueryBaseImpl.h 2014-03-10 04:17:09 +0000
844+++ include/unity/scopes/internal/QueryBaseImpl.h 2014-04-28 05:57:56 +0000
845@@ -63,10 +63,13 @@
846
847 void cancel();
848 void set_metadata(SearchMetadata const& metadata);
849+ bool valid() const;
850
851 private:
852 SearchMetadata::UPtr search_metadata_;
853 std::vector<QueryCtrlProxy> subqueries_;
854+ bool valid_;
855+ mutable std::mutex mutex_;
856 };
857
858 } // namespace internal
859
860=== modified file 'include/unity/scopes/internal/QueryCtrlImpl.h'
861--- include/unity/scopes/internal/QueryCtrlImpl.h 2014-03-07 02:18:16 +0000
862+++ include/unity/scopes/internal/QueryCtrlImpl.h 2014-04-28 05:57:56 +0000
863@@ -33,11 +33,7 @@
864 namespace internal
865 {
866
867-// Proxy used by a scope to push results for a query.
868-// To indicate that it has sent the last result, the scope must call finished().
869-// Subsequent calls to finished() are ignored.
870-// Calls to push() after finished() was called are ignored.
871-// If the proxy goes out of scope before finished was called, it implicitly calls finished().
872+class ScopeImpl;
873
874 class QueryCtrlImpl : public virtual unity::scopes::QueryCtrl, public virtual ObjectImpl
875 {
876@@ -47,10 +43,17 @@
877
878 virtual void cancel() override;
879
880+ void set_proxy(MWQueryCtrlProxy const& p);
881+
882 private:
883- MWQueryCtrlProxy fwd() const;
884+ MWQueryCtrlProxy fwd();
885
886 MWReplyProxy reply_proxy_;
887+ bool ready_; // True once ObjectImpl::set_proxy() was called
888+ bool cancelled_; // True if cancel() is called before set_proxy() was called
889+ std::mutex mutex_; // Protects ready_ and cancelled_
890+
891+ friend class ScopeImpl; // Allows ScopeImpl to call set_proxy()
892 };
893
894 } // namespace internal
895
896=== modified file 'include/unity/scopes/internal/QueryObject.h'
897--- include/unity/scopes/internal/QueryObject.h 2014-03-04 06:26:21 +0000
898+++ include/unity/scopes/internal/QueryObject.h 2014-04-28 05:57:56 +0000
899@@ -24,8 +24,6 @@
900 #include <unity/scopes/internal/MWQueryCtrlProxyFwd.h>
901 #include <unity/scopes/ReplyProxyFwd.h>
902
903-#include <atomic>
904-
905 namespace unity
906 {
907
908@@ -71,9 +69,10 @@
909 MWReplyProxy reply_;
910 std::weak_ptr<unity::scopes::Reply> reply_proxy_;
911 MWQueryCtrlProxy const ctrl_;
912- std::atomic_bool pushable_;
913+ bool pushable_;
914 QueryObjectBase::SPtr self_;
915- std::atomic_int cardinality_;
916+ int cardinality_;
917+ mutable std::mutex mutex_;
918 };
919
920 } // namespace internal
921
922=== modified file 'include/unity/scopes/internal/RegistryImpl.h'
923--- include/unity/scopes/internal/RegistryImpl.h 2014-04-03 12:57:25 +0000
924+++ include/unity/scopes/internal/RegistryImpl.h 2014-04-28 05:57:56 +0000
925@@ -48,7 +48,7 @@
926 ObjectProxy locate(std::string const& identity);
927
928 private:
929- MWRegistryProxy fwd() const;
930+ MWRegistryProxy fwd();
931 };
932
933 } // namespace internal
934
935=== modified file 'include/unity/scopes/internal/ReplyImpl.h'
936--- include/unity/scopes/internal/ReplyImpl.h 2014-03-07 05:56:22 +0000
937+++ include/unity/scopes/internal/ReplyImpl.h 2014-04-28 05:57:56 +0000
938@@ -54,7 +54,7 @@
939 protected:
940 bool push(VariantMap const& variant_map);
941
942- MWReplyProxy fwd() const;
943+ MWReplyProxy fwd();
944
945 private:
946 std::shared_ptr<QueryObjectBase> qo_;
947
948=== modified file 'include/unity/scopes/internal/ReplyObject.h'
949--- include/unity/scopes/internal/ReplyObject.h 2014-03-07 01:07:28 +0000
950+++ include/unity/scopes/internal/ReplyObject.h 2014-04-28 05:57:56 +0000
951@@ -59,7 +59,7 @@
952 void finished(ListenerBase::Reason reason, std::string const& error_message) noexcept override;
953
954 private:
955- ListenerBase::SPtr const listener_base_;
956+ ListenerBase::SPtr listener_base_;
957 ReapItem::SPtr reap_item_;
958 std::atomic_bool finished_;
959 std::mutex mutex_;
960
961=== modified file 'include/unity/scopes/internal/RuntimeImpl.h'
962--- include/unity/scopes/internal/RuntimeImpl.h 2014-04-03 14:22:02 +0000
963+++ include/unity/scopes/internal/RuntimeImpl.h 2014-04-28 05:57:56 +0000
964@@ -22,10 +22,9 @@
965 #include <unity/scopes/internal/MiddlewareBase.h>
966 #include <unity/scopes/internal/MiddlewareFactory.h>
967 #include <unity/scopes/internal/Reaper.h>
968+#include <unity/scopes/internal/ThreadPool.h>
969 #include <unity/scopes/Runtime.h>
970
971-#include <atomic>
972-
973 namespace unity
974 {
975
976@@ -53,6 +52,8 @@
977 std::string registry_endpointdir() const;
978 std::string registry_endpoint() const;
979 Reaper::SPtr reply_reaper() const;
980+ ThreadPool::SPtr async_pool() const;
981+ ThreadSafeQueue<std::future<void>>::SPtr future_queue() const;
982 void run_scope(ScopeBase *const scope_base, std::string const &scope_ini_file);
983
984 ObjectProxy string_to_proxy(std::string const& s) const;
985@@ -62,8 +63,9 @@
986
987 private:
988 RuntimeImpl(std::string const& scope_id, std::string const& configfile);
989+ void waiter_thread(ThreadSafeQueue<std::future<void>>::SPtr const& queue) const noexcept;
990
991- std::atomic_bool destroyed_;
992+ bool destroyed_;
993 std::string scope_id_;
994 std::string configfile_;
995 MiddlewareFactory::UPtr middleware_factory_;
996@@ -74,7 +76,10 @@
997 mutable std::string registry_endpointdir_;
998 mutable std::string registry_endpoint_;
999 mutable Reaper::SPtr reply_reaper_;
1000- mutable std::mutex mutex_; // For lazy initialization of reply_reaper_
1001+ mutable ThreadPool::SPtr async_pool_; // Pool of invocation threads for async query creation
1002+ mutable ThreadSafeQueue<std::future<void>>::SPtr future_queue_;
1003+ mutable std::thread waiter_thread_;
1004+ mutable std::mutex mutex_; // For lazy initialization of reply_reaper_, async_pool_, and future_queue_
1005 };
1006
1007 } // namespace internal
1008
1009=== modified file 'include/unity/scopes/internal/ScopeImpl.h'
1010--- include/unity/scopes/internal/ScopeImpl.h 2014-03-07 04:19:32 +0000
1011+++ include/unity/scopes/internal/ScopeImpl.h 2014-04-28 05:57:56 +0000
1012@@ -66,7 +66,7 @@
1013
1014 QueryCtrlProxy search(CannedQuery const& query,
1015 SearchMetadata const& metadata,
1016- SearchListenerBase::SPtr const& reply); // Not remote operation, hence not override
1017+ SearchListenerBase::SPtr const& reply); // Not remote, hence not override
1018
1019 virtual QueryCtrlProxy activate(Result const& result,
1020 ActionMetadata const& metadata,
1021@@ -85,7 +85,7 @@
1022 static ScopeProxy create(MWScopeProxy const& mw_proxy, RuntimeImpl* runtime, std::string const& scope_id);
1023
1024 private:
1025- MWScopeProxy fwd() const;
1026+ MWScopeProxy fwd();
1027
1028 RuntimeImpl* const runtime_;
1029 std::string scope_id_;
1030
1031=== modified file 'include/unity/scopes/internal/SearchReplyImpl.h'
1032--- include/unity/scopes/internal/SearchReplyImpl.h 2014-03-07 10:54:15 +0000
1033+++ include/unity/scopes/internal/SearchReplyImpl.h 2014-04-28 05:57:56 +0000
1034@@ -47,7 +47,7 @@
1035 class SearchReplyImpl : public virtual unity::scopes::SearchReply, public virtual ReplyImpl
1036 {
1037 public:
1038- SearchReplyImpl(MWReplyProxy const& mw_proxy, std::shared_ptr<QueryObjectBase>const & qo);
1039+ SearchReplyImpl(MWReplyProxy const& mw_proxy, std::shared_ptr<QueryObjectBase>const & qo, int cardinality);
1040 virtual ~SearchReplyImpl();
1041
1042 virtual void register_departments(DepartmentList const& departments, std::string current_department_id) override;
1043
1044=== modified file 'include/unity/scopes/internal/TaskWrapper.h'
1045--- include/unity/scopes/internal/TaskWrapper.h 2014-01-23 11:28:34 +0000
1046+++ include/unity/scopes/internal/TaskWrapper.h 2014-04-28 05:57:56 +0000
1047@@ -31,7 +31,7 @@
1048 {
1049
1050 // Simple wrapper for tasks. Allows us to use a std::packaged_task as the functor
1051-// for a task in a thread pool, without having to know the task's return type in advance.
1052+// for a task in a thread pool without having to know the task's return type in advance.
1053
1054 class TaskWrapper final
1055 {
1056@@ -54,7 +54,6 @@
1057
1058 void operator()()
1059 {
1060-
1061 task_->call();
1062 }
1063
1064
1065=== modified file 'include/unity/scopes/internal/ThreadPool.h'
1066--- include/unity/scopes/internal/ThreadPool.h 2014-01-23 11:28:34 +0000
1067+++ include/unity/scopes/internal/ThreadPool.h 2014-04-28 05:57:56 +0000
1068@@ -22,6 +22,7 @@
1069 #include <unity/scopes/internal/ThreadSafeQueue.h>
1070 #include <unity/scopes/internal/TaskWrapper.h>
1071 #include <unity/UnityExceptions.h>
1072+#include <unity/util/DefinesPtrs.h>
1073
1074 #include <future>
1075
1076@@ -42,6 +43,7 @@
1077 {
1078 public:
1079 NONCOPYABLE(ThreadPool);
1080+ UNITY_DEFINES_PTRS(ThreadPool);
1081
1082 ThreadPool(int size);
1083 ~ThreadPool();
1084
1085=== modified file 'include/unity/scopes/internal/ThreadSafeQueue.h'
1086--- include/unity/scopes/internal/ThreadSafeQueue.h 2014-01-23 11:28:34 +0000
1087+++ include/unity/scopes/internal/ThreadSafeQueue.h 2014-04-28 05:57:56 +0000
1088@@ -19,9 +19,9 @@
1089 #ifndef UNITY_SCOPES_INTERNAL_THREADSAFEQUEUE_H
1090 #define UNITY_SCOPES_INTERNAL_THREADSAFEQUEUE_H
1091
1092+#include <unity/util/DefinesPtrs.h>
1093 #include <unity/util/NonCopyable.h>
1094
1095-#include <atomic>
1096 #include <condition_variable>
1097 #include <mutex>
1098 #include <queue>
1099@@ -45,30 +45,34 @@
1100 {
1101 public:
1102 NONCOPYABLE(ThreadSafeQueue);
1103+ UNITY_DEFINES_PTRS(ThreadSafeQueue);
1104+
1105 typedef T value_type;
1106
1107 ThreadSafeQueue();
1108 ~ThreadSafeQueue();
1109
1110 void destroy() noexcept;
1111+ void wait_for_destroy() noexcept;
1112 void push(T const& item);
1113 void push(T&& item);
1114 T wait_and_pop();
1115 bool try_pop(T& item);
1116 bool empty() const noexcept;
1117+ void wait_until_empty() const noexcept;
1118 size_t size() const noexcept;
1119
1120 private:
1121 std::queue<T> queue_;
1122 mutable std::mutex mutex_;
1123- std::condition_variable cond_;
1124- bool done_;
1125- std::atomic<int> num_waiters_;
1126+ mutable std::condition_variable cond_;
1127+ bool destroyed_;
1128+ int num_waiters_;
1129 };
1130
1131 template<typename T>
1132 ThreadSafeQueue<T>::ThreadSafeQueue() :
1133- done_(false),
1134+ destroyed_(false),
1135 num_waiters_(0)
1136 {
1137 }
1138@@ -77,40 +81,50 @@
1139 ThreadSafeQueue<T>::~ThreadSafeQueue()
1140 {
1141 destroy();
1142-
1143- // Don't destroy the object while there are still threads in wait_and_pop(), otherwise
1144- // a thread that wakes up in wait_and_pop() will try to re-lock the already-destroyed
1145- // mutex.
1146- while (num_waiters_.load() > 0)
1147- std::this_thread::yield(); // LCOV_EXCL_LINE (impossible to reliably hit with a test)
1148+ wait_for_destroy();
1149 }
1150
1151 template<typename T>
1152 void ThreadSafeQueue<T>::destroy() noexcept
1153 {
1154 std::lock_guard<std::mutex> lock(mutex_);
1155- if (done_)
1156+ if (destroyed_)
1157 {
1158 return;
1159 }
1160- done_ = true;
1161- cond_.notify_all(); // Wake up anyone asleep in wait_and_pop()
1162+ destroyed_ = true;
1163+ cond_.notify_all(); // Wake up anyone asleep in wait_and_pop() or wait_for_destroy()
1164+}
1165+
1166+template<typename T>
1167+void ThreadSafeQueue<T>::wait_for_destroy() noexcept
1168+{
1169+ std::unique_lock<std::mutex> lock(mutex_);
1170+ cond_.wait(lock, [this] { return destroyed_ && num_waiters_ == 0; });
1171 }
1172
1173 template<typename T>
1174 void ThreadSafeQueue<T>::push(T const& item)
1175 {
1176 std::lock_guard<std::mutex> lock(mutex_);
1177+ if (destroyed_)
1178+ {
1179+ throw std::runtime_error("ThreadSafeQueue: cannot push onto destroyed queue");
1180+ }
1181 queue_.push(item);
1182- cond_.notify_one();
1183+ cond_.notify_all();
1184 }
1185
1186 template<typename T>
1187 void ThreadSafeQueue<T>::push(T&& item)
1188 {
1189 std::lock_guard<std::mutex> lock(mutex_);
1190+ if (destroyed_)
1191+ {
1192+ throw std::runtime_error("ThreadSafeQueue: cannot push onto destroyed queue");
1193+ }
1194 queue_.emplace(std::move(item));
1195- cond_.notify_one();
1196+ cond_.notify_all();
1197 }
1198
1199 template<typename T>
1200@@ -118,16 +132,21 @@
1201 {
1202 std::unique_lock<std::mutex> lock(mutex_);
1203 ++num_waiters_;
1204- cond_.wait(lock, [this] { return done_ || queue_.size() != 0; });
1205- if (done_)
1206+ cond_.wait(lock, [this] { return destroyed_ || queue_.size() != 0; });
1207+ if (destroyed_)
1208 {
1209- lock.unlock();
1210- --num_waiters_;
1211+ if (--num_waiters_ == 0)
1212+ {
1213+ cond_.notify_all();
1214+ }
1215 throw std::runtime_error("ThreadSafeQueue: queue destroyed while thread was blocked in wait_and_pop()");
1216 }
1217 T item = std::move(queue_.front());
1218 queue_.pop();
1219- --num_waiters_;
1220+ if (--num_waiters_ == 0 || queue_.empty())
1221+ {
1222+ cond_.notify_all();
1223+ }
1224 return item;
1225 }
1226
1227@@ -141,6 +160,10 @@
1228 }
1229 item = std::move(queue_.front());
1230 queue_.pop();
1231+ if (queue_.empty())
1232+ {
1233+ cond_.notify_all();
1234+ }
1235 return true;
1236 }
1237
1238@@ -152,6 +175,13 @@
1239 }
1240
1241 template<typename T>
1242+void ThreadSafeQueue<T>::wait_until_empty() const noexcept
1243+{
1244+ std::unique_lock<std::mutex> lock(mutex_);
1245+ cond_.wait(lock, [this] { return queue_.empty(); });
1246+}
1247+
1248+template<typename T>
1249 size_t ThreadSafeQueue<T>::size() const noexcept
1250 {
1251 std::lock_guard<std::mutex> lock(mutex_);
1252
1253=== modified file 'include/unity/scopes/internal/zmq_middleware/ConnectionPool.h'
1254--- include/unity/scopes/internal/zmq_middleware/ConnectionPool.h 2014-01-22 03:29:48 +0000
1255+++ include/unity/scopes/internal/zmq_middleware/ConnectionPool.h 2014-04-28 05:57:56 +0000
1256@@ -1,5 +1,5 @@
1257 /*
1258- * Copyright (C) 2013 Canonical Ltd
1259+ * Copyright (C) 2014 Canonical Ltd
1260 *
1261 * This program is free software: you can redistribute it and/or modify
1262 * it under the terms of the GNU Lesser General Public License version 3 as
1263@@ -24,7 +24,6 @@
1264
1265 #include <zmqpp/socket.hpp>
1266
1267-#include <mutex>
1268 #include <string>
1269 #include <unordered_map>
1270
1271@@ -33,6 +32,8 @@
1272 // different threads to crash.
1273 // So, we maintain a pool of invocation threads, with each thread keeping its own cache of sockets.
1274 // Sockets are indexed by adapter name and created lazily.
1275+//
1276+// WARNING: No locking anywhere here. The pool is intended for use as a thread_local static member only.
1277
1278 namespace unity
1279 {
1280@@ -57,17 +58,18 @@
1281 void register_socket(std::string const& endpoint, zmqpp::socket socket, RequestMode m);
1282
1283 private:
1284- struct Connection
1285+ struct SocketData
1286 {
1287 zmqpp::socket socket;
1288 RequestMode mode;
1289 };
1290
1291- typedef std::unordered_map<std::string, Connection> CPool;
1292+ typedef std::unordered_map<std::string, SocketData> CPool;
1293+
1294+ CPool::value_type create_connection(std::string const& endpoint, RequestMode m, int64_t timeout);
1295
1296 zmqpp::context& context_;
1297 CPool pool_;
1298- std::mutex mutex_;
1299 };
1300
1301 } // namespace zmq_middleware
1302
1303=== modified file 'include/unity/scopes/internal/zmq_middleware/ObjectAdapter.h'
1304--- include/unity/scopes/internal/zmq_middleware/ObjectAdapter.h 2014-01-24 00:26:43 +0000
1305+++ include/unity/scopes/internal/zmq_middleware/ObjectAdapter.h 2014-04-28 05:57:56 +0000
1306@@ -46,6 +46,7 @@
1307 {
1308
1309 class ServantBase;
1310+class StopPublisher;
1311 class ZmqMiddleware;
1312
1313 class ObjectAdapter final
1314@@ -89,12 +90,12 @@
1315 // The Failed state is reachable from any of the other states and indicates
1316 // a fatal error condition.
1317 enum AdapterState { Inactive, Activating, Active, Deactivating, Destroyed, Failed };
1318- void throw_bad_state(AdapterState state) const;
1319+ void throw_bad_state(std::string const& label, AdapterState state) const;
1320
1321 void run_workers();
1322- void init_ctrl_socket();
1323- zmqpp::socket subscribe_to_ctrl_socket();
1324- void stop_workers() noexcept;
1325+ // void init_ctrl_socket();
1326+ // zmqpp::socket subscribe_to_ctrl_socket();
1327+ // void stop_workers() noexcept;
1328
1329 void safe_bind(zmqpp::socket& s, std::string const& endpoint);
1330
1331@@ -114,8 +115,9 @@
1332 std::string endpoint_;
1333 RequestMode mode_;
1334 int pool_size_;
1335- std::unique_ptr<zmqpp::socket> ctrl_; // PUB socket to signal when to deactivate
1336- std::mutex ctrl_mutex_; // Synchronizes access to ctrl_ when sending
1337+ // std::unique_ptr<zmqpp::socket> ctrl_; // PUB socket to signal when to deactivate
1338+ // std::mutex ctrl_mutex_; // Synchronizes access to ctrl_ when sending
1339+ std::unique_ptr<StopPublisher> stopper_; // Used to signal threads when it's time to terminate
1340 std::thread broker_; // Connects router with dealer
1341 std::vector<std::thread> workers_; // Threads for incoming invocations
1342 std::atomic_int num_workers_; // For handshake with parent
1343
1344=== added file 'include/unity/scopes/internal/zmq_middleware/StopPublisher.h'
1345--- include/unity/scopes/internal/zmq_middleware/StopPublisher.h 1970-01-01 00:00:00 +0000
1346+++ include/unity/scopes/internal/zmq_middleware/StopPublisher.h 2014-04-28 05:57:56 +0000
1347@@ -0,0 +1,85 @@
1348+/*
1349+ * Copyright (C) 2014 Canonical Ltd
1350+ *
1351+ * This program is free software: you can redistribute it and/or modify
1352+ * it under the terms of the GNU Lesser General Public License version 3 as
1353+ * published by the Free Software Foundation.
1354+ *
1355+ * This program is distributed in the hope that it will be useful,
1356+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
1357+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1358+ * GNU Lesser General Public License for more details.
1359+ *
1360+ * You should have received a copy of the GNU Lesser General Public License
1361+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
1362+ *
1363+ * Authored by: Michi Henning <michi.henning@canonical.com>
1364+ */
1365+
1366+#ifndef UNITY_SCOPES_INTERNAL_ZMQMIDDLEWARE_STOPPUBLISHER_H
1367+#define UNITY_SCOPES_INTERNAL_ZMQMIDDLEWARE_STOPPUBLISHER_H
1368+
1369+#include <unity/util/NonCopyable.h>
1370+
1371+#include <zmqpp/context.hpp>
1372+#include <zmqpp/socket.hpp>
1373+
1374+#include <condition_variable>
1375+#include <mutex>
1376+#include <thread>
1377+
1378+namespace unity
1379+{
1380+
1381+namespace scopes
1382+{
1383+
1384+namespace internal
1385+{
1386+
1387+namespace zmq_middleware
1388+{
1389+
1390+// StopPublisher provides a way to use Zmq pub/sub without falling prey
1391+// to the slow joiner syndrome. The provided name is used to create an inproc
1392+// endpoint by prefixing the name with "inproc://".
1393+// Subscribers create a socket by calling subscribe(). The returned
1394+// socket becomes ready for reading (for a zero-length message) once
1395+// some thread calls stop(). Multiple calls to stop() are benign.
1396+
1397+class StopPublisher final
1398+{
1399+public:
1400+ NONCOPYABLE(StopPublisher);
1401+
1402+ StopPublisher(zmqpp::context* context, std::string const& inproc_name);
1403+ ~StopPublisher();
1404+
1405+ std::string endpoint() const;
1406+ zmqpp::socket subscribe();
1407+ void stop();
1408+
1409+private:
1410+ zmqpp::context* context_;
1411+ std::string endpoint_;
1412+ std::thread thread_;
1413+ std::exception_ptr ex_;
1414+
1415+ enum State { Starting, Started, Stopping, Stopped, Failed };
1416+ State state_;
1417+
1418+ mutable std::mutex m_;
1419+ std::condition_variable cond_;
1420+
1421+ void stopper_thread() noexcept;
1422+};
1423+
1424+} // namespace zmq_middleware
1425+
1426+} // namespace internal
1427+
1428+} // namespace scopes
1429+
1430+} // namespace unity
1431+
1432+#endif
1433
1434=== modified file 'include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h'
1435--- include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h 2014-02-03 02:53:33 +0000
1436+++ include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h 2014-04-28 05:57:56 +0000
1437@@ -21,6 +21,7 @@
1438
1439 #include <unity/scopes/internal/MWObjectProxy.h>
1440 #include <scopes/internal/zmq_middleware/capnproto/Message.capnp.h>
1441+#include <unity/scopes/internal/zmq_middleware/ConnectionPool.h>
1442 #include <unity/scopes/internal/zmq_middleware/RequestMode.h>
1443 #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h>
1444 #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxyFwd.h>
1445@@ -71,8 +72,11 @@
1446
1447 protected:
1448 capnproto::Request::Builder make_request_(capnp::MessageBuilder& b, std::string const& operation_name) const;
1449- ZmqReceiver invoke_(capnp::MessageBuilder& out_params);
1450- ZmqReceiver invoke_(capnp::MessageBuilder& out_params, int64_t timeout);
1451+
1452+ void invoke_oneway_(capnp::MessageBuilder& out_params);
1453+
1454+ ZmqReceiver invoke_twoway_(capnp::MessageBuilder& out_params);
1455+ ZmqReceiver invoke_twoway_(capnp::MessageBuilder& out_params, int64_t timeout);
1456
1457 private:
1458 std::string endpoint_;
1459
1460=== modified file 'src/scopes/QueryBase.cpp'
1461--- src/scopes/QueryBase.cpp 2014-03-10 04:17:09 +0000
1462+++ src/scopes/QueryBase.cpp 2014-04-28 05:57:56 +0000
1463@@ -50,6 +50,11 @@
1464 p->set_metadata(metadata);
1465 }
1466
1467+bool QueryBase::valid() const
1468+{
1469+ return p->valid();
1470+}
1471+
1472 } // namespace scopes
1473
1474 } // namespace unity
1475
1476=== modified file 'src/scopes/internal/ObjectImpl.cpp'
1477--- src/scopes/internal/ObjectImpl.cpp 2014-04-03 12:57:25 +0000
1478+++ src/scopes/internal/ObjectImpl.cpp 2014-04-28 05:57:56 +0000
1479@@ -19,8 +19,12 @@
1480 #include <unity/scopes/internal/ObjectImpl.h>
1481
1482 #include <unity/scopes/internal/MWObjectProxy.h>
1483+#include <unity/scopes/ScopeExceptions.h>
1484+
1485+#include <cassert>
1486
1487 using namespace std;
1488+using namespace unity::scopes;
1489
1490 namespace unity
1491 {
1492@@ -31,9 +35,10 @@
1493 namespace internal
1494 {
1495
1496-ObjectImpl::ObjectImpl(MWProxy const& mw_proxy) :
1497- mw_proxy_(mw_proxy)
1498+ObjectImpl::ObjectImpl(MWProxy const& mw_proxy)
1499 {
1500+ lock_guard<mutex> lock(proxy_mutex_);
1501+ mw_proxy_ = mw_proxy;
1502 }
1503
1504 ObjectImpl::~ObjectImpl()
1505@@ -42,6 +47,7 @@
1506
1507 string ObjectImpl::identity()
1508 {
1509+ check_proxy();
1510 return mw_proxy_->identity();
1511 }
1512
1513@@ -52,29 +58,51 @@
1514
1515 string ObjectImpl::endpoint()
1516 {
1517+ check_proxy();
1518 return mw_proxy_->endpoint();
1519 }
1520
1521 int64_t ObjectImpl::timeout()
1522 {
1523+ check_proxy();
1524 return mw_proxy_->timeout();
1525 }
1526
1527 string ObjectImpl::to_string()
1528 {
1529+ check_proxy();
1530 return mw_proxy_->to_string();
1531 }
1532
1533 void ObjectImpl::ping()
1534 {
1535+ check_proxy();
1536 mw_proxy_->ping();
1537 }
1538
1539-MWProxy ObjectImpl::proxy() const
1540+MWProxy ObjectImpl::proxy()
1541 {
1542+ lock_guard<mutex> lock(proxy_mutex_);
1543 return mw_proxy_;
1544 }
1545
1546+void ObjectImpl::set_proxy(MWProxy const& p)
1547+{
1548+ assert(p);
1549+ lock_guard<mutex> lock(proxy_mutex_);
1550+ assert(!mw_proxy_);
1551+ mw_proxy_ = p;
1552+}
1553+
1554+void ObjectImpl::check_proxy()
1555+{
1556+ lock_guard<mutex> lock(proxy_mutex_);
1557+ if (!mw_proxy_)
1558+ {
1559+ throw MiddlewareException("Cannot invoke on null proxy");
1560+ }
1561+}
1562+
1563 } // namespace internal
1564
1565 } // namespace scopes
1566
1567=== modified file 'src/scopes/internal/QueryBaseImpl.cpp'
1568--- src/scopes/internal/QueryBaseImpl.cpp 2014-03-10 04:17:09 +0000
1569+++ src/scopes/internal/QueryBaseImpl.cpp 2014-04-28 05:57:56 +0000
1570@@ -37,6 +37,7 @@
1571 {
1572
1573 QueryBaseImpl::QueryBaseImpl()
1574+ : valid_(true)
1575 {
1576 }
1577
1578@@ -54,6 +55,8 @@
1579 // This allows cancel() to forward incoming cancellations to subqueries
1580 // without intervention from the scope application code.
1581 QueryCtrlProxy qcp = scope->search(query_string, *search_metadata_, reply);
1582+
1583+ lock_guard<mutex> lock(mutex_);
1584 subqueries_.push_back(qcp);
1585 return qcp;
1586 }
1587@@ -66,6 +69,8 @@
1588 assert(search_metadata_);
1589
1590 QueryCtrlProxy qcp = scope->search(query_string, filter_state, *search_metadata_, reply);
1591+
1592+ lock_guard<mutex> lock(mutex_);
1593 subqueries_.push_back(qcp);
1594 return qcp;
1595 }
1596@@ -79,6 +84,8 @@
1597 assert(search_metadata_);
1598
1599 QueryCtrlProxy qcp = scope->search(query_string, department_id, filter_state, *search_metadata_, reply);
1600+
1601+ lock_guard<mutex> lock(mutex_);
1602 subqueries_.push_back(qcp);
1603 return qcp;
1604 }
1605@@ -91,6 +98,8 @@
1606 SearchListenerBase::SPtr const& reply)
1607 {
1608 QueryCtrlProxy qcp = scope->search(query_string, department_id, filter_state, metadata, reply);
1609+
1610+ lock_guard<mutex> lock(mutex_);
1611 subqueries_.push_back(qcp);
1612 return qcp;
1613 }
1614@@ -102,16 +111,29 @@
1615
1616 void QueryBaseImpl::cancel()
1617 {
1618+ lock_guard<mutex> lock(mutex_);
1619+
1620+ if (!valid_)
1621+ {
1622+ return;
1623+ }
1624+ valid_ = false;
1625 for (auto& ctrl : subqueries_)
1626 {
1627 ctrl->cancel(); // Forward the cancellation to any subqueries that might be active
1628 }
1629 // We release the memory for the subquery controls here. That's just a micro-optimization
1630- // because this QueryBase will be destroyed shortly anyway, once the cancelled() method
1631- // of the application returns. (Not deallocating here would work too.)
1632+ // because this QueryBase will be destroyed shortly anyway, once the cancelled() (and possibly
1633+ // run()) methods of the application return. (Not deallocating here would work too.)
1634 vector<QueryCtrlProxy>().swap(subqueries_);
1635 }
1636
1637+bool QueryBaseImpl::valid() const
1638+{
1639+ lock_guard<mutex> lock(mutex_);
1640+ return valid_;
1641+}
1642+
1643 } // namespace internal
1644
1645 } // namespace scopes
1646
1647=== modified file 'src/scopes/internal/QueryCtrlImpl.cpp'
1648--- src/scopes/internal/QueryCtrlImpl.cpp 2014-03-07 02:01:47 +0000
1649+++ src/scopes/internal/QueryCtrlImpl.cpp 2014-04-28 05:57:56 +0000
1650@@ -43,10 +43,13 @@
1651 ObjectImpl(ctrl_proxy),
1652 reply_proxy_(reply_proxy)
1653 {
1654- assert(ctrl_proxy);
1655- assert(reply_proxy);
1656 // We remember the reply proxy so, when the query is cancelled, we can
1657 // inform the reply object belonging to this query that the query is finished.
1658+ assert(reply_proxy);
1659+
1660+ lock_guard<mutex> lock(mutex_);
1661+ ready_ = ctrl_proxy != nullptr;
1662+ cancelled_ = false;
1663 }
1664
1665 QueryCtrlImpl::~QueryCtrlImpl()
1666@@ -55,13 +58,24 @@
1667
1668 void QueryCtrlImpl::cancel()
1669 {
1670+ {
1671+ lock_guard<mutex> lock(mutex_);
1672+ if (!ready_)
1673+ {
1674+ // Remember that query was cancelled, so we can call
1675+ // cancel() once set_proxy() is called.
1676+ cancelled_ = true;
1677+ return;
1678+ }
1679+ }
1680+
1681 try
1682 {
1683 // Forward cancellation down-stream to the query. This does not block.
1684 fwd()->cancel();
1685
1686 // Indicate (to ourselves) that this query is complete. Calling via the MWReplyProxy ensures
1687- // the finished() call will be processed by a seperate server-side thread,
1688+ // the finished() call will be processed by a separate server-side thread,
1689 // so we cannot block here.
1690 reply_proxy_->finished(ListenerBase::Cancelled, "");
1691 }
1692@@ -72,7 +86,28 @@
1693 }
1694 }
1695
1696-MWQueryCtrlProxy QueryCtrlImpl::fwd() const
1697+void QueryCtrlImpl::set_proxy(MWQueryCtrlProxy const& p)
1698+{
1699+ assert(proxy() == nullptr);
1700+ ObjectImpl::set_proxy(p);
1701+
1702+ bool need_cancel;
1703+
1704+ {
1705+ lock_guard<mutex> lock(mutex_);
1706+ ready_ = true;
1707+ need_cancel = cancelled_;
1708+ } // Unlock
1709+
1710+ if (need_cancel)
1711+ {
1712+ // If cancel() was called earlier, do the actual cancellation now
1713+ // that we have the middleware proxy.
1714+ cancel();
1715+ }
1716+}
1717+
1718+MWQueryCtrlProxy QueryCtrlImpl::fwd()
1719 {
1720 return dynamic_pointer_cast<MWQueryCtrl>(proxy());
1721 }
1722
1723=== modified file 'src/scopes/internal/QueryObject.cpp'
1724--- src/scopes/internal/QueryObject.cpp 2014-03-07 03:49:03 +0000
1725+++ src/scopes/internal/QueryObject.cpp 2014-04-28 05:57:56 +0000
1726@@ -77,6 +77,8 @@
1727
1728 void QueryObject::run(MWReplyProxy const& reply, InvokeInfo const& /* info */) noexcept
1729 {
1730+ unique_lock<mutex> lock(mutex_);
1731+
1732 // It is possible for a run() to be dispatched by the middleware *after* the query
1733 // was cancelled. This can happen because run() and cancel() are dispatched by different
1734 // thread pools. If the scope implementation uses a synchronous run(), a later run()
1735@@ -84,16 +86,18 @@
1736 // completes, by which time the query for the later run() call may have been
1737 // cancelled already.
1738 // If the query was cancelled by the client before we even receive the
1739- // run invocation, we never forward the run() to the implementation.
1740+ // run invocation, we never forward the run() call to the implementation.
1741 if (!pushable_)
1742 {
1743+ self_ = nullptr;
1744+ disconnect();
1745 return;
1746 }
1747
1748 // Create the reply proxy to pass to query_base_ and keep a weak_ptr, which we will need
1749 // if cancel() is called later.
1750 assert(self_);
1751- auto reply_proxy = make_shared<SearchReplyImpl>(reply, self_);
1752+ auto reply_proxy = make_shared<SearchReplyImpl>(reply, self_, cardinality_);
1753 assert(reply_proxy);
1754 reply_proxy_ = reply_proxy;
1755
1756@@ -102,12 +106,15 @@
1757 self_ = nullptr;
1758 disconnect();
1759
1760- // Synchronous call into scope implementation.
1761- // On return, replies for the query may still be outstanding.
1762 try
1763 {
1764 auto search_query = dynamic_pointer_cast<SearchQueryBase>(query_base_);
1765 assert(search_query);
1766+
1767+ lock.unlock();
1768+
1769+ // Synchronous call into scope implementation.
1770+ // On return, replies for the query may still be outstanding.
1771 search_query->run(reply_proxy);
1772 }
1773 catch (std::exception const& e)
1774@@ -128,15 +135,24 @@
1775
1776 void QueryObject::cancel(InvokeInfo const& /* info */)
1777 {
1778- pushable_ = false;
1779- auto rp = reply_proxy_.lock();
1780- if (rp)
1781 {
1782- // Send finished() to up-stream client to tell him the query is done.
1783- // We send via the MWReplyProxy here because that allows passing
1784- // a ListenerBase::Reason (whereas the public ReplyProxy does not).
1785- reply_->finished(ListenerBase::Cancelled, ""); // Oneway, can't block
1786- }
1787+ lock_guard<mutex> lock(mutex_);
1788+
1789+ if (!pushable_)
1790+ {
1791+ return;
1792+ }
1793+ pushable_ = false;
1794+
1795+ auto rp = reply_proxy_.lock();
1796+ if (rp)
1797+ {
1798+ // Send finished() to up-stream client to tell him the query is done.
1799+ // We send via the MWReplyProxy here because that allows passing
1800+ // a ListenerBase::Reason (whereas the public ReplyProxy does not).
1801+ reply_->finished(ListenerBase::Cancelled, ""); // Oneway, can't block
1802+ }
1803+ } // Release lock
1804
1805 // Forward the cancellation to the query base (which in turn will forward it to any subqueries).
1806 // The query base also calls the cancelled() callback to inform the application code.
1807@@ -145,11 +161,13 @@
1808
1809 bool QueryObject::pushable(InvokeInfo const& /* info */) const noexcept
1810 {
1811+ lock_guard<mutex> lock(mutex_);
1812 return pushable_;
1813 }
1814
1815 int QueryObject::cardinality(InvokeInfo const& /* info */) const noexcept
1816 {
1817+ lock_guard<mutex> lock(mutex_);
1818 return cardinality_;
1819 }
1820
1821@@ -169,6 +187,8 @@
1822
1823 void QueryObject::set_self(QueryObjectBase::SPtr const& self) noexcept
1824 {
1825+ lock_guard<mutex> lock(mutex_);
1826+
1827 assert(self);
1828 assert(!self_);
1829 self_ = self;
1830
1831=== modified file 'src/scopes/internal/RegistryImpl.cpp'
1832--- src/scopes/internal/RegistryImpl.cpp 2014-04-03 12:57:25 +0000
1833+++ src/scopes/internal/RegistryImpl.cpp 2014-04-28 05:57:56 +0000
1834@@ -71,7 +71,7 @@
1835 return matching_entries;
1836 }
1837
1838-MWRegistryProxy RegistryImpl::fwd() const
1839+MWRegistryProxy RegistryImpl::fwd()
1840 {
1841 return dynamic_pointer_cast<MWRegistry>(proxy());
1842 }
1843
1844=== modified file 'src/scopes/internal/ReplyImpl.cpp'
1845--- src/scopes/internal/ReplyImpl.cpp 2014-03-07 05:56:22 +0000
1846+++ src/scopes/internal/ReplyImpl.cpp 2014-04-28 05:57:56 +0000
1847@@ -143,7 +143,7 @@
1848 }
1849 }
1850
1851-MWReplyProxy ReplyImpl::fwd() const
1852+MWReplyProxy ReplyImpl::fwd()
1853 {
1854 return dynamic_pointer_cast<MWReply>(proxy());
1855 }
1856
1857=== modified file 'src/scopes/internal/ReplyObject.cpp'
1858--- src/scopes/internal/ReplyObject.cpp 2014-03-10 10:16:13 +0000
1859+++ src/scopes/internal/ReplyObject.cpp 2014-04-28 05:57:56 +0000
1860@@ -50,9 +50,8 @@
1861 assert(receiver_base);
1862 assert(runtime);
1863 reap_item_ = runtime->reply_reaper()->add([this] {
1864- cerr << "No activity on ReplyObject for scope " << this->origin_proxy_
1865- << ": ReplyObject destroyed" << endl;
1866- this->disconnect();
1867+ string msg = "No activity on ReplyObject for scope " + this->origin_proxy_ + ": ReplyObject destroyed";
1868+ this->finished(ListenerBase::Error, msg);
1869 });
1870 }
1871
1872
1873=== modified file 'src/scopes/internal/RuntimeImpl.cpp'
1874--- src/scopes/internal/RuntimeImpl.cpp 2014-04-11 08:22:17 +0000
1875+++ src/scopes/internal/RuntimeImpl.cpp 2014-04-28 05:57:56 +0000
1876@@ -51,11 +51,14 @@
1877 namespace internal
1878 {
1879
1880-RuntimeImpl::RuntimeImpl(string const& scope_id, string const& configfile) :
1881- destroyed_(false),
1882- scope_id_(scope_id)
1883+RuntimeImpl::RuntimeImpl(string const& scope_id, string const& configfile)
1884 {
1885- if (scope_id.empty())
1886+ lock_guard<mutex> lock(mutex_);
1887+
1888+ destroyed_ = false;
1889+
1890+ scope_id_ = scope_id;
1891+ if (scope_id_.empty())
1892 {
1893 UniqueID id;
1894 scope_id_ = "c-" + id.gen();
1895@@ -77,6 +80,10 @@
1896 middleware_ = middleware_factory_->create(scope_id_, default_middleware, middleware_configfile);
1897 middleware_->start();
1898
1899+ async_pool_ = make_shared<ThreadPool>(1); // TODO: configurable pool size
1900+ future_queue_ = make_shared<ThreadSafeQueue<future<void>>>();
1901+ waiter_thread_ = std::thread([this]{ waiter_thread(future_queue_); });
1902+
1903 if (registry_configfile_.empty() || registry_identity_.empty())
1904 {
1905 cerr << "Warning: no registry configured" << endl;
1906@@ -124,21 +131,51 @@
1907
1908 void RuntimeImpl::destroy()
1909 {
1910- if (!destroyed_.exchange(true))
1911- {
1912- // TODO: not good enough. Need to wait for the middleware to stop and for the reaper
1913- // to terminate. Otherwise, it's possible that we exit while threads are still running
1914- // with undefined behavior.
1915- registry_ = nullptr;
1916- middleware_->stop();
1917- middleware_ = nullptr;
1918- middleware_factory_.reset(nullptr);
1919+ lock_guard<mutex> lock(mutex_);
1920+
1921+ if (destroyed_)
1922+ {
1923+ return;
1924+ }
1925+ destroyed_ = true;
1926+
1927+ // Stop the reaper. Any ReplyObject instances that may still
1928+ // be hanging around will be cleaned up when the server side
1929+ // is shut down.
1930+ if (reply_reaper_)
1931+ {
1932+ reply_reaper_->destroy();
1933 reply_reaper_ = nullptr;
1934 }
1935+
1936+ // No more outgoing invocations.
1937+ async_pool_ = nullptr;
1938+
1939+ // Wait for any twoway operations that were invoked asynchronously to complete.
1940+ if (future_queue_)
1941+ {
1942+ future_queue_->wait_until_empty();
1943+ future_queue_->destroy();
1944+ future_queue_->wait_for_destroy();
1945+ }
1946+
1947+ if (waiter_thread_.joinable())
1948+ {
1949+ waiter_thread_.join();
1950+ }
1951+
1952+ // Shut down server-side.
1953+ middleware_->stop();
1954+ middleware_->wait_for_shutdown();
1955+ middleware_ = nullptr;
1956+ middleware_factory_.reset(nullptr);
1957+
1958+ registry_ = nullptr;
1959 }
1960
1961 string RuntimeImpl::scope_id() const
1962 {
1963+ lock_guard<mutex> lock(mutex_);
1964 return scope_id_;
1965 }
1966
1967@@ -149,7 +186,9 @@
1968
1969 MiddlewareFactory const* RuntimeImpl::factory() const
1970 {
1971- if (destroyed_.load())
1972+ lock_guard<mutex> lock(mutex_);
1973+
1974+ if (destroyed_)
1975 {
1976 throw LogicException("factory(): Cannot obtain factory for already destroyed run time");
1977 }
1978@@ -158,7 +197,9 @@
1979
1980 RegistryProxy RuntimeImpl::registry() const
1981 {
1982- if (destroyed_.load())
1983+ lock_guard<mutex> lock(mutex_);
1984+
1985+ if (destroyed_)
1986 {
1987 throw LogicException("registry(): Cannot obtain registry for already destroyed run time");
1988 }
1989@@ -171,21 +212,25 @@
1990
1991 string RuntimeImpl::registry_configfile() const
1992 {
1993+ lock_guard<mutex> lock(mutex_);
1994 return registry_configfile_;
1995 }
1996
1997 string RuntimeImpl::registry_identity() const
1998 {
1999+ lock_guard<mutex> lock(mutex_);
2000 return registry_identity_;
2001 }
2002
2003 string RuntimeImpl::registry_endpointdir() const
2004 {
2005+ lock_guard<mutex> lock(mutex_);
2006 return registry_endpointdir_;
2007 }
2008
2009 string RuntimeImpl::registry_endpoint() const
2010 {
2011+ lock_guard<mutex> lock(mutex_);
2012 return registry_endpoint_;
2013 }
2014
2015@@ -200,6 +245,35 @@
2016 return reply_reaper_;
2017 }
2018
2019+void RuntimeImpl::waiter_thread(ThreadSafeQueue<std::future<void>>::SPtr const& queue) const noexcept
2020+{
2021+ for (;;)
2022+ {
2023+ try
2024+ {
2025+ // Wait on the future from an async invocation.
2026+ // wait_and_pop() throws runtime_error when queue is destroyed
2027+ queue->wait_and_pop().get();
2028+ }
2029+ catch (std::runtime_error const&)
2030+ {
2031+ break;
2032+ }
2033+ }
2034+}
2035+
2036+ThreadPool::SPtr RuntimeImpl::async_pool() const
2037+{
2038+ lock_guard<mutex> lock(mutex_);
2039+ return async_pool_;
2040+}
2041+
2042+ThreadSafeQueue<future<void>>::SPtr RuntimeImpl::future_queue() const
2043+{
2044+ lock_guard<mutex> lock(mutex_);
2045+ return future_queue_;
2046+}
2047+
2048 void RuntimeImpl::run_scope(ScopeBase *const scope_base, std::string const& scope_ini_file)
2049 {
2050 // Retrieve the registry middleware and create a proxy to its state receiver
2051
2052=== modified file 'src/scopes/internal/ScopeImpl.cpp'
2053--- src/scopes/internal/ScopeImpl.cpp 2014-03-07 04:19:32 +0000
2054+++ src/scopes/internal/ScopeImpl.cpp 2014-04-28 05:57:56 +0000
2055@@ -21,6 +21,7 @@
2056 #include <unity/scopes/ActionMetadata.h>
2057 #include <unity/scopes/internal/ActivationReplyObject.h>
2058 #include <unity/scopes/internal/MiddlewareBase.h>
2059+#include <unity/scopes/internal/MWQueryCtrl.h>
2060 #include <unity/scopes/internal/MWScope.h>
2061 #include <unity/scopes/internal/PreviewReplyObject.h>
2062 #include <unity/scopes/internal/QueryCtrlImpl.h>
2063@@ -30,7 +31,6 @@
2064 #include <unity/UnityExceptions.h>
2065
2066 #include <cassert>
2067-#include <iostream> // TODO: remove this once logging is added
2068
2069 using namespace std;
2070
2071@@ -95,30 +95,48 @@
2072 throw unity::InvalidArgumentException("Scope::search(): invalid SearchListenerBase (nullptr)");
2073 }
2074
2075- QueryCtrlProxy ctrl;
2076 ReplyObject::SPtr ro(make_shared<ResultReplyObject>(reply, runtime_, to_string(), metadata.cardinality()));
2077- try
2078- {
2079- MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro);
2080-
2081- // Forward the search() method across the bus. This is a
2082- // synchronous twoway interaction with the scope, so it can return
2083- // the QueryCtrlProxy. This may block for some time, for example, if
2084- // the scope is not running and needs to be activated by the registry first.
2085- ctrl = fwd()->search(query, metadata.serialize(), rp);
2086- assert(ctrl);
2087- }
2088- catch (std::exception const& e)
2089+ MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro);
2090+
2091+ // "Fake" QueryCtrlProxy that doesn't have a real MWQueryCtrlProxy yet.
2092+ shared_ptr<QueryCtrlImpl> ctrl = make_shared<QueryCtrlImpl>(nullptr, rp);
2093+
2094+ // We pass a shared pointer to the lambda (instead of the this pointer)
2095+ // to keep ourselves alive until after the lambda fires.
2096+ // Otherwise, if the ScopeProxy for this invocation goes out of scope before the invocation is
2097+ // sent in the new thread, the lambda will call into this by-now-destroyed instance.
2098+ auto impl = dynamic_pointer_cast<ScopeImpl>(shared_from_this());
2099+
2100+ auto send_search = [impl, query, metadata, rp, ro, ctrl]() -> void
2101 {
2102 try
2103 {
2104- ro->finished(ListenerBase::Error, e.what());
2105+ // Forward the (synchronous) search() method across the bus.
2106+ auto real_ctrl = dynamic_pointer_cast<QueryCtrlImpl>(impl->fwd()->search(query, metadata.serialize(), rp));
2107+ assert(real_ctrl);
2108+
2109+ // Call has completed now, so we update the MWQueryCtrlProxy for the fake proxy
2110+ // with the real proxy that was returned.
2111+ auto new_proxy = dynamic_pointer_cast<MWQueryCtrl>(real_ctrl->proxy());
2112+ assert(new_proxy);
2113+ ctrl->set_proxy(new_proxy);
2114 }
2115- catch (...)
2116+ catch (std::exception const& e)
2117 {
2118+ try
2119+ {
2120+ ro->finished(ListenerBase::Error, e.what());
2121+ }
2122+ catch (...)
2123+ {
2124+ }
2125 }
2126- throw;
2127- }
2128+ };
2129+
2130+ // Send the blocking twoway request asynchronously via the async invocation pool. The waiter thread
2131+ // waits on the future, so it gets cleaned up.
2132+ auto future = runtime_->async_pool()->submit(send_search);
2133+ runtime_->future_queue()->push(move(future));
2134 return ctrl;
2135 }
2136
2137@@ -131,31 +149,40 @@
2138 throw unity::InvalidArgumentException("Scope::activate(): invalid ActivationListenerBase (nullptr)");
2139 }
2140
2141- QueryCtrlProxy ctrl;
2142 ActivationReplyObject::SPtr ro(make_shared<ActivationReplyObject>(reply, runtime_, to_string()));
2143- try
2144- {
2145- MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro);
2146-
2147- // Forward the activate() method across the bus.
2148- ctrl = fwd()->activate(result.p->activation_target(), metadata.serialize(), rp);
2149- assert(ctrl);
2150- }
2151- catch (std::exception const& e)
2152- {
2153- // TODO: log error
2154- cerr << "activate(): " << e.what() << endl;
2155+ MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro);
2156+
2157+ shared_ptr<QueryCtrlImpl> ctrl = make_shared<QueryCtrlImpl>(nullptr, rp);
2158+
2159+ auto impl = dynamic_pointer_cast<ScopeImpl>(shared_from_this());
2160+
2161+ auto send_activate = [impl, result, metadata, rp, ro, ctrl]() -> void
2162+ {
2163 try
2164 {
2165- ro->finished(ListenerBase::Error, e.what());
2166- throw;
2167+ auto real_ctrl = dynamic_pointer_cast<QueryCtrlImpl>(impl->fwd()->activate(result.p->activation_target(),
2168+ metadata.serialize(),
2169+ rp));
2170+ assert(real_ctrl);
2171+
2172+ auto new_proxy = dynamic_pointer_cast<MWQueryCtrl>(real_ctrl->proxy());
2173+ assert(new_proxy);
2174+ ctrl->set_proxy(new_proxy);
2175 }
2176- catch (...)
2177+ catch (std::exception const& e)
2178 {
2179- cerr << "activate(): unknown exception" << endl;
2180+ try
2181+ {
2182+ ro->finished(ListenerBase::Error, e.what());
2183+ }
2184+ catch (...)
2185+ {
2186+ }
2187 }
2188- throw;
2189- }
2190+ };
2191+
2192+ auto future = runtime_->async_pool()->submit(send_activate);
2193+ runtime_->future_queue()->push(move(future));
2194 return ctrl;
2195 }
2196
2197@@ -170,35 +197,43 @@
2198 throw unity::InvalidArgumentException("Scope::perform_action(): invalid ActivationListenerBase (nullptr)");
2199 }
2200
2201- QueryCtrlProxy ctrl;
2202- try
2203- {
2204- // Create a middleware server-side object that can receive incoming
2205- // push() and finished() messages over the network.
2206- ActivationReplyObject::SPtr ro(make_shared<ActivationReplyObject>(reply, runtime_, to_string()));
2207- MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro);
2208-
2209- // Forward the activate() method across the bus.
2210- ctrl = fwd()->perform_action(result.p->activation_target(), metadata.serialize(), widget_id, action_id, rp);
2211- assert(ctrl);
2212- }
2213- catch (std::exception const& e)
2214- {
2215- // TODO: log error
2216- cerr << "perform_action(): " << e.what() << endl;
2217+ ActivationReplyObject::SPtr ro(make_shared<ActivationReplyObject>(reply, runtime_, to_string()));
2218+ MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro);
2219+
2220+ shared_ptr<QueryCtrlImpl> ctrl = make_shared<QueryCtrlImpl>(nullptr, rp);
2221+
2222+ auto impl = dynamic_pointer_cast<ScopeImpl>(shared_from_this());
2223+
2224+ auto send_perform_action = [impl, result, metadata, widget_id, action_id, rp, ro, ctrl]() -> void
2225+ {
2226 try
2227 {
2228- // TODO: if things go wrong, we need to make sure that the reply object
2229- // is disconnected from the middleware, so it gets deallocated.
2230- reply->finished(ListenerBase::Error, e.what());
2231- throw;
2232+ auto real_ctrl = dynamic_pointer_cast<QueryCtrlImpl>(impl->fwd()->perform_action(
2233+ result.p->activation_target(),
2234+ metadata.serialize(),
2235+ widget_id,
2236+ action_id,
2237+ rp));
2238+ assert(real_ctrl);
2239+
2240+ auto new_proxy = dynamic_pointer_cast<MWQueryCtrl>(real_ctrl->proxy());
2241+ assert(new_proxy);
2242+ ctrl->set_proxy(new_proxy);
2243 }
2244- catch (...)
2245+ catch (std::exception const& e)
2246 {
2247- cerr << "perform_action(): unknown exception" << endl;
2248+ try
2249+ {
2250+ ro->finished(ListenerBase::Error, e.what());
2251+ }
2252+ catch (...)
2253+ {
2254+ }
2255 }
2256- throw;
2257- }
2258+ };
2259+
2260+ auto future = runtime_->async_pool()->submit(send_perform_action);
2261+ runtime_->future_queue()->push(move(future));
2262 return ctrl;
2263 }
2264
2265@@ -211,38 +246,40 @@
2266 throw unity::InvalidArgumentException("Scope::preview(): invalid PreviewListenerBase (nullptr)");
2267 }
2268
2269- QueryCtrlProxy ctrl;
2270 PreviewReplyObject::SPtr ro(make_shared<PreviewReplyObject>(reply, runtime_, to_string()));
2271- try
2272- {
2273- // Create a middleware server-side object that can receive incoming
2274- // push() and finished() messages over the network.
2275- MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro);
2276-
2277- // Forward the search() method across the bus. This is a
2278- // synchronous twoway interaction with the scope, so it can return
2279- // the QueryCtrlProxy. Because the Scope implementation has a separate
2280- // thread for search() calls, this is guaranteed not to block for
2281- // any length of time. (No application code other than the QueryBase constructor
2282- // is called by search() on the server side.)
2283- ctrl = fwd()->preview(result.p->activation_target(), hints.serialize(), rp);
2284- assert(ctrl);
2285- }
2286- catch (std::exception const& e)
2287- {
2288- // TODO: log error
2289- cerr << "preview(): " << e.what() << endl;
2290+ MWReplyProxy rp = fwd()->mw_base()->add_reply_object(ro);
2291+
2292+ shared_ptr<QueryCtrlImpl> ctrl = make_shared<QueryCtrlImpl>(nullptr, rp);
2293+
2294+ auto impl = dynamic_pointer_cast<ScopeImpl>(shared_from_this());
2295+
2296+ auto send_preview = [impl, result, hints, rp, ro, ctrl]() -> void
2297+ {
2298 try
2299 {
2300- ro->finished(ListenerBase::Error, e.what());
2301- throw;
2302+ auto real_ctrl = dynamic_pointer_cast<QueryCtrlImpl>(impl->fwd()->preview(result.p->activation_target(),
2303+ hints.serialize(),
2304+ rp));
2305+ assert(real_ctrl);
2306+
2307+ auto new_proxy = dynamic_pointer_cast<MWQueryCtrl>(real_ctrl->proxy());
2308+ assert(new_proxy);
2309+ ctrl->set_proxy(new_proxy);
2310 }
2311- catch (...)
2312+ catch (std::exception const& e)
2313 {
2314- cerr << "preview(): unknown exception" << endl;
2315+ try
2316+ {
2317+ ro->finished(ListenerBase::Error, e.what());
2318+ }
2319+ catch (...)
2320+ {
2321+ }
2322 }
2323- throw;
2324- }
2325+ };
2326+
2327+ auto future = runtime_->async_pool()->submit(send_preview);
2328+ runtime_->future_queue()->push(move(future));
2329 return ctrl;
2330 }
2331
2332@@ -251,7 +288,7 @@
2333 return make_shared<ScopeImpl>(mw_proxy, runtime, scope_id);
2334 }
2335
2336-MWScopeProxy ScopeImpl::fwd() const
2337+MWScopeProxy ScopeImpl::fwd()
2338 {
2339 return dynamic_pointer_cast<MWScope>(proxy());
2340 }
2341
2342=== modified file 'src/scopes/internal/ScopeObject.cpp'
2343--- src/scopes/internal/ScopeObject.cpp 2014-03-10 04:17:09 +0000
2344+++ src/scopes/internal/ScopeObject.cpp 2014-04-28 05:57:56 +0000
2345@@ -68,7 +68,6 @@
2346 // to be safe, we don't assert, in case someone is running a broken client.
2347
2348 // TODO: log error about incoming request containing an invalid reply proxy.
2349-
2350 throw LogicException("Scope \"" + runtime_->scope_id() + "\": query() called with null reply proxy");
2351 }
2352
2353
2354=== modified file 'src/scopes/internal/SearchReplyImpl.cpp'
2355--- src/scopes/internal/SearchReplyImpl.cpp 2014-03-10 10:34:43 +0000
2356+++ src/scopes/internal/SearchReplyImpl.cpp 2014-04-28 05:57:56 +0000
2357@@ -38,11 +38,13 @@
2358 namespace internal
2359 {
2360
2361-SearchReplyImpl::SearchReplyImpl(MWReplyProxy const& mw_proxy, std::shared_ptr<QueryObjectBase> const& qo) :
2362+SearchReplyImpl::SearchReplyImpl(MWReplyProxy const& mw_proxy,
2363+ std::shared_ptr<QueryObjectBase> const& qo,
2364+ int cardinality) :
2365 ObjectImpl(mw_proxy),
2366 ReplyImpl(mw_proxy, qo),
2367 cat_registry_(new CategoryRegistry()),
2368- cardinality_(qo->cardinality({ fwd()->identity(), fwd()->mw_base() })),
2369+ cardinality_(cardinality),
2370 num_pushes_(0)
2371 {
2372 }
2373
2374=== modified file 'src/scopes/internal/ThreadPool.cpp'
2375--- src/scopes/internal/ThreadPool.cpp 2014-01-23 11:28:34 +0000
2376+++ src/scopes/internal/ThreadPool.cpp 2014-04-28 05:57:56 +0000
2377@@ -54,7 +54,6 @@
2378 threads_.push_back(std::thread(&ThreadPool::run, this));
2379 }
2380 auto future = threads_ready_.get_future();
2381- future.wait();
2382 future.get();
2383 }
2384 catch (std::exception const&) // LCOV_EXCL_LINE
2385@@ -66,11 +65,18 @@
2386 ThreadPool::~ThreadPool()
2387 {
2388 queue_->destroy();
2389+
2390+ vector<thread> threads;
2391+ {
2392+ lock_guard<mutex> lock(mutex_);
2393+ threads.swap(threads_);
2394+ }
2395+
2396 try
2397 {
2398- for (size_t i = 0; i < threads_.size(); ++i)
2399+ for (size_t i = 0; i < threads.size(); ++i)
2400 {
2401- threads_[i].join();
2402+ threads[i].join();
2403 }
2404 }
2405 catch (...) // LCOV_EXCL_LINE
2406@@ -81,18 +87,19 @@
2407
2408 void ThreadPool::run()
2409 {
2410- TaskQueue::value_type task;
2411+ {
2412+ lock_guard<mutex> lock(mutex_);
2413+ if (--num_threads_ == 0)
2414+ {
2415+ threads_ready_.set_value();
2416+ }
2417+ }
2418+
2419 for (;;)
2420 {
2421+ TaskQueue::value_type task; // Task must go out of scope in each iteration, in case it stores shared_ptrs.
2422 try
2423 {
2424- {
2425- lock_guard<mutex> lock(mutex_);
2426- if (--num_threads_ == 0)
2427- {
2428- threads_ready_.set_value();
2429- }
2430- }
2431 task = queue_->wait_and_pop();
2432 }
2433 catch (runtime_error const&)
2434
2435=== modified file 'src/scopes/internal/smartscopes/SSQueryObject.cpp'
2436--- src/scopes/internal/smartscopes/SSQueryObject.cpp 2014-03-07 03:49:03 +0000
2437+++ src/scopes/internal/smartscopes/SSQueryObject.cpp 2014-04-28 05:57:56 +0000
2438@@ -206,7 +206,7 @@
2439
2440 // Create the reply proxy and keep a weak_ptr, which we will need
2441 // if cancel() is called later.
2442- q_reply_proxy = make_shared<SearchReplyImpl>(reply, shared_from_this());
2443+ q_reply_proxy = make_shared<SearchReplyImpl>(reply, shared_from_this(), query->q_cardinality);
2444 assert(q_reply_proxy);
2445 query->q_reply_proxy = q_reply_proxy;
2446
2447
2448=== modified file 'src/scopes/internal/zmq_middleware/CMakeLists.txt'
2449--- src/scopes/internal/zmq_middleware/CMakeLists.txt 2014-04-02 09:24:02 +0000
2450+++ src/scopes/internal/zmq_middleware/CMakeLists.txt 2014-04-28 05:57:56 +0000
2451@@ -13,6 +13,7 @@
2452 ${CMAKE_CURRENT_SOURCE_DIR}/RethrowException.cpp
2453 ${CMAKE_CURRENT_SOURCE_DIR}/ScopeI.cpp
2454 ${CMAKE_CURRENT_SOURCE_DIR}/ServantBase.cpp
2455+ ${CMAKE_CURRENT_SOURCE_DIR}/StopPublisher.cpp
2456 ${CMAKE_CURRENT_SOURCE_DIR}/StateReceiverI.cpp
2457 ${CMAKE_CURRENT_SOURCE_DIR}/Util.cpp
2458 ${CMAKE_CURRENT_SOURCE_DIR}/VariantConverter.cpp
2459
2460=== modified file 'src/scopes/internal/zmq_middleware/ConnectionPool.cpp'
2461--- src/scopes/internal/zmq_middleware/ConnectionPool.cpp 2014-01-22 03:29:48 +0000
2462+++ src/scopes/internal/zmq_middleware/ConnectionPool.cpp 2014-04-28 05:57:56 +0000
2463@@ -36,8 +36,8 @@
2464 namespace zmq_middleware
2465 {
2466
2467-ConnectionPool::ConnectionPool(zmqpp::context& context) :
2468- context_(context)
2469+ConnectionPool::ConnectionPool(zmqpp::context& context)
2470+ : context_(context)
2471 {
2472 }
2473
2474@@ -50,9 +50,7 @@
2475 {
2476 assert(!endpoint.empty());
2477
2478- lock_guard<mutex> lock(mutex_);
2479-
2480- // Look for existing connection. If there is one, but with the wrong type, we throw.
2481+ // Look for existing connection.
2482 auto const& it = pool_.find(endpoint);
2483 if (it != pool_.end())
2484 {
2485@@ -66,19 +64,30 @@
2486 }
2487
2488 // No existing connection yet, establish one.
2489+ auto entry = create_connection(endpoint, m, -1);
2490+ return pool_.emplace(move(entry)).first->second.socket;
2491+}
2492+
2493+ConnectionPool::CPool::value_type ConnectionPool::create_connection(std::string const& endpoint,
2494+ RequestMode m,
2495+ int64_t timeout)
2496+{
2497+ if (timeout == -1)
2498+ {
2499+ timeout = 0; // Don't linger on close
2500+ }
2501 zmqpp::socket_type stype = m == RequestMode::Twoway ? zmqpp::socket_type::request : zmqpp::socket_type::push;
2502 zmqpp::socket s(context_, stype);
2503- s.set(zmqpp::socket_option::linger, 0);
2504+ auto zmqpp_timeout = m == RequestMode::Twoway ? int32_t(timeout) : 0;
2505+ s.set(zmqpp::socket_option::linger, zmqpp_timeout);
2506 s.connect(endpoint);
2507- return pool_.emplace(endpoint, Connection { move(s), m }).first->second.socket;
2508+ return CPool::value_type{ endpoint, SocketData{ move(s), m } };
2509 }
2510
2511 void ConnectionPool::remove(std::string const& endpoint)
2512 {
2513 assert(!endpoint.empty());
2514
2515- lock_guard<mutex> lock(mutex_);
2516-
2517 auto const& it = pool_.find(endpoint);
2518 if (it != pool_.end())
2519 {
2520@@ -90,8 +99,7 @@
2521 {
2522 assert(!endpoint.empty());
2523
2524- lock_guard<mutex> lock(mutex_);
2525- pool_.emplace(endpoint, Connection { move(socket), m });
2526+ pool_.emplace(endpoint, SocketData{ move(socket), m });
2527 }
2528
2529 } // namespace zmq_middleware
2530
2531=== modified file 'src/scopes/internal/zmq_middleware/ObjectAdapter.cpp'
2532--- src/scopes/internal/zmq_middleware/ObjectAdapter.cpp 2014-02-20 19:19:25 +0000
2533+++ src/scopes/internal/zmq_middleware/ObjectAdapter.cpp 2014-04-28 05:57:56 +0000
2534@@ -19,6 +19,7 @@
2535 #include <unity/scopes/internal/zmq_middleware/ObjectAdapter.h>
2536
2537 #include <unity/scopes/internal/zmq_middleware/ServantBase.h>
2538+#include <unity/scopes/internal/zmq_middleware/StopPublisher.h>
2539 #include <unity/scopes/internal/zmq_middleware/Util.h>
2540 #include <unity/scopes/internal/zmq_middleware/ZmqException.h>
2541 #include <unity/scopes/internal/zmq_middleware/ZmqReceiver.h>
2542@@ -134,7 +135,7 @@
2543 lock_guard<mutex> state_lock(state_mutex_, adopt_lock);
2544 if (state_ == Destroyed || state_ == Failed)
2545 {
2546- throw_bad_state(state_);
2547+ throw_bad_state("add()", state_);
2548 }
2549 }
2550
2551@@ -158,7 +159,7 @@
2552 lock_guard<mutex> state_lock(state_mutex_, adopt_lock);
2553 if (state_ == Destroyed || state_ == Failed)
2554 {
2555- throw_bad_state(state_);
2556+ throw_bad_state("remove()", state_);
2557 }
2558 }
2559
2560@@ -185,7 +186,7 @@
2561 lock_guard<mutex> state_lock(state_mutex_, adopt_lock);
2562 if (state_ == Destroyed || state_ == Failed)
2563 {
2564- throw_bad_state(state_);
2565+ throw_bad_state("find()", state_);
2566 }
2567 }
2568
2569@@ -210,7 +211,7 @@
2570 lock_guard<mutex> state_lock(state_mutex_, adopt_lock);
2571 if (state_ == Destroyed || state_ == Failed)
2572 {
2573- throw_bad_state(state_);
2574+ throw_bad_state("add_dflt_servant()", state_);
2575 }
2576 }
2577
2578@@ -234,7 +235,7 @@
2579 lock_guard<mutex> state_lock(state_mutex_, adopt_lock);
2580 if (state_ == Destroyed || state_ == Failed)
2581 {
2582- throw_bad_state(state_);
2583+ throw_bad_state("remove_dflt_servant()", state_);
2584 }
2585 }
2586
2587@@ -262,7 +263,7 @@
2588 lock_guard<mutex> state_lock(state_mutex_, adopt_lock);
2589 if (state_ == Destroyed || state_ == Failed)
2590 {
2591- throw_bad_state(state_);
2592+ throw_bad_state("find_dflt_servant()", state_);
2593 }
2594 }
2595
2596@@ -308,7 +309,7 @@
2597 case Destroyed:
2598 case Failed:
2599 {
2600- throw_bad_state(state_);
2601+ throw_bad_state("activate()", state_);
2602 }
2603 default:
2604 {
2605@@ -329,7 +330,7 @@
2606 }
2607 case Failed:
2608 {
2609- throw_bad_state(state_);
2610+ throw_bad_state("shutdown() [state_ == Failed]", state_);
2611 }
2612 case Inactive:
2613 {
2614@@ -349,7 +350,7 @@
2615 }
2616 if (state_ == Failed)
2617 {
2618- throw_bad_state(state_);
2619+ throw_bad_state("shutdown() [state == Activating]", state_);
2620 }
2621 // LCOV_EXCL_STOP
2622
2623@@ -358,7 +359,7 @@
2624 case Active:
2625 {
2626 state_ = Deactivating;
2627- stop_workers();
2628+ stopper_->stop();
2629 state_changed_.notify_all();
2630 break;
2631 }
2632@@ -388,11 +389,11 @@
2633
2634 if (state == Failed)
2635 {
2636- throw_bad_state(state);
2637+ throw_bad_state("wait_for_shutdown()", state);
2638 }
2639 }
2640
2641-void ObjectAdapter::throw_bad_state(AdapterState state) const
2642+void ObjectAdapter::throw_bad_state(string const& label, AdapterState state) const
2643 {
2644 string bad_state;
2645 switch (state)
2646@@ -420,16 +421,31 @@
2647 break;
2648 }
2649 }
2650- MiddlewareException e("Object adapter in " + bad_state + " state (adapter: " + name_ + ")");
2651+ MiddlewareException e(label + ": Object adapter in " + bad_state + " state (adapter: " + name_ + ")");
2652 e.remember(exception_); // Remember any exception encountered by the broker thread or a worker thread.
2653 throw e;
2654 }
2655
2656 void ObjectAdapter::run_workers()
2657 {
2658+ {
2659+ lock_guard<mutex> state_lock(state_mutex_);
2660+ try
2661+ {
2662+ // Start the publisher for stop messages.
2663+ stopper_.reset(new StopPublisher(mw_.context(), name_ + "-stopper"));
2664+ }
2665+ catch (...)
2666+ {
2667+ state_ = Failed;
2668+ state_changed_.notify_all();
2669+ throw MiddlewareException("ObjectAdapter::run_workers(): stop thread failure (adapter: " + name_ + ")"); // LCOV_EXCL_LINE
2670+ }
2671+ }
2672+
2673 // Start a broker thread to forward incoming messages to backend workers and
2674 // wait for the broker thread to finish its initialization. The broker
2675- // signals after it has connected to the ctrl socket.
2676+ // signals after it has connected to the stop socket.
2677 {
2678 lock_guard<mutex> lock(ready_mutex_);
2679 ready_ = promise<void>();
2680@@ -469,56 +485,13 @@
2681 // LCOV_EXCL_START
2682 catch (...) //
2683 {
2684- stop_workers();
2685+ stopper_->stop();
2686 throw MiddlewareException("ObjectAdapter::run_workers(): worker thread failure (adapter: " + name_ + ")");
2687 }
2688 // LCOV_EXCL_STOP
2689 }
2690 }
2691
2692-void ObjectAdapter::init_ctrl_socket()
2693-{
2694- lock_guard<mutex> lock(ctrl_mutex_);
2695-
2696- // PUB socket to let the broker and workers know when it is time to shut down.
2697- // Sending anything means the socket becomes ready for reading, which causes
2698- // the broker and worker threads to finish.
2699- ctrl_.reset(new zmqpp::socket(*mw_.context(), zmqpp::socket_type::publish));
2700- ctrl_->set(zmqpp::socket_option::linger, 0);
2701- ctrl_->bind("inproc://" + name_ + "_adapter_ctrl");
2702-}
2703-
2704-zmqpp::socket ObjectAdapter::subscribe_to_ctrl_socket()
2705-{
2706- zmqpp::socket ctrl(*mw_.context(), zmqpp::socket_type::subscribe);
2707- ctrl.set(zmqpp::socket_option::linger, 0);
2708- ctrl.connect("inproc://" + name_ + "_adapter_ctrl"); // Once a thread can read from here, that's the command to stop.
2709- ctrl.subscribe("");
2710- return move(ctrl);
2711-}
2712-
2713-void ObjectAdapter::stop_workers() noexcept
2714-{
2715- lock_guard<mutex> lock(ctrl_mutex_);
2716-
2717- try
2718- {
2719- ctrl_->send("stop");
2720- }
2721- // LCOV_EXCL_START
2722- catch (std::exception const& e)
2723- {
2724- // TODO: log this instead
2725- cerr << "ObjectAdapter::stop_workers(): " << e.what() << endl;
2726- }
2727- catch (...)
2728- {
2729- // TODO: log this instead
2730- cerr << "ObjectAdapter::stop_workers(): unknown exception" << endl;
2731- }
2732- // LCOV_EXCL_STOP
2733-}
2734-
2735 // For the ipc transport, zmq permits more than one server to bind to the same endpoint.
2736 // If a server binds to an endpoint while another server is using that endpoint, the
2737 // second server silently "steals" the endpoint from the previous server, so all
2738@@ -574,12 +547,8 @@
2739 {
2740 try
2741 {
2742- // Create the writing end of the ctrl socket. We need to do this before subscribing because, for inproc
2743- // pub-sub sockets, the bind must happen before the connect.
2744- init_ctrl_socket();
2745-
2746- // Subscribe to ctrl socket. Once this socket becomes readable, that's the command to finish.
2747- auto ctrl = subscribe_to_ctrl_socket();
2748+ // Subscribe to stop socket. Once this socket becomes readable, that's the command to finish.
2749+ auto stop = stopper_->subscribe();
2750
2751 // Set up message pump. Router-dealer for twoway adapter, pull-push for oneway adapter.
2752 auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::router : zmqpp::socket_type::pull;
2753@@ -592,7 +561,7 @@
2754
2755 try
2756 {
2757- poller.add(ctrl);
2758+ poller.add(stop);
2759
2760 frontend.set(zmqpp::socket_option::linger, 0);
2761 // "Safe" bind: prevents two servers from binding to the same endpoint.
2762@@ -625,13 +594,13 @@
2763 {
2764 zmqpp::message message;
2765 poller.poll();
2766- if (poller.has_input(ctrl))
2767+ if (poller.has_input(stop))
2768 {
2769- // When the ctrl socket becomes ready, we need to get out of here.
2770+ // When the stop socket becomes ready, we need to get out of here.
2771 // We stop reading more requests from the router, but continue processing
2772 // while there are still replies outstanding.
2773- ctrl.receive(message);
2774- ctrl.close();
2775+ poller.remove(stop);
2776+ stop.close();
2777 shutting_down = true;
2778 }
2779 if (!shutting_down && poller.has_input(frontend)) // Once shutting down, we no longer read incoming messages.
2780@@ -696,9 +665,9 @@
2781 {
2782 zmqpp::poller poller;
2783
2784- // Subscribe to ctrl socket. Once this socket becomes readable, that's the command to finish.
2785- auto ctrl = subscribe_to_ctrl_socket();
2786- poller.add(ctrl);
2787+ // Subscribe to stop socket. Once this socket becomes readable, that's the command to finish.
2788+ auto stop = stopper_->subscribe();
2789+ poller.add(stop);
2790
2791 auto socket_type = mode_ == RequestMode::Twoway ? zmqpp::socket_type::reply : zmqpp::socket_type::pull;
2792 zmqpp::socket s(*mw_.context(), socket_type);
2793@@ -728,11 +697,10 @@
2794 }
2795
2796 poller.poll();
2797- if (poller.has_input(ctrl)) // Parent sent a stop message, so we are supposed to go away.
2798+ if (poller.has_input(stop)) // Parent sent a stop message, so we are supposed to go away.
2799 {
2800- zmqpp::message message;
2801- ctrl.receive(message);
2802- ctrl.close();
2803+ poller.remove(stop);
2804+ stop.close();
2805 finish = true;
2806 }
2807
2808@@ -810,7 +778,16 @@
2809 }
2810
2811 // Look for a servant with matching id.
2812- auto servant = find_servant(current.id, current.category);
2813+ shared_ptr<ServantBase> servant;
2814+ try
2815+ {
2816+ servant = find_servant(current.id, current.category);
2817+ }
2818+ catch (std::exception const&)
2819+ {
2820+ // Ignore failure to find servant during destruction phase.
2821+ }
2822+
2823 if (!servant)
2824 {
2825 if (mode_ == RequestMode::Twoway)
2826@@ -838,7 +815,7 @@
2827 // LCOV_EXCL_START
2828 catch (...)
2829 {
2830- stop_workers(); // Fatal error, we need to stop all other workers and the broker.
2831+ stopper_->stop(); // Fatal error, we need to stop all other workers and the broker.
2832 MiddlewareException e("ObjectAdapter: worker thread failure (adapter: " + name_ + ")");
2833 store_exception(e);
2834 // We may not have signaled the parent yet, depending on where things went wrong.
2835
2836=== added file 'src/scopes/internal/zmq_middleware/StopPublisher.cpp'
2837--- src/scopes/internal/zmq_middleware/StopPublisher.cpp 1970-01-01 00:00:00 +0000
2838+++ src/scopes/internal/zmq_middleware/StopPublisher.cpp 2014-04-28 05:57:56 +0000
2839@@ -0,0 +1,225 @@
2840+/*
2841+ * Copyright (C) 2014 Canonical Ltd
2842+ *
2843+ * This program is free software: you can redistribute it and/or modify
2844+ * it under the terms of the GNU Lesser General Public License version 3 as
2845+ * published by the Free Software Foundation.
2846+ *
2847+ * This program is distributed in the hope that it will be useful,
2848+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
2849+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
2850+ * GNU Lesser General Public License for more details.
2851+ *
2852+ * You should have received a copy of the GNU Lesser General Public License
2853+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
2854+ *
2855+ * Authored by: Michi Henning <michi.henning@canonical.com>
2856+ */
2857+
2858+#include <unity/scopes/internal/zmq_middleware/StopPublisher.h>
2859+
2860+#include <unity/scopes/ScopeExceptions.h>
2861+
2862+#include <cassert>
2863+#include <iostream>
2864+
2865+using namespace std;
2866+
2867+namespace unity
2868+{
2869+
2870+namespace scopes
2871+{
2872+
2873+namespace internal
2874+{
2875+
2876+namespace zmq_middleware
2877+{
2878+
2879+StopPublisher::StopPublisher(zmqpp::context* context, string const& inproc_name) // Starts the publisher thread; returns once it reports Started, throws MiddlewareException if it reports Failed.
2880+ : context_(context)
2881+ , endpoint_("inproc://" + inproc_name) // inproc endpoint the publisher thread binds and subscribers connect to.
2882+ , state_(Starting)
2883+{
2884+ assert(context);
2885+ assert(!inproc_name.empty());
2886+
2887+ thread_ = thread(&StopPublisher::stopper_thread, this);
2888+
2889+ unique_lock<mutex> lock(m_);
2890+ cond_.wait(lock, [this] { return state_ == Started || state_ == Failed; }); // Block until stopper_thread() has either bound its socket (Started) or recorded an exception (Failed).
2891+
2892+ if (state_ == Failed)
2893+ {
2894+ if (thread_.joinable())
2895+ {
2896+ thread_.join(); // The destructor does not run when a constructor throws, so reap the thread here.
2897+ }
2898+ try
2899+ {
2900+ rethrow_exception(ex_); // Make the thread's stored exception the active one while the replacement is thrown.
2901+ }
2902+ catch (...)
2903+ {
2904+ throw MiddlewareException("StopPublisher(): publisher thread failed (endpoint: " + endpoint_ + ")"); // NOTE(review): presumably MiddlewareException records the active exception as its cause - confirm.
2905+ }
2906+ }
2907+}
2908+
2909+StopPublisher::~StopPublisher() // Requests a stop (best effort, never throws out of here) and joins the publisher thread.
2910+{
2911+ try
2912+ {
2913+ stop(); // No-op if already Stopping/Stopped; throws if the publisher is in the Failed state.
2914+ }
2915+ // LCOV_EXCL_START
2916+ catch (std::exception const& e)
2917+ {
2918+ cerr << "~StopPublisher(): exception from stop(): " << e.what() << endl;
2919+ // TODO: log this
2920+ }
2921+ catch (...)
2922+ {
2923+ cerr << "~StopPublisher(): unknown exception from stop()" << endl;
2924+ // TODO: log this
2925+ }
2926+ // LCOV_EXCL_STOP
2927+ if (thread_.joinable())
2928+ {
2929+ thread_.join(); // Destroying a joinable std::thread would call std::terminate().
2930+ }
2931+}
2932+
2933+string StopPublisher::endpoint() const // Returns a copy of the "inproc://<name>" endpoint string built in the constructor.
2934+{
2935+ lock_guard<mutex> lock(m_); // Same mutex that guards state_ in the other members.
2936+ return endpoint_;
2937+}
2938+
2939+// Return a socket that becomes ready for reading once stop() is called.
2940+// Once the socket becomes readable, it has exactly one zero-length message to be read.
2941+
2942+zmqpp::socket StopPublisher::subscribe() // Returns a connected SUB socket; throws MiddlewareException if the publisher is stopping, stopped or failed.
2943+{
2944+ lock_guard<mutex> lock(m_);
2945+
2946+ switch (state_)
2947+ {
2948+ case Stopping:
2949+ case Stopped:
2950+ {
2951+ throw MiddlewareException(
2952+ "StopPublisher::subscribe(): cannot subscribe to stopped publisher "
2953+ "(endpoint: " + endpoint_ + ")");
2954+ }
2955+ // LCOV_EXCL_START
2956+ case Failed:
2957+ {
2958+ try
2959+ {
2960+ rethrow_exception(ex_); // Make the publisher thread's stored exception the active one while the replacement is thrown.
2961+ }
2962+ catch (...)
2963+ {
2964+ throw MiddlewareException(
2965+ "StopPublisher::subscribe(): cannot subscribe to failed publisher "
2966+ "(endpoint: " + endpoint_ + ")");
2967+ }
2968+ }
2969+ // LCOV_EXCL_STOP
2970+ default:
2971+ {
2972+ assert(state_ == Started); // The constructor only returns once state_ is Started, so the pub socket is already bound.
2973+ zmqpp::socket s(*context_, zmqpp::socket_type::subscribe);
2974+ s.set(zmqpp::socket_option::linger, 0);
2975+ s.connect(endpoint_);
2976+ s.subscribe(""); // Empty prefix: receive every message, i.e. the single stop message.
2977+ return s; // A returned local is moved implicitly; an explicit move() would only inhibit copy elision.
2978+ }
2979+ }
2980+}
2981+
2982+// Tell the stopper thread to publish the stop message and terminate itself.
2983+
2984+void StopPublisher::stop() // Idempotent stop request; throws MiddlewareException if the publisher thread has failed.
2985+{
2986+ unique_lock<mutex> lock(m_);
2987+ switch (state_)
2988+ {
2989+ case Stopping:
2990+ case Stopped:
2991+ {
2992+ return; // no-op
2993+ }
2994+ // LCOV_EXCL_START
2995+ case Failed:
2996+ {
2997+ try
2998+ {
2999+ rethrow_exception(ex_); // Make the publisher thread's stored exception the active one while the replacement is thrown.
3000+ }
3001+ catch (...)
3002+ {
3003+ throw MiddlewareException("StopPublisher::stop(): cannot stop failed publisher");
3004+ }
3005+ }
3006+ // LCOV_EXCL_STOP
3007+ default:
3008+ {
3009+ assert(state_ == Started);
3010+ state_ = Stopping; // stopper_thread() waits on cond_ for exactly this state.
3011+ cond_.notify_all(); // Only signals the thread; this call does not wait for the message to be sent.
3012+ }
3013+ }
3014+}
3015+
3016+void StopPublisher::stopper_thread() noexcept // Thread body: bind the PUB socket, wait for stop(), publish one empty message, close.
3017+{
3018+ try
3019+ {
3020+ unique_lock<mutex> lock(m_);
3021+
3022+ assert(state_ == Starting);
3023+
3024+ // Create the publishing socket.
3025+ zmqpp::socket pub_socket(zmqpp::socket(*context_, zmqpp::socket_type::publish));
3026+ // Allow time for stop message to be buffered, so close() won't discard it.
3027+ pub_socket.set(zmqpp::socket_option::linger, 5000);
3028+ pub_socket.bind(endpoint_);
3029+
3030+ // Notify that we are ready. This ensures that subscribers
3031+ // cannot subscribe before the pub socket was bound.
3032+ state_ = Started;
3033+ cond_.notify_all();
3034+
3035+ lock.unlock(); // Allow parent to wake up.
3036+
3037+ // Wait until we are told to stop.
3038+ lock.lock(); // NOTE(review): cond_.wait() below releases the mutex itself, so the unlock()/lock() pair looks redundant - confirm before removing.
3039+ cond_.wait(lock, [this] { return state_ == Stopping; });
3040+
3041+ // Write the stop message for the subscribers and close.
3042+ // Sending an empty string is OK; the receiver will get a zero-length message.
3043+ pub_socket.send("");
3044+ pub_socket.close();
3045+ state_ = Stopped; // Still under m_; no notification is sent for this transition.
3046+ }
3047+ catch (...)
3048+ {
3049+ lock_guard<mutex> lock(m_); // The try-block lock was released during unwinding, so taking m_ again cannot self-deadlock.
3050+ state_ = Failed;
3051+ ex_ = current_exception(); // Kept so the constructor, subscribe() and stop() can rethrow it.
3052+ cond_.notify_all(); // Wakes the constructor if it is still waiting for Started/Failed.
3053+ }
3054+
3055+ assert(state_ == Stopped || state_ == Failed); // Read without holding m_; only evaluated when assert is enabled.
3056+}
3057+
3058+} // namespace zmq_middleware
3059+
3060+} // namespace internal
3061+
3062+} // namespace scopes
3063+
3064+} // namespace unity
3065
3066=== modified file 'src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp'
3067--- src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp 2014-04-02 09:24:02 +0000
3068+++ src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp 2014-04-28 05:57:56 +0000
3069@@ -434,9 +434,9 @@
3070 shared_ptr<QueryCtrlI> qci(make_shared<QueryCtrlI>(ctrl));
3071 auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir());
3072 function<void()> df;
3073- auto proxy = safe_add(df, adapter, "", qci);
3074+ auto p = safe_add(df, adapter, "", qci);
3075 ctrl->set_disconnect_function(df);
3076- return ZmqQueryCtrlProxy(new ZmqQueryCtrl(this, proxy->endpoint(), proxy->identity(), "QueryCtrl"));
3077+ proxy = ZmqQueryCtrlProxy(new ZmqQueryCtrl(this, p->endpoint(), p->identity(), "QueryCtrl"));
3078 }
3079 catch (std::exception const& e) // Should never happen unless our implementation is broken
3080 {
3081@@ -476,9 +476,9 @@
3082 shared_ptr<QueryI> qi(make_shared<QueryI>(query));
3083 auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir());
3084 function<void()> df;
3085- auto proxy = safe_add(df, adapter, "", qi);
3086+ auto p = safe_add(df, adapter, "", qi);
3087 query->set_disconnect_function(df);
3088- return ZmqQueryProxy(new ZmqQuery(this, proxy->endpoint(), proxy->identity(), "Query"));
3089+ proxy = ZmqQueryProxy(new ZmqQuery(this, p->endpoint(), p->identity(), "Query"));
3090 }
3091 catch (std::exception const& e) // Should never happen unless our implementation is broken
3092 {
3093@@ -519,9 +519,9 @@
3094 shared_ptr<RegistryI> ri(make_shared<RegistryI>(registry));
3095 auto adapter = find_adapter(server_name_, runtime()->registry_endpointdir());
3096 function<void()> df;
3097- auto proxy = safe_add(df, adapter, identity, ri);
3098+ auto p = safe_add(df, adapter, identity, ri);
3099 registry->set_disconnect_function(df);
3100- return ZmqRegistryProxy(new ZmqRegistry(this, proxy->endpoint(), proxy->identity(), "Registry", twoway_timeout_));
3101+ proxy = ZmqRegistryProxy(new ZmqRegistry(this, p->endpoint(), p->identity(), "Registry", twoway_timeout_));
3102 }
3103 catch (std::exception const& e) // Should never happen unless our implementation is broken
3104 {
3105@@ -542,9 +542,9 @@
3106 shared_ptr<ReplyI> ri(make_shared<ReplyI>(reply));
3107 auto adapter = find_adapter(server_name_ + reply_suffix, config_.public_dir());
3108 function<void()> df;
3109- auto proxy = safe_add(df, adapter, "", ri);
3110+ auto p = safe_add(df, adapter, "", ri);
3111 reply->set_disconnect_function(df);
3112- return ZmqReplyProxy(new ZmqReply(this, proxy->endpoint(), proxy->identity(), "Reply"));
3113+ proxy = ZmqReplyProxy(new ZmqReply(this, p->endpoint(), p->identity(), "Reply"));
3114 }
3115 catch (std::exception const& e) // Should never happen unless our implementation is broken
3116 {
3117@@ -566,9 +566,9 @@
3118 shared_ptr<ScopeI> si(make_shared<ScopeI>(scope));
3119 auto adapter = find_adapter(server_name_, config_.private_dir());
3120 function<void()> df;
3121- auto proxy = safe_add(df, adapter, identity, si);
3122+ auto p = safe_add(df, adapter, identity, si);
3123 scope->set_disconnect_function(df);
3124- return ZmqScopeProxy(new ZmqScope(this, proxy->endpoint(), proxy->identity(), "Scope", twoway_timeout_));
3125+ proxy = ZmqScopeProxy(new ZmqScope(this, p->endpoint(), p->identity(), "Scope", twoway_timeout_));
3126 }
3127 catch (std::exception const& e) // Should never happen unless our implementation is broken
3128 {
3129
3130=== modified file 'src/scopes/internal/zmq_middleware/ZmqObject.cpp'
3131--- src/scopes/internal/zmq_middleware/ZmqObject.cpp 2014-02-03 07:28:36 +0000
3132+++ src/scopes/internal/zmq_middleware/ZmqObject.cpp 2014-04-28 05:57:56 +0000
3133@@ -18,7 +18,6 @@
3134
3135 #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h>
3136
3137-#include <unity/scopes/internal/zmq_middleware/ConnectionPool.h>
3138 #include <unity/scopes/internal/zmq_middleware/Util.h>
3139 #include <unity/scopes/internal/zmq_middleware/ZmqException.h>
3140 #include <unity/scopes/internal/zmq_middleware/ZmqSender.h>
3141@@ -127,7 +126,8 @@
3142 capnp::MallocMessageBuilder request_builder;
3143 make_request_(request_builder, "ping");
3144
3145- auto future = mw_base()->invoke_pool()->submit([&] { this->invoke_(request_builder); });
3146+ auto future = mw_base()->invoke_pool()->submit([&] { this->invoke_twoway_(request_builder); });
3147+ // TODO: dubious, waiter thread in runtimeImpl should do this?
3148 future.wait();
3149 }
3150
3151@@ -148,59 +148,82 @@
3152 return request;
3153 }
3154
3155+#ifdef ENABLE_IPC_MONITOR
3156 void register_monitor_socket (ConnectionPool& pool, zmqpp::context_t const& context)
3157 {
3158 thread_local static bool monitor_initialized = false;
3159 if (!monitor_initialized) {
3160 monitor_initialized = true;
3161 zmqpp::socket monitor_socket(context, zmqpp::socket_type::publish);
3162+ monitor_socket.set(zmqpp::socket_option::linger, 0);
3163 monitor_socket.connect(MONITOR_ENDPOINT);
3164- monitor_socket.set(zmqpp::socket_option::linger, 0);
3165 pool.register_socket(MONITOR_ENDPOINT, move(monitor_socket), RequestMode::Oneway);
3166 }
3167 }
3168-
3169-ZmqReceiver ZmqObjectProxy::invoke_(capnp::MessageBuilder& out_params)
3170-{
3171- return invoke_(out_params, timeout_);
3172-}
3173-
3174-// Get a socket to the endpoint for this proxy and write the request on the wire.
3175-// For a twoway request, poll for the reply with the timeout set for this proxy.
3176-// Return a receiver for the response (whether this is a oneway or twoway request).
3177-
3178-ZmqReceiver ZmqObjectProxy::invoke_(capnp::MessageBuilder& out_params, int64_t timeout)
3179-{
3180- // Each calling thread gets its own pool because zmq sockets are not thread-safe.
3181- thread_local static ConnectionPool pool(*mw_base()->context());
3182-
3183- zmqpp::socket& s = pool.find(endpoint_, mode_);
3184- ZmqSender sender(s);
3185- auto segments = out_params.getSegmentsForOutput();
3186- sender.send(segments);
3187-
3188-#ifdef ENABLE_IPC_MONITOR
3189- if (true) {
3190- register_monitor_socket(pool, *mw_base()->context());
3191- zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway);
3192- auto word_arr = capnp::messageToFlatArray(segments);
3193- monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word));
3194- }
3195-#endif
3196-
3197- if (mode_ == RequestMode::Twoway)
3198+#endif
3199+
3200+// Get a socket to the endpoint for this proxy and write the request on the wire.
3201+
3202+void ZmqObjectProxy::invoke_oneway_(capnp::MessageBuilder& out_params) // Sends the request and returns without waiting for any reply.
3203+{
3204+ // Each calling thread gets its own pool because zmq sockets are not thread-safe.
3205+ thread_local static ConnectionPool pool(*mw_base()->context()); // NOTE(review): function-local, so this is a different pool from the one declared in invoke_twoway_().
3206+
3207+ assert(mode_ == RequestMode::Oneway);
3208+ zmqpp::socket& s = pool.find(endpoint_, mode_);
3209+ ZmqSender sender(s);
3210+ auto segments = out_params.getSegmentsForOutput();
3211+ sender.send(segments);
3212+
3213+#ifdef ENABLE_IPC_MONITOR
3214+ if (true) { // Optional diagnostics: mirror the request to the monitor endpoint.
3215+ register_monitor_socket(pool, *mw_base()->context()); // NOTE(review): guarded by a single thread_local flag, so only the first pool passed on a thread gets the monitor socket registered - confirm pool.find() copes when this pool was not that one.
3216+ zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway);
3217+ auto word_arr = capnp::messageToFlatArray(segments);
3218+ monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word)); // Size is in bytes: word count times sizeof(capnp::word).
3219+ }
3220+#endif
3221+}
3222+
3223+ZmqReceiver ZmqObjectProxy::invoke_twoway_(capnp::MessageBuilder& out_params) // Convenience overload: forwards with this proxy's timeout_.
3224+{
3225+ return invoke_twoway_(out_params, timeout_);
3226+}
3227+
3228+// Get a socket to the endpoint for this proxy and write the request on the wire.
3229+// Poll for the reply with the given timeout.
3230+// Return a receiver for the response; throws TimeoutException if the timeout expires.
3231+
3232+ZmqReceiver ZmqObjectProxy::invoke_twoway_(capnp::MessageBuilder& out_params, int64_t timeout)
3233+{
3234+ // Each calling thread gets its own pool because zmq sockets are not thread-safe.
3235+ thread_local static ConnectionPool pool(*mw_base()->context());
3236+
3237+ assert(mode_ == RequestMode::Twoway);
3238+ zmqpp::socket& s = pool.find(endpoint_, mode_);
3239+ ZmqSender sender(s);
3240+ auto segments = out_params.getSegmentsForOutput();
3241+ sender.send(segments);
3242+
3243+#ifdef ENABLE_IPC_MONITOR
3244+ if (true) {
3245+ register_monitor_socket(pool, *mw_base()->context());
3246+ zmqpp::socket& monitor = pool.find(MONITOR_ENDPOINT, RequestMode::Oneway);
3247+ auto word_arr = capnp::messageToFlatArray(segments);
3248+ monitor.send_raw(reinterpret_cast<char*>(&word_arr[0]), word_arr.size() * sizeof(capnp::word));
3249+ }
3250+#endif
3251+
3252+ zmqpp::poller p;
3253+ p.add(s);
3254+ p.poll(timeout);
3255+ if (!p.has_input(s))
3256 {
3257- zmqpp::poller p;
3258- p.add(s);
3259- p.poll(timeout);
3260- if (!p.has_input(s))
3261- {
3262- // If a request times out, we must close the corresponding socket, otherwise
3263- // zmq gets confused: the reply will never be read, so the socket ends up
3264- // in a bad state.
3265- pool.remove(endpoint_);
3266- throw TimeoutException("Request timed out after " + std::to_string(timeout) + " milliseconds");
3267- }
3268+ // If a request times out, we must trash the corresponding socket, otherwise
3269+ // zmq gets confused: the reply will never be read, so the socket ends up
3270+ // in a bad state.
3271+ pool.remove(endpoint_);
3272+ throw TimeoutException("Request timed out after " + std::to_string(timeout) + " milliseconds");
3273 }
3274 return ZmqReceiver(s);
3275 }
3276
3277=== modified file 'src/scopes/internal/zmq_middleware/ZmqQuery.cpp'
3278--- src/scopes/internal/zmq_middleware/ZmqQuery.cpp 2014-01-24 00:26:43 +0000
3279+++ src/scopes/internal/zmq_middleware/ZmqQuery.cpp 2014-04-28 05:57:56 +0000
3280@@ -66,7 +66,7 @@
3281 proxy.setIdentity(rp->identity().c_str());
3282 proxy.setCategory(rp->category().c_str());
3283
3284- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3285+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); });
3286 future.wait();
3287 }
3288
3289
3290=== modified file 'src/scopes/internal/zmq_middleware/ZmqQueryCtrl.cpp'
3291--- src/scopes/internal/zmq_middleware/ZmqQueryCtrl.cpp 2014-01-24 00:26:43 +0000
3292+++ src/scopes/internal/zmq_middleware/ZmqQueryCtrl.cpp 2014-04-28 05:57:56 +0000
3293@@ -58,7 +58,7 @@
3294 capnp::MallocMessageBuilder request_builder;
3295 make_request_(request_builder, "cancel");
3296
3297- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3298+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); });
3299 future.wait();
3300 }
3301
3302@@ -67,7 +67,7 @@
3303 capnp::MallocMessageBuilder request_builder;
3304 make_request_(request_builder, "destroy");
3305
3306- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3307+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); });
3308 future.wait();
3309 }
3310
3311
3312=== modified file 'src/scopes/internal/zmq_middleware/ZmqRegistry.cpp'
3313--- src/scopes/internal/zmq_middleware/ZmqRegistry.cpp 2014-04-03 12:57:25 +0000
3314+++ src/scopes/internal/zmq_middleware/ZmqRegistry.cpp 2014-04-28 05:57:56 +0000
3315@@ -85,7 +85,7 @@
3316 auto in_params = request.initInParams().getAs<capnproto::Registry::GetMetadataRequest>();
3317 in_params.setIdentity(scope_id.c_str());
3318
3319- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3320+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); });
3321 auto receiver = future.get();
3322 auto segments = receiver.receive();
3323 capnp::SegmentArrayMessageReader reader(segments);
3324@@ -119,7 +119,7 @@
3325 capnp::MallocMessageBuilder request_builder;
3326 make_request_(request_builder, "list");
3327
3328- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3329+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); });
3330 auto receiver = future.get();
3331 auto segments = receiver.receive();
3332 capnp::SegmentArrayMessageReader reader(segments);
3333@@ -149,7 +149,7 @@
3334
3335 // locate uses a custom timeout because it needs to potentially fork/exec a scope.
3336 int64_t timeout = 1000; // TODO: get timeout from config
3337- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder, timeout); });
3338+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder, timeout); });
3339 auto receiver = future.get();
3340 auto segments = receiver.receive();
3341 capnp::SegmentArrayMessageReader reader(segments);
3342
3343=== modified file 'src/scopes/internal/zmq_middleware/ZmqReply.cpp'
3344--- src/scopes/internal/zmq_middleware/ZmqReply.cpp 2014-01-24 00:26:43 +0000
3345+++ src/scopes/internal/zmq_middleware/ZmqReply.cpp 2014-04-28 05:57:56 +0000
3346@@ -64,7 +64,7 @@
3347 auto resultBuilder = in_params.getResult();
3348 to_value_dict(result, resultBuilder);
3349
3350- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3351+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); });
3352 future.wait();
3353 }
3354
3355@@ -99,7 +99,7 @@
3356 }
3357 in_params.setReason(r);
3358
3359- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3360+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); });
3361 future.wait();
3362 }
3363
3364
3365=== modified file 'src/scopes/internal/zmq_middleware/ZmqScope.cpp'
3366--- src/scopes/internal/zmq_middleware/ZmqScope.cpp 2014-03-07 02:01:47 +0000
3367+++ src/scopes/internal/zmq_middleware/ZmqScope.cpp 2014-04-28 05:57:56 +0000
3368@@ -27,6 +27,8 @@
3369 #include <unity/scopes/Result.h>
3370 #include <unity/scopes/CannedQuery.h>
3371
3372+#include <iostream> // TODO: remove this
3373+
3374 using namespace std;
3375
3376 namespace unity
3377@@ -90,7 +92,7 @@
3378 p.setCategory(reply_proxy->category().c_str());
3379 }
3380
3381- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3382+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); });
3383
3384 auto receiver = future.get();
3385 auto segments = receiver.receive();
3386@@ -122,7 +124,7 @@
3387 p.setIdentity(reply_proxy->identity().c_str());
3388 }
3389
3390- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3391+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); });
3392
3393 auto receiver = future.get();
3394 auto segments = receiver.receive();
3395@@ -157,7 +159,7 @@
3396 p.setIdentity(reply_proxy->identity().c_str());
3397 }
3398
3399- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3400+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); });
3401 future.wait();
3402
3403 auto receiver = future.get();
3404@@ -190,7 +192,7 @@
3405 p.setIdentity(reply_proxy->identity().c_str());
3406 }
3407
3408- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3409+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_twoway_(request_builder); });
3410
3411 auto receiver = future.get();
3412 auto segments = receiver.receive();
3413
3414=== modified file 'src/scopes/internal/zmq_middleware/ZmqStateReceiver.cpp'
3415--- src/scopes/internal/zmq_middleware/ZmqStateReceiver.cpp 2014-04-02 09:24:02 +0000
3416+++ src/scopes/internal/zmq_middleware/ZmqStateReceiver.cpp 2014-04-28 05:57:56 +0000
3417@@ -83,7 +83,7 @@
3418 in_params.setState(s);
3419 in_params.setSenderId(sender_id);
3420
3421- auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
3422+ auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_oneway_(request_builder); });
3423 future.wait();
3424 }
3425
3426
3427=== modified file 'test/gtest/scopes/CMakeLists.txt'
3428--- test/gtest/scopes/CMakeLists.txt 2014-04-01 16:58:32 +0000
3429+++ test/gtest/scopes/CMakeLists.txt 2014-04-28 05:57:56 +0000
3430@@ -1,23 +1,24 @@
3431 add_subdirectory(internal)
3432 add_subdirectory(testing)
3433
3434-add_subdirectory(Registry)
3435-add_subdirectory(Runtime)
3436 add_subdirectory(Activation)
3437 add_subdirectory(ActivationResponse)
3438 add_subdirectory(Annotation)
3439 add_subdirectory(CannedQuery)
3440-add_subdirectory(Department)
3441-add_subdirectory(Filters)
3442-add_subdirectory(ScopeBase)
3443-add_subdirectory(ScopeExceptions)
3444+add_subdirectory(CategorisedResult)
3445 add_subdirectory(Category)
3446-add_subdirectory(CategorisedResult)
3447 add_subdirectory(CategoryRenderer)
3448 add_subdirectory(ColumnLayout)
3449+add_subdirectory(Department)
3450+add_subdirectory(Filters)
3451+add_subdirectory(Invocation)
3452+add_subdirectory(OptionSelectorFilter)
3453 add_subdirectory(PreviewWidget)
3454 add_subdirectory(QueryMetadata)
3455-add_subdirectory(OptionSelectorFilter)
3456+add_subdirectory(Registry)
3457+add_subdirectory(Runtime)
3458+add_subdirectory(ScopeBase)
3459+add_subdirectory(ScopeExceptions)
3460 add_subdirectory(Variant)
3461 add_subdirectory(VariantBuilder)
3462 add_subdirectory(Version)
3463
3464=== added directory 'test/gtest/scopes/Invocation'
3465=== added file 'test/gtest/scopes/Invocation/CMakeLists.txt'
3466--- test/gtest/scopes/Invocation/CMakeLists.txt 1970-01-01 00:00:00 +0000
3467+++ test/gtest/scopes/Invocation/CMakeLists.txt 2014-04-28 05:57:56 +0000
3468@@ -0,0 +1,8 @@
3469+configure_file(Registry.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Registry.ini)
3470+configure_file(Runtime.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Runtime.ini)
3471+configure_file(Zmq.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Zmq.ini)
3472+
3473+add_executable(Invocation_test Invocation_test.cpp TestScope.cpp)
3474+target_link_libraries(Invocation_test ${TESTLIBS})
3475+
3476+add_test(Invocation Invocation_test)
3477
3478=== added file 'test/gtest/scopes/Invocation/Invocation_test.cpp'
3479--- test/gtest/scopes/Invocation/Invocation_test.cpp 1970-01-01 00:00:00 +0000
3480+++ test/gtest/scopes/Invocation/Invocation_test.cpp 2014-04-28 05:57:56 +0000
3481@@ -0,0 +1,145 @@
3482+/*
3483+ * Copyright (C) 2014 Canonical Ltd
3484+ *
3485+ * This program is free software: you can redistribute it and/or modify
3486+ * it under the terms of the GNU Lesser General Public License version 3 as
3487+ * published by the Free Software Foundation.
3488+ *
3489+ * This program is distributed in the hope that it will be useful,
3490+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
3491+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
3492+ * GNU Lesser General Public License for more details.
3493+ *
3494+ * You should have received a copy of the GNU Lesser General Public License
3495+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
3496+ *
3497+ * Authored by: Michi Henning <michi.henning@canonical.com>
3498+ */
3499+
3500+#include <signal.h>
3501+#include <sys/types.h>
3502+#include <unistd.h>
3503+
3504+#include <mutex>
3505+
3506+#include <unity/scopes/ActionMetadata.h>
3507+#include <unity/scopes/CategorisedResult.h>
3508+#include <unity/scopes/internal/MWScope.h>
3509+#include <unity/scopes/internal/RuntimeImpl.h>
3510+#include <unity/scopes/internal/ScopeImpl.h>
3511+#include <unity/scopes/ListenerBase.h>
3512+#include <unity/scopes/QueryCtrl.h>
3513+#include <unity/scopes/Runtime.h>
3514+#include <unity/scopes/SearchMetadata.h>
3515+#include <unity/UnityExceptions.h>
3516+
3517+#include <gtest/gtest.h>
3518+
3519+#include "TestScope.h"
3520+
3521+using namespace std;
3522+using namespace unity::scopes;
3523+
3524+class TestReceiver : public SearchListenerBase
3525+{
3526+public:
3527+ TestReceiver()
3528+ : query_complete_(false)
3529+ {
3530+ }
3531+
3532+ virtual void push(CategorisedResult /* result */) override
3533+ {
3534+ }
3535+
3536+ virtual void finished(ListenerBase::Reason reason, string const& error_message) override
3537+ {
3538+ lock_guard<mutex> lock(mutex_);
3539+ reason_ = reason;
3540+ error_message_ = error_message;
3541+ query_complete_ = true;
3542+ cond_.notify_one();
3543+ }
3544+
3545+ void wait_until_finished()
3546+ {
3547+ unique_lock<mutex> lock(mutex_);
3548+ cond_.wait(lock, [this] { return this->query_complete_; });
3549+ query_complete_ = false;
3550+ }
3551+
3552+ ListenerBase::Reason reason()
3553+ {
3554+ lock_guard<mutex> lock(mutex_);
3555+ return reason_;
3556+ }
3557+
3558+ string error_message()
3559+ {
3560+ lock_guard<mutex> lock(mutex_);
3561+ return error_message_;
3562+ }
3563+
3564+private:
3565+ bool query_complete_;
3566+ ListenerBase::Reason reason_;
3567+ string error_message_;
3568+ mutex mutex_;
3569+ condition_variable cond_;
3570+};
3571+
3572+// Check that invoking on a scope after a timeout exception from a previous
3573+// invocation works correctly. This tests that a failed socket is removed
3574+// from the connection pool in ZmqObject::invoke_twoway_().
3575+
3576+TEST(Invocation, timeout)
3577+{
3578+ auto rt = internal::RuntimeImpl::create("", "Runtime.ini");
3579+ auto mw = rt->factory()->create("TestScope", "Zmq", "Zmq.ini");
3580+ mw->start();
3581+ auto proxy = mw->create_scope_proxy("TestScope");
3582+ auto scope = internal::ScopeImpl::create(proxy, rt.get(), "no_such_scope");
3583+
3584+ auto receiver = make_shared<TestReceiver>();
3585+
3586+ // First call must time out.
3587+ scope->search("test", SearchMetadata("unused", "unused"), receiver);
3588+ receiver->wait_until_finished();
3589+
3590+ EXPECT_EQ(ListenerBase::Error, receiver->reason());
3591+ EXPECT_EQ("unity::scopes::TimeoutException: Request timed out after 300 milliseconds", receiver->error_message());
3592+
3593+ // this_thread::sleep_for(chrono::milliseconds(500));
3594+
3595+ // Second call must succeed
3596+ scope->search("test", SearchMetadata("unused", "unused"), receiver);
3597+ receiver->wait_until_finished();
3598+
3599+ EXPECT_EQ(ListenerBase::Finished, receiver->reason());
3600+ EXPECT_EQ("", receiver->error_message());
3601+}
3602+
3603+void scope_thread(Runtime::SPtr const& rt)
3604+{
3605+ TestScope scope;
3606+ rt->run_scope(&scope, "/foo");
3607+}
3608+
3609+int main(int argc, char **argv)
3610+{
3611+ ::testing::InitGoogleTest(&argc, argv);
3612+
3613+ Runtime::SPtr srt = move(Runtime::create_scope_runtime("TestScope", "Runtime.ini"));
3614+ std::thread scope_t(scope_thread, srt);
3615+
3616+ // Give thread some time to bind to its endpoint, to avoid getting ObjectNotExistException
3617+ // from a synchronous remote call.
3618+ this_thread::sleep_for(chrono::milliseconds(200));
3619+
3620+ auto rc = RUN_ALL_TESTS();
3621+
3622+ srt->destroy();
3623+ scope_t.join();
3624+
3625+ return rc;
3626+}
3627
3628=== added file 'test/gtest/scopes/Invocation/Registry.ini.in'
3629--- test/gtest/scopes/Invocation/Registry.ini.in 1970-01-01 00:00:00 +0000
3630+++ test/gtest/scopes/Invocation/Registry.ini.in 2014-04-28 05:57:56 +0000
3631@@ -0,0 +1,8 @@
3632+[Registry]
3633+Middleware = Zmq
3634+Zmq.Endpoint = ipc:///tmp/socket_for_registry
3635+Zmq.EndpointDir = /tmp
3636+Zmq.ConfigFile = Zmq.ini
3637+Scope.InstallDir = /unused
3638+Click.InstallDir = /unused
3639+Scoperunner.Path = /unused
3640
3641=== added file 'test/gtest/scopes/Invocation/Runtime.ini.in'
3642--- test/gtest/scopes/Invocation/Runtime.ini.in 1970-01-01 00:00:00 +0000
3643+++ test/gtest/scopes/Invocation/Runtime.ini.in 2014-04-28 05:57:56 +0000
3644@@ -0,0 +1,5 @@
3645+[Runtime]
3646+Registry.Identity = Registry
3647+Registry.ConfigFile = Registry.ini
3648+Default.Middleware = Zmq
3649+Zmq.ConfigFile = Zmq.ini
3650
3651=== added file 'test/gtest/scopes/Invocation/TestScope.cpp'
3652--- test/gtest/scopes/Invocation/TestScope.cpp 1970-01-01 00:00:00 +0000
3653+++ test/gtest/scopes/Invocation/TestScope.cpp 2014-04-28 05:57:56 +0000
3654@@ -0,0 +1,94 @@
3655+/*
3656+ * Copyright (C) 2014 Canonical Ltd
3657+ *
3658+ * This program is free software: you can redistribute it and/or modify
3659+ * it under the terms of the GNU Lesser General Public License version 3 as
3660+ * published by the Free Software Foundation.
3661+ *
3662+ * This program is distributed in the hope that it will be useful,
3663+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
3664+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
3665+ * GNU Lesser General Public License for more details.
3666+ *
3667+ * You should have received a copy of the GNU Lesser General Public License
3668+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
3669+ *
3670+ * Authored by: Michi Henning <michi.henning@canonical.com>
3671+ */
3672+
3673+#include "TestScope.h"
3674+
3675+#include <unity/scopes/CategorisedResult.h>
3676+#include <unity/scopes/ScopeBase.h>
3677+#include <unity/scopes/SearchReply.h>
3678+
3679+#include <mutex>
3680+#include <thread>
3681+
3682+using namespace std;
3683+using namespace unity::scopes;
3684+
3685+namespace
3686+{
3687+
3688+class TestQuery : public SearchQueryBase
3689+{
3690+public:
3691+ TestQuery()
3692+ {
3693+ }
3694+
3695+ virtual void cancelled() override
3696+ {
3697+ }
3698+
3699+ virtual void run(SearchReplyProxy const& reply) override
3700+ {
3701+ auto cat = reply->register_category("cat1", "Category 1", "");
3702+ CategorisedResult res(cat);
3703+ res.set_uri("uri");
3704+ res.set_title("title");
3705+ res.set_art("art");
3706+ res.set_dnd_uri("dnd_uri");
3707+ reply->push(res);
3708+ }
3709+};
3710+
3711+} // namespace
3712+
3713+int TestScope::start(string const&, RegistryProxy const &)
3714+{
3715+ return VERSION;
3716+}
3717+
3718+void TestScope::stop()
3719+{
3720+}
3721+
3722+void TestScope::run()
3723+{
3724+}
3725+
3726+namespace
3727+{
3728+
3729+mutex m;
3730+int count = 0;
3731+
3732+} // namespace
3733+
3734+SearchQueryBase::UPtr TestScope::search(CannedQuery const&, SearchMetadata const &)
3735+{
3736+ lock_guard<mutex> lock(m);
3737+
3738+ if (count++ == 0)
3739+ {
3740+ this_thread::sleep_for(chrono::milliseconds(400)); // Force timeout on first call
3741+ }
3742+ return SearchQueryBase::UPtr(new TestQuery());
3743+}
3744+
3745+PreviewQueryBase::UPtr TestScope::preview(Result const&, ActionMetadata const &)
3746+{
3747+ return nullptr; // unused
3748+}
3749
3750=== added file 'test/gtest/scopes/Invocation/TestScope.h'
3751--- test/gtest/scopes/Invocation/TestScope.h 1970-01-01 00:00:00 +0000
3752+++ test/gtest/scopes/Invocation/TestScope.h 2014-04-28 05:57:56 +0000
3753@@ -0,0 +1,40 @@
3754+/*
3755+ * Copyright (C) 2014 Canonical Ltd
3756+ *
3757+ * This program is free software: you can redistribute it and/or modify
3758+ * it under the terms of the GNU Lesser General Public License version 3 as
3759+ * published by the Free Software Foundation.
3760+ *
3761+ * This program is distributed in the hope that it will be useful,
3762+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
3763+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
3764+ * GNU Lesser General Public License for more details.
3765+ *
3766+ * You should have received a copy of the GNU Lesser General Public License
3767+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
3768+ *
3769+ * Authored by: Michi Henning <michi.henning@canonical.com>
3770+ */
3771+
3772+#ifndef TEST_TESTSCOPE_H
3773+#define TEST_TESTSCOPE_H
3774+
3775+#include <unity/scopes/ScopeBase.h>
3776+
3777+using namespace std;
3778+using namespace unity::scopes;
3779+
3780+class TestScope : public ScopeBase
3781+{
3782+public:
3783+ virtual int start(string const&, RegistryProxy const &) override;
3784+
3785+ virtual void stop() override;
3786+
3787+ virtual void run() override;
3788+
3789+ virtual SearchQueryBase::UPtr search(CannedQuery const &, SearchMetadata const &) override;
3790+ virtual PreviewQueryBase::UPtr preview(Result const&, ActionMetadata const &) override;
3791+};
3792+
3793+#endif
3794
3795=== added file 'test/gtest/scopes/Invocation/Zmq.ini.in'
3796--- test/gtest/scopes/Invocation/Zmq.ini.in 1970-01-01 00:00:00 +0000
3797+++ test/gtest/scopes/Invocation/Zmq.ini.in 2014-04-28 05:57:56 +0000
3798@@ -0,0 +1,3 @@
3799+[Zmq]
3800+EndpointDir.Public = /tmp
3801+EndpointDir.Private = /tmp
3802
3803=== modified file 'test/gtest/scopes/Registry/Registry_test.cpp'
3804--- test/gtest/scopes/Registry/Registry_test.cpp 2014-04-08 10:34:19 +0000
3805+++ test/gtest/scopes/Registry/Registry_test.cpp 2014-04-28 05:57:56 +0000
3806@@ -24,8 +24,12 @@
3807 #include <boost/filesystem.hpp>
3808 #include <boost/system/error_code.hpp>
3809 #include <boost/filesystem/operations.hpp>
3810+#include <gtest/gtest.h>
3811+
3812+#include <condition_variable>
3813 #include <functional>
3814-#include <gtest/gtest.h>
3815+#include <mutex>
3816+
3817 #include <signal.h>
3818 #include <unistd.h>
3819
3820@@ -34,13 +38,40 @@
3821 class Receiver : public SearchListenerBase
3822 {
3823 public:
3824+ Receiver()
3825+ : done_(false)
3826+ , finished_ok_(false)
3827+ {
3828+ }
3829+
3830 virtual void push(CategorisedResult /* result */) override
3831 {
3832 }
3833
3834- virtual void finished(ListenerBase::Reason /* reason */, std::string const& /* error_message */) override
3835- {
3836- }
3837+ virtual void finished(ListenerBase::Reason reason, std::string const& /* error_message */) override
3838+ {
3839+ std::lock_guard<std::mutex> lock(mutex_);
3840+
3841+ EXPECT_EQ(Finished, reason);
3842+ finished_ok_ = reason == Finished;
3843+ done_ = true;
3844+ cond_.notify_all();
3845+ }
3846+
3847+ bool wait_until_finished()
3848+ {
3849+ std::unique_lock<std::mutex> lock(mutex_);
3850+ auto now = std::chrono::steady_clock::now();
3851+ auto expiry_time = now + std::chrono::seconds(5);
3852+ EXPECT_TRUE(cond_.wait_until(lock, expiry_time, [this]{ return done_; })) << "finished message not delivered";
3853+ return finished_ok_;
3854+ }
3855+
3856+private:
3857+ bool done_;
3858+ bool finished_ok_;
3859+ std::mutex mutex_;
3860+ std::condition_variable cond_;
3861 };
3862
3863 TEST(Registry, metadata)
3864@@ -75,18 +106,13 @@
3865
3866 auto sp = meta.proxy();
3867
3868- SearchListenerBase::SPtr reply(new Receiver);
3869+ auto receiver = std::make_shared<Receiver>();
3870+ SearchListenerBase::SPtr reply(receiver);
3871 SearchMetadata metadata("C", "desktop");
3872
3873 // search would fail if testscopeB can't be executed
3874- try
3875- {
3876- auto ctrl = sp->search("foo", metadata, reply);
3877- }
3878- catch (...)
3879- {
3880- FAIL();
3881- }
3882+ auto ctrl = sp->search("foo", metadata, reply);
3883+ EXPECT_TRUE(receiver->wait_until_finished());
3884 }
3885
3886 bool wait_for_registry()
3887
3888=== modified file 'test/gtest/scopes/Runtime/CMakeLists.txt'
3889--- test/gtest/scopes/Runtime/CMakeLists.txt 2014-02-19 04:18:26 +0000
3890+++ test/gtest/scopes/Runtime/CMakeLists.txt 2014-04-28 05:57:56 +0000
3891@@ -2,7 +2,7 @@
3892 configure_file(Runtime.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Runtime.ini)
3893 configure_file(Zmq.ini.in ${CMAKE_CURRENT_BINARY_DIR}/Zmq.ini)
3894
3895-add_executable(Runtime_test Runtime_test.cpp TestScope.cpp PusherScope.cpp)
3896+add_executable(Runtime_test Runtime_test.cpp TestScope.cpp PusherScope.cpp SlowCreateScope.cpp)
3897 target_link_libraries(Runtime_test ${TESTLIBS})
3898
3899 add_test(Runtime Runtime_test)
3900
3901=== modified file 'test/gtest/scopes/Runtime/PusherScope.cpp'
3902--- test/gtest/scopes/Runtime/PusherScope.cpp 2014-03-07 01:07:28 +0000
3903+++ test/gtest/scopes/Runtime/PusherScope.cpp 2014-04-28 05:57:56 +0000
3904@@ -23,11 +23,16 @@
3905 #include <unity/scopes/ScopeBase.h>
3906 #include <unity/scopes/SearchReply.h>
3907
3908+#include <atomic>
3909+
3910 #include <gtest/gtest.h>
3911
3912 using namespace std;
3913 using namespace unity::scopes;
3914
3915+namespace
3916+{
3917+
3918 class PusherQuery : public SearchQueryBase
3919 {
3920 public:
3921@@ -59,9 +64,11 @@
3922 }
3923
3924 private:
3925- int cardinality_;
3926+ atomic_int cardinality_;
3927 };
3928
3929+} // namespace
3930+
3931 int PusherScope::start(string const&, RegistryProxy const &)
3932 {
3933 return VERSION;
3934
3935=== modified file 'test/gtest/scopes/Runtime/PusherScope.h'
3936--- test/gtest/scopes/Runtime/PusherScope.h 2014-02-27 23:20:56 +0000
3937+++ test/gtest/scopes/Runtime/PusherScope.h 2014-04-28 05:57:56 +0000
3938@@ -16,6 +16,9 @@
3939 * Authored by: Michi Henning <michi.henning@canonical.com>
3940 */
3941
3942+#ifndef TEST_PUSHERSCOPE_H
3943+#define TEST_PUSHERSCOPE_H
3944+
3945 #include <unity/scopes/ScopeBase.h>
3946
3947 using namespace std;
3948@@ -33,3 +36,5 @@
3949 virtual SearchQueryBase::UPtr search(CannedQuery const &, SearchMetadata const &) override;
3950 virtual PreviewQueryBase::UPtr preview(Result const& result, ActionMetadata const& metadata) override;
3951 };
3952+
3953+#endif
3954
3955=== modified file 'test/gtest/scopes/Runtime/Runtime_test.cpp'
3956--- test/gtest/scopes/Runtime/Runtime_test.cpp 2014-04-03 14:22:02 +0000
3957+++ test/gtest/scopes/Runtime/Runtime_test.cpp 2014-04-28 05:57:56 +0000
3958@@ -1,5 +1,5 @@
3959 /*
3960-* Copyright (C) 2013 Canonical Ltd
3961+ * Copyright (C) 2013 Canonical Ltd
3962 *
3963 * This program is free software: you can redistribute it and/or modify
3964 * it under the terms of the GNU Lesser General Public License version 3 as
3965@@ -22,20 +22,22 @@
3966
3967 #include <mutex>
3968
3969+#include <unity/scopes/ActionMetadata.h>
3970 #include <unity/scopes/CategorisedResult.h>
3971+#include <unity/scopes/internal/MWScope.h>
3972+#include <unity/scopes/internal/RuntimeImpl.h>
3973+#include <unity/scopes/internal/ScopeImpl.h>
3974 #include <unity/scopes/ListenerBase.h>
3975+#include <unity/scopes/QueryCtrl.h>
3976 #include <unity/scopes/Runtime.h>
3977-#include <unity/scopes/internal/RuntimeImpl.h>
3978-#include <unity/scopes/internal/MWScope.h>
3979-#include <unity/scopes/internal/ScopeImpl.h>
3980-#include <unity/scopes/ActionMetadata.h>
3981 #include <unity/scopes/SearchMetadata.h>
3982 #include <unity/UnityExceptions.h>
3983
3984 #include <gtest/gtest.h>
3985
3986+#include "PusherScope.h"
3987+#include "SlowCreateScope.h"
3988 #include "TestScope.h"
3989-#include "PusherScope.h"
3990
3991 using namespace std;
3992 using namespace unity::scopes;
3993@@ -82,6 +84,7 @@
3994 count_++;
3995 last_result_ = std::make_shared<Result>(result);
3996 }
3997+
3998 virtual void push(Annotation annotation) override
3999 {
4000 EXPECT_EQ(1u, annotation.links().size());
4001@@ -92,6 +95,7 @@
4002 EXPECT_EQ("dep1", query.department_id());
4003 annotation_count_++;
4004 }
4005+
4006 virtual void finished(ListenerBase::Reason reason, string const& error_message) override
4007 {
4008 EXPECT_EQ(Finished, reason);
4009@@ -104,15 +108,18 @@
4010 query_complete_ = true;
4011 cond_.notify_one();
4012 }
4013+
4014 void wait_until_finished()
4015 {
4016 unique_lock<mutex> lock(mutex_);
4017 cond_.wait(lock, [this] { return this->query_complete_; });
4018 }
4019+
4020 std::shared_ptr<Result> last_result()
4021 {
4022 return last_result_;
4023 }
4024+
4025 private:
4026 bool query_complete_;
4027 mutex mutex_;
4028@@ -225,6 +232,47 @@
4029 receiver->wait_until_finished();
4030 }
4031
4032+TEST(Runtime, consecutive_search)
4033+{
4034+ auto rt = internal::RuntimeImpl::create("", "Runtime.ini");
4035+ auto mw = rt->factory()->create("TestScope", "Zmq", "Zmq.ini");
4036+ mw->start();
4037+
4038+ auto proxy = mw->create_scope_proxy("TestScope");
4039+ auto scope = internal::ScopeImpl::create(proxy, rt.get(), "TestScope");
4040+
4041+ auto receiver = make_shared<Receiver>();
4042+ auto ctrl = scope->search("test", SearchMetadata("en", "phone"), receiver);
4043+ receiver->wait_until_finished();
4044+
4045+ std::vector<std::shared_ptr<Receiver>> replies;
4046+
4047+ const int num_searches = 100;
4048+ for (int i = 0; i < num_searches; ++i)
4049+ {
4050+ replies.push_back(std::make_shared<Receiver>());
4051+ scope->search("test", SearchMetadata("en", "phone"), replies.back());
4052+ }
4053+
4054+ for (int i = 0; i < num_searches; ++i)
4055+ {
4056+ replies[i]->wait_until_finished();
4057+ }
4058+
4059+ // Do it again, to test that re-use of previously-created connections works.
4060+ replies.clear();
4061+ for (int i = 0; i < num_searches; ++i)
4062+ {
4063+ replies.push_back(std::make_shared<Receiver>());
4064+ scope->search("test", SearchMetadata("en", "phone"), replies.back());
4065+ }
4066+
4067+ for (int i = 0; i < num_searches; ++i)
4068+ {
4069+ replies[i]->wait_until_finished();
4070+ }
4071+}
4072+
4073 TEST(Runtime, preview)
4074 {
4075 // connect to scope and run a query
4076@@ -272,6 +320,68 @@
4077 receiver->wait_until_finished();
4078 }
4079
4080+class CancelReceiver : public SearchListenerBase
4081+{
4082+public:
4083+ CancelReceiver()
4084+ : query_complete_(false)
4085+ {
4086+ }
4087+
4088+ virtual void push(CategorisedResult /* result */) override
4089+ {
4090+ FAIL();
4091+ }
4092+
4093+ virtual void finished(ListenerBase::Reason reason, string const& error_message) override
4094+ {
4095+ EXPECT_EQ(Cancelled, reason);
4096+ EXPECT_EQ("", error_message);
4097+ // Signal that the query has completed.
4098+ unique_lock<mutex> lock(mutex_);
4099+ query_complete_ = true;
4100+ cond_.notify_one();
4101+ }
4102+
4103+ void wait_until_finished()
4104+ {
4105+ unique_lock<mutex> lock(mutex_);
4106+ cond_.wait(lock, [this] { return this->query_complete_; });
4107+ }
4108+
4109+private:
4110+ bool query_complete_;
4111+ mutex mutex_;
4112+ condition_variable cond_;
4113+};
4114+
4115+TEST(Runtime, early_cancel)
4116+{
4117+ auto rt = internal::RuntimeImpl::create("", "Runtime.ini");
4118+ auto mw = rt->factory()->create("SlowCreateScope", "Zmq", "Zmq.ini");
4119+ mw->start();
4120+ auto proxy = mw->create_scope_proxy("SlowCreateScope");
4121+ auto scope = internal::ScopeImpl::create(proxy, rt.get(), "SlowCreateScope");
4122+
4123+ // Check that, if a cancel is sent before search() returns on the server side, the
4124+ // cancel is correctly forwarded to the scope once the real reply proxy arrives
4125+ // over the wire.
4126+ auto receiver = make_shared<CancelReceiver>();
4127+ auto ctrl = scope->search("test", SearchMetadata("unused", "unused"), receiver);
4128+ // Allow some time for the search message to get there.
4129+ this_thread::sleep_for(chrono::milliseconds(100));
4130+ // search() in the scope doesn't return for some time, so the cancel() that follows
4131+ // is sent to the "fake" QueryCtrlProxy.
4132+ ctrl->cancel();
4133+ receiver->wait_until_finished();
4134+ // The receiver receives its cancel from the client-side run time instead of the
4135+ // scope because the run time short-cuts sending the cancel locally instead
4136+ // of waiting for the cancel message from the scope. Allow some time for the
4137+ // cancel to reach the scope before shutting down the run time, so the scope
4138+ // can test that it received the cancel.
4139+ this_thread::sleep_for(chrono::milliseconds(200));
4140+}
4141+
4142 void scope_thread(Runtime::SPtr const& rt)
4143 {
4144 TestScope scope;
4145@@ -284,6 +394,12 @@
4146 rt->run_scope(&scope, "/foo");
4147 }
4148
4149+void slow_create_thread(Runtime::SPtr const& rt)
4150+{
4151+ SlowCreateScope scope;
4152+ rt->run_scope(&scope, "/foo");
4153+}
4154+
4155 int main(int argc, char **argv)
4156 {
4157 ::testing::InitGoogleTest(&argc, argv);
4158@@ -294,6 +410,13 @@
4159 Runtime::SPtr prt = move(Runtime::create_scope_runtime("PusherScope", "Runtime.ini"));
4160 std::thread pusher_t(pusher_thread, prt);
4161
4162+ Runtime::SPtr scrt = move(Runtime::create_scope_runtime("SlowCreateScope", "Runtime.ini"));
4163+ std::thread slow_create_t(slow_create_thread, scrt);
4164+
4165+ // Give threads some time to bind to their endpoints, to avoid getting ObjectNotExistException
4166+ // from a synchronous remote call.
4167+ this_thread::sleep_for(chrono::milliseconds(200));
4168+
4169 auto rc = RUN_ALL_TESTS();
4170
4171 srt->destroy();
4172@@ -302,5 +425,8 @@
4173 prt->destroy();
4174 pusher_t.join();
4175
4176+ scrt->destroy();
4177+ slow_create_t.join();
4178+
4179 return rc;
4180 }
4181
4182=== added file 'test/gtest/scopes/Runtime/SlowCreateScope.cpp'
4183--- test/gtest/scopes/Runtime/SlowCreateScope.cpp 1970-01-01 00:00:00 +0000
4184+++ test/gtest/scopes/Runtime/SlowCreateScope.cpp 2014-04-28 05:57:56 +0000
4185@@ -0,0 +1,101 @@
4186+/*
4187+ * Copyright (C) 2013 Canonical Ltd
4188+ *
4189+ * This program is free software: you can redistribute it and/or modify
4190+ * it under the terms of the GNU Lesser General Public License version 3 as
4191+ * published by the Free Software Foundation.
4192+ *
4193+ * This program is distributed in the hope that it will be useful,
4194+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
4195+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
4196+ * GNU Lesser General Public License for more details.
4197+ *
4198+ * You should have received a copy of the GNU Lesser General Public License
4199+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
4200+ *
4201+ * Authored by: Michi Henning <michi.henning@canonical.com>
4202+ */
4203+
4204+#include <unity/scopes/ScopeBase.h>
4205+
4206+#include "SlowCreateScope.h"
4207+
4208+#include <condition_variable>
4209+#include <mutex>
4210+#include <thread>
4211+
4212+#include <gtest/gtest.h>
4213+
4214+using namespace std;
4215+using namespace unity::scopes;
4216+
4217+namespace
4218+{
4219+
4220+class TestQuery : public SearchQueryBase
4221+{
4222+public:
4223+ TestQuery()
4224+ {
4225+ lock_guard<mutex> lock(mutex_);
4226+ cancelled_ = false;
4227+ }
4228+
4229+ virtual void cancelled() override
4230+ {
4231+ lock_guard<mutex> lock(mutex_);
4232+ cancelled_ = true;
4233+ cond_.notify_all();
4234+ }
4235+
4236+ virtual void run(SearchReplyProxy const&) override
4237+ {
4238+ if (!valid())
4239+ {
4240+ return; // Query was cancelled already
4241+ }
4242+ // We time out the wait because, otherwise, the scope's object adapter can't shut down
4243+ // and the test hangs forever.
4244+ // If this fails, the cancelled method wasn't called after five seconds.
4245+ auto stop_time = chrono::steady_clock::now() + chrono::seconds(5);
4246+ unique_lock<mutex> lock(mutex_);
4247+ EXPECT_TRUE(cond_.wait_until(lock, stop_time, [this]{ return cancelled_;}));
4248+ }
4249+
4250+private:
4251+ mutex mutex_;
4252+ condition_variable cond_;
4253+ bool cancelled_;
4254+};
4255+
4256+} // namespace
4257+
4258+int SlowCreateScope::start(string const&, RegistryProxy const &)
4259+{
4260+ return VERSION;
4261+}
4262+
4263+void SlowCreateScope::stop()
4264+{
4265+}
4266+
4267+void SlowCreateScope::run()
4268+{
4269+}
4270+
4271+SearchQueryBase::UPtr SlowCreateScope::search(CannedQuery const&, SearchMetadata const &)
4272+{
4273+ // Sleep for a while. This allows the client to call cancel() before this function
4274+ // returns, while the test still holds a fake QueryCtrl returned by the async
4275+ // invocation. When this method returns, the client-side run time calls
4276+ // the real cancel, which triggers the cancelled() callback above, which, in turn,
4277+ // causes TestQuery::run() to complete.
4278+ this_thread::sleep_for(chrono::milliseconds(200));
4279+
4280+ return SearchQueryBase::UPtr(new TestQuery());
4281+}
4282+
4283+PreviewQueryBase::UPtr SlowCreateScope::preview(Result const&, ActionMetadata const &)
4284+{
4285+ return nullptr; // Not called
4286+}
4287
4288=== added file 'test/gtest/scopes/Runtime/SlowCreateScope.h'
4289--- test/gtest/scopes/Runtime/SlowCreateScope.h 1970-01-01 00:00:00 +0000
4290+++ test/gtest/scopes/Runtime/SlowCreateScope.h 2014-04-28 05:57:56 +0000
4291@@ -0,0 +1,40 @@
4292+/*
4293+ * Copyright (C) 2013 Canonical Ltd
4294+ *
4295+ * This program is free software: you can redistribute it and/or modify
4296+ * it under the terms of the GNU Lesser General Public License version 3 as
4297+ * published by the Free Software Foundation.
4298+ *
4299+ * This program is distributed in the hope that it will be useful,
4300+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
4301+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
4302+ * GNU Lesser General Public License for more details.
4303+ *
4304+ * You should have received a copy of the GNU Lesser General Public License
4305+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
4306+ *
4307+ * Authored by: Michi Henning <michi.henning@canonical.com>
4308+ */
4309+
4310+#ifndef TEST_SLOWCREATESCOPE_H
4311+#define TEST_SLOWCREATESCOPE_H
4312+
4313+#include <unity/scopes/ScopeBase.h>
4314+
4315+using namespace std;
4316+using namespace unity::scopes;
4317+
4318+class SlowCreateScope : public ScopeBase
4319+{
4320+public:
4321+ virtual int start(string const&, RegistryProxy const &) override;
4322+
4323+ virtual void stop() override;
4324+
4325+ virtual void run() override;
4326+
4327+ virtual SearchQueryBase::UPtr search(CannedQuery const &, SearchMetadata const &) override;
4328+ virtual PreviewQueryBase::UPtr preview(Result const&, ActionMetadata const &) override;
4329+};
4330+
4331+#endif
4332
4333=== modified file 'test/gtest/scopes/Runtime/TestScope.cpp'
4334--- test/gtest/scopes/Runtime/TestScope.cpp 2014-03-07 01:07:28 +0000
4335+++ test/gtest/scopes/Runtime/TestScope.cpp 2014-04-28 05:57:56 +0000
4336@@ -30,6 +30,9 @@
4337 using namespace std;
4338 using namespace unity::scopes;
4339
4340+namespace
4341+{
4342+
4343 class TestQuery : public SearchQueryBase
4344 {
4345 public:
4346@@ -41,6 +44,7 @@
4347 virtual void cancelled() override
4348 {
4349 }
4350+
4351 virtual void run(SearchReplyProxy const& reply) override
4352 {
4353 Department dep("news", query_, "News");
4354@@ -65,6 +69,8 @@
4355 CannedQuery query_;
4356 };
4357
4358+} // namespace
4359+
4360 class TestPreview : public PreviewQueryBase
4361 {
4362 public:
4363
4364=== modified file 'test/gtest/scopes/Runtime/TestScope.h'
4365--- test/gtest/scopes/Runtime/TestScope.h 2014-02-27 23:20:56 +0000
4366+++ test/gtest/scopes/Runtime/TestScope.h 2014-04-28 05:57:56 +0000
4367@@ -16,6 +16,9 @@
4368 * Authored by: James Henstridge <james.henstridge@canonical.com>
4369 */
4370
4371+#ifndef TEST_TESTSCOPE_H
4372+#define TEST_TESTSCOPE_H
4373+
4374 #include <unity/scopes/ScopeBase.h>
4375
4376 using namespace std;
4377@@ -33,3 +36,5 @@
4378 virtual SearchQueryBase::UPtr search(CannedQuery const &, SearchMetadata const &) override;
4379 virtual PreviewQueryBase::UPtr preview(Result const&, ActionMetadata const &) override;
4380 };
4381+
4382+#endif
4383
4384=== modified file 'test/gtest/scopes/ScopeExceptions/ScopeExceptions_test.cpp'
4385--- test/gtest/scopes/ScopeExceptions/ScopeExceptions_test.cpp 2014-01-09 09:47:56 +0000
4386+++ test/gtest/scopes/ScopeExceptions/ScopeExceptions_test.cpp 2014-04-28 05:57:56 +0000
4387@@ -60,3 +60,32 @@
4388 EXPECT_EQ(e.name(), e2.name());
4389 }
4390 }
4391+
4392+TEST(ObjectNotExistException, state)
4393+{
4394+ {
4395+ ObjectNotExistException e("some error", "some id");
4396+ EXPECT_STREQ("unity::scopes::ObjectNotExistException: some error (id = some id)", e.what());
4397+ EXPECT_EQ("unity::scopes::ObjectNotExistException", e.name());
4398+ EXPECT_EQ("some id", e.id());
4399+ EXPECT_THROW(rethrow_exception(e.self()), ObjectNotExistException);
4400+ ObjectNotExistException e2("blah", "some id");
4401+ e2 = e;
4402+ EXPECT_EQ(e.reason(), e2.reason());
4403+ EXPECT_EQ(e.name(), e2.name());
4404+ }
4405+}
4406+
4407+TEST(TimeoutException, state)
4408+{
4409+ {
4410+ TimeoutException e("some error");
4411+ EXPECT_STREQ("unity::scopes::TimeoutException: some error", e.what());
4412+ EXPECT_EQ("unity::scopes::TimeoutException", e.name());
4413+ EXPECT_THROW(rethrow_exception(e.self()), TimeoutException);
4414+ TimeoutException e2("blah");
4415+ e2 = e;
4416+ EXPECT_EQ(e.reason(), e2.reason());
4417+ EXPECT_EQ(e.name(), e2.name());
4418+ }
4419+}
4420
4421=== modified file 'test/gtest/scopes/internal/ThreadPool/ThreadPool_test.cpp'
4422--- test/gtest/scopes/internal/ThreadPool/ThreadPool_test.cpp 2014-01-24 03:18:25 +0000
4423+++ test/gtest/scopes/internal/ThreadPool/ThreadPool_test.cpp 2014-04-28 05:57:56 +0000
4424@@ -104,3 +104,14 @@
4425 EXPECT_STREQ("unity::InvalidArgumentException: ThreadPool(): invalid pool size: -1", e.what());
4426 }
4427 }
4428+
4429+TEST(ThreadPool, throwing_task)
4430+{
4431+ ThreadPool p(1);
4432+ auto thrower = [](){ throw std::logic_error("some error"); };
4433+ auto thrower2 = [](){ throw 99; };
4434+ auto f = p.submit(thrower);
4435+ auto f2 = p.submit(thrower2);
4436+ EXPECT_THROW(f.get(), std::logic_error);
4437+ EXPECT_THROW(f2.get(), int);
4438+}
4439
4440=== modified file 'test/gtest/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp'
4441--- test/gtest/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp 2014-02-25 12:08:41 +0000
4442+++ test/gtest/scopes/internal/ThreadSafeQueue/ThreadSafeQueue_test.cpp 2014-04-28 05:57:56 +0000
4443@@ -74,14 +74,26 @@
4444
4445 TEST(ThreadSafeQueue, exception)
4446 {
4447- unique_ptr<ThreadSafeQueue<string>> q(new ThreadSafeQueue<string>);
4448- q->push("fred");
4449- auto f = waiter_ready.get_future();
4450- auto t = thread(waiter_thread, q.get());
4451- f.wait();
4452- this_thread::sleep_for(chrono::milliseconds(50)); // Make sure child thread has time to call wait_and_pop()
4453- q.reset();
4454- t.join();
4455+ {
4456+ unique_ptr<ThreadSafeQueue<string>> q(new ThreadSafeQueue<string>);
4457+ q->push("fred");
4458+ auto f = waiter_ready.get_future();
4459+ auto t = thread(waiter_thread, q.get());
4460+ f.wait();
4461+ this_thread::sleep_for(chrono::milliseconds(50)); // Make sure child thread has time to call wait_and_pop()
4462+ q->destroy();
4463+ t.join();
4464+
4465+ try
4466+ {
4467+ q->push("fred");
4468+ FAIL();
4469+ }
4470+ catch (std::runtime_error const& e)
4471+ {
4472+ EXPECT_STREQ("ThreadSafeQueue: cannot push onto destroyed queue", e.what());
4473+ }
4474+ }
4475 }
4476
4477 atomic_int count;
4478@@ -112,6 +124,7 @@
4479
4480 // Destroy the queue while multiple threads are sleeping in wait_and_pop().
4481 q.destroy();
4482+ q.wait_for_destroy();
4483
4484 for (auto& t : threads)
4485 {
4486@@ -120,6 +133,45 @@
4487 EXPECT_EQ(20, count);
4488 }
4489
4490+void destroy_thread(ThreadSafeQueue<int>* q)
4491+{
4492+ this_thread::sleep_for(chrono::milliseconds(100));
4493+ EXPECT_EQ(0u, q->size());
4494+ q->destroy();
4495+}
4496+
4497+TEST(ThreadSafeQueue, destroy_while_empty)
4498+{
4499+ ThreadSafeQueue<int> q;
4500+ thread t(destroy_thread, &q);
4501+ q.wait_for_destroy();
4502+ EXPECT_EQ(0u, q.size());
4503+ t.join();
4504+}
4505+
4506+void wait_for_destroy_thread(ThreadSafeQueue<int>* q)
4507+{
4508+ q->wait_for_destroy();
4509+}
4510+
4511+TEST(ThreadSafeQueue, destroy_while_waiting_for_destroy)
4512+{
4513+ ThreadSafeQueue<int> q;
4514+ q.push(42);
4515+ thread t(wait_for_destroy_thread, &q);
4516+ this_thread::sleep_for(chrono::milliseconds(100));
4517+ q.destroy();
4518+ t.join();
4519+
4520+ // Call destroy() and wait_for_destroy() again, to make sure they do nothing.
4521+ q.destroy();
4522+ q.wait_for_destroy();
4523+ q.destroy();
4524+ q.destroy();
4525+ q.wait_for_destroy();
4526+ q.wait_for_destroy();
4527+}
4528+
4529 class MoveOnly
4530 {
4531 public:
4532@@ -156,4 +208,15 @@
4533 EXPECT_EQ("world", m.val());
4534 m = q.wait_and_pop();
4535 EXPECT_EQ("again", m.val());
4536+
4537+ q.destroy();
4538+ try
4539+ {
4540+ q.push(move(MoveOnly("no_push")));
4541+ FAIL();
4542+ }
4543+ catch (std::runtime_error const& e)
4544+ {
4545+ EXPECT_STREQ("ThreadSafeQueue: cannot push onto destroyed queue", e.what());
4546+ }
4547 }
4548
4549=== modified file 'test/gtest/scopes/internal/smartscopes/smartscopesproxy/smartscopesproxy_test.cpp'
4550--- test/gtest/scopes/internal/smartscopes/smartscopesproxy/smartscopesproxy_test.cpp 2014-04-01 15:26:50 +0000
4551+++ test/gtest/scopes/internal/smartscopes/smartscopesproxy/smartscopesproxy_test.cpp 2014-04-28 05:57:56 +0000
4552@@ -77,11 +77,8 @@
4553
4554 ~smartscopesproxytest()
4555 {
4556- scope_mw_->stop();
4557- scope_mw_->wait_for_shutdown();
4558-
4559- reg_mw_->stop();
4560- reg_mw_->wait_for_shutdown();
4561+ scope_rt_->destroy();
4562+ reg_rt_->destroy();
4563 }
4564
4565 protected:
4566
4567=== modified file 'test/gtest/scopes/internal/zmq_middleware/CMakeLists.txt'
4568--- test/gtest/scopes/internal/zmq_middleware/CMakeLists.txt 2014-01-28 06:34:03 +0000
4569+++ test/gtest/scopes/internal/zmq_middleware/CMakeLists.txt 2014-04-28 05:57:56 +0000
4570@@ -1,6 +1,7 @@
4571 add_subdirectory(ObjectAdapter)
4572 add_subdirectory(RegistryI)
4573 add_subdirectory(ServantBase)
4574+add_subdirectory(StopPublisher)
4575 add_subdirectory(Util)
4576 add_subdirectory(VariantConverter)
4577 add_subdirectory(ZmqMiddleware)
4578
4579=== modified file 'test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp'
4580--- test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp 2014-01-24 11:16:14 +0000
4581+++ test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp 2014-04-28 05:57:56 +0000
4582@@ -44,7 +44,7 @@
4583 // an occastional "address in use" exception because the new socket
4584 // tries to bind while the old one is still in the process of destroying itself.
4585
4586-void wait(int millisec = 20)
4587+void wait(int millisec = 200)
4588 {
4589 this_thread::sleep_for(chrono::milliseconds(millisec));
4590 }
4591@@ -176,7 +176,7 @@
4592 }
4593 catch (MiddlewareException const& e)
4594 {
4595- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Destroyed "
4596+ EXPECT_STREQ("unity::scopes::MiddlewareException: activate(): Object adapter in Destroyed "
4597 "state (adapter: testscope)",
4598 e.what());
4599 }
4600@@ -194,25 +194,22 @@
4601 }
4602 catch (MiddlewareException const& e)
4603 {
4604- EXPECT_STREQ("unity::scopes::MiddlewareException: ObjectAdapter::run_workers(): broker thread "
4605+ EXPECT_STREQ("unity::scopes::MiddlewareException: ObjectAdapter::run_workers(): stop thread "
4606 "failure (adapter: testscope):\n"
4607- " unity::scopes::MiddlewareException: ObjectAdapter: broker thread failure "
4608- "(adapter: testscope):\n"
4609+ " unity::scopes::MiddlewareException: StopPublisher(): publisher thread "
4610+ "failed (endpoint: inproc://testscope-stopper):\n"
4611 " Address already in use",
4612 e.what());
4613 }
4614+
4615 try
4616 {
4617 b.shutdown();
4618 }
4619 catch (MiddlewareException const& e)
4620 {
4621- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope)\n"
4622- " Exception history:\n"
4623- " Exception #1:\n"
4624- " unity::scopes::MiddlewareException: ObjectAdapter: broker thread failure "
4625- "(adapter: testscope):\n"
4626- " Address already in use",
4627+ EXPECT_STREQ("unity::scopes::MiddlewareException: shutdown() [state_ == Failed]: "
4628+ "Object adapter in Failed state (adapter: testscope)",
4629 e.what());
4630 }
4631 try
4632@@ -221,13 +218,9 @@
4633 }
4634 catch (MiddlewareException const& e)
4635 {
4636- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope)\n"
4637- " Exception history:\n"
4638- " Exception #1:\n"
4639- " unity::scopes::MiddlewareException: ObjectAdapter: broker thread failure "
4640- "(adapter: testscope):\n"
4641- " Address already in use",
4642- e.what());
4643+ EXPECT_STREQ("unity::scopes::MiddlewareException: wait_for_shutdown(): "
4644+ "Object adapter in Failed state (adapter: testscope)",
4645+ e.what());
4646 }
4647 }
4648 }
4649@@ -907,7 +900,8 @@
4650 }
4651 catch (MiddlewareException const& e)
4652 {
4653- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Destroyed state (adapter: testscope)",
4654+ EXPECT_STREQ("unity::scopes::MiddlewareException: remove(): "
4655+ "Object adapter in Destroyed state (adapter: testscope)",
4656 e.what());
4657 }
4658 };
4659@@ -1005,7 +999,7 @@
4660 }
4661 catch (MiddlewareException const& e)
4662 {
4663- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Destroyed "
4664+ EXPECT_STREQ("unity::scopes::MiddlewareException: add(): Object adapter in Destroyed "
4665 "state (adapter: testscope)",
4666 e.what());
4667 }
4668@@ -1016,7 +1010,7 @@
4669 }
4670 catch (MiddlewareException const& e)
4671 {
4672- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Destroyed "
4673+ EXPECT_STREQ("unity::scopes::MiddlewareException: find(): Object adapter in Destroyed "
4674 "state (adapter: testscope)",
4675 e.what());
4676 }
4677@@ -1212,7 +1206,8 @@
4678 }
4679 catch (MiddlewareException const& e)
4680 {
4681- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope2)",
4682+ EXPECT_STREQ("unity::scopes::MiddlewareException: add_dflt_servant(): "
4683+ "Object adapter in Failed state (adapter: testscope2)",
4684 e.what());
4685 }
4686
4687@@ -1224,7 +1219,8 @@
4688 }
4689 catch (MiddlewareException const& e)
4690 {
4691- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope2)",
4692+ EXPECT_STREQ("unity::scopes::MiddlewareException: remove_dflt_servant(): "
4693+ "Object adapter in Failed state (adapter: testscope2)",
4694 e.what());
4695 }
4696
4697@@ -1236,7 +1232,8 @@
4698 }
4699 catch (MiddlewareException const& e)
4700 {
4701- EXPECT_STREQ("unity::scopes::MiddlewareException: Object adapter in Failed state (adapter: testscope2)",
4702+ EXPECT_STREQ("unity::scopes::MiddlewareException: find_dflt_servant(): "
4703+ "Object adapter in Failed state (adapter: testscope2)",
4704 e.what());
4705 }
4706 }
4707
4708=== modified file 'test/gtest/scopes/internal/zmq_middleware/RegistryI/RegistryI_test.cpp'
4709--- test/gtest/scopes/internal/zmq_middleware/RegistryI/RegistryI_test.cpp 2014-04-14 09:31:25 +0000
4710+++ test/gtest/scopes/internal/zmq_middleware/RegistryI/RegistryI_test.cpp 2014-04-28 05:57:56 +0000
4711@@ -404,7 +404,7 @@
4712 ///! TODO: HACK:
4713 /// we have to start scope-C and scope-D before starting scope-B here, as B aggregates C and D.
4714 /// (When re-binding logic is introduced, this will be unnecessary)
4715- scope_ids = {"scope-A", "scope-C", "scope-D", "scope-B", "scope-N", "scope-S"};
4716+ scope_ids = { {"scope-A", "scope-C", "scope-D", "scope-B", "scope-N", "scope-S"} };
4717 for (auto& scope_id : scope_ids)
4718 {
4719 proxies[scope_id] = ScopeImpl::create(mw->create_scope_proxy(scope_id), mw->runtime(), scope_id);
4720@@ -509,7 +509,7 @@
4721 // test locating all scopes
4722 TEST_F(RegistryTest, locate_all)
4723 {
4724- // locate all scopes (hense starting all scope processes)
4725+ // locate all scopes (hence starting all scope processes)
4726 for (auto const& scope_id : scope_ids)
4727 {
4728 EXPECT_EQ(proxies[scope_id], reg->locate(scope_id));
4729
4730=== added directory 'test/gtest/scopes/internal/zmq_middleware/StopPublisher'
4731=== added file 'test/gtest/scopes/internal/zmq_middleware/StopPublisher/CMakeLists.txt'
4732--- test/gtest/scopes/internal/zmq_middleware/StopPublisher/CMakeLists.txt 1970-01-01 00:00:00 +0000
4733+++ test/gtest/scopes/internal/zmq_middleware/StopPublisher/CMakeLists.txt 2014-04-28 05:57:56 +0000
4734@@ -0,0 +1,4 @@
4735+add_executable(StopPublisher_test StopPublisher_test.cpp)
4736+target_link_libraries(StopPublisher_test ${LIBS} ${TESTLIBS})
4737+
4738+add_test(StopPublisher StopPublisher_test)
4739
4740=== added file 'test/gtest/scopes/internal/zmq_middleware/StopPublisher/StopPublisher_test.cpp'
4741--- test/gtest/scopes/internal/zmq_middleware/StopPublisher/StopPublisher_test.cpp 1970-01-01 00:00:00 +0000
4742+++ test/gtest/scopes/internal/zmq_middleware/StopPublisher/StopPublisher_test.cpp 2014-04-28 05:57:56 +0000
4743@@ -0,0 +1,136 @@
4744+/*
4745+ * Copyright (C) 2014 Canonical Ltd
4746+ *
4747+ * This program is free software: you can redistribute it and/or modify
4748+ * it under the terms of the GNU Lesser General Public License version 3 as
4749+ * published by the Free Software Foundation.
4750+ *
4751+ * This program is distributed in the hope that it will be useful,
4752+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
4753+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
4754+ * GNU Lesser General Public License for more details.
4755+ *
4756+ * You should have received a copy of the GNU Lesser General Public License
4757+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
4758+ *
4759+ * Authored by: Michi Henning <michi.henning@canonical.com>
4760+ */
4761+
4762+#include <unity/scopes/internal/zmq_middleware/StopPublisher.h>
4763+#include <unity/scopes/ScopeExceptions.h>
4764+#include <zmqpp/poller.hpp>
4765+
4766+#include <gtest/gtest.h>
4767+
4768+using namespace std;
4769+using namespace unity::scopes;
4770+using namespace unity::scopes::internal::zmq_middleware;
4771+
4772+TEST(StopPublisher, basic)
4773+{
4774+ {
4775+ zmqpp::context c;
4776+ StopPublisher p(&c, "x");
4777+ EXPECT_EQ("inproc://x", p.endpoint());
4778+ }
4779+
4780+ {
4781+ zmqpp::context c;
4782+ StopPublisher p(&c, "x");
4783+ p.stop(); // no-op if stop() is called several times.
4784+ p.stop();
4785+ EXPECT_EQ("inproc://x", p.endpoint());
4786+ }
4787+}
4788+
4789+TEST(StopPublisher, exceptions)
4790+{
4791+ {
4792+ zmqpp::context c;
4793+ StopPublisher p(&c, "x");
4794+ p.stop();
4795+ try
4796+ {
4797+ p.subscribe();
4798+ FAIL();
4799+ }
4800+ catch (MiddlewareException const& e)
4801+ {
4802+ EXPECT_STREQ("unity::scopes::MiddlewareException: StopPublisher::subscribe(): "
4803+ "cannot subscribe to stopped publisher (endpoint: inproc://x)",
4804+ e.what());
4805+ }
4806+ }
4807+
4808+ {
4809+ zmqpp::context c;
4810+ StopPublisher p(&c, "x");
4811+ try
4812+ {
4813+ StopPublisher q(&c, "x"); // Second publisher at the same endpoint must fail
4814+ FAIL();
4815+ }
4816+ catch (MiddlewareException const& e)
4817+ {
4818+ EXPECT_STREQ("unity::scopes::MiddlewareException: StopPublisher(): publisher thread "
4819+ "failed (endpoint: inproc://x):\n"
4820+ " Address already in use",
4821+ e.what());
4822+ }
4823+ }
4824+}
4825+
4826+TEST(StopPublisher, subscribe)
4827+{
4828+ {
4829+ zmqpp::context c;
4830+ StopPublisher p(&c, "x");
4831+ auto socket = p.subscribe();
4832+ zmqpp::poller poller;
4833+ poller.add(socket);
4834+ poller.poll(100);
4835+ EXPECT_FALSE(poller.has_input(socket));
4836+ p.stop();
4837+ poller.poll(100);
4838+ EXPECT_TRUE(poller.has_input(socket));
4839+ }
4840+}
4841+
4842+void subscriber_thread(StopPublisher* p)
4843+{
4844+ auto socket = p->subscribe();
4845+ zmqpp::poller poller;
4846+ poller.add(socket);
4847+ poller.poll(); // Blocks indefinitely until socket is ready
4848+}
4849+
4850+void publisher_thread(unique_ptr<StopPublisher>)
4851+{
4852+ // Sleep for a while, so all the subscribers get a chance to block in poll().
4853+ this_thread::sleep_for(chrono::milliseconds(500));
4854+ // Destructor of unique_ptr in-param calls stop()
4855+}
4856+
4857+// Test that subscriber threads terminate correctly even if the destructor of the
4858+// publisher is called from a different thread.
4859+
4860+TEST(StopPublisher, threading)
4861+{
4862+ zmqpp::context c;
4863+ unique_ptr<StopPublisher> p(new StopPublisher(&c, "x"));
4864+ const int num_subscribers = 5;
4865+ vector<thread> subscribers;
4866+ for (int i = 0; i < num_subscribers; ++i)
4867+ {
4868+ subscribers.emplace_back(thread(subscriber_thread, p.get()));
4869+ }
4870+
4871+ thread publisher(publisher_thread, move(p)); // Moves publisher to publisher thread
4872+
4873+ for (auto& s : subscribers)
4874+ {
4875+ s.join();
4876+ }
4877+
4878+ publisher.join();
4879+}

Subscribers

People subscribed via source and target branches

to all changes: