Merge lp:~blake-rouse/maas/fix-1680278 into lp:~maas-committers/maas/trunk

Proposed by Blake Rouse
Status: Merged
Approved by: Blake Rouse
Approved revision: no longer in the source branch.
Merged at revision: 5979
Proposed branch: lp:~blake-rouse/maas/fix-1680278
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 244 lines (+99/-27)
2 files modified
src/metadataserver/api_twisted.py (+68/-22)
src/metadataserver/tests/test_api_twisted.py (+31/-5)
To merge this branch: bzr merge lp:~blake-rouse/maas/fix-1680278
Reviewer Review Type Date Requested Status
Lee Trager (community) Approve
Review via email: mp+322550@code.launchpad.net

Commit message

Update the last_ping for the script_set in its own transaction. Only update it when the new timestamp is later than the stored one, and only for the latest message in the queue for that node. Uses select_for_update to remove the chance of conflicts.

To post a comment you must log in.
Revision history for this message
Blake Rouse (blake-rouse) wrote :

Still need to do the unit tests, but I ran this with 9 VMs on my machine (which maxes it out), and all of them deployed successfully with no issues or tracebacks.

Revision history for this message
Lee Trager (ltrager) wrote :

Two comments below but it looks good!

Revision history for this message
Blake Rouse (blake-rouse) :
Revision history for this message
Lee Trager (ltrager) wrote :

LGTM! Tested by running testing on 3 machines at once on my laptop. Everything completed successfully, with no exceptions in the logs.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/metadataserver/api_twisted.py'
--- src/metadataserver/api_twisted.py 2017-03-29 13:58:22 +0000
+++ src/metadataserver/api_twisted.py 2017-04-13 20:41:06 +0000
@@ -9,13 +9,16 @@
9from datetime import datetime9from datetime import datetime
10import json10import json
1111
12from django.db import DatabaseError
12from maasserver.api.utils import extract_oauth_key_from_auth_header13from maasserver.api.utils import extract_oauth_key_from_auth_header
13from maasserver.enum import (14from maasserver.enum import (
14 NODE_STATUS,15 NODE_STATUS,
15 NODE_TYPE,16 NODE_TYPE,
16)17)
18from maasserver.models.timestampedmodel import now
17from maasserver.utils.orm import (19from maasserver.utils.orm import (
18 in_transaction,20 in_transaction,
21 make_serialization_failure,
19 transactional,22 transactional,
20 TransactionManagementError,23 TransactionManagementError,
21)24)
@@ -25,7 +28,10 @@
25 add_event_to_node_event_log,28 add_event_to_node_event_log,
26 process_file,29 process_file,
27)30)
28from metadataserver.models import NodeKey31from metadataserver.models import (
32 NodeKey,
33 ScriptSet,
34)
29from provisioningserver.logger import LegacyLogger35from provisioningserver.logger import LegacyLogger
30from provisioningserver.utils.twisted import deferred36from provisioningserver.utils.twisted import deferred
31from twisted.application.internet import TimerService37from twisted.application.internet import TimerService
@@ -225,7 +231,10 @@
225 "outside of a transaction.")231 "outside of a transaction.")
226 else:232 else:
227 # Here we're in a database thread, with a database connection.233 # Here we're in a database thread, with a database connection.
228 for message in messages:234 # We only save the last_ping off the last message in the
235 # list of messages. This removes the number of database saves
236 # required.
237 for idx, message in enumerate(messages):
229 try:238 try:
230 self._processMessage(node, message)239 self._processMessage(node, message)
231 except:240 except:
@@ -233,6 +242,47 @@
233 None,242 None,
234 "Failed to process message "243 "Failed to process message "
235 "for node: %s" % node.hostname)244 "for node: %s" % node.hostname)
245 if idx == len(messages) - 1:
246 try:
247 self._updateLastPing(node, message)
248 except:
249 log.err(
250 None,
251 "Failed to update last ping "
252 "for node: %s" % node.hostname)
253
@transactional
def _updateLastPing(self, node, message):
    """
    Record, on the node's active script set, that the node just contacted us.

    Only nodes whose status is COMMISSIONING, TESTING or DEPLOYING have a
    script set to touch; the matching `current_*_script_set_id` attribute
    on `node` selects it. The ScriptSet row is locked with
    ``select_for_update(nowait=True)`` and its `last_ping` is set to
    ``now()``, but only when that moves `last_ping` forward (or it was
    never set).

    `message` is currently not read; the ping time comes from ``now()``.

    :raise: the error built by `make_serialization_failure` when the row
        lock cannot be taken immediately, so that `@transactional`
        retries the whole operation.
    """
    id_attribute = {
        NODE_STATUS.COMMISSIONING: 'current_commissioning_script_set_id',
        NODE_STATUS.TESTING: 'current_testing_script_set_id',
        NODE_STATUS.DEPLOYING: 'current_installation_script_set_id',
    }.get(node.status)
    if id_attribute is None:
        # This status does not use a script set; nothing to record.
        return
    locked_id = getattr(node, id_attribute)
    if locked_id is None:
        return
    try:
        locked_set = ScriptSet.objects.select_for_update(
            nowait=True).get(id=locked_id)
    except ScriptSet.DoesNotExist:
        # Weird that it would have been deleted, but let's not cause a
        # stack trace for this; just skip the update.
        return
    except DatabaseError:
        # With nowait=True a contended lock fails instantly. Re-raise it as
        # a serialization failure so @transactional retries everything.
        raise make_serialization_failure()
    timestamp = now()
    previous = locked_set.last_ping
    if previous is not None and not (timestamp > previous):
        # Never move last_ping backwards or rewrite an unchanged value.
        return
    locked_set.last_ping = timestamp
    locked_set.save(update_fields=['last_ping'])
236286
237 @transactional287 @transactional
238 def _processMessage(self, node, message):288 def _processMessage(self, node, message):
@@ -278,18 +328,6 @@
278 for script_result, args in results.items():328 for script_result, args in results.items():
279 script_result.store_result(**args)329 script_result.store_result(**args)
280330
281 # Update the last ping in any status which uses a script_set whenever a
282 # node in that status contacts us.
283 script_set_statuses = {
284 NODE_STATUS.COMMISSIONING: node.current_commissioning_script_set,
285 NODE_STATUS.TESTING: node.current_testing_script_set,
286 NODE_STATUS.DEPLOYING: node.current_installation_script_set,
287 }
288 script_set = script_set_statuses.get(node.status)
289 if script_set is not None:
290 script_set.last_ping = message['timestamp']
291 script_set.save()
292
293 # At the end of a top-level event, we change the node status.331 # At the end of a top-level event, we change the node status.
294 save_node = False332 save_node = False
295 if self._is_top_level(activity_name) and event_type == 'finish':333 if self._is_top_level(activity_name) and event_type == 'finish':
@@ -342,16 +380,24 @@
342 """Top-level events do not have slashes in their names."""380 """Top-level events do not have slashes in their names."""
343 return '/' not in activity_name381 return '/' not in activity_name
344382
345 @transactional
346 def _processMessageNow(self, authorization, message):383 def _processMessageNow(self, authorization, message):
347 try:384 # This should be called in a non-reactor thread with a pre-existing
348 node = NodeKey.objects.get_node_for_key(authorization)385 # connection (e.g. via deferToDatabase).
349 except NodeKey.DoesNotExist:386 if in_transaction():
350 # The node that should get this message has already had its owner387 raise TransactionManagementError(
351 # cleared or changed and this message cannot be saved.388 "_processMessageNow must be called from "
352 return None389 "outside of a transaction.")
353 else:390 else:
354 return self._processMessage(node, message)391 try:
392 node = transactional(NodeKey.objects.get_node_for_key)(
393 authorization)
394 except NodeKey.DoesNotExist:
395 # The node that should get this message has already had its
396 # owner cleared or changed and this message cannot be saved.
397 return None
398 else:
399 self._processMessage(node, message)
400 self._updateLastPing(node, message)
355401
356 @deferred402 @deferred
357 def queueMessage(self, authorization, message):403 def queueMessage(self, authorization, message):
358404
=== modified file 'src/metadataserver/tests/test_api_twisted.py'
--- src/metadataserver/tests/test_api_twisted.py 2017-03-29 13:58:22 +0000
+++ src/metadataserver/tests/test_api_twisted.py 2017-04-13 20:41:06 +0000
@@ -11,6 +11,7 @@
11from io import BytesIO11from io import BytesIO
12import json12import json
13from unittest.mock import (13from unittest.mock import (
14 call,
14 Mock,15 Mock,
15 sentinel,16 sentinel,
16)17)
@@ -35,6 +36,7 @@
35from maasserver.utils.threads import deferToDatabase36from maasserver.utils.threads import deferToDatabase
36from maastesting.matchers import (37from maastesting.matchers import (
37 MockCalledOnceWith,38 MockCalledOnceWith,
39 MockCallsMatch,
38 MockNotCalled,40 MockNotCalled,
39)41)
40from maastesting.testcase import MAASTestCase42from maastesting.testcase import MAASTestCase
@@ -229,20 +231,37 @@
229231
230 @wait_for_reactor232 @wait_for_reactor
231 @inlineCallbacks233 @inlineCallbacks
232 def test__processMessages_calls_processMessage(self):234 def test__processMessageNow_fails_when_in_transaction(self):
235 worker = StatusWorkerService(sentinel.dbtasks)
236 with ExpectedException(TransactionManagementError):
237 yield deferToDatabase(
238 transactional(worker._processMessageNow),
239 sentinel.node, sentinel.message)
240
241 @wait_for_reactor
242 @inlineCallbacks
243 def test__processMessages_calls_processMessage_and_updateLastPing(self):
233 worker = StatusWorkerService(sentinel.dbtasks)244 worker = StatusWorkerService(sentinel.dbtasks)
234 mock_processMessage = self.patch(worker, "_processMessage")245 mock_processMessage = self.patch(worker, "_processMessage")
246 mock_updateLastPing = self.patch(worker, "_updateLastPing")
235 yield deferToDatabase(247 yield deferToDatabase(
236 worker._processMessages, sentinel.node, [sentinel.message])248 worker._processMessages, sentinel.node,
249 [sentinel.message1, sentinel.message2])
237 self.assertThat(250 self.assertThat(
238 mock_processMessage,251 mock_processMessage,
239 MockCalledOnceWith(sentinel.node, sentinel.message))252 MockCallsMatch(
253 call(sentinel.node, sentinel.message1),
254 call(sentinel.node, sentinel.message2)))
255 self.assertThat(
256 mock_updateLastPing,
257 MockCalledOnceWith(sentinel.node, sentinel.message2))
240258
241 @wait_for_reactor259 @wait_for_reactor
242 @inlineCallbacks260 @inlineCallbacks
243 def test_queueMessages_processes_top_level_message_instantly(self):261 def test_queueMessages_processes_top_level_message_instantly(self):
244 worker = StatusWorkerService(sentinel.dbtasks)262 worker = StatusWorkerService(sentinel.dbtasks)
245 mock_processMessage = self.patch(worker, "_processMessage")263 mock_processMessage = self.patch(worker, "_processMessage")
264 mock_updateLastPing = self.patch(worker, "_updateLastPing")
246 message = self.make_message()265 message = self.make_message()
247 message['event_type'] = 'finish'266 message['event_type'] = 'finish'
248 nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens)267 nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens)
@@ -251,6 +270,9 @@
251 self.assertThat(270 self.assertThat(
252 mock_processMessage,271 mock_processMessage,
253 MockCalledOnceWith(node, message))272 MockCalledOnceWith(node, message))
273 self.assertThat(
274 mock_updateLastPing,
275 MockCalledOnceWith(node, message))
254276
255 @wait_for_reactor277 @wait_for_reactor
256 @inlineCallbacks278 @inlineCallbacks
@@ -313,6 +335,10 @@
313 worker = StatusWorkerService(sentinel.dbtasks)335 worker = StatusWorkerService(sentinel.dbtasks)
314 worker._processMessage(node, payload)336 worker._processMessage(node, payload)
315337
def updateLastPing(self, node, payload):
    """
    Test helper: pass `payload` for `node` straight to `_updateLastPing`
    on a throwaway StatusWorkerService built from a sentinel dbtasks.
    """
    StatusWorkerService(sentinel.dbtasks)._updateLastPing(node, payload)
341
316 def test_status_installation_result_does_not_affect_other_node(self):342 def test_status_installation_result_does_not_affect_other_node(self):
317 node1 = factory.make_Node(status=NODE_STATUS.DEPLOYING)343 node1 = factory.make_Node(status=NODE_STATUS.DEPLOYING)
318 node2 = factory.make_Node(status=NODE_STATUS.DEPLOYING)344 node2 = factory.make_Node(status=NODE_STATUS.DEPLOYING)
@@ -792,7 +818,7 @@
792 break818 break
793 self.assertEqual(content, script_result.stdout)819 self.assertEqual(content, script_result.stdout)
794820
795 def test_status_updates_script_status_last_ping(self):821 def test_updateLastPing_updates_script_status_last_ping(self):
796 nodes = {822 nodes = {
797 status: factory.make_Node(823 status: factory.make_Node(
798 status=status, with_empty_script_sets=True)824 status=status, with_empty_script_sets=True)
@@ -810,7 +836,7 @@
810 'description': 'testing',836 'description': 'testing',
811 'timestamp': datetime.utcnow(),837 'timestamp': datetime.utcnow(),
812 }838 }
813 self.processMessage(node, payload)839 self.updateLastPing(node, payload)
814 script_set_statuses = {840 script_set_statuses = {
815 NODE_STATUS.COMMISSIONING: (841 NODE_STATUS.COMMISSIONING: (
816 node.current_commissioning_script_set),842 node.current_commissioning_script_set),