Merge ~vtqanh/cloud-init:ImproveHyperVKvpReporter into cloud-init:master
- Git
- lp:~vtqanh/cloud-init
- ImproveHyperVKvpReporter
- Merge into master
Status: | Merged |
---|---|
Merge reported by: | Anh Vo (MSFT) |
Merged at revision: | 6311bab8422a2552b3811e611436657906ab8666 |
Proposed branch: | ~vtqanh/cloud-init:ImproveHyperVKvpReporter |
Merge into: | cloud-init:master |
Diff against target: |
365 lines (+106/-115) 2 files modified
cloudinit/reporting/handlers.py (+57/-60) tests/unittests/test_reporting_hyperv.py (+49/-55) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Server Team CI bot | Needs Fixing | ||
Ryan Harper | Approve | ||
Review via email: mp+366044@code.launchpad.net |
This proposal has been superseded by a proposal from 2019-04-29.
Commit message
Azure: Changes to the Hyper-V KVP Reporter
* Truncate KVP Pool file to prevent stale entries from being processed
by the Hyper-V KVP reporter.
* No longer update previous entries in the KVP pool, as this is not
desirable and potentially has a negative impact on performance.
* Batch appending of existing KVP entries to reduce performance impact
Description of the change
Ryan Harper (raharper) wrote : | # |
Server Team CI bot (server-team-bot) wrote : | # |
PASSED: Continuous integration, rev:9e465c3512a
https:/
Executed test runs:
SUCCESS: Checkout
SUCCESS: Unit & Style Tests
SUCCESS: Ubuntu LTS: Build
SUCCESS: Ubuntu LTS: Integration
IN_PROGRESS: Declarative: Post Actions
Click here to trigger a rebuild:
https:/
Anh Vo (MSFT) (vtqanh) wrote : | # |
Thanks Ryan - I replied to the comments inline
Ryan Harper (raharper) wrote : | # |
Thanks for the update. Just a few requests for comments/updated commit message.
Anh Vo (MSFT) (vtqanh) wrote : | # |
I have updated the comments and updated the commit message.
Server Team CI bot (server-team-bot) wrote : | # |
PASSED: Continuous integration, rev:6311bab8422
https:/
Executed test runs:
SUCCESS: Checkout
SUCCESS: Unit & Style Tests
SUCCESS: Ubuntu LTS: Build
SUCCESS: Ubuntu LTS: Integration
IN_PROGRESS: Declarative: Post Actions
Click here to trigger a rebuild:
https:/
Ryan Harper (raharper) wrote : | # |
Thanks for updating.
Server Team CI bot (server-team-bot) : | # |
Server Team CI bot (server-team-bot) wrote : | # |
Commit message lints:
- Expected empty line on line 2 of the commit message
- Line #1 has 26 too many characters. Line starts with: " + Truncate KVP Pool"...
- Line #2 has 55 too many characters. Line starts with: " + No longer update previous"...
Preview Diff
1 | diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py |
2 | old mode 100644 |
3 | new mode 100755 |
4 | index 6d23558..10165ae |
5 | --- a/cloudinit/reporting/handlers.py |
6 | +++ b/cloudinit/reporting/handlers.py |
7 | @@ -5,7 +5,6 @@ import fcntl |
8 | import json |
9 | import six |
10 | import os |
11 | -import re |
12 | import struct |
13 | import threading |
14 | import time |
15 | @@ -14,6 +13,7 @@ from cloudinit import log as logging |
16 | from cloudinit.registry import DictRegistry |
17 | from cloudinit import (url_helper, util) |
18 | from datetime import datetime |
19 | +from six.moves.queue import Empty as QueueEmptyError |
20 | |
21 | if six.PY2: |
22 | from multiprocessing.queues import JoinableQueue as JQueue |
23 | @@ -129,24 +129,50 @@ class HyperVKvpReportingHandler(ReportingHandler): |
24 | DESC_IDX_KEY = 'msg_i' |
25 | JSON_SEPARATORS = (',', ':') |
26 | KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1' |
27 | + _already_truncated_pool_file = False |
28 | |
29 | def __init__(self, |
30 | kvp_file_path=KVP_POOL_FILE_GUEST, |
31 | event_types=None): |
32 | super(HyperVKvpReportingHandler, self).__init__() |
33 | self._kvp_file_path = kvp_file_path |
34 | + HyperVKvpReportingHandler._truncate_guest_pool_file( |
35 | + self._kvp_file_path) |
36 | + |
37 | self._event_types = event_types |
38 | self.q = JQueue() |
39 | - self.kvp_file = None |
40 | self.incarnation_no = self._get_incarnation_no() |
41 | self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX, |
42 | self.incarnation_no) |
43 | - self._current_offset = 0 |
44 | self.publish_thread = threading.Thread( |
45 | target=self._publish_event_routine) |
46 | self.publish_thread.daemon = True |
47 | self.publish_thread.start() |
48 | |
49 | + @classmethod |
50 | + def _truncate_guest_pool_file(cls, kvp_file): |
51 | + """ |
52 | + Truncate the pool file if it has not been truncated since boot. |
53 | + This should be done exactly once for the file indicated by |
54 | + KVP_POOL_FILE_GUEST constant above. This method takes a filename |
55 | + so that we can use an arbitrary file during unit testing. |
56 | + Since KVP is a best-effort telemetry channel we only attempt to |
57 | + truncate the file once and only if the file has not been modified |
58 | + since boot. Additional truncation can lead to loss of existing |
59 | + KVPs. |
60 | + """ |
61 | + if cls._already_truncated_pool_file: |
62 | + return |
63 | + boot_time = time.time() - float(util.uptime()) |
64 | + try: |
65 | + if os.path.getmtime(kvp_file) < boot_time: |
66 | + with open(kvp_file, "w"): |
67 | + pass |
68 | + except (OSError, IOError) as e: |
69 | + LOG.warning("failed to truncate kvp pool file, %s", e) |
70 | + finally: |
71 | + cls._already_truncated_pool_file = True |
72 | + |
73 | def _get_incarnation_no(self): |
74 | """ |
75 | use the time passed as the incarnation number. |
76 | @@ -162,20 +188,15 @@ class HyperVKvpReportingHandler(ReportingHandler): |
77 | |
78 | def _iterate_kvps(self, offset): |
79 | """iterate the kvp file from the current offset.""" |
80 | - try: |
81 | - with open(self._kvp_file_path, 'rb+') as f: |
82 | - self.kvp_file = f |
83 | - fcntl.flock(f, fcntl.LOCK_EX) |
84 | - f.seek(offset) |
85 | + with open(self._kvp_file_path, 'rb') as f: |
86 | + fcntl.flock(f, fcntl.LOCK_EX) |
87 | + f.seek(offset) |
88 | + record_data = f.read(self.HV_KVP_RECORD_SIZE) |
89 | + while len(record_data) == self.HV_KVP_RECORD_SIZE: |
90 | + kvp_item = self._decode_kvp_item(record_data) |
91 | + yield kvp_item |
92 | record_data = f.read(self.HV_KVP_RECORD_SIZE) |
93 | - while len(record_data) == self.HV_KVP_RECORD_SIZE: |
94 | - self._current_offset += self.HV_KVP_RECORD_SIZE |
95 | - kvp_item = self._decode_kvp_item(record_data) |
96 | - yield kvp_item |
97 | - record_data = f.read(self.HV_KVP_RECORD_SIZE) |
98 | - fcntl.flock(f, fcntl.LOCK_UN) |
99 | - finally: |
100 | - self.kvp_file = None |
101 | + fcntl.flock(f, fcntl.LOCK_UN) |
102 | |
103 | def _event_key(self, event): |
104 | """ |
105 | @@ -207,23 +228,13 @@ class HyperVKvpReportingHandler(ReportingHandler): |
106 | |
107 | return {'key': k, 'value': v} |
108 | |
109 | - def _update_kvp_item(self, record_data): |
110 | - if self.kvp_file is None: |
111 | - raise ReportException( |
112 | - "kvp file '{0}' not opened." |
113 | - .format(self._kvp_file_path)) |
114 | - self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1) |
115 | - self.kvp_file.write(record_data) |
116 | - |
117 | def _append_kvp_item(self, record_data): |
118 | - with open(self._kvp_file_path, 'rb+') as f: |
119 | + with open(self._kvp_file_path, 'ab') as f: |
120 | fcntl.flock(f, fcntl.LOCK_EX) |
121 | - # seek to end of the file |
122 | - f.seek(0, 2) |
123 | - f.write(record_data) |
124 | + for data in record_data: |
125 | + f.write(data) |
126 | f.flush() |
127 | fcntl.flock(f, fcntl.LOCK_UN) |
128 | - self._current_offset = f.tell() |
129 | |
130 | def _break_down(self, key, meta_data, description): |
131 | del meta_data[self.MSG_KEY] |
132 | @@ -279,40 +290,26 @@ class HyperVKvpReportingHandler(ReportingHandler): |
133 | |
134 | def _publish_event_routine(self): |
135 | while True: |
136 | + items_from_queue = 0 |
137 | try: |
138 | event = self.q.get(block=True) |
139 | - need_append = True |
140 | + items_from_queue += 1 |
141 | + encoded_data = [] |
142 | + while event is not None: |
143 | + encoded_data += self._encode_event(event) |
144 | + try: |
145 | + # get all the rest of the events in the queue |
146 | + event = self.q.get(block=False) |
147 | + items_from_queue += 1 |
148 | + except QueueEmptyError: |
149 | + event = None |
150 | try: |
151 | - if not os.path.exists(self._kvp_file_path): |
152 | - LOG.warning( |
153 | - "skip writing events %s to %s. file not present.", |
154 | - event.as_string(), |
155 | - self._kvp_file_path) |
156 | - encoded_event = self._encode_event(event) |
157 | - # for each encoded_event |
158 | - for encoded_data in (encoded_event): |
159 | - for kvp in self._iterate_kvps(self._current_offset): |
160 | - match = ( |
161 | - re.match( |
162 | - r"^{0}\|(\d+)\|.+" |
163 | - .format(self.EVENT_PREFIX), |
164 | - kvp['key'] |
165 | - )) |
166 | - if match: |
167 | - match_groups = match.groups(0) |
168 | - if int(match_groups[0]) < self.incarnation_no: |
169 | - need_append = False |
170 | - self._update_kvp_item(encoded_data) |
171 | - continue |
172 | - if need_append: |
173 | - self._append_kvp_item(encoded_data) |
174 | - except IOError as e: |
175 | - LOG.warning( |
176 | - "failed posting event to kvp: %s e:%s", |
177 | - event.as_string(), e) |
178 | + self._append_kvp_item(encoded_data) |
179 | + except (OSError, IOError) as e: |
180 | + LOG.warning("failed posting events to kvp, %s", e) |
181 | finally: |
182 | - self.q.task_done() |
183 | - |
184 | + for _ in range(items_from_queue): |
185 | + self.q.task_done() |
186 | # when main process exits, q.get() will through EOFError |
187 | # indicating we should exit this thread. |
188 | except EOFError: |
189 | @@ -322,7 +319,7 @@ class HyperVKvpReportingHandler(ReportingHandler): |
190 | # if the kvp pool already contains a chunk of data, |
191 | # so defer it to another thread. |
192 | def publish_event(self, event): |
193 | - if (not self._event_types or event.event_type in self._event_types): |
194 | + if not self._event_types or event.event_type in self._event_types: |
195 | self.q.put(event) |
196 | |
197 | def flush(self): |
198 | diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py |
199 | old mode 100644 |
200 | new mode 100755 |
201 | index 2e64c6c..d01ed5b |
202 | --- a/tests/unittests/test_reporting_hyperv.py |
203 | +++ b/tests/unittests/test_reporting_hyperv.py |
204 | @@ -1,10 +1,12 @@ |
205 | # This file is part of cloud-init. See LICENSE file for license information. |
206 | |
207 | from cloudinit.reporting import events |
208 | -from cloudinit.reporting import handlers |
209 | +from cloudinit.reporting.handlers import HyperVKvpReportingHandler |
210 | |
211 | import json |
212 | import os |
213 | +import struct |
214 | +import time |
215 | |
216 | from cloudinit import util |
217 | from cloudinit.tests.helpers import CiTestCase |
218 | @@ -13,7 +15,7 @@ from cloudinit.tests.helpers import CiTestCase |
219 | class TestKvpEncoding(CiTestCase): |
220 | def test_encode_decode(self): |
221 | kvp = {'key': 'key1', 'value': 'value1'} |
222 | - kvp_reporting = handlers.HyperVKvpReportingHandler() |
223 | + kvp_reporting = HyperVKvpReportingHandler() |
224 | data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value']) |
225 | self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE) |
226 | decoded_kvp = kvp_reporting._decode_kvp_item(data) |
227 | @@ -26,57 +28,9 @@ class TextKvpReporter(CiTestCase): |
228 | self.tmp_file_path = self.tmp_path('kvp_pool_file') |
229 | util.ensure_file(self.tmp_file_path) |
230 | |
231 | - def test_event_type_can_be_filtered(self): |
232 | - reporter = handlers.HyperVKvpReportingHandler( |
233 | - kvp_file_path=self.tmp_file_path, |
234 | - event_types=['foo', 'bar']) |
235 | - |
236 | - reporter.publish_event( |
237 | - events.ReportingEvent('foo', 'name', 'description')) |
238 | - reporter.publish_event( |
239 | - events.ReportingEvent('some_other', 'name', 'description3')) |
240 | - reporter.q.join() |
241 | - |
242 | - kvps = list(reporter._iterate_kvps(0)) |
243 | - self.assertEqual(1, len(kvps)) |
244 | - |
245 | - reporter.publish_event( |
246 | - events.ReportingEvent('bar', 'name', 'description2')) |
247 | - reporter.q.join() |
248 | - kvps = list(reporter._iterate_kvps(0)) |
249 | - self.assertEqual(2, len(kvps)) |
250 | - |
251 | - self.assertIn('foo', kvps[0]['key']) |
252 | - self.assertIn('bar', kvps[1]['key']) |
253 | - self.assertNotIn('some_other', kvps[0]['key']) |
254 | - self.assertNotIn('some_other', kvps[1]['key']) |
255 | - |
256 | - def test_events_are_over_written(self): |
257 | - reporter = handlers.HyperVKvpReportingHandler( |
258 | - kvp_file_path=self.tmp_file_path) |
259 | - |
260 | - self.assertEqual(0, len(list(reporter._iterate_kvps(0)))) |
261 | - |
262 | - reporter.publish_event( |
263 | - events.ReportingEvent('foo', 'name1', 'description')) |
264 | - reporter.publish_event( |
265 | - events.ReportingEvent('foo', 'name2', 'description')) |
266 | - reporter.q.join() |
267 | - self.assertEqual(2, len(list(reporter._iterate_kvps(0)))) |
268 | - |
269 | - reporter2 = handlers.HyperVKvpReportingHandler( |
270 | - kvp_file_path=self.tmp_file_path) |
271 | - reporter2.incarnation_no = reporter.incarnation_no + 1 |
272 | - reporter2.publish_event( |
273 | - events.ReportingEvent('foo', 'name3', 'description')) |
274 | - reporter2.q.join() |
275 | - |
276 | - self.assertEqual(2, len(list(reporter2._iterate_kvps(0)))) |
277 | - |
278 | def test_events_with_higher_incarnation_not_over_written(self): |
279 | - reporter = handlers.HyperVKvpReportingHandler( |
280 | + reporter = HyperVKvpReportingHandler( |
281 | kvp_file_path=self.tmp_file_path) |
282 | - |
283 | self.assertEqual(0, len(list(reporter._iterate_kvps(0)))) |
284 | |
285 | reporter.publish_event( |
286 | @@ -86,7 +40,7 @@ class TextKvpReporter(CiTestCase): |
287 | reporter.q.join() |
288 | self.assertEqual(2, len(list(reporter._iterate_kvps(0)))) |
289 | |
290 | - reporter3 = handlers.HyperVKvpReportingHandler( |
291 | + reporter3 = HyperVKvpReportingHandler( |
292 | kvp_file_path=self.tmp_file_path) |
293 | reporter3.incarnation_no = reporter.incarnation_no - 1 |
294 | reporter3.publish_event( |
295 | @@ -95,7 +49,7 @@ class TextKvpReporter(CiTestCase): |
296 | self.assertEqual(3, len(list(reporter3._iterate_kvps(0)))) |
297 | |
298 | def test_finish_event_result_is_logged(self): |
299 | - reporter = handlers.HyperVKvpReportingHandler( |
300 | + reporter = HyperVKvpReportingHandler( |
301 | kvp_file_path=self.tmp_file_path) |
302 | reporter.publish_event( |
303 | events.FinishReportingEvent('name2', 'description1', |
304 | @@ -105,7 +59,7 @@ class TextKvpReporter(CiTestCase): |
305 | |
306 | def test_file_operation_issue(self): |
307 | os.remove(self.tmp_file_path) |
308 | - reporter = handlers.HyperVKvpReportingHandler( |
309 | + reporter = HyperVKvpReportingHandler( |
310 | kvp_file_path=self.tmp_file_path) |
311 | reporter.publish_event( |
312 | events.FinishReportingEvent('name2', 'description1', |
313 | @@ -113,7 +67,7 @@ class TextKvpReporter(CiTestCase): |
314 | reporter.q.join() |
315 | |
316 | def test_event_very_long(self): |
317 | - reporter = handlers.HyperVKvpReportingHandler( |
318 | + reporter = HyperVKvpReportingHandler( |
319 | kvp_file_path=self.tmp_file_path) |
320 | description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE |
321 | long_event = events.FinishReportingEvent( |
322 | @@ -132,3 +86,43 @@ class TextKvpReporter(CiTestCase): |
323 | self.assertEqual(msg_slice['msg_i'], i) |
324 | full_description += msg_slice['msg'] |
325 | self.assertEqual(description, full_description) |
326 | + |
327 | + def test_not_truncate_kvp_file_modified_after_boot(self): |
328 | + with open(self.tmp_file_path, "wb+") as f: |
329 | + kvp = {'key': 'key1', 'value': 'value1'} |
330 | + data = (struct.pack("%ds%ds" % ( |
331 | + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE, |
332 | + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE), |
333 | + kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8'))) |
334 | + f.write(data) |
335 | + cur_time = time.time() |
336 | + os.utime(self.tmp_file_path, (cur_time, cur_time)) |
337 | + |
338 | + # reset this because the unit test framework |
339 | + # has already polluted the class variable |
340 | + HyperVKvpReportingHandler._already_truncated_pool_file = False |
341 | + |
342 | + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) |
343 | + kvps = list(reporter._iterate_kvps(0)) |
344 | + self.assertEqual(1, len(kvps)) |
345 | + |
346 | + def test_truncate_stale_kvp_file(self): |
347 | + with open(self.tmp_file_path, "wb+") as f: |
348 | + kvp = {'key': 'key1', 'value': 'value1'} |
349 | + data = (struct.pack("%ds%ds" % ( |
350 | + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE, |
351 | + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE), |
352 | + kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8'))) |
353 | + f.write(data) |
354 | + |
355 | + # set the time ways back to make it look like |
356 | + # we had an old kvp file |
357 | + os.utime(self.tmp_file_path, (1000000, 1000000)) |
358 | + |
359 | + # reset this because the unit test framework |
360 | + # has already polluted the class variable |
361 | + HyperVKvpReportingHandler._already_truncated_pool_file = False |
362 | + |
363 | + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) |
364 | + kvps = list(reporter._iterate_kvps(0)) |
365 | + self.assertEqual(0, len(kvps)) |
I've pointed CI at this branch. A couple of inline comments and questions.