Merge ~chad.smith/cloud-init:feature/kvp-reporting into cloud-init:master

Proposed by Chad Smith
Status: Merged
Approved by: Chad Smith
Approved revision: 0124a19cbfc5367395eb888ca504b31910982650
Merge reported by: Server Team CI bot
Merged at revision: not available
Proposed branch: ~chad.smith/cloud-init:feature/kvp-reporting
Merge into: cloud-init:master
Diff against target: 510 lines (+396/-7)
7 files modified
cloudinit/cloud.py (+2/-2)
cloudinit/cmd/main.py (+2/-2)
cloudinit/distros/__init__.py (+1/-1)
cloudinit/reporting/__init__.py (+1/-1)
cloudinit/reporting/handlers.py (+255/-0)
cloudinit/stages.py (+1/-1)
tests/unittests/test_reporting_hyperv.py (+134/-0)
Reviewer                Review Type              Date Requested   Status
Server Team CI bot      continuous-integration                    Approve
cloud-init Commiters                                              Pending
Review via email: mp+353739@code.launchpad.net

Commit message

logging: Add logging config type hyperv for reporting via Azure KVP

Linux guests can provide information to Hyper-V hosts via KVP.
KVP allows guests to provide arbitrary string key-value pairs back to the
host's registry. On Linux, KVP communication pools are presented as pool
files in /var/lib/hyperv/.kvp_pool_#.

The following reporting configuration enables this KVP reporting, in
addition to default logging, when the pool files exist:

reporting:
    logging:
        type: log
    telemetry:
        type: hyperv

Author: Andy Liu <email address hidden>

Description of the change

Grabbed the original branch and applied a minor pyflakes/pycodestyle patch; original merge proposal: https://code.launchpad.net/~andyliuliming/cloud-init/+git/cloud-init/+merge/351742
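For reviewers unfamiliar with the pool file layout: each record in a .kvp_pool_# file is a fixed-size blob consisting of a NUL-padded 512-byte key followed by a NUL-padded 2048-byte value (HV_KVP_EXCHANGE_MAX_KEY_SIZE and HV_KVP_EXCHANGE_MAX_VALUE_SIZE in the handler below). A minimal standalone Python sketch of that layout, distilled from the handler's _encode_kvp_item/_decode_kvp_item (the example key is illustrative):

    import struct

    KEY_SIZE = 512      # HV_KVP_EXCHANGE_MAX_KEY_SIZE
    VALUE_SIZE = 2048   # HV_KVP_EXCHANGE_MAX_VALUE_SIZE
    RECORD_SIZE = KEY_SIZE + VALUE_SIZE

    def encode_record(key, value):
        # struct.pack NUL-pads each field out to its fixed width
        return struct.pack("%ds%ds" % (KEY_SIZE, VALUE_SIZE),
                           key.encode('utf-8'), value.encode('utf-8'))

    def decode_record(record):
        key = record[:KEY_SIZE].decode('utf-8').strip('\x00')
        value = record[KEY_SIZE:RECORD_SIZE].decode('utf-8').strip('\x00')
        return {'key': key, 'value': value}

    record = encode_record('CLOUD_INIT|0|start|init', '{"msg":"hello"}')
    assert len(record) == RECORD_SIZE
    assert decode_record(record) == {'key': 'CLOUD_INIT|0|start|init',
                                     'value': '{"msg":"hello"}'}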

Revision history for this message
Server Team CI bot (server-team-bot) wrote :

PASSED: Continuous integration, rev:0124a19cbfc5367395eb888ca504b31910982650
https://jenkins.ubuntu.com/server/job/cloud-init-ci/259/
Executed test runs:
    SUCCESS: Checkout
    SUCCESS: Unit & Style Tests
    SUCCESS: Ubuntu LTS: Build
    SUCCESS: Ubuntu LTS: Integration
    IN_PROGRESS: Declarative: Post Actions

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/259/rebuild

review: Approve (continuous-integration)

Preview Diff

diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py
index 6d12c43..7ae98e1 100644
--- a/cloudinit/cloud.py
+++ b/cloudinit/cloud.py
@@ -47,7 +47,7 @@ class Cloud(object):
 
     @property
     def cfg(self):
-        # Ensure that not indirectly modified
+        # Ensure that cfg is not indirectly modified
        return copy.deepcopy(self._cfg)
 
     def run(self, name, functor, args, freq=None, clear_on_fail=False):
@@ -61,7 +61,7 @@ class Cloud(object):
             return None
         return fn
 
-    # The rest of thes are just useful proxies
+    # The rest of these are just useful proxies
     def get_userdata(self, apply_filter=True):
         return self.datasource.get_userdata(apply_filter)
 
diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py
index d6ba90f..c0edee1 100644
--- a/cloudinit/cmd/main.py
+++ b/cloudinit/cmd/main.py
@@ -315,7 +315,7 @@ def main_init(name, args):
         existing = "trust"
 
     init.purge_cache()
-    # Delete the non-net file as well
+    # Delete the no-net file as well
     util.del_file(os.path.join(path_helper.get_cpath("data"), "no-net"))
 
     # Stage 5
@@ -339,7 +339,7 @@ def main_init(name, args):
                               " Likely bad things to come!"))
         if not args.force:
             init.apply_network_config(bring_up=not args.local)
-            LOG.debug("[%s] Exiting without datasource in local mode", mode)
+            LOG.debug("[%s] Exiting without datasource", mode)
             if mode == sources.DSMODE_LOCAL:
                 return (None, [])
             else:
diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
index ab0b077..fde054e 100755
--- a/cloudinit/distros/__init__.py
+++ b/cloudinit/distros/__init__.py
@@ -157,7 +157,7 @@ class Distro(object):
                     distro)
         header = '\n'.join([
             "# Converted from network_config for distro %s" % distro,
-            "# Implmentation of _write_network_config is needed."
+            "# Implementation of _write_network_config is needed."
         ])
         ns = network_state.parse_net_config_data(netconfig)
         contents = eni.network_state_to_eni(
diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
index 1ed2b48..e047767 100644
--- a/cloudinit/reporting/__init__.py
+++ b/cloudinit/reporting/__init__.py
@@ -18,7 +18,7 @@ DEFAULT_CONFIG = {
 
 
 def update_configuration(config):
-    """Update the instanciated_handler_registry.
+    """Update the instantiated_handler_registry.
 
     :param config:
         The dictionary containing changes to apply. If a key is given
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 4066076..4b4bb39 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -1,17 +1,34 @@
 # This file is part of cloud-init. See LICENSE file for license information.
 
 import abc
+import fcntl
 import json
 import six
+import os
+import re
+import struct
+import threading
+import time
 
 from cloudinit import log as logging
 from cloudinit.registry import DictRegistry
 from cloudinit import (url_helper, util)
+from datetime import datetime
 
+if six.PY2:
+    import multiprocessing.queues as queue
+    from multiprocessing.queues import JoinableQueue as JQueue
+else:
+    import queue
+    from queue import Queue as JQueue
 
 LOG = logging.getLogger(__name__)
 
 
+class ReportException(Exception):
+    pass
+
+
 @six.add_metaclass(abc.ABCMeta)
 class ReportingHandler(object):
     """Base class for report handlers.
@@ -85,9 +102,247 @@ class WebHookHandler(ReportingHandler):
             LOG.warning("failed posting event: %s", event.as_string())
 
 
+class HyperVKvpReportingHandler(ReportingHandler):
+    """
+    Reports events to a Hyper-V host using Key-Value-Pair exchange protocol
+    and can be used to obtain high level diagnostic information from the host.
+
+    To use this facility, the KVP user-space daemon (hv_kvp_daemon) has to be
+    running. It reads the kvp_file when the host requests the guest to
+    enumerate the KVP's.
+
+    This reporter collates all events for a module (origin|name) in a single
+    json string in the dictionary.
+
+    For more information, see
+    https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
+    """
+    HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
+    HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
+    HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
+                          HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
+    EVENT_PREFIX = 'CLOUD_INIT'
+    MSG_KEY = 'msg'
+    RESULT_KEY = 'result'
+    DESC_IDX_KEY = 'msg_i'
+    JSON_SEPARATORS = (',', ':')
+    KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
+
+    def __init__(self,
+                 kvp_file_path=KVP_POOL_FILE_GUEST,
+                 event_types=None):
+        super(HyperVKvpReportingHandler, self).__init__()
+        self._kvp_file_path = kvp_file_path
+        self._event_types = event_types
+        self.running = False
+        self.queue_lock = threading.Lock()
+        self.running_lock = threading.Lock()
+        self.q = JQueue()
+        self.kvp_file = None
+        self.incarnation_no = self._get_incarnation_no()
+        self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
+                                                  self.incarnation_no)
+        self._current_offset = 0
+
+    def _get_incarnation_no(self):
+        """
+        use the time passed as the incarnation number.
+        the incarnation number is the number which are used to
+        distinguish the old data stored in kvp and the new data.
+        """
+        uptime_str = util.uptime()
+        try:
+            return int(time.time() - float(uptime_str))
+        except ValueError:
+            LOG.warning("uptime '%s' not in correct format.", uptime_str)
+            return 0
+
+    def _iterate_kvps(self, offset):
+        """iterate the kvp file from the current offset."""
+        try:
+            with open(self._kvp_file_path, 'rb+') as f:
+                self.kvp_file = f
+                fcntl.flock(f, fcntl.LOCK_EX)
+                f.seek(offset)
+                record_data = f.read(self.HV_KVP_RECORD_SIZE)
+                while len(record_data) == self.HV_KVP_RECORD_SIZE:
+                    self._current_offset += self.HV_KVP_RECORD_SIZE
+                    kvp_item = self._decode_kvp_item(record_data)
+                    yield kvp_item
+                    record_data = f.read(self.HV_KVP_RECORD_SIZE)
+                fcntl.flock(f, fcntl.LOCK_UN)
+        finally:
+            self.kvp_file = None
+
+    def _event_key(self, event):
+        """
+        the event key format is:
+        CLOUD_INIT|<incarnation number>|<event_type>|<event_name>
+        """
+        return u"{0}|{1}|{2}".format(self.event_key_prefix,
+                                     event.event_type, event.name)
+
+    def _encode_kvp_item(self, key, value):
+        data = (struct.pack("%ds%ds" % (
+                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
+                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
+                key.encode('utf-8'), value.encode('utf-8')))
+        return data
+
+    def _decode_kvp_item(self, record_data):
+        record_data_len = len(record_data)
+        if record_data_len != self.HV_KVP_RECORD_SIZE:
+            raise ReportException(
+                "record_data len not correct {0} {1}."
+                .format(record_data_len, self.HV_KVP_RECORD_SIZE))
+        k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8')
+             .strip('\x00'))
+        v = (
+            record_data[
+                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE
+            ].decode('utf-8').strip('\x00'))
+
+        return {'key': k, 'value': v}
+
+    def _update_kvp_item(self, record_data):
+        if self.kvp_file is None:
+            raise ReportException(
+                "kvp file '{0}' not opened."
+                .format(self._kvp_file_path))
+        self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
+        self.kvp_file.write(record_data)
+
+    def _append_kvp_item(self, record_data):
+        with open(self._kvp_file_path, 'rb+') as f:
+            fcntl.flock(f, fcntl.LOCK_EX)
+            # seek to end of the file
+            f.seek(0, 2)
+            f.write(record_data)
+            f.flush()
+            fcntl.flock(f, fcntl.LOCK_UN)
+            self._current_offset = f.tell()
+
+    def _break_down(self, key, meta_data, description):
+        del meta_data[self.MSG_KEY]
+        des_in_json = json.dumps(description)
+        des_in_json = des_in_json[1:(len(des_in_json) - 1)]
+        i = 0
+        result_array = []
+        message_place_holder = "\"" + self.MSG_KEY + "\":\"\""
+        while True:
+            meta_data[self.DESC_IDX_KEY] = i
+            meta_data[self.MSG_KEY] = ''
+            data_without_desc = json.dumps(meta_data,
+                                           separators=self.JSON_SEPARATORS)
+            room_for_desc = (
+                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
+                len(data_without_desc) - 8)
+            value = data_without_desc.replace(
+                message_place_holder,
+                '"{key}":"{desc}"'.format(
+                    key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
+            result_array.append(self._encode_kvp_item(key, value))
+            i += 1
+            des_in_json = des_in_json[room_for_desc:]
+            if len(des_in_json) == 0:
+                break
+        return result_array
+
+    def _encode_event(self, event):
+        """
+        encode the event into kvp data bytes.
+        if the event content reaches the maximum length of kvp value.
+        then it would be cut to multiple slices.
+        """
+        key = self._event_key(event)
+        meta_data = {
+            "name": event.name,
+            "type": event.event_type,
+            "ts": (datetime.utcfromtimestamp(event.timestamp)
+                   .isoformat() + 'Z'),
+        }
+        if hasattr(event, self.RESULT_KEY):
+            meta_data[self.RESULT_KEY] = event.result
+        meta_data[self.MSG_KEY] = event.description
+        value = json.dumps(meta_data, separators=self.JSON_SEPARATORS)
+        # if it reaches the maximum length of kvp value,
+        # break it down to slices.
+        # this should be very corner case.
+        if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
+            return self._break_down(key, meta_data, event.description)
+        else:
+            data = self._encode_kvp_item(key, value)
+            return [data]
+
+    def _publish_event_routine(self):
+        while True:
+            event = None
+            try:
+                # acquire the lock.
+                event = self.q.get_nowait()
+                need_append = True
+                try:
+                    if not os.path.exists(self._kvp_file_path):
+                        LOG.warning(
+                            "skip writing events %s to %s. file not present.",
+                            event.as_string(),
+                            self._kvp_file_path)
+                    encoded_event = self._encode_event(event)
+                    # for each encoded_event
+                    for encoded_data in (encoded_event):
+                        for kvp in self._iterate_kvps(self._current_offset):
+                            match = (
+                                re.match(
+                                    r"^{0}\|(\d+)\|.+"
+                                    .format(self.EVENT_PREFIX),
+                                    kvp['key']
+                                ))
+                            if match:
+                                match_groups = match.groups(0)
+                                if int(match_groups[0]) < self.incarnation_no:
+                                    need_append = False
+                                    self._update_kvp_item(encoded_data)
+                                    break
+                        if need_append:
+                            self._append_kvp_item(encoded_data)
+                except IOError as e:
+                    LOG.warning(
+                        "failed posting event to kvp: %s e:%s",
+                        event.as_string(), e)
+                    self.running = False
+                    break
+                finally:
+                    self.q.task_done()
+            except queue.Empty:
+                with self.queue_lock:
+                    # double check the queue is empty
+                    if self.q.empty():
+                        self.running = False
+                        break
+
+    def trigger_publish_event(self):
+        if not self.running:
+            with self.running_lock:
+                if not self.running:
+                    self.running = True
+                    thread = threading.Thread(
+                        target=self._publish_event_routine)
+                    thread.start()
+
+    # since the saving to the kvp pool can be a time costing task
+    # if the kvp pool already contains a chunk of data,
+    # so defer it to another thread.
+    def publish_event(self, event):
+        if (not self._event_types or event.event_type in self._event_types):
+            with self.queue_lock:
+                self.q.put(event)
+            self.trigger_publish_event()
+
+
 available_handlers = DictRegistry()
 available_handlers.register_item('log', LogHandler)
 available_handlers.register_item('print', PrintHandler)
 available_handlers.register_item('webhook', WebHookHandler)
+available_handlers.register_item('hyperv', HyperVKvpReportingHandler)
 
 # vi: ts=4 expandtab
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index c132b57..8874d40 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -510,7 +510,7 @@ class Init(object):
             # The default frequency if handlers don't have one
             'frequency': frequency,
             # This will be used when new handlers are found
-            # to help write there contents to files with numbered
+            # to help write their contents to files with numbered
             # names...
             'handlercount': 0,
             'excluded': excluded,
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
new file mode 100644
index 0000000..2e64c6c
--- /dev/null
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -0,0 +1,134 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.reporting import events
+from cloudinit.reporting import handlers
+
+import json
+import os
+
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase
+
+
+class TestKvpEncoding(CiTestCase):
+    def test_encode_decode(self):
+        kvp = {'key': 'key1', 'value': 'value1'}
+        kvp_reporting = handlers.HyperVKvpReportingHandler()
+        data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
+        self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
+        decoded_kvp = kvp_reporting._decode_kvp_item(data)
+        self.assertEqual(kvp, decoded_kvp)
+
+
+class TextKvpReporter(CiTestCase):
+    def setUp(self):
+        super(TextKvpReporter, self).setUp()
+        self.tmp_file_path = self.tmp_path('kvp_pool_file')
+        util.ensure_file(self.tmp_file_path)
+
+    def test_event_type_can_be_filtered(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path,
+            event_types=['foo', 'bar'])
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('some_other', 'name', 'description3'))
+        reporter.q.join()
+
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(1, len(kvps))
+
+        reporter.publish_event(
+            events.ReportingEvent('bar', 'name', 'description2'))
+        reporter.q.join()
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(2, len(kvps))
+
+        self.assertIn('foo', kvps[0]['key'])
+        self.assertIn('bar', kvps[1]['key'])
+        self.assertNotIn('some_other', kvps[0]['key'])
+        self.assertNotIn('some_other', kvps[1]['key'])
+
+    def test_events_are_over_written(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+
+        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name1', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name2', 'description'))
+        reporter.q.join()
+        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+        reporter2 = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter2.incarnation_no = reporter.incarnation_no + 1
+        reporter2.publish_event(
+            events.ReportingEvent('foo', 'name3', 'description'))
+        reporter2.q.join()
+
+        self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))
+
+    def test_events_with_higher_incarnation_not_over_written(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+
+        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name1', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name2', 'description'))
+        reporter.q.join()
+        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+        reporter3 = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter3.incarnation_no = reporter.incarnation_no - 1
+        reporter3.publish_event(
+            events.ReportingEvent('foo', 'name3', 'description'))
+        reporter3.q.join()
+        self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
+
+    def test_finish_event_result_is_logged(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter.publish_event(
+            events.FinishReportingEvent('name2', 'description1',
+                                        result=events.status.FAIL))
+        reporter.q.join()
+        self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value'])
+
+    def test_file_operation_issue(self):
+        os.remove(self.tmp_file_path)
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter.publish_event(
+            events.FinishReportingEvent('name2', 'description1',
+                                        result=events.status.FAIL))
+        reporter.q.join()
+
+    def test_event_very_long(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
+        long_event = events.FinishReportingEvent(
+            'event_name',
+            description,
+            result=events.status.FAIL)
+        reporter.publish_event(long_event)
+        reporter.q.join()
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(3, len(kvps))
+
+        # restore from the kvp to see the content are all there
+        full_description = ''
+        for i in range(len(kvps)):
+            msg_slice = json.loads(kvps[i]['value'])
+            self.assertEqual(msg_slice['msg_i'], i)
+            full_description += msg_slice['msg']
+        self.assertEqual(description, full_description)
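Usage note: besides the reporting configuration shown in the commit message, the handler can be exercised directly, much as the unit tests above do. A minimal sketch, assuming a scratch pool path rather than the real /var/lib/hyperv/.kvp_pool_1:

    from cloudinit import util
    from cloudinit.reporting import events, handlers

    pool = '/tmp/kvp_pool_file'  # illustrative scratch path
    util.ensure_file(pool)       # the handler expects the pool file to exist

    reporter = handlers.HyperVKvpReportingHandler(kvp_file_path=pool)
    reporter.publish_event(
        events.ReportingEvent('foo', 'name', 'description'))
    reporter.q.join()  # wait for the background writer thread to drain

    for kvp in reporter._iterate_kvps(0):
        print(kvp['key'], kvp['value'])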
