Merge lp:~allenap/maas/there-can-be-only-one--bug-1462320 into lp:~maas-committers/maas/trunk

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: no longer in the source branch.
Merged at revision: 3992
Proposed branch: lp:~allenap/maas/there-can-be-only-one--bug-1462320
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 197 lines (+114/-13)
2 files modified
src/maasserver/rpc/regionservice.py (+23/-4)
src/maasserver/rpc/tests/test_regionservice.py (+91/-9)
To merge this branch: bzr merge lp:~allenap/maas/there-can-be-only-one--bug-1462320
Reviewer Review Type Date Requested Status
Raphaël Badin (community) Approve
Review via email: mp+261223@code.launchpad.net

Commit message

Remove overlapping event-loop records where it's obvious they're outdated.

In addition, don't allow RegionAdvertisingService to crash when there's an error; log the error instead.

To post a comment you must log in.
Revision history for this message
Raphaël Badin (rvb) wrote :

Looks good. Tested in my lab and it works as expected.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/maasserver/rpc/regionservice.py'
2--- src/maasserver/rpc/regionservice.py 2015-06-02 15:50:27 +0000
3+++ src/maasserver/rpc/regionservice.py 2015-06-05 13:23:18 +0000
4@@ -698,7 +698,12 @@
5
6 def __init__(self, interval=60):
7 super(RegionAdvertisingService, self).__init__(
8- interval, deferToThread, self.update)
9+ interval, self.try_update)
10+
11+ def try_update(self):
12+ return deferToThread(self.update).addErrback(
13+ log.err, "Failed to update this event-loop's advertisement; "
14+ "%s's record may be out of date" % (eventloop.loop.name,))
15
16 @asynchronous
17 def startService(self):
18@@ -784,9 +789,11 @@
19 event-loop running in the same process, and deletes - garbage
20 collects - old records related to any event-loop.
21 """
22+ addresses = frozenset(self._get_addresses())
23 with closing(connection.cursor()) as cursor:
24 self._do_delete(cursor)
25- self._do_insert(cursor)
26+ self._do_delete_overlap(cursor, addresses)
27+ self._do_insert(cursor, addresses)
28 self._do_collect(cursor)
29
30 @synchronous
31@@ -881,13 +888,25 @@
32 def _do_delete(self, cursor):
33 cursor.execute(self._delete_statement, [eventloop.loop.name])
34
35+ _delete_overlap_statement = "DELETE FROM eventloops WHERE "
36+ _delete_overlap_values_statement = "(address = %s AND port = %s)"
37+
38+ def _do_delete_overlap(self, cursor, addresses):
39+ statement, values = [], []
40+ for addr, port in addresses:
41+ statement.append(self._delete_overlap_values_statement)
42+ values.extend([addr, port])
43+ if len(statement) != 0:
44+ statement = self._delete_overlap_statement + " OR ".join(statement)
45+ cursor.execute(statement, values)
46+
47 _insert_statement = "INSERT INTO eventloops (name, address, port) VALUES "
48 _insert_values_statement = "(%s, %s, %s)"
49
50- def _do_insert(self, cursor):
51+ def _do_insert(self, cursor, addresses):
52 name = eventloop.loop.name
53 statement, values = [], []
54- for addr, port in self._get_addresses():
55+ for addr, port in addresses:
56 statement.append(self._insert_values_statement)
57 values.extend([name, addr, port])
58 if len(statement) != 0:
59
60=== modified file 'src/maasserver/rpc/tests/test_regionservice.py'
61--- src/maasserver/rpc/tests/test_regionservice.py 2015-06-02 15:50:27 +0000
62+++ src/maasserver/rpc/tests/test_regionservice.py 2015-06-05 13:23:18 +0000
63@@ -1718,7 +1718,25 @@
64 def test_init(self):
65 ras = RegionAdvertisingService()
66 self.assertEqual(60, ras.step)
67- self.assertEqual((deferToThread, (ras.update,), {}), ras.call)
68+ self.assertEqual((ras.try_update, (), {}), ras.call)
69+
70+ @wait_for_reactor
71+ @inlineCallbacks
72+ def test_try_update_logs_all_errors(self):
73+ ras = RegionAdvertisingService()
74+ error = factory.make_exception_type()
75+ self.patch(ras, "update").side_effect = error
76+ with TwistedLoggerFixture() as logger:
77+ yield ras.try_update()
78+ self.assertDocTestMatches(
79+ """
80+ Failed to update this event-loop's advertisement;
81+ %s's record may be out of date
82+ Traceback (most recent call last):
83+ ...
84+ maastesting.factory.TestException#...
85+ """ % eventloop.loop.name,
86+ logger.output)
87
88 @wait_for_reactor
89 def test_starting_and_stopping_the_service(self):
90@@ -1926,6 +1944,70 @@
91 [("vic", "192.168.1.1", 1111)],
92 service.dump())
93
94+ def patch_port(self, port):
95+ getServiceNamed = self.patch(eventloop.services, "getServiceNamed")
96+ getPort = getServiceNamed.return_value.getPort
97+ getPort.side_effect = asynchronous(lambda: port)
98+
99+ def patch_addresses(self, addresses):
100+ get_all_interface_addresses = self.patch(
101+ regionservice, "get_all_interface_addresses")
102+ get_all_interface_addresses.return_value = addresses
103+
104+ def test_update_deletes_overlapping_records(self):
105+ service = RegionAdvertisingService()
106+ service.prepare()
107+
108+ # The name of a fictional old event-loop.
109+ example_name = factory.make_name("loop")
110+ # Port and addresses on which the old and current loops are listening.
111+ example_port = factory.pick_port()
112+ example_addrs = {
113+ factory.make_ipv4_address(),
114+ factory.make_ipv4_address(),
115+ }
116+ self.patch_port(example_port)
117+ self.patch_addresses(example_addrs)
118+
119+ # A non-overlapping host and port for the old event-loop.
120+ for example_addr in iter(factory.make_ipv4_address, None):
121+ if example_addr not in example_addrs:
122+ example_non_overlapping_record = (
123+ example_name, example_addr, example_port)
124+ break
125+
126+ # Populate the eventloops table with records for the old event-loop.
127+ with closing(connection):
128+ with closing(connection.cursor()) as cursor:
129+ # Create the non-overlapping record.
130+ cursor.execute("""\
131+ INSERT INTO eventloops (name, address, port, updated)
132+ VALUES (%s, %s, %s, DEFAULT)
133+ """, example_non_overlapping_record)
134+ # Create the overlapping records.
135+ for example_addr in example_addrs:
136+ cursor.execute("""\
137+ INSERT INTO eventloops (name, address, port, updated)
138+ VALUES (%s, %s, %s, DEFAULT)
139+ """, [example_name, example_addr, example_port])
140+
141+ # All records for the old event-loop are present.
142+ self.assertItemsEqual(
143+ [example_non_overlapping_record] + [
144+ (example_name, example_addr, example_port)
145+ for example_addr in example_addrs
146+ ],
147+ service.dump())
148+ # Updating replaces overlapping records with records for the current
149+ # event-loop. The non-overlapping record remains.
150+ service.update()
151+ self.assertItemsEqual(
152+ [example_non_overlapping_record] + [
153+ (eventloop.loop.name, example_addr, example_port)
154+ for example_addr in example_addrs
155+ ],
156+ service.dump())
157+
158 def test_dump(self):
159 example_addresses = [
160 (factory.make_ipv4_address(), factory.pick_port()),
161@@ -1975,9 +2057,7 @@
162 service = RegionAdvertisingService()
163
164 example_port = factory.pick_port()
165- getServiceNamed = self.patch(eventloop.services, "getServiceNamed")
166- getPort = getServiceNamed.return_value.getPort
167- getPort.side_effect = asynchronous(lambda: example_port)
168+ self.patch_port(example_port)
169
170 example_ipv4_addrs = {
171 factory.make_ipv4_address(),
172@@ -1991,9 +2071,7 @@
173 factory.pick_ip_in_network(netaddr.ip.IPV4_LINK_LOCAL),
174 factory.pick_ip_in_network(netaddr.ip.IPV6_LINK_LOCAL),
175 }
176- get_all_interface_addresses = self.patch(
177- regionservice, "get_all_interface_addresses")
178- get_all_interface_addresses.return_value = (
179+ self.patch_addresses(
180 example_ipv4_addrs | example_ipv6_addrs |
181 example_link_local_addrs)
182
183@@ -2003,8 +2081,12 @@
184 [(addr, example_port) for addr in example_ipv4_addrs],
185 service._get_addresses())
186
187- getServiceNamed.assert_called_once_with("rpc")
188- get_all_interface_addresses.assert_called_once_with()
189+ self.assertThat(
190+ eventloop.services.getServiceNamed,
191+ MockCalledOnceWith("rpc"))
192+ self.assertThat(
193+ regionservice.get_all_interface_addresses,
194+ MockCalledOnceWith())
195
196 def test__get_addresses_when_rpc_down(self):
197 service = RegionAdvertisingService()