Merge lp:~therve/landscape-client/fake-package-reporter into lp:~landscape/landscape-client/trunk

Proposed by Thomas Herve
Status: Merged
Approved by: Frank Wierzbicki
Approved revision: 365
Merged at revision: 360
Proposed branch: lp:~therve/landscape-client/fake-package-reporter
Merge into: lp:~landscape/landscape-client/trunk
Diff against target: 476 lines (+277/-24)
5 files modified
landscape/broker/store.py (+15/-14)
landscape/package/reporter.py (+83/-6)
landscape/package/store.py (+47/-0)
landscape/package/taskhandler.py (+2/-1)
landscape/package/tests/test_reporter.py (+130/-3)
To merge this branch: bzr merge lp:~therve/landscape-client/fake-package-reporter
Reviewer Review Type Date Requested Status
Frank Wierzbicki (community) Approve
Free Ekanayaka (community) Approve
Review via email: mp+72155@code.launchpad.net

Description of the change

The branch introduces 2 reporters: one named global or "master", which operates as a normal reporter but stores the messages sent. The other has a pointer to that store, and send those messages instead of generating new ones using smart. I used env variables to instrumentate it, which is not great but simple. Using different plugins means creating new modules for each one, and new scripts.

To post a comment you must log in.
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Looks good! I have a few comments though.

[1]

+class FakePackageStore(PackageStore):

It'd be good to add direct tests for this class.

[2]

+ if not os.path.exists(global_store):
+ return succeed(None)

Why is this needed?

[3]

+ global_db = sqlite3.connect(global_store)
+ cursor = global_db.cursor()
+ all_message_ids = set(
+ row[0] for row in
+ cursor.execute("SELECT id FROM message").fetchall())

Can't FakePackageStore.get_message_ids be used instead?

[4]

+ messages = list(
+ (row[0], row[1]) for row in
+ cursor.execute(
+ "SELECT id, data FROM message WHERE id IN (%s) "
+ "ORDER BY id" % params, tuple(not_sent)).fetchall())

It'd be nice to encapsulate this if FakePackageStore, and test it separately.

[5]

+ lambda x, message=message:
+ self._broker.send_message(message, True))

Ah, nice trick to get the message variable in the closure.

[6]

+ def send_message(self, message):
+ return succeed(None)
+
+ def use_hash_id_db(self):
+ return succeed(None)
+
+ def request_unknown_hashes(self):
+ return succeed(None)
+
+ def remove_expired_hash_id_requests(self):
+ return succeed(None)
+
+ def handle_task(self, task):
+ return succeed(None)
+

Instead of this, why not override the run() method directly? It would avoid having to modify the fake reporter in case we add methods to the real one.

review: Needs Fixing
364. By Thomas Herve

Move SQL in the store

365. By Thomas Herve

Implement run instead

Revision history for this message
Thomas Herve (therve) wrote :

[1] This is for load testing, so I don't think we need to go out of our way to test this...

[2] The fake clients make start a bit before the master one, so I want to make sure it doesn't crash.

[3] Yep done.

[4] Moved.

[6] Fixed.

Thanks!

Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Tahnks, +1!

[7]

+ result.addCallback(lambda x: self.handle_tasks())

Maybe this can just be

         result.addCallback(lambda x: self._store.clear_tasks())

and handle_task() can be dropped.

[8]

+ def __init__(self, package_store, package_facade, remote_broker, config):
+ super(FakeGlobalReporter, self).__init__(package_store, package_facade,
+ remote_broker, config)

Do we need this?

review: Approve
Revision history for this message
Frank Wierzbicki (fwierzbicki) wrote :

I can't add anything to Free's comments - looks good to me +1!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'landscape/broker/store.py'
2--- landscape/broker/store.py 2011-07-21 23:55:47 +0000
3+++ landscape/broker/store.py 2011-08-19 14:11:42 +0000
4@@ -43,9 +43,7 @@
5 api = SERVER_API
6
7 def __init__(self, persist, directory, directory_size=1000,
8- monitor_interval=60*60, get_time=time.time):
9- """
10- """
11+ monitor_interval=60 * 60, get_time=time.time):
12 self._get_time = get_time
13 self._directory = directory
14 self._directory_size = directory_size
15@@ -82,8 +80,8 @@
16 def get_sequence(self):
17 """Get the current sequence.
18
19- @return: The sequence number of the message that the server expects us to
20- send on the next exchange.
21+ @return: The sequence number of the message that the server expects us
22+ to send on the next exchange.
23 """
24 return self._persist.get("sequence", 0)
25
26@@ -98,8 +96,8 @@
27 def get_server_sequence(self):
28 """Get the current server sequence.
29
30- @return: the sequence number of the message that we will ask the server to
31- send to us on the next exchange.
32+ @return: the sequence number of the message that we will ask the server
33+ to send to us on the next exchange.
34 """
35 return self._persist.get("server_sequence", 0)
36
37@@ -161,7 +159,7 @@
38
39 def delete_old_messages(self):
40 """Delete messages which are unlikely to be needed in the future."""
41- for fn in itertools.islice(self._walk_messages(exclude=HELD+BROKEN),
42+ for fn in itertools.islice(self._walk_messages(exclude=HELD + BROKEN),
43 self.get_pending_offset()):
44 os.unlink(fn)
45 containing_dir = os.path.split(fn)[0]
46@@ -256,7 +254,8 @@
47 def _walk_pending_messages(self):
48 """Walk the files which are definitely pending."""
49 pending_offset = self.get_pending_offset()
50- for i, filename in enumerate(self._walk_messages(exclude=HELD+BROKEN)):
51+ for i, filename in enumerate(self._walk_messages(exclude=HELD +
52+ BROKEN)):
53 if i >= pending_offset:
54 yield filename
55
56@@ -308,10 +307,10 @@
57 if accepted:
58 new_filename = self._get_next_message_filename()
59 os.rename(old_filename, new_filename)
60- self._set_flags(new_filename, set(flags)-set(HELD))
61+ self._set_flags(new_filename, set(flags) - set(HELD))
62 else:
63 if not accepted and offset >= pending_offset:
64- self._set_flags(old_filename, set(flags)|set(HELD))
65+ self._set_flags(old_filename, set(flags) | set(HELD))
66 offset += 1
67
68 def _get_flags(self, path):
69@@ -324,16 +323,18 @@
70 dirname, basename = os.path.split(path)
71 new_path = os.path.join(dirname, basename.split("_")[0])
72 if flags:
73- new_path += "_"+"".join(sorted(set(flags)))
74+ new_path += "_" + "".join(sorted(set(flags)))
75 os.rename(path, new_path)
76 return new_path
77
78 def _add_flags(self, path, flags):
79- self._set_flags(path, self._get_flags(path)+flags)
80+ self._set_flags(path, self._get_flags(path) + flags)
81
82
83 def get_default_message_store(*args, **kwargs):
84- """Get a L{MessageStore} object with all Landscape message schemas added."""
85+ """
86+ Get a L{MessageStore} object with all Landscape message schemas added.
87+ """
88 from landscape. message_schemas import message_schemas
89 store = MessageStore(*args, **kwargs)
90 for schema in message_schemas.values():
91
92=== modified file 'landscape/package/reporter.py'
93--- landscape/package/reporter.py 2011-08-10 14:56:43 +0000
94+++ landscape/package/reporter.py 2011-08-19 14:11:42 +0000
95@@ -10,10 +10,11 @@
96 from landscape.lib.twisted_util import gather_results, spawn_process
97 from landscape.lib.fetch import fetch_async
98 from landscape.lib.fs import touch_file
99+from landscape.lib import bpickle
100
101 from landscape.package.taskhandler import (
102 PackageTaskHandlerConfiguration, PackageTaskHandler, run_task_handler)
103-from landscape.package.store import UnknownHashIDRequest
104+from landscape.package.store import UnknownHashIDRequest, FakePackageStore
105
106
107 HASH_ID_REQUEST_TIMEOUT = 7200
108@@ -79,6 +80,9 @@
109 result.callback(None)
110 return result
111
112+ def send_message(self, message):
113+ return self._broker.send_message(message, True)
114+
115 def fetch_hash_id_db(self):
116 """
117 Fetch the appropriate pre-canned database of hash=>id mappings
118@@ -219,7 +223,7 @@
119 "type": "package-reporter-result",
120 "code": code,
121 "err": err}
122- return self._broker.send_message(message, True)
123+ return self.send_message(message)
124
125 def handle_task(self, task):
126 message = task.data
127@@ -389,7 +393,7 @@
128 """Create a hash_id_request and send message with "request-id"."""
129 request = self._store.add_hash_id_request(unknown_hashes)
130 message["request-id"] = request.id
131- result = self._broker.send_message(message, True)
132+ result = self.send_message(message)
133
134 def set_message_id(message_id):
135 request.message_id = message_id
136@@ -530,7 +534,7 @@
137 return succeed(False)
138
139 message["type"] = "packages"
140- result = self._broker.send_message(message, True)
141+ result = self.send_message(message)
142
143 logging.info("Queuing message with changes in known packages: "
144 "%d installed, %d available, %d available upgrades, "
145@@ -595,7 +599,7 @@
146 return succeed(False)
147
148 message["type"] = "package-locks"
149- result = self._broker.send_message(message, True)
150+ result = self.send_message(message)
151
152 logging.info("Queuing message with changes in known package locks:"
153 " %d created, %d deleted." %
154@@ -613,8 +617,81 @@
155 return result
156
157
158+class FakeGlobalReporter(PackageReporter):
159+ """
160+ A standard reporter, which additionally stores messages sent into its
161+ package store.
162+ """
163+
164+ package_store_class = FakePackageStore
165+
166+ def __init__(self, package_store, package_facade, remote_broker, config):
167+ super(FakeGlobalReporter, self).__init__(package_store, package_facade,
168+ remote_broker, config)
169+
170+ def send_message(self, message):
171+ self._store.save_message(message)
172+ return super(FakeGlobalReporter, self).send_message(message)
173+
174+
175+class FakeReporter(PackageReporter):
176+ """
177+ A fake reporter which only sends messages previously stored by a
178+ L{FakeGlobalReporter}.
179+ """
180+
181+ package_store_class = FakePackageStore
182+
183+ def run(self):
184+ result = succeed(None)
185+
186+ # If the appropriate hash=>id db is not there, fetch it
187+ result.addCallback(lambda x: self.fetch_hash_id_db())
188+
189+ result.addCallback(lambda x: self.handle_tasks())
190+
191+ # Finally, verify if we have anything new to send to the server.
192+ result.addCallback(lambda x: self.send_pending_messages())
193+
194+ return result
195+
196+ def handle_task(self, task):
197+ return succeed(None)
198+
199+ def send_pending_messages(self):
200+ """
201+ As the last callback of L{PackageReporter}, sends messages stored.
202+ """
203+ global_file = os.environ["FAKE_PACKAGE_STORE"]
204+ if not os.path.exists(global_file):
205+ return succeed(None)
206+ message_sent = set(self._store.get_message_ids())
207+ global_store = FakePackageStore(global_file)
208+ all_message_ids = set(global_store.get_message_ids())
209+ not_sent = all_message_ids - message_sent
210+ deferred = succeed(None)
211+ got_type = set()
212+ if not_sent:
213+ messages = global_store.get_messages_by_ids(not_sent)
214+ sent = []
215+ for message_id, message in messages:
216+ message = bpickle.loads(str(message))
217+ if message["type"] not in got_type:
218+ got_type.add(message["type"])
219+ sent.append(message_id)
220+ deferred.addCallback(
221+ lambda x, message=message: self.send_message(message))
222+ self._store.save_message_ids(sent)
223+ return deferred
224+
225+
226 def main(args):
227- return run_task_handler(PackageReporter, args)
228+ if "FAKE_PACKAGE_STORE" in os.environ:
229+ return run_task_handler(FakeReporter, args)
230+ elif "FAKE_GLOBAL_PACKAGE_STORE" in os.environ:
231+ return run_task_handler(FakeGlobalReporter, args)
232+ else:
233+ return run_task_handler(PackageReporter, args)
234
235
236 def find_reporter_command():
237
238=== modified file 'landscape/package/store.py'
239--- landscape/package/store.py 2011-07-21 23:55:47 +0000
240+++ landscape/package/store.py 2011-08-19 14:11:42 +0000
241@@ -329,6 +329,40 @@
242 ",".join([str(task.id) for task in except_tasks]))
243
244
245+class FakePackageStore(PackageStore):
246+ """
247+ A L{PackageStore} with an additional message table to store sent messages.
248+ """
249+
250+ def _ensure_schema(self):
251+ super(FakePackageStore, self)._ensure_schema()
252+ ensure_fake_package_schema(self._db)
253+
254+ @with_cursor
255+ def save_message(self, cursor, message):
256+ cursor.execute("INSERT INTO message (data) VALUES (?)",
257+ (buffer(bpickle.dumps(message)),))
258+
259+ @with_cursor
260+ def get_message_ids(self, cursor):
261+ return [row[0] for row in
262+ cursor.execute("SELECT id FROM message").fetchall()]
263+
264+ @with_cursor
265+ def save_message_ids(self, cursor, message_ids):
266+ cursor.executemany(
267+ "INSERT INTO message (id) VALUES (?)",
268+ [(message_id,) for message_id in message_ids])
269+
270+ @with_cursor
271+ def get_messages_by_ids(self, cursor, message_ids):
272+ params = ", ".join(["?"] * len(message_ids))
273+ result = cursor.execute(
274+ "SELECT id, data FROM message WHERE id IN (%s) "
275+ "ORDER BY id" % params, tuple(message_ids)).fetchall()
276+ return [(row[0], row[1]) for row in result]
277+
278+
279 class HashIDRequest(object):
280
281 def __init__(self, db, id):
282@@ -447,3 +481,16 @@
283 else:
284 cursor.close()
285 db.commit()
286+
287+
288+def ensure_fake_package_schema(db):
289+ cursor = db.cursor()
290+ try:
291+ cursor.execute("CREATE TABLE message"
292+ " (id INTEGER PRIMARY KEY, data BLOB)")
293+ except (sqlite3.OperationalError, sqlite3.DatabaseError):
294+ cursor.close()
295+ db.rollback()
296+ else:
297+ cursor.close()
298+ db.commit()
299
300=== modified file 'landscape/package/taskhandler.py'
301--- landscape/package/taskhandler.py 2011-06-13 23:58:09 +0000
302+++ landscape/package/taskhandler.py 2011-08-19 14:11:42 +0000
303@@ -85,6 +85,7 @@
304
305 queue_name = "default"
306 lsb_release_filename = LSB_RELEASE_FILENAME
307+ package_store_class = PackageStore
308
309 def __init__(self, package_store, package_facade, remote_broker, config):
310 self._store = package_store
311@@ -261,7 +262,7 @@
312 # import Smart unless we need to.
313 from landscape.package.facade import SmartFacade
314
315- package_store = PackageStore(config.store_filename)
316+ package_store = cls.package_store_class(config.store_filename)
317 package_facade = SmartFacade()
318
319 def finish():
320
321=== modified file 'landscape/package/tests/test_reporter.py'
322--- landscape/package/tests/test_reporter.py 2011-08-10 14:56:43 +0000
323+++ landscape/package/tests/test_reporter.py 2011-08-19 14:11:42 +0000
324@@ -8,15 +8,18 @@
325 from twisted.internet import reactor
326
327 from landscape.lib.fetch import fetch_async, FetchError
328-from landscape.package.store import PackageStore, UnknownHashIDRequest
329+from landscape.lib import bpickle
330+from landscape.package.store import (
331+ PackageStore, UnknownHashIDRequest, FakePackageStore)
332 from landscape.package.reporter import (
333 PackageReporter, HASH_ID_REQUEST_TIMEOUT, main, find_reporter_command,
334- PackageReporterConfiguration)
335+ PackageReporterConfiguration, FakeGlobalReporter, FakeReporter)
336 from landscape.package import reporter
337 from landscape.package.facade import SmartFacade
338 from landscape.package.tests.helpers import (
339 SmartFacadeHelper, HASH1, HASH2, HASH3)
340-from landscape.tests.helpers import LandscapeTest, BrokerServiceHelper
341+from landscape.tests.helpers import (
342+ LandscapeTest, BrokerServiceHelper, EnvironSaverHelper)
343 from landscape.tests.mocker import ANY
344
345 SAMPLE_LSB_RELEASE = "DISTRIB_CODENAME=codename\n"
346@@ -1589,6 +1592,130 @@
347 return deferred
348
349
350+class GlobalPackageReporterTest(LandscapeTest):
351+
352+ helpers = [SmartFacadeHelper, BrokerServiceHelper]
353+
354+ def setUp(self):
355+
356+ def set_up(ignored):
357+ self.store = FakePackageStore(self.makeFile())
358+ self.config = PackageReporterConfiguration()
359+ self.reporter = FakeGlobalReporter(
360+ self.store, self.facade, self.remote, self.config)
361+ self.config.data_path = self.makeDir()
362+ os.mkdir(self.config.package_directory)
363+
364+ result = super(GlobalPackageReporterTest, self).setUp()
365+ return result.addCallback(set_up)
366+
367+ def test_store_messages(self):
368+ """
369+ L{FakeGlobalReporter} stores messages which are sent.
370+ """
371+ message_store = self.broker_service.message_store
372+ message_store.set_accepted_types(["package-reporter-result"])
373+ self.reporter.smart_update_filename = self.makeFile(
374+ "#!/bin/sh\necho -n error >&2\necho -n output\nexit 0")
375+ os.chmod(self.reporter.smart_update_filename, 0755)
376+ deferred = Deferred()
377+
378+ def do_test():
379+ result = self.reporter.run_smart_update()
380+
381+ def callback(ignore):
382+ message = {"type": "package-reporter-result",
383+ "code": 0, "err": u"error"}
384+ self.assertMessages(
385+ message_store.get_pending_messages(), [message])
386+ stored = list(self.store._db.execute(
387+ "SELECT id, data FROM message").fetchall())
388+ self.assertEqual(1, len(stored))
389+ self.assertEqual(1, stored[0][0])
390+ self.assertEqual(message, bpickle.loads(str(stored[0][1])))
391+ result.addCallback(callback)
392+ result.chainDeferred(deferred)
393+
394+ reactor.callWhenRunning(do_test)
395+ return deferred
396+
397+
398+class FakePackageReporterTest(LandscapeTest):
399+
400+ helpers = [EnvironSaverHelper, SmartFacadeHelper, BrokerServiceHelper]
401+
402+ def setUp(self):
403+
404+ def set_up(ignored):
405+ self.store = FakePackageStore(self.makeFile())
406+ global_file = self.makeFile()
407+ self.global_store = FakePackageStore(global_file)
408+ os.environ["FAKE_PACKAGE_STORE"] = global_file
409+ self.config = PackageReporterConfiguration()
410+ self.reporter = FakeReporter(
411+ self.store, self.facade, self.remote, self.config)
412+ self.config.data_path = self.makeDir()
413+ os.mkdir(self.config.package_directory)
414+
415+ result = super(FakePackageReporterTest, self).setUp()
416+ return result.addCallback(set_up)
417+
418+ def test_send_messages(self):
419+ """
420+ L{FakeReporter} sends messages stored in the global store specified by
421+ C{FAKE_PACKAGE_STORE}.
422+ """
423+ message_store = self.broker_service.message_store
424+ message_store.set_accepted_types(["package-reporter-result"])
425+ message = {"type": "package-reporter-result",
426+ "code": 0, "err": u"error"}
427+ self.global_store.save_message(message)
428+
429+ def check(ignore):
430+ self.assertMessages(
431+ message_store.get_pending_messages(), [message])
432+ stored = list(self.store._db.execute(
433+ "SELECT id FROM message").fetchall())
434+ self.assertEqual(1, len(stored))
435+ self.assertEqual(1, stored[0][0])
436+
437+ deferred = self.reporter.run()
438+ deferred.addCallback(check)
439+ return deferred
440+
441+ def test_filter_message_type(self):
442+ """
443+ L{FakeReporter} only sends one message of each type per run.
444+ """
445+ message_store = self.broker_service.message_store
446+ message_store.set_accepted_types(["package-reporter-result"])
447+ message1 = {"type": "package-reporter-result",
448+ "code": 0, "err": u"error"}
449+ self.global_store.save_message(message1)
450+ message2 = {"type": "package-reporter-result",
451+ "code": 1, "err": u"error"}
452+ self.global_store.save_message(message2)
453+
454+ def check1(ignore):
455+ self.assertMessages(
456+ message_store.get_pending_messages(), [message1])
457+ stored = list(self.store._db.execute(
458+ "SELECT id FROM message").fetchall())
459+ self.assertEqual(1, stored[0][0])
460+ return self.reporter.run().addCallback(check2)
461+
462+ def check2(ignore):
463+ self.assertMessages(
464+ message_store.get_pending_messages(), [message1, message2])
465+ stored = list(self.store._db.execute(
466+ "SELECT id FROM message").fetchall())
467+ self.assertEqual(2, len(stored))
468+ self.assertEqual(1, stored[0][0])
469+ self.assertEqual(2, stored[1][0])
470+
471+ return self.reporter.run().addCallback(check1)
472+
473+
474 class EqualsHashes(object):
475
476 def __init__(self, *hashes):

Subscribers

People subscribed via source and target branches

to all changes: