Merge ~chad.smith/cloud-init:feature/kvp-reporting into cloud-init:master

Proposed by Chad Smith
Status: Merged
Approved by: Chad Smith
Approved revision: 0124a19cbfc5367395eb888ca504b31910982650
Merge reported by: Server Team CI bot
Merged at revision: not available
Proposed branch: ~chad.smith/cloud-init:feature/kvp-reporting
Merge into: cloud-init:master
Diff against target: 510 lines (+396/-7)
7 files modified
cloudinit/cloud.py (+2/-2)
cloudinit/cmd/main.py (+2/-2)
cloudinit/distros/__init__.py (+1/-1)
cloudinit/reporting/__init__.py (+1/-1)
cloudinit/reporting/handlers.py (+255/-0)
cloudinit/stages.py (+1/-1)
tests/unittests/test_reporting_hyperv.py (+134/-0)
Reviewer Review Type Date Requested Status
Server Team CI bot continuous-integration Approve
cloud-init Commiters Pending
Review via email: mp+353739@code.launchpad.net

Commit message

logging: Add logging config type hyperv for reporting via Azure KVP

Linux guests can provide information to Hyper-V hosts via KVP (Key-Value
Pair exchange). KVP allows a guest to send arbitrary string key-value pairs
back to the host's registry. On Linux, KVP communication pools are exposed
as pool files named /var/lib/hyperv/.kvp_pool_<N>.

The following reporting configuration enables KVP reporting in addition to
the default logging, provided the pool files exist:

reporting:
    logging:
        type: log
    telemetry:
        type: hyperv

Author: Andy Liu <email address hidden>

Description of the change

Took the original branch and applied a minor pyflakes/pycodestyle fix on top: https://code.launchpad.net/~andyliuliming/cloud-init/+git/cloud-init/+merge/351742

To post a comment you must log in.
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

PASSED: Continuous integration, rev:0124a19cbfc5367395eb888ca504b31910982650
https://jenkins.ubuntu.com/server/job/cloud-init-ci/259/
Executed test runs:
    SUCCESS: Checkout
    SUCCESS: Unit & Style Tests
    SUCCESS: Ubuntu LTS: Build
    SUCCESS: Ubuntu LTS: Integration
    IN_PROGRESS: Declarative: Post Actions

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/259/rebuild

review: Approve (continuous-integration)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py
2index 6d12c43..7ae98e1 100644
3--- a/cloudinit/cloud.py
4+++ b/cloudinit/cloud.py
5@@ -47,7 +47,7 @@ class Cloud(object):
6
7 @property
8 def cfg(self):
9- # Ensure that not indirectly modified
10+ # Ensure that cfg is not indirectly modified
11 return copy.deepcopy(self._cfg)
12
13 def run(self, name, functor, args, freq=None, clear_on_fail=False):
14@@ -61,7 +61,7 @@ class Cloud(object):
15 return None
16 return fn
17
18- # The rest of thes are just useful proxies
19+ # The rest of these are just useful proxies
20 def get_userdata(self, apply_filter=True):
21 return self.datasource.get_userdata(apply_filter)
22
23diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py
24index d6ba90f..c0edee1 100644
25--- a/cloudinit/cmd/main.py
26+++ b/cloudinit/cmd/main.py
27@@ -315,7 +315,7 @@ def main_init(name, args):
28 existing = "trust"
29
30 init.purge_cache()
31- # Delete the non-net file as well
32+ # Delete the no-net file as well
33 util.del_file(os.path.join(path_helper.get_cpath("data"), "no-net"))
34
35 # Stage 5
36@@ -339,7 +339,7 @@ def main_init(name, args):
37 " Likely bad things to come!"))
38 if not args.force:
39 init.apply_network_config(bring_up=not args.local)
40- LOG.debug("[%s] Exiting without datasource in local mode", mode)
41+ LOG.debug("[%s] Exiting without datasource", mode)
42 if mode == sources.DSMODE_LOCAL:
43 return (None, [])
44 else:
45diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
46index ab0b077..fde054e 100755
47--- a/cloudinit/distros/__init__.py
48+++ b/cloudinit/distros/__init__.py
49@@ -157,7 +157,7 @@ class Distro(object):
50 distro)
51 header = '\n'.join([
52 "# Converted from network_config for distro %s" % distro,
53- "# Implmentation of _write_network_config is needed."
54+ "# Implementation of _write_network_config is needed."
55 ])
56 ns = network_state.parse_net_config_data(netconfig)
57 contents = eni.network_state_to_eni(
58diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
59index 1ed2b48..e047767 100644
60--- a/cloudinit/reporting/__init__.py
61+++ b/cloudinit/reporting/__init__.py
62@@ -18,7 +18,7 @@ DEFAULT_CONFIG = {
63
64
65 def update_configuration(config):
66- """Update the instanciated_handler_registry.
67+ """Update the instantiated_handler_registry.
68
69 :param config:
70 The dictionary containing changes to apply. If a key is given
71diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
72index 4066076..4b4bb39 100644
73--- a/cloudinit/reporting/handlers.py
74+++ b/cloudinit/reporting/handlers.py
75@@ -1,17 +1,34 @@
76 # This file is part of cloud-init. See LICENSE file for license information.
77
78 import abc
79+import fcntl
80 import json
81 import six
82+import os
83+import re
84+import struct
85+import threading
86+import time
87
88 from cloudinit import log as logging
89 from cloudinit.registry import DictRegistry
90 from cloudinit import (url_helper, util)
91+from datetime import datetime
92
93+if six.PY2:
94+ import multiprocessing.queues as queue
95+ from multiprocessing.queues import JoinableQueue as JQueue
96+else:
97+ import queue
98+ from queue import Queue as JQueue
99
100 LOG = logging.getLogger(__name__)
101
102
class ReportException(Exception):
    """Raised when a reporting handler hits malformed or unavailable state."""
105+
106+
107 @six.add_metaclass(abc.ABCMeta)
108 class ReportingHandler(object):
109 """Base class for report handlers.
110@@ -85,9 +102,247 @@ class WebHookHandler(ReportingHandler):
111 LOG.warning("failed posting event: %s", event.as_string())
112
113
class HyperVKvpReportingHandler(ReportingHandler):
    """
    Reports events to a Hyper-V host using Key-Value-Pair exchange protocol
    and can be used to obtain high level diagnostic information from the host.

    To use this facility, the KVP user-space daemon (hv_kvp_daemon) has to be
    running. It reads the kvp_file when the host requests the guest to
    enumerate the KVP's.

    Every event becomes one or more fixed-size records in the kvp pool file,
    keyed ``CLOUD_INIT|<incarnation>|<event_type>|<event_name>``; the value
    is a compact JSON object holding the event metadata and description.
    Records written under an older incarnation number (an earlier boot
    time) are overwritten in place before the file is grown. Writing is
    done on a background thread so publish_event() does not block on I/O.

    For more information, see
    https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
    """
    HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
    HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
    # One pool-file record is the key field followed by the value field.
    HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
                          HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
    EVENT_PREFIX = 'CLOUD_INIT'
    MSG_KEY = 'msg'
    RESULT_KEY = 'result'
    DESC_IDX_KEY = 'msg_i'
    JSON_SEPARATORS = (',', ':')
    KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
    # Matches keys written by this handler; group 1 is the incarnation.
    _EVENT_KEY_RE = re.compile(r"^{0}\|(\d+)\|.+".format(EVENT_PREFIX))

    def __init__(self,
                 kvp_file_path=KVP_POOL_FILE_GUEST,
                 event_types=None):
        super(HyperVKvpReportingHandler, self).__init__()
        self._kvp_file_path = kvp_file_path
        # None/empty means "report every event type".
        self._event_types = event_types
        # True while a publishing thread is (or is about to be) running.
        self.running = False
        self.queue_lock = threading.Lock()
        self.running_lock = threading.Lock()
        self.q = JQueue()
        # Open pool file while _iterate_kvps() is suspended, else None.
        self.kvp_file = None
        self.incarnation_no = self._get_incarnation_no()
        self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
                                                  self.incarnation_no)
        # Byte offset before which every record is known not to be stale;
        # the next write scans for stale records starting here.
        self._current_offset = 0

    def _get_incarnation_no(self):
        """
        use the time passed as the incarnation number.
        the incarnation number is the number which are used to
        distinguish the old data stored in kvp and the new data.

        It is the boot time in whole seconds (now minus uptime), or 0 when
        the uptime string cannot be parsed.
        """
        uptime_str = util.uptime()
        try:
            return int(time.time() - float(uptime_str))
        except ValueError:
            LOG.warning("uptime '%s' not in correct format.", uptime_str)
            return 0

    def _iterate_kvps(self, offset):
        """Yield decoded records of the kvp file starting at byte offset.

        The file is held open under an exclusive flock while the generator
        is suspended, and self.kvp_file points at it so _update_kvp_item()
        can overwrite the record most recently yielded. Callers that stop
        early should close() the generator to release the lock promptly.
        Reading does not move self._current_offset.
        """
        try:
            with open(self._kvp_file_path, 'rb+') as f:
                self.kvp_file = f
                fcntl.flock(f, fcntl.LOCK_EX)
                try:
                    f.seek(offset)
                    record_data = f.read(self.HV_KVP_RECORD_SIZE)
                    while len(record_data) == self.HV_KVP_RECORD_SIZE:
                        yield self._decode_kvp_item(record_data)
                        record_data = f.read(self.HV_KVP_RECORD_SIZE)
                finally:
                    # Push any in-place update to disk before another
                    # writer can acquire the lock.
                    f.flush()
                    fcntl.flock(f, fcntl.LOCK_UN)
        finally:
            self.kvp_file = None

    def _event_key(self, event):
        """
        the event key format is:
        CLOUD_INIT|<incarnation number>|<event_type>|<event_name>
        """
        return u"{0}|{1}|{2}".format(self.event_key_prefix,
                                     event.event_type, event.name)

    def _encode_kvp_item(self, key, value):
        """Pack key/value as UTF-8 into one fixed-size, NUL-padded record.

        struct's "s" format pads short fields with NUL bytes and silently
        truncates fields that are too long.
        """
        data = (struct.pack("%ds%ds" % (
                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
                key.encode('utf-8'), value.encode('utf-8')))
        return data

    def _decode_kvp_item(self, record_data):
        """Unpack one record into {'key': ..., 'value': ...}.

        :raises ReportException: if record_data is not exactly one record.
        """
        record_data_len = len(record_data)
        if record_data_len != self.HV_KVP_RECORD_SIZE:
            raise ReportException(
                "record_data len not correct {0} {1}."
                .format(record_data_len, self.HV_KVP_RECORD_SIZE))
        # errors='replace': a field truncated mid-character by
        # _encode_kvp_item, or written by another agent, must not abort
        # the whole scan.
        k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE]
             .decode('utf-8', 'replace').strip('\x00'))
        v = (record_data[
            self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE
        ].decode('utf-8', 'replace').strip('\x00'))

        return {'key': k, 'value': v}

    def _update_kvp_item(self, record_data):
        """Overwrite the record just read by an active _iterate_kvps().

        :raises ReportException: if no kvp file is currently open.
        """
        if self.kvp_file is None:
            raise ReportException(
                "kvp file '{0}' not opened."
                .format(self._kvp_file_path))
        # Step back over the record that was just read (whence=1: current).
        self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
        self.kvp_file.write(record_data)

    def _append_kvp_item(self, record_data):
        """Append one record under an exclusive lock; move cursor to EOF."""
        with open(self._kvp_file_path, 'rb+') as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            # seek to end of the file
            f.seek(0, 2)
            f.write(record_data)
            f.flush()
            fcntl.flock(f, fcntl.LOCK_UN)
            self._current_offset = f.tell()

    @staticmethod
    def _json_escaped_prefix(text, limit):
        """Return the longest prefix of text whose JSON-escaped form fits
        in limit characters.

        Cutting the raw text, rather than its escaped JSON form, keeps
        escape sequences (escaped quotes, newlines, unicode escapes) whole,
        so every slice is valid JSON on its own. For non-empty text at
        least one character is returned so callers always make progress.
        """
        used = 0
        end = 0
        for ch in text:
            width = len(json.dumps(ch)) - 2  # drop the surrounding quotes
            if end and used + width > limit:
                break
            used += width
            end += 1
        return text[:end]

    def _break_down(self, key, meta_data, description):
        """Split an oversized event into several records sharing key.

        Each record repeats meta_data, carries its slice index under
        DESC_IDX_KEY and a piece of description under MSG_KEY;
        concatenating the MSG_KEY values in index order yields the full
        description.
        """
        del meta_data[self.MSG_KEY]
        remaining = description
        i = 0
        result_array = []
        while True:
            meta_data[self.DESC_IDX_KEY] = i
            meta_data[self.MSG_KEY] = ''
            data_without_desc = json.dumps(meta_data,
                                           separators=self.JSON_SEPARATORS)
            # 8 characters of slack below the value field size.
            room_for_desc = (
                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
                len(data_without_desc) - 8)
            chunk = self._json_escaped_prefix(remaining, room_for_desc)
            meta_data[self.MSG_KEY] = chunk
            value = json.dumps(meta_data, separators=self.JSON_SEPARATORS)
            result_array.append(self._encode_kvp_item(key, value))
            i += 1
            remaining = remaining[len(chunk):]
            if not remaining:
                break
        return result_array

    def _encode_event(self, event):
        """
        encode the event into kvp data bytes.
        if the event content reaches the maximum length of kvp value.
        then it would be cut to multiple slices.

        :return: list of encoded records (usually a single one).
        """
        key = self._event_key(event)
        meta_data = {
            "name": event.name,
            "type": event.event_type,
            "ts": (datetime.utcfromtimestamp(event.timestamp)
                   .isoformat() + 'Z'),
        }
        if hasattr(event, self.RESULT_KEY):
            meta_data[self.RESULT_KEY] = event.result
        meta_data[self.MSG_KEY] = event.description
        value = json.dumps(meta_data, separators=self.JSON_SEPARATORS)
        # if it reaches the maximum length of kvp value,
        # break it down to slices.
        # this should be very corner case.
        if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
            return self._break_down(key, meta_data, event.description)
        else:
            data = self._encode_kvp_item(key, value)
            return [data]

    def _is_stale_key(self, key):
        """Return True if key was written by this handler for an older
        incarnation and may therefore be overwritten."""
        match = self._EVENT_KEY_RE.match(key)
        return bool(match) and int(match.group(1)) < self.incarnation_no

    def _write_event(self, event):
        """Write every encoded slice of event to the kvp pool file.

        Each slice overwrites the first stale record at or after the write
        cursor; if there is none, the slice is appended to the file.
        """
        if not os.path.exists(self._kvp_file_path):
            LOG.warning(
                "skip writing events %s to %s. file not present.",
                event.as_string(),
                self._kvp_file_path)
            return
        for encoded_data in self._encode_event(event):
            # Decide overwrite-vs-append independently for every slice.
            need_append = True
            kvps = self._iterate_kvps(self._current_offset)
            try:
                for kvp in kvps:
                    # The scanned record is either non-stale or about to be
                    # replaced, so later scans may start after it.
                    self._current_offset += self.HV_KVP_RECORD_SIZE
                    if self._is_stale_key(kvp['key']):
                        self._update_kvp_item(encoded_data)
                        need_append = False
                        break
            finally:
                # Release the flock now rather than at garbage collection;
                # _append_kvp_item() opens and locks the file again.
                kvps.close()
            if need_append:
                self._append_kvp_item(encoded_data)

    def _publish_event_routine(self):
        """Drain the queue on the background thread, writing each event.

        Exits, clearing self.running, once the queue is seen empty while
        holding queue_lock. Every dequeued event is marked task_done()
        whatever the outcome, so q.join() callers are never left blocked.
        """
        while True:
            try:
                event = self.q.get_nowait()
            except queue.Empty:
                with self.queue_lock:
                    # double check the queue is empty
                    if self.q.empty():
                        self.running = False
                        break
                continue
            try:
                self._write_event(event)
            except Exception as e:
                # Worker-thread boundary: if this thread died, self.running
                # would stay True and no new writer would ever be started.
                LOG.warning(
                    "failed posting event to kvp: %s e:%s",
                    event.as_string(), e)
            finally:
                self.q.task_done()

    def trigger_publish_event(self):
        """Start the publishing thread unless one is already running."""
        if not self.running:
            with self.running_lock:
                if not self.running:
                    self.running = True
                    thread = threading.Thread(
                        target=self._publish_event_routine)
                    thread.start()

    # since the saving to the kvp pool can be a time costing task
    # if the kvp pool already contains a chunk of data,
    # so defer it to another thread.
    def publish_event(self, event):
        """Queue event for the background writer if its type is wanted."""
        if (not self._event_types or event.event_type in self._event_types):
            with self.queue_lock:
                self.q.put(event)
            self.trigger_publish_event()
349+
350+
# Registry mapping the reporting config "type" value to a handler class.
available_handlers = DictRegistry()
for _handler_name, _handler_cls in (
        ('log', LogHandler),
        ('print', PrintHandler),
        ('webhook', WebHookHandler),
        ('hyperv', HyperVKvpReportingHandler)):
    available_handlers.register_item(_handler_name, _handler_cls)
del _handler_name, _handler_cls
356
357 # vi: ts=4 expandtab
358diff --git a/cloudinit/stages.py b/cloudinit/stages.py
359index c132b57..8874d40 100644
360--- a/cloudinit/stages.py
361+++ b/cloudinit/stages.py
362@@ -510,7 +510,7 @@ class Init(object):
363 # The default frequency if handlers don't have one
364 'frequency': frequency,
365 # This will be used when new handlers are found
366- # to help write there contents to files with numbered
367+ # to help write their contents to files with numbered
368 # names...
369 'handlercount': 0,
370 'excluded': excluded,
371diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
372new file mode 100644
373index 0000000..2e64c6c
374--- /dev/null
375+++ b/tests/unittests/test_reporting_hyperv.py
376@@ -0,0 +1,134 @@
377+# This file is part of cloud-init. See LICENSE file for license information.
378+
379+from cloudinit.reporting import events
380+from cloudinit.reporting import handlers
381+
382+import json
383+import os
384+
385+from cloudinit import util
386+from cloudinit.tests.helpers import CiTestCase
387+
388+
class TestKvpEncoding(CiTestCase):
    """Round-trip checks for the fixed-size kvp record codec."""

    def test_encode_decode(self):
        expected = {'key': 'key1', 'value': 'value1'}
        handler = handlers.HyperVKvpReportingHandler()
        encoded = handler._encode_kvp_item(expected['key'],
                                           expected['value'])
        # A record always occupies exactly one key field plus one value
        # field, regardless of the payload length.
        self.assertEqual(handler.HV_KVP_RECORD_SIZE, len(encoded))
        self.assertEqual(expected, handler._decode_kvp_item(encoded))
398+
class TextKvpReporter(CiTestCase):
    """Behavioural tests for HyperVKvpReportingHandler on a temp pool file.

    NOTE(review): the class name looks like a typo for TestKvpReporter; it
    is kept unchanged so existing test ids stay stable.
    """

    def setUp(self):
        super(TextKvpReporter, self).setUp()
        self.tmp_file_path = self.tmp_path('kvp_pool_file')
        util.ensure_file(self.tmp_file_path)

    def test_event_type_can_be_filtered(self):
        reporter = handlers.HyperVKvpReportingHandler(
            kvp_file_path=self.tmp_file_path,
            event_types=['foo', 'bar'])

        reporter.publish_event(
            events.ReportingEvent('foo', 'name', 'description'))
        reporter.publish_event(
            events.ReportingEvent('some_other', 'name', 'description3'))
        reporter.q.join()

        kvps = list(reporter._iterate_kvps(0))
        self.assertEqual(1, len(kvps))

        reporter.publish_event(
            events.ReportingEvent('bar', 'name', 'description2'))
        reporter.q.join()
        kvps = list(reporter._iterate_kvps(0))
        self.assertEqual(2, len(kvps))

        self.assertIn('foo', kvps[0]['key'])
        self.assertIn('bar', kvps[1]['key'])
        self.assertNotIn('some_other', kvps[0]['key'])
        self.assertNotIn('some_other', kvps[1]['key'])

    def test_events_are_over_written(self):
        reporter = handlers.HyperVKvpReportingHandler(
            kvp_file_path=self.tmp_file_path)

        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))

        reporter.publish_event(
            events.ReportingEvent('foo', 'name1', 'description'))
        reporter.publish_event(
            events.ReportingEvent('foo', 'name2', 'description'))
        reporter.q.join()
        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))

        # A newer incarnation replaces an older record instead of
        # growing the file.
        reporter2 = handlers.HyperVKvpReportingHandler(
            kvp_file_path=self.tmp_file_path)
        reporter2.incarnation_no = reporter.incarnation_no + 1
        reporter2.publish_event(
            events.ReportingEvent('foo', 'name3', 'description'))
        reporter2.q.join()

        self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))

    def test_events_with_higher_incarnation_not_over_written(self):
        reporter = handlers.HyperVKvpReportingHandler(
            kvp_file_path=self.tmp_file_path)

        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))

        reporter.publish_event(
            events.ReportingEvent('foo', 'name1', 'description'))
        reporter.publish_event(
            events.ReportingEvent('foo', 'name2', 'description'))
        reporter.q.join()
        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))

        # An older incarnation must append, never clobber newer records.
        reporter3 = handlers.HyperVKvpReportingHandler(
            kvp_file_path=self.tmp_file_path)
        reporter3.incarnation_no = reporter.incarnation_no - 1
        reporter3.publish_event(
            events.ReportingEvent('foo', 'name3', 'description'))
        reporter3.q.join()
        self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))

    def test_finish_event_result_is_logged(self):
        reporter = handlers.HyperVKvpReportingHandler(
            kvp_file_path=self.tmp_file_path)
        reporter.publish_event(
            events.FinishReportingEvent('name2', 'description1',
                                        result=events.status.FAIL))
        reporter.q.join()
        self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value'])

    def test_file_operation_issue(self):
        os.remove(self.tmp_file_path)
        reporter = handlers.HyperVKvpReportingHandler(
            kvp_file_path=self.tmp_file_path)
        reporter.publish_event(
            events.FinishReportingEvent('name2', 'description1',
                                        result=events.status.FAIL))
        # Must return: a missing pool file may not wedge the queue.
        reporter.q.join()
        # The pool file belongs to the KVP daemon; the handler must not
        # create it.
        self.assertFalse(os.path.exists(self.tmp_file_path))

    def test_event_very_long(self):
        reporter = handlers.HyperVKvpReportingHandler(
            kvp_file_path=self.tmp_file_path)
        description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
        long_event = events.FinishReportingEvent(
            'event_name',
            description,
            result=events.status.FAIL)
        reporter.publish_event(long_event)
        reporter.q.join()
        kvps = list(reporter._iterate_kvps(0))
        self.assertEqual(3, len(kvps))

        # restore from the kvp to see the content are all there
        full_description = ''
        for i, kvp in enumerate(kvps):
            # Every slice must fit in the value field without truncation.
            self.assertLessEqual(len(kvp['value']),
                                 reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
            msg_slice = json.loads(kvp['value'])
            self.assertEqual(msg_slice['msg_i'], i)
            full_description += msg_slice['msg']
        self.assertEqual(description, full_description)

Subscribers

People subscribed via source and target branches