Merge lp:~blake-rouse/maas/fix-1680278 into lp:~maas-committers/maas/trunk

Proposed by Blake Rouse
Status: Merged
Approved by: Blake Rouse
Approved revision: no longer in the source branch.
Merged at revision: 5979
Proposed branch: lp:~blake-rouse/maas/fix-1680278
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 244 lines (+99/-27)
2 files modified
src/metadataserver/api_twisted.py (+68/-22)
src/metadataserver/tests/test_api_twisted.py (+31/-5)
To merge this branch: bzr merge lp:~blake-rouse/maas/fix-1680278
Reviewer Review Type Date Requested Status
Lee Trager (community) Approve
Review via email: mp+322550@code.launchpad.net

Commit message

Update the last_ping for the script_set in its own transaction. Only update it if the new timestamp is later than the stored one, and only for the latest message in the queue for that node. Uses select_for_update to remove the chance of conflicts.

To post a comment you must log in.
Revision history for this message
Blake Rouse (blake-rouse) wrote :

Still need to do the unit tests. But I ran this with 9 VMs on my machine (which maxes it out) and all deployed successfully with no issues or tracebacks.

Revision history for this message
Lee Trager (ltrager) wrote :

Two comments below but it looks good!

Revision history for this message
Blake Rouse (blake-rouse) :
Revision history for this message
Lee Trager (ltrager) wrote :

LGTM! Tested by running testing on 3 machines at once on my laptop. Everything completed successfully and no exception in the logs.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/metadataserver/api_twisted.py'
2--- src/metadataserver/api_twisted.py 2017-03-29 13:58:22 +0000
3+++ src/metadataserver/api_twisted.py 2017-04-13 20:41:06 +0000
4@@ -9,13 +9,16 @@
5 from datetime import datetime
6 import json
7
8+from django.db import DatabaseError
9 from maasserver.api.utils import extract_oauth_key_from_auth_header
10 from maasserver.enum import (
11 NODE_STATUS,
12 NODE_TYPE,
13 )
14+from maasserver.models.timestampedmodel import now
15 from maasserver.utils.orm import (
16 in_transaction,
17+ make_serialization_failure,
18 transactional,
19 TransactionManagementError,
20 )
21@@ -25,7 +28,10 @@
22 add_event_to_node_event_log,
23 process_file,
24 )
25-from metadataserver.models import NodeKey
26+from metadataserver.models import (
27+ NodeKey,
28+ ScriptSet,
29+)
30 from provisioningserver.logger import LegacyLogger
31 from provisioningserver.utils.twisted import deferred
32 from twisted.application.internet import TimerService
33@@ -225,7 +231,10 @@
34 "outside of a transaction.")
35 else:
36 # Here we're in a database thread, with a database connection.
37- for message in messages:
38+ # We only save the last_ping off the last message in the
39+ # list of messages. This reduces the number of database saves
40+ # required.
41+ for idx, message in enumerate(messages):
42 try:
43 self._processMessage(node, message)
44 except:
45@@ -233,6 +242,47 @@
46 None,
47 "Failed to process message "
48 "for node: %s" % node.hostname)
49+ if idx == len(messages) - 1:
50+ try:
51+ self._updateLastPing(node, message)
52+ except:
53+ log.err(
54+ None,
55+ "Failed to update last ping "
56+ "for node: %s" % node.hostname)
57+
58+ @transactional
59+ def _updateLastPing(self, node, message):
60+ """
61+ Update the last ping in any status which uses a script_set whenever a
62+ node in that status contacts us.
63+ """
64+ script_set_statuses = {
65+ NODE_STATUS.COMMISSIONING: 'current_commissioning_script_set_id',
66+ NODE_STATUS.TESTING: 'current_testing_script_set_id',
67+ NODE_STATUS.DEPLOYING: 'current_installation_script_set_id',
68+ }
69+ script_set_property = script_set_statuses.get(node.status)
70+ if script_set_property is not None:
71+ script_set_id = getattr(node, script_set_property)
72+ if script_set_id is not None:
73+ try:
74+ script_set = ScriptSet.objects.select_for_update(
75+ nowait=True).get(id=script_set_id)
76+ except ScriptSet.DoesNotExist:
77+ # Weird that it would be deleted, but let's not cause a
78+ # stack trace for this error.
79+ pass
80+ except DatabaseError:
81+ # select_for_update(nowait=True) failed instantly. Raise
82+ # error so @transactional will retry the whole operation.
83+ raise make_serialization_failure()
84+ else:
85+ current_time = now()
86+ if (script_set.last_ping is None or
87+ current_time > script_set.last_ping):
88+ script_set.last_ping = current_time
89+ script_set.save(update_fields=['last_ping'])
90
91 @transactional
92 def _processMessage(self, node, message):
93@@ -278,18 +328,6 @@
94 for script_result, args in results.items():
95 script_result.store_result(**args)
96
97- # Update the last ping in any status which uses a script_set whenever a
98- # node in that status contacts us.
99- script_set_statuses = {
100- NODE_STATUS.COMMISSIONING: node.current_commissioning_script_set,
101- NODE_STATUS.TESTING: node.current_testing_script_set,
102- NODE_STATUS.DEPLOYING: node.current_installation_script_set,
103- }
104- script_set = script_set_statuses.get(node.status)
105- if script_set is not None:
106- script_set.last_ping = message['timestamp']
107- script_set.save()
108-
109 # At the end of a top-level event, we change the node status.
110 save_node = False
111 if self._is_top_level(activity_name) and event_type == 'finish':
112@@ -342,16 +380,24 @@
113 """Top-level events do not have slashes in their names."""
114 return '/' not in activity_name
115
116- @transactional
117 def _processMessageNow(self, authorization, message):
118- try:
119- node = NodeKey.objects.get_node_for_key(authorization)
120- except NodeKey.DoesNotExist:
121- # The node that should get this message has already had its owner
122- # cleared or changed and this message cannot be saved.
123- return None
124+ # This should be called in a non-reactor thread with a pre-existing
125+ # connection (e.g. via deferToDatabase).
126+ if in_transaction():
127+ raise TransactionManagementError(
128+ "_processMessageNow must be called from "
129+ "outside of a transaction.")
130 else:
131- return self._processMessage(node, message)
132+ try:
133+ node = transactional(NodeKey.objects.get_node_for_key)(
134+ authorization)
135+ except NodeKey.DoesNotExist:
136+ # The node that should get this message has already had its
137+ # owner cleared or changed and this message cannot be saved.
138+ return None
139+ else:
140+ self._processMessage(node, message)
141+ self._updateLastPing(node, message)
142
143 @deferred
144 def queueMessage(self, authorization, message):
145
146=== modified file 'src/metadataserver/tests/test_api_twisted.py'
147--- src/metadataserver/tests/test_api_twisted.py 2017-03-29 13:58:22 +0000
148+++ src/metadataserver/tests/test_api_twisted.py 2017-04-13 20:41:06 +0000
149@@ -11,6 +11,7 @@
150 from io import BytesIO
151 import json
152 from unittest.mock import (
153+ call,
154 Mock,
155 sentinel,
156 )
157@@ -35,6 +36,7 @@
158 from maasserver.utils.threads import deferToDatabase
159 from maastesting.matchers import (
160 MockCalledOnceWith,
161+ MockCallsMatch,
162 MockNotCalled,
163 )
164 from maastesting.testcase import MAASTestCase
165@@ -229,20 +231,37 @@
166
167 @wait_for_reactor
168 @inlineCallbacks
169- def test__processMessages_calls_processMessage(self):
170+ def test__processMessageNow_fails_when_in_transaction(self):
171+ worker = StatusWorkerService(sentinel.dbtasks)
172+ with ExpectedException(TransactionManagementError):
173+ yield deferToDatabase(
174+ transactional(worker._processMessageNow),
175+ sentinel.node, sentinel.message)
176+
177+ @wait_for_reactor
178+ @inlineCallbacks
179+ def test__processMessages_calls_processMessage_and_updateLastPing(self):
180 worker = StatusWorkerService(sentinel.dbtasks)
181 mock_processMessage = self.patch(worker, "_processMessage")
182+ mock_updateLastPing = self.patch(worker, "_updateLastPing")
183 yield deferToDatabase(
184- worker._processMessages, sentinel.node, [sentinel.message])
185+ worker._processMessages, sentinel.node,
186+ [sentinel.message1, sentinel.message2])
187 self.assertThat(
188 mock_processMessage,
189- MockCalledOnceWith(sentinel.node, sentinel.message))
190+ MockCallsMatch(
191+ call(sentinel.node, sentinel.message1),
192+ call(sentinel.node, sentinel.message2)))
193+ self.assertThat(
194+ mock_updateLastPing,
195+ MockCalledOnceWith(sentinel.node, sentinel.message2))
196
197 @wait_for_reactor
198 @inlineCallbacks
199 def test_queueMessages_processes_top_level_message_instantly(self):
200 worker = StatusWorkerService(sentinel.dbtasks)
201 mock_processMessage = self.patch(worker, "_processMessage")
202+ mock_updateLastPing = self.patch(worker, "_updateLastPing")
203 message = self.make_message()
204 message['event_type'] = 'finish'
205 nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens)
206@@ -251,6 +270,9 @@
207 self.assertThat(
208 mock_processMessage,
209 MockCalledOnceWith(node, message))
210+ self.assertThat(
211+ mock_updateLastPing,
212+ MockCalledOnceWith(node, message))
213
214 @wait_for_reactor
215 @inlineCallbacks
216@@ -313,6 +335,10 @@
217 worker = StatusWorkerService(sentinel.dbtasks)
218 worker._processMessage(node, payload)
219
220+ def updateLastPing(self, node, payload):
221+ worker = StatusWorkerService(sentinel.dbtasks)
222+ worker._updateLastPing(node, payload)
223+
224 def test_status_installation_result_does_not_affect_other_node(self):
225 node1 = factory.make_Node(status=NODE_STATUS.DEPLOYING)
226 node2 = factory.make_Node(status=NODE_STATUS.DEPLOYING)
227@@ -792,7 +818,7 @@
228 break
229 self.assertEqual(content, script_result.stdout)
230
231- def test_status_updates_script_status_last_ping(self):
232+ def test_updateLastPing_updates_script_status_last_ping(self):
233 nodes = {
234 status: factory.make_Node(
235 status=status, with_empty_script_sets=True)
236@@ -810,7 +836,7 @@
237 'description': 'testing',
238 'timestamp': datetime.utcnow(),
239 }
240- self.processMessage(node, payload)
241+ self.updateLastPing(node, payload)
242 script_set_statuses = {
243 NODE_STATUS.COMMISSIONING: (
244 node.current_commissioning_script_set),