Merge ~vtqanh/cloud-init:ImproveHyperVKvpReporter into cloud-init:master

Proposed by Anh Vo (MSFT)
Status: Merged
Merge reported by: Anh Vo (MSFT)
Merged at revision: 6311bab8422a2552b3811e611436657906ab8666
Proposed branch: ~vtqanh/cloud-init:ImproveHyperVKvpReporter
Merge into: cloud-init:master
Diff against target: 365 lines (+106/-115)
2 files modified
cloudinit/reporting/handlers.py (+57/-60)
tests/unittests/test_reporting_hyperv.py (+49/-55)
Reviewer Review Type Date Requested Status
Server Team CI bot Needs Fixing
Ryan Harper Approve
Review via email: mp+366044@code.launchpad.net

This proposal has been superseded by a proposal from 2019-04-29.

Commit message

Azure: Changes to the Hyper-V KVP Reporter

* Truncate KVP Pool file to prevent stale entries from being processed
  by the Hyper-V KVP reporter.
* No longer update previous entries in the KVP pool as this is not
  desirable and potentially has a negative impact on performance.
* Batch appending of existing KVP entries to reduce performance impact.

To post a comment you must log in.
Revision history for this message
Ryan Harper (raharper) wrote :

I've pointed CI at this branch. A couple of inline comments and questions.

Revision history for this message
Server Team CI bot (server-team-bot) wrote :

PASSED: Continuous integration, rev:9e465c3512a14e85fe20ea1ce7a4f622b444a6b3
https://jenkins.ubuntu.com/server/job/cloud-init-ci/679/
Executed test runs:
    SUCCESS: Checkout
    SUCCESS: Unit & Style Tests
    SUCCESS: Ubuntu LTS: Build
    SUCCESS: Ubuntu LTS: Integration
    IN_PROGRESS: Declarative: Post Actions

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/679/rebuild

review: Approve (continuous-integration)
Revision history for this message
Anh Vo (MSFT) (vtqanh) wrote :

Thanks Ryan - I replied to the comments inline

Revision history for this message
Ryan Harper (raharper) wrote :

Thanks for the update. Just a few requests for comments/updated commit message.

Revision history for this message
Anh Vo (MSFT) (vtqanh) wrote :

I have updated the comments and updated the commit message.

Revision history for this message
Server Team CI bot (server-team-bot) wrote :

PASSED: Continuous integration, rev:6311bab8422a2552b3811e611436657906ab8666
https://jenkins.ubuntu.com/server/job/cloud-init-ci/691/
Executed test runs:
    SUCCESS: Checkout
    SUCCESS: Unit & Style Tests
    SUCCESS: Ubuntu LTS: Build
    SUCCESS: Ubuntu LTS: Integration
    IN_PROGRESS: Declarative: Post Actions

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/691/rebuild

review: Approve (continuous-integration)
Revision history for this message
Ryan Harper (raharper) wrote :

Thanks for updating.

review: Approve
Revision history for this message
Server Team CI bot (server-team-bot) :
review: Approve (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

Commit message lints:
 - Expected empty line on line 2 of the commit message
 - Line #1 has 26 too many characters. Line starts with: " + Truncate KVP Pool"...
 - Line #2 has 55 too many characters. Line starts with: " + No longer update previous"...

review: Needs Fixing

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
2old mode 100644
3new mode 100755
4index 6d23558..10165ae
5--- a/cloudinit/reporting/handlers.py
6+++ b/cloudinit/reporting/handlers.py
7@@ -5,7 +5,6 @@ import fcntl
8 import json
9 import six
10 import os
11-import re
12 import struct
13 import threading
14 import time
15@@ -14,6 +13,7 @@ from cloudinit import log as logging
16 from cloudinit.registry import DictRegistry
17 from cloudinit import (url_helper, util)
18 from datetime import datetime
19+from six.moves.queue import Empty as QueueEmptyError
20
21 if six.PY2:
22 from multiprocessing.queues import JoinableQueue as JQueue
23@@ -129,24 +129,50 @@ class HyperVKvpReportingHandler(ReportingHandler):
24 DESC_IDX_KEY = 'msg_i'
25 JSON_SEPARATORS = (',', ':')
26 KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
27+ _already_truncated_pool_file = False
28
29 def __init__(self,
30 kvp_file_path=KVP_POOL_FILE_GUEST,
31 event_types=None):
32 super(HyperVKvpReportingHandler, self).__init__()
33 self._kvp_file_path = kvp_file_path
34+ HyperVKvpReportingHandler._truncate_guest_pool_file(
35+ self._kvp_file_path)
36+
37 self._event_types = event_types
38 self.q = JQueue()
39- self.kvp_file = None
40 self.incarnation_no = self._get_incarnation_no()
41 self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
42 self.incarnation_no)
43- self._current_offset = 0
44 self.publish_thread = threading.Thread(
45 target=self._publish_event_routine)
46 self.publish_thread.daemon = True
47 self.publish_thread.start()
48
49+ @classmethod
50+ def _truncate_guest_pool_file(cls, kvp_file):
51+ """
52+ Truncate the pool file if it has not been truncated since boot.
53+ This should be done exactly once for the file indicated by
54+ KVP_POOL_FILE_GUEST constant above. This method takes a filename
55+ so that we can use an arbitrary file during unit testing.
56+ Since KVP is a best-effort telemetry channel we only attempt to
57+ truncate the file once and only if the file has not been modified
58+ since boot. Additional truncation can lead to loss of existing
59+ KVPs.
60+ """
61+ if cls._already_truncated_pool_file:
62+ return
63+ boot_time = time.time() - float(util.uptime())
64+ try:
65+ if os.path.getmtime(kvp_file) < boot_time:
66+ with open(kvp_file, "w"):
67+ pass
68+ except (OSError, IOError) as e:
69+ LOG.warning("failed to truncate kvp pool file, %s", e)
70+ finally:
71+ cls._already_truncated_pool_file = True
72+
73 def _get_incarnation_no(self):
74 """
75 use the time passed as the incarnation number.
76@@ -162,20 +188,15 @@ class HyperVKvpReportingHandler(ReportingHandler):
77
78 def _iterate_kvps(self, offset):
79 """iterate the kvp file from the current offset."""
80- try:
81- with open(self._kvp_file_path, 'rb+') as f:
82- self.kvp_file = f
83- fcntl.flock(f, fcntl.LOCK_EX)
84- f.seek(offset)
85+ with open(self._kvp_file_path, 'rb') as f:
86+ fcntl.flock(f, fcntl.LOCK_EX)
87+ f.seek(offset)
88+ record_data = f.read(self.HV_KVP_RECORD_SIZE)
89+ while len(record_data) == self.HV_KVP_RECORD_SIZE:
90+ kvp_item = self._decode_kvp_item(record_data)
91+ yield kvp_item
92 record_data = f.read(self.HV_KVP_RECORD_SIZE)
93- while len(record_data) == self.HV_KVP_RECORD_SIZE:
94- self._current_offset += self.HV_KVP_RECORD_SIZE
95- kvp_item = self._decode_kvp_item(record_data)
96- yield kvp_item
97- record_data = f.read(self.HV_KVP_RECORD_SIZE)
98- fcntl.flock(f, fcntl.LOCK_UN)
99- finally:
100- self.kvp_file = None
101+ fcntl.flock(f, fcntl.LOCK_UN)
102
103 def _event_key(self, event):
104 """
105@@ -207,23 +228,13 @@ class HyperVKvpReportingHandler(ReportingHandler):
106
107 return {'key': k, 'value': v}
108
109- def _update_kvp_item(self, record_data):
110- if self.kvp_file is None:
111- raise ReportException(
112- "kvp file '{0}' not opened."
113- .format(self._kvp_file_path))
114- self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
115- self.kvp_file.write(record_data)
116-
117 def _append_kvp_item(self, record_data):
118- with open(self._kvp_file_path, 'rb+') as f:
119+ with open(self._kvp_file_path, 'ab') as f:
120 fcntl.flock(f, fcntl.LOCK_EX)
121- # seek to end of the file
122- f.seek(0, 2)
123- f.write(record_data)
124+ for data in record_data:
125+ f.write(data)
126 f.flush()
127 fcntl.flock(f, fcntl.LOCK_UN)
128- self._current_offset = f.tell()
129
130 def _break_down(self, key, meta_data, description):
131 del meta_data[self.MSG_KEY]
132@@ -279,40 +290,26 @@ class HyperVKvpReportingHandler(ReportingHandler):
133
134 def _publish_event_routine(self):
135 while True:
136+ items_from_queue = 0
137 try:
138 event = self.q.get(block=True)
139- need_append = True
140+ items_from_queue += 1
141+ encoded_data = []
142+ while event is not None:
143+ encoded_data += self._encode_event(event)
144+ try:
145+ # get all the rest of the events in the queue
146+ event = self.q.get(block=False)
147+ items_from_queue += 1
148+ except QueueEmptyError:
149+ event = None
150 try:
151- if not os.path.exists(self._kvp_file_path):
152- LOG.warning(
153- "skip writing events %s to %s. file not present.",
154- event.as_string(),
155- self._kvp_file_path)
156- encoded_event = self._encode_event(event)
157- # for each encoded_event
158- for encoded_data in (encoded_event):
159- for kvp in self._iterate_kvps(self._current_offset):
160- match = (
161- re.match(
162- r"^{0}\|(\d+)\|.+"
163- .format(self.EVENT_PREFIX),
164- kvp['key']
165- ))
166- if match:
167- match_groups = match.groups(0)
168- if int(match_groups[0]) < self.incarnation_no:
169- need_append = False
170- self._update_kvp_item(encoded_data)
171- continue
172- if need_append:
173- self._append_kvp_item(encoded_data)
174- except IOError as e:
175- LOG.warning(
176- "failed posting event to kvp: %s e:%s",
177- event.as_string(), e)
178+ self._append_kvp_item(encoded_data)
179+ except (OSError, IOError) as e:
180+ LOG.warning("failed posting events to kvp, %s", e)
181 finally:
182- self.q.task_done()
183-
184+ for _ in range(items_from_queue):
185+ self.q.task_done()
186 # when main process exits, q.get() will through EOFError
187 # indicating we should exit this thread.
188 except EOFError:
189@@ -322,7 +319,7 @@ class HyperVKvpReportingHandler(ReportingHandler):
190 # if the kvp pool already contains a chunk of data,
191 # so defer it to another thread.
192 def publish_event(self, event):
193- if (not self._event_types or event.event_type in self._event_types):
194+ if not self._event_types or event.event_type in self._event_types:
195 self.q.put(event)
196
197 def flush(self):
198diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
199old mode 100644
200new mode 100755
201index 2e64c6c..d01ed5b
202--- a/tests/unittests/test_reporting_hyperv.py
203+++ b/tests/unittests/test_reporting_hyperv.py
204@@ -1,10 +1,12 @@
205 # This file is part of cloud-init. See LICENSE file for license information.
206
207 from cloudinit.reporting import events
208-from cloudinit.reporting import handlers
209+from cloudinit.reporting.handlers import HyperVKvpReportingHandler
210
211 import json
212 import os
213+import struct
214+import time
215
216 from cloudinit import util
217 from cloudinit.tests.helpers import CiTestCase
218@@ -13,7 +15,7 @@ from cloudinit.tests.helpers import CiTestCase
219 class TestKvpEncoding(CiTestCase):
220 def test_encode_decode(self):
221 kvp = {'key': 'key1', 'value': 'value1'}
222- kvp_reporting = handlers.HyperVKvpReportingHandler()
223+ kvp_reporting = HyperVKvpReportingHandler()
224 data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
225 self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
226 decoded_kvp = kvp_reporting._decode_kvp_item(data)
227@@ -26,57 +28,9 @@ class TextKvpReporter(CiTestCase):
228 self.tmp_file_path = self.tmp_path('kvp_pool_file')
229 util.ensure_file(self.tmp_file_path)
230
231- def test_event_type_can_be_filtered(self):
232- reporter = handlers.HyperVKvpReportingHandler(
233- kvp_file_path=self.tmp_file_path,
234- event_types=['foo', 'bar'])
235-
236- reporter.publish_event(
237- events.ReportingEvent('foo', 'name', 'description'))
238- reporter.publish_event(
239- events.ReportingEvent('some_other', 'name', 'description3'))
240- reporter.q.join()
241-
242- kvps = list(reporter._iterate_kvps(0))
243- self.assertEqual(1, len(kvps))
244-
245- reporter.publish_event(
246- events.ReportingEvent('bar', 'name', 'description2'))
247- reporter.q.join()
248- kvps = list(reporter._iterate_kvps(0))
249- self.assertEqual(2, len(kvps))
250-
251- self.assertIn('foo', kvps[0]['key'])
252- self.assertIn('bar', kvps[1]['key'])
253- self.assertNotIn('some_other', kvps[0]['key'])
254- self.assertNotIn('some_other', kvps[1]['key'])
255-
256- def test_events_are_over_written(self):
257- reporter = handlers.HyperVKvpReportingHandler(
258- kvp_file_path=self.tmp_file_path)
259-
260- self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
261-
262- reporter.publish_event(
263- events.ReportingEvent('foo', 'name1', 'description'))
264- reporter.publish_event(
265- events.ReportingEvent('foo', 'name2', 'description'))
266- reporter.q.join()
267- self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
268-
269- reporter2 = handlers.HyperVKvpReportingHandler(
270- kvp_file_path=self.tmp_file_path)
271- reporter2.incarnation_no = reporter.incarnation_no + 1
272- reporter2.publish_event(
273- events.ReportingEvent('foo', 'name3', 'description'))
274- reporter2.q.join()
275-
276- self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))
277-
278 def test_events_with_higher_incarnation_not_over_written(self):
279- reporter = handlers.HyperVKvpReportingHandler(
280+ reporter = HyperVKvpReportingHandler(
281 kvp_file_path=self.tmp_file_path)
282-
283 self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
284
285 reporter.publish_event(
286@@ -86,7 +40,7 @@ class TextKvpReporter(CiTestCase):
287 reporter.q.join()
288 self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
289
290- reporter3 = handlers.HyperVKvpReportingHandler(
291+ reporter3 = HyperVKvpReportingHandler(
292 kvp_file_path=self.tmp_file_path)
293 reporter3.incarnation_no = reporter.incarnation_no - 1
294 reporter3.publish_event(
295@@ -95,7 +49,7 @@ class TextKvpReporter(CiTestCase):
296 self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
297
298 def test_finish_event_result_is_logged(self):
299- reporter = handlers.HyperVKvpReportingHandler(
300+ reporter = HyperVKvpReportingHandler(
301 kvp_file_path=self.tmp_file_path)
302 reporter.publish_event(
303 events.FinishReportingEvent('name2', 'description1',
304@@ -105,7 +59,7 @@ class TextKvpReporter(CiTestCase):
305
306 def test_file_operation_issue(self):
307 os.remove(self.tmp_file_path)
308- reporter = handlers.HyperVKvpReportingHandler(
309+ reporter = HyperVKvpReportingHandler(
310 kvp_file_path=self.tmp_file_path)
311 reporter.publish_event(
312 events.FinishReportingEvent('name2', 'description1',
313@@ -113,7 +67,7 @@ class TextKvpReporter(CiTestCase):
314 reporter.q.join()
315
316 def test_event_very_long(self):
317- reporter = handlers.HyperVKvpReportingHandler(
318+ reporter = HyperVKvpReportingHandler(
319 kvp_file_path=self.tmp_file_path)
320 description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
321 long_event = events.FinishReportingEvent(
322@@ -132,3 +86,43 @@ class TextKvpReporter(CiTestCase):
323 self.assertEqual(msg_slice['msg_i'], i)
324 full_description += msg_slice['msg']
325 self.assertEqual(description, full_description)
326+
327+ def test_not_truncate_kvp_file_modified_after_boot(self):
328+ with open(self.tmp_file_path, "wb+") as f:
329+ kvp = {'key': 'key1', 'value': 'value1'}
330+ data = (struct.pack("%ds%ds" % (
331+ HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
332+ HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
333+ kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8')))
334+ f.write(data)
335+ cur_time = time.time()
336+ os.utime(self.tmp_file_path, (cur_time, cur_time))
337+
338+ # reset this because the unit test framework
339+ # has already polluted the class variable
340+ HyperVKvpReportingHandler._already_truncated_pool_file = False
341+
342+ reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
343+ kvps = list(reporter._iterate_kvps(0))
344+ self.assertEqual(1, len(kvps))
345+
346+ def test_truncate_stale_kvp_file(self):
347+ with open(self.tmp_file_path, "wb+") as f:
348+ kvp = {'key': 'key1', 'value': 'value1'}
349+ data = (struct.pack("%ds%ds" % (
350+ HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
351+ HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
352+ kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8')))
353+ f.write(data)
354+
355+ # set the time ways back to make it look like
356+ # we had an old kvp file
357+ os.utime(self.tmp_file_path, (1000000, 1000000))
358+
359+ # reset this because the unit test framework
360+ # has already polluted the class variable
361+ HyperVKvpReportingHandler._already_truncated_pool_file = False
362+
363+ reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path)
364+ kvps = list(reporter._iterate_kvps(0))
365+ self.assertEqual(0, len(kvps))

Subscribers

People subscribed via source and target branches