Merge lp:~jimbaker/pyjuju/expose-provision-machines-reexpose into lp:pyjuju

Proposed by Jim Baker
Status: Merged
Approved by: Kapil Thangavelu
Approved revision: 300
Merged at revision: 308
Proposed branch: lp:~jimbaker/pyjuju/expose-provision-machines-reexpose
Merge into: lp:pyjuju
Prerequisite: lp:~jimbaker/pyjuju/expose-provision-machines
Diff against target: 287 lines (+168/-18)
4 files modified
ensemble/agents/provision.py (+15/-8)
ensemble/agents/tests/test_provision.py (+27/-1)
ensemble/state/base.py (+15/-0)
ensemble/state/tests/test_base.py (+111/-9)
To merge this branch: bzr merge lp:~jimbaker/pyjuju/expose-provision-machines-reexpose
Reviewer Review Type Date Requested Status
Kapil Thangavelu (community) Approve
Gustavo Niemeyer Approve
Review via email: mp+68313@code.launchpad.net

Description of the change

Fixing re-exposure was essentially a matter of making
cb_watch_service_exposed_flag symmetric with respect to opening and
closing ports on the corresponding service units; that is, the
following code should run regardless of whether the service is exposed:

            for unit_state in unit_states:
                yield self.open_close_ports(unit_state)
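
For reference, the reworked callback ends up shaped roughly as follows
(a condensed sketch based on the diff below; the @inlineCallbacks
decorator and the enclosing scope providing service_state, service_name
and the log are assumed from context):

    @inlineCallbacks
    def cb_watch_service_exposed_flag(exposed):
        if not self._running:
            raise StopWatcher()
        # Open/close ports on every unit regardless of the flag value,
        # so re-exposing a service re-applies its currently open ports.
        try:
            unit_states = yield service_state.get_all_unit_states()
        except StateChanged:
            raise StopWatcher()
        for unit_state in unit_states:
            yield self.open_close_ports(unit_state)
        if not exposed:
            self._watched_services[service_name] = NotExposed
        else:
            self._watched_services[service_name] = set()
            yield self._setup_service_unit_watch(service_state)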

Tests in test_provision have been changed so that they actually test
re-exposing a service.
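
In outline, the updated test now continues past the existing unexpose
check with something like the following (a sketch of the change in the
diff below; the helper names are those already used in test_provision):

    # Re-expose wordpress: set the flag again and verify the watch
    # fires open_close_ports for the expected unit once more.
    expected_units = self.wait_on_expected_units(set(["wordpress/0"]))
    yield wordpress.set_exposed_flag()
    self.assertTrue((yield expected_units))
    yield self.agent.stop()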

In addition, the support for re-exposing surfaced the same issue in
StateBase._watch_topology that was previously identified in other
watches: the watch function needs to guard on the client still being
connected when it is entered. These guards were added to both
_watch_topology and __topology_changed. Tests were also added to
test_base to verify that the watch properly shuts down when the client
is disconnected. (The need for this change to StateBase otherwise only
shows up when repeatedly looping the tests for the provisioning
agent.)
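
The guard itself is small; sketched here from the diff below, with the
rest of the method body elided:

    @inlineCallbacks
    def _watch_topology(self, watch_topology_function):
        # Bail out if the connection was closed while this watch was
        # still queued in the reactor; __topology_changed carries the
        # same guard.
        if not self._client.connected:
            return
        exists, watch = self._client.exists_and_watch("/topology")
        stat = yield exists
        # ... remainder unchanged ...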

Lastly, I removed the unnecessary sleeps in test_base in favor of our
current practice of using poke_zk (but only for these types of tests).
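
Concretely, the affected tests trade a wall-clock sleep for poke_zk,
presumably a quick ZooKeeper round trip that lets pending watch
callbacks run before asserting; the pattern in the diff below is:

    # Before: wait a fixed amount of wall-clock time and hope the
    # callbacks have fired.
    yield self.sleep(0.1)

    # After: give pending ZooKeeper callbacks a chance to run, then
    # assert immediately.
    yield self.poke_zk()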

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Looks good, thanks Jim.

review: Approve
Revision history for this message
Kapil Thangavelu (hazmat) wrote :

looks good, thanks for digging into the additional robustness checks.

review: Approve
Revision history for this message
Jim Baker (jimbaker) wrote :

Thanks. I will be merging this into trunk, along with its upstream,
expose-provision-machines, once expose-provider-ec2 is in place, since it
requires a provider implementation to be present.

On Mon, Jul 25, 2011 at 11:01 AM, Kapil Thangavelu
<email address hidden> wrote:

> The proposal to merge
> lp:~jimbaker/ensemble/expose-provision-machines-reexpose into lp:ensemble
> has been updated.
>
> Status: Needs review => Approved
>
> For more details, see:
>
> https://code.launchpad.net/~jimbaker/ensemble/expose-provision-machines-reexpose/+merge/68313
> --
>
> https://code.launchpad.net/~jimbaker/ensemble/expose-provision-machines-reexpose/+merge/68313
> You are the owner of
> lp:~jimbaker/ensemble/expose-provision-machines-reexpose.
>

301. By Jim Baker

Merged upstream branch expose-provision-machines

302. By Jim Baker

Merged upstream expose-provision-machines

303. By Jim Baker

Merged upstream expose-provision-machines

304. By Jim Baker

Merged upstream

305. By Jim Baker

Merged upstream

Preview Diff

=== modified file 'ensemble/agents/provision.py'
--- ensemble/agents/provision.py 2011-08-09 14:27:56 +0000
+++ ensemble/agents/provision.py 2011-08-09 14:27:56 +0000
@@ -301,16 +301,23 @@
         def cb_watch_service_exposed_flag(exposed):
             if not self._running:
                 raise StopWatcher()
+
+            if exposed:
+                log.debug("Service %r is exposed", service_name)
+            else:
+                log.debug("Service %r is unexposed", service_name)
+
+            try:
+                unit_states = yield service_state.get_all_unit_states()
+            except StateChanged:
+                log.debug("Stopping watch on %r, no longer in topology",
+                          service_name)
+                raise StopWatcher()
+            for unit_state in unit_states:
+                yield self.open_close_ports(unit_state)
+
             if not exposed:
                 self._watched_services[service_name] = NotExposed
-                try:
-                    unit_states = yield service_state.get_all_unit_states()
-                except StateChanged:
-                    log.debug("Stopping watch on %r, no longer in topology",
-                              service_name)
-                    raise StopWatcher()
-                for unit_state in unit_states:
-                    yield self.open_close_ports(unit_state)
             else:
                 self._watched_services[service_name] = set()
                 yield self._setup_service_unit_watch(service_state)

=== modified file 'ensemble/agents/tests/test_provision.py'
--- ensemble/agents/tests/test_provision.py 2011-08-09 14:27:56 +0000
+++ ensemble/agents/tests/test_provision.py 2011-08-09 14:27:56 +0000
@@ -507,6 +507,14 @@
         yield wordpress.clear_exposed_flag()
         self.assertTrue((yield expected_units))
 
+        # Re-expose wordpress: set the flag again, verify that it
+        # triggers on the expected units
+        expected_units = self.wait_on_expected_units(
+            set(["wordpress/0"]))
+        yield wordpress.set_exposed_flag()
+        self.assertTrue((yield expected_units))
+        yield self.agent.stop()
+
     @inlineCallbacks
     def test_add_remove_service_units_for_exposed_service(self):
         """Verify that adding/removing service units for an exposed
@@ -799,8 +807,8 @@
         yield drupal_0.open_port(443, "tcp")
         yield wordpress_0.open_port(80, "tcp")
         yield wordpress_1.open_port(80, "tcp")
+        self.assertTrue((yield expected_units))
         self.assertTrue((yield expected_machines))
-        self.assertTrue((yield expected_units))
         self.assertEqual((yield self.get_provider_ports(machine_0)),
                          set([(80, "tcp"), (443, "tcp"), (8080, "tcp")]))
         self.assertEqual((yield self.get_provider_ports(machine_1)),
@@ -869,4 +877,22 @@
         self.assertTrue((yield expected_units))
         self.assertEqual((yield self.get_provider_ports(machine_0)),
                          set([(80, "tcp"), (443, "tcp"), (8080, "tcp")]))
+
+        # Unexpose drupal service, verify only wordpress ports are now opened
+        expected_machines = self.wait_on_expected_machines(set([0]))
+        expected_units = self.wait_on_expected_units(set(["drupal/0"]))
+        yield drupal.clear_exposed_flag()
+        self.assertTrue((yield expected_machines))
+        self.assertTrue((yield expected_units))
+        self.assertEqual((yield self.get_provider_ports(machine_0)),
+                         set([(80, "tcp")]))
+
+        # Re-expose drupal service, verify ports are once again opened
+        expected_machines = self.wait_on_expected_machines(set([0]))
+        expected_units = self.wait_on_expected_units(set(["drupal/0"]))
+        yield drupal.set_exposed_flag()
+        self.assertTrue((yield expected_machines))
+        self.assertTrue((yield expected_units))
+        self.assertEqual((yield self.get_provider_ports(machine_0)),
+                         set([(80, "tcp"), (443, "tcp"), (8080, "tcp")]))
         yield self.agent.stop()

=== modified file 'ensemble/state/base.py'
--- ensemble/state/base.py 2011-05-06 04:40:33 +0000
+++ ensemble/state/base.py 2011-08-09 14:27:56 +0000
@@ -94,6 +94,13 @@
         "private", since the only purpose of this method is to be used by
         subclasses.
         """
+        # Need to guard on the client being connected in the case
+        # 1) a watch is waiting to run (in the reactor);
+        # 2) and the connection is closed.
+        # Because _watch_topology always chains to __topology_changed,
+        # the other guarding seen with `StopWatcher` is done there.
+        if not self._client.connected:
+            return
         exists, watch = self._client.exists_and_watch("/topology")
         stat = yield exists
 
@@ -106,6 +113,14 @@
     @inlineCallbacks
     def __topology_changed(self, ignored, watch_topology_function):
         """Internal callback used by _watch_topology()."""
+        # Need to guard on the client being connected in the case
+        # 1) a watch is waiting to run (in the reactor);
+        # 2) and the connection is closed.
+        #
+        # It remains the responsibility of `watch_topology_function` to
+        # raise `StopWatcher`, per the doc of `_watch_topology`.
+        if not self._client.connected:
+            return
         try:
             get, watch = self._client.get_and_watch("/topology")
             content, stat = yield get

=== modified file 'ensemble/state/tests/test_base.py'
--- ensemble/state/tests/test_base.py 2011-02-10 20:13:55 +0000
+++ ensemble/state/tests/test_base.py 2011-08-09 14:27:56 +0000
@@ -1,11 +1,12 @@
 from twisted.internet.defer import inlineCallbacks, Deferred
-from twisted.internet import reactor
+from txzookeeper import ZookeeperClient
 
 from ensemble.state.tests.common import StateTestBase
 
 from ensemble.state.base import StateBase
 from ensemble.state.errors import StopWatcher
 from ensemble.state.topology import InternalTopology
+from ensemble.tests.common import get_test_zookeeper_address
 
 
 class StateBaseTest(StateTestBase):
@@ -176,9 +177,7 @@
         yield wait_callback[1]
 
         # Give a chance for something bad to happen.
-        go_reactor = Deferred()
-        reactor.callLater(0.5, go_reactor.callback, None)
-        yield go_reactor
+        yield self.poke_zk()
 
         # Now the watch callback must have been fired with two
         # different topologies. The old one, and the new one.
@@ -273,7 +272,7 @@
         yield self.set_topology(topology)
 
         # Give a chance for something bad to happen.
-        yield self.sleep(0.1)
+        yield self.poke_zk()
 
         # Ensure we still have a single call.
         self.assertEquals(len(calls), 1)
@@ -295,8 +294,8 @@
     @inlineCallbacks
     def test_stop_watch(self):
         """
-        A watch which fires an StopWatcher excepiton, will
-        end the watch."""
+        A watch that fires a `StopWatcher` exception will end the
+        watch."""
         wait_callback = [Deferred() for i in range(5)]
         calls = []
 
@@ -324,12 +323,115 @@
         yield wait_callback[1]
         self.assertEqual(len(calls), 2)
 
-        # Change the topology again, we shouldnt' see this.
+        # Change the topology again, we shouldn't see this.
         topology.add_machine("m-2")
         yield self.set_topology(topology)
 
         # Give a chance for something bad to happen.
-        yield self.sleep(0.1)
+        yield self.poke_zk()
 
         # Ensure we still have a single call.
         self.assertEquals(len(calls), 2)
+
+    @inlineCallbacks
+    def test_watch_stops_on_closed_connection(self):
+        """Verify the watch stops when the connection is closed."""
+
+        # Use a separate client connection for watching so it can be
+        # disconnected.
+        watch_client = ZookeeperClient(get_test_zookeeper_address())
+        yield watch_client.connect()
+        watch_base = StateBase(watch_client)
+
+        wait_callback = Deferred()
+        finish_callback = Deferred()
+        calls = []
+
+        def watcher(old_topology, new_topology):
+            calls.append((old_topology, new_topology))
+            wait_callback.callback(True)
+            return finish_callback
+
+        # Start watching.
+        yield watch_base._watch_topology(watcher)
+
+        # Create the topology.
+        topology = InternalTopology()
+        topology.add_machine("m-0")
+        yield self.set_topology(topology)
+
+        # Hold off until callback is started.
+        yield wait_callback
+
+        # Change the topology.
+        topology.add_machine("m-1")
+        yield self.set_topology(topology)
+
+        # Ensure that the watch has been called just once so far
+        # (although still pending due to the finish_callback).
+        self.assertEquals(len(calls), 1)
+
+        # Now disconnect the client.
+        watch_client.close()
+        self.assertFalse(watch_client.connected)
+        self.assertTrue(self.client.connected)
+
+        # Change the topology again.
+        topology.add_machine("m-2")
+        yield self.set_topology(topology)
+
+        # Allow the first call to be completed, starting a process of
+        # watching for the next change. At this point, the watch will
+        # encounter that the client is disconnected.
+        finish_callback.callback(True)
+
+        # Give a chance for something bad to happen.
+        yield self.poke_zk()
+
+        # Ensure the watch was still not called.
+        self.assertEquals(len(calls), 1)
+
+    @inlineCallbacks
+    def test_watch_stops_on_early_closed_connection(self):
+        """Verify the watch stops when the connection is closed early.
+
+        _watch_topology chains from an exists_and_watch to a
+        get_and_watch. This test ensures that this chaining will fail
+        gracefully if the connection is closed before this chaining
+        can occur.
+        """
+        # Use a separate client connection for watching so it can be
+        # disconnected.
+        watch_client = ZookeeperClient(get_test_zookeeper_address())
+        yield watch_client.connect()
+        watch_base = StateBase(watch_client)
+
+        calls = []
+
+        @inlineCallbacks
+        def watcher(old_topology, new_topology):
+            calls.append((old_topology, new_topology))
+
+        # Create the topology.
+        topology = InternalTopology()
+        topology.add_machine("m-0")
+        yield self.set_topology(topology)
+
+        # Now disconnect the client.
+        watch_client.close()
+        self.assertFalse(watch_client.connected)
+        self.assertTrue(self.client.connected)
+
+        # Start watching.
+        yield watch_base._watch_topology(watcher)
+
+        # Change the topology, this will trigger the watch.
+        topology.add_machine("m-1")
+        yield self.set_topology(topology)
+
+        # Give a chance for something bad to happen.
+        yield self.poke_zk()
+
+        # Ensure the watcher was never called, because its client was
+        # disconnected.
+        self.assertEquals(len(calls), 0)
