Merge lp:~marcustomlinson/unity-scopes-api/rebinding_logic into lp:unity-scopes-api/devel

Proposed by Marcus Tomlinson
Status: Superseded
Proposed branch: lp:~marcustomlinson/unity-scopes-api/rebinding_logic
Merge into: lp:unity-scopes-api/devel
Diff against target: 671 lines (+141/-87)
10 files modified
include/unity/scopes/internal/zmq_middleware/ZmqMiddleware.h (+2/-1)
include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h (+4/-0)
scoperegistry/scoperegistry.cpp (+0/-21)
src/scopes/internal/RuntimeImpl.cpp (+15/-1)
src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp (+60/-50)
src/scopes/internal/zmq_middleware/ZmqObject.cpp (+32/-1)
test/gtest/scopes/Runtime/Runtime_test.cpp (+1/-1)
test/gtest/scopes/internal/ScopeMetadataImpl/ScopeMetadataImpl_test.cpp (+9/-4)
test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp (+15/-7)
test/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/ZmqMiddleware_test.cpp (+3/-1)
To merge this branch: bzr merge lp:~marcustomlinson/unity-scopes-api/rebinding_logic
Reviewer      Review Type   Date Requested   Status
Unity Team                                   Pending
Review via email: mp+214221@code.launchpad.net
296. By Marcus Tomlinson

Merged devel

297. By Marcus Tomlinson

Fixing tests (can't construct a ZmqObjectProxy with an invalid runtime pointer)

298. By Marcus Tomlinson

On locate() a scope can call registry methods during its start() method. Ensure the registry adapter has enough threads available to handle this.

299. By Marcus Tomlinson

Merged devel

300. By Marcus Tomlinson

Merged send_state_in_run_scope

301. By Marcus Tomlinson

Upped middleware timeout for locate to allow enough time for the locate logic to execute.

302. By Marcus Tomlinson

Updated locate test for RegistryI to execute an indirect locate call via rebinding.

303. By Marcus Tomlinson

Merged devel

304. By Marcus Tomlinson

Fixed ScopeMetadataImpl, ObjectAdapter, and ZmqMiddleware tests

305. By Marcus Tomlinson

Added cat_registry_mutex_ in order to avoid pushing a result in parallel to its category being registered.

306. By Marcus Tomlinson

Temporary test hack to see if tests pass in CI

307. By Marcus Tomlinson

Added RaiiScopeThread structs to Activation_test and Runtime_test to ensure each test is run in isolation

308. By Marcus Tomlinson

Ensure that the state receiver is ready to process requests as soon as the registry is added to the middleware.

309. By Marcus Tomlinson

Simplified RaiiScopeThread

310. By Marcus Tomlinson

If there are pushes still active when finished() is called, give them a second to complete before destroying the ReplyObject.

311. By Marcus Tomlinson

Merged devel

312. By Marcus Tomlinson

Fixed synchronization race between client and server copies of the category register in SearchReplyImpl and ResultReplyObject.

313. By Marcus Tomlinson

Fixed comment

314. By Marcus Tomlinson

If there are pushes still active when finished() is called, give them a second to complete before destroying the ReplyObject.

315. By Marcus Tomlinson

Make SearchListenerBase implementations thread safe in tests.

316. By Marcus Tomlinson

Make PreviewListenerBase implementations thread safe in tests.

317. By Marcus Tomlinson

Moved the logic that waits for active pushes to complete on finished() to ReplyImpl

318. By Marcus Tomlinson

Moved the logic that waits for active pushes to complete on finished() to ReplyI

319. By Marcus Tomlinson

Remove test code

320. By Marcus Tomlinson

Removed more test code

321. By Marcus Tomlinson

Merged devel and fixed conflicts

322. By Marcus Tomlinson

Removed unnecessary lock_guards from push() implementations in tests

323. By Marcus Tomlinson

Some more cleaning up

324. By Marcus Tomlinson

Check for null pointers when assigning registry_ in the ZmqObject constructor

325. By Marcus Tomlinson

Expect pushes to occur in the correct order in the smartscopesproxy search test

326. By Marcus Tomlinson

Revert changes made to SearchReplyImpl

327. By Marcus Tomlinson

Merged serialize-oneway-invoke and resolved conflicts

328. By Marcus Tomlinson

Fixed RegistryI_test according to new async standard

329. By Marcus Tomlinson

Fixed Runtime_test

330. By Marcus Tomlinson

We can't just close a socket directly and leave it lingering in the connection pool, otherwise we attempt to talk to a closed socket the next time around

331. By Marcus Tomlinson

Corrected scope_ids std::array initialization

332. By Marcus Tomlinson

Addressed review comments

333. By Marcus Tomlinson

Merged serialize-oneway-invoke

334. By Marcus Tomlinson

Fix Invocation_test due to new rebinding behavior

335. By Marcus Tomlinson

Merged serialize-oneway-invoke

336. By Marcus Tomlinson

Merged devel

337. By Marcus Tomlinson

Merged serialize-oneway-invoke

338. By Marcus Tomlinson

Merged serialize-oneway-invoke and resolved conflicts

339. By Marcus Tomlinson

Merged devel

340. By Marcus Tomlinson

Merged devel

Unmerged revisions

Preview Diff

1=== modified file 'include/unity/scopes/internal/zmq_middleware/ZmqMiddleware.h'
2--- include/unity/scopes/internal/zmq_middleware/ZmqMiddleware.h 2014-04-02 09:24:02 +0000
3+++ include/unity/scopes/internal/zmq_middleware/ZmqMiddleware.h 2014-04-08 08:16:38 +0000
4@@ -95,7 +95,8 @@
5 RequestMode mode,
6 int64_t timeout);
7
8- std::shared_ptr<ObjectAdapter> find_adapter(std::string const& name, std::string const& endpoint_dir);
9+ std::shared_ptr<ObjectAdapter> find_adapter(std::string const& name, std::string const& endpoint_dir,
10+ std::string const& category);
11
12 ZmqProxy safe_add(std::function<void()>& disconnect_func,
13 std::shared_ptr<ObjectAdapter> const& adapter,
14
15=== modified file 'include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h'
16--- include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h 2014-02-03 02:53:33 +0000
17+++ include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h 2014-04-08 08:16:38 +0000
18@@ -24,6 +24,7 @@
19 #include <unity/scopes/internal/zmq_middleware/RequestMode.h>
20 #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h>
21 #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxyFwd.h>
22+#include <unity/scopes/internal/zmq_middleware/ZmqRegistryProxyFwd.h>
23 #include <unity/scopes/internal/zmq_middleware/ZmqReceiver.h>
24
25 #include <capnp/message.h>
26@@ -75,6 +76,9 @@
27 ZmqReceiver invoke_(capnp::MessageBuilder& out_params, int64_t timeout);
28
29 private:
30+ ZmqReceiver invoke__(capnp::MessageBuilder& out_params, int64_t timeout);
31+
32+ ZmqRegistryProxy registry_;
33 std::string endpoint_;
34 std::string identity_;
35 std::string category_;
36
37=== modified file 'scoperegistry/scoperegistry.cpp'
38--- scoperegistry/scoperegistry.cpp 2014-04-04 09:40:40 +0000
39+++ scoperegistry/scoperegistry.cpp 2014-04-08 08:16:38 +0000
40@@ -422,27 +422,6 @@
41 // the registry of state changes.
42 middleware->add_state_receiver_object("StateReceiver", registry->state_receiver());
43
44- // FIXME, HACK HACK HACK HACK
45- // The middleware should spawn scope processes with lookup() on demand.
46- // Because it currently does not have the plumbing, we start every scope immediately.
47- // When the plumbing appears, remove this.
48- for (auto&& pair : local_scopes)
49- {
50- try
51- {
52- registry->locate(pair.first);
53- }
54- catch (NotFoundException const&)
55- {
56- // We ignore this. If the scope config couldn't be found, add_local_scopes()
57- // has already printed an error message.
58- }
59- catch (std::exception const& e)
60- {
61- error("could not start scope " + pair.first + ": " + e.what());
62- }
63- }
64-
65 // Drop our shared_ptr to the RegistryObject. This means that the registry object
66 // is kept alive only via the shared_ptr held by the middleware. If the middleware
67 // shuts down, it clears out the active servant map, which destroys the registry
68
69=== modified file 'src/scopes/internal/RuntimeImpl.cpp'
70--- src/scopes/internal/RuntimeImpl.cpp 2014-04-03 14:22:02 +0000
71+++ src/scopes/internal/RuntimeImpl.cpp 2014-04-08 08:16:38 +0000
72@@ -20,6 +20,7 @@
73 #include <unity/scopes/internal/ScopeBaseImpl.h>
74
75 #include <unity/scopes/internal/DfltConfig.h>
76+#include <unity/scopes/internal/MWStateReceiver.h>
77 #include <unity/scopes/internal/RegistryConfig.h>
78 #include <unity/scopes/internal/RegistryImpl.h>
79 #include <unity/scopes/internal/RuntimeConfig.h>
80@@ -201,7 +202,13 @@
81
82 void RuntimeImpl::run_scope(ScopeBase *const scope_base, std::string const& scope_ini_file)
83 {
84- auto mw = factory()->create(scope_id_, "Zmq", "Zmq.ini");
85+ // Retrieve the registry middleware and create a proxy to its state receiver
86+ RegistryConfig reg_conf(registry_identity_, registry_configfile_);
87+ auto reg_runtime = create(registry_identity_, configfile_);
88+ auto reg_mw = reg_runtime->factory()->find(registry_identity_, reg_conf.mw_kind());
89+ auto reg_state_receiver = reg_mw->create_state_receiver_proxy("StateReceiver");
90+
91+ auto mw = factory()->create(scope_id_, reg_conf.mw_kind(), reg_conf.mw_configfile());
92
93 {
94 // dirname modifies its argument, so we need a copy of scope lib path
95@@ -223,7 +230,14 @@
96 auto scope = unique_ptr<internal::ScopeObject>(new internal::ScopeObject(this, scope_base));
97 auto proxy = mw->add_scope_object(scope_id_, move(scope));
98
99+ // Inform the registry that this scope is now ready to process requests
100+ reg_state_receiver->push_state(scope_id_, StateReceiverObject::State::ScopeReady);
101+
102 mw->wait_for_shutdown();
103+
104+ // Inform the registry that this scope is shutting down
105+ reg_state_receiver->push_state(scope_id_, StateReceiverObject::State::ScopeStopping);
106+
107 run_future.get();
108 }
109
110
111=== modified file 'src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp'
112--- src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp 2014-04-02 09:24:02 +0000
113+++ src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp 2014-04-08 08:16:38 +0000
114@@ -63,6 +63,13 @@
115 char const* reply_suffix = "-r"; // Appended to server_name_ to create reply adapter name
116 char const* state_suffix = "-s"; // Appended to server_name_ to create state adapter name
117
118+char const* query_category = "Query"; // query adapter category name
119+char const* ctrl_category = "QueryCtrl"; // control adapter category name
120+char const* reply_category = "Reply"; // reply adapter category name
121+char const* state_category = "State"; // state adapter category name
122+char const* scope_category = "Scope"; // scope adapter category name
123+char const* registry_category = "Registry"; // registry adapter category name
124+
125 } // namespace
126
127 ZmqMiddleware::ZmqMiddleware(string const& server_name, string const& configfile, RuntimeImpl* runtime)
128@@ -113,9 +120,11 @@
129 {
130 // TODO: get directory from config
131 // TODO: get pool size from config
132+ // N.B. We absolutely MUST have AT LEAST 2 invoke threads
133+ // as rebinding is invoked within 2-way invocations.
134 {
135 lock_guard<mutex> lock(data_mutex_);
136- invokers_.reset(new ThreadPool(1));
137+ invokers_.reset(new ThreadPool(2));
138 }
139 state_ = Started;
140 state_changed_.notify_all();
141@@ -273,7 +282,7 @@
142 }
143
144 // Now run over the map and check each value
145- string category = "Scope";
146+ string category = scope_category;
147 RequestMode mode = RequestMode::Twoway;
148 int64_t timeout = -1;
149 for (auto const& pair : fmap)
150@@ -343,7 +352,7 @@
151 MWRegistryProxy proxy;
152 try
153 {
154- proxy.reset(new ZmqRegistry(this, endpoint, identity, "Registry", twoway_timeout_));
155+ proxy.reset(new ZmqRegistry(this, endpoint, identity, registry_category, twoway_timeout_));
156 }
157 catch (zmqpp::exception const& e)
158 {
159@@ -358,7 +367,7 @@
160 try
161 {
162 string endpoint = "ipc://" + config_.private_dir() + "/" + identity;
163- proxy.reset(new ZmqScope(this, endpoint, identity, "Scope", twoway_timeout_));
164+ proxy.reset(new ZmqScope(this, endpoint, identity, scope_category, twoway_timeout_));
165 }
166 catch (zmqpp::exception const& e)
167 {
168@@ -372,7 +381,7 @@
169 MWScopeProxy proxy;
170 try
171 {
172- proxy.reset(new ZmqScope(this, endpoint, identity, "Scope", twoway_timeout_));
173+ proxy.reset(new ZmqScope(this, endpoint, identity, scope_category, twoway_timeout_));
174 }
175 catch (zmqpp::exception const& e)
176 {
177@@ -386,7 +395,7 @@
178 MWQueryProxy proxy;
179 try
180 {
181- proxy.reset(new ZmqQuery(this, endpoint, identity, "Query"));
182+ proxy.reset(new ZmqQuery(this, endpoint, identity, query_category));
183 }
184 catch (zmqpp::exception const& e)
185 {
186@@ -400,7 +409,7 @@
187 MWQueryCtrlProxy proxy;
188 try
189 {
190- proxy.reset(new ZmqQueryCtrl(this, endpoint, identity, "QueryCtrl"));
191+ proxy.reset(new ZmqQueryCtrl(this, endpoint, identity, ctrl_category));
192 }
193 catch (zmqpp::exception const& e)
194 {
195@@ -415,7 +424,7 @@
196 try
197 {
198 proxy.reset(new ZmqStateReceiver(this, "ipc://" + config_.private_dir() + "/" + server_name_ + state_suffix,
199- identity, "StateReceiver"));
200+ identity, state_category));
201 }
202 catch (zmqpp::exception const& e)
203 {
204@@ -432,11 +441,11 @@
205 try
206 {
207 shared_ptr<QueryCtrlI> qci(make_shared<QueryCtrlI>(ctrl));
208- auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir());
209+ auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir(), ctrl_category);
210 function<void()> df;
211 auto proxy = safe_add(df, adapter, "", qci);
212 ctrl->set_disconnect_function(df);
213- return ZmqQueryCtrlProxy(new ZmqQueryCtrl(this, proxy->endpoint(), proxy->identity(), "QueryCtrl"));
214+ return ZmqQueryCtrlProxy(new ZmqQueryCtrl(this, proxy->endpoint(), proxy->identity(), ctrl_category));
215 }
216 catch (std::exception const& e) // Should never happen unless our implementation is broken
217 {
218@@ -454,8 +463,8 @@
219 try
220 {
221 shared_ptr<QueryCtrlI> qci(make_shared<QueryCtrlI>(ctrl));
222- auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir());
223- auto df = safe_dflt_add(adapter, "QueryCtrl", qci);
224+ auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir(), ctrl_category);
225+ auto df = safe_dflt_add(adapter, ctrl_category, qci);
226 ctrl->set_disconnect_function(df);
227 }
228 catch (std::exception const& e) // Should never happen unless our implementation is broken
229@@ -474,11 +483,11 @@
230 try
231 {
232 shared_ptr<QueryI> qi(make_shared<QueryI>(query));
233- auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir());
234+ auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir(), query_category);
235 function<void()> df;
236 auto proxy = safe_add(df, adapter, "", qi);
237 query->set_disconnect_function(df);
238- return ZmqQueryProxy(new ZmqQuery(this, proxy->endpoint(), proxy->identity(), "Query"));
239+ return ZmqQueryProxy(new ZmqQuery(this, proxy->endpoint(), proxy->identity(), query_category));
240 }
241 catch (std::exception const& e) // Should never happen unless our implementation is broken
242 {
243@@ -496,8 +505,8 @@
244 try
245 {
246 shared_ptr<QueryI> qi(make_shared<QueryI>(query));
247- auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir());
248- auto df = safe_dflt_add(adapter, "Query", qi);
249+ auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir(), query_category);
250+ auto df = safe_dflt_add(adapter, query_category, qi);
251 query->set_disconnect_function(df);
252 }
253 catch (std::exception const& e) // Should never happen unless our implementation is broken
254@@ -517,11 +526,11 @@
255 try
256 {
257 shared_ptr<RegistryI> ri(make_shared<RegistryI>(registry));
258- auto adapter = find_adapter(server_name_, runtime()->registry_endpointdir());
259+ auto adapter = find_adapter(server_name_, runtime()->registry_endpointdir(), registry_category);
260 function<void()> df;
261 auto proxy = safe_add(df, adapter, identity, ri);
262 registry->set_disconnect_function(df);
263- return ZmqRegistryProxy(new ZmqRegistry(this, proxy->endpoint(), proxy->identity(), "Registry", twoway_timeout_));
264+ return ZmqRegistryProxy(new ZmqRegistry(this, proxy->endpoint(), proxy->identity(), registry_category, twoway_timeout_));
265 }
266 catch (std::exception const& e) // Should never happen unless our implementation is broken
267 {
268@@ -540,11 +549,11 @@
269 try
270 {
271 shared_ptr<ReplyI> ri(make_shared<ReplyI>(reply));
272- auto adapter = find_adapter(server_name_ + reply_suffix, config_.public_dir());
273+ auto adapter = find_adapter(server_name_ + reply_suffix, config_.public_dir(), reply_category);
274 function<void()> df;
275 auto proxy = safe_add(df, adapter, "", ri);
276 reply->set_disconnect_function(df);
277- return ZmqReplyProxy(new ZmqReply(this, proxy->endpoint(), proxy->identity(), "Reply"));
278+ return ZmqReplyProxy(new ZmqReply(this, proxy->endpoint(), proxy->identity(), reply_category));
279 }
280 catch (std::exception const& e) // Should never happen unless our implementation is broken
281 {
282@@ -564,11 +573,11 @@
283 try
284 {
285 shared_ptr<ScopeI> si(make_shared<ScopeI>(scope));
286- auto adapter = find_adapter(server_name_, config_.private_dir());
287+ auto adapter = find_adapter(server_name_, config_.private_dir(), scope_category);
288 function<void()> df;
289 auto proxy = safe_add(df, adapter, identity, si);
290 scope->set_disconnect_function(df);
291- return ZmqScopeProxy(new ZmqScope(this, proxy->endpoint(), proxy->identity(), "Scope", twoway_timeout_));
292+ return ZmqScopeProxy(new ZmqScope(this, proxy->endpoint(), proxy->identity(), scope_category, twoway_timeout_));
293 }
294 catch (std::exception const& e) // Should never happen unless our implementation is broken
295 {
296@@ -586,8 +595,8 @@
297 try
298 {
299 shared_ptr<ScopeI> si(make_shared<ScopeI>(scope));
300- auto adapter = find_adapter(server_name_, config_.private_dir());
301- auto df = safe_dflt_add(adapter, "Scope", si);
302+ auto adapter = find_adapter(server_name_, config_.private_dir(), scope_category);
303+ auto df = safe_dflt_add(adapter, scope_category, si);
304 scope->set_disconnect_function(df);
305 }
306 catch (std::exception const& e) // Should never happen unless our implementation is broken
307@@ -607,11 +616,11 @@
308 try
309 {
310 shared_ptr<StateReceiverI> sri(make_shared<StateReceiverI>(state_receiver));
311- auto adapter = find_adapter(server_name_ + state_suffix, config_.private_dir());
312+ auto adapter = find_adapter(server_name_ + state_suffix, config_.private_dir(), state_category);
313 function<void()> df;
314 auto proxy = safe_add(df, adapter, identity, sri);
315 state_receiver->set_disconnect_function(df);
316- return ZmqStateReceiverProxy(new ZmqStateReceiver(this, proxy->endpoint(), proxy->identity(), "StateReceiver"));
317+ return ZmqStateReceiverProxy(new ZmqStateReceiver(this, proxy->endpoint(), proxy->identity(), state_category));
318 }
319 catch (std::exception const& e) // Should never happen unless our implementation is broken
320 {
321@@ -675,12 +684,12 @@
322 {
323 throw MiddlewareException("make_typed_proxy(): cannot create oneway proxies");
324 }
325- if (category == "Scope")
326+ if (category == scope_category)
327 {
328 auto p = make_shared<ZmqScope>(this, endpoint, identity, category, timeout);
329 return ScopeImpl::create(p, runtime(), identity);
330 }
331- else if (category == "Registry")
332+ else if (category == registry_category)
333 {
334 auto p = make_shared<ZmqRegistry>(this, endpoint, identity, category, timeout);
335 return make_shared<RegistryImpl>(p, runtime());
336@@ -691,23 +700,8 @@
337 }
338 }
339
340-namespace
341-{
342-
343-bool has_suffix(string const& s, string const& suffix)
344-{
345- auto s_len = s.length();
346- auto suffix_len = suffix.length();
347- if (s_len >= suffix_len)
348- {
349- return s.compare(s_len - suffix_len, suffix_len, suffix) == 0;
350- }
351- return false;
352-}
353-
354-} // namespace
355-
356-shared_ptr<ObjectAdapter> ZmqMiddleware::find_adapter(string const& name, string const& endpoint_dir)
357+shared_ptr<ObjectAdapter> ZmqMiddleware::find_adapter(string const& name, string const& endpoint_dir,
358+ string const& category)
359 {
360 lock_guard<mutex> lock(data_mutex_);
361
362@@ -720,33 +714,49 @@
363 // We don't have the requested adapter yet, so we create it on the fly.
364 int pool_size;
365 RequestMode mode;
366- if (has_suffix(name, query_suffix))
367+ if (category == query_category)
368 {
369 // The query adapter is single or multi-threaded and supports oneway operations only.
370 // TODO: get pool size from config
371 pool_size = 1;
372 mode = RequestMode::Oneway;
373 }
374- else if (has_suffix(name, ctrl_suffix))
375+ else if (category == ctrl_category)
376 {
377 // The ctrl adapter is single-threaded and supports oneway operations only.
378 pool_size = 1;
379 mode = RequestMode::Oneway;
380 }
381- else if (has_suffix(name, reply_suffix))
382+ else if (category == reply_category)
383 {
384 // The reply adapter is single- or multi-threaded and supports oneway operations only.
385 // TODO: get pool size from config
386 pool_size = 1;
387 mode = RequestMode::Oneway;
388 }
389- else if (has_suffix(name, state_suffix))
390+ else if (category == state_category)
391 {
392 // The state adapter is single- or multi-threaded and supports oneway operations only.
393 // TODO: get pool size from config
394 pool_size = 1;
395 mode = RequestMode::Oneway;
396 }
397+ else if (category == scope_category)
398+ {
399+ // The scope adapter is single- or multi-threaded and supports twoway operations only.
400+ // TODO: get pool size from config
401+ pool_size = 1;
402+ mode = RequestMode::Twoway;
403+ }
404+ else if (category == registry_category)
405+ {
406+ // The registry adapter is single- or multi-threaded and supports twoway operations only.
407+ // TODO: get pool size from config
408+ // NB: On rebind, locate() is called on this adapter. A scope may then call registry methods during
409+ // its start() method, hence we must ensure this adapter has enough threads available to handle this.
410+ pool_size = 6;
411+ mode = RequestMode::Twoway;
412+ }
413 else
414 {
415 // The normal adapter is single- or multi-threaded and supports twoway operations only.
416@@ -757,7 +767,7 @@
417
418 // The query adapter is always inproc.
419 string endpoint;
420- if (has_suffix(name, query_suffix))
421+ if (category == query_category)
422 {
423 endpoint = "inproc://" + name;
424 }
425
426=== modified file 'src/scopes/internal/zmq_middleware/ZmqObject.cpp'
427--- src/scopes/internal/zmq_middleware/ZmqObject.cpp 2014-02-03 07:28:36 +0000
428+++ src/scopes/internal/zmq_middleware/ZmqObject.cpp 2014-04-08 08:16:38 +0000
429@@ -18,9 +18,11 @@
430
431 #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h>
432
433+#include <unity/scopes/internal/RuntimeImpl.h>
434 #include <unity/scopes/internal/zmq_middleware/ConnectionPool.h>
435 #include <unity/scopes/internal/zmq_middleware/Util.h>
436 #include <unity/scopes/internal/zmq_middleware/ZmqException.h>
437+#include <unity/scopes/internal/zmq_middleware/ZmqRegistry.h>
438 #include <unity/scopes/internal/zmq_middleware/ZmqSender.h>
439 #include <unity/scopes/ScopeExceptions.h>
440
441@@ -61,6 +63,14 @@
442 assert(timeout >= -1);
443 throw_if_bad_endpoint(endpoint);
444
445+ // retrieve the registry proxy if this object is not the registry itself
446+ auto runtime = mw_base->runtime();
447+ if (identity != runtime->registry_identity())
448+ {
449+ registry_ = dynamic_pointer_cast<ZmqRegistry>(mw_base->create_registry_proxy(
450+ runtime->registry_identity(), runtime->registry_endpoint()));
451+ }
452+
453 // Make sure that fields have consistent settings for null proxies.
454 if (endpoint.empty() || identity.empty())
455 {
456@@ -165,11 +175,32 @@
457 return invoke_(out_params, timeout_);
458 }
459
460+ZmqReceiver ZmqObjectProxy::invoke_(capnp::MessageBuilder& out_params, int64_t timeout)
461+{
462+ try
463+ {
464+ return invoke__(out_params, timeout);
465+ }
466+ catch (TimeoutException const&)
467+ {
468+ if (!registry_)
469+ {
470+ throw;
471+ }
472+ ObjectProxy new_proxy = registry_->locate(identity_);
473+ endpoint_ = new_proxy->endpoint();
474+ identity_ = new_proxy->identity();
475+ category_ = new_proxy->category();
476+ timeout_ = new_proxy->timeout();
477+ return invoke__(out_params, timeout);
478+ }
479+}
480+
481 // Get a socket to the endpoint for this proxy and write the request on the wire.
482 // For a twoway request, poll for the reply with the timeout set for this proxy.
483 // Return a receiver for the response (whether this is a oneway or twoway request).
484
485-ZmqReceiver ZmqObjectProxy::invoke_(capnp::MessageBuilder& out_params, int64_t timeout)
486+ZmqReceiver ZmqObjectProxy::invoke__(capnp::MessageBuilder& out_params, int64_t timeout)
487 {
488 // Each calling thread gets its own pool because zmq sockets are not thread-safe.
489 thread_local static ConnectionPool pool(*mw_base()->context());
490
491=== modified file 'test/gtest/scopes/Runtime/Runtime_test.cpp'
492--- test/gtest/scopes/Runtime/Runtime_test.cpp 2014-04-03 14:22:02 +0000
493+++ test/gtest/scopes/Runtime/Runtime_test.cpp 2014-04-08 08:16:38 +0000
494@@ -240,7 +240,7 @@
495 receiver->wait_until_finished();
496
497 auto result = receiver->last_result();
498- EXPECT_TRUE(result.get() != nullptr);
499+ ASSERT_TRUE(result.get() != nullptr);
500
501 auto target = result->target_scope_proxy();
502 EXPECT_TRUE(target != nullptr);
503
504=== modified file 'test/gtest/scopes/internal/ScopeMetadataImpl/ScopeMetadataImpl_test.cpp'
505--- test/gtest/scopes/internal/ScopeMetadataImpl/ScopeMetadataImpl_test.cpp 2014-04-04 09:40:40 +0000
506+++ test/gtest/scopes/internal/ScopeMetadataImpl/ScopeMetadataImpl_test.cpp 2014-04-08 08:16:38 +0000
507@@ -18,6 +18,7 @@
508
509 #include <unity/scopes/internal/ScopeMetadataImpl.h>
510
511+#include <unity/scopes/internal/RuntimeImpl.h>
512 #include <unity/scopes/internal/ScopeImpl.h>
513 #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h>
514 #include <unity/scopes/ScopeExceptions.h>
515@@ -35,8 +36,9 @@
516
517 TEST(ScopeMetadataImpl, basic)
518 {
519+ auto rt = RuntimeImpl::create("testscope");
520 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/ScopeMetadataImpl/Zmq.ini",
521- (RuntimeImpl*)0x1);
522+ rt.get());
523
524 unique_ptr<ScopeMetadataImpl> mi(new ScopeMetadataImpl(&mw));
525 mi->set_scope_id("scope_id");
526@@ -229,8 +231,9 @@
527
528 TEST(ScopeMetadataImpl, serialize)
529 {
530+ auto rt = RuntimeImpl::create("testscope");
531 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/ScopeMetadataImpl/Zmq.ini",
532- (RuntimeImpl*)0x1);
533+ rt.get());
534
535 unique_ptr<ScopeMetadataImpl> mi(new ScopeMetadataImpl(&mw));
536 mi->set_scope_id("scope_id");
537@@ -281,8 +284,9 @@
538
539 TEST(ScopeMetadataImpl, serialize_exceptions)
540 {
541+ auto rt = RuntimeImpl::create("testscope");
542 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/ScopeMetadataImpl/Zmq.ini",
543- (RuntimeImpl*)0x1);
544+ rt.get());
545
546 ScopeMetadataImpl mi(&mw);
547 try
548@@ -348,8 +352,9 @@
549
550 TEST(ScopeMetadataImpl, deserialize_exceptions)
551 {
552+ auto rt = RuntimeImpl::create("testscope");
553 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/ScopeMetadataImpl/Zmq.ini",
554- (RuntimeImpl*)0x1);
555+ rt.get());
556
557 VariantMap m;
558 try
559
560=== modified file 'test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp'
561--- test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp 2014-01-24 11:16:14 +0000
562+++ test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp 2014-04-08 08:16:38 +0000
563@@ -19,6 +19,7 @@
564 #include <unity/scopes/internal/zmq_middleware/ObjectAdapter.h>
565
566 #include <scopes/internal/zmq_middleware/capnproto/Message.capnp.h>
567+#include <unity/scopes/internal/RuntimeImpl.h>
568 #include <unity/scopes/internal/zmq_middleware/ServantBase.h>
569 #include <unity/scopes/internal/zmq_middleware/ZmqException.h>
570 #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h>
571@@ -281,8 +282,9 @@
572
573 TEST(ObjectAdapter, add_remove_find)
574 {
575+ auto rt = RuntimeImpl::create("testscope");
576 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini",
577- (RuntimeImpl*)0x1);
578+ rt.get());
579
580 wait();
581 ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 5);
582@@ -568,8 +570,9 @@
583
584 TEST(ObjectAdapter, invoke_ok)
585 {
586+ auto rt = RuntimeImpl::create("testscope");
587 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini",
588- (RuntimeImpl*)0x1);
589+ rt.get());
590
591 wait();
592 ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 1);
593@@ -620,8 +623,9 @@
594
595 TEST(ObjectAdapter, invoke_object_not_exist)
596 {
597+ auto rt = RuntimeImpl::create("testscope");
598 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini",
599- (RuntimeImpl*)0x1);
600+ rt.get());
601
602 wait();
603 ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 1);
604@@ -665,8 +669,9 @@
605
606 TEST(ObjectAdapter, invoke_operation_not_exist)
607 {
608+ auto rt = RuntimeImpl::create("testscope");
609 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini",
610- (RuntimeImpl*)0x1);
611+ rt.get());
612
613 wait();
614 ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 1);
615@@ -785,8 +790,9 @@
616
617 TEST(ObjectAdapter, twoway_threading)
618 {
619+ auto rt = RuntimeImpl::create("testscope");
620 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini",
621- (RuntimeImpl*)0x1);
622+ rt.get());
623
624 // Single servant to which we send requests concurrently.
625 shared_ptr<CountingServant> o(new CountingServant(100));
626@@ -819,8 +825,9 @@
627
628 TEST(ObjectAdapter, oneway_threading)
629 {
630+ auto rt = RuntimeImpl::create("testscope");
631 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini",
632- (RuntimeImpl*)0x1);
633+ rt.get());
634
635 // Single servant to which we send requests concurrently.
636 shared_ptr<CountingServant> o(new CountingServant(100));
637@@ -890,8 +897,9 @@
638
639 TEST(ObjectAdapter, servant_map_destructor)
640 {
641+ auto rt = RuntimeImpl::create("testscope");
642 ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini",
643- (RuntimeImpl*)0x1);
644+ rt.get());
645
646 {
647 wait();
648
649=== modified file 'test/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/ZmqMiddleware_test.cpp'
650--- test/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/ZmqMiddleware_test.cpp 2014-03-07 01:07:28 +0000
651+++ test/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/ZmqMiddleware_test.cpp 2014-04-08 08:16:38 +0000
652@@ -18,6 +18,7 @@
653
654 #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h>
655
656+#include <unity/scopes/internal/RuntimeImpl.h>
657 #include <unity/scopes/internal/MWObjectProxy.h>
658 #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h>
659 #include <unity/scopes/ScopeExceptions.h>
660@@ -44,9 +45,10 @@
661
662 TEST(ZmqMiddleware, string_to_proxy)
663 {
664+ auto rt = RuntimeImpl::create("testscope");
665 ZmqMiddleware mw("testscope",
666 TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/Zmq.ini",
667- (RuntimeImpl*)0x1);
668+ rt.get());
669
670 ObjectProxy p;
671 ScopeProxy sp;
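
The ZmqObject.cpp hunk above wraps the original invocation in a retry: if the first attempt times out and a registry proxy is available, the proxy asks the registry to locate() the target, adopts the returned endpoint, and invokes once more. The following is a standalone sketch of that pattern only, with the middleware stripped away. TimeoutError, Endpoint, and invoke_with_rebind are hypothetical names chosen for illustration; the real code uses TimeoutException, ZmqRegistryProxy, and invoke__().

```cpp
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for the real middleware types.
struct TimeoutError : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

struct Endpoint
{
    std::string address;
};

// Invoke `send` on the current endpoint. On timeout, ask `locate` for a
// fresh endpoint, update `current` in place, and retry exactly once.
// A timeout on the second attempt propagates to the caller.
template <typename Reply>
Reply invoke_with_rebind(Endpoint& current,
                         std::function<Reply(Endpoint const&)> const& send,
                         std::function<Endpoint()> const& locate)
{
    try
    {
        return send(current);
    }
    catch (TimeoutError const&)
    {
        if (!locate)
        {
            throw;  // Nothing to rebind through (e.g. the proxy is the registry itself).
        }
        current = locate();
        return send(current);
    }
}
```

Retrying only once keeps the worst-case latency bounded at roughly two timeouts plus the locate call. It also suggests why the diff raises the invoke thread pool to 2: the rebind happens inside a two-way invocation that is itself still occupying a thread.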
