Merge lp:~marcustomlinson/unity-scopes-api/rebinding_logic into lp:unity-scopes-api/devel
Status: | Superseded |
---|---|
Proposed branch: | lp:~marcustomlinson/unity-scopes-api/rebinding_logic |
Merge into: | lp:unity-scopes-api/devel |
Diff against target: | 671 lines (+141/-87), 10 files modified |
- include/unity/scopes/internal/zmq_middleware/ZmqMiddleware.h (+2/-1)
- include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h (+4/-0)
- scoperegistry/scoperegistry.cpp (+0/-21)
- src/scopes/internal/RuntimeImpl.cpp (+15/-1)
- src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp (+60/-50)
- src/scopes/internal/zmq_middleware/ZmqObject.cpp (+32/-1)
- test/gtest/scopes/Runtime/Runtime_test.cpp (+1/-1)
- test/gtest/scopes/internal/ScopeMetadataImpl/ScopeMetadataImpl_test.cpp (+9/-4)
- test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp (+15/-7)
- test/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/ZmqMiddleware_test.cpp (+3/-1)
To merge this branch: | bzr merge lp:~marcustomlinson/unity-scopes-api/rebinding_logic |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Unity Team | Pending | ||
Review via email: mp+214221@code.launchpad.net |
Commit message
Description of the change
- 296. By Marcus Tomlinson
-
Merged devel
- 297. By Marcus Tomlinson
-
Fixing tests (can't construct a ZmqObjectProxy with an invalid runtime pointer)
- 298. By Marcus Tomlinson
-
On locate() a scope can call registry methods during its start() method. Ensure the registry adapter has enough threads available to handle this.
- 299. By Marcus Tomlinson
-
Merged devel
- 300. By Marcus Tomlinson
-
Merged send_state_in_run_scope
- 301. By Marcus Tomlinson
-
Upped middleware timeout for locate to allow enough time for the locate logic to execute.
- 302. By Marcus Tomlinson
-
Updated locate test for RegistryI to execute an indirect locate call via rebinding.
- 303. By Marcus Tomlinson
-
Merged devel
- 304. By Marcus Tomlinson
-
Fixed ScopeMetadataImpl, ObjectAdapter, and ZmqMiddleware tests
- 305. By Marcus Tomlinson
-
Added cat_registry_mutex_ in order to avoid pushing a result in parallel with its category being registered.
- 306. By Marcus Tomlinson
-
Temporary test hack to see if tests pass in CI
- 307. By Marcus Tomlinson
-
Added RaiiScopeThread structs to Activation_test and Runtime_test to ensure each test is run in isolation
- 308. By Marcus Tomlinson
-
Ensure that the state receiver is ready to process requests as soon as the registry is added to the middleware.
- 309. By Marcus Tomlinson
-
Simplified RaiiScopeThread
- 310. By Marcus Tomlinson
-
If there are pushes still active when finished() is called, give them a second to complete before destroying the ReplyObject.
- 311. By Marcus Tomlinson
-
Merged devel
- 312. By Marcus Tomlinson
-
Fixed synchronization race between client and server copies of the category register in SearchReplyImpl and ResultReplyObject.
- 313. By Marcus Tomlinson
-
Fixed comment
- 314. By Marcus Tomlinson
-
If there are pushes still active when finished() is called, give them a second to complete before destroying the ReplyObject.
- 315. By Marcus Tomlinson
-
Make SearchListenerBase implementations thread safe in tests.
- 316. By Marcus Tomlinson
-
Make PreviewListenerBase implementations thread safe in tests.
- 317. By Marcus Tomlinson
-
Wait for active pushes to complete on finished() logic moved to ReplyImpl
- 318. By Marcus Tomlinson
-
Wait for active pushes to complete on finished() logic moved to ReplyI
- 319. By Marcus Tomlinson
-
Remove test code
- 320. By Marcus Tomlinson
-
Removed more test code
- 321. By Marcus Tomlinson
-
Merged devel and fixed conflicts
- 322. By Marcus Tomlinson
-
Removed unnecessary lock_guards from push() implementations in tests
- 323. By Marcus Tomlinson
-
Some more cleaning up
- 324. By Marcus Tomlinson
-
Check for null pointers when assigning registry_ in the ZmqObject constructor
- 325. By Marcus Tomlinson
-
Expect pushes to occur in the correct order in the smartscopesproxy search test
- 326. By Marcus Tomlinson
-
Revert changes made to SearchReplyImpl
- 327. By Marcus Tomlinson
-
Merged serialize-oneway-invoke and resolved conflicts
- 328. By Marcus Tomlinson
-
Fixed RegistryI_test according to new async standard
- 329. By Marcus Tomlinson
-
Fixed Runtime_test
- 330. By Marcus Tomlinson
-
We can't just close a socket directly and leave it lingering in the connection pool, otherwise we attempt to talk to a closed socket the next time around
- 331. By Marcus Tomlinson
-
Corrected scope_ids std::array initialization
- 332. By Marcus Tomlinson
-
Addressed review comments
- 333. By Marcus Tomlinson
-
Merged serialize-oneway-invoke
- 334. By Marcus Tomlinson
-
Fix Invocation_test due to new rebinding behavior
- 335. By Marcus Tomlinson
-
Merged serialize-oneway-invoke
- 336. By Marcus Tomlinson
-
Merged devel
- 337. By Marcus Tomlinson
-
Merged serialize-oneway-invoke
- 338. By Marcus Tomlinson
-
Merged serialize-oneway-invoke and resolved conflicts
- 339. By Marcus Tomlinson
-
Merged devel
- 340. By Marcus Tomlinson
-
Merged devel
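
Several revisions above refer to rebinding: when a twoway invocation times out, the proxy asks the registry to locate() the target again and retries once against the endpoint it gets back. The following is a minimal sketch of that retry pattern, not the branch's actual code. `Proxy`, `TimeoutError` and `invoke_with_rebind` are hypothetical stand-ins, and the `send` and `locate` callbacks are assumptions standing in for the middleware call and the registry lookup.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the middleware's timeout exception.
struct TimeoutError : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// Hypothetical stand-in for a proxy: where the target lives and who it is.
struct Proxy
{
    std::string endpoint;
    std::string identity;
};

// Try the invocation once. On timeout, ask locate() for a fresh proxy,
// adopt its endpoint and identity, and retry exactly once.
// An empty locate callback models a proxy with no registry to fall back on
// (for example the registry proxy itself): the timeout simply propagates.
std::string invoke_with_rebind(Proxy& proxy,
                               std::function<std::string(Proxy const&)> const& send,
                               std::function<Proxy(std::string const&)> const& locate)
{
    try
    {
        return send(proxy);
    }
    catch (TimeoutError const&)
    {
        if (!locate)
        {
            throw;
        }
        proxy = locate(proxy.identity);  // rebind to the freshly located endpoint
        return send(proxy);              // a second timeout is not caught here
    }
}
```

Only a single retry is attempted in this sketch, so a target that still cannot be reached after relocation reports the timeout to the caller instead of looping.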
Unmerged revisions
Preview Diff
1 | === modified file 'include/unity/scopes/internal/zmq_middleware/ZmqMiddleware.h' |
2 | --- include/unity/scopes/internal/zmq_middleware/ZmqMiddleware.h 2014-04-02 09:24:02 +0000 |
3 | +++ include/unity/scopes/internal/zmq_middleware/ZmqMiddleware.h 2014-04-08 08:16:38 +0000 |
4 | @@ -95,7 +95,8 @@ |
5 | RequestMode mode, |
6 | int64_t timeout); |
7 | |
8 | - std::shared_ptr<ObjectAdapter> find_adapter(std::string const& name, std::string const& endpoint_dir); |
9 | + std::shared_ptr<ObjectAdapter> find_adapter(std::string const& name, std::string const& endpoint_dir, |
10 | + std::string const& category); |
11 | |
12 | ZmqProxy safe_add(std::function<void()>& disconnect_func, |
13 | std::shared_ptr<ObjectAdapter> const& adapter, |
14 | |
15 | === modified file 'include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h' |
16 | --- include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h 2014-02-03 02:53:33 +0000 |
17 | +++ include/unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h 2014-04-08 08:16:38 +0000 |
18 | @@ -24,6 +24,7 @@ |
19 | #include <unity/scopes/internal/zmq_middleware/RequestMode.h> |
20 | #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h> |
21 | #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxyFwd.h> |
22 | +#include <unity/scopes/internal/zmq_middleware/ZmqRegistryProxyFwd.h> |
23 | #include <unity/scopes/internal/zmq_middleware/ZmqReceiver.h> |
24 | |
25 | #include <capnp/message.h> |
26 | @@ -75,6 +76,9 @@ |
27 | ZmqReceiver invoke_(capnp::MessageBuilder& out_params, int64_t timeout); |
28 | |
29 | private: |
30 | + ZmqReceiver invoke__(capnp::MessageBuilder& out_params, int64_t timeout); |
31 | + |
32 | + ZmqRegistryProxy registry_; |
33 | std::string endpoint_; |
34 | std::string identity_; |
35 | std::string category_; |
36 | |
37 | === modified file 'scoperegistry/scoperegistry.cpp' |
38 | --- scoperegistry/scoperegistry.cpp 2014-04-04 09:40:40 +0000 |
39 | +++ scoperegistry/scoperegistry.cpp 2014-04-08 08:16:38 +0000 |
40 | @@ -422,27 +422,6 @@ |
41 | // the registry of state changes. |
42 | middleware->add_state_receiver_object("StateReceiver", registry->state_receiver()); |
43 | |
44 | - // FIXME, HACK HACK HACK HACK |
45 | - // The middleware should spawn scope processes with lookup() on demand. |
46 | - // Because it currently does not have the plumbing, we start every scope immediately. |
47 | - // When the plumbing appears, remove this. |
48 | - for (auto&& pair : local_scopes) |
49 | - { |
50 | - try |
51 | - { |
52 | - registry->locate(pair.first); |
53 | - } |
54 | - catch (NotFoundException const&) |
55 | - { |
56 | - // We ignore this. If the scope config couldn't be found, add_local_scopes() |
57 | - // has already printed an error message. |
58 | - } |
59 | - catch (std::exception const& e) |
60 | - { |
61 | - error("could not start scope " + pair.first + ": " + e.what()); |
62 | - } |
63 | - } |
64 | - |
65 | // Drop our shared_ptr to the RegistryObject. This means that the registry object |
66 | // is kept alive only via the shared_ptr held by the middleware. If the middleware |
67 | // shuts down, it clears out the active servant map, which destroys the registry |
68 | |
69 | === modified file 'src/scopes/internal/RuntimeImpl.cpp' |
70 | --- src/scopes/internal/RuntimeImpl.cpp 2014-04-03 14:22:02 +0000 |
71 | +++ src/scopes/internal/RuntimeImpl.cpp 2014-04-08 08:16:38 +0000 |
72 | @@ -20,6 +20,7 @@ |
73 | #include <unity/scopes/internal/ScopeBaseImpl.h> |
74 | |
75 | #include <unity/scopes/internal/DfltConfig.h> |
76 | +#include <unity/scopes/internal/MWStateReceiver.h> |
77 | #include <unity/scopes/internal/RegistryConfig.h> |
78 | #include <unity/scopes/internal/RegistryImpl.h> |
79 | #include <unity/scopes/internal/RuntimeConfig.h> |
80 | @@ -201,7 +202,13 @@ |
81 | |
82 | void RuntimeImpl::run_scope(ScopeBase *const scope_base, std::string const& scope_ini_file) |
83 | { |
84 | - auto mw = factory()->create(scope_id_, "Zmq", "Zmq.ini"); |
85 | + // Retrieve the registry middleware and create a proxy to its state receiver |
86 | + RegistryConfig reg_conf(registry_identity_, registry_configfile_); |
87 | + auto reg_runtime = create(registry_identity_, configfile_); |
88 | + auto reg_mw = reg_runtime->factory()->find(registry_identity_, reg_conf.mw_kind()); |
89 | + auto reg_state_receiver = reg_mw->create_state_receiver_proxy("StateReceiver"); |
90 | + |
91 | + auto mw = factory()->create(scope_id_, reg_conf.mw_kind(), reg_conf.mw_configfile()); |
92 | |
93 | { |
94 | // dirname modifies its argument, so we need a copy of scope lib path |
95 | @@ -223,7 +230,14 @@ |
96 | auto scope = unique_ptr<internal::ScopeObject>(new internal::ScopeObject(this, scope_base)); |
97 | auto proxy = mw->add_scope_object(scope_id_, move(scope)); |
98 | |
99 | + // Inform the registry that this scope is now ready to process requests |
100 | + reg_state_receiver->push_state(scope_id_, StateReceiverObject::State::ScopeReady); |
101 | + |
102 | mw->wait_for_shutdown(); |
103 | + |
104 | + // Inform the registry that this scope is shutting down |
105 | + reg_state_receiver->push_state(scope_id_, StateReceiverObject::State::ScopeStopping); |
106 | + |
107 | run_future.get(); |
108 | } |
109 | |
110 | |
111 | === modified file 'src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp' |
112 | --- src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp 2014-04-02 09:24:02 +0000 |
113 | +++ src/scopes/internal/zmq_middleware/ZmqMiddleware.cpp 2014-04-08 08:16:38 +0000 |
114 | @@ -63,6 +63,13 @@ |
115 | char const* reply_suffix = "-r"; // Appended to server_name_ to create reply adapter name |
116 | char const* state_suffix = "-s"; // Appended to server_name_ to create state adapter name |
117 | |
118 | +char const* query_category = "Query"; // query adapter category name |
119 | +char const* ctrl_category = "QueryCtrl"; // control adapter category name |
120 | +char const* reply_category = "Reply"; // reply adapter category name |
121 | +char const* state_category = "State"; // state adapter category name |
122 | +char const* scope_category = "Scope"; // scope adapter category name |
123 | +char const* registry_category = "Registry"; // registry adapter category name |
124 | + |
125 | } // namespace |
126 | |
127 | ZmqMiddleware::ZmqMiddleware(string const& server_name, string const& configfile, RuntimeImpl* runtime) |
128 | @@ -113,9 +120,11 @@ |
129 | { |
130 | // TODO: get directory from config |
131 | // TODO: get pool size from config |
132 | + // N.B. We absolutely MUST have AT LEAST 2 invoke threads |
133 | + // as rebinding is invoked within 2-way invocations. |
134 | { |
135 | lock_guard<mutex> lock(data_mutex_); |
136 | - invokers_.reset(new ThreadPool(1)); |
137 | + invokers_.reset(new ThreadPool(2)); |
138 | } |
139 | state_ = Started; |
140 | state_changed_.notify_all(); |
141 | @@ -273,7 +282,7 @@ |
142 | } |
143 | |
144 | // Now run over the map and check each value |
145 | - string category = "Scope"; |
146 | + string category = scope_category; |
147 | RequestMode mode = RequestMode::Twoway; |
148 | int64_t timeout = -1; |
149 | for (auto const& pair : fmap) |
150 | @@ -343,7 +352,7 @@ |
151 | MWRegistryProxy proxy; |
152 | try |
153 | { |
154 | - proxy.reset(new ZmqRegistry(this, endpoint, identity, "Registry", twoway_timeout_)); |
155 | + proxy.reset(new ZmqRegistry(this, endpoint, identity, registry_category, twoway_timeout_)); |
156 | } |
157 | catch (zmqpp::exception const& e) |
158 | { |
159 | @@ -358,7 +367,7 @@ |
160 | try |
161 | { |
162 | string endpoint = "ipc://" + config_.private_dir() + "/" + identity; |
163 | - proxy.reset(new ZmqScope(this, endpoint, identity, "Scope", twoway_timeout_)); |
164 | + proxy.reset(new ZmqScope(this, endpoint, identity, scope_category, twoway_timeout_)); |
165 | } |
166 | catch (zmqpp::exception const& e) |
167 | { |
168 | @@ -372,7 +381,7 @@ |
169 | MWScopeProxy proxy; |
170 | try |
171 | { |
172 | - proxy.reset(new ZmqScope(this, endpoint, identity, "Scope", twoway_timeout_)); |
173 | + proxy.reset(new ZmqScope(this, endpoint, identity, scope_category, twoway_timeout_)); |
174 | } |
175 | catch (zmqpp::exception const& e) |
176 | { |
177 | @@ -386,7 +395,7 @@ |
178 | MWQueryProxy proxy; |
179 | try |
180 | { |
181 | - proxy.reset(new ZmqQuery(this, endpoint, identity, "Query")); |
182 | + proxy.reset(new ZmqQuery(this, endpoint, identity, query_category)); |
183 | } |
184 | catch (zmqpp::exception const& e) |
185 | { |
186 | @@ -400,7 +409,7 @@ |
187 | MWQueryCtrlProxy proxy; |
188 | try |
189 | { |
190 | - proxy.reset(new ZmqQueryCtrl(this, endpoint, identity, "QueryCtrl")); |
191 | + proxy.reset(new ZmqQueryCtrl(this, endpoint, identity, ctrl_category)); |
192 | } |
193 | catch (zmqpp::exception const& e) |
194 | { |
195 | @@ -415,7 +424,7 @@ |
196 | try |
197 | { |
198 | proxy.reset(new ZmqStateReceiver(this, "ipc://" + config_.private_dir() + "/" + server_name_ + state_suffix, |
199 | - identity, "StateReceiver")); |
200 | + identity, state_category)); |
201 | } |
202 | catch (zmqpp::exception const& e) |
203 | { |
204 | @@ -432,11 +441,11 @@ |
205 | try |
206 | { |
207 | shared_ptr<QueryCtrlI> qci(make_shared<QueryCtrlI>(ctrl)); |
208 | - auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir()); |
209 | + auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir(), ctrl_category); |
210 | function<void()> df; |
211 | auto proxy = safe_add(df, adapter, "", qci); |
212 | ctrl->set_disconnect_function(df); |
213 | - return ZmqQueryCtrlProxy(new ZmqQueryCtrl(this, proxy->endpoint(), proxy->identity(), "QueryCtrl")); |
214 | + return ZmqQueryCtrlProxy(new ZmqQueryCtrl(this, proxy->endpoint(), proxy->identity(), ctrl_category)); |
215 | } |
216 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
217 | { |
218 | @@ -454,8 +463,8 @@ |
219 | try |
220 | { |
221 | shared_ptr<QueryCtrlI> qci(make_shared<QueryCtrlI>(ctrl)); |
222 | - auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir()); |
223 | - auto df = safe_dflt_add(adapter, "QueryCtrl", qci); |
224 | + auto adapter = find_adapter(server_name_ + ctrl_suffix, config_.private_dir(), ctrl_category); |
225 | + auto df = safe_dflt_add(adapter, ctrl_category, qci); |
226 | ctrl->set_disconnect_function(df); |
227 | } |
228 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
229 | @@ -474,11 +483,11 @@ |
230 | try |
231 | { |
232 | shared_ptr<QueryI> qi(make_shared<QueryI>(query)); |
233 | - auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir()); |
234 | + auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir(), query_category); |
235 | function<void()> df; |
236 | auto proxy = safe_add(df, adapter, "", qi); |
237 | query->set_disconnect_function(df); |
238 | - return ZmqQueryProxy(new ZmqQuery(this, proxy->endpoint(), proxy->identity(), "Query")); |
239 | + return ZmqQueryProxy(new ZmqQuery(this, proxy->endpoint(), proxy->identity(), query_category)); |
240 | } |
241 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
242 | { |
243 | @@ -496,8 +505,8 @@ |
244 | try |
245 | { |
246 | shared_ptr<QueryI> qi(make_shared<QueryI>(query)); |
247 | - auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir()); |
248 | - auto df = safe_dflt_add(adapter, "Query", qi); |
249 | + auto adapter = find_adapter(server_name_ + query_suffix, config_.private_dir(), query_category); |
250 | + auto df = safe_dflt_add(adapter, query_category, qi); |
251 | query->set_disconnect_function(df); |
252 | } |
253 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
254 | @@ -517,11 +526,11 @@ |
255 | try |
256 | { |
257 | shared_ptr<RegistryI> ri(make_shared<RegistryI>(registry)); |
258 | - auto adapter = find_adapter(server_name_, runtime()->registry_endpointdir()); |
259 | + auto adapter = find_adapter(server_name_, runtime()->registry_endpointdir(), registry_category); |
260 | function<void()> df; |
261 | auto proxy = safe_add(df, adapter, identity, ri); |
262 | registry->set_disconnect_function(df); |
263 | - return ZmqRegistryProxy(new ZmqRegistry(this, proxy->endpoint(), proxy->identity(), "Registry", twoway_timeout_)); |
264 | + return ZmqRegistryProxy(new ZmqRegistry(this, proxy->endpoint(), proxy->identity(), registry_category, twoway_timeout_)); |
265 | } |
266 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
267 | { |
268 | @@ -540,11 +549,11 @@ |
269 | try |
270 | { |
271 | shared_ptr<ReplyI> ri(make_shared<ReplyI>(reply)); |
272 | - auto adapter = find_adapter(server_name_ + reply_suffix, config_.public_dir()); |
273 | + auto adapter = find_adapter(server_name_ + reply_suffix, config_.public_dir(), reply_category); |
274 | function<void()> df; |
275 | auto proxy = safe_add(df, adapter, "", ri); |
276 | reply->set_disconnect_function(df); |
277 | - return ZmqReplyProxy(new ZmqReply(this, proxy->endpoint(), proxy->identity(), "Reply")); |
278 | + return ZmqReplyProxy(new ZmqReply(this, proxy->endpoint(), proxy->identity(), reply_category)); |
279 | } |
280 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
281 | { |
282 | @@ -564,11 +573,11 @@ |
283 | try |
284 | { |
285 | shared_ptr<ScopeI> si(make_shared<ScopeI>(scope)); |
286 | - auto adapter = find_adapter(server_name_, config_.private_dir()); |
287 | + auto adapter = find_adapter(server_name_, config_.private_dir(), scope_category); |
288 | function<void()> df; |
289 | auto proxy = safe_add(df, adapter, identity, si); |
290 | scope->set_disconnect_function(df); |
291 | - return ZmqScopeProxy(new ZmqScope(this, proxy->endpoint(), proxy->identity(), "Scope", twoway_timeout_)); |
292 | + return ZmqScopeProxy(new ZmqScope(this, proxy->endpoint(), proxy->identity(), scope_category, twoway_timeout_)); |
293 | } |
294 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
295 | { |
296 | @@ -586,8 +595,8 @@ |
297 | try |
298 | { |
299 | shared_ptr<ScopeI> si(make_shared<ScopeI>(scope)); |
300 | - auto adapter = find_adapter(server_name_, config_.private_dir()); |
301 | - auto df = safe_dflt_add(adapter, "Scope", si); |
302 | + auto adapter = find_adapter(server_name_, config_.private_dir(), scope_category); |
303 | + auto df = safe_dflt_add(adapter, scope_category, si); |
304 | scope->set_disconnect_function(df); |
305 | } |
306 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
307 | @@ -607,11 +616,11 @@ |
308 | try |
309 | { |
310 | shared_ptr<StateReceiverI> sri(make_shared<StateReceiverI>(state_receiver)); |
311 | - auto adapter = find_adapter(server_name_ + state_suffix, config_.private_dir()); |
312 | + auto adapter = find_adapter(server_name_ + state_suffix, config_.private_dir(), state_category); |
313 | function<void()> df; |
314 | auto proxy = safe_add(df, adapter, identity, sri); |
315 | state_receiver->set_disconnect_function(df); |
316 | - return ZmqStateReceiverProxy(new ZmqStateReceiver(this, proxy->endpoint(), proxy->identity(), "StateReceiver")); |
317 | + return ZmqStateReceiverProxy(new ZmqStateReceiver(this, proxy->endpoint(), proxy->identity(), state_category)); |
318 | } |
319 | catch (std::exception const& e) // Should never happen unless our implementation is broken |
320 | { |
321 | @@ -675,12 +684,12 @@ |
322 | { |
323 | throw MiddlewareException("make_typed_proxy(): cannot create oneway proxies"); |
324 | } |
325 | - if (category == "Scope") |
326 | + if (category == scope_category) |
327 | { |
328 | auto p = make_shared<ZmqScope>(this, endpoint, identity, category, timeout); |
329 | return ScopeImpl::create(p, runtime(), identity); |
330 | } |
331 | - else if (category == "Registry") |
332 | + else if (category == registry_category) |
333 | { |
334 | auto p = make_shared<ZmqRegistry>(this, endpoint, identity, category, timeout); |
335 | return make_shared<RegistryImpl>(p, runtime()); |
336 | @@ -691,23 +700,8 @@ |
337 | } |
338 | } |
339 | |
340 | -namespace |
341 | -{ |
342 | - |
343 | -bool has_suffix(string const& s, string const& suffix) |
344 | -{ |
345 | - auto s_len = s.length(); |
346 | - auto suffix_len = suffix.length(); |
347 | - if (s_len >= suffix_len) |
348 | - { |
349 | - return s.compare(s_len - suffix_len, suffix_len, suffix) == 0; |
350 | - } |
351 | - return false; |
352 | -} |
353 | - |
354 | -} // namespace |
355 | - |
356 | -shared_ptr<ObjectAdapter> ZmqMiddleware::find_adapter(string const& name, string const& endpoint_dir) |
357 | +shared_ptr<ObjectAdapter> ZmqMiddleware::find_adapter(string const& name, string const& endpoint_dir, |
358 | + string const& category) |
359 | { |
360 | lock_guard<mutex> lock(data_mutex_); |
361 | |
362 | @@ -720,33 +714,49 @@ |
363 | // We don't have the requested adapter yet, so we create it on the fly. |
364 | int pool_size; |
365 | RequestMode mode; |
366 | - if (has_suffix(name, query_suffix)) |
367 | + if (category == query_category) |
368 | { |
369 | // The query adapter is single or multi-threaded and supports oneway operations only. |
370 | // TODO: get pool size from config |
371 | pool_size = 1; |
372 | mode = RequestMode::Oneway; |
373 | } |
374 | - else if (has_suffix(name, ctrl_suffix)) |
375 | + else if (category == ctrl_category) |
376 | { |
377 | // The ctrl adapter is single-threaded and supports oneway operations only. |
378 | pool_size = 1; |
379 | mode = RequestMode::Oneway; |
380 | } |
381 | - else if (has_suffix(name, reply_suffix)) |
382 | + else if (category == reply_category) |
383 | { |
384 | // The reply adapter is single- or multi-threaded and supports oneway operations only. |
385 | // TODO: get pool size from config |
386 | pool_size = 1; |
387 | mode = RequestMode::Oneway; |
388 | } |
389 | - else if (has_suffix(name, state_suffix)) |
390 | + else if (category == state_category) |
391 | { |
392 | // The state adapter is single- or multi-threaded and supports oneway operations only. |
393 | // TODO: get pool size from config |
394 | pool_size = 1; |
395 | mode = RequestMode::Oneway; |
396 | } |
397 | + else if (category == scope_category) |
398 | + { |
399 | + // The scope adapter is single- or multi-threaded and supports twoway operations only. |
400 | + // TODO: get pool size from config |
401 | + pool_size = 1; |
402 | + mode = RequestMode::Twoway; |
403 | + } |
404 | + else if (category == registry_category) |
405 | + { |
406 | + // The registry adapter is single- or multi-threaded and supports twoway operations only. |
407 | + // TODO: get pool size from config |
408 | + // NB: On rebind, locate() is called on this adapter. A scope may then call registry methods during |
409 | + // its start() method, hence we must ensure this adapter has enough threads available to handle this. |
410 | + pool_size = 6; |
411 | + mode = RequestMode::Twoway; |
412 | + } |
413 | else |
414 | { |
415 | // The normal adapter is single- or multi-threaded and supports twoway operations only. |
416 | @@ -757,7 +767,7 @@ |
417 | |
418 | // The query adapter is always inproc. |
419 | string endpoint; |
420 | - if (has_suffix(name, query_suffix)) |
421 | + if (category == query_category) |
422 | { |
423 | endpoint = "inproc://" + name; |
424 | } |
425 | |
426 | === modified file 'src/scopes/internal/zmq_middleware/ZmqObject.cpp' |
427 | --- src/scopes/internal/zmq_middleware/ZmqObject.cpp 2014-02-03 07:28:36 +0000 |
428 | +++ src/scopes/internal/zmq_middleware/ZmqObject.cpp 2014-04-08 08:16:38 +0000 |
429 | @@ -18,9 +18,11 @@ |
430 | |
431 | #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h> |
432 | |
433 | +#include <unity/scopes/internal/RuntimeImpl.h> |
434 | #include <unity/scopes/internal/zmq_middleware/ConnectionPool.h> |
435 | #include <unity/scopes/internal/zmq_middleware/Util.h> |
436 | #include <unity/scopes/internal/zmq_middleware/ZmqException.h> |
437 | +#include <unity/scopes/internal/zmq_middleware/ZmqRegistry.h> |
438 | #include <unity/scopes/internal/zmq_middleware/ZmqSender.h> |
439 | #include <unity/scopes/ScopeExceptions.h> |
440 | |
441 | @@ -61,6 +63,14 @@ |
442 | assert(timeout >= -1); |
443 | throw_if_bad_endpoint(endpoint); |
444 | |
445 | + // retrieve the registry proxy if this object is not the registry itself |
446 | + auto runtime = mw_base->runtime(); |
447 | + if (identity != runtime->registry_identity()) |
448 | + { |
449 | + registry_ = dynamic_pointer_cast<ZmqRegistry>(mw_base->create_registry_proxy( |
450 | + runtime->registry_identity(), runtime->registry_endpoint())); |
451 | + } |
452 | + |
453 | // Make sure that fields have consistent settings for null proxies. |
454 | if (endpoint.empty() || identity.empty()) |
455 | { |
456 | @@ -165,11 +175,32 @@ |
457 | return invoke_(out_params, timeout_); |
458 | } |
459 | |
460 | +ZmqReceiver ZmqObjectProxy::invoke_(capnp::MessageBuilder& out_params, int64_t timeout) |
461 | +{ |
462 | + try |
463 | + { |
464 | + return invoke__(out_params, timeout); |
465 | + } |
466 | + catch (TimeoutException const&) |
467 | + { |
468 | + if (!registry_) |
469 | + { |
470 | + throw; |
471 | + } |
472 | + ObjectProxy new_proxy = registry_->locate(identity_); |
473 | + endpoint_ = new_proxy->endpoint(); |
474 | + identity_ = new_proxy->identity(); |
475 | + category_ = new_proxy->category(); |
476 | + timeout_ = new_proxy->timeout(); |
477 | + return invoke__(out_params, timeout); |
478 | + } |
479 | +} |
480 | + |
481 | // Get a socket to the endpoint for this proxy and write the request on the wire. |
482 | // For a twoway request, poll for the reply with the timeout set for this proxy. |
483 | // Return a receiver for the response (whether this is a oneway or twoway request). |
484 | |
485 | -ZmqReceiver ZmqObjectProxy::invoke_(capnp::MessageBuilder& out_params, int64_t timeout) |
486 | +ZmqReceiver ZmqObjectProxy::invoke__(capnp::MessageBuilder& out_params, int64_t timeout) |
487 | { |
488 | // Each calling thread gets its own pool because zmq sockets are not thread-safe. |
489 | thread_local static ConnectionPool pool(*mw_base()->context()); |
490 | |
491 | === modified file 'test/gtest/scopes/Runtime/Runtime_test.cpp' |
492 | --- test/gtest/scopes/Runtime/Runtime_test.cpp 2014-04-03 14:22:02 +0000 |
493 | +++ test/gtest/scopes/Runtime/Runtime_test.cpp 2014-04-08 08:16:38 +0000 |
494 | @@ -240,7 +240,7 @@ |
495 | receiver->wait_until_finished(); |
496 | |
497 | auto result = receiver->last_result(); |
498 | - EXPECT_TRUE(result.get() != nullptr); |
499 | + ASSERT_TRUE(result.get() != nullptr); |
500 | |
501 | auto target = result->target_scope_proxy(); |
502 | EXPECT_TRUE(target != nullptr); |
503 | |
504 | === modified file 'test/gtest/scopes/internal/ScopeMetadataImpl/ScopeMetadataImpl_test.cpp' |
505 | --- test/gtest/scopes/internal/ScopeMetadataImpl/ScopeMetadataImpl_test.cpp 2014-04-04 09:40:40 +0000 |
506 | +++ test/gtest/scopes/internal/ScopeMetadataImpl/ScopeMetadataImpl_test.cpp 2014-04-08 08:16:38 +0000 |
507 | @@ -18,6 +18,7 @@ |
508 | |
509 | #include <unity/scopes/internal/ScopeMetadataImpl.h> |
510 | |
511 | +#include <unity/scopes/internal/RuntimeImpl.h> |
512 | #include <unity/scopes/internal/ScopeImpl.h> |
513 | #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h> |
514 | #include <unity/scopes/ScopeExceptions.h> |
515 | @@ -35,8 +36,9 @@ |
516 | |
517 | TEST(ScopeMetadataImpl, basic) |
518 | { |
519 | + auto rt = RuntimeImpl::create("testscope"); |
520 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/ScopeMetadataImpl/Zmq.ini", |
521 | - (RuntimeImpl*)0x1); |
522 | + rt.get()); |
523 | |
524 | unique_ptr<ScopeMetadataImpl> mi(new ScopeMetadataImpl(&mw)); |
525 | mi->set_scope_id("scope_id"); |
526 | @@ -229,8 +231,9 @@ |
527 | |
528 | TEST(ScopeMetadataImpl, serialize) |
529 | { |
530 | + auto rt = RuntimeImpl::create("testscope"); |
531 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/ScopeMetadataImpl/Zmq.ini", |
532 | - (RuntimeImpl*)0x1); |
533 | + rt.get()); |
534 | |
535 | unique_ptr<ScopeMetadataImpl> mi(new ScopeMetadataImpl(&mw)); |
536 | mi->set_scope_id("scope_id"); |
537 | @@ -281,8 +284,9 @@ |
538 | |
539 | TEST(ScopeMetadataImpl, serialize_exceptions) |
540 | { |
541 | + auto rt = RuntimeImpl::create("testscope"); |
542 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/ScopeMetadataImpl/Zmq.ini", |
543 | - (RuntimeImpl*)0x1); |
544 | + rt.get()); |
545 | |
546 | ScopeMetadataImpl mi(&mw); |
547 | try |
548 | @@ -348,8 +352,9 @@ |
549 | |
550 | TEST(ScopeMetadataImpl, deserialize_exceptions) |
551 | { |
552 | + auto rt = RuntimeImpl::create("testscope"); |
553 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/ScopeMetadataImpl/Zmq.ini", |
554 | - (RuntimeImpl*)0x1); |
555 | + rt.get()); |
556 | |
557 | VariantMap m; |
558 | try |
559 | |
560 | === modified file 'test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp' |
561 | --- test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp 2014-01-24 11:16:14 +0000 |
562 | +++ test/gtest/scopes/internal/zmq_middleware/ObjectAdapter/ObjectAdapter_test.cpp 2014-04-08 08:16:38 +0000 |
563 | @@ -19,6 +19,7 @@ |
564 | #include <unity/scopes/internal/zmq_middleware/ObjectAdapter.h> |
565 | |
566 | #include <scopes/internal/zmq_middleware/capnproto/Message.capnp.h> |
567 | +#include <unity/scopes/internal/RuntimeImpl.h> |
568 | #include <unity/scopes/internal/zmq_middleware/ServantBase.h> |
569 | #include <unity/scopes/internal/zmq_middleware/ZmqException.h> |
570 | #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h> |
571 | @@ -281,8 +282,9 @@ |
572 | |
573 | TEST(ObjectAdapter, add_remove_find) |
574 | { |
575 | + auto rt = RuntimeImpl::create("testscope"); |
576 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini", |
577 | - (RuntimeImpl*)0x1); |
578 | + rt.get()); |
579 | |
580 | wait(); |
581 | ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 5); |
582 | @@ -568,8 +570,9 @@ |
583 | |
584 | TEST(ObjectAdapter, invoke_ok) |
585 | { |
586 | + auto rt = RuntimeImpl::create("testscope"); |
587 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini", |
588 | - (RuntimeImpl*)0x1); |
589 | + rt.get()); |
590 | |
591 | wait(); |
592 | ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 1); |
593 | @@ -620,8 +623,9 @@ |
594 | |
595 | TEST(ObjectAdapter, invoke_object_not_exist) |
596 | { |
597 | + auto rt = RuntimeImpl::create("testscope"); |
598 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini", |
599 | - (RuntimeImpl*)0x1); |
600 | + rt.get()); |
601 | |
602 | wait(); |
603 | ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 1); |
604 | @@ -665,8 +669,9 @@ |
605 | |
606 | TEST(ObjectAdapter, invoke_operation_not_exist) |
607 | { |
608 | + auto rt = RuntimeImpl::create("testscope"); |
609 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini", |
610 | - (RuntimeImpl*)0x1); |
611 | + rt.get()); |
612 | |
613 | wait(); |
614 | ObjectAdapter a(mw, "testscope", "ipc://testscope", RequestMode::Twoway, 1); |
615 | @@ -785,8 +790,9 @@ |
616 | |
617 | TEST(ObjectAdapter, twoway_threading) |
618 | { |
619 | + auto rt = RuntimeImpl::create("testscope"); |
620 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini", |
621 | - (RuntimeImpl*)0x1); |
622 | + rt.get()); |
623 | |
624 | // Single servant to which we send requests concurrently. |
625 | shared_ptr<CountingServant> o(new CountingServant(100)); |
626 | @@ -819,8 +825,9 @@ |
627 | |
628 | TEST(ObjectAdapter, oneway_threading) |
629 | { |
630 | + auto rt = RuntimeImpl::create("testscope"); |
631 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini", |
632 | - (RuntimeImpl*)0x1); |
633 | + rt.get()); |
634 | |
635 | // Single servant to which we send requests concurrently. |
636 | shared_ptr<CountingServant> o(new CountingServant(100)); |
637 | @@ -890,8 +897,9 @@ |
638 | |
639 | TEST(ObjectAdapter, servant_map_destructor) |
640 | { |
641 | + auto rt = RuntimeImpl::create("testscope"); |
642 | ZmqMiddleware mw("testscope", TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ObjectAdapter/Zmq.ini", |
643 | - (RuntimeImpl*)0x1); |
644 | + rt.get()); |
645 | |
646 | { |
647 | wait(); |
648 | |
649 | === modified file 'test/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/ZmqMiddleware_test.cpp' |
650 | --- test/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/ZmqMiddleware_test.cpp 2014-03-07 01:07:28 +0000 |
651 | +++ test/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/ZmqMiddleware_test.cpp 2014-04-08 08:16:38 +0000 |
652 | @@ -18,6 +18,7 @@ |
653 | |
654 | #include <unity/scopes/internal/zmq_middleware/ZmqMiddleware.h> |
655 | |
656 | +#include <unity/scopes/internal/RuntimeImpl.h> |
657 | #include <unity/scopes/internal/MWObjectProxy.h> |
658 | #include <unity/scopes/internal/zmq_middleware/ZmqObjectProxy.h> |
659 | #include <unity/scopes/ScopeExceptions.h> |
660 | @@ -44,9 +45,10 @@ |
661 | |
662 | TEST(ZmqMiddleware, string_to_proxy) |
663 | { |
664 | + auto rt = RuntimeImpl::create("testscope"); |
665 | ZmqMiddleware mw("testscope", |
666 | TEST_BUILD_ROOT "/gtest/scopes/internal/zmq_middleware/ZmqMiddleware/Zmq.ini", |
667 | - (RuntimeImpl*)0x1); |
668 | + rt.get()); |
669 | |
670 | ObjectProxy p; |
671 | ScopeProxy sp; |