Merge lp:~therve/landscape-client/fake-package-reporter into lp:~landscape/landscape-client/trunk
- fake-package-reporter
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Frank Wierzbicki | ||||
Approved revision: | 365 | ||||
Merged at revision: | 360 | ||||
Proposed branch: | lp:~therve/landscape-client/fake-package-reporter | ||||
Merge into: | lp:~landscape/landscape-client/trunk | ||||
Diff against target: |
476 lines (+277/-24) 5 files modified
landscape/broker/store.py (+15/-14) landscape/package/reporter.py (+83/-6) landscape/package/store.py (+47/-0) landscape/package/taskhandler.py (+2/-1) landscape/package/tests/test_reporter.py (+130/-3) |
||||
To merge this branch: | bzr merge lp:~therve/landscape-client/fake-package-reporter | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Frank Wierzbicki (community) | Approve | ||
Free Ekanayaka (community) | Approve | ||
Review via email:
|
Commit message
Description of the change
The branch introduces 2 reporters: one named global or "master", which operates as a normal reporter but stores the messages sent. The other has a pointer to that store, and sends those messages instead of generating new ones using smart. I used env variables to instrument it, which is not great but simple. Using different plugins would mean creating new modules for each one, and new scripts.
- 364. By Thomas Herve
-
Move SQL in the store
- 365. By Thomas Herve
-
Implement run instead
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Thomas Herve (therve) wrote : | # |
[1] This is for load testing, so I don't think we need to go out of our way to test this...
[2] The fake clients may start a bit before the master one, so I want to make sure it doesn't crash.
[3] Yep done.
[4] Moved.
[6] Fixed.
Thanks!
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Free Ekanayaka (free.ekanayaka) wrote : | # |
Thanks, +1!
[7]
+ result.
Maybe this can just be
and handle_task() can be dropped.
[8]
+ def __init__(self, package_store, package_facade, remote_broker, config):
+ super(FakeGlobalReporter, self).__init__(package_store, package_facade,
+ remote_broker, config)
Do we need this?
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Frank Wierzbicki (fwierzbicki) wrote : | # |
I can't add anything to Free's comments - looks good to me +1!
Preview Diff
1 | === modified file 'landscape/broker/store.py' |
2 | --- landscape/broker/store.py 2011-07-21 23:55:47 +0000 |
3 | +++ landscape/broker/store.py 2011-08-19 14:11:42 +0000 |
4 | @@ -43,9 +43,7 @@ |
5 | api = SERVER_API |
6 | |
7 | def __init__(self, persist, directory, directory_size=1000, |
8 | - monitor_interval=60*60, get_time=time.time): |
9 | - """ |
10 | - """ |
11 | + monitor_interval=60 * 60, get_time=time.time): |
12 | self._get_time = get_time |
13 | self._directory = directory |
14 | self._directory_size = directory_size |
15 | @@ -82,8 +80,8 @@ |
16 | def get_sequence(self): |
17 | """Get the current sequence. |
18 | |
19 | - @return: The sequence number of the message that the server expects us to |
20 | - send on the next exchange. |
21 | + @return: The sequence number of the message that the server expects us |
22 | + to send on the next exchange. |
23 | """ |
24 | return self._persist.get("sequence", 0) |
25 | |
26 | @@ -98,8 +96,8 @@ |
27 | def get_server_sequence(self): |
28 | """Get the current server sequence. |
29 | |
30 | - @return: the sequence number of the message that we will ask the server to |
31 | - send to us on the next exchange. |
32 | + @return: the sequence number of the message that we will ask the server |
33 | + to send to us on the next exchange. |
34 | """ |
35 | return self._persist.get("server_sequence", 0) |
36 | |
37 | @@ -161,7 +159,7 @@ |
38 | |
39 | def delete_old_messages(self): |
40 | """Delete messages which are unlikely to be needed in the future.""" |
41 | - for fn in itertools.islice(self._walk_messages(exclude=HELD+BROKEN), |
42 | + for fn in itertools.islice(self._walk_messages(exclude=HELD + BROKEN), |
43 | self.get_pending_offset()): |
44 | os.unlink(fn) |
45 | containing_dir = os.path.split(fn)[0] |
46 | @@ -256,7 +254,8 @@ |
47 | def _walk_pending_messages(self): |
48 | """Walk the files which are definitely pending.""" |
49 | pending_offset = self.get_pending_offset() |
50 | - for i, filename in enumerate(self._walk_messages(exclude=HELD+BROKEN)): |
51 | + for i, filename in enumerate(self._walk_messages(exclude=HELD + |
52 | + BROKEN)): |
53 | if i >= pending_offset: |
54 | yield filename |
55 | |
56 | @@ -308,10 +307,10 @@ |
57 | if accepted: |
58 | new_filename = self._get_next_message_filename() |
59 | os.rename(old_filename, new_filename) |
60 | - self._set_flags(new_filename, set(flags)-set(HELD)) |
61 | + self._set_flags(new_filename, set(flags) - set(HELD)) |
62 | else: |
63 | if not accepted and offset >= pending_offset: |
64 | - self._set_flags(old_filename, set(flags)|set(HELD)) |
65 | + self._set_flags(old_filename, set(flags) | set(HELD)) |
66 | offset += 1 |
67 | |
68 | def _get_flags(self, path): |
69 | @@ -324,16 +323,18 @@ |
70 | dirname, basename = os.path.split(path) |
71 | new_path = os.path.join(dirname, basename.split("_")[0]) |
72 | if flags: |
73 | - new_path += "_"+"".join(sorted(set(flags))) |
74 | + new_path += "_" + "".join(sorted(set(flags))) |
75 | os.rename(path, new_path) |
76 | return new_path |
77 | |
78 | def _add_flags(self, path, flags): |
79 | - self._set_flags(path, self._get_flags(path)+flags) |
80 | + self._set_flags(path, self._get_flags(path) + flags) |
81 | |
82 | |
83 | def get_default_message_store(*args, **kwargs): |
84 | - """Get a L{MessageStore} object with all Landscape message schemas added.""" |
85 | + """ |
86 | + Get a L{MessageStore} object with all Landscape message schemas added. |
87 | + """ |
88 | from landscape. message_schemas import message_schemas |
89 | store = MessageStore(*args, **kwargs) |
90 | for schema in message_schemas.values(): |
91 | |
92 | === modified file 'landscape/package/reporter.py' |
93 | --- landscape/package/reporter.py 2011-08-10 14:56:43 +0000 |
94 | +++ landscape/package/reporter.py 2011-08-19 14:11:42 +0000 |
95 | @@ -10,10 +10,11 @@ |
96 | from landscape.lib.twisted_util import gather_results, spawn_process |
97 | from landscape.lib.fetch import fetch_async |
98 | from landscape.lib.fs import touch_file |
99 | +from landscape.lib import bpickle |
100 | |
101 | from landscape.package.taskhandler import ( |
102 | PackageTaskHandlerConfiguration, PackageTaskHandler, run_task_handler) |
103 | -from landscape.package.store import UnknownHashIDRequest |
104 | +from landscape.package.store import UnknownHashIDRequest, FakePackageStore |
105 | |
106 | |
107 | HASH_ID_REQUEST_TIMEOUT = 7200 |
108 | @@ -79,6 +80,9 @@ |
109 | result.callback(None) |
110 | return result |
111 | |
112 | + def send_message(self, message): |
113 | + return self._broker.send_message(message, True) |
114 | + |
115 | def fetch_hash_id_db(self): |
116 | """ |
117 | Fetch the appropriate pre-canned database of hash=>id mappings |
118 | @@ -219,7 +223,7 @@ |
119 | "type": "package-reporter-result", |
120 | "code": code, |
121 | "err": err} |
122 | - return self._broker.send_message(message, True) |
123 | + return self.send_message(message) |
124 | |
125 | def handle_task(self, task): |
126 | message = task.data |
127 | @@ -389,7 +393,7 @@ |
128 | """Create a hash_id_request and send message with "request-id".""" |
129 | request = self._store.add_hash_id_request(unknown_hashes) |
130 | message["request-id"] = request.id |
131 | - result = self._broker.send_message(message, True) |
132 | + result = self.send_message(message) |
133 | |
134 | def set_message_id(message_id): |
135 | request.message_id = message_id |
136 | @@ -530,7 +534,7 @@ |
137 | return succeed(False) |
138 | |
139 | message["type"] = "packages" |
140 | - result = self._broker.send_message(message, True) |
141 | + result = self.send_message(message) |
142 | |
143 | logging.info("Queuing message with changes in known packages: " |
144 | "%d installed, %d available, %d available upgrades, " |
145 | @@ -595,7 +599,7 @@ |
146 | return succeed(False) |
147 | |
148 | message["type"] = "package-locks" |
149 | - result = self._broker.send_message(message, True) |
150 | + result = self.send_message(message) |
151 | |
152 | logging.info("Queuing message with changes in known package locks:" |
153 | " %d created, %d deleted." % |
154 | @@ -613,8 +617,81 @@ |
155 | return result |
156 | |
157 | |
158 | +class FakeGlobalReporter(PackageReporter): |
159 | + """ |
160 | + A standard reporter, which additionally stores messages sent into its |
161 | + package store. |
162 | + """ |
163 | + |
164 | + package_store_class = FakePackageStore |
165 | + |
166 | + def __init__(self, package_store, package_facade, remote_broker, config): |
167 | + super(FakeGlobalReporter, self).__init__(package_store, package_facade, |
168 | + remote_broker, config) |
169 | + |
170 | + def send_message(self, message): |
171 | + self._store.save_message(message) |
172 | + return super(FakeGlobalReporter, self).send_message(message) |
173 | + |
174 | + |
175 | +class FakeReporter(PackageReporter): |
176 | + """ |
177 | + A fake reporter which only sends messages previously stored by a |
178 | + L{FakeGlobalReporter}. |
179 | + """ |
180 | + |
181 | + package_store_class = FakePackageStore |
182 | + |
183 | + def run(self): |
184 | + result = succeed(None) |
185 | + |
186 | + # If the appropriate hash=>id db is not there, fetch it |
187 | + result.addCallback(lambda x: self.fetch_hash_id_db()) |
188 | + |
189 | + result.addCallback(lambda x: self.handle_tasks()) |
190 | + |
191 | + # Finally, verify if we have anything new to send to the server. |
192 | + result.addCallback(lambda x: self.send_pending_messages()) |
193 | + |
194 | + return result |
195 | + |
196 | + def handle_task(self, task): |
197 | + return succeed(None) |
198 | + |
199 | + def send_pending_messages(self): |
200 | + """ |
201 | + As the last callback of L{PackageReporter}, sends messages stored. |
202 | + """ |
203 | + global_file = os.environ["FAKE_PACKAGE_STORE"] |
204 | + if not os.path.exists(global_file): |
205 | + return succeed(None) |
206 | + message_sent = set(self._store.get_message_ids()) |
207 | + global_store = FakePackageStore(global_file) |
208 | + all_message_ids = set(global_store.get_message_ids()) |
209 | + not_sent = all_message_ids - message_sent |
210 | + deferred = succeed(None) |
211 | + got_type = set() |
212 | + if not_sent: |
213 | + messages = global_store.get_messages_by_ids(not_sent) |
214 | + sent = [] |
215 | + for message_id, message in messages: |
216 | + message = bpickle.loads(str(message)) |
217 | + if message["type"] not in got_type: |
218 | + got_type.add(message["type"]) |
219 | + sent.append(message_id) |
220 | + deferred.addCallback( |
221 | + lambda x, message=message: self.send_message(message)) |
222 | + self._store.save_message_ids(sent) |
223 | + return deferred |
224 | + |
225 | + |
226 | def main(args): |
227 | - return run_task_handler(PackageReporter, args) |
228 | + if "FAKE_PACKAGE_STORE" in os.environ: |
229 | + return run_task_handler(FakeReporter, args) |
230 | + elif "FAKE_GLOBAL_PACKAGE_STORE" in os.environ: |
231 | + return run_task_handler(FakeGlobalReporter, args) |
232 | + else: |
233 | + return run_task_handler(PackageReporter, args) |
234 | |
235 | |
236 | def find_reporter_command(): |
237 | |
238 | === modified file 'landscape/package/store.py' |
239 | --- landscape/package/store.py 2011-07-21 23:55:47 +0000 |
240 | +++ landscape/package/store.py 2011-08-19 14:11:42 +0000 |
241 | @@ -329,6 +329,40 @@ |
242 | ",".join([str(task.id) for task in except_tasks])) |
243 | |
244 | |
245 | +class FakePackageStore(PackageStore): |
246 | + """ |
247 | + A L{PackageStore} with an additional message table to store sent messages. |
248 | + """ |
249 | + |
250 | + def _ensure_schema(self): |
251 | + super(FakePackageStore, self)._ensure_schema() |
252 | + ensure_fake_package_schema(self._db) |
253 | + |
254 | + @with_cursor |
255 | + def save_message(self, cursor, message): |
256 | + cursor.execute("INSERT INTO message (data) VALUES (?)", |
257 | + (buffer(bpickle.dumps(message)),)) |
258 | + |
259 | + @with_cursor |
260 | + def get_message_ids(self, cursor): |
261 | + return [row[0] for row in |
262 | + cursor.execute("SELECT id FROM message").fetchall()] |
263 | + |
264 | + @with_cursor |
265 | + def save_message_ids(self, cursor, message_ids): |
266 | + cursor.executemany( |
267 | + "INSERT INTO message (id) VALUES (?)", |
268 | + [(message_id,) for message_id in message_ids]) |
269 | + |
270 | + @with_cursor |
271 | + def get_messages_by_ids(self, cursor, message_ids): |
272 | + params = ", ".join(["?"] * len(message_ids)) |
273 | + result = cursor.execute( |
274 | + "SELECT id, data FROM message WHERE id IN (%s) " |
275 | + "ORDER BY id" % params, tuple(message_ids)).fetchall() |
276 | + return [(row[0], row[1]) for row in result] |
277 | + |
278 | + |
279 | class HashIDRequest(object): |
280 | |
281 | def __init__(self, db, id): |
282 | @@ -447,3 +481,16 @@ |
283 | else: |
284 | cursor.close() |
285 | db.commit() |
286 | + |
287 | + |
288 | +def ensure_fake_package_schema(db): |
289 | + cursor = db.cursor() |
290 | + try: |
291 | + cursor.execute("CREATE TABLE message" |
292 | + " (id INTEGER PRIMARY KEY, data BLOB)") |
293 | + except (sqlite3.OperationalError, sqlite3.DatabaseError): |
294 | + cursor.close() |
295 | + db.rollback() |
296 | + else: |
297 | + cursor.close() |
298 | + db.commit() |
299 | |
300 | === modified file 'landscape/package/taskhandler.py' |
301 | --- landscape/package/taskhandler.py 2011-06-13 23:58:09 +0000 |
302 | +++ landscape/package/taskhandler.py 2011-08-19 14:11:42 +0000 |
303 | @@ -85,6 +85,7 @@ |
304 | |
305 | queue_name = "default" |
306 | lsb_release_filename = LSB_RELEASE_FILENAME |
307 | + package_store_class = PackageStore |
308 | |
309 | def __init__(self, package_store, package_facade, remote_broker, config): |
310 | self._store = package_store |
311 | @@ -261,7 +262,7 @@ |
312 | # import Smart unless we need to. |
313 | from landscape.package.facade import SmartFacade |
314 | |
315 | - package_store = PackageStore(config.store_filename) |
316 | + package_store = cls.package_store_class(config.store_filename) |
317 | package_facade = SmartFacade() |
318 | |
319 | def finish(): |
320 | |
321 | === modified file 'landscape/package/tests/test_reporter.py' |
322 | --- landscape/package/tests/test_reporter.py 2011-08-10 14:56:43 +0000 |
323 | +++ landscape/package/tests/test_reporter.py 2011-08-19 14:11:42 +0000 |
324 | @@ -8,15 +8,18 @@ |
325 | from twisted.internet import reactor |
326 | |
327 | from landscape.lib.fetch import fetch_async, FetchError |
328 | -from landscape.package.store import PackageStore, UnknownHashIDRequest |
329 | +from landscape.lib import bpickle |
330 | +from landscape.package.store import ( |
331 | + PackageStore, UnknownHashIDRequest, FakePackageStore) |
332 | from landscape.package.reporter import ( |
333 | PackageReporter, HASH_ID_REQUEST_TIMEOUT, main, find_reporter_command, |
334 | - PackageReporterConfiguration) |
335 | + PackageReporterConfiguration, FakeGlobalReporter, FakeReporter) |
336 | from landscape.package import reporter |
337 | from landscape.package.facade import SmartFacade |
338 | from landscape.package.tests.helpers import ( |
339 | SmartFacadeHelper, HASH1, HASH2, HASH3) |
340 | -from landscape.tests.helpers import LandscapeTest, BrokerServiceHelper |
341 | +from landscape.tests.helpers import ( |
342 | + LandscapeTest, BrokerServiceHelper, EnvironSaverHelper) |
343 | from landscape.tests.mocker import ANY |
344 | |
345 | SAMPLE_LSB_RELEASE = "DISTRIB_CODENAME=codename\n" |
346 | @@ -1589,6 +1592,130 @@ |
347 | return deferred |
348 | |
349 | |
350 | +class GlobalPackageReporterTest(LandscapeTest): |
351 | + |
352 | + helpers = [SmartFacadeHelper, BrokerServiceHelper] |
353 | + |
354 | + def setUp(self): |
355 | + |
356 | + def set_up(ignored): |
357 | + self.store = FakePackageStore(self.makeFile()) |
358 | + self.config = PackageReporterConfiguration() |
359 | + self.reporter = FakeGlobalReporter( |
360 | + self.store, self.facade, self.remote, self.config) |
361 | + self.config.data_path = self.makeDir() |
362 | + os.mkdir(self.config.package_directory) |
363 | + |
364 | + result = super(GlobalPackageReporterTest, self).setUp() |
365 | + return result.addCallback(set_up) |
366 | + |
367 | + def test_store_messages(self): |
368 | + """ |
369 | + L{FakeGlobalReporter} stores messages which are sent. |
370 | + """ |
371 | + message_store = self.broker_service.message_store |
372 | + message_store.set_accepted_types(["package-reporter-result"]) |
373 | + self.reporter.smart_update_filename = self.makeFile( |
374 | + "#!/bin/sh\necho -n error >&2\necho -n output\nexit 0") |
375 | + os.chmod(self.reporter.smart_update_filename, 0755) |
376 | + deferred = Deferred() |
377 | + |
378 | + def do_test(): |
379 | + result = self.reporter.run_smart_update() |
380 | + |
381 | + def callback(ignore): |
382 | + message = {"type": "package-reporter-result", |
383 | + "code": 0, "err": u"error"} |
384 | + self.assertMessages( |
385 | + message_store.get_pending_messages(), [message]) |
386 | + stored = list(self.store._db.execute( |
387 | + "SELECT id, data FROM message").fetchall()) |
388 | + self.assertEqual(1, len(stored)) |
389 | + self.assertEqual(1, stored[0][0]) |
390 | + self.assertEqual(message, bpickle.loads(str(stored[0][1]))) |
391 | + result.addCallback(callback) |
392 | + result.chainDeferred(deferred) |
393 | + |
394 | + reactor.callWhenRunning(do_test) |
395 | + return deferred |
396 | + |
397 | + |
398 | +class FakePackageReporterTest(LandscapeTest): |
399 | + |
400 | + helpers = [EnvironSaverHelper, SmartFacadeHelper, BrokerServiceHelper] |
401 | + |
402 | + def setUp(self): |
403 | + |
404 | + def set_up(ignored): |
405 | + self.store = FakePackageStore(self.makeFile()) |
406 | + global_file = self.makeFile() |
407 | + self.global_store = FakePackageStore(global_file) |
408 | + os.environ["FAKE_PACKAGE_STORE"] = global_file |
409 | + self.config = PackageReporterConfiguration() |
410 | + self.reporter = FakeReporter( |
411 | + self.store, self.facade, self.remote, self.config) |
412 | + self.config.data_path = self.makeDir() |
413 | + os.mkdir(self.config.package_directory) |
414 | + |
415 | + result = super(FakePackageReporterTest, self).setUp() |
416 | + return result.addCallback(set_up) |
417 | + |
418 | + def test_send_messages(self): |
419 | + """ |
420 | + L{FakeReporter} sends messages stored in the global store specified by |
421 | + C{FAKE_PACKAGE_STORE}. |
422 | + """ |
423 | + message_store = self.broker_service.message_store |
424 | + message_store.set_accepted_types(["package-reporter-result"]) |
425 | + message = {"type": "package-reporter-result", |
426 | + "code": 0, "err": u"error"} |
427 | + self.global_store.save_message(message) |
428 | + |
429 | + def check(ignore): |
430 | + self.assertMessages( |
431 | + message_store.get_pending_messages(), [message]) |
432 | + stored = list(self.store._db.execute( |
433 | + "SELECT id FROM message").fetchall()) |
434 | + self.assertEqual(1, len(stored)) |
435 | + self.assertEqual(1, stored[0][0]) |
436 | + |
437 | + deferred = self.reporter.run() |
438 | + deferred.addCallback(check) |
439 | + return deferred |
440 | + |
441 | + def test_filter_message_type(self): |
442 | + """ |
443 | + L{FakeReporter} only sends one message of each type per run. |
444 | + """ |
445 | + message_store = self.broker_service.message_store |
446 | + message_store.set_accepted_types(["package-reporter-result"]) |
447 | + message1 = {"type": "package-reporter-result", |
448 | + "code": 0, "err": u"error"} |
449 | + self.global_store.save_message(message1) |
450 | + message2 = {"type": "package-reporter-result", |
451 | + "code": 1, "err": u"error"} |
452 | + self.global_store.save_message(message2) |
453 | + |
454 | + def check1(ignore): |
455 | + self.assertMessages( |
456 | + message_store.get_pending_messages(), [message1]) |
457 | + stored = list(self.store._db.execute( |
458 | + "SELECT id FROM message").fetchall()) |
459 | + self.assertEqual(1, stored[0][0]) |
460 | + return self.reporter.run().addCallback(check2) |
461 | + |
462 | + def check2(ignore): |
463 | + self.assertMessages( |
464 | + message_store.get_pending_messages(), [message1, message2]) |
465 | + stored = list(self.store._db.execute( |
466 | + "SELECT id FROM message").fetchall()) |
467 | + self.assertEqual(2, len(stored)) |
468 | + self.assertEqual(1, stored[0][0]) |
469 | + self.assertEqual(2, stored[1][0]) |
470 | + |
471 | + return self.reporter.run().addCallback(check1) |
472 | + |
473 | + |
474 | class EqualsHashes(object): |
475 | |
476 | def __init__(self, *hashes): |
Looks good! I have a few comments though.
[1]
+class FakePackageStore(PackageStore):
It'd be good to add direct tests for this class.
[2]
+ if not os.path.exists(global_store):
+ return succeed(None)
Why is this needed?
[3]
+ global_db = sqlite3.connect(global_store)
+ cursor = global_db.cursor()
+ all_message_ids = set(
+ row[0] for row in
+ cursor.execute("SELECT id FROM message").fetchall())
Can't FakePackageStore.get_message_ids be used instead?
[4]
+ messages = list(
+ (row[0], row[1]) for row in
+ cursor.execute(
+ "SELECT id, data FROM message WHERE id IN (%s) "
+ "ORDER BY id" % params, tuple(not_sent)).fetchall())
It'd be nice to encapsulate this in FakePackageStore, and test it separately.
[5]
+ lambda x, message=message:
+ self._broker.send_message(message, True))
Ah, nice trick to get the message variable in the closure.
[6]
+ def send_message(self, message):
+ return succeed(None)
+
+ def use_hash_id_db(self):
+ return succeed(None)
+
+ def request_unknown_hashes(self):
+ return succeed(None)
+
+ def remove_expired_hash_id_requests(self):
+ return succeed(None)
+
+ def handle_task(self, task):
+ return succeed(None)
+
Instead of this, why not override the run() method directly? It would avoid having to modify the fake reporter in case we add methods to the real one.