Merge lp:~michihenning/unity-scopes-api/finished-status into lp:unity-scopes-api

Proposed by Michi Henning
Status: Merged
Approved by: Michal Hruby
Approved revision: 66
Merged at revision: 65
Proposed branch: lp:~michihenning/unity-scopes-api/finished-status
Merge into: lp:unity-scopes-api
Diff against target: 920 lines (+239/-85)
27 files modified
demo/client.cpp (+2/-2)
demo/scopes/scope-B/scope-B.cpp (+2/-2)
demo/scopes/scope-C/scope-C.cpp (+0/-2)
demo/scopes/scope-D/scope-D.cpp (+1/-1)
include/scopes/QueryBase.h (+2/-0)
include/scopes/ReceiverBase.h (+28/-4)
include/scopes/Registry.h (+1/-1)
include/scopes/Reply.h (+2/-1)
include/scopes/ScopeBase.h (+40/-28)
include/scopes/internal/MWReply.h (+2/-1)
include/scopes/internal/QueryObject.h (+4/-0)
include/scopes/internal/ReplyImpl.h (+2/-0)
include/scopes/internal/ReplyObject.h (+2/-2)
include/scopes/internal/zmq_middleware/ZmqReply.h (+1/-1)
src/QueryBase.cpp (+1/-1)
src/ReceiverBase.cpp (+19/-0)
src/ScopeBase.cpp (+5/-0)
src/internal/QueryCtrlImpl.cpp (+1/-1)
src/internal/QueryObject.cpp (+15/-2)
src/internal/ReplyImpl.cpp (+16/-5)
src/internal/ReplyObject.cpp (+13/-13)
src/internal/ScopeImpl.cpp (+1/-1)
src/internal/ScopeLoader.cpp (+1/-0)
src/internal/ScopeObject.cpp (+16/-7)
src/internal/zmq_middleware/ReplyI.cpp (+27/-2)
src/internal/zmq_middleware/ZmqReply.cpp (+27/-2)
src/internal/zmq_middleware/capnproto/Reply.capnp (+8/-6)
To merge this branch: bzr merge lp:~michihenning/unity-scopes-api/finished-status
Reviewer Review Type Date Requested Status
Michal Hruby (community) Approve
PS Jenkins bot (community) continuous-integration Approve
Review via email: mp+196221@code.launchpad.net

Commit message

Added reason for why a call to finished() is made (finished, cancelled, or error).
Fixed incorrect return value from push() after cancellation.
Lots of doc fixes and other minor tidy-up.

Description of the change

Added reason for why a call to finished() is made (finished, cancelled, or error).
Fixed incorrect return value from push() after cancellation.
Lots of doc fixes and other minor tidy-up.

To post a comment you must log in.
Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Needs Fixing (continuous-integration)
66. By Michi Henning

Fixed incomplete comment with trailing whitespace.

Revision history for this message
PS Jenkins bot (ps-jenkins) wrote :
review: Approve (continuous-integration)
Revision history for this message
Michal Hruby (mhr3) wrote :

LGTM, though I'd like to see tests for the entire scope search use case. But that needs more infrastructure in the tests, so I'm hoping that is going to be done in a later branch.

review: Approve
Revision history for this message
Michi Henning (michihenning) wrote :

I'm hoping for that too, yes :-)

But, seriously, yes. At the moment, the gtest unit tests are fine, but not useful when it comes to testing end-to-end behaviour between client and server. We need something that can fire up a server, a client, run some interactions, and report back.

In part, I've been testing things to a certain extent by hand cobbling messages together in the unit tests via capnproto, but this isn't a viable strategy for the longer term.

I need a day with 48 hours :-(

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'demo/client.cpp'
2--- demo/client.cpp 2013-11-21 10:32:19 +0000
3+++ demo/client.cpp 2013-11-22 04:36:14 +0000
4@@ -46,9 +46,9 @@
5 cout << "received result: uri=" << result.uri() << " title=" << result.title() << " category id: " << result.category()->id() << endl;
6 }
7
8- virtual void finished() override
9+ virtual void finished(ReceiverBase::Reason reason) override
10 {
11- cout << "query complete" << endl;
12+ cout << "query complete, status: " << to_string(reason) << endl;
13 {
14 unique_lock<decltype(mutex_)> lock(mutex_);
15 query_complete_ = true;
16
17=== modified file 'demo/scopes/scope-B/scope-B.cpp'
18--- demo/scopes/scope-B/scope-B.cpp 2013-11-21 10:34:49 +0000
19+++ demo/scopes/scope-B/scope-B.cpp 2013-11-22 04:36:14 +0000
20@@ -56,9 +56,9 @@
21 }
22 }
23
24- virtual void finished() override
25+ virtual void finished(Reason reason) override
26 {
27- cout << "query to " << scope_name_ << " complete" << endl;
28+ cout << "query to " << scope_name_ << " complete, status: " << to_string(reason) << endl;
29 }
30
31 Receiver(string const& scope_name, ReplyProxy const& upstream) :
32
33=== modified file 'demo/scopes/scope-C/scope-C.cpp'
34--- demo/scopes/scope-C/scope-C.cpp 2013-11-20 23:58:07 +0000
35+++ demo/scopes/scope-C/scope-C.cpp 2013-11-22 04:36:14 +0000
36@@ -191,7 +191,6 @@
37 }
38 for (int i = 1; i < 4; ++i)
39 {
40- cerr << "worker thread: pushing" << endl;
41 ResultItem result(cat);
42 result.set_uri("uri");
43 result.set_title("scope-C: result " + to_string(i) + " for query \"" + query + "\"");
44@@ -199,7 +198,6 @@
45 result.set_dnd_uri("dnd_uri");
46 if (!reply->push(result))
47 {
48- cerr << "worker thread: push returned false" << endl;
49 break; // Query was cancelled
50 }
51 sleep(1);
52
53=== modified file 'demo/scopes/scope-D/scope-D.cpp'
54--- demo/scopes/scope-D/scope-D.cpp 2013-11-20 23:58:07 +0000
55+++ demo/scopes/scope-D/scope-D.cpp 2013-11-22 04:36:14 +0000
56@@ -143,7 +143,7 @@
57 // Informational callback to let a query know when it was cancelled. The query should
58 // clean up any resources it has allocated, stop pushing results, and arrange for
59 // run() to return (if still active).
60- // Ignoring query cancelltion does not do any direct harm, but wastes CPU cycles: any
61+ // Ignoring query cancellation does not do any direct harm, but wastes CPU cycles: any
62 // results that are pushed once a query is cancelled are ignored anyway.
63 // The main purpose of this callback to give queries a chance to stop doing whatever
64 // work may still be in progress on a query. Note that cancellations are frequent;
65
66=== modified file 'include/scopes/QueryBase.h'
67--- include/scopes/QueryBase.h 2013-11-21 10:12:45 +0000
68+++ include/scopes/QueryBase.h 2013-11-22 04:36:14 +0000
69@@ -49,6 +49,8 @@
70
71 // Abstract server-side base interface for a query that is executed inside a scope.
72
73+// TODO: documentation
74+
75 class UNITY_API QueryBase : private util::NonCopyable
76 {
77 public:
78
79=== modified file 'include/scopes/ReceiverBase.h'
80--- include/scopes/ReceiverBase.h 2013-11-21 10:18:43 +0000
81+++ include/scopes/ReceiverBase.h 2013-11-22 04:36:14 +0000
82@@ -22,6 +22,7 @@
83 #include <unity/util/DefinesPtrs.h>
84 #include <unity/util/NonCopyable.h>
85 #include <scopes/Category.h>
86+#include <scopes/ReceiverBase.h>
87
88 #include <string>
89
90@@ -40,9 +41,13 @@
91 \brief Abstract base class to receive the results of a query.
92 TODO: fix doc
93 The scope application code must instantiate a class derived from ReceiverBase and pass that instance as
94-a parameter to the ScopeProxy::query() method. Once a query is sent, the scopes run time repeatedly
95+a parameter to the Scope::query() method. Once a query is sent, the scopes run time repeatedly
96 invokes the push() method, once for each result returned by the query. Once a query is complete,
97 the finished() method is called once, to inform the caller that the query is complete.
98+
99+Calls to push() and finished() are made by an arbitrary thread.
100+
101+// TODO: add doc for thread pool and concurrent calls to push()
102 */
103
104 class UNITY_API ReceiverBase : private util::NonCopyable
105@@ -61,15 +66,28 @@
106
107 /**
108 \brief Called once by the scopes run time for each category that is returned by a query().
109- Default implementation does nothing.
110+ The default implementation does nothing. Receipt of categories may be interleaved with
111+ the receipt of results, that is, there is no guarantee that the complete set of categories
112+ will be provided before the first query result.
113+
114+ If push() throws an exception, the scopes run time calls finished() with an 'Error' reason.
115 */
116-
117 virtual void push(Category::SCPtr category);
118
119 /**
120+ \brief Indicates the cause of a call to finished().
121+ The Error enumerator indicates that a query terminated abnormally, for example,
122+ because a scope could not be reached over the network or terminated an query
123+ abnormally.
124+ */
125+ enum Reason { Finished, Cancelled, Error };
126+
127+ /**
128 \brief Called once by the scopes run time after the final result for a query() was sent.
129+ Exceptions thrown from finished() are ignored.
130+ \param r Indicates the cause for the call to finished().
131 */
132- virtual void finished() = 0;
133+ virtual void finished(Reason r) = 0;
134
135 protected:
136 /// @cond
137@@ -77,6 +95,12 @@
138 /// @endcond
139 };
140
141+/**
142+\brief Convenience function to convert a ReceiverBase::Reason enumerator to a string.
143+\return Possible return values are "finished", "cancelled", and "error".
144+*/
145+char const* to_string(ReceiverBase::Reason reason);
146+
147 } // namespace scopes
148
149 } // namespace api
150
151=== modified file 'include/scopes/Registry.h'
152--- include/scopes/Registry.h 2013-11-20 12:33:42 +0000
153+++ include/scopes/Registry.h 2013-11-22 04:36:14 +0000
154@@ -58,7 +58,7 @@
155
156 /**
157 \brief Returns a ScopeProxy for the scope with the given name.
158- @return If no scope with the given name exists, the returned `shared_ptr` stores a `nullptr`.
159+ @return If no scope with the given name exists, find() throws NotFoundException.
160 */
161 ScopeProxy find(std::string const& scope_name) const;
162
163
164=== modified file 'include/scopes/Reply.h'
165--- include/scopes/Reply.h 2013-11-20 12:33:42 +0000
166+++ include/scopes/Reply.h 2013-11-22 04:36:14 +0000
167@@ -101,7 +101,8 @@
168 friend class internal::ReplyImpl;
169
170 std::unique_ptr<internal::ReplyImpl> p;
171- std::shared_ptr<internal::QueryObject> qo; // Used to reference
172+ std::shared_ptr<internal::QueryObject> qo; // Points at the corresponding QueryObject, so we can
173+ // forward cancellation.
174 };
175
176 } // namespace scopes
177
178=== modified file 'include/scopes/ScopeBase.h'
179--- include/scopes/ScopeBase.h 2013-11-01 12:44:48 +0000
180+++ include/scopes/ScopeBase.h 2013-11-22 04:36:14 +0000
181@@ -67,7 +67,7 @@
182 \class ScopeBase
183 \brief Base class for a scope implementation.
184
185-Scopes are accessed by the Unity run time as a shared library (one library per scope).
186+Scopes are accessed by the scopes run time as a shared library (one library per scope).
187 Each scope must implement a class that derives from ScopeBase, for example:
188
189 ~~~
190@@ -79,9 +79,9 @@
191 MyScope();
192 virtual ~MyScope();
193
194- virtual int start();
195- virtual void stop();
196- virtual void run();
197+ virtual int start(); // Mandatory
198+ virtual void stop(); // Mandatory
199+ virtual void run(); // Optional
200 };
201 ~~~
202
203@@ -90,10 +90,10 @@
204 - a create function that must return a pointer to the derived instance
205 - a destroy function that is passed the pointer returned by the create function
206
207-Typically, the create and destroy functions will simply call new and delete, respectively. (However,
208-there is no requirement that the derived class instance must be heap allocated.)
209+Typically, the create and destroy functions will simply call `new` and `delete`, respectively. (However,
210+there is no requirement that the derived class instance must be heap-allocated.)
211 If the create function throws an exception, the destroy function will not be called. If the create function returns
212-NULL, the destroy function will be called with NULL as its argument.
213+NULL, the destroy function _will_ be called with NULL as its argument.
214
215 Rather than hard-coding the names of the functions, use the #UNITY_API_SCOPE_CREATE_FUNCTION and
216 #UNITY_API_SCOPE_DESTROY_FUNCTION macros, for example:
217@@ -112,10 +112,14 @@
218 }
219 ~~~
220
221-After the Unity run time has obtained a pointer to the class instance from the create function, it calls start(),
222-which allows the scope to intialize itself. Once start() returns, incoming query requests are dispatched to the scope.
223+After the scopes run time has obtained a pointer to the class instance from the create function, it calls start(),
224+which allows the scope to intialize itself. This is followed by call to run(). The call to run() is made by
225+a separate thread; its only purpose is to pass a thread of control to the scope, for example, to run an event loop.
226 When the scope should complete its activities, the run time calls stop(). The calls to the create function, start(),
227-stop(), and the destroy function) are made by the same thread. The call to run() is made by a _different_ thread.
228+stop(), and the destroy function) are made by the same thread.
229+
230+The scope implementation, if it does not return from run(), is expected to return from run() in response to a
231+call to stop() in a timely manner.
232 */
233
234 class UNITY_API ScopeBase : private util::NonCopyable
235@@ -131,7 +135,7 @@
236 static constexpr int VERSION = UNITY_SCOPES_VERSION_MAJOR;
237
238 /**
239- \brief Called by the Unity run time after initialize() completes.
240+ \brief Called by the scopes run time after the create function completes.
241 If start() throws an exception, stop() will _not_ be called.
242
243 The call to start() is made by the same thread that calls the create function.
244@@ -141,16 +145,17 @@
245 \param registry A proxy to the scope registry. This parameter is provided for aggregating
246 scopes that need to retrieve proxies to their child scopes.
247
248- \return Any return value other than ScopeBase::VERSION will cause the Unity run time
249+ \return Any return value other than ScopeBase::VERSION will cause the scopes run time
250 to refuse to load the scope. The return value is used to ensure that the shared library
251- containing the scope is ABI compatible with the Unity scopes run time.
252+ containing the scope is ABI compatible with the scopes scopes run time.
253 */
254 virtual int start(std::string const& scope_name, RegistryProxy const& registry) = 0;
255
256 /**
257- \brief Called by the Unity run time when the scope should shut down.
258+ \brief Called by the scopes run time when the scope should shut down.
259 A scope should deallocate as many resources as possible when stop() is called, for example,
260- deallocate any caches and close network connections.
261+ deallocate any caches and close network connections. In addition, if the scope implements run()
262+ and did not return from run(), it must return from run() in response to the call to stop().
263
264 Exceptions from stop() are ignored.
265
266@@ -159,19 +164,24 @@
267 virtual void stop() = 0;
268
269 /**
270- \brief Called by the Unity run time to hand a thread of control to the scope.
271+ \brief Called by the scopes run time after it has called start() to hand a thread of control to the scope.
272 run() passes a thread of control to the scope to do with as it sees fit, for example, to run an event loop.
273-
274- If run() throws an exception, stop() _will_ be called.
275-
276- The call to run() is made by a separate thread (not the thread that calls the create function and start()).
277+ During finalization, the scopes run time joins with the thread that called run(). This means that, if
278+ the scope implementation does not return from run(), it is expected to arrange for run() to complete
279+ in timely manner in response to a call to stop(). Failure to do so will cause deadlock during finalization.
280+
281+ If run() throws an exception, the run time handles the exception and calls stop() in response.
282 */
283- virtual void run() = 0;
284+ virtual void run();
285
286 /**
287- \brief Called by the Unity run time when a scope needs to instantiate a query.
288- TODO: complete documentation.
289-
290+ \brief Called by the scopes run time when a scope needs to instantiate a query.
291+ This method must return an instance that is derived from QueryBase. The implementation
292+ of this method must return in a timely manner, that is, it should perform only minimal
293+ initialization that is guaranteed to complete quickly. That call to create_query() is made
294+ by an arbitrary thread.
295+ /param q The query string to be executed by the returned object instance.
296+ /param hints TODO, complete doc
297 */
298 virtual QueryBase::UPtr create_query(std::string const& q, VariantMap const& hints) = 0;
299
300@@ -193,16 +203,18 @@
301 } // namespace unity
302
303 /**
304-\brief The function called by the Unity run time to initialize the scope.
305-It must return a pointer to a ScopeBase instance. The returned instance need not be heap-allocated.
306+\brief The function called by the scopes run time to initialize the scope.
307+It must return a pointer to an instance derived from ScopeBase. The returned
308+instance need not be heap-allocated, but must remain in scope until the
309+destroy function is called by the scopes run time.
310
311 If this function throws an exception, the destroy function will _not_ be called. If this function returns NULL,
312-the destroy function _will_be called with NULL as its argument.
313+the destroy function _will_ be called with NULL as its argument.
314 */
315 extern "C" unity::api::scopes::ScopeBase* UNITY_API_SCOPE_CREATE_FUNCTION();
316
317 /**
318-\brief The function called by the Unity run time to finalize the scope.
319+\brief The function called by the scopes run time to finalize the scope.
320 The passed pointer is the pointer that was returned by the create function.
321
322 Exceptions thrown by the destroy function are ignored.
323
324=== modified file 'include/scopes/internal/MWReply.h'
325--- include/scopes/internal/MWReply.h 2013-11-19 11:46:14 +0000
326+++ include/scopes/internal/MWReply.h 2013-11-22 04:36:14 +0000
327@@ -21,6 +21,7 @@
328
329 #include <scopes/internal/MWObjectProxy.h>
330 #include <scopes/internal/MWReplyProxyFwd.h>
331+#include <scopes/ReceiverBase.h>
332 #include <scopes/Variant.h>
333
334 #include <string>
335@@ -43,7 +44,7 @@
336 virtual ~MWReply() noexcept;
337
338 virtual void push(VariantMap const& result) = 0;
339- virtual void finished() = 0;
340+ virtual void finished(ReceiverBase::Reason reason) = 0;
341
342 protected:
343 MWReply(MiddlewareBase* mw_base);
344
345=== modified file 'include/scopes/internal/QueryObject.h'
346--- include/scopes/internal/QueryObject.h 2013-11-01 12:44:48 +0000
347+++ include/scopes/internal/QueryObject.h 2013-11-22 04:36:14 +0000
348@@ -24,6 +24,7 @@
349 #include <scopes/internal/MWQueryCtrlProxyFwd.h>
350 #include <scopes/ReplyProxyFwd.h>
351
352+#include <atomic>
353 #include <mutex>
354
355 namespace unity
356@@ -61,6 +62,8 @@
357 // Called locally only, by QueryCtrlObject.
358 void cancel();
359
360+ bool pushable() const noexcept; // Called locallly only, by ReplyImpl
361+
362 // Called by create_query(), to hold the reference count high until the run call arrives via the middleware,
363 // and we can pass the shared_ptr to the ReplyImpl.
364 void set_self(SPtr const& self);
365@@ -70,6 +73,7 @@
366 MWReplyProxy reply_;
367 std::weak_ptr<Reply> reply_proxy_;
368 MWQueryCtrlProxy const ctrl_;
369+ std::atomic_bool pushable_;
370 SPtr self_;
371 };
372
373
374=== modified file 'include/scopes/internal/ReplyImpl.h'
375--- include/scopes/internal/ReplyImpl.h 2013-11-19 11:46:14 +0000
376+++ include/scopes/internal/ReplyImpl.h 2013-11-22 04:36:14 +0000
377@@ -23,6 +23,7 @@
378 #include <scopes/internal/CategoryRegistry.h>
379 #include <scopes/ReplyProxyFwd.h>
380 #include <scopes/Category.h>
381+#include <scopes/ReceiverBase.h>
382
383 #include <atomic>
384
385@@ -60,6 +61,7 @@
386
387 bool push(unity::api::scopes::ResultItem const& result);
388 void finished();
389+ void finished(unity::api::scopes::ReceiverBase::Reason reason);
390
391 static ReplyProxy create(MWReplyProxy const& mw_proxy, std::shared_ptr<QueryObject> const& qo);
392
393
394=== modified file 'include/scopes/internal/ReplyObject.h'
395--- include/scopes/internal/ReplyObject.h 2013-11-21 10:12:45 +0000
396+++ include/scopes/internal/ReplyObject.h 2013-11-22 04:36:14 +0000
397@@ -56,10 +56,10 @@
398
399 // Remote operation implementations
400 void push(VariantMap const& result) noexcept;
401- void finished() noexcept;
402+ void finished(ReceiverBase::Reason reason) noexcept;
403
404 private:
405- ReceiverBase::SPtr const reply_base_;
406+ ReceiverBase::SPtr const receiver_base_;
407 ReapItem::SPtr reap_item_;
408 std::shared_ptr<CategoryRegistry> cat_registry_;
409 std::atomic_bool finished_;
410
411=== modified file 'include/scopes/internal/zmq_middleware/ZmqReply.h'
412--- include/scopes/internal/zmq_middleware/ZmqReply.h 2013-11-19 11:46:14 +0000
413+++ include/scopes/internal/zmq_middleware/ZmqReply.h 2013-11-22 04:36:14 +0000
414@@ -45,7 +45,7 @@
415 virtual ~ZmqReply() noexcept;
416
417 virtual void push(VariantMap const& result) override;
418- virtual void finished() override;
419+ virtual void finished(ReceiverBase::Reason reason) override;
420 };
421
422 } // namespace zmq_middleware
423
424=== modified file 'src/QueryBase.cpp'
425--- src/QueryBase.cpp 2013-11-21 10:12:45 +0000
426+++ src/QueryBase.cpp 2013-11-22 04:36:14 +0000
427@@ -50,7 +50,7 @@
428
429 void QueryBase::cancel()
430 {
431- p->cancel(); // Forward cancel to subquery
432+ p->cancel(); // Forward cancel to subqueries
433 cancelled(); // Inform this query that it was cancelled
434 }
435
436
437=== modified file 'src/ReceiverBase.cpp'
438--- src/ReceiverBase.cpp 2013-11-21 10:12:45 +0000
439+++ src/ReceiverBase.cpp 2013-11-22 04:36:14 +0000
440@@ -18,6 +18,9 @@
441
442 #include <scopes/ReceiverBase.h>
443
444+#include <cassert>
445+#include <unordered_map>
446+
447 using namespace std;
448
449 namespace unity
450@@ -43,6 +46,22 @@
451 {
452 }
453
454+// Possibly overkill, but safer than using the enum as the index into an array,
455+// in case the enumeration is ever added to or the enumerators get re-ordered.
456+
457+static unordered_map<int, char const*> const reasons =
458+{
459+ pair<int, char const*>(static_cast<int>(ReceiverBase::Finished), "finished"),
460+ pair<int, char const*>(static_cast<int>(ReceiverBase::Cancelled), "cancelled"),
461+ pair<int, char const*>(static_cast<int>(ReceiverBase::Error), "error")
462+};
463+
464+char const* to_string(ReceiverBase::Reason reason)
465+{
466+ assert(reasons.find(static_cast<int>(reason)) != reasons.end());
467+ return reasons.find(static_cast<int>(reason))->second;
468+}
469+
470 //! @endcond
471
472 } // namespace scopes
473
474=== modified file 'src/ScopeBase.cpp'
475--- src/ScopeBase.cpp 2013-11-01 12:44:48 +0000
476+++ src/ScopeBase.cpp 2013-11-22 04:36:14 +0000
477@@ -39,6 +39,11 @@
478
479 //! @endcond
480
481+void ScopeBase::run()
482+{
483+ // Intentionally empty: default "do nothing" implementation.
484+}
485+
486 void ScopeBase::runtime_version(int& v_major, int& v_minor, int& v_micro) noexcept
487 {
488 v_major = unity::api::scopes::major_version();
489
490=== modified file 'src/internal/QueryCtrlImpl.cpp'
491--- src/internal/QueryCtrlImpl.cpp 2013-11-01 12:44:48 +0000
492+++ src/internal/QueryCtrlImpl.cpp 2013-11-22 04:36:14 +0000
493@@ -65,7 +65,7 @@
494 // Indicate (to ourselves) that this query is complete. Calling via the MWReplyProxy ensures
495 // the finished() call will be processed by a seperate server-side thread,
496 // so we cannot block here.
497- reply_proxy_->finished();
498+ reply_proxy_->finished(ReceiverBase::Cancelled);
499 }
500 catch (MiddlewareException const& e)
501 {
502
503=== modified file 'src/internal/QueryObject.cpp'
504--- src/internal/QueryObject.cpp 2013-11-01 12:44:48 +0000
505+++ src/internal/QueryObject.cpp 2013-11-22 04:36:14 +0000
506@@ -48,7 +48,8 @@
507 MWQueryCtrlProxy const& ctrl) :
508 query_base_(query_base),
509 reply_(reply),
510- ctrl_(ctrl)
511+ ctrl_(ctrl),
512+ pushable_(true)
513 {
514 assert(query_base);
515 assert(reply);
516@@ -91,21 +92,28 @@
517 }
518 catch (unity::Exception const& e)
519 {
520+ pushable_ = false;
521+ reply_->finished(ReceiverBase::Error); // Oneway, can't block
522 // TODO: log error
523 }
524 catch (...)
525 {
526+ pushable_ = false;
527+ reply_->finished(ReceiverBase::Error); // Oneway, can't block
528 // TODO: log error
529 }
530 }
531
532 void QueryObject::cancel()
533 {
534+ pushable_ = false;
535 auto rp = reply_proxy_.lock();
536 if (rp)
537 {
538 // Send finished() to up-stream client to tell him the query is done.
539- rp->finished(); // Oneway, can't block
540+ // We send via the MWReplyProxy here because that allows passing
541+ // a ReceiverBase::Reason (whereas the public ReplyProxy does not).
542+ reply_->finished(ReceiverBase::Cancelled); // Oneway, can't block
543 }
544
545 // Forward the cancellation to the query base (which in turn will forward it to any subqueries).
546@@ -113,6 +121,11 @@
547 query_base_->cancel();
548 }
549
550+bool QueryObject::pushable() const noexcept
551+{
552+ return pushable_;
553+}
554+
555 // The point of keeping a shared_ptr to ourselves is to make sure this QueryObject cannot
556 // go out of scope in between being created by the Scope, and the first ReplyProxy for this
557 // query being created in QueryObject::run(). If the scope's run() method returns immediately,
558
559=== modified file 'src/internal/ReplyImpl.cpp'
560--- src/internal/ReplyImpl.cpp 2013-11-19 11:46:14 +0000
561+++ src/internal/ReplyImpl.cpp 2013-11-22 04:36:14 +0000
562@@ -16,11 +16,12 @@
563 * Authored by: Michi Henning <michi.henning@canonical.com>
564 */
565
566+#include <scopes/internal/ReplyImpl.h>
567+
568 #include <scopes/internal/MiddlewareBase.h>
569 #include <scopes/internal/MWReply.h>
570 #include <scopes/internal/RuntimeImpl.h>
571 #include <scopes/internal/ResultItemImpl.h>
572-#include <scopes/internal/ReplyImpl.h>
573 #include <scopes/ResultItem.h>
574 #include <scopes/ScopeExceptions.h>
575 #include <unity/UnityExceptions.h>
576@@ -91,8 +92,8 @@
577
578 bool ReplyImpl::push(unity::api::scopes::ResultItem const& result)
579 {
580- // if this is an aggregator scope, it may be pushing result items obtained
581- // from ReplyObject without registering category first.
582+ // If this is an aggregator scope, it may be pushing result items obtained
583+ // from ReplyObject without registering a category first.
584 auto cat = cat_registry_->lookup_category(result.category()->id());
585 if (cat == nullptr)
586 {
587@@ -115,6 +116,11 @@
588
589 bool ReplyImpl::push(VariantMap const& variant_map)
590 {
591+ if (!qo_->pushable())
592+ {
593+ return false; // Query was cancelled or had an error.
594+ }
595+
596 if (!finished_.load())
597 {
598 try
599@@ -125,7 +131,7 @@
600 catch (MiddlewareException const& e)
601 {
602 // TODO: log error
603- finished();
604+ finished(ReceiverBase::Error);
605 return false;
606 }
607 }
608@@ -134,11 +140,16 @@
609
610 void ReplyImpl::finished()
611 {
612+ finished(ReceiverBase::Finished);
613+}
614+
615+void ReplyImpl::finished(ReceiverBase::Reason reason)
616+{
617 if (!finished_.exchange(true))
618 {
619 try
620 {
621- mw_proxy_->finished();
622+ mw_proxy_->finished(reason);
623 }
624 catch (MiddlewareException const& e)
625 {
626
627=== modified file 'src/internal/ReplyObject.cpp'
628--- src/internal/ReplyObject.cpp 2013-11-21 10:43:04 +0000
629+++ src/internal/ReplyObject.cpp 2013-11-22 04:36:14 +0000
630@@ -41,13 +41,13 @@
631 namespace internal
632 {
633
634-ReplyObject::ReplyObject(ReceiverBase::SPtr const& reply_base, RuntimeImpl const* runtime) :
635- reply_base_(reply_base),
636+ReplyObject::ReplyObject(ReceiverBase::SPtr const& receiver_base, RuntimeImpl const* runtime) :
637+ receiver_base_(receiver_base),
638 cat_registry_(new CategoryRegistry()),
639 finished_(false),
640 num_push_(0)
641 {
642- assert(reply_base);
643+ assert(receiver_base);
644 assert(runtime);
645 reap_item_ = runtime->reply_reaper()->add([this] { this->disconnect(); });
646 }
647@@ -56,7 +56,7 @@
648 {
649 try
650 {
651- finished();
652+ finished(ReceiverBase::Finished);
653 }
654 catch (...)
655 {
656@@ -90,14 +90,14 @@
657 unique_lock<mutex> lock(mutex_);
658 assert(num_push_ >= 0);
659 ++num_push_;
660- lock.unlock();
661+ lock.unlock(); // Forward invocations to application outside synchronization
662 try
663 {
664 auto it = result.find("category");
665 if (it != result.end())
666 {
667 auto cat = cat_registry_->register_category(it->second.get_dict());
668- reply_base_->push(cat);
669+ receiver_base_->push(cat);
670 }
671
672 it = result.find("result");
673@@ -109,12 +109,12 @@
674 if (cat == nullptr)
675 {
676 // TODO: this is an internal error; log error
677- finished();
678+ finished(ReceiverBase::Error);
679 }
680 else
681 {
682 ResultItem result_item(cat, result_var);
683- reply_base_->push(std::move(result_item)); // Forward the result to the application code outside synchronization.
684+ receiver_base_->push(std::move(result_item));
685 }
686 }
687 }
688@@ -123,7 +123,7 @@
689 // TODO: log error
690 try
691 {
692- finished();
693+ finished(ReceiverBase::Error);
694 }
695 catch (...)
696 {
697@@ -134,7 +134,7 @@
698 // TODO: log error
699 try
700 {
701- finished();
702+ finished(ReceiverBase::Error);
703 }
704 catch (...)
705 {
706@@ -147,7 +147,7 @@
707 }
708 }
709
710-void ReplyObject::finished() noexcept
711+void ReplyObject::finished(ReceiverBase::Reason r) noexcept
712 {
713 // We permit exactly one finished() call for a query. This avoids
714 // a race condition where the executing down-stream query invokes
715@@ -167,10 +167,10 @@
716 unique_lock<mutex> lock(mutex_);
717 assert(num_push_ >= 0);
718 idle_.wait(lock, [this] { return num_push_ == 0; });
719- lock.unlock();
720+ lock.unlock(); // Inform the application code that the query is complete outside synchronization.
721 try
722 {
723- reply_base_->finished(); // Inform the application code that the query is complete outside synchronization.
724+ receiver_base_->finished(r);
725 }
726 catch (unity::Exception const& e)
727 {
728
729=== modified file 'src/internal/ScopeImpl.cpp'
730--- src/internal/ScopeImpl.cpp 2013-11-21 10:12:45 +0000
731+++ src/internal/ScopeImpl.cpp 2013-11-22 04:36:14 +0000
732@@ -76,7 +76,7 @@
733 {
734 // TODO: if things go wrong, we need to make sure that the reply object
735 // is disconnected from the middleware, so it gets deallocated.
736- reply->finished();
737+ reply->finished(ReceiverBase::Error);
738 throw;
739 }
740 catch (...)
741
742=== modified file 'src/internal/ScopeLoader.cpp'
743--- src/internal/ScopeLoader.cpp 2013-11-01 12:44:48 +0000
744+++ src/internal/ScopeLoader.cpp 2013-11-22 04:36:14 +0000
745@@ -452,6 +452,7 @@
746 }
747 catch (...)
748 {
749+ // TODO: log this
750 // Run throws, we remember the exception and send a stop command to ourselves, so the stop()
751 // callback on the scope will still be called.
752 {
753
754=== modified file 'src/internal/ScopeObject.cpp'
755--- src/internal/ScopeObject.cpp 2013-11-18 23:53:56 +0000
756+++ src/internal/ScopeObject.cpp 2013-11-22 04:36:14 +0000
757@@ -76,12 +76,21 @@
758 }
759
760 // Ask scope to instantiate a new query.
761- QueryBase::SPtr query_base = scope_base_->create_query(q, hints);
762- if (!query_base)
763- {
764- // TODO: log error, scope returned null pointer.
765+ QueryBase::SPtr query_base;
766+ try
767+ {
768+ query_base = scope_base_->create_query(q, hints);
769+ if (!query_base)
770+ {
771+ // TODO: log error, scope returned null pointer.
772+ throw ResourceException("Scope \"" + runtime_->scope_name() +
773+ "\" returned nullptr from create_query(\"" + q + "\")");
774+ }
775+ }
776+ catch (...)
777+ {
778 throw ResourceException("Scope \"" + runtime_->scope_name() +
779- "\" returned nullptr from create_query(\"" + q + "\")");
780+ "\" threw an exception from create_query(\"" + q + "\")");
781 }
782
783 MWQueryCtrlProxy ctrl_proxy;
784@@ -113,7 +122,7 @@
785 {
786 try
787 {
788- reply->finished();
789+ reply->finished(ReceiverBase::Error);
790 }
791 catch (...)
792 {
793@@ -125,7 +134,7 @@
794 {
795 try
796 {
797- reply->finished();
798+ reply->finished(ReceiverBase::Error);
799 }
800 catch (...)
801 {
802
803=== modified file 'src/internal/zmq_middleware/ReplyI.cpp'
804--- src/internal/zmq_middleware/ReplyI.cpp 2013-11-08 15:00:22 +0000
805+++ src/internal/zmq_middleware/ReplyI.cpp 2013-11-22 04:36:14 +0000
806@@ -74,11 +74,36 @@
807 }
808
809 void ReplyI::finished_(Current const&,
810- capnp::ObjectPointer::Reader&,
811+ capnp::ObjectPointer::Reader& in_params,
812 capnproto::Response::Builder&)
813 {
814 auto delegate = dynamic_pointer_cast<ReplyObject>(del());
815- delegate->finished();
816+ auto req = in_params.getAs<capnproto::Reply::FinishedRequest>();
817+ auto r = req.getReason();
818+ ReceiverBase::Reason reason;
819+ switch (r)
820+ {
821+ case capnproto::Reply::FinishedReason::FINISHED:
822+ {
823+ reason = ReceiverBase::Finished;
824+ break;
825+ }
826+ case capnproto::Reply::FinishedReason::CANCELLED:
827+ {
828+ reason = ReceiverBase::Cancelled;
829+ break;
830+ }
831+ case capnproto::Reply::FinishedReason::ERROR:
832+ {
833+ reason = ReceiverBase::Error;
834+ break;
835+ }
836+ default:
837+ {
838+ assert(false); // LCOV_EXCL_LINE
839+ }
840+ }
841+ delegate->finished(reason);
842 }
843
844 } // namespace zmq_middleware
845
846=== modified file 'src/internal/zmq_middleware/ZmqReply.cpp'
847--- src/internal/zmq_middleware/ZmqReply.cpp 2013-11-19 11:46:14 +0000
848+++ src/internal/zmq_middleware/ZmqReply.cpp 2013-11-22 04:36:14 +0000
849@@ -74,10 +74,35 @@
850 future.wait();
851 }
852
853-void ZmqReply::finished()
854+void ZmqReply::finished(ReceiverBase::Reason reason)
855 {
856 capnp::MallocMessageBuilder request_builder;
857- make_request_(request_builder, "finished");
858+ auto request = make_request_(request_builder, "finished");
859+ auto in_params = request.initInParams().getAs<capnproto::Reply::FinishedRequest>();
860+ capnproto::Reply::FinishedReason r;
861+ switch (reason)
862+ {
863+ case ReceiverBase::Finished:
864+ {
865+ r = capnproto::Reply::FinishedReason::FINISHED;
866+ break;
867+ }
868+ case ReceiverBase::Cancelled:
869+ {
870+ r = capnproto::Reply::FinishedReason::CANCELLED;
871+ break;
872+ }
873+ case ReceiverBase::Error:
874+ {
875+ r = capnproto::Reply::FinishedReason::ERROR;
876+ break;
877+ }
878+ default:
879+ {
880+ assert(false);
881+ }
882+ }
883+ in_params.setReason(r);
884
885 auto future = mw_base()->invoke_pool()->submit([&] { return this->invoke_(request_builder); });
886 future.wait();
887
888=== modified file 'src/internal/zmq_middleware/capnproto/Reply.capnp'
889--- src/internal/zmq_middleware/capnproto/Reply.capnp 2013-11-04 14:10:29 +0000
890+++ src/internal/zmq_middleware/capnproto/Reply.capnp 2013-11-22 04:36:14 +0000
891@@ -35,21 +35,23 @@
892 # Operations:
893 #
894 # void push(string result);
895-# void finished();
896+# enum FinishedReason { Finished, Cancelled, Error };
897+# void finished(Reason r);
898
899 struct PushRequest
900 {
901 result @0 : ValueDict.ValueDict;
902 }
903
904-struct PushResponse
905+enum FinishedReason
906 {
907+ unused @0;
908+ finished @1;
909+ cancelled @2;
910+ error @3;
911 }
912
913 struct FinishedRequest
914 {
915-}
916-
917-struct FinishedResponse
918-{
919+ reason @0 : FinishedReason;
920 }

Subscribers

People subscribed via source and target branches

to all changes: