Merge lp:~al-maisan/landscape-client/obsolete-results-v into lp:~landscape/landscape-client/trunk
- obsolete-results-v
- Merge into trunk
Status: | Merged |
---|---|
Approved by: | Thomas Herve |
Approved revision: | 261 |
Merged at revision: | not available |
Proposed branch: | lp:~al-maisan/landscape-client/obsolete-results-v |
Merge into: | lp:~landscape/landscape-client/trunk |
Diff against target: |
658 lines (+373/-45) 9 files modified
landscape/broker/config.py (+4/-0) landscape/broker/exchange.py (+42/-1) landscape/broker/exchangestore.py (+110/-0) landscape/broker/service.py (+4/-1) landscape/broker/tests/helpers.py (+5/-1) landscape/broker/tests/test_exchange.py (+84/-9) landscape/broker/tests/test_exchangestore.py (+84/-0) landscape/lib/store.py (+39/-0) landscape/package/store.py (+1/-33) |
To merge this branch: | bzr merge lp:~al-maisan/landscape-client/obsolete-results-v |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Thomas Herve (community) | Approve | ||
Free Ekanayaka (community) | Approve | ||
Review via email: mp+24405@code.launchpad.net |
Commit message
Description of the change
Obsolete response messages will now be discarded as opposed to being sent to the server.
A response message is considered obsolete if the secure ID has changed since the request message was received.
- 244. By Muharem Hrnjadovic
-
bzr ls-lint fixes
- 245. By Muharem Hrnjadovic
-
Simplified test.
- 246. By Muharem Hrnjadovic
-
Fixed doc string.
- 247. By Muharem Hrnjadovic
-
Simplified obsolete response logic.
- 248. By Muharem Hrnjadovic
-
Free's review comments [1-5]
- 249. By Muharem Hrnjadovic
-
Free's review comment #6
- 250. By Muharem Hrnjadovic
-
Free's review comment #7
- 251. By Muharem Hrnjadovic
-
Free's review comment #7
- 252. By Muharem Hrnjadovic
-
Free's review comment #8
Thomas Herve (therve) wrote : | # |
It looks good!
[1] As it, the message context store will grow for ever. I think you should remove messages from it as soon as you add it to the message store (or discard it).
[2]
+ def _message_
+ """True if message is obsolete.
Format "Returns C{True} if message is obsolete."
[3]
+class MessageContext(
+ """Stores the secure ID for incoming messages that require a response.
It seems to store more than that now.
[4]
+ """Accessing a C{MessageContext} with an existing
+ C{operation-id} works."""
+ """Calling C{all_operation
+ list."""
Please put the docstring on their own lines.
Thanks!
- 253. By Muharem Hrnjadovic
-
Thomas' review comment #1
- 254. By Muharem Hrnjadovic
-
Thomas' review comment #2
- 255. By Muharem Hrnjadovic
-
Thomas' review comment #3
- 256. By Muharem Hrnjadovic
-
Thomas' review comment #4
Muharem Hrnjadovic (al-maisan) wrote : | # |
On 04/29/2010 05:31 PM, Free Ekanayaka wrote:
> Review: Needs Fixing
Hello Free,
I addressed all of your comments below, please review the changes. An
incremental diff is attached to this message for your convenience.
> Very nice work, just a few comments but overall the branch looks great to me.
>
> [1]
>
> + def __init__(self, db, id):
>
> Please add @param entries in the class docstring for these two parameters.
>
> [2]
>
> + @param filename: The file that contains the sqlite database.
>
> Jamu recently pointed that @param docstrings for __init__ method should be actually put in the class docstring, like:
>
> class ExchangeStore(
> """Message meta data required by the L{MessageExchange}.
>
> The implementation uses a SQLite database as backend, with a single table
> called "message_context", whose schema is defined in
> L{ensure_
>
> @param filename: The file that contains the sqlite database.
> """
>
> because the callable that gets actually used is the class itself (you usually don't call __init__ directly).
>
> [3]
>
> For consistency with other tests and for easier reading please name test cases against the tests the exercise, plus some test specific suffix. A few examples:
>
> test_add_
> test_get_
> test_deleting_
>
> [4]
>
> + self._store = ExchangeStore(
>
> It would be better to use dependency injection (that is usually the pattern adopted in the client) and pass an instance of ExchangeStore instead of creating it in the __init__ method. This helps testing and mocking, and make it easier in general to pass objects of other types providing the same API.
>
> For example your test case will then have a self.exchange_store attribute (that you use in to build the MessageExchange object under test) which you can use for performing assertions instead of accessing self.exchanger.
>
> [5]
>
> + return 0
>
> Maybe None would be safer, as 0 could be used as message id number.
>
> [6]
>
> + self.assertIsNot(
> + None,
> + self.exchanger.
>
> I think the test should also check that the created MessageContext has the expected attributes set:
>
> + message_context = self.exchanger.
> + self.assertEqua
> + self.assertEqua
>
> [7]
>
> + self.exchanger.
>
> Assuming that send() returns None in this case, the test should assert it:
>
> + message_id = self.exchanger.
> + self.assertIs(
>
> [8]
>
> + logging.info(
> + "Response message with operation-id %s was discarded "
> + ...
1 | === modified file 'landscape/broker/exchange.py' |
2 | --- landscape/broker/exchange.py 2010-04-29 10:59:44 +0000 |
3 | +++ landscape/broker/exchange.py 2010-04-29 15:57:29 +0000 |
4 | @@ -1,12 +1,10 @@ |
5 | """The part of the broker which deals with communications with the server.""" |
6 | -import os |
7 | import time |
8 | import logging |
9 | from landscape.lib.hashlib import md5 |
10 | |
11 | from twisted.internet.defer import succeed |
12 | |
13 | -from landscape.broker.exchangestore import ExchangeStore |
14 | from landscape.lib.message import got_next_expected, ANCIENT |
15 | from landscape.log import format_delta |
16 | from landscape import SERVER_API, CLIENT_API |
17 | @@ -23,7 +21,8 @@ |
18 | |
19 | plugin_name = "message-exchange" |
20 | |
21 | - def __init__(self, reactor, store, transport, registration_info, data_path, |
22 | + def __init__(self, reactor, store, transport, registration_info, |
23 | + message_context_store, |
24 | exchange_interval=60*60, |
25 | urgent_exchange_interval=10, |
26 | monitor_interval=None, |
27 | @@ -54,8 +53,7 @@ |
28 | self._client_accepted_types = set() |
29 | self._client_accepted_types_hash = None |
30 | self._message_handlers = {} |
31 | - self._store = ExchangeStore( |
32 | - os.path.join(data_path, "exchange.database")) |
33 | + self._store = message_context_store |
34 | |
35 | self.register_message("accepted-types", self._handle_accepted_types) |
36 | self.register_message("resynchronize", self._handle_resynchronize) |
37 | @@ -101,7 +99,7 @@ |
38 | "Response message with operation-id %s was discarded " |
39 | "because the client's secure ID has changed in the meantime" |
40 | % message.get('operation-id')) |
41 | - return 0 |
42 | + return None |
43 | else: |
44 | if "timestamp" not in message: |
45 | message["timestamp"] = int(self._reactor.time()) |
46 | |
47 | === modified file 'landscape/broker/exchangestore.py' |
48 | --- landscape/broker/exchangestore.py 2010-04-29 10:57:52 +0000 |
49 | +++ landscape/broker/exchangestore.py 2010-04-29 15:57:29 +0000 |
50 | @@ -16,6 +16,9 @@ |
51 | which the request message came in and the completion of the request. |
52 | If the secure ID did change the result message is obolete and will not be |
53 | sent to the server. |
54 | + |
55 | + @param db: the sqlite database handle. |
56 | + @param id: the database key value for this instance. |
57 | """ |
58 | |
59 | def __init__(self, db, id): |
60 | @@ -47,13 +50,12 @@ |
61 | The implementation uses a SQLite database as backend, with a single table |
62 | called "message_context", whose schema is defined in |
63 | L{ensure_exchange_schema}. |
64 | + |
65 | + @param filename: The name of the file that contains the sqlite database. |
66 | """ |
67 | _db = None |
68 | |
69 | def __init__(self, filename): |
70 | - """ |
71 | - @param filename: The file that contains the sqlite database. |
72 | - """ |
73 | self._filename = filename |
74 | |
75 | def _ensure_schema(self): |
76 | |
77 | === modified file 'landscape/broker/service.py' |
78 | --- landscape/broker/service.py 2010-04-29 09:02:57 +0000 |
79 | +++ landscape/broker/service.py 2010-04-29 15:57:29 +0000 |
80 | @@ -7,6 +7,7 @@ |
81 | from landscape.broker.config import BrokerConfiguration |
82 | from landscape.broker.transport import HTTPTransport |
83 | from landscape.broker.exchange import MessageExchange |
84 | +from landscape.broker.exchangestore import ExchangeStore |
85 | from landscape.broker.ping import Pinger |
86 | from landscape.broker.store import get_default_message_store |
87 | from landscape.broker.server import BrokerServer |
88 | @@ -52,9 +53,11 @@ |
89 | self.message_store = get_default_message_store( |
90 | self.persist, config.message_store_path) |
91 | self.identity = Identity(self.config, self.persist) |
92 | + exchange_store = ExchangeStore( |
93 | + os.path.join(config.data_path, "exchange.database")) |
94 | self.exchanger = MessageExchange( |
95 | self.reactor, self.message_store, self.transport, self.identity, |
96 | - config.data_path, config.exchange_interval, |
97 | + exchange_store, config.exchange_interval, |
98 | config.urgent_exchange_interval) |
99 | self.pinger = self.pinger_factory(self.reactor, config.ping_url, |
100 | self.identity, self.exchanger) |
101 | |
102 | === modified file 'landscape/broker/tests/helpers.py' |
103 | --- landscape/broker/tests/helpers.py 2010-04-29 09:02:57 +0000 |
104 | +++ landscape/broker/tests/helpers.py 2010-04-29 15:57:29 +0000 |
105 | @@ -6,6 +6,7 @@ |
106 | from landscape.reactor import FakeReactor |
107 | from landscape.broker.transport import FakeTransport |
108 | from landscape.broker.exchange import MessageExchange |
109 | +from landscape.broker.exchangestore import ExchangeStore |
110 | from landscape.broker.store import get_default_message_store |
111 | from landscape.broker.registration import Identity, RegistrationHandler |
112 | from landscape.broker.ping import Pinger |
113 | @@ -74,9 +75,11 @@ |
114 | test_case.transport = FakeTransport(test_case.config.url, |
115 | test_case.config.ssl_public_key) |
116 | test_case.reactor = FakeReactor() |
117 | + exchange_store = ExchangeStore( |
118 | + os.path.join(test_case.config.data_path, "exchange.database")) |
119 | test_case.exchanger = MessageExchange( |
120 | test_case.reactor, test_case.mstore, test_case.transport, |
121 | - test_case.identity, test_case.config.data_path, |
122 | + test_case.identity, exchange_store, |
123 | test_case.config.exchange_interval, |
124 | test_case.config.urgent_exchange_interval) |
125 | |
126 | |
127 | === modified file 'landscape/broker/tests/test_exchange.py' |
128 | --- landscape/broker/tests/test_exchange.py 2010-04-29 10:56:27 +0000 |
129 | +++ landscape/broker/tests/test_exchange.py 2010-04-29 16:05:18 +0000 |
130 | @@ -1,9 +1,11 @@ |
131 | +import os |
132 | from landscape import SERVER_API, CLIENT_API |
133 | from landscape.lib.persist import Persist |
134 | from landscape.lib.hashlib import md5 |
135 | from landscape.lib.fetch import fetch_async |
136 | from landscape.schema import Message, Int |
137 | from landscape.broker.exchange import get_accepted_types_diff, MessageExchange |
138 | +from landscape.broker.exchangestore import ExchangeStore |
139 | from landscape.broker.transport import FakeTransport |
140 | from landscape.broker.store import MessageStore |
141 | from landscape.broker.ping import Pinger |
142 | @@ -271,8 +273,10 @@ |
143 | Immediately after registration, an urgent exchange should be scheduled. |
144 | """ |
145 | transport = FakeTransport() |
146 | + exchange_store = ExchangeStore( |
147 | + os.path.join(self.config.data_path, "exchange.database")) |
148 | exchanger = MessageExchange(self.reactor, self.mstore, transport, |
149 | - self.identity, self.config.data_path) |
150 | + self.identity, exchange_store) |
151 | exchanger.start() |
152 | self.wait_for_exchange(urgent=True) |
153 | self.assertEquals(len(transport.payloads), 1) |
154 | @@ -498,8 +502,10 @@ |
155 | the total-messages is equivalent to the total number of messages |
156 | pending. |
157 | """ |
158 | + exchange_store = ExchangeStore( |
159 | + os.path.join(self.config.data_path, "exchange.database")) |
160 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
161 | - self.identity, self.config.data_path, |
162 | + self.identity, exchange_store, |
163 | max_messages=1) |
164 | self.mstore.set_accepted_types(["empty"]) |
165 | self.mstore.add({"type": "empty"}) |
166 | @@ -528,8 +534,10 @@ |
167 | # We create our own MessageExchange because the one set up by the text |
168 | # fixture has an urgent exchange interval of 10 seconds, which makes |
169 | # testing this awkward. |
170 | + exchange_store = ExchangeStore( |
171 | + os.path.join(self.config.data_path, "exchange.database")) |
172 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
173 | - self.identity, self.config.data_path, |
174 | + self.identity, exchange_store, |
175 | urgent_exchange_interval=20) |
176 | exchanger.schedule_exchange(urgent=True) |
177 | events = [] |
178 | @@ -546,8 +554,10 @@ |
179 | should be cancelled and a new one should be scheduled for 10 seconds |
180 | before the new urgent exchange. |
181 | """ |
182 | + exchange_store = ExchangeStore( |
183 | + os.path.join(self.config.data_path, "exchange.database")) |
184 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
185 | - self.identity, self.config.data_path, |
186 | + self.identity, exchange_store, |
187 | urgent_exchange_interval=20) |
188 | events = [] |
189 | self.reactor.call_on("impending-exchange", lambda: events.append(True)) |
190 | @@ -799,6 +809,9 @@ |
191 | self.assertIsNot( |
192 | None, |
193 | self.exchanger._store.get_message_context(message['operation-id'])) |
194 | + message_context = self.exchanger._store.get_message_context(message['operation-id']) |
195 | + self.assertEquals(message_context.operation_id, 123456) |
196 | + self.assertEquals(message_context.message_type, "type-R") |
197 | |
198 | def test_one_way_messages_do_not_have_their_context_stored(self): |
199 | """ |
200 | @@ -833,11 +846,17 @@ |
201 | self.identity.secure_id = 'brand-new' |
202 | |
203 | self.mstore.set_accepted_types(["resynchronize"]) |
204 | - self.exchanger.send({"type": "resynchronize", "operation-id": 234567}) |
205 | + message_id = self.exchanger.send( |
206 | + {"type": "resynchronize", "operation-id": 234567}) |
207 | self.exchanger.exchange() |
208 | self.assertEquals(2, len(self.transport.payloads)) |
209 | messages = self.transport.payloads[1]["messages"] |
210 | self.assertEquals([], messages) |
211 | + self.assertIs(None, message_id) |
212 | + expected_log_entry = ( |
213 | + "Response message with operation-id 234567 was discarded because " |
214 | + "the client's secure ID has changed in the meantime") |
215 | + self.assertTrue(expected_log_entry in self.logfile.getvalue()) |
216 | |
217 | |
218 | class AcceptedTypesMessageExchangeTest(LandscapeTest): |
219 | |
220 | === modified file 'landscape/broker/tests/test_exchangestore.py' |
221 | --- landscape/broker/tests/test_exchangestore.py 2010-04-29 10:30:28 +0000 |
222 | +++ landscape/broker/tests/test_exchangestore.py 2010-04-29 15:57:29 +0000 |
223 | @@ -38,14 +38,14 @@ |
224 | self.assertEquals('change-packages', row[2]) |
225 | self.assertTrue(row[3] > now) |
226 | |
227 | - def test_add_operationid_is_unique(self): |
228 | + def test_add_message_context_with_duplicate_operation_id(self): |
229 | """Only one message context with a given operation-id is permitted.""" |
230 | self.store1.add_message_context(123, 'abc', 'change-packages') |
231 | self.assertRaises( |
232 | sqlite3.IntegrityError, |
233 | self.store1.add_message_context, 123, 'def', 'change-packages') |
234 | |
235 | - def test_get_message_context_works(self): |
236 | + def test_get_message_context(self): |
237 | """Accessing a C{MessageContext} with an existing |
238 | C{operation-id} works.""" |
239 | now = time.time() |
240 | @@ -61,7 +61,7 @@ |
241 | C{operation-id} result in C{None}.""" |
242 | self.assertIs(None, self.store1.get_message_context(999)) |
243 | |
244 | - def test_deleting_message_contexts_works(self): |
245 | + def test_message_context_remove(self): |
246 | """C{MessageContext}s are deleted correctly.""" |
247 | context = self.store1.add_message_context( |
248 | 345, 'opq', 'change-packages') |
Muharem Hrnjadovic (al-maisan) wrote : | # |
On 04/30/2010 09:54 AM, Thomas Herve wrote:
> Review: Needs Fixing
> It looks good!
Hello Thomas,
thank you very much for the review and especially for the first catch below.
I have addressed all of your comments below, please review the changes. An
incremental diff is attached to this message for your convenience.
> [1] As it, the message context store will grow for ever. I think you should remove messages from it as soon as you add it to the message store (or discard it).
>
> [2]
> + def _message_
> + """True if message is obsolete.
>
> Format "Returns C{True} if message is obsolete."
>
> [3]
> +class MessageContext(
> + """Stores the secure ID for incoming messages that require a response.
>
> It seems to store more than that now.
>
> [4]
> + """Accessing a C{MessageContext} with an existing
> + C{operation-id} works."""
>
>
> + """Calling C{all_operation
> + list."""
>
> Please put the docstring on their own lines.
>
>
> Thanks!
Best regards
--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC
1 | === modified file 'landscape/broker/exchange.py' |
2 | --- landscape/broker/exchange.py 2010-04-29 15:57:29 +0000 |
3 | +++ landscape/broker/exchange.py 2010-04-30 08:32:27 +0000 |
4 | @@ -66,7 +66,7 @@ |
5 | return (self._urgent_exchange_interval, self._exchange_interval) |
6 | |
7 | def _message_is_obsolete(self, message): |
8 | - """True if message is obsolete. |
9 | + """Returns C{True} if message is obsolete. |
10 | |
11 | A message is considered obsolete if the secure ID changed since it was |
12 | received. |
13 | @@ -84,7 +84,10 @@ |
14 | |
15 | # Compare the current secure ID with the one that was in effect when |
16 | # the request message was received. |
17 | - return self._registration_info.secure_id != context.secure_id |
18 | + result = self._registration_info.secure_id != context.secure_id |
19 | + context.remove() |
20 | + |
21 | + return result |
22 | |
23 | def send(self, message, urgent=False): |
24 | """Include a message to be sent in an exchange. |
25 | |
26 | === modified file 'landscape/broker/exchangestore.py' |
27 | --- landscape/broker/exchangestore.py 2010-04-29 15:57:29 +0000 |
28 | +++ landscape/broker/exchangestore.py 2010-04-30 08:35:46 +0000 |
29 | @@ -10,7 +10,14 @@ |
30 | |
31 | |
32 | class MessageContext(object): |
33 | - """Stores the secure ID for incoming messages that require a response. |
34 | + """Stores a context for incoming messages that require a response. |
35 | + |
36 | + The context consists of |
37 | + |
38 | + - the "operation-id" value |
39 | + - the secure ID that was in effect when the message was received |
40 | + - the message type |
41 | + - the time when the message was received |
42 | |
43 | This data will be used to detect secure ID changes between the time at |
44 | which the request message came in and the completion of the request. |
45 | |
46 | === modified file 'landscape/broker/tests/test_exchange.py' |
47 | --- landscape/broker/tests/test_exchange.py 2010-04-29 16:05:18 +0000 |
48 | +++ landscape/broker/tests/test_exchange.py 2010-04-30 08:29:00 +0000 |
49 | @@ -844,6 +844,7 @@ |
50 | |
51 | # Change the secure ID so that the response message gets discarded. |
52 | self.identity.secure_id = 'brand-new' |
53 | + ids_before = self.exchanger._store.all_operation_ids() |
54 | |
55 | self.mstore.set_accepted_types(["resynchronize"]) |
56 | message_id = self.exchanger.send( |
57 | @@ -858,6 +859,11 @@ |
58 | "the client's secure ID has changed in the meantime") |
59 | self.assertTrue(expected_log_entry in self.logfile.getvalue()) |
60 | |
61 | + # The MessageContext was removed after utilisation. |
62 | + ids_after = self.exchanger._store.all_operation_ids() |
63 | + self.assertTrue(len(ids_after) == len(ids_before) - 1) |
64 | + self.assertFalse('234567' in ids_after) |
65 | + |
66 | |
67 | class AcceptedTypesMessageExchangeTest(LandscapeTest): |
68 | |
69 | |
70 | === modified file 'landscape/broker/tests/test_exchangestore.py' |
71 | --- landscape/broker/tests/test_exchangestore.py 2010-04-29 15:57:29 +0000 |
72 | +++ landscape/broker/tests/test_exchangestore.py 2010-04-30 08:37:03 +0000 |
73 | @@ -46,8 +46,9 @@ |
74 | self.store1.add_message_context, 123, 'def', 'change-packages') |
75 | |
76 | def test_get_message_context(self): |
77 | - """Accessing a C{MessageContext} with an existing |
78 | - C{operation-id} works.""" |
79 | + """ |
80 | + Accessing a C{MessageContext} with an existing C{operation-id} works. |
81 | + """ |
82 | now = time.time() |
83 | self.store1.add_message_context(234, 'bcd', 'change-packages') |
84 | context = self.store2.get_message_context(234) |
85 | @@ -69,8 +70,10 @@ |
86 | self.assertIs(None, self.store1.get_message_context(345)) |
87 | |
88 | def test_all_operation_ids_for_empty_database(self): |
89 | - """Calling C{all_operation_ids} on an empty database returns an empty |
90 | - list.""" |
91 | + """ |
92 | + Calling C{all_operation_ids} on an empty database returns an empty |
93 | + list. |
94 | + """ |
95 | self.assertEquals([], self.store1.all_operation_ids()) |
96 | |
97 | def test_all_operation_ids(self): |
Free Ekanayaka (free.ekanayaka) wrote : | # |
[9]
+ message_
Please name this parameter "exchange_store", so it matches the name of the expected type (ExchangeStore) and it is shorter. Also, now that we have it, the ExchangeStore could be used to store also other types of information beside MessageContext's, so it makes sense to use a generic name for it.
+ self._store = message_
Similarly please name this attribute "self._
[10]
+ else:
+ if "timestamp" not in message:
+ message[
+ message_id = self._message_
+ if urgent:
+ self.schedule_
+ return message_id
I think you can save this else/indentation level and simply write:
+ if "timestamp" not in message:
+ message[
+ message_id = self._message_
+ if urgent:
+ self.schedule_
+ return message_id
I personally like the pattern when you check a series of early-return conditions, before reaching the meat of a function/method, like
if this:
return something
if that:
return something_else
do_the_actual_thing
[11]
+ exchange_store = ExchangeStore(
+ os.path.
Just to avoid hard-coding this value you could add a property to l.broker.
@property
def exchange_
return os.path.
and use it as
+ exchange_store = ExchangeStore(
[12]
+ exchange_store = ExchangeStore(
+ os.path.
Please add this object to the test_case attributes, so it can be used in tests:
+ test_case.
+ os.path.
after that you can avoid creating it explicitly in all the tests.
[13]
+ self.exchanger.
Once you have self.exchange_store as test attribute (see [12]) you should use it instead of accessing this private attribute.
Thanks!
Muharem Hrnjadovic (al-maisan) wrote : | # |
Hello Free,
thank you very much for these comments, I really appreciate it.
I have taken care of all of them, please see the attached incremental
diff.
On 04/30/2010 11:14 AM, Free Ekanayaka wrote:
> [9]
>
> + message_
>
> Please name this parameter "exchange_store", so it matches the name of the expected type (ExchangeStore) and it is shorter. Also, now that we have it, the ExchangeStore could be used to store also other types of information beside MessageContext's, so it makes sense to use a generic name for it.
>
> + self._store = message_
>
> Similarly please name this attribute "self._
>
> [10]
>
> + else:
> + if "timestamp" not in message:
> + message[
> + message_id = self._message_
> + if urgent:
> + self.schedule_
> + return message_id
>
> I think you can save this else/indentation level and simply write:
>
> + if "timestamp" not in message:
> + message[
> + message_id = self._message_
> + if urgent:
> + self.schedule_
> + return message_id
>
> I personally like the pattern when you check a series of early-return conditions, before reaching the meat of a function/method, like
>
> if this:
> return something
> if that:
> return something_else
>
> do_the_actual_thing
>
> [11]
>
> + exchange_store = ExchangeStore(
> + os.path.
>
> Just to avoid hard-coding this value you could add a property to l.broker.
>
> @property
> def exchange_
> return os.path.
>
> and use it as
>
> + exchange_store = ExchangeStore(
>
> [12]
>
> + exchange_store = ExchangeStore(
> + os.path.
>
> Please add this object to the test_case attributes, so it can be used in tests:
>
> + test_case.
> + os.path.
>
> after that you can avoid creating it explicitly in all the tests.
>
> [13]
>
> + self.exchanger.
>
> Once you have self.exchange_store as test attribute (see [12]) you should use it instead of accessing this private attribute.
>
> Thanks!
Best regards
--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC
1 | === modified file 'landscape/broker/config.py' |
2 | --- landscape/broker/config.py 2010-01-06 15:11:24 +0000 |
3 | +++ landscape/broker/config.py 2010-04-30 10:39:07 +0000 |
4 | @@ -18,6 +18,10 @@ |
5 | self._original_http_proxy = os.environ.get("http_proxy") |
6 | self._original_https_proxy = os.environ.get("https_proxy") |
7 | |
8 | + @property |
9 | + def exchange_store_path(self): |
10 | + return os.path.join(self.data_path, "exchange.database") |
11 | + |
12 | def make_parser(self): |
13 | """Parser factory for broker-specific options. |
14 | |
15 | |
16 | === modified file 'landscape/broker/exchange.py' |
17 | --- landscape/broker/exchange.py 2010-04-30 08:32:42 +0000 |
18 | +++ landscape/broker/exchange.py 2010-04-30 10:32:42 +0000 |
19 | @@ -22,7 +22,7 @@ |
20 | plugin_name = "message-exchange" |
21 | |
22 | def __init__(self, reactor, store, transport, registration_info, |
23 | - message_context_store, |
24 | + exchange_store, |
25 | exchange_interval=60*60, |
26 | urgent_exchange_interval=10, |
27 | monitor_interval=None, |
28 | @@ -53,7 +53,7 @@ |
29 | self._client_accepted_types = set() |
30 | self._client_accepted_types_hash = None |
31 | self._message_handlers = {} |
32 | - self._store = message_context_store |
33 | + self._exchange_store = exchange_store |
34 | |
35 | self.register_message("accepted-types", self._handle_accepted_types) |
36 | self.register_message("resynchronize", self._handle_resynchronize) |
37 | @@ -75,7 +75,7 @@ |
38 | return False |
39 | |
40 | operation_id = message['operation-id'] |
41 | - context = self._store.get_message_context(operation_id) |
42 | + context = self._exchange_store.get_message_context(operation_id) |
43 | if context is None: |
44 | logging.warning( |
45 | "No message context for message with operation-id: %s" |
46 | @@ -103,13 +103,13 @@ |
47 | "because the client's secure ID has changed in the meantime" |
48 | % message.get('operation-id')) |
49 | return None |
50 | - else: |
51 | - if "timestamp" not in message: |
52 | - message["timestamp"] = int(self._reactor.time()) |
53 | - message_id = self._message_store.add(message) |
54 | - if urgent: |
55 | - self.schedule_exchange(urgent=True) |
56 | - return message_id |
57 | + |
58 | + if "timestamp" not in message: |
59 | + message["timestamp"] = int(self._reactor.time()) |
60 | + message_id = self._message_store.add(message) |
61 | + if urgent: |
62 | + self.schedule_exchange(urgent=True) |
63 | + return message_id |
64 | |
65 | def start(self): |
66 | """Start scheduling exchanges. The first one will be urgent.""" |
67 | @@ -400,7 +400,7 @@ |
68 | if 'operation-id' in message: |
69 | # This is a message that requires a response. Store the secure ID |
70 | # so we can check for obsolete results later. |
71 | - self._store.add_message_context( |
72 | + self._exchange_store.add_message_context( |
73 | message['operation-id'], self._registration_info.secure_id, |
74 | message['type']) |
75 | |
76 | |
77 | === modified file 'landscape/broker/service.py' |
78 | --- landscape/broker/service.py 2010-04-29 15:57:29 +0000 |
79 | +++ landscape/broker/service.py 2010-04-30 10:39:43 +0000 |
80 | @@ -53,8 +53,7 @@ |
81 | self.message_store = get_default_message_store( |
82 | self.persist, config.message_store_path) |
83 | self.identity = Identity(self.config, self.persist) |
84 | - exchange_store = ExchangeStore( |
85 | - os.path.join(config.data_path, "exchange.database")) |
86 | + exchange_store = ExchangeStore(self.config.exchange_store_path) |
87 | self.exchanger = MessageExchange( |
88 | self.reactor, self.message_store, self.transport, self.identity, |
89 | exchange_store, config.exchange_interval, |
90 | |
91 | === modified file 'landscape/broker/tests/helpers.py' |
92 | --- landscape/broker/tests/helpers.py 2010-04-29 15:57:29 +0000 |
93 | +++ landscape/broker/tests/helpers.py 2010-04-30 10:44:26 +0000 |
94 | @@ -75,11 +75,11 @@ |
95 | test_case.transport = FakeTransport(test_case.config.url, |
96 | test_case.config.ssl_public_key) |
97 | test_case.reactor = FakeReactor() |
98 | - exchange_store = ExchangeStore( |
99 | + test_case.exchange_store = ExchangeStore( |
100 | os.path.join(test_case.config.data_path, "exchange.database")) |
101 | test_case.exchanger = MessageExchange( |
102 | test_case.reactor, test_case.mstore, test_case.transport, |
103 | - test_case.identity, exchange_store, |
104 | + test_case.identity, test_case.exchange_store, |
105 | test_case.config.exchange_interval, |
106 | test_case.config.urgent_exchange_interval) |
107 | |
108 | |
109 | === modified file 'landscape/broker/tests/test_exchange.py' |
110 | --- landscape/broker/tests/test_exchange.py 2010-04-30 08:29:41 +0000 |
111 | +++ landscape/broker/tests/test_exchange.py 2010-04-30 10:48:10 +0000 |
112 | @@ -273,10 +273,8 @@ |
113 | Immediately after registration, an urgent exchange should be scheduled. |
114 | """ |
115 | transport = FakeTransport() |
116 | - exchange_store = ExchangeStore( |
117 | - os.path.join(self.config.data_path, "exchange.database")) |
118 | exchanger = MessageExchange(self.reactor, self.mstore, transport, |
119 | - self.identity, exchange_store) |
120 | + self.identity, self.exchange_store) |
121 | exchanger.start() |
122 | self.wait_for_exchange(urgent=True) |
123 | self.assertEquals(len(transport.payloads), 1) |
124 | @@ -502,10 +500,8 @@ |
125 | the total-messages is equivalent to the total number of messages |
126 | pending. |
127 | """ |
128 | - exchange_store = ExchangeStore( |
129 | - os.path.join(self.config.data_path, "exchange.database")) |
130 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
131 | - self.identity, exchange_store, |
132 | + self.identity, self.exchange_store, |
133 | max_messages=1) |
134 | self.mstore.set_accepted_types(["empty"]) |
135 | self.mstore.add({"type": "empty"}) |
136 | @@ -534,10 +530,8 @@ |
137 | # We create our own MessageExchange because the one set up by the text |
138 | # fixture has an urgent exchange interval of 10 seconds, which makes |
139 | # testing this awkward. |
140 | - exchange_store = ExchangeStore( |
141 | - os.path.join(self.config.data_path, "exchange.database")) |
142 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
143 | - self.identity, exchange_store, |
144 | + self.identity, self.exchange_store, |
145 | urgent_exchange_interval=20) |
146 | exchanger.schedule_exchange(urgent=True) |
147 | events = [] |
148 | @@ -554,10 +548,8 @@ |
149 | should be cancelled and a new one should be scheduled for 10 seconds |
150 | before the new urgent exchange. |
151 | """ |
152 | - exchange_store = ExchangeStore( |
153 | - os.path.join(self.config.data_path, "exchange.database")) |
154 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
155 | - self.identity, exchange_store, |
156 | + self.identity, self.exchange_store, |
157 | urgent_exchange_interval=20) |
158 | events = [] |
159 | self.reactor.call_on("impending-exchange", lambda: events.append(True)) |
160 | @@ -808,8 +800,9 @@ |
161 | [message] = messages |
162 | self.assertIsNot( |
163 | None, |
164 | - self.exchanger._store.get_message_context(message['operation-id'])) |
165 | - message_context = self.exchanger._store.get_message_context(message['operation-id']) |
166 | + self.exchange_store.get_message_context(message['operation-id'])) |
167 | + message_context = self.exchange_store.get_message_context( |
168 | + message['operation-id']) |
169 | self.assertEquals(message_context.operation_id, 123456) |
170 | self.assertEquals(message_context.message_type, "type-R") |
171 | |
172 | @@ -818,14 +811,14 @@ |
173 | Incoming messages without an 'operation-id' key will *not* have the |
174 | secure id stored in the L{ExchangeStore}. |
175 | """ |
176 | - ids_before = self.exchanger._store.all_operation_ids() |
177 | + ids_before = self.exchange_store.all_operation_ids() |
178 | |
179 | msg = {"type": "type-R", "whatever": 5678} |
180 | server_message = [msg] |
181 | self.transport.responses.append(server_message) |
182 | self.exchanger.exchange() |
183 | |
184 | - ids_after = self.exchanger._store.all_operation_ids() |
185 | + ids_after = self.exchange_store.all_operation_ids() |
186 | self.assertEquals(ids_before, ids_after) |
187 | |
188 | def test_obsolete_response_messages_are_discarded(self): |
189 | @@ -844,7 +837,7 @@ |
190 | |
191 | # Change the secure ID so that the response message gets discarded. |
192 | self.identity.secure_id = 'brand-new' |
193 | - ids_before = self.exchanger._store.all_operation_ids() |
194 | + ids_before = self.exchange_store.all_operation_ids() |
195 | |
196 | self.mstore.set_accepted_types(["resynchronize"]) |
197 | message_id = self.exchanger.send( |
198 | @@ -860,7 +853,7 @@ |
199 | self.assertTrue(expected_log_entry in self.logfile.getvalue()) |
200 | |
201 | # The MessageContext was removed after utilisation. |
202 | - ids_after = self.exchanger._store.all_operation_ids() |
203 | + ids_after = self.exchange_store.all_operation_ids() |
204 | self.assertTrue(len(ids_after) == len(ids_before) - 1) |
205 | self.assertFalse('234567' in ids_after) |
206 |
- 257. By Muharem Hrnjadovic
-
Free's review comments [9,10]
- 258. By Muharem Hrnjadovic
-
Free's review comment #11
- 259. By Muharem Hrnjadovic
-
Free's review comment #12
- 260. By Muharem Hrnjadovic
-
Free's review comments [12,13]
Free Ekanayaka (free.ekanayaka) wrote : | # |
The branch looks great! Thanks for addressing all the comments. +1
[14]
+ test_case.
+ os.path.
You can now use BrokerConfigura
+ test_case.
+ test_case.
- 261. By Muharem Hrnjadovic
-
Free's review comment #14
Thomas Herve (therve) wrote : | # |
[5] Some pyflakes:
landscape/
landscape/
[6] get_message_context should retrieve all the data from the database in one query, and pass it to the MessageContext __init__, thus removing the select in there.
+1!
- 262. By Muharem Hrnjadovic
-
Thomas' review comment #5
Muharem Hrnjadovic (al-maisan) wrote : | # |
On 04/30/2010 04:59 PM, Thomas Herve wrote:
> Review: Approve
> [5] Some pyflakes:
> landscape/
> landscape/
>
> [6] get_message_context should retrieve all the data from the database in one query, and pass it to the MessageContext __init__, thus removing the select in there.
Hello Thomas,
thanks for the suggestions above! I have revised the code accordingly.
Please see the attached incremental diff.
Best regards
--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC
1 | === modified file 'landscape/broker/exchangestore.py' |
2 | --- landscape/broker/exchangestore.py 2010-04-30 08:36:15 +0000 |
3 | +++ landscape/broker/exchangestore.py 2010-04-30 18:11:31 +0000 |
4 | @@ -28,27 +28,18 @@ |
5 | @param id: the database key value for this instance. |
6 | """ |
7 | |
8 | - def __init__(self, db, id): |
9 | + def __init__(self, db, operation_id, secure_id, message_type, timestamp): |
10 | self._db = db |
11 | - self.id = id |
12 | - |
13 | - cursor = db.cursor() |
14 | - try: |
15 | - cursor.execute( |
16 | - "SELECT operation_id, secure_id, message_type, timestamp " |
17 | - "FROM message_context WHERE id=?", (id,)) |
18 | - row = cursor.fetchone() |
19 | - finally: |
20 | - cursor.close() |
21 | - |
22 | - self.operation_id = row[0] |
23 | - self.secure_id = row[1] |
24 | - self.message_type = row[2] |
25 | - self.timestamp = row[3] |
26 | + self.operation_id = operation_id |
27 | + self.secure_id = secure_id |
28 | + self.message_type = message_type |
29 | + self.timestamp = timestamp |
30 | |
31 | @with_cursor |
32 | def remove(self, cursor): |
33 | - cursor.execute("DELETE FROM message_context WHERE id=?", (self.id,)) |
34 | + cursor.execute( |
35 | + "DELETE FROM message_context WHERE operation_id=?", |
36 | + (self.operation_id,)) |
37 | |
38 | |
39 | class ExchangeStore(object): |
40 | @@ -72,21 +63,21 @@ |
41 | def add_message_context( |
42 | self, cursor, operation_id, secure_id, message_type): |
43 | """Add a L{MessageContext} with the given data.""" |
44 | + params = (operation_id, secure_id, message_type, time.time()) |
45 | cursor.execute( |
46 | "INSERT INTO message_context " |
47 | " (operation_id, secure_id, message_type, timestamp) " |
48 | - " VALUES (?,?,?,?)", |
49 | - (operation_id, secure_id, message_type, time.time())) |
50 | - return MessageContext(self._db, cursor.lastrowid) |
51 | + " VALUES (?,?,?,?)", params) |
52 | + return MessageContext(self._db, *params) |
53 | |
54 | @with_cursor |
55 | def get_message_context(self, cursor, operation_id): |
56 | """The L{MessageContext} for the given C{operation_id} or C{None}.""" |
57 | cursor.execute( |
58 | - "SELECT id FROM message_context WHERE operation_id=?", |
59 | - (operation_id,)) |
60 | - result = cursor.fetchone() |
61 | - return MessageContext(self._db, result[0]) if result else None |
62 | + "SELECT operation_id, secure_id, message_type, timestamp " |
63 | + "FROM message_context WHERE operation_id=?", (operation_id,)) |
64 | + row = cursor.fetchone() |
65 | + return MessageContext(self._db, *row) if row else None |
66 | |
67 | @with_cursor |
68 | def all_operation_ids(self, cursor): |
69 | |
70 | === modified file 'landscape/broker/tests/test_exchange.py' |
71 | --- landscape/broker/tests/test_exchange.py 2010-04-30 10:48:32 +0000 |
72 | +++ landscape/broker/tests/test_exchange.py 2010-04-30 15:54:23 +0000 |
73 | @@ -1,11 +1,9 @@ |
74 | -import os |
75 | from landscape import SERVER_API, CLIENT_API |
76 | from landscape.lib.persist import Persist |
77 | from landscape.lib.hashlib import md5 |
78 | from landscape.lib.fetch import fetch_async |
79 | from landscape.schema import Message, Int |
80 | from landscape.broker.exchange import get_accepted_types_diff, MessageExchange |
81 | -from landscape.broker.exchangestore import ExchangeStore |
82 | from landscape.broker.transport import FakeTransport |
83 | from landscape.broker.store import MessageStore |
84 | from landscape.broker.ping import Pinger |
- 263. By Muharem Hrnjadovic
-
Thomas' review comment #6
- 264. By Muharem Hrnjadovic
-
Made code more succinct
Preview Diff
1 | === modified file 'landscape/broker/config.py' |
2 | --- landscape/broker/config.py 2010-01-06 15:11:24 +0000 |
3 | +++ landscape/broker/config.py 2010-04-30 19:04:26 +0000 |
4 | @@ -18,6 +18,10 @@ |
5 | self._original_http_proxy = os.environ.get("http_proxy") |
6 | self._original_https_proxy = os.environ.get("https_proxy") |
7 | |
8 | + @property |
9 | + def exchange_store_path(self): |
10 | + return os.path.join(self.data_path, "exchange.database") |
11 | + |
12 | def make_parser(self): |
13 | """Parser factory for broker-specific options. |
14 | |
15 | |
16 | === modified file 'landscape/broker/exchange.py' |
17 | --- landscape/broker/exchange.py 2010-01-22 11:51:19 +0000 |
18 | +++ landscape/broker/exchange.py 2010-04-30 19:04:26 +0000 |
19 | @@ -22,6 +22,7 @@ |
20 | plugin_name = "message-exchange" |
21 | |
22 | def __init__(self, reactor, store, transport, registration_info, |
23 | + exchange_store, |
24 | exchange_interval=60*60, |
25 | urgent_exchange_interval=10, |
26 | monitor_interval=None, |
27 | @@ -52,6 +53,7 @@ |
28 | self._client_accepted_types = set() |
29 | self._client_accepted_types_hash = None |
30 | self._message_handlers = {} |
31 | + self._exchange_store = exchange_store |
32 | |
33 | self.register_message("accepted-types", self._handle_accepted_types) |
34 | self.register_message("resynchronize", self._handle_resynchronize) |
35 | @@ -63,6 +65,30 @@ |
36 | """Return a binary tuple with urgent and normal exchange intervals.""" |
37 | return (self._urgent_exchange_interval, self._exchange_interval) |
38 | |
39 | + def _message_is_obsolete(self, message): |
40 | + """Returns C{True} if message is obsolete. |
41 | + |
42 | + A message is considered obsolete if the secure ID changed since it was |
43 | + received. |
44 | + """ |
45 | + if 'operation-id' not in message: |
46 | + return False |
47 | + |
48 | + operation_id = message['operation-id'] |
49 | + context = self._exchange_store.get_message_context(operation_id) |
50 | + if context is None: |
51 | + logging.warning( |
52 | + "No message context for message with operation-id: %s" |
53 | + % operation_id) |
54 | + return False |
55 | + |
56 | + # Compare the current secure ID with the one that was in effect when |
57 | + # the request message was received. |
58 | + result = self._registration_info.secure_id != context.secure_id |
59 | + context.remove() |
60 | + |
61 | + return result |
62 | + |
63 | def send(self, message, urgent=False): |
64 | """Include a message to be sent in an exchange. |
65 | |
66 | @@ -71,6 +97,13 @@ |
67 | |
68 | @param message: Same as in L{MessageStore.add}. |
69 | """ |
70 | + if self._message_is_obsolete(message): |
71 | + logging.info( |
72 | + "Response message with operation-id %s was discarded " |
73 | + "because the client's secure ID has changed in the meantime" |
74 | + % message.get('operation-id')) |
75 | + return None |
76 | + |
77 | if "timestamp" not in message: |
78 | message["timestamp"] = int(self._reactor.time()) |
79 | message_id = self._message_store.add(message) |
80 | @@ -300,7 +333,8 @@ |
81 | @param result: The response got in reply to the C{payload}. |
82 | """ |
83 | message_store = self._message_store |
84 | - self._client_accepted_types_hash = result.get("client-accepted-types-hash") |
85 | + self._client_accepted_types_hash = result.get( |
86 | + "client-accepted-types-hash") |
87 | next_expected = result.get("next-expected-sequence") |
88 | old_sequence = message_store.get_sequence() |
89 | if next_expected is None: |
90 | @@ -363,6 +397,13 @@ |
91 | Any message handlers registered with L{register_message} will |
92 | be called. |
93 | """ |
94 | + if 'operation-id' in message: |
95 | + # This is a message that requires a response. Store the secure ID |
96 | + # so we can check for obsolete results later. |
97 | + self._exchange_store.add_message_context( |
98 | + message['operation-id'], self._registration_info.secure_id, |
99 | + message['type']) |
100 | + |
101 | self._reactor.fire("message", message) |
102 | # This has plan interference! but whatever. |
103 | if message["type"] in self._message_handlers: |
104 | |
105 | === added file 'landscape/broker/exchangestore.py' |
106 | --- landscape/broker/exchangestore.py 1970-01-01 00:00:00 +0000 |
107 | +++ landscape/broker/exchangestore.py 2010-04-30 19:04:26 +0000 |
108 | @@ -0,0 +1,110 @@ |
109 | +"""Provide access to the persistent data used by the L{MessageExchange}.""" |
110 | +import time |
111 | + |
112 | +try: |
113 | + import sqlite3 |
114 | +except ImportError: |
115 | + from pysqlite2 import dbapi2 as sqlite3 |
116 | + |
117 | +from landscape.lib.store import with_cursor |
118 | + |
119 | + |
120 | +class MessageContext(object): |
121 | + """Stores a context for incoming messages that require a response. |
122 | + |
123 | + The context consists of |
124 | + |
125 | + - the "operation-id" value |
126 | + - the secure ID that was in effect when the message was received |
127 | + - the message type |
128 | + - the time when the message was received |
129 | + |
130 | + This data will be used to detect secure ID changes between the time at |
131 | + which the request message came in and the completion of the request. |
132 | + If the secure ID did change the result message is obolete and will not be |
133 | + sent to the server. |
134 | + |
135 | + @param db: the sqlite database handle. |
136 | + @param id: the database key value for this instance. |
137 | + """ |
138 | + |
139 | + def __init__(self, db, operation_id, secure_id, message_type, timestamp): |
140 | + self._db = db |
141 | + self.operation_id = operation_id |
142 | + self.secure_id = secure_id |
143 | + self.message_type = message_type |
144 | + self.timestamp = timestamp |
145 | + |
146 | + @with_cursor |
147 | + def remove(self, cursor): |
148 | + cursor.execute( |
149 | + "DELETE FROM message_context WHERE operation_id=?", |
150 | + (self.operation_id,)) |
151 | + |
152 | + |
153 | +class ExchangeStore(object): |
154 | + """Message meta data required by the L{MessageExchange}. |
155 | + |
156 | + The implementation uses a SQLite database as backend, with a single table |
157 | + called "message_context", whose schema is defined in |
158 | + L{ensure_exchange_schema}. |
159 | + |
160 | + @param filename: The name of the file that contains the sqlite database. |
161 | + """ |
162 | + _db = None |
163 | + |
164 | + def __init__(self, filename): |
165 | + self._filename = filename |
166 | + |
167 | + def _ensure_schema(self): |
168 | + ensure_exchange_schema(self._db) |
169 | + |
170 | + @with_cursor |
171 | + def add_message_context( |
172 | + self, cursor, operation_id, secure_id, message_type): |
173 | + """Add a L{MessageContext} with the given data.""" |
174 | + params = (operation_id, secure_id, message_type, time.time()) |
175 | + cursor.execute( |
176 | + "INSERT INTO message_context " |
177 | + " (operation_id, secure_id, message_type, timestamp) " |
178 | + " VALUES (?,?,?,?)", params) |
179 | + return MessageContext(self._db, *params) |
180 | + |
181 | + @with_cursor |
182 | + def get_message_context(self, cursor, operation_id): |
183 | + """The L{MessageContext} for the given C{operation_id} or C{None}.""" |
184 | + cursor.execute( |
185 | + "SELECT operation_id, secure_id, message_type, timestamp " |
186 | + "FROM message_context WHERE operation_id=?", (operation_id,)) |
187 | + row = cursor.fetchone() |
188 | + return MessageContext(self._db, *row) if row else None |
189 | + |
190 | + @with_cursor |
191 | + def all_operation_ids(self, cursor): |
192 | + """Return all operation IDs currently stored in C{message_context}.""" |
193 | + cursor.execute("SELECT operation_id FROM message_context") |
194 | + result = cursor.fetchall() |
195 | + return [row[0] for row in result] |
196 | + |
197 | + |
198 | +def ensure_exchange_schema(db): |
199 | + """Create all tables needed by a L{ExchangeStore}. |
200 | + |
201 | + @param db: A connection to a SQLite database. |
202 | + """ |
203 | + cursor = db.cursor() |
204 | + try: |
205 | + cursor.execute( |
206 | + "CREATE TABLE message_context" |
207 | + " (id INTEGER PRIMARY KEY, timestamp TIMESTAMP, " |
208 | + " secure_id TEXT NOT NULL, operation_id INTEGER NOT NULL, " |
209 | + " message_type text NOT NULL)") |
210 | + cursor.execute( |
211 | + "CREATE UNIQUE INDEX msgctx_operationid_idx ON " |
212 | + "message_context(operation_id)") |
213 | + except (sqlite3.OperationalError, sqlite3.DatabaseError): |
214 | + cursor.close() |
215 | + db.rollback() |
216 | + else: |
217 | + cursor.close() |
218 | + db.commit() |
219 | |
220 | === modified file 'landscape/broker/service.py' |
221 | --- landscape/broker/service.py 2010-04-23 11:37:25 +0000 |
222 | +++ landscape/broker/service.py 2010-04-30 19:04:26 +0000 |
223 | @@ -7,6 +7,7 @@ |
224 | from landscape.broker.config import BrokerConfiguration |
225 | from landscape.broker.transport import HTTPTransport |
226 | from landscape.broker.exchange import MessageExchange |
227 | +from landscape.broker.exchangestore import ExchangeStore |
228 | from landscape.broker.ping import Pinger |
229 | from landscape.broker.store import get_default_message_store |
230 | from landscape.broker.server import BrokerServer |
231 | @@ -52,9 +53,11 @@ |
232 | self.message_store = get_default_message_store( |
233 | self.persist, config.message_store_path) |
234 | self.identity = Identity(self.config, self.persist) |
235 | + exchange_store = ExchangeStore(self.config.exchange_store_path) |
236 | self.exchanger = MessageExchange( |
237 | self.reactor, self.message_store, self.transport, self.identity, |
238 | - config.exchange_interval, config.urgent_exchange_interval) |
239 | + exchange_store, config.exchange_interval, |
240 | + config.urgent_exchange_interval) |
241 | self.pinger = self.pinger_factory(self.reactor, config.ping_url, |
242 | self.identity, self.exchanger) |
243 | self.registration = RegistrationHandler( |
244 | |
245 | === modified file 'landscape/broker/tests/helpers.py' |
246 | --- landscape/broker/tests/helpers.py 2010-04-07 21:00:46 +0000 |
247 | +++ landscape/broker/tests/helpers.py 2010-04-30 19:04:26 +0000 |
248 | @@ -6,6 +6,7 @@ |
249 | from landscape.reactor import FakeReactor |
250 | from landscape.broker.transport import FakeTransport |
251 | from landscape.broker.exchange import MessageExchange |
252 | +from landscape.broker.exchangestore import ExchangeStore |
253 | from landscape.broker.store import get_default_message_store |
254 | from landscape.broker.registration import Identity, RegistrationHandler |
255 | from landscape.broker.ping import Pinger |
256 | @@ -74,9 +75,12 @@ |
257 | test_case.transport = FakeTransport(test_case.config.url, |
258 | test_case.config.ssl_public_key) |
259 | test_case.reactor = FakeReactor() |
260 | + test_case.exchange_store = ExchangeStore( |
261 | + test_case.config.exchange_store_path) |
262 | test_case.exchanger = MessageExchange( |
263 | test_case.reactor, test_case.mstore, test_case.transport, |
264 | - test_case.identity, test_case.config.exchange_interval, |
265 | + test_case.identity, test_case.exchange_store, |
266 | + test_case.config.exchange_interval, |
267 | test_case.config.urgent_exchange_interval) |
268 | |
269 | |
270 | |
271 | === modified file 'landscape/broker/tests/test_exchange.py' |
272 | --- landscape/broker/tests/test_exchange.py 2010-04-28 13:07:53 +0000 |
273 | +++ landscape/broker/tests/test_exchange.py 2010-04-30 19:04:26 +0000 |
274 | @@ -1,4 +1,3 @@ |
275 | - |
276 | from landscape import SERVER_API, CLIENT_API |
277 | from landscape.lib.persist import Persist |
278 | from landscape.lib.hashlib import md5 |
279 | @@ -22,6 +21,7 @@ |
280 | self.mstore.add_schema(Message("empty", {})) |
281 | self.mstore.add_schema(Message("data", {"data": Int()})) |
282 | self.mstore.add_schema(Message("holdme", {})) |
283 | + self.identity.secure_id = 'needs-to-be-set-for-tests-to-pass' |
284 | |
285 | def wait_for_exchange(self, urgent=False, factor=1, delta=0): |
286 | if urgent: |
287 | @@ -240,7 +240,6 @@ |
288 | self.mstore.add({"type": "data", "data": 2}) |
289 | self.mstore.add({"type": "data", "data": 3}) |
290 | # next one, server will respond with 1! |
291 | - |
292 | def desynched_send_data(payload, computer_id=None, message_api=None): |
293 | self.transport.next_expected_sequence = 1 |
294 | return {"next-expected-sequence": 1} |
295 | @@ -273,7 +272,7 @@ |
296 | """ |
297 | transport = FakeTransport() |
298 | exchanger = MessageExchange(self.reactor, self.mstore, transport, |
299 | - self.identity) |
300 | + self.identity, self.exchange_store) |
301 | exchanger.start() |
302 | self.wait_for_exchange(urgent=True) |
303 | self.assertEquals(len(transport.payloads), 1) |
304 | @@ -390,6 +389,7 @@ |
305 | del self.transport.exchange |
306 | |
307 | exchanged = [] |
308 | + |
309 | def exchange_callback(): |
310 | exchanged.append(True) |
311 | |
312 | @@ -473,7 +473,6 @@ |
313 | self.assertEquals(payload.get("server-api"), "2.0") |
314 | self.assertEquals(self.transport.message_api, "2.0") |
315 | |
316 | - |
317 | def test_include_total_messages_none(self): |
318 | """ |
319 | The payload includes the total number of messages that the client has |
320 | @@ -500,14 +499,14 @@ |
321 | pending. |
322 | """ |
323 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
324 | - self.identity, max_messages=1) |
325 | + self.identity, self.exchange_store, |
326 | + max_messages=1) |
327 | self.mstore.set_accepted_types(["empty"]) |
328 | self.mstore.add({"type": "empty"}) |
329 | self.mstore.add({"type": "empty"}) |
330 | exchanger.exchange() |
331 | self.assertEquals(self.transport.payloads[0]["total-messages"], 2) |
332 | |
333 | - |
334 | def test_impending_exchange(self): |
335 | """ |
336 | A reactor event is emitted shortly (10 seconds) before an exchange |
337 | @@ -530,7 +529,8 @@ |
338 | # fixture has an urgent exchange interval of 10 seconds, which makes |
339 | # testing this awkward. |
340 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
341 | - self.identity, urgent_exchange_interval=20) |
342 | + self.identity, self.exchange_store, |
343 | + urgent_exchange_interval=20) |
344 | exchanger.schedule_exchange(urgent=True) |
345 | events = [] |
346 | self.reactor.call_on("impending-exchange", lambda: events.append(True)) |
347 | @@ -547,7 +547,8 @@ |
348 | before the new urgent exchange. |
349 | """ |
350 | exchanger = MessageExchange(self.reactor, self.mstore, self.transport, |
351 | - self.identity, urgent_exchange_interval=20) |
352 | + self.identity, self.exchange_store, |
353 | + urgent_exchange_interval=20) |
354 | events = [] |
355 | self.reactor.call_on("impending-exchange", lambda: events.append(True)) |
356 | # This call will: |
357 | @@ -610,6 +611,7 @@ |
358 | self.transport.exchange = failed_send_data |
359 | |
360 | exchanged = [] |
361 | + |
362 | def exchange_failed_callback(): |
363 | exchanged.append(True) |
364 | |
365 | @@ -700,6 +702,7 @@ |
366 | |
367 | def handler1(message): |
368 | messages.append(("one", message)) |
369 | + |
370 | def handler2(message): |
371 | messages.append(("two", message)) |
372 | |
373 | @@ -722,6 +725,7 @@ |
374 | |
375 | def test_server_uuid_change_cause_event(self): |
376 | called = [] |
377 | + |
378 | def server_uuid_changed(old_uuid, new_uuid): |
379 | called.append((old_uuid, new_uuid)) |
380 | self.reactor.call_on("server-uuid-changed", server_uuid_changed) |
381 | @@ -756,6 +760,7 @@ |
382 | the event is not emitted. |
383 | """ |
384 | called = [] |
385 | + |
386 | def server_uuid_changed(old_uuid, new_uuid): |
387 | called.append((old_uuid, new_uuid)) |
388 | self.reactor.call_on("server-uuid-changed", server_uuid_changed) |
389 | @@ -779,6 +784,77 @@ |
390 | |
391 | self.assertNotIn("INFO: Server UUID changed", self.logfile.getvalue()) |
392 | |
393 | + def test_return_messages_have_their_context_stored(self): |
394 | + """ |
395 | + Incoming messages with an 'operation-id' key will have the secure id |
396 | + stored in the L{ExchangeStore}. |
397 | + """ |
398 | + messages = [] |
399 | + self.exchanger.register_message("type-R", messages.append) |
400 | + msg = {"type": "type-R", "whatever": 5678, "operation-id": 123456} |
401 | + server_message = [msg] |
402 | + self.transport.responses.append(server_message) |
403 | + self.exchanger.exchange() |
404 | + [message] = messages |
405 | + self.assertIsNot( |
406 | + None, |
407 | + self.exchange_store.get_message_context(message['operation-id'])) |
408 | + message_context = self.exchange_store.get_message_context( |
409 | + message['operation-id']) |
410 | + self.assertEquals(message_context.operation_id, 123456) |
411 | + self.assertEquals(message_context.message_type, "type-R") |
412 | + |
413 | + def test_one_way_messages_do_not_have_their_context_stored(self): |
414 | + """ |
415 | + Incoming messages without an 'operation-id' key will *not* have the |
416 | + secure id stored in the L{ExchangeStore}. |
417 | + """ |
418 | + ids_before = self.exchange_store.all_operation_ids() |
419 | + |
420 | + msg = {"type": "type-R", "whatever": 5678} |
421 | + server_message = [msg] |
422 | + self.transport.responses.append(server_message) |
423 | + self.exchanger.exchange() |
424 | + |
425 | + ids_after = self.exchange_store.all_operation_ids() |
426 | + self.assertEquals(ids_before, ids_after) |
427 | + |
428 | + def test_obsolete_response_messages_are_discarded(self): |
429 | + """ |
430 | + An obsolete response message will be discarded as opposed to being |
431 | + sent to the server. |
432 | + |
433 | + A response message is considered obsolete if the secure ID changed |
434 | + since the request message was received. |
435 | + """ |
436 | + # Receive the message below from the server. |
437 | + msg = {"type": "type-R", "whatever": 5678, "operation-id": 234567} |
438 | + server_message = [msg] |
439 | + self.transport.responses.append(server_message) |
440 | + self.exchanger.exchange() |
441 | + |
442 | + # Change the secure ID so that the response message gets discarded. |
443 | + self.identity.secure_id = 'brand-new' |
444 | + ids_before = self.exchange_store.all_operation_ids() |
445 | + |
446 | + self.mstore.set_accepted_types(["resynchronize"]) |
447 | + message_id = self.exchanger.send( |
448 | + {"type": "resynchronize", "operation-id": 234567}) |
449 | + self.exchanger.exchange() |
450 | + self.assertEquals(2, len(self.transport.payloads)) |
451 | + messages = self.transport.payloads[1]["messages"] |
452 | + self.assertEquals([], messages) |
453 | + self.assertIs(None, message_id) |
454 | + expected_log_entry = ( |
455 | + "Response message with operation-id 234567 was discarded because " |
456 | + "the client's secure ID has changed in the meantime") |
457 | + self.assertTrue(expected_log_entry in self.logfile.getvalue()) |
458 | + |
459 | + # The MessageContext was removed after utilisation. |
460 | + ids_after = self.exchange_store.all_operation_ids() |
461 | + self.assertTrue(len(ids_after) == len(ids_before) - 1) |
462 | + self.assertFalse('234567' in ids_after) |
463 | + |
464 | |
465 | class AcceptedTypesMessageExchangeTest(LandscapeTest): |
466 | |
467 | @@ -868,7 +944,6 @@ |
468 | self.transport.payloads[2]["client-accepted-types"], |
469 | sorted(["type-A"] + DEFAULT_ACCEPTED_TYPES)) |
470 | |
471 | - |
472 | def test_register_message_adds_accepted_type(self): |
473 | """ |
474 | Using the C{register_message} method of the exchanger causes |
475 | |
476 | === added file 'landscape/broker/tests/test_exchangestore.py' |
477 | --- landscape/broker/tests/test_exchangestore.py 1970-01-01 00:00:00 +0000 |
478 | +++ landscape/broker/tests/test_exchangestore.py 2010-04-30 19:04:26 +0000 |
479 | @@ -0,0 +1,84 @@ |
480 | +import time |
481 | + |
482 | +try: |
483 | + import sqlite3 |
484 | +except ImportError: |
485 | + from pysqlite2 import dbapi2 as sqlite3 |
486 | + |
487 | +from landscape.tests.helpers import LandscapeTest |
488 | + |
489 | +from landscape.broker.exchangestore import ExchangeStore |
490 | + |
491 | + |
492 | +class ExchangeStoreTest(LandscapeTest): |
493 | + """Unit tests for the C{ExchangeStore}.""" |
494 | + |
495 | + def setUp(self): |
496 | + super(ExchangeStoreTest, self).setUp() |
497 | + |
498 | + self.filename = self.makeFile() |
499 | + self.store1 = ExchangeStore(self.filename) |
500 | + self.store2 = ExchangeStore(self.filename) |
501 | + |
502 | + def test_add_message_context(self): |
503 | + """Adding a message context works correctly.""" |
504 | + now = time.time() |
505 | + self.store1.add_message_context(123, 'abc', 'change-packages') |
506 | + |
507 | + db = sqlite3.connect(self.store2._filename) |
508 | + cursor = db.cursor() |
509 | + cursor.execute( |
510 | + "SELECT operation_id, secure_id, message_type, timestamp " |
511 | + "FROM message_context WHERE operation_id=?", (123,)) |
512 | + results = cursor.fetchall() |
513 | + self.assertEquals(1, len(results)) |
514 | + [row] = results |
515 | + self.assertEquals(123, row[0]) |
516 | + self.assertEquals('abc', row[1]) |
517 | + self.assertEquals('change-packages', row[2]) |
518 | + self.assertTrue(row[3] > now) |
519 | + |
520 | + def test_add_message_context_with_duplicate_operation_id(self): |
521 | + """Only one message context with a given operation-id is permitted.""" |
522 | + self.store1.add_message_context(123, 'abc', 'change-packages') |
523 | + self.assertRaises( |
524 | + sqlite3.IntegrityError, |
525 | + self.store1.add_message_context, 123, 'def', 'change-packages') |
526 | + |
527 | + def test_get_message_context(self): |
528 | + """ |
529 | + Accessing a C{MessageContext} with an existing C{operation-id} works. |
530 | + """ |
531 | + now = time.time() |
532 | + self.store1.add_message_context(234, 'bcd', 'change-packages') |
533 | + context = self.store2.get_message_context(234) |
534 | + self.assertEquals(234, context.operation_id) |
535 | + self.assertEquals('bcd', context.secure_id) |
536 | + self.assertEquals('change-packages', context.message_type) |
537 | + self.assertTrue(context.timestamp > now) |
538 | + |
539 | + def test_get_message_context_with_nonexistent_operation_id(self): |
540 | + """Attempts to access a C{MessageContext} with a non-existent |
541 | + C{operation-id} result in C{None}.""" |
542 | + self.assertIs(None, self.store1.get_message_context(999)) |
543 | + |
544 | + def test_message_context_remove(self): |
545 | + """C{MessageContext}s are deleted correctly.""" |
546 | + context = self.store1.add_message_context( |
547 | + 345, 'opq', 'change-packages') |
548 | + context.remove() |
549 | + self.assertIs(None, self.store1.get_message_context(345)) |
550 | + |
551 | + def test_all_operation_ids_for_empty_database(self): |
552 | + """ |
553 | + Calling C{all_operation_ids} on an empty database returns an empty |
554 | + list. |
555 | + """ |
556 | + self.assertEquals([], self.store1.all_operation_ids()) |
557 | + |
558 | + def test_all_operation_ids(self): |
559 | + """C{all_operation_ids} works correctly.""" |
560 | + self.store1.add_message_context(456, 'cde', 'change-packages') |
561 | + self.assertEquals([456], self.store2.all_operation_ids()) |
562 | + self.store2.add_message_context(567, 'def', 'change-packages') |
563 | + self.assertEquals([456, 567], self.store1.all_operation_ids()) |
564 | |
565 | === added file 'landscape/lib/store.py' |
566 | --- landscape/lib/store.py 1970-01-01 00:00:00 +0000 |
567 | +++ landscape/lib/store.py 2010-04-30 19:04:26 +0000 |
568 | @@ -0,0 +1,39 @@ |
569 | +"""Functions used by all sqlite-backed stores.""" |
570 | + |
571 | +try: |
572 | + import sqlite3 |
573 | +except ImportError: |
574 | + from pysqlite2 import dbapi2 as sqlite3 |
575 | + |
576 | + |
577 | +def with_cursor(method): |
578 | + """Decorator that encloses the method in a database transaction. |
579 | + |
580 | + Even though SQLite is supposed to be useful in autocommit mode, we've |
581 | + found cases where the database continued to be locked for writing |
582 | + until the cursor was closed. With this in mind, instead of using |
583 | + the autocommit mode, we explicitly terminate transactions and enforce |
584 | + cursor closing with this decorator. |
585 | + """ |
586 | + |
587 | + def inner(self, *args, **kwargs): |
588 | + if not self._db: |
589 | + # Create the database connection only when we start to actually |
590 | + # use it. This is essentially just a workaroud of a sqlite bug |
591 | + # happening when 2 concurrent processes try to create the tables |
592 | + # around the same time, the one which fails having an incorrect |
593 | + # cache and not seeing the tables |
594 | + self._db = sqlite3.connect(self._filename) |
595 | + self._ensure_schema() |
596 | + try: |
597 | + cursor = self._db.cursor() |
598 | + try: |
599 | + result = method(self, cursor, *args, **kwargs) |
600 | + finally: |
601 | + cursor.close() |
602 | + self._db.commit() |
603 | + except: |
604 | + self._db.rollback() |
605 | + raise |
606 | + return result |
607 | + return inner |
608 | |
609 | === modified file 'landscape/package/store.py' |
610 | --- landscape/package/store.py 2009-11-25 09:48:00 +0000 |
611 | +++ landscape/package/store.py 2010-04-30 19:04:26 +0000 |
612 | @@ -7,6 +7,7 @@ |
613 | from pysqlite2 import dbapi2 as sqlite3 |
614 | |
615 | from landscape.lib import bpickle |
616 | +from landscape.lib.store import with_cursor |
617 | |
618 | |
619 | class UnknownHashIDRequest(Exception): |
620 | @@ -17,39 +18,6 @@ |
621 | """Raised when trying to add an invalid hash=>id lookaside database.""" |
622 | |
623 | |
624 | -def with_cursor(method): |
625 | - """Decorator that encloses the method in a database transaction. |
626 | - |
627 | - Even though SQLite is supposed to be useful in autocommit mode, we've |
628 | - found cases where the database continued to be locked for writing |
629 | - until the cursor was closed. With this in mind, instead of using |
630 | - the autocommit mode, we explicitly terminate transactions and enforce |
631 | - cursor closing with this decorator. |
632 | - """ |
633 | - |
634 | - def inner(self, *args, **kwargs): |
635 | - if not self._db: |
636 | - # Create the database connection only when we start to actually |
637 | - # use it. This is essentially just a workaroud of a sqlite bug |
638 | - # happening when 2 concurrent processes try to create the tables |
639 | - # around the same time, the one which fails having an incorrect |
640 | - # cache and not seeing the tables |
641 | - self._db = sqlite3.connect(self._filename) |
642 | - self._ensure_schema() |
643 | - try: |
644 | - cursor = self._db.cursor() |
645 | - try: |
646 | - result = method(self, cursor, *args, **kwargs) |
647 | - finally: |
648 | - cursor.close() |
649 | - self._db.commit() |
650 | - except: |
651 | - self._db.rollback() |
652 | - raise |
653 | - return result |
654 | - return inner |
655 | - |
656 | - |
657 | class HashIdStore(object): |
658 | """Persist package hash=>id mappings in a file. |
659 |
Very nice work, just a few comments but overall the branch looks great to me.
[1]
+ def __init__(self, db, id):
Please add @param entries in the class docstring for these two parameters.
[2]
+ @param filename: The file that contains the sqlite database.
Jamu recently pointed that @param docstrings for __init__ method should be actually put in the class docstring, like:
class ExchangeStore( object) :
"""Message meta data required by the L{MessageExchange}.
The implementation uses a SQLite database as backend, with a single table exchange_ schema} .
called "message_context", whose schema is defined in
L{ensure_
@param filename: The file that contains the sqlite database.
"""
because the callable that gets actually used is the class itself (you usually don't call __init__ directly).
[3]
For consistency with other tests and for easier reading please name test cases against the tests the exercise, plus some test specific suffix. A few examples:
test_add_ operationid_ is_unique -> test_add_ message_ context_ with_duplicate_ operation_ id message_ context_ works -> test_get_ message_ context message_ contexts_ works -> test_deleting_ message_ contexts
test_get_
test_deleting_
[4]
+ self._store = ExchangeStore(
It would be better to use dependency injection (that is usually the pattern adopted in the client) and pass an instance of ExchangeStore instead of creating it in the __init__ method. This helps testing and mocking, and make it easier in general to pass objects of other types providing the same API.
For example your test case will then have a self.exchange_store attribute (that you use in to build the MessageExchange object under test) which you can use for performing assertions instead of accessing self.exchanger. _store, which is a private attribute and should be considered an implementation detail.
[5]
+ return 0
Maybe None would be safer, as 0 could be used as message id number.
[6]
+ self.assertIsNot( _store. get_message_ context( message[ 'operation- id']))
+ None,
+ self.exchanger.
I think the test should also check that the created MessageContext has the expected attributes set:
+ message_context = self.exchanger. _store. get_message_ context( message[ 'operation- id']) ls(message_ context. operation_ id, 123456) ls(message_ context. message_ type, "type-R")
+ self.assertEqua
+ self.assertEqua
[7]
+ self.exchanger. send({" type": "resynchronize", "operation-id": 234567})
Assuming that send() returns None in this case, the test should assert it:
+ message_id = self.exchanger. send({" type": "resynchronize", "operation-id": 234567}) message_ id, None)
+ self.assertIs(
[8]
+ logging.info( get('operation- id'))
+ "Response message with operation-id %s was discarded "
+ "because the client's secure ID has changed in the meantime"
+ % message.
The logging is not tested, you can use self.logfile. getvalue( ) for that.