Merge ~andyliuliming/cloud-init:reporting into cloud-init:master
Status: | Rejected |
---|---|
Rejected by: | Chad Smith |
Proposed branch: | ~andyliuliming/cloud-init:reporting |
Merge into: | cloud-init:master |
Diff against target: | 511 lines (+397/-7), 7 files modified: cloudinit/cloud.py (+2/-2), cloudinit/cmd/main.py (+2/-2), cloudinit/distros/__init__.py (+1/-1), cloudinit/reporting/__init__.py (+1/-1), cloudinit/reporting/handlers.py (+255/-0), cloudinit/stages.py (+1/-1), tests/unittests/test_reporting_hyperv.py (+135/-0) |
Related bugs: | |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Chad Smith | | | Disapprove |
Server Team CI bot | continuous-integration | | Needs Fixing |
Ryan Harper | | | Needs Fixing |
Review via email: mp+351742@code.launchpad.net
Commit message
Description of the change
Added an option to send cloud-init events to the KVP pool in Hyper-V.
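As a rough illustration of what ends up in the KVP pool, the sketch below packs and unpacks one fixed-size record the way the preview diff further down does: a 512-byte key field followed by a 2048-byte value field, both NUL-padded. The function names and the sample key are made up for this example and are not cloud-init API.

```python
import struct

# Field sizes taken from the constants in the proposed handler.
KEY_SIZE = 512
VALUE_SIZE = 2048
RECORD_SIZE = KEY_SIZE + VALUE_SIZE


def encode_record(key, value):
    """Pack key and value into one NUL-padded, fixed-size record."""
    return struct.pack("%ds%ds" % (KEY_SIZE, VALUE_SIZE),
                       key.encode("utf-8"), value.encode("utf-8"))


def decode_record(record):
    """Split a record back into its key and value strings."""
    if len(record) != RECORD_SIZE:
        raise ValueError(
            "expected %d bytes, got %d" % (RECORD_SIZE, len(record)))
    key = record[:KEY_SIZE].decode("utf-8").rstrip("\x00")
    value = record[KEY_SIZE:].decode("utf-8").rstrip("\x00")
    return key, value


# Hypothetical key: prefix, incarnation number, event type, event name.
record = encode_record("CLOUD_INIT|1533351240|start|init-local",
                       '{"msg":"searching"}')
print(len(record), decode_record(record))
```

One thing to keep in mind with this layout: the `s` format in `struct.pack` silently truncates input that is longer than the field, so oversized values have to be split before packing.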
Andy (andyliuliming) wrote : | # |
Andy (andyliuliming) wrote : | # |
Looping in the cloud-init owners.
From: Liu Liming <email address hidden>
Date: Saturday, August 4, 2018 at 10:54 AM
To: "<email address hidden>" <email address hidden>
Subject: Hi cloud-init owners, could you please help review this PR?
Hi cloud-init owners,
This PR adds an option to send cloud-init events to the KVP pool.
Customers can configure it like this:
cat > /etc/cloud/cloud.cfg.d/06_reporting.cfg << EOF
reporting:
  logging:
    type: log
  telemetry:
    type: hyperv
EOF
The events are then sent to the KVP pool.
Thanks,
Andy
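To make the configuration above more concrete, here is a minimal sketch of how such a `reporting` mapping could be turned into handler instances: each named entry carries a `type` that selects a class, and any remaining keys are passed to its constructor. The registry, the stand-in classes and `build_handlers` are hypothetical and written only for this example; they are not the cloud-init implementation.

```python
# The YAML fragment above, as the dict a YAML parser would produce.
reporting_config = {
    "logging": {"type": "log"},
    "telemetry": {"type": "hyperv"},
}


class LogHandler(object):
    """Stand-in handler; a real one would write events to the log."""


class HyperVKvpHandler(object):
    """Stand-in for the handler this proposal registers as 'hyperv'."""

    def __init__(self, kvp_file_path="/var/lib/hyperv/.kvp_pool_1",
                 event_types=None):
        self.kvp_file_path = kvp_file_path
        self.event_types = event_types


# Hypothetical registry mapping 'type' strings to handler classes.
AVAILABLE = {"log": LogHandler, "hyperv": HyperVKvpHandler}


def build_handlers(config):
    """Instantiate one handler per named entry; extra keys become kwargs."""
    handlers = {}
    for name, options in config.items():
        options = dict(options)  # copy so the caller's config is untouched
        cls = AVAILABLE[options.pop("type")]
        handlers[name] = cls(**options)
    return handlers


handlers = build_handlers(reporting_config)
print(sorted((name, type(h).__name__) for name, h in handlers.items()))
```

Under this reading, an extra key such as `event_types` below `telemetry` would reach the handler constructor, which matches the `event_types` parameter visible in the diff.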
Paul Meyer (paul-meyer) wrote : | # |
I added some questions and I also think we need to revert the non-kvp functional changes. I guess those were in the branch I pointed you at...
Ryan Harper (raharper) : | # |
Andy (andyliuliming) wrote : | # |
Resolved the comments and sent out a new MR.
Server Team CI bot (server-team-bot) wrote : | # |
FAILED: Continuous integration, rev:688cd645006
https:/
Executed test runs:
SUCCESS: Checkout
FAILED: Unit & Style Tests
Click here to trigger a rebuild:
https:/
Server Team CI bot (server-team-bot) wrote : | # |
FAILED: Continuous integration, rev:da25a148e77
https:/
Executed test runs:
SUCCESS: Checkout
FAILED: Unit & Style Tests
Click here to trigger a rebuild:
https:/
Server Team CI bot (server-team-bot) wrote : | # |
FAILED: Continuous integration, rev:662011b505f
https:/
Executed test runs:
SUCCESS: Checkout
FAILED: Unit & Style Tests
Click here to trigger a rebuild:
https:/
Server Team CI bot (server-team-bot) wrote : | # |
FAILED: Continuous integration, rev:437ed8b8a09
https:/
Executed test runs:
SUCCESS: Checkout
FAILED: Unit & Style Tests
Click here to trigger a rebuild:
https:/
Server Team CI bot (server-team-bot) wrote : | # |
PASSED: Continuous integration, rev:338859269f1
https:/
Executed test runs:
SUCCESS: Checkout
SUCCESS: Unit & Style Tests
SUCCESS: Ubuntu LTS: Build
SUCCESS: Ubuntu LTS: Integration
SUCCESS: MAAS Compatability Testing
IN_PROGRESS: Declarative: Post Actions
Click here to trigger a rebuild:
https:/
Chad Smith (chad.smith) : | # |
Chad Smith (chad.smith) : | # |
- af45f66... by Andy
-
resolve the code review comments.
- 85b1209... by Andy
-
1. resolve comments. 2. update some unit test.
Andy (andyliuliming) : | # |
Server Team CI bot (server-team-bot) wrote : | # |
FAILED: Continuous integration, rev:85b12092cff
https:/
Executed test runs:
SUCCESS: Checkout
FAILED: Unit & Style Tests
Click here to trigger a rebuild:
https:/
Paul Meyer (paul-meyer) wrote : | # |
Here's a patch for the two style fixes that are needed:
diff --git a/cloudinit/
index c77bf2d1..e12d8ea8 100644
--- a/cloudinit/
+++ b/cloudinit/
@@ -284,7 +284,8 @@ class HyperVKvpReport
- "skip writing event %s to %s because file not exists.",
+ "skip writing event %s to %s because " +
+ "file not exists.",
diff --git a/tests/
index 0b62506a..2e64c6c7 100644
--- a/tests/
+++ b/tests/
@@ -5,7 +5,6 @@ from cloudinit.reporting import handlers
import json
import os
-import tempfile
from cloudinit import util
from cloudinit.
Chad Smith (chad.smith) wrote : | # |
Thanks for this content Andy, I'm pulling this branch in with a minor flakes/pycodestyle fix and will attribute authorship to you. Thanks again for the contribution.
The branch that will land will be this one.
Marking this one as Rejected in favor of the followup:
https:/
Unmerged commits
- 85b1209... by Andy
-
1. resolve comments. 2. update some unit test.
- af45f66... by Andy
-
resolve the code review comments.
- 3388592... by Andy
-
overwrite the event passed.
- 5d68f97... by Paul Meyer
-
Add Hyper-V KVP reporter
- e8b9a57... by Paul Meyer
-
Fix inaccurate logging
- 3246f4c... by Paul Meyer
-
Add extra util.subp logging
- 9e38bf8... by Paul Meyer
-
typos
Preview Diff
diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py
index 6d12c43..7ae98e1 100644
--- a/cloudinit/cloud.py
+++ b/cloudinit/cloud.py
@@ -47,7 +47,7 @@ class Cloud(object):
 
     @property
     def cfg(self):
-        # Ensure that not indirectly modified
+        # Ensure that cfg is not indirectly modified
         return copy.deepcopy(self._cfg)
 
     def run(self, name, functor, args, freq=None, clear_on_fail=False):
@@ -61,7 +61,7 @@ class Cloud(object):
             return None
         return fn
 
-    # The rest of thes are just useful proxies
+    # The rest of these are just useful proxies
     def get_userdata(self, apply_filter=True):
         return self.datasource.get_userdata(apply_filter)
 
diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py
index d6ba90f..c0edee1 100644
--- a/cloudinit/cmd/main.py
+++ b/cloudinit/cmd/main.py
@@ -315,7 +315,7 @@ def main_init(name, args):
                 existing = "trust"
 
         init.purge_cache()
-        # Delete the non-net file as well
+        # Delete the no-net file as well
         util.del_file(os.path.join(path_helper.get_cpath("data"), "no-net"))
 
     # Stage 5
@@ -339,7 +339,7 @@ def main_init(name, args):
                            " Likely bad things to come!"))
         if not args.force:
             init.apply_network_config(bring_up=not args.local)
-            LOG.debug("[%s] Exiting without datasource in local mode", mode)
+            LOG.debug("[%s] Exiting without datasource", mode)
             if mode == sources.DSMODE_LOCAL:
                 return (None, [])
             else:
diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
index ab0b077..fde054e 100755
--- a/cloudinit/distros/__init__.py
+++ b/cloudinit/distros/__init__.py
@@ -157,7 +157,7 @@ class Distro(object):
                 distro)
         header = '\n'.join([
             "# Converted from network_config for distro %s" % distro,
-            "# Implmentation of _write_network_config is needed."
+            "# Implementation of _write_network_config is needed."
         ])
         ns = network_state.parse_net_config_data(netconfig)
         contents = eni.network_state_to_eni(
diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
index 1ed2b48..e047767 100644
--- a/cloudinit/reporting/__init__.py
+++ b/cloudinit/reporting/__init__.py
@@ -18,7 +18,7 @@ DEFAULT_CONFIG = {
 
 
 def update_configuration(config):
-    """Update the instanciated_handler_registry.
+    """Update the instantiated_handler_registry.
 
     :param config:
         The dictionary containing changes to apply. If a key is given
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 4066076..c77bf2d 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -1,17 +1,34 @@
 # This file is part of cloud-init. See LICENSE file for license information.
 
 import abc
+import fcntl
 import json
 import six
+import os
+import re
+import struct
+import threading
+import time
 
 from cloudinit import log as logging
 from cloudinit.registry import DictRegistry
 from cloudinit import (url_helper, util)
+from datetime import datetime
 
+if six.PY2:
+    import multiprocessing.queues as queue
+    from multiprocessing.queues import JoinableQueue as JQueue
+else:
+    import queue
+    from queue import Queue as JQueue
 
 LOG = logging.getLogger(__name__)
 
 
+class ReportException(Exception):
+    pass
+
+
 @six.add_metaclass(abc.ABCMeta)
 class ReportingHandler(object):
     """Base class for report handlers.
@@ -85,9 +102,247 @@ class WebHookHandler(ReportingHandler):
             LOG.warning("failed posting event: %s", event.as_string())
 
 
+class HyperVKvpReportingHandler(ReportingHandler):
+    """
+    Reports events to a Hyper-V host using Key-Value-Pair exchange protocol
+    and can be used to obtain high level diagnostic information from the host.
+
+    To use this facility, the KVP user-space daemon (hv_kvp_daemon) has to be
+    running. It reads the kvp_file when the host requests the guest to
+    enumerate the KVP's.
+
+    This reporter collates all events for a module (origin|name) in a single
+    json string in the dictionary.
+
+    For more information, see
+    https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
+    """
+    HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
+    HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
+    HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
+                          HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
+    EVENT_PREFIX = 'CLOUD_INIT'
+    MSG_KEY = 'msg'
+    RESULT_KEY = 'result'
+    DESC_IDX_KEY = 'msg_i'
+    JSON_SEPARATORS = (',', ':')
+    KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
+
+    def __init__(self,
+                 kvp_file_path=KVP_POOL_FILE_GUEST,
+                 event_types=None):
+        super(HyperVKvpReportingHandler, self).__init__()
+        self._kvp_file_path = kvp_file_path
+        self._event_types = event_types
+        self.running = False
+        self.queue_lock = threading.Lock()
+        self.running_lock = threading.Lock()
+        self.q = JQueue()
+        self.kvp_file = None
+        self.incarnation_no = self._get_incarnation_no()
+        self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
+                                                  self.incarnation_no)
+        self._current_offset = 0
+
+    def _get_incarnation_no(self):
+        """
+        use the time passed as the incarnation number.
+        the incarnation number is the number which are used to
+        distinguish the old data stored in kvp and the new data.
+        """
+        uptime_str = util.uptime()
+        try:
+            return int(time.time() - float(uptime_str))
+        except ValueError:
+            LOG.warning("uptime '%s' not in correct format.", uptime_str)
+            return 0
+
+    def _iterate_kvps(self, offset):
+        """iterate the kvp file from the current offset."""
+        try:
+            with open(self._kvp_file_path, 'rb+') as f:
+                self.kvp_file = f
+                fcntl.flock(f, fcntl.LOCK_EX)
+                f.seek(offset)
+                record_data = f.read(self.HV_KVP_RECORD_SIZE)
+                while len(record_data) == self.HV_KVP_RECORD_SIZE:
+                    self._current_offset += self.HV_KVP_RECORD_SIZE
+                    kvp_item = self._decode_kvp_item(record_data)
+                    yield kvp_item
+                    record_data = f.read(self.HV_KVP_RECORD_SIZE)
+                fcntl.flock(f, fcntl.LOCK_UN)
+        finally:
+            self.kvp_file = None
+
+    def _event_key(self, event):
+        """
+        the event key format is:
+        CLOUD_INIT|<incarnation number>|<event_type>|<event_name>
+        """
+        return u"{0}|{1}|{2}".format(self.event_key_prefix,
+                                     event.event_type, event.name)
+
+    def _encode_kvp_item(self, key, value):
+        data = (struct.pack("%ds%ds" % (
+                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
+                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
+                key.encode('utf-8'), value.encode('utf-8')))
+        return data
+
+    def _decode_kvp_item(self, record_data):
+        record_data_len = len(record_data)
+        if record_data_len != self.HV_KVP_RECORD_SIZE:
+            raise ReportException(
+                "record_data len not correct {0} {1}."
+                .format(record_data_len, self.HV_KVP_RECORD_SIZE))
+        k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8')
+             .strip('\x00'))
+        v = (
+            record_data[
+                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE
+            ].decode('utf-8').strip('\x00'))
+
+        return {'key': k, 'value': v}
+
+    def _update_kvp_item(self, record_data):
+        if self.kvp_file is None:
+            raise ReportException(
+                "kvp file '{0}' not opened."
+                .format(self._kvp_file_path))
+        self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
+        self.kvp_file.write(record_data)
+
+    def _append_kvp_item(self, record_data):
+        with open(self._kvp_file_path, 'rb+') as f:
+            fcntl.flock(f, fcntl.LOCK_EX)
+            # seek to end of the file
+            f.seek(0, 2)
+            f.write(record_data)
+            f.flush()
+            fcntl.flock(f, fcntl.LOCK_UN)
+            self._current_offset = f.tell()
+
+    def _break_down(self, key, meta_data, description):
+        del meta_data[self.MSG_KEY]
+        des_in_json = json.dumps(description)
+        des_in_json = des_in_json[1:(len(des_in_json) - 1)]
+        i = 0
+        result_array = []
+        message_place_holder = "\"" + self.MSG_KEY + "\":\"\""
+        while True:
+            meta_data[self.DESC_IDX_KEY] = i
+            meta_data[self.MSG_KEY] = ''
+            data_without_desc = json.dumps(meta_data,
+                                           separators=self.JSON_SEPARATORS)
+            room_for_desc = (
+                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
+                len(data_without_desc) - 8)
+            value = data_without_desc.replace(
+                message_place_holder,
+                '"{key}":"{desc}"'.format(
+                    key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
+            result_array.append(self._encode_kvp_item(key, value))
+            i += 1
+            des_in_json = des_in_json[room_for_desc:]
+            if len(des_in_json) == 0:
+                break
+        return result_array
+
+    def _encode_event(self, event):
+        """
+        encode the event into kvp data bytes.
+        if the event content reaches the maximum length of kvp value.
+        then it would be cut to multiple slices.
+        """
+        key = self._event_key(event)
+        meta_data = {
+            "name": event.name,
+            "type": event.event_type,
+            "ts": (datetime.utcfromtimestamp(event.timestamp)
+                   .isoformat() + 'Z'),
+        }
+        if hasattr(event, self.RESULT_KEY):
+            meta_data[self.RESULT_KEY] = event.result
+        meta_data[self.MSG_KEY] = event.description
+        value = json.dumps(meta_data, separators=self.JSON_SEPARATORS)
+        # if it reaches the maximum length of kvp value,
+        # break it down to slices.
+        # this should be very corner case.
+        if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
+            return self._break_down(key, meta_data, event.description)
+        else:
+            data = self._encode_kvp_item(key, value)
+            return [data]
+
+    def _publish_event_routine(self):
+        while True:
+            event = None
+            try:
+                # acquire the lock.
+                event = self.q.get_nowait()
+                need_append = True
+                try:
+                    if not os.path.exists(self._kvp_file_path):
+                        LOG.warning(
+                            "skip writing event %s to %s because file not exists.",
+                            event.as_string(),
+                            self._kvp_file_path)
+                    encoded_event = self._encode_event(event)
+                    # for each encoded_event
+                    for encoded_data in (encoded_event):
+                        for kvp in self._iterate_kvps(self._current_offset):
+                            match = (
+                                re.match(
+                                    r"^{0}\|(\d+)\|.+"
+                                    .format(self.EVENT_PREFIX),
+                                    kvp['key']
+                                ))
+                            if match:
+                                match_groups = match.groups(0)
+                                if int(match_groups[0]) < self.incarnation_no:
+                                    need_append = False
+                                    self._update_kvp_item(encoded_data)
+                                    break
+                        if need_append:
+                            self._append_kvp_item(encoded_data)
+                except IOError as e:
+                    LOG.warning(
+                        "failed posting event to kvp: %s e:%s",
+                        event.as_string(), e)
+                    self.running = False
+                    break
+                finally:
+                    self.q.task_done()
+            except queue.Empty:
+                with self.queue_lock:
+                    # double check the queue is empty
+                    if self.q.empty():
+                        self.running = False
+                        break
+
+    def trigger_publish_event(self):
+        if not self.running:
+            with self.running_lock:
+                if not self.running:
+                    self.running = True
+                    thread = threading.Thread(
+                        target=self._publish_event_routine)
+                    thread.start()
+
+    # since the saving to the kvp pool can be a time costing task
+    # if the kvp pool already contains a chunk of data,
+    # so defer it to another thread.
+    def publish_event(self, event):
+        if (not self._event_types or event.event_type in self._event_types):
+            with self.queue_lock:
+                self.q.put(event)
+            self.trigger_publish_event()
+
+
 available_handlers = DictRegistry()
 available_handlers.register_item('log', LogHandler)
 available_handlers.register_item('print', PrintHandler)
 available_handlers.register_item('webhook', WebHookHandler)
+available_handlers.register_item('hyperv', HyperVKvpReportingHandler)
 
 # vi: ts=4 expandtab
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index c132b57..8874d40 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -510,7 +510,7 @@ class Init(object):
             # The default frequency if handlers don't have one
             'frequency': frequency,
             # This will be used when new handlers are found
-            # to help write there contents to files with numbered
+            # to help write their contents to files with numbered
             # names...
             'handlercount': 0,
             'excluded': excluded,
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
new file mode 100644
index 0000000..0b62506
--- /dev/null
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -0,0 +1,135 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.reporting import events
+from cloudinit.reporting import handlers
+
+import json
+import os
+import tempfile
+
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase
+
+
+class TestKvpEncoding(CiTestCase):
+    def test_encode_decode(self):
+        kvp = {'key': 'key1', 'value': 'value1'}
+        kvp_reporting = handlers.HyperVKvpReportingHandler()
+        data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
+        self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
+        decoded_kvp = kvp_reporting._decode_kvp_item(data)
+        self.assertEqual(kvp, decoded_kvp)
+
+
+class TextKvpReporter(CiTestCase):
+    def setUp(self):
+        super(TextKvpReporter, self).setUp()
+        self.tmp_file_path = self.tmp_path('kvp_pool_file')
+        util.ensure_file(self.tmp_file_path)
+
+    def test_event_type_can_be_filtered(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path,
+            event_types=['foo', 'bar'])
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('some_other', 'name', 'description3'))
+        reporter.q.join()
+
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(1, len(kvps))
+
+        reporter.publish_event(
+            events.ReportingEvent('bar', 'name', 'description2'))
+        reporter.q.join()
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(2, len(kvps))
+
+        self.assertIn('foo', kvps[0]['key'])
+        self.assertIn('bar', kvps[1]['key'])
+        self.assertNotIn('some_other', kvps[0]['key'])
+        self.assertNotIn('some_other', kvps[1]['key'])
+
+    def test_events_are_over_written(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+
+        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name1', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name2', 'description'))
+        reporter.q.join()
+        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+        reporter2 = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter2.incarnation_no = reporter.incarnation_no + 1
+        reporter2.publish_event(
+            events.ReportingEvent('foo', 'name3', 'description'))
+        reporter2.q.join()
+
+        self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))
+
+    def test_events_with_higher_incarnation_not_over_written(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+
+        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name1', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name2', 'description'))
+        reporter.q.join()
+        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+        reporter3 = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter3.incarnation_no = reporter.incarnation_no - 1
+        reporter3.publish_event(
+            events.ReportingEvent('foo', 'name3', 'description'))
+        reporter3.q.join()
+        self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
+
+    def test_finish_event_result_is_logged(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter.publish_event(
+            events.FinishReportingEvent('name2', 'description1',
+                                        result=events.status.FAIL))
+        reporter.q.join()
+        self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value'])
+
+    def test_file_operation_issue(self):
+        os.remove(self.tmp_file_path)
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter.publish_event(
+            events.FinishReportingEvent('name2', 'description1',
+                                        result=events.status.FAIL))
+        reporter.q.join()
+
+    def test_event_very_long(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
+        long_event = events.FinishReportingEvent(
+            'event_name',
+            description,
+            result=events.status.FAIL)
+        reporter.publish_event(long_event)
+        reporter.q.join()
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(3, len(kvps))
+
+        # restore from the kvp to see the content are all there
+        full_description = ''
+        for i in range(len(kvps)):
+            msg_slice = json.loads(kvps[i]['value'])
+            self.assertEqual(msg_slice['msg_i'], i)
+            full_description += msg_slice['msg']
+        self.assertEqual(description, full_description)
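The slicing behaviour exercised by `test_event_very_long` can be sketched in isolation. The code below is a simplified variant and not the `_break_down` method from the diff: it cuts the raw description and re-encodes each slice, instead of cutting the JSON-escaped text, and shrinks a slice until its JSON form fits the 2048-byte value limit. `split_event`, `join_slices` and the sample metadata are names invented for this illustration.

```python
import json

MAX_VALUE_SIZE = 2048      # value field size used by the proposed handler
SEPARATORS = (",", ":")    # compact JSON, as in the diff


def split_event(meta, description):
    """Return compact JSON strings that each fit in one KVP value."""
    slices = []
    remaining = description
    index = 0
    while True:
        record = dict(meta, msg_i=index, msg="")
        room = MAX_VALUE_SIZE - len(json.dumps(record, separators=SEPARATORS))
        chunk = remaining[:max(room, 0)]
        record["msg"] = chunk
        encoded = json.dumps(record, separators=SEPARATORS)
        # JSON escaping can grow the text, so trim until the slice fits.
        while len(encoded) > MAX_VALUE_SIZE and chunk:
            chunk = chunk[:-1]
            record["msg"] = chunk
            encoded = json.dumps(record, separators=SEPARATORS)
        if remaining and not chunk:
            raise ValueError("metadata leaves no room for the message")
        slices.append(encoded)
        remaining = remaining[len(chunk):]
        index += 1
        if not remaining:
            return slices


def join_slices(slices):
    """Reassemble the description from slices ordered by msg_i."""
    parts = sorted((json.loads(s) for s in slices), key=lambda p: p["msg_i"])
    return "".join(p["msg"] for p in parts)


description = "ab" * MAX_VALUE_SIZE
slices = split_event({"name": "event_name", "type": "finish",
                      "result": "FAIL"}, description)
print(len(slices), join_slices(slices) == description)
```

Because `json.dumps` escapes non-ASCII characters by default, the length of each encoded string equals its size in bytes, which is what the size check relies on.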
Hi cloud-init owners,
This PR adds an option to send cloud-init events to the KVP pool.
Customers can configure it like this:
cat > /etc/cloud/cloud.cfg.d/06_reporting.cfg << EOF
reporting:
  logging:
    type: log
  telemetry:
    type: hyperv
EOF
The events are then sent to the KVP pool.
Thanks,
Andy