Merge ~andyliuliming/cloud-init:reporting into cloud-init:master

Proposed by Andy
Status: Rejected
Rejected by: Chad Smith
Proposed branch: ~andyliuliming/cloud-init:reporting
Merge into: cloud-init:master
Diff against target: 511 lines (+397/-7)
7 files modified
cloudinit/cloud.py (+2/-2)
cloudinit/cmd/main.py (+2/-2)
cloudinit/distros/__init__.py (+1/-1)
cloudinit/reporting/__init__.py (+1/-1)
cloudinit/reporting/handlers.py (+255/-0)
cloudinit/stages.py (+1/-1)
tests/unittests/test_reporting_hyperv.py (+135/-0)
Reviewer             Review Type             Status
Chad Smith                                   Disapprove
Server Team CI bot   continuous-integration  Needs Fixing
Ryan Harper                                  Needs Fixing
Review via email: mp+351742@code.launchpad.net

Description of the change

Added an option to send cloud-init events to the KVP pool in Hyper-V.

Revision history for this message
Andy (andyliuliming) wrote :

Hi Cloud-init owners,
In this PR, an option to send cloud-init events to the KVP pool is added.
Customers can configure it like this:

cat > /etc/cloud/cloud.cfg.d/06_reporting.cfg << EOF
reporting:
    logging:
        type: log
    telemetry:
        type: hyperv
EOF

The events will then be sent to the KVP pool.

Thanks,
Andy
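
To make the pool file's wire format concrete, here is a minimal sketch based on the constants in the branch (a fixed 512-byte key field followed by a fixed 2048-byte value field, UTF-8 encoded and NUL-padded); the sample key and value below are illustrative, not taken from a real pool:

```python
import struct

# Record layout used by the Hyper-V KVP pool files, per the proposed handler:
# fixed-size key and value fields, NUL-padded by struct's "s" format.
KEY_SIZE = 512       # HV_KVP_EXCHANGE_MAX_KEY_SIZE
VALUE_SIZE = 2048    # HV_KVP_EXCHANGE_MAX_VALUE_SIZE
RECORD_SIZE = KEY_SIZE + VALUE_SIZE


def encode_kvp(key, value):
    """Pack a key/value pair into one fixed-size KVP record."""
    return struct.pack("%ds%ds" % (KEY_SIZE, VALUE_SIZE),
                       key.encode("utf-8"), value.encode("utf-8"))


def decode_kvp(record):
    """Unpack one fixed-size KVP record back into a key/value dict."""
    key = record[0:KEY_SIZE].decode("utf-8").strip("\x00")
    value = record[KEY_SIZE:RECORD_SIZE].decode("utf-8").strip("\x00")
    return {"key": key, "value": value}


record = encode_kvp("CLOUD_INIT|0|foo|name", '{"msg":"ok"}')
assert len(record) == RECORD_SIZE
assert decode_kvp(record) == {"key": "CLOUD_INIT|0|foo|name",
                              "value": '{"msg":"ok"}'}
```

The running hv_kvp_daemon is what actually enumerates these records to the host; the handler only appends or overwrites records in the pool file.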

Revision history for this message
Andy (andyliuliming) wrote :

Looping the cloud-init owners in 😊

From: Liu Liming <email address hidden>
Date: Saturday, August 4, 2018 at 10:54 AM
To: "<email address hidden>" <email address hidden>
Subject: Hi cloud-init owners, could you please help review this PR?


Revision history for this message
Paul Meyer (paul-meyer) wrote :

I added some questions and I also think we need to revert the non-kvp functional changes. I guess those were in the branch I pointed you at...

Revision history for this message
Ryan Harper (raharper) :
review: Needs Fixing
Revision history for this message
Andy (andyliuliming) wrote :

Resolved the comments and sent out a new MR.

Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:688cd6450064f8bc803b9c763d5526a3d088eeaa
https://jenkins.ubuntu.com/server/job/cloud-init-ci/208/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/208/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:da25a148e776059e07a1e21899a5eea686cb6226
https://jenkins.ubuntu.com/server/job/cloud-init-ci/209/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/209/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:662011b505ff18b698c446c24173b03f53e1870f
https://jenkins.ubuntu.com/server/job/cloud-init-ci/210/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/210/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:437ed8b8a09ed04f1af6aa4a86dc80ff305962a3
https://jenkins.ubuntu.com/server/job/cloud-init-ci/211/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/211/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

PASSED: Continuous integration, rev:338859269f147f95e60db40883c4b12932b18c57
https://jenkins.ubuntu.com/server/job/cloud-init-ci/212/
Executed test runs:
    SUCCESS: Checkout
    SUCCESS: Unit & Style Tests
    SUCCESS: Ubuntu LTS: Build
    SUCCESS: Ubuntu LTS: Integration
    SUCCESS: MAAS Compatability Testing
    IN_PROGRESS: Declarative: Post Actions

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/212/rebuild

review: Approve (continuous-integration)
Revision history for this message
Chad Smith (chad.smith) :
Revision history for this message
Chad Smith (chad.smith) :
~andyliuliming/cloud-init:reporting updated
af45f66... by Andy

resolve the code review comments.

85b1209... by Andy

1. resolve comments. 2. update some unit test.

Revision history for this message
Andy (andyliuliming) :
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:85b12092cff160b33e5ff804392dc8513b0ee037
https://jenkins.ubuntu.com/server/job/cloud-init-ci/256/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/256/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Paul Meyer (paul-meyer) wrote :

Here's a patch for the two style fixes that are needed:

diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index c77bf2d1..e12d8ea8 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -284,7 +284,8 @@ class HyperVKvpReportingHandler(ReportingHandler):
                 try:
                     if not os.path.exists(self._kvp_file_path):
                         LOG.warning(
- "skip writing event %s to %s because file not exists.",
+ "skip writing event %s to %s because " +
+ "file not exists.",
                             event.as_string(),
                             self._kvp_file_path)
                     encoded_event = self._encode_event(event)
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
index 0b62506a..2e64c6c7 100644
--- a/tests/unittests/test_reporting_hyperv.py
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -5,7 +5,6 @@ from cloudinit.reporting import handlers

 import json
 import os
-import tempfile

 from cloudinit import util
 from cloudinit.tests.helpers import CiTestCase

Revision history for this message
Chad Smith (chad.smith) wrote :

Thanks for this content Andy, I'm pulling this branch in with a minor flakes/pycodestyle fix and will attribute authorship to you. Thanks again for the contribution.
The branch that will land will be this one.

Marking this one as Rejected in favor of the followup:
https://code.launchpad.net/~chad.smith/cloud-init/+git/cloud-init/+merge/353739

review: Disapprove

Unmerged commits

85b1209... by Andy

1. resolve comments. 2. update some unit test.

af45f66... by Andy

resolve the code review comments.

3388592... by Andy

overwrite the event passed.

5d68f97... by Paul Meyer

Add Hyper-V KVP reporter

e8b9a57... by Paul Meyer

Fix inaccurate logging

3246f4c... by Paul Meyer

Add extra util.subp logging

9e38bf8... by Paul Meyer

typos

Preview Diff

1diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py
2index 6d12c43..7ae98e1 100644
3--- a/cloudinit/cloud.py
4+++ b/cloudinit/cloud.py
5@@ -47,7 +47,7 @@ class Cloud(object):
6
7 @property
8 def cfg(self):
9- # Ensure that not indirectly modified
10+ # Ensure that cfg is not indirectly modified
11 return copy.deepcopy(self._cfg)
12
13 def run(self, name, functor, args, freq=None, clear_on_fail=False):
14@@ -61,7 +61,7 @@ class Cloud(object):
15 return None
16 return fn
17
18- # The rest of thes are just useful proxies
19+ # The rest of these are just useful proxies
20 def get_userdata(self, apply_filter=True):
21 return self.datasource.get_userdata(apply_filter)
22
23diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py
24index d6ba90f..c0edee1 100644
25--- a/cloudinit/cmd/main.py
26+++ b/cloudinit/cmd/main.py
27@@ -315,7 +315,7 @@ def main_init(name, args):
28 existing = "trust"
29
30 init.purge_cache()
31- # Delete the non-net file as well
32+ # Delete the no-net file as well
33 util.del_file(os.path.join(path_helper.get_cpath("data"), "no-net"))
34
35 # Stage 5
36@@ -339,7 +339,7 @@ def main_init(name, args):
37 " Likely bad things to come!"))
38 if not args.force:
39 init.apply_network_config(bring_up=not args.local)
40- LOG.debug("[%s] Exiting without datasource in local mode", mode)
41+ LOG.debug("[%s] Exiting without datasource", mode)
42 if mode == sources.DSMODE_LOCAL:
43 return (None, [])
44 else:
45diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
46index ab0b077..fde054e 100755
47--- a/cloudinit/distros/__init__.py
48+++ b/cloudinit/distros/__init__.py
49@@ -157,7 +157,7 @@ class Distro(object):
50 distro)
51 header = '\n'.join([
52 "# Converted from network_config for distro %s" % distro,
53- "# Implmentation of _write_network_config is needed."
54+ "# Implementation of _write_network_config is needed."
55 ])
56 ns = network_state.parse_net_config_data(netconfig)
57 contents = eni.network_state_to_eni(
58diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
59index 1ed2b48..e047767 100644
60--- a/cloudinit/reporting/__init__.py
61+++ b/cloudinit/reporting/__init__.py
62@@ -18,7 +18,7 @@ DEFAULT_CONFIG = {
63
64
65 def update_configuration(config):
66- """Update the instanciated_handler_registry.
67+ """Update the instantiated_handler_registry.
68
69 :param config:
70 The dictionary containing changes to apply. If a key is given
71diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
72index 4066076..c77bf2d 100644
73--- a/cloudinit/reporting/handlers.py
74+++ b/cloudinit/reporting/handlers.py
75@@ -1,17 +1,34 @@
76 # This file is part of cloud-init. See LICENSE file for license information.
77
78 import abc
79+import fcntl
80 import json
81 import six
82+import os
83+import re
84+import struct
85+import threading
86+import time
87
88 from cloudinit import log as logging
89 from cloudinit.registry import DictRegistry
90 from cloudinit import (url_helper, util)
91+from datetime import datetime
92
93+if six.PY2:
94+ import multiprocessing.queues as queue
95+ from multiprocessing.queues import JoinableQueue as JQueue
96+else:
97+ import queue
98+ from queue import Queue as JQueue
99
100 LOG = logging.getLogger(__name__)
101
102
103+class ReportException(Exception):
104+ pass
105+
106+
107 @six.add_metaclass(abc.ABCMeta)
108 class ReportingHandler(object):
109 """Base class for report handlers.
110@@ -85,9 +102,247 @@ class WebHookHandler(ReportingHandler):
111 LOG.warning("failed posting event: %s", event.as_string())
112
113
114+class HyperVKvpReportingHandler(ReportingHandler):
115+ """
116+ Reports events to a Hyper-V host using Key-Value-Pair exchange protocol
117+ and can be used to obtain high level diagnostic information from the host.
118+
119+ To use this facility, the KVP user-space daemon (hv_kvp_daemon) has to be
120+ running. It reads the kvp_file when the host requests the guest to
121+ enumerate the KVP's.
122+
123+ This reporter collates all events for a module (origin|name) in a single
124+ json string in the dictionary.
125+
126+ For more information, see
127+ https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
128+ """
129+ HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
130+ HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
131+ HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
132+ HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
133+ EVENT_PREFIX = 'CLOUD_INIT'
134+ MSG_KEY = 'msg'
135+ RESULT_KEY = 'result'
136+ DESC_IDX_KEY = 'msg_i'
137+ JSON_SEPARATORS = (',', ':')
138+ KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
139+
140+ def __init__(self,
141+ kvp_file_path=KVP_POOL_FILE_GUEST,
142+ event_types=None):
143+ super(HyperVKvpReportingHandler, self).__init__()
144+ self._kvp_file_path = kvp_file_path
145+ self._event_types = event_types
146+ self.running = False
147+ self.queue_lock = threading.Lock()
148+ self.running_lock = threading.Lock()
149+ self.q = JQueue()
150+ self.kvp_file = None
151+ self.incarnation_no = self._get_incarnation_no()
152+ self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
153+ self.incarnation_no)
154+ self._current_offset = 0
155+
156+ def _get_incarnation_no(self):
157+ """
158+ use the time passed as the incarnation number.
159+ the incarnation number is the number which are used to
160+ distinguish the old data stored in kvp and the new data.
161+ """
162+ uptime_str = util.uptime()
163+ try:
164+ return int(time.time() - float(uptime_str))
165+ except ValueError:
166+ LOG.warning("uptime '%s' not in correct format.", uptime_str)
167+ return 0
168+
169+ def _iterate_kvps(self, offset):
170+ """iterate the kvp file from the current offset."""
171+ try:
172+ with open(self._kvp_file_path, 'rb+') as f:
173+ self.kvp_file = f
174+ fcntl.flock(f, fcntl.LOCK_EX)
175+ f.seek(offset)
176+ record_data = f.read(self.HV_KVP_RECORD_SIZE)
177+ while len(record_data) == self.HV_KVP_RECORD_SIZE:
178+ self._current_offset += self.HV_KVP_RECORD_SIZE
179+ kvp_item = self._decode_kvp_item(record_data)
180+ yield kvp_item
181+ record_data = f.read(self.HV_KVP_RECORD_SIZE)
182+ fcntl.flock(f, fcntl.LOCK_UN)
183+ finally:
184+ self.kvp_file = None
185+
186+ def _event_key(self, event):
187+ """
188+ the event key format is:
189+ CLOUD_INIT|<incarnation number>|<event_type>|<event_name>
190+ """
191+ return u"{0}|{1}|{2}".format(self.event_key_prefix,
192+ event.event_type, event.name)
193+
194+ def _encode_kvp_item(self, key, value):
195+ data = (struct.pack("%ds%ds" % (
196+ self.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
197+ self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
198+ key.encode('utf-8'), value.encode('utf-8')))
199+ return data
200+
201+ def _decode_kvp_item(self, record_data):
202+ record_data_len = len(record_data)
203+ if record_data_len != self.HV_KVP_RECORD_SIZE:
204+ raise ReportException(
205+ "record_data len not correct {0} {1}."
206+ .format(record_data_len, self.HV_KVP_RECORD_SIZE))
207+ k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8')
208+ .strip('\x00'))
209+ v = (
210+ record_data[
211+ self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE
212+ ].decode('utf-8').strip('\x00'))
213+
214+ return {'key': k, 'value': v}
215+
216+ def _update_kvp_item(self, record_data):
217+ if self.kvp_file is None:
218+ raise ReportException(
219+ "kvp file '{0}' not opened."
220+ .format(self._kvp_file_path))
221+ self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
222+ self.kvp_file.write(record_data)
223+
224+ def _append_kvp_item(self, record_data):
225+ with open(self._kvp_file_path, 'rb+') as f:
226+ fcntl.flock(f, fcntl.LOCK_EX)
227+ # seek to end of the file
228+ f.seek(0, 2)
229+ f.write(record_data)
230+ f.flush()
231+ fcntl.flock(f, fcntl.LOCK_UN)
232+ self._current_offset = f.tell()
233+
234+ def _break_down(self, key, meta_data, description):
235+ del meta_data[self.MSG_KEY]
236+ des_in_json = json.dumps(description)
237+ des_in_json = des_in_json[1:(len(des_in_json) - 1)]
238+ i = 0
239+ result_array = []
240+ message_place_holder = "\"" + self.MSG_KEY + "\":\"\""
241+ while True:
242+ meta_data[self.DESC_IDX_KEY] = i
243+ meta_data[self.MSG_KEY] = ''
244+ data_without_desc = json.dumps(meta_data,
245+ separators=self.JSON_SEPARATORS)
246+ room_for_desc = (
247+ self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
248+ len(data_without_desc) - 8)
249+ value = data_without_desc.replace(
250+ message_place_holder,
251+ '"{key}":"{desc}"'.format(
252+ key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
253+ result_array.append(self._encode_kvp_item(key, value))
254+ i += 1
255+ des_in_json = des_in_json[room_for_desc:]
256+ if len(des_in_json) == 0:
257+ break
258+ return result_array
259+
260+ def _encode_event(self, event):
261+ """
262+ encode the event into kvp data bytes.
263+ if the event content reaches the maximum length of kvp value.
264+ then it would be cut to multiple slices.
265+ """
266+ key = self._event_key(event)
267+ meta_data = {
268+ "name": event.name,
269+ "type": event.event_type,
270+ "ts": (datetime.utcfromtimestamp(event.timestamp)
271+ .isoformat() + 'Z'),
272+ }
273+ if hasattr(event, self.RESULT_KEY):
274+ meta_data[self.RESULT_KEY] = event.result
275+ meta_data[self.MSG_KEY] = event.description
276+ value = json.dumps(meta_data, separators=self.JSON_SEPARATORS)
277+ # if it reaches the maximum length of kvp value,
278+ # break it down to slices.
279+ # this should be very corner case.
280+ if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
281+ return self._break_down(key, meta_data, event.description)
282+ else:
283+ data = self._encode_kvp_item(key, value)
284+ return [data]
285+
286+ def _publish_event_routine(self):
287+ while True:
288+ event = None
289+ try:
290+ # acquire the lock.
291+ event = self.q.get_nowait()
292+ need_append = True
293+ try:
294+ if not os.path.exists(self._kvp_file_path):
295+ LOG.warning(
296+ "skip writing event %s to %s because file not exists.",
297+ event.as_string(),
298+ self._kvp_file_path)
299+ encoded_event = self._encode_event(event)
300+ # for each encoded_event
301+ for encoded_data in (encoded_event):
302+ for kvp in self._iterate_kvps(self._current_offset):
303+ match = (
304+ re.match(
305+ r"^{0}\|(\d+)\|.+"
306+ .format(self.EVENT_PREFIX),
307+ kvp['key']
308+ ))
309+ if match:
310+ match_groups = match.groups(0)
311+ if int(match_groups[0]) < self.incarnation_no:
312+ need_append = False
313+ self._update_kvp_item(encoded_data)
314+ break
315+ if need_append:
316+ self._append_kvp_item(encoded_data)
317+ except IOError as e:
318+ LOG.warning(
319+ "failed posting event to kvp: %s e:%s",
320+ event.as_string(), e)
321+ self.running = False
322+ break
323+ finally:
324+ self.q.task_done()
325+ except queue.Empty:
326+ with self.queue_lock:
327+ # double check the queue is empty
328+ if self.q.empty():
329+ self.running = False
330+ break
331+
332+ def trigger_publish_event(self):
333+ if not self.running:
334+ with self.running_lock:
335+ if not self.running:
336+ self.running = True
337+ thread = threading.Thread(
338+ target=self._publish_event_routine)
339+ thread.start()
340+
341+ # since the saving to the kvp pool can be a time costing task
342+ # if the kvp pool already contains a chunk of data,
343+ # so defer it to another thread.
344+ def publish_event(self, event):
345+ if (not self._event_types or event.event_type in self._event_types):
346+ with self.queue_lock:
347+ self.q.put(event)
348+ self.trigger_publish_event()
349+
350+
351 available_handlers = DictRegistry()
352 available_handlers.register_item('log', LogHandler)
353 available_handlers.register_item('print', PrintHandler)
354 available_handlers.register_item('webhook', WebHookHandler)
355+available_handlers.register_item('hyperv', HyperVKvpReportingHandler)
356
357 # vi: ts=4 expandtab
358diff --git a/cloudinit/stages.py b/cloudinit/stages.py
359index c132b57..8874d40 100644
360--- a/cloudinit/stages.py
361+++ b/cloudinit/stages.py
362@@ -510,7 +510,7 @@ class Init(object):
363 # The default frequency if handlers don't have one
364 'frequency': frequency,
365 # This will be used when new handlers are found
366- # to help write there contents to files with numbered
367+ # to help write their contents to files with numbered
368 # names...
369 'handlercount': 0,
370 'excluded': excluded,
371diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
372new file mode 100644
373index 0000000..0b62506
374--- /dev/null
375+++ b/tests/unittests/test_reporting_hyperv.py
376@@ -0,0 +1,135 @@
377+# This file is part of cloud-init. See LICENSE file for license information.
378+
379+from cloudinit.reporting import events
380+from cloudinit.reporting import handlers
381+
382+import json
383+import os
384+import tempfile
385+
386+from cloudinit import util
387+from cloudinit.tests.helpers import CiTestCase
388+
389+
390+class TestKvpEncoding(CiTestCase):
391+ def test_encode_decode(self):
392+ kvp = {'key': 'key1', 'value': 'value1'}
393+ kvp_reporting = handlers.HyperVKvpReportingHandler()
394+ data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
395+ self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
396+ decoded_kvp = kvp_reporting._decode_kvp_item(data)
397+ self.assertEqual(kvp, decoded_kvp)
398+
399+
400+class TextKvpReporter(CiTestCase):
401+ def setUp(self):
402+ super(TextKvpReporter, self).setUp()
403+ self.tmp_file_path = self.tmp_path('kvp_pool_file')
404+ util.ensure_file(self.tmp_file_path)
405+
406+ def test_event_type_can_be_filtered(self):
407+ reporter = handlers.HyperVKvpReportingHandler(
408+ kvp_file_path=self.tmp_file_path,
409+ event_types=['foo', 'bar'])
410+
411+ reporter.publish_event(
412+ events.ReportingEvent('foo', 'name', 'description'))
413+ reporter.publish_event(
414+ events.ReportingEvent('some_other', 'name', 'description3'))
415+ reporter.q.join()
416+
417+ kvps = list(reporter._iterate_kvps(0))
418+ self.assertEqual(1, len(kvps))
419+
420+ reporter.publish_event(
421+ events.ReportingEvent('bar', 'name', 'description2'))
422+ reporter.q.join()
423+ kvps = list(reporter._iterate_kvps(0))
424+ self.assertEqual(2, len(kvps))
425+
426+ self.assertIn('foo', kvps[0]['key'])
427+ self.assertIn('bar', kvps[1]['key'])
428+ self.assertNotIn('some_other', kvps[0]['key'])
429+ self.assertNotIn('some_other', kvps[1]['key'])
430+
431+ def test_events_are_over_written(self):
432+ reporter = handlers.HyperVKvpReportingHandler(
433+ kvp_file_path=self.tmp_file_path)
434+
435+ self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
436+
437+ reporter.publish_event(
438+ events.ReportingEvent('foo', 'name1', 'description'))
439+ reporter.publish_event(
440+ events.ReportingEvent('foo', 'name2', 'description'))
441+ reporter.q.join()
442+ self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
443+
444+ reporter2 = handlers.HyperVKvpReportingHandler(
445+ kvp_file_path=self.tmp_file_path)
446+ reporter2.incarnation_no = reporter.incarnation_no + 1
447+ reporter2.publish_event(
448+ events.ReportingEvent('foo', 'name3', 'description'))
449+ reporter2.q.join()
450+
451+ self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))
452+
453+ def test_events_with_higher_incarnation_not_over_written(self):
454+ reporter = handlers.HyperVKvpReportingHandler(
455+ kvp_file_path=self.tmp_file_path)
456+
457+ self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
458+
459+ reporter.publish_event(
460+ events.ReportingEvent('foo', 'name1', 'description'))
461+ reporter.publish_event(
462+ events.ReportingEvent('foo', 'name2', 'description'))
463+ reporter.q.join()
464+ self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
465+
466+ reporter3 = handlers.HyperVKvpReportingHandler(
467+ kvp_file_path=self.tmp_file_path)
468+ reporter3.incarnation_no = reporter.incarnation_no - 1
469+ reporter3.publish_event(
470+ events.ReportingEvent('foo', 'name3', 'description'))
471+ reporter3.q.join()
472+ self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
473+
474+ def test_finish_event_result_is_logged(self):
475+ reporter = handlers.HyperVKvpReportingHandler(
476+ kvp_file_path=self.tmp_file_path)
477+ reporter.publish_event(
478+ events.FinishReportingEvent('name2', 'description1',
479+ result=events.status.FAIL))
480+ reporter.q.join()
481+ self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value'])
482+
483+ def test_file_operation_issue(self):
484+ os.remove(self.tmp_file_path)
485+ reporter = handlers.HyperVKvpReportingHandler(
486+ kvp_file_path=self.tmp_file_path)
487+ reporter.publish_event(
488+ events.FinishReportingEvent('name2', 'description1',
489+ result=events.status.FAIL))
490+ reporter.q.join()
491+
492+ def test_event_very_long(self):
493+ reporter = handlers.HyperVKvpReportingHandler(
494+ kvp_file_path=self.tmp_file_path)
495+ description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
496+ long_event = events.FinishReportingEvent(
497+ 'event_name',
498+ description,
499+ result=events.status.FAIL)
500+ reporter.publish_event(long_event)
501+ reporter.q.join()
502+ kvps = list(reporter._iterate_kvps(0))
503+ self.assertEqual(3, len(kvps))
504+
505+ # restore from the kvp to see the content are all there
506+ full_description = ''
507+ for i in range(len(kvps)):
508+ msg_slice = json.loads(kvps[i]['value'])
509+ self.assertEqual(msg_slice['msg_i'], i)
510+ full_description += msg_slice['msg']
511+ self.assertEqual(description, full_description)
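
The last unit test above reassembles an over-long description from its `msg_i` slices. The slicing scheme can be sketched independently of the handler; the function names and the fixed per-slice `room` size here are illustrative simplifications of the branch's `_break_down`, which instead computes the room left in each 2048-byte value:

```python
import json

MAX_VALUE_SIZE = 2048  # HV_KVP_EXCHANGE_MAX_VALUE_SIZE in the branch


def split_description(meta, description, room=64):
    """Emit compact JSON slices of an over-long description, each
    tagged with its slice index under 'msg_i' (simplified sketch)."""
    slices = []
    i = 0
    while description:
        part, description = description[:room], description[room:]
        slices.append(json.dumps(dict(meta, msg_i=i, msg=part),
                                 separators=(",", ":")))
        i += 1
    return slices


def join_description(slices):
    """Reassemble the description from the JSON slices, in msg_i order."""
    return "".join(json.loads(s)["msg"] for s in sorted(
        slices, key=lambda s: json.loads(s)["msg_i"]))


description = "ab" * 200
parts = split_description({"name": "event_name", "result": "FAIL"},
                          description)
assert all(len(p) <= MAX_VALUE_SIZE for p in parts)
assert join_description(parts) == description
```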
