Merge ~andyliuliming/cloud-init:reporting into cloud-init:master

Proposed by Andy
Status: Rejected
Rejected by: Chad Smith
Proposed branch: ~andyliuliming/cloud-init:reporting
Merge into: cloud-init:master
Diff against target: 511 lines (+397/-7)
7 files modified
cloudinit/cloud.py (+2/-2)
cloudinit/cmd/main.py (+2/-2)
cloudinit/distros/__init__.py (+1/-1)
cloudinit/reporting/__init__.py (+1/-1)
cloudinit/reporting/handlers.py (+255/-0)
cloudinit/stages.py (+1/-1)
tests/unittests/test_reporting_hyperv.py (+135/-0)
Reviewer             Review Type              Date Requested   Status
Chad Smith                                                     Disapprove
Server Team CI bot   continuous-integration                    Needs Fixing
Ryan Harper                                                    Needs Fixing
Review via email: mp+351742@code.launchpad.net

Description of the change

Added an option to send cloud-init events to the KVP pool on Hyper-V.

Revision history for this message
Andy (andyliuliming) wrote :

Hi Cloud-init owners,
In this PR, an option to send cloud-init events to the KVP pool is added.
Customers can configure it like this:

cat > /etc/cloud/cloud.cfg.d/06_reporting.cfg << EOF
reporting:
    logging:
        type: log
    telemetry:
        type: hyperv
EOF

The events will then be sent to the KVP pool.

Thanks,
Andy
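
For reference, records written to the pool can be inspected with a short script like the following (a minimal sketch, assuming the fixed 512-byte key / 2048-byte value record layout that the handler in this branch uses; the pool path is the handler's default and requires the Hyper-V KVP guest integration to be present):

# Sketch: decode records from the Hyper-V KVP pool file.
# Assumes the record layout used by HyperVKvpReportingHandler below:
# a 512-byte null-padded key followed by a 2048-byte null-padded value.
import struct

KEY_SIZE = 512
VALUE_SIZE = 2048
RECORD_SIZE = KEY_SIZE + VALUE_SIZE


def read_kvp_pool(path='/var/lib/hyperv/.kvp_pool_1'):
    records = []
    with open(path, 'rb') as f:
        while True:
            data = f.read(RECORD_SIZE)
            if len(data) < RECORD_SIZE:
                break
            key, value = struct.unpack(
                '%ds%ds' % (KEY_SIZE, VALUE_SIZE), data)
            records.append((key.decode('utf-8').strip('\x00'),
                            value.decode('utf-8').strip('\x00')))
    return records


# cloud-init events are written under keys prefixed with CLOUD_INIT.
for key, value in read_kvp_pool():
    if key.startswith('CLOUD_INIT'):
        print(key, value)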

Revision history for this message
Andy (andyliuliming) wrote :

Looping the cloud-init owners in 😊

From: Liu Liming <email address hidden>
Date: Saturday, August 4, 2018 at 10:54 AM
To: "<email address hidden>" <email address hidden>
Subject: Hi cloud-init owners, could you please help review this PR?


Revision history for this message
Paul Meyer (paul-meyer) wrote :

I added some questions and I also think we need to revert the non-kvp functional changes. I guess those were in the branch I pointed you at...

Revision history for this message
Ryan Harper (raharper) :
review: Needs Fixing
Revision history for this message
Andy (andyliuliming) wrote :

Resolved the comments and sent out a new MR.

Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:688cd6450064f8bc803b9c763d5526a3d088eeaa
https://jenkins.ubuntu.com/server/job/cloud-init-ci/208/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/208/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:da25a148e776059e07a1e21899a5eea686cb6226
https://jenkins.ubuntu.com/server/job/cloud-init-ci/209/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/209/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:662011b505ff18b698c446c24173b03f53e1870f
https://jenkins.ubuntu.com/server/job/cloud-init-ci/210/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/210/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:437ed8b8a09ed04f1af6aa4a86dc80ff305962a3
https://jenkins.ubuntu.com/server/job/cloud-init-ci/211/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/211/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

PASSED: Continuous integration, rev:338859269f147f95e60db40883c4b12932b18c57
https://jenkins.ubuntu.com/server/job/cloud-init-ci/212/
Executed test runs:
    SUCCESS: Checkout
    SUCCESS: Unit & Style Tests
    SUCCESS: Ubuntu LTS: Build
    SUCCESS: Ubuntu LTS: Integration
    SUCCESS: MAAS Compatability Testing
    IN_PROGRESS: Declarative: Post Actions

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/212/rebuild

review: Approve (continuous-integration)
Revision history for this message
Chad Smith (chad.smith) :
Revision history for this message
Chad Smith (chad.smith) :
~andyliuliming/cloud-init:reporting updated
af45f66... by Andy

resolve the code review comments.

85b1209... by Andy

1. resolve comments. 2. update some unit test.

Revision history for this message
Andy (andyliuliming) :
Revision history for this message
Server Team CI bot (server-team-bot) wrote :

FAILED: Continuous integration, rev:85b12092cff160b33e5ff804392dc8513b0ee037
https://jenkins.ubuntu.com/server/job/cloud-init-ci/256/
Executed test runs:
    SUCCESS: Checkout
    FAILED: Unit & Style Tests

Click here to trigger a rebuild:
https://jenkins.ubuntu.com/server/job/cloud-init-ci/256/rebuild

review: Needs Fixing (continuous-integration)
Revision history for this message
Paul Meyer (paul-meyer) wrote :

Here's a patch for the two style fixes that are needed:

diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index c77bf2d1..e12d8ea8 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -284,7 +284,8 @@ class HyperVKvpReportingHandler(ReportingHandler):
                 try:
                     if not os.path.exists(self._kvp_file_path):
                         LOG.warning(
-                            "skip writing event %s to %s because file not exists.",
+                            "skip writing event %s to %s because " +
+                            "file not exists.",
                             event.as_string(),
                             self._kvp_file_path)
                     encoded_event = self._encode_event(event)
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
index 0b62506a..2e64c6c7 100644
--- a/tests/unittests/test_reporting_hyperv.py
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -5,7 +5,6 @@ from cloudinit.reporting import handlers

 import json
 import os
-import tempfile

 from cloudinit import util
 from cloudinit.tests.helpers import CiTestCase

Revision history for this message
Chad Smith (chad.smith) wrote :

Thanks for this content, Andy. I'm pulling this branch in with a minor flakes/pycodestyle fix and will attribute authorship to you. Thanks again for the contribution.

Marking this one as Rejected in favor of the followup branch, which is the one that will land:
https://code.launchpad.net/~chad.smith/cloud-init/+git/cloud-init/+merge/353739

review: Disapprove

Unmerged commits

85b1209... by Andy

1. resolve comments. 2. update some unit test.

af45f66... by Andy

resolve the code review comments.

3388592... by Andy

overwrite the event passed.

5d68f97... by Paul Meyer

Add Hyper-V KVP reporter

e8b9a57... by Paul Meyer

Fix inaccurate logging

3246f4c... by Paul Meyer

Add extra util.subp logging

9e38bf8... by Paul Meyer

typos

Preview Diff

diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py
index 6d12c43..7ae98e1 100644
--- a/cloudinit/cloud.py
+++ b/cloudinit/cloud.py
@@ -47,7 +47,7 @@ class Cloud(object):
 
     @property
     def cfg(self):
-        # Ensure that not indirectly modified
+        # Ensure that cfg is not indirectly modified
         return copy.deepcopy(self._cfg)
 
     def run(self, name, functor, args, freq=None, clear_on_fail=False):
@@ -61,7 +61,7 @@ class Cloud(object):
             return None
         return fn
 
-    # The rest of thes are just useful proxies
+    # The rest of these are just useful proxies
     def get_userdata(self, apply_filter=True):
         return self.datasource.get_userdata(apply_filter)
 
diff --git a/cloudinit/cmd/main.py b/cloudinit/cmd/main.py
index d6ba90f..c0edee1 100644
--- a/cloudinit/cmd/main.py
+++ b/cloudinit/cmd/main.py
@@ -315,7 +315,7 @@ def main_init(name, args):
             existing = "trust"
 
     init.purge_cache()
-    # Delete the non-net file as well
+    # Delete the no-net file as well
     util.del_file(os.path.join(path_helper.get_cpath("data"), "no-net"))
 
     # Stage 5
@@ -339,7 +339,7 @@ def main_init(name, args):
                              " Likely bad things to come!"))
         if not args.force:
             init.apply_network_config(bring_up=not args.local)
-            LOG.debug("[%s] Exiting without datasource in local mode", mode)
+            LOG.debug("[%s] Exiting without datasource", mode)
             if mode == sources.DSMODE_LOCAL:
                 return (None, [])
             else:
diff --git a/cloudinit/distros/__init__.py b/cloudinit/distros/__init__.py
index ab0b077..fde054e 100755
--- a/cloudinit/distros/__init__.py
+++ b/cloudinit/distros/__init__.py
@@ -157,7 +157,7 @@ class Distro(object):
                                       distro)
         header = '\n'.join([
             "# Converted from network_config for distro %s" % distro,
-            "# Implmentation of _write_network_config is needed."
+            "# Implementation of _write_network_config is needed."
         ])
         ns = network_state.parse_net_config_data(netconfig)
         contents = eni.network_state_to_eni(
diff --git a/cloudinit/reporting/__init__.py b/cloudinit/reporting/__init__.py
index 1ed2b48..e047767 100644
--- a/cloudinit/reporting/__init__.py
+++ b/cloudinit/reporting/__init__.py
@@ -18,7 +18,7 @@ DEFAULT_CONFIG = {
 
 
 def update_configuration(config):
-    """Update the instanciated_handler_registry.
+    """Update the instantiated_handler_registry.
 
     :param config:
         The dictionary containing changes to apply.  If a key is given
diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py
index 4066076..c77bf2d 100644
--- a/cloudinit/reporting/handlers.py
+++ b/cloudinit/reporting/handlers.py
@@ -1,17 +1,34 @@
 # This file is part of cloud-init. See LICENSE file for license information.
 
 import abc
+import fcntl
 import json
 import six
+import os
+import re
+import struct
+import threading
+import time
 
 from cloudinit import log as logging
 from cloudinit.registry import DictRegistry
 from cloudinit import (url_helper, util)
+from datetime import datetime
 
+if six.PY2:
+    import multiprocessing.queues as queue
+    from multiprocessing.queues import JoinableQueue as JQueue
+else:
+    import queue
+    from queue import Queue as JQueue
 
 LOG = logging.getLogger(__name__)
 
 
+class ReportException(Exception):
+    pass
+
+
 @six.add_metaclass(abc.ABCMeta)
 class ReportingHandler(object):
     """Base class for report handlers.
@@ -85,9 +102,247 @@ class WebHookHandler(ReportingHandler):
             LOG.warning("failed posting event: %s", event.as_string())
 
 
+class HyperVKvpReportingHandler(ReportingHandler):
+    """
+    Reports events to a Hyper-V host using Key-Value-Pair exchange protocol
+    and can be used to obtain high level diagnostic information from the host.
+
+    To use this facility, the KVP user-space daemon (hv_kvp_daemon) has to be
+    running. It reads the kvp_file when the host requests the guest to
+    enumerate the KVP's.
+
+    This reporter collates all events for a module (origin|name) in a single
+    json string in the dictionary.
+
+    For more information, see
+    https://technet.microsoft.com/en-us/library/dn798287.aspx#Linux%20guests
+    """
+    HV_KVP_EXCHANGE_MAX_VALUE_SIZE = 2048
+    HV_KVP_EXCHANGE_MAX_KEY_SIZE = 512
+    HV_KVP_RECORD_SIZE = (HV_KVP_EXCHANGE_MAX_KEY_SIZE +
+                          HV_KVP_EXCHANGE_MAX_VALUE_SIZE)
+    EVENT_PREFIX = 'CLOUD_INIT'
+    MSG_KEY = 'msg'
+    RESULT_KEY = 'result'
+    DESC_IDX_KEY = 'msg_i'
+    JSON_SEPARATORS = (',', ':')
+    KVP_POOL_FILE_GUEST = '/var/lib/hyperv/.kvp_pool_1'
+
+    def __init__(self,
+                 kvp_file_path=KVP_POOL_FILE_GUEST,
+                 event_types=None):
+        super(HyperVKvpReportingHandler, self).__init__()
+        self._kvp_file_path = kvp_file_path
+        self._event_types = event_types
+        self.running = False
+        self.queue_lock = threading.Lock()
+        self.running_lock = threading.Lock()
+        self.q = JQueue()
+        self.kvp_file = None
+        self.incarnation_no = self._get_incarnation_no()
+        self.event_key_prefix = u"{0}|{1}".format(self.EVENT_PREFIX,
+                                                  self.incarnation_no)
+        self._current_offset = 0
+
+    def _get_incarnation_no(self):
+        """
+        use the time passed as the incarnation number.
+        the incarnation number is the number which are used to
+        distinguish the old data stored in kvp and the new data.
+        """
+        uptime_str = util.uptime()
+        try:
+            return int(time.time() - float(uptime_str))
+        except ValueError:
+            LOG.warning("uptime '%s' not in correct format.", uptime_str)
+            return 0
+
+    def _iterate_kvps(self, offset):
+        """iterate the kvp file from the current offset."""
+        try:
+            with open(self._kvp_file_path, 'rb+') as f:
+                self.kvp_file = f
+                fcntl.flock(f, fcntl.LOCK_EX)
+                f.seek(offset)
+                record_data = f.read(self.HV_KVP_RECORD_SIZE)
+                while len(record_data) == self.HV_KVP_RECORD_SIZE:
+                    self._current_offset += self.HV_KVP_RECORD_SIZE
+                    kvp_item = self._decode_kvp_item(record_data)
+                    yield kvp_item
+                    record_data = f.read(self.HV_KVP_RECORD_SIZE)
+                fcntl.flock(f, fcntl.LOCK_UN)
+        finally:
+            self.kvp_file = None
+
+    def _event_key(self, event):
+        """
+        the event key format is:
+        CLOUD_INIT|<incarnation number>|<event_type>|<event_name>
+        """
+        return u"{0}|{1}|{2}".format(self.event_key_prefix,
+                                     event.event_type, event.name)
+
+    def _encode_kvp_item(self, key, value):
+        data = (struct.pack("%ds%ds" % (
+                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE,
+                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE),
+                key.encode('utf-8'), value.encode('utf-8')))
+        return data
+
+    def _decode_kvp_item(self, record_data):
+        record_data_len = len(record_data)
+        if record_data_len != self.HV_KVP_RECORD_SIZE:
+            raise ReportException(
+                "record_data len not correct {0} {1}."
+                .format(record_data_len, self.HV_KVP_RECORD_SIZE))
+        k = (record_data[0:self.HV_KVP_EXCHANGE_MAX_KEY_SIZE].decode('utf-8')
+             .strip('\x00'))
+        v = (
+            record_data[
+                self.HV_KVP_EXCHANGE_MAX_KEY_SIZE:self.HV_KVP_RECORD_SIZE
+            ].decode('utf-8').strip('\x00'))
+
+        return {'key': k, 'value': v}
+
+    def _update_kvp_item(self, record_data):
+        if self.kvp_file is None:
+            raise ReportException(
+                "kvp file '{0}' not opened."
+                .format(self._kvp_file_path))
+        self.kvp_file.seek(-self.HV_KVP_RECORD_SIZE, 1)
+        self.kvp_file.write(record_data)
+
+    def _append_kvp_item(self, record_data):
+        with open(self._kvp_file_path, 'rb+') as f:
+            fcntl.flock(f, fcntl.LOCK_EX)
+            # seek to end of the file
+            f.seek(0, 2)
+            f.write(record_data)
+            f.flush()
+            fcntl.flock(f, fcntl.LOCK_UN)
+            self._current_offset = f.tell()
+
+    def _break_down(self, key, meta_data, description):
+        del meta_data[self.MSG_KEY]
+        des_in_json = json.dumps(description)
+        des_in_json = des_in_json[1:(len(des_in_json) - 1)]
+        i = 0
+        result_array = []
+        message_place_holder = "\"" + self.MSG_KEY + "\":\"\""
+        while True:
+            meta_data[self.DESC_IDX_KEY] = i
+            meta_data[self.MSG_KEY] = ''
+            data_without_desc = json.dumps(meta_data,
+                                           separators=self.JSON_SEPARATORS)
+            room_for_desc = (
+                self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE -
+                len(data_without_desc) - 8)
+            value = data_without_desc.replace(
+                message_place_holder,
+                '"{key}":"{desc}"'.format(
+                    key=self.MSG_KEY, desc=des_in_json[:room_for_desc]))
+            result_array.append(self._encode_kvp_item(key, value))
+            i += 1
+            des_in_json = des_in_json[room_for_desc:]
+            if len(des_in_json) == 0:
+                break
+        return result_array
+
+    def _encode_event(self, event):
+        """
+        encode the event into kvp data bytes.
+        if the event content reaches the maximum length of kvp value.
+        then it would be cut to multiple slices.
+        """
+        key = self._event_key(event)
+        meta_data = {
+            "name": event.name,
+            "type": event.event_type,
+            "ts": (datetime.utcfromtimestamp(event.timestamp)
+                   .isoformat() + 'Z'),
+        }
+        if hasattr(event, self.RESULT_KEY):
+            meta_data[self.RESULT_KEY] = event.result
+        meta_data[self.MSG_KEY] = event.description
+        value = json.dumps(meta_data, separators=self.JSON_SEPARATORS)
+        # if it reaches the maximum length of kvp value,
+        # break it down to slices.
+        # this should be very corner case.
+        if len(value) > self.HV_KVP_EXCHANGE_MAX_VALUE_SIZE:
+            return self._break_down(key, meta_data, event.description)
+        else:
+            data = self._encode_kvp_item(key, value)
+            return [data]
+
+    def _publish_event_routine(self):
+        while True:
+            event = None
+            try:
+                # acquire the lock.
+                event = self.q.get_nowait()
+                need_append = True
+                try:
+                    if not os.path.exists(self._kvp_file_path):
+                        LOG.warning(
+                            "skip writing event %s to %s because file not exists.",
+                            event.as_string(),
+                            self._kvp_file_path)
+                    encoded_event = self._encode_event(event)
+                    # for each encoded_event
+                    for encoded_data in (encoded_event):
+                        for kvp in self._iterate_kvps(self._current_offset):
+                            match = (
+                                re.match(
+                                    r"^{0}\|(\d+)\|.+"
+                                    .format(self.EVENT_PREFIX),
+                                    kvp['key']
+                                ))
+                            if match:
+                                match_groups = match.groups(0)
+                                if int(match_groups[0]) < self.incarnation_no:
+                                    need_append = False
+                                    self._update_kvp_item(encoded_data)
+                                    break
+                        if need_append:
+                            self._append_kvp_item(encoded_data)
+                except IOError as e:
+                    LOG.warning(
+                        "failed posting event to kvp: %s e:%s",
+                        event.as_string(), e)
+                    self.running = False
+                    break
+                finally:
+                    self.q.task_done()
+            except queue.Empty:
+                with self.queue_lock:
+                    # double check the queue is empty
+                    if self.q.empty():
+                        self.running = False
+                        break
+
+    def trigger_publish_event(self):
+        if not self.running:
+            with self.running_lock:
+                if not self.running:
+                    self.running = True
+                    thread = threading.Thread(
+                        target=self._publish_event_routine)
+                    thread.start()
+
+    # since the saving to the kvp pool can be a time costing task
+    # if the kvp pool already contains a chunk of data,
+    # so defer it to another thread.
+    def publish_event(self, event):
+        if (not self._event_types or event.event_type in self._event_types):
+            with self.queue_lock:
+                self.q.put(event)
+            self.trigger_publish_event()
+
+
 available_handlers = DictRegistry()
 available_handlers.register_item('log', LogHandler)
 available_handlers.register_item('print', PrintHandler)
 available_handlers.register_item('webhook', WebHookHandler)
+available_handlers.register_item('hyperv', HyperVKvpReportingHandler)
 
 # vi: ts=4 expandtab
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index c132b57..8874d40 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -510,7 +510,7 @@ class Init(object):
             # The default frequency if handlers don't have one
             'frequency': frequency,
             # This will be used when new handlers are found
-            # to help write there contents to files with numbered
+            # to help write their contents to files with numbered
             # names...
             'handlercount': 0,
             'excluded': excluded,
diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py
new file mode 100644
index 0000000..0b62506
--- /dev/null
+++ b/tests/unittests/test_reporting_hyperv.py
@@ -0,0 +1,135 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.reporting import events
+from cloudinit.reporting import handlers
+
+import json
+import os
+import tempfile
+
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase
+
+
+class TestKvpEncoding(CiTestCase):
+    def test_encode_decode(self):
+        kvp = {'key': 'key1', 'value': 'value1'}
+        kvp_reporting = handlers.HyperVKvpReportingHandler()
+        data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value'])
+        self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE)
+        decoded_kvp = kvp_reporting._decode_kvp_item(data)
+        self.assertEqual(kvp, decoded_kvp)
+
+
+class TextKvpReporter(CiTestCase):
+    def setUp(self):
+        super(TextKvpReporter, self).setUp()
+        self.tmp_file_path = self.tmp_path('kvp_pool_file')
+        util.ensure_file(self.tmp_file_path)
+
+    def test_event_type_can_be_filtered(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path,
+            event_types=['foo', 'bar'])
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('some_other', 'name', 'description3'))
+        reporter.q.join()
+
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(1, len(kvps))
+
+        reporter.publish_event(
+            events.ReportingEvent('bar', 'name', 'description2'))
+        reporter.q.join()
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(2, len(kvps))
+
+        self.assertIn('foo', kvps[0]['key'])
+        self.assertIn('bar', kvps[1]['key'])
+        self.assertNotIn('some_other', kvps[0]['key'])
+        self.assertNotIn('some_other', kvps[1]['key'])
+
+    def test_events_are_over_written(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+
+        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name1', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name2', 'description'))
+        reporter.q.join()
+        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+        reporter2 = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter2.incarnation_no = reporter.incarnation_no + 1
+        reporter2.publish_event(
+            events.ReportingEvent('foo', 'name3', 'description'))
+        reporter2.q.join()
+
+        self.assertEqual(2, len(list(reporter2._iterate_kvps(0))))
+
+    def test_events_with_higher_incarnation_not_over_written(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+
+        self.assertEqual(0, len(list(reporter._iterate_kvps(0))))
+
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name1', 'description'))
+        reporter.publish_event(
+            events.ReportingEvent('foo', 'name2', 'description'))
+        reporter.q.join()
+        self.assertEqual(2, len(list(reporter._iterate_kvps(0))))
+
+        reporter3 = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter3.incarnation_no = reporter.incarnation_no - 1
+        reporter3.publish_event(
+            events.ReportingEvent('foo', 'name3', 'description'))
+        reporter3.q.join()
+        self.assertEqual(3, len(list(reporter3._iterate_kvps(0))))
+
+    def test_finish_event_result_is_logged(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter.publish_event(
+            events.FinishReportingEvent('name2', 'description1',
+                                        result=events.status.FAIL))
+        reporter.q.join()
+        self.assertIn('FAIL', list(reporter._iterate_kvps(0))[0]['value'])
+
+    def test_file_operation_issue(self):
+        os.remove(self.tmp_file_path)
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        reporter.publish_event(
+            events.FinishReportingEvent('name2', 'description1',
+                                        result=events.status.FAIL))
+        reporter.q.join()
+
+    def test_event_very_long(self):
+        reporter = handlers.HyperVKvpReportingHandler(
+            kvp_file_path=self.tmp_file_path)
+        description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE
+        long_event = events.FinishReportingEvent(
+            'event_name',
+            description,
+            result=events.status.FAIL)
+        reporter.publish_event(long_event)
+        reporter.q.join()
+        kvps = list(reporter._iterate_kvps(0))
+        self.assertEqual(3, len(kvps))
+
+        # restore from the kvp to see the content are all there
+        full_description = ''
+        for i in range(len(kvps)):
+            msg_slice = json.loads(kvps[i]['value'])
+            self.assertEqual(msg_slice['msg_i'], i)
+            full_description += msg_slice['msg']
+        self.assertEqual(description, full_description)
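
For reference, a minimal usage sketch of the proposed handler, mirroring what the unit tests above do (the scratch pool path is hypothetical, and the file must exist beforehand because the handler opens it 'rb+'):

# Sketch: exercise HyperVKvpReportingHandler directly against a scratch
# pool file, the same way the unit tests above do.
from cloudinit.reporting import events, handlers

pool = '/tmp/kvp_pool_demo'   # hypothetical scratch pool file
open(pool, 'ab').close()      # ensure the file exists for 'rb+' opens

reporter = handlers.HyperVKvpReportingHandler(kvp_file_path=pool)
reporter.publish_event(
    events.ReportingEvent('finish', 'init-local', 'module finished'))
reporter.q.join()             # wait for the background writer thread to drain

for kvp in reporter._iterate_kvps(0):
    # keys look like: CLOUD_INIT|<incarnation>|<event_type>|<event_name>
    print(kvp['key'], '=>', kvp['value'])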
