Merge lp:~al-maisan/landscape-client/obsolete-results-v into lp:~landscape/landscape-client/trunk

Proposed by Muharem Hrnjadovic
Status: Merged
Approved by: Thomas Herve
Approved revision: 261
Merged at revision: not available
Proposed branch: lp:~al-maisan/landscape-client/obsolete-results-v
Merge into: lp:~landscape/landscape-client/trunk
Diff against target: 658 lines (+373/-45)
9 files modified
landscape/broker/config.py (+4/-0)
landscape/broker/exchange.py (+42/-1)
landscape/broker/exchangestore.py (+110/-0)
landscape/broker/service.py (+4/-1)
landscape/broker/tests/helpers.py (+5/-1)
landscape/broker/tests/test_exchange.py (+84/-9)
landscape/broker/tests/test_exchangestore.py (+84/-0)
landscape/lib/store.py (+39/-0)
landscape/package/store.py (+1/-33)
To merge this branch: bzr merge lp:~al-maisan/landscape-client/obsolete-results-v
Reviewer Review Type Date Requested Status
Thomas Herve (community) Approve
Free Ekanayaka (community) Approve
Review via email: mp+24405@code.launchpad.net

Description of the change

Obsolete response messages will now be discarded as opposed to being sent to the server.

A response message is considered obsolete if the secure ID has changed since the request message was received.

To post a comment you must log in.
244. By Muharem Hrnjadovic

bzr ls-lint fixes

245. By Muharem Hrnjadovic

Simplified test.

246. By Muharem Hrnjadovic

Fixed doc string.

247. By Muharem Hrnjadovic

Simplified obsolete response logic.

Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Very nice work, just a few comments but overall the branch looks great to me.

[1]

+ def __init__(self, db, id):

Please add @param entries in the class docstring for these two parameters.

[2]

+ @param filename: The file that contains the sqlite database.

Jamu recently pointed that @param docstrings for __init__ method should be actually put in the class docstring, like:

class ExchangeStore(object):
    """Message meta data required by the L{MessageExchange}.

    The implementation uses a SQLite database as backend, with a single table
    called "message_context", whose schema is defined in
    L{ensure_exchange_schema}.

    @param filename: The file that contains the sqlite database.
    """

because the callable that gets actually used is the class itself (you usually don't call __init__ directly).

[3]

For consistency with other tests and for easier reading please name test cases against the tests the exercise, plus some test specific suffix. A few examples:

test_add_operationid_is_unique -> test_add_message_context_with_duplicate_operation_id
test_get_message_context_works -> test_get_message_context
test_deleting_message_contexts_works -> test_deleting_message_contexts

[4]

+ self._store = ExchangeStore(

It would be better to use dependency injection (that is usually the pattern adopted in the client) and pass an instance of ExchangeStore instead of creating it in the __init__ method. This helps testing and mocking, and make it easier in general to pass objects of other types providing the same API.

For example your test case will then have a self.exchange_store attribute (that you use in to build the MessageExchange object under test) which you can use for performing assertions instead of accessing self.exchanger._store, which is a private attribute and should be considered an implementation detail.

[5]

+ return 0

Maybe None would be safer, as 0 could be used as message id number.

[6]

+ self.assertIsNot(
+ None,
+ self.exchanger._store.get_message_context(message['operation-id']))

I think the test should also check that the created MessageContext has the expected attributes set:

+ message_context = self.exchanger._store.get_message_context(message['operation-id'])
+ self.assertEquals(message_context.operation_id, 123456)
+ self.assertEquals(message_context.message_type, "type-R")

[7]

+ self.exchanger.send({"type": "resynchronize", "operation-id": 234567})

Assuming that send() returns None in this case, the test should assert it:

+ message_id = self.exchanger.send({"type": "resynchronize", "operation-id": 234567})
+ self.assertIs(message_id, None)

[8]

+ logging.info(
+ "Response message with operation-id %s was discarded "
+ "because the client's secure ID has changed in the meantime"
+ % message.get('operation-id'))

The logging is not tested, you can use self.logfile.getvalue() for that.

review: Needs Fixing
248. By Muharem Hrnjadovic

Free's review comments [1-5]

249. By Muharem Hrnjadovic

Free's review comment #6

250. By Muharem Hrnjadovic

Free's review comment #7

251. By Muharem Hrnjadovic

Free's review comment #7

252. By Muharem Hrnjadovic

Free's review comment #8

Revision history for this message
Thomas Herve (therve) wrote :

It looks good!

[1] As it, the message context store will grow for ever. I think you should remove messages from it as soon as you add it to the message store (or discard it).

[2]
+ def _message_is_obsolete(self, message):
+ """True if message is obsolete.

Format "Returns C{True} if message is obsolete."

[3]
+class MessageContext(object):
+ """Stores the secure ID for incoming messages that require a response.

It seems to store more than that now.

[4]
+ """Accessing a C{MessageContext} with an existing
+ C{operation-id} works."""

+ """Calling C{all_operation_ids} on an empty database returns an empty
+ list."""

Please put the docstring on their own lines.

Thanks!

review: Needs Fixing
253. By Muharem Hrnjadovic

Thomas' review comment #1

254. By Muharem Hrnjadovic

Thomas' review comment #2

255. By Muharem Hrnjadovic

Thomas' review comment #3

256. By Muharem Hrnjadovic

Thomas' review comment #4

Revision history for this message
Muharem Hrnjadovic (al-maisan) wrote :
Download full text (3.5 KiB)

On 04/29/2010 05:31 PM, Free Ekanayaka wrote:
> Review: Needs Fixing
Hello Free,

I addressed all of your comments below, please review the changes. An
incremental diff is attached to this message for your convenience.

> Very nice work, just a few comments but overall the branch looks great to me.
>
> [1]
>
> + def __init__(self, db, id):
>
> Please add @param entries in the class docstring for these two parameters.
>
> [2]
>
> + @param filename: The file that contains the sqlite database.
>
> Jamu recently pointed that @param docstrings for __init__ method should be actually put in the class docstring, like:
>
> class ExchangeStore(object):
> """Message meta data required by the L{MessageExchange}.
>
> The implementation uses a SQLite database as backend, with a single table
> called "message_context", whose schema is defined in
> L{ensure_exchange_schema}.
>
> @param filename: The file that contains the sqlite database.
> """
>
> because the callable that gets actually used is the class itself (you usually don't call __init__ directly).
>
> [3]
>
> For consistency with other tests and for easier reading please name test cases against the tests the exercise, plus some test specific suffix. A few examples:
>
> test_add_operationid_is_unique -> test_add_message_context_with_duplicate_operation_id
> test_get_message_context_works -> test_get_message_context
> test_deleting_message_contexts_works -> test_deleting_message_contexts
>
> [4]
>
> + self._store = ExchangeStore(
>
> It would be better to use dependency injection (that is usually the pattern adopted in the client) and pass an instance of ExchangeStore instead of creating it in the __init__ method. This helps testing and mocking, and make it easier in general to pass objects of other types providing the same API.
>
> For example your test case will then have a self.exchange_store attribute (that you use in to build the MessageExchange object under test) which you can use for performing assertions instead of accessing self.exchanger._store, which is a private attribute and should be considered an implementation detail.
>
> [5]
>
> + return 0
>
> Maybe None would be safer, as 0 could be used as message id number.
>
> [6]
>
> + self.assertIsNot(
> + None,
> + self.exchanger._store.get_message_context(message['operation-id']))
>
> I think the test should also check that the created MessageContext has the expected attributes set:
>
> + message_context = self.exchanger._store.get_message_context(message['operation-id'])
> + self.assertEquals(message_context.operation_id, 123456)
> + self.assertEquals(message_context.message_type, "type-R")
>
> [7]
>
> + self.exchanger.send({"type": "resynchronize", "operation-id": 234567})
>
> Assuming that send() returns None in this case, the test should assert it:
>
> + message_id = self.exchanger.send({"type": "resynchronize", "operation-id": 234567})
> + self.assertIs(message_id, None)
>
> [8]
>
> + logging.info(
> + "Response message with operation-id %s was discarded "
> + ...

Read more...

1=== modified file 'landscape/broker/exchange.py'
2--- landscape/broker/exchange.py 2010-04-29 10:59:44 +0000
3+++ landscape/broker/exchange.py 2010-04-29 15:57:29 +0000
4@@ -1,12 +1,10 @@
5 """The part of the broker which deals with communications with the server."""
6-import os
7 import time
8 import logging
9 from landscape.lib.hashlib import md5
10
11 from twisted.internet.defer import succeed
12
13-from landscape.broker.exchangestore import ExchangeStore
14 from landscape.lib.message import got_next_expected, ANCIENT
15 from landscape.log import format_delta
16 from landscape import SERVER_API, CLIENT_API
17@@ -23,7 +21,8 @@
18
19 plugin_name = "message-exchange"
20
21- def __init__(self, reactor, store, transport, registration_info, data_path,
22+ def __init__(self, reactor, store, transport, registration_info,
23+ message_context_store,
24 exchange_interval=60*60,
25 urgent_exchange_interval=10,
26 monitor_interval=None,
27@@ -54,8 +53,7 @@
28 self._client_accepted_types = set()
29 self._client_accepted_types_hash = None
30 self._message_handlers = {}
31- self._store = ExchangeStore(
32- os.path.join(data_path, "exchange.database"))
33+ self._store = message_context_store
34
35 self.register_message("accepted-types", self._handle_accepted_types)
36 self.register_message("resynchronize", self._handle_resynchronize)
37@@ -101,7 +99,7 @@
38 "Response message with operation-id %s was discarded "
39 "because the client's secure ID has changed in the meantime"
40 % message.get('operation-id'))
41- return 0
42+ return None
43 else:
44 if "timestamp" not in message:
45 message["timestamp"] = int(self._reactor.time())
46
47=== modified file 'landscape/broker/exchangestore.py'
48--- landscape/broker/exchangestore.py 2010-04-29 10:57:52 +0000
49+++ landscape/broker/exchangestore.py 2010-04-29 15:57:29 +0000
50@@ -16,6 +16,9 @@
51 which the request message came in and the completion of the request.
52 If the secure ID did change the result message is obolete and will not be
53 sent to the server.
54+
55+ @param db: the sqlite database handle.
56+ @param id: the database key value for this instance.
57 """
58
59 def __init__(self, db, id):
60@@ -47,13 +50,12 @@
61 The implementation uses a SQLite database as backend, with a single table
62 called "message_context", whose schema is defined in
63 L{ensure_exchange_schema}.
64+
65+ @param filename: The name of the file that contains the sqlite database.
66 """
67 _db = None
68
69 def __init__(self, filename):
70- """
71- @param filename: The file that contains the sqlite database.
72- """
73 self._filename = filename
74
75 def _ensure_schema(self):
76
77=== modified file 'landscape/broker/service.py'
78--- landscape/broker/service.py 2010-04-29 09:02:57 +0000
79+++ landscape/broker/service.py 2010-04-29 15:57:29 +0000
80@@ -7,6 +7,7 @@
81 from landscape.broker.config import BrokerConfiguration
82 from landscape.broker.transport import HTTPTransport
83 from landscape.broker.exchange import MessageExchange
84+from landscape.broker.exchangestore import ExchangeStore
85 from landscape.broker.ping import Pinger
86 from landscape.broker.store import get_default_message_store
87 from landscape.broker.server import BrokerServer
88@@ -52,9 +53,11 @@
89 self.message_store = get_default_message_store(
90 self.persist, config.message_store_path)
91 self.identity = Identity(self.config, self.persist)
92+ exchange_store = ExchangeStore(
93+ os.path.join(config.data_path, "exchange.database"))
94 self.exchanger = MessageExchange(
95 self.reactor, self.message_store, self.transport, self.identity,
96- config.data_path, config.exchange_interval,
97+ exchange_store, config.exchange_interval,
98 config.urgent_exchange_interval)
99 self.pinger = self.pinger_factory(self.reactor, config.ping_url,
100 self.identity, self.exchanger)
101
102=== modified file 'landscape/broker/tests/helpers.py'
103--- landscape/broker/tests/helpers.py 2010-04-29 09:02:57 +0000
104+++ landscape/broker/tests/helpers.py 2010-04-29 15:57:29 +0000
105@@ -6,6 +6,7 @@
106 from landscape.reactor import FakeReactor
107 from landscape.broker.transport import FakeTransport
108 from landscape.broker.exchange import MessageExchange
109+from landscape.broker.exchangestore import ExchangeStore
110 from landscape.broker.store import get_default_message_store
111 from landscape.broker.registration import Identity, RegistrationHandler
112 from landscape.broker.ping import Pinger
113@@ -74,9 +75,11 @@
114 test_case.transport = FakeTransport(test_case.config.url,
115 test_case.config.ssl_public_key)
116 test_case.reactor = FakeReactor()
117+ exchange_store = ExchangeStore(
118+ os.path.join(test_case.config.data_path, "exchange.database"))
119 test_case.exchanger = MessageExchange(
120 test_case.reactor, test_case.mstore, test_case.transport,
121- test_case.identity, test_case.config.data_path,
122+ test_case.identity, exchange_store,
123 test_case.config.exchange_interval,
124 test_case.config.urgent_exchange_interval)
125
126
127=== modified file 'landscape/broker/tests/test_exchange.py'
128--- landscape/broker/tests/test_exchange.py 2010-04-29 10:56:27 +0000
129+++ landscape/broker/tests/test_exchange.py 2010-04-29 16:05:18 +0000
130@@ -1,9 +1,11 @@
131+import os
132 from landscape import SERVER_API, CLIENT_API
133 from landscape.lib.persist import Persist
134 from landscape.lib.hashlib import md5
135 from landscape.lib.fetch import fetch_async
136 from landscape.schema import Message, Int
137 from landscape.broker.exchange import get_accepted_types_diff, MessageExchange
138+from landscape.broker.exchangestore import ExchangeStore
139 from landscape.broker.transport import FakeTransport
140 from landscape.broker.store import MessageStore
141 from landscape.broker.ping import Pinger
142@@ -271,8 +273,10 @@
143 Immediately after registration, an urgent exchange should be scheduled.
144 """
145 transport = FakeTransport()
146+ exchange_store = ExchangeStore(
147+ os.path.join(self.config.data_path, "exchange.database"))
148 exchanger = MessageExchange(self.reactor, self.mstore, transport,
149- self.identity, self.config.data_path)
150+ self.identity, exchange_store)
151 exchanger.start()
152 self.wait_for_exchange(urgent=True)
153 self.assertEquals(len(transport.payloads), 1)
154@@ -498,8 +502,10 @@
155 the total-messages is equivalent to the total number of messages
156 pending.
157 """
158+ exchange_store = ExchangeStore(
159+ os.path.join(self.config.data_path, "exchange.database"))
160 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
161- self.identity, self.config.data_path,
162+ self.identity, exchange_store,
163 max_messages=1)
164 self.mstore.set_accepted_types(["empty"])
165 self.mstore.add({"type": "empty"})
166@@ -528,8 +534,10 @@
167 # We create our own MessageExchange because the one set up by the text
168 # fixture has an urgent exchange interval of 10 seconds, which makes
169 # testing this awkward.
170+ exchange_store = ExchangeStore(
171+ os.path.join(self.config.data_path, "exchange.database"))
172 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
173- self.identity, self.config.data_path,
174+ self.identity, exchange_store,
175 urgent_exchange_interval=20)
176 exchanger.schedule_exchange(urgent=True)
177 events = []
178@@ -546,8 +554,10 @@
179 should be cancelled and a new one should be scheduled for 10 seconds
180 before the new urgent exchange.
181 """
182+ exchange_store = ExchangeStore(
183+ os.path.join(self.config.data_path, "exchange.database"))
184 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
185- self.identity, self.config.data_path,
186+ self.identity, exchange_store,
187 urgent_exchange_interval=20)
188 events = []
189 self.reactor.call_on("impending-exchange", lambda: events.append(True))
190@@ -799,6 +809,9 @@
191 self.assertIsNot(
192 None,
193 self.exchanger._store.get_message_context(message['operation-id']))
194+ message_context = self.exchanger._store.get_message_context(message['operation-id'])
195+ self.assertEquals(message_context.operation_id, 123456)
196+ self.assertEquals(message_context.message_type, "type-R")
197
198 def test_one_way_messages_do_not_have_their_context_stored(self):
199 """
200@@ -833,11 +846,17 @@
201 self.identity.secure_id = 'brand-new'
202
203 self.mstore.set_accepted_types(["resynchronize"])
204- self.exchanger.send({"type": "resynchronize", "operation-id": 234567})
205+ message_id = self.exchanger.send(
206+ {"type": "resynchronize", "operation-id": 234567})
207 self.exchanger.exchange()
208 self.assertEquals(2, len(self.transport.payloads))
209 messages = self.transport.payloads[1]["messages"]
210 self.assertEquals([], messages)
211+ self.assertIs(None, message_id)
212+ expected_log_entry = (
213+ "Response message with operation-id 234567 was discarded because "
214+ "the client's secure ID has changed in the meantime")
215+ self.assertTrue(expected_log_entry in self.logfile.getvalue())
216
217
218 class AcceptedTypesMessageExchangeTest(LandscapeTest):
219
220=== modified file 'landscape/broker/tests/test_exchangestore.py'
221--- landscape/broker/tests/test_exchangestore.py 2010-04-29 10:30:28 +0000
222+++ landscape/broker/tests/test_exchangestore.py 2010-04-29 15:57:29 +0000
223@@ -38,14 +38,14 @@
224 self.assertEquals('change-packages', row[2])
225 self.assertTrue(row[3] > now)
226
227- def test_add_operationid_is_unique(self):
228+ def test_add_message_context_with_duplicate_operation_id(self):
229 """Only one message context with a given operation-id is permitted."""
230 self.store1.add_message_context(123, 'abc', 'change-packages')
231 self.assertRaises(
232 sqlite3.IntegrityError,
233 self.store1.add_message_context, 123, 'def', 'change-packages')
234
235- def test_get_message_context_works(self):
236+ def test_get_message_context(self):
237 """Accessing a C{MessageContext} with an existing
238 C{operation-id} works."""
239 now = time.time()
240@@ -61,7 +61,7 @@
241 C{operation-id} result in C{None}."""
242 self.assertIs(None, self.store1.get_message_context(999))
243
244- def test_deleting_message_contexts_works(self):
245+ def test_message_context_remove(self):
246 """C{MessageContext}s are deleted correctly."""
247 context = self.store1.add_message_context(
248 345, 'opq', 'change-packages')
Revision history for this message
Muharem Hrnjadovic (al-maisan) wrote :

On 04/30/2010 09:54 AM, Thomas Herve wrote:
> Review: Needs Fixing
> It looks good!
Hello Thomas,

thank you very much for the review and especially for the first catch below.
I have addressed all of your comments below, please review the changes. An
incremental diff is attached to this message for your convenience.

> [1] As it, the message context store will grow for ever. I think you should remove messages from it as soon as you add it to the message store (or discard it).
>
> [2]
> + def _message_is_obsolete(self, message):
> + """True if message is obsolete.
>
> Format "Returns C{True} if message is obsolete."
>
> [3]
> +class MessageContext(object):
> + """Stores the secure ID for incoming messages that require a response.
>
> It seems to store more than that now.
>
> [4]
> + """Accessing a C{MessageContext} with an existing
> + C{operation-id} works."""
>
>
> + """Calling C{all_operation_ids} on an empty database returns an empty
> + list."""
>
> Please put the docstring on their own lines.
>
>
> Thanks!

Best regards

--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC

1=== modified file 'landscape/broker/exchange.py'
2--- landscape/broker/exchange.py 2010-04-29 15:57:29 +0000
3+++ landscape/broker/exchange.py 2010-04-30 08:32:27 +0000
4@@ -66,7 +66,7 @@
5 return (self._urgent_exchange_interval, self._exchange_interval)
6
7 def _message_is_obsolete(self, message):
8- """True if message is obsolete.
9+ """Returns C{True} if message is obsolete.
10
11 A message is considered obsolete if the secure ID changed since it was
12 received.
13@@ -84,7 +84,10 @@
14
15 # Compare the current secure ID with the one that was in effect when
16 # the request message was received.
17- return self._registration_info.secure_id != context.secure_id
18+ result = self._registration_info.secure_id != context.secure_id
19+ context.remove()
20+
21+ return result
22
23 def send(self, message, urgent=False):
24 """Include a message to be sent in an exchange.
25
26=== modified file 'landscape/broker/exchangestore.py'
27--- landscape/broker/exchangestore.py 2010-04-29 15:57:29 +0000
28+++ landscape/broker/exchangestore.py 2010-04-30 08:35:46 +0000
29@@ -10,7 +10,14 @@
30
31
32 class MessageContext(object):
33- """Stores the secure ID for incoming messages that require a response.
34+ """Stores a context for incoming messages that require a response.
35+
36+ The context consists of
37+
38+ - the "operation-id" value
39+ - the secure ID that was in effect when the message was received
40+ - the message type
41+ - the time when the message was received
42
43 This data will be used to detect secure ID changes between the time at
44 which the request message came in and the completion of the request.
45
46=== modified file 'landscape/broker/tests/test_exchange.py'
47--- landscape/broker/tests/test_exchange.py 2010-04-29 16:05:18 +0000
48+++ landscape/broker/tests/test_exchange.py 2010-04-30 08:29:00 +0000
49@@ -844,6 +844,7 @@
50
51 # Change the secure ID so that the response message gets discarded.
52 self.identity.secure_id = 'brand-new'
53+ ids_before = self.exchanger._store.all_operation_ids()
54
55 self.mstore.set_accepted_types(["resynchronize"])
56 message_id = self.exchanger.send(
57@@ -858,6 +859,11 @@
58 "the client's secure ID has changed in the meantime")
59 self.assertTrue(expected_log_entry in self.logfile.getvalue())
60
61+ # The MessageContext was removed after utilisation.
62+ ids_after = self.exchanger._store.all_operation_ids()
63+ self.assertTrue(len(ids_after) == len(ids_before) - 1)
64+ self.assertFalse('234567' in ids_after)
65+
66
67 class AcceptedTypesMessageExchangeTest(LandscapeTest):
68
69
70=== modified file 'landscape/broker/tests/test_exchangestore.py'
71--- landscape/broker/tests/test_exchangestore.py 2010-04-29 15:57:29 +0000
72+++ landscape/broker/tests/test_exchangestore.py 2010-04-30 08:37:03 +0000
73@@ -46,8 +46,9 @@
74 self.store1.add_message_context, 123, 'def', 'change-packages')
75
76 def test_get_message_context(self):
77- """Accessing a C{MessageContext} with an existing
78- C{operation-id} works."""
79+ """
80+ Accessing a C{MessageContext} with an existing C{operation-id} works.
81+ """
82 now = time.time()
83 self.store1.add_message_context(234, 'bcd', 'change-packages')
84 context = self.store2.get_message_context(234)
85@@ -69,8 +70,10 @@
86 self.assertIs(None, self.store1.get_message_context(345))
87
88 def test_all_operation_ids_for_empty_database(self):
89- """Calling C{all_operation_ids} on an empty database returns an empty
90- list."""
91+ """
92+ Calling C{all_operation_ids} on an empty database returns an empty
93+ list.
94+ """
95 self.assertEquals([], self.store1.all_operation_ids())
96
97 def test_all_operation_ids(self):
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

[9]

+ message_context_store,

Please name this parameter "exchange_store", so it matches the name of the expected type (ExchangeStore) and it is shorter. Also, now that we have it, the ExchangeStore could be used to store also other types of information beside MessageContext's, so it makes sense to use a generic name for it.

+ self._store = message_context_store

Similarly please name this attribute "self._exchange_store", to make it clear which store is referring to (there is another attribute called "self._message_store", so the naming will be consistent).

[10]

+ else:
+ if "timestamp" not in message:
+ message["timestamp"] = int(self._reactor.time())
+ message_id = self._message_store.add(message)
+ if urgent:
+ self.schedule_exchange(urgent=True)
+ return message_id

I think you can save this else/indentation level and simply write:

+ if "timestamp" not in message:
+ message["timestamp"] = int(self._reactor.time())
+ message_id = self._message_store.add(message)
+ if urgent:
+ self.schedule_exchange(urgent=True)
+ return message_id

I personally like the pattern when you check a series of early-return conditions, before reaching the meat of a function/method, like

if this:
   return something
if that:
   return something_else

do_the_actual_thing

[11]

+ exchange_store = ExchangeStore(
+ os.path.join(config.data_path, "exchange.database"))

Just to avoid hard-coding this value you could add a property to l.broker.config.BrokerConfiguration, like:

@property
def exchange_store_path(self):
    return os.path.join(self.data_path, "exchange.database"))

and use it as

+ exchange_store = ExchangeStore(self.config.exchange_store_path)

[12]

+ exchange_store = ExchangeStore(
+ os.path.join(test_case.config.data_path, "exchange.database"))

Please add this object to the test_case attributes, so it can be used in tests:

+ test_case.exchange_store = ExchangeStore(
+ os.path.join(test_case.config.data_path, "exchange.database"))

after that you can avoid creating it explicitly in all the tests.

[13]

+ self.exchanger._store.get_message_context(message['operation-id']))

Once you have self.exchange_store as test attribute (see [12]) you should use it instead of accessing this private attribute.

Thanks!

Revision history for this message
Muharem Hrnjadovic (al-maisan) wrote :

Hello Free,

thank you very much for these comments, I really appreciate it.
I have taken care of all of them, please see the attached incremental
diff.

On 04/30/2010 11:14 AM, Free Ekanayaka wrote:
> [9]
>
> + message_context_store,
>
> Please name this parameter "exchange_store", so it matches the name of the expected type (ExchangeStore) and it is shorter. Also, now that we have it, the ExchangeStore could be used to store also other types of information beside MessageContext's, so it makes sense to use a generic name for it.
>
> + self._store = message_context_store
>
> Similarly please name this attribute "self._exchange_store", to make it clear which store is referring to (there is another attribute called "self._message_store", so the naming will be consistent).
>
> [10]
>
> + else:
> + if "timestamp" not in message:
> + message["timestamp"] = int(self._reactor.time())
> + message_id = self._message_store.add(message)
> + if urgent:
> + self.schedule_exchange(urgent=True)
> + return message_id
>
> I think you can save this else/indentation level and simply write:
>
> + if "timestamp" not in message:
> + message["timestamp"] = int(self._reactor.time())
> + message_id = self._message_store.add(message)
> + if urgent:
> + self.schedule_exchange(urgent=True)
> + return message_id
>
> I personally like the pattern when you check a series of early-return conditions, before reaching the meat of a function/method, like
>
> if this:
> return something
> if that:
> return something_else
>
> do_the_actual_thing
>
> [11]
>
> + exchange_store = ExchangeStore(
> + os.path.join(config.data_path, "exchange.database"))
>
> Just to avoid hard-coding this value you could add a property to l.broker.config.BrokerConfiguration, like:
>
> @property
> def exchange_store_path(self):
> return os.path.join(self.data_path, "exchange.database"))
>
> and use it as
>
> + exchange_store = ExchangeStore(self.config.exchange_store_path)
>
> [12]
>
> + exchange_store = ExchangeStore(
> + os.path.join(test_case.config.data_path, "exchange.database"))
>
> Please add this object to the test_case attributes, so it can be used in tests:
>
> + test_case.exchange_store = ExchangeStore(
> + os.path.join(test_case.config.data_path, "exchange.database"))
>
> after that you can avoid creating it explicitly in all the tests.
>
> [13]
>
> + self.exchanger._store.get_message_context(message['operation-id']))
>
> Once you have self.exchange_store as test attribute (see [12]) you should use it instead of accessing this private attribute.
>
> Thanks!

Best regards

--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC

1=== modified file 'landscape/broker/config.py'
2--- landscape/broker/config.py 2010-01-06 15:11:24 +0000
3+++ landscape/broker/config.py 2010-04-30 10:39:07 +0000
4@@ -18,6 +18,10 @@
5 self._original_http_proxy = os.environ.get("http_proxy")
6 self._original_https_proxy = os.environ.get("https_proxy")
7
8+ @property
9+ def exchange_store_path(self):
10+ return os.path.join(self.data_path, "exchange.database")
11+
12 def make_parser(self):
13 """Parser factory for broker-specific options.
14
15
16=== modified file 'landscape/broker/exchange.py'
17--- landscape/broker/exchange.py 2010-04-30 08:32:42 +0000
18+++ landscape/broker/exchange.py 2010-04-30 10:32:42 +0000
19@@ -22,7 +22,7 @@
20 plugin_name = "message-exchange"
21
22 def __init__(self, reactor, store, transport, registration_info,
23- message_context_store,
24+ exchange_store,
25 exchange_interval=60*60,
26 urgent_exchange_interval=10,
27 monitor_interval=None,
28@@ -53,7 +53,7 @@
29 self._client_accepted_types = set()
30 self._client_accepted_types_hash = None
31 self._message_handlers = {}
32- self._store = message_context_store
33+ self._exchange_store = exchange_store
34
35 self.register_message("accepted-types", self._handle_accepted_types)
36 self.register_message("resynchronize", self._handle_resynchronize)
37@@ -75,7 +75,7 @@
38 return False
39
40 operation_id = message['operation-id']
41- context = self._store.get_message_context(operation_id)
42+ context = self._exchange_store.get_message_context(operation_id)
43 if context is None:
44 logging.warning(
45 "No message context for message with operation-id: %s"
46@@ -103,13 +103,13 @@
47 "because the client's secure ID has changed in the meantime"
48 % message.get('operation-id'))
49 return None
50- else:
51- if "timestamp" not in message:
52- message["timestamp"] = int(self._reactor.time())
53- message_id = self._message_store.add(message)
54- if urgent:
55- self.schedule_exchange(urgent=True)
56- return message_id
57+
58+ if "timestamp" not in message:
59+ message["timestamp"] = int(self._reactor.time())
60+ message_id = self._message_store.add(message)
61+ if urgent:
62+ self.schedule_exchange(urgent=True)
63+ return message_id
64
65 def start(self):
66 """Start scheduling exchanges. The first one will be urgent."""
67@@ -400,7 +400,7 @@
68 if 'operation-id' in message:
69 # This is a message that requires a response. Store the secure ID
70 # so we can check for obsolete results later.
71- self._store.add_message_context(
72+ self._exchange_store.add_message_context(
73 message['operation-id'], self._registration_info.secure_id,
74 message['type'])
75
76
77=== modified file 'landscape/broker/service.py'
78--- landscape/broker/service.py 2010-04-29 15:57:29 +0000
79+++ landscape/broker/service.py 2010-04-30 10:39:43 +0000
80@@ -53,8 +53,7 @@
81 self.message_store = get_default_message_store(
82 self.persist, config.message_store_path)
83 self.identity = Identity(self.config, self.persist)
84- exchange_store = ExchangeStore(
85- os.path.join(config.data_path, "exchange.database"))
86+ exchange_store = ExchangeStore(self.config.exchange_store_path)
87 self.exchanger = MessageExchange(
88 self.reactor, self.message_store, self.transport, self.identity,
89 exchange_store, config.exchange_interval,
90
91=== modified file 'landscape/broker/tests/helpers.py'
92--- landscape/broker/tests/helpers.py 2010-04-29 15:57:29 +0000
93+++ landscape/broker/tests/helpers.py 2010-04-30 10:44:26 +0000
94@@ -75,11 +75,11 @@
95 test_case.transport = FakeTransport(test_case.config.url,
96 test_case.config.ssl_public_key)
97 test_case.reactor = FakeReactor()
98- exchange_store = ExchangeStore(
99+ test_case.exchange_store = ExchangeStore(
100 os.path.join(test_case.config.data_path, "exchange.database"))
101 test_case.exchanger = MessageExchange(
102 test_case.reactor, test_case.mstore, test_case.transport,
103- test_case.identity, exchange_store,
104+ test_case.identity, test_case.exchange_store,
105 test_case.config.exchange_interval,
106 test_case.config.urgent_exchange_interval)
107
108
109=== modified file 'landscape/broker/tests/test_exchange.py'
110--- landscape/broker/tests/test_exchange.py 2010-04-30 08:29:41 +0000
111+++ landscape/broker/tests/test_exchange.py 2010-04-30 10:48:10 +0000
112@@ -273,10 +273,8 @@
113 Immediately after registration, an urgent exchange should be scheduled.
114 """
115 transport = FakeTransport()
116- exchange_store = ExchangeStore(
117- os.path.join(self.config.data_path, "exchange.database"))
118 exchanger = MessageExchange(self.reactor, self.mstore, transport,
119- self.identity, exchange_store)
120+ self.identity, self.exchange_store)
121 exchanger.start()
122 self.wait_for_exchange(urgent=True)
123 self.assertEquals(len(transport.payloads), 1)
124@@ -502,10 +500,8 @@
125 the total-messages is equivalent to the total number of messages
126 pending.
127 """
128- exchange_store = ExchangeStore(
129- os.path.join(self.config.data_path, "exchange.database"))
130 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
131- self.identity, exchange_store,
132+ self.identity, self.exchange_store,
133 max_messages=1)
134 self.mstore.set_accepted_types(["empty"])
135 self.mstore.add({"type": "empty"})
136@@ -534,10 +530,8 @@
137 # We create our own MessageExchange because the one set up by the text
138 # fixture has an urgent exchange interval of 10 seconds, which makes
139 # testing this awkward.
140- exchange_store = ExchangeStore(
141- os.path.join(self.config.data_path, "exchange.database"))
142 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
143- self.identity, exchange_store,
144+ self.identity, self.exchange_store,
145 urgent_exchange_interval=20)
146 exchanger.schedule_exchange(urgent=True)
147 events = []
148@@ -554,10 +548,8 @@
149 should be cancelled and a new one should be scheduled for 10 seconds
150 before the new urgent exchange.
151 """
152- exchange_store = ExchangeStore(
153- os.path.join(self.config.data_path, "exchange.database"))
154 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
155- self.identity, exchange_store,
156+ self.identity, self.exchange_store,
157 urgent_exchange_interval=20)
158 events = []
159 self.reactor.call_on("impending-exchange", lambda: events.append(True))
160@@ -808,8 +800,9 @@
161 [message] = messages
162 self.assertIsNot(
163 None,
164- self.exchanger._store.get_message_context(message['operation-id']))
165- message_context = self.exchanger._store.get_message_context(message['operation-id'])
166+ self.exchange_store.get_message_context(message['operation-id']))
167+ message_context = self.exchange_store.get_message_context(
168+ message['operation-id'])
169 self.assertEquals(message_context.operation_id, 123456)
170 self.assertEquals(message_context.message_type, "type-R")
171
172@@ -818,14 +811,14 @@
173 Incoming messages without an 'operation-id' key will *not* have the
174 secure id stored in the L{ExchangeStore}.
175 """
176- ids_before = self.exchanger._store.all_operation_ids()
177+ ids_before = self.exchange_store.all_operation_ids()
178
179 msg = {"type": "type-R", "whatever": 5678}
180 server_message = [msg]
181 self.transport.responses.append(server_message)
182 self.exchanger.exchange()
183
184- ids_after = self.exchanger._store.all_operation_ids()
185+ ids_after = self.exchange_store.all_operation_ids()
186 self.assertEquals(ids_before, ids_after)
187
188 def test_obsolete_response_messages_are_discarded(self):
189@@ -844,7 +837,7 @@
190
191 # Change the secure ID so that the response message gets discarded.
192 self.identity.secure_id = 'brand-new'
193- ids_before = self.exchanger._store.all_operation_ids()
194+ ids_before = self.exchange_store.all_operation_ids()
195
196 self.mstore.set_accepted_types(["resynchronize"])
197 message_id = self.exchanger.send(
198@@ -860,7 +853,7 @@
199 self.assertTrue(expected_log_entry in self.logfile.getvalue())
200
201 # The MessageContext was removed after utilisation.
202- ids_after = self.exchanger._store.all_operation_ids()
203+ ids_after = self.exchange_store.all_operation_ids()
204 self.assertTrue(len(ids_after) == len(ids_before) - 1)
205 self.assertFalse('234567' in ids_after)
206
257. By Muharem Hrnjadovic

Free's review comments [9,10]

258. By Muharem Hrnjadovic

Free's review comment #11

259. By Muharem Hrnjadovic

Free's review comment #12

260. By Muharem Hrnjadovic

Free's review comments [12,13]

Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

The branch looks great! Thanks for addressing all the comments. +1

[14]

+ test_case.exchange_store = ExchangeStore(
+ os.path.join(test_case.config.data_path, "exchange.database"))

You can now use BrokerConfiguration.exchange_store_path:

+ test_case.exchange_store = ExchangeStore(
+ test_case.config.exchange_store_path)

review: Approve
261. By Muharem Hrnjadovic

Free's review comment #14

Revision history for this message
Thomas Herve (therve) wrote :

[5] Some pyflakes:
landscape/broker/tests/test_exchange.py:1: 'os' imported but unused
landscape/broker/tests/test_exchange.py:8: 'ExchangeStore' imported but unused

[6] get_message_context should retrieve all the data from the database in one query, and pass it to the MessageContext __init__, thus removing the select in there.

+1!

review: Approve
262. By Muharem Hrnjadovic

Thomas' review comment #5

Revision history for this message
Muharem Hrnjadovic (al-maisan) wrote :

On 04/30/2010 04:59 PM, Thomas Herve wrote:
> Review: Approve
> [5] Some pyflakes:
> landscape/broker/tests/test_exchange.py:1: 'os' imported but unused
> landscape/broker/tests/test_exchange.py:8: 'ExchangeStore' imported but unused
>
> [6] get_message_context should retrieve all the data from the database in one query, and pass it to the MessageContext __init__, thus removing the select in there.

Hello Thomas,

thanks for the suggestions above! I have revised the code accordingly.
Please see the attached incremental diff.

Best regards

--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC

1=== modified file 'landscape/broker/exchangestore.py'
2--- landscape/broker/exchangestore.py 2010-04-30 08:36:15 +0000
3+++ landscape/broker/exchangestore.py 2010-04-30 18:11:31 +0000
4@@ -28,27 +28,18 @@
5 @param id: the database key value for this instance.
6 """
7
8- def __init__(self, db, id):
9+ def __init__(self, db, operation_id, secure_id, message_type, timestamp):
10 self._db = db
11- self.id = id
12-
13- cursor = db.cursor()
14- try:
15- cursor.execute(
16- "SELECT operation_id, secure_id, message_type, timestamp "
17- "FROM message_context WHERE id=?", (id,))
18- row = cursor.fetchone()
19- finally:
20- cursor.close()
21-
22- self.operation_id = row[0]
23- self.secure_id = row[1]
24- self.message_type = row[2]
25- self.timestamp = row[3]
26+ self.operation_id = operation_id
27+ self.secure_id = secure_id
28+ self.message_type = message_type
29+ self.timestamp = timestamp
30
31 @with_cursor
32 def remove(self, cursor):
33- cursor.execute("DELETE FROM message_context WHERE id=?", (self.id,))
34+ cursor.execute(
35+ "DELETE FROM message_context WHERE operation_id=?",
36+ (self.operation_id,))
37
38
39 class ExchangeStore(object):
40@@ -72,21 +63,21 @@
41 def add_message_context(
42 self, cursor, operation_id, secure_id, message_type):
43 """Add a L{MessageContext} with the given data."""
44+ params = (operation_id, secure_id, message_type, time.time())
45 cursor.execute(
46 "INSERT INTO message_context "
47 " (operation_id, secure_id, message_type, timestamp) "
48- " VALUES (?,?,?,?)",
49- (operation_id, secure_id, message_type, time.time()))
50- return MessageContext(self._db, cursor.lastrowid)
51+ " VALUES (?,?,?,?)", params)
52+ return MessageContext(self._db, *params)
53
54 @with_cursor
55 def get_message_context(self, cursor, operation_id):
56 """The L{MessageContext} for the given C{operation_id} or C{None}."""
57 cursor.execute(
58- "SELECT id FROM message_context WHERE operation_id=?",
59- (operation_id,))
60- result = cursor.fetchone()
61- return MessageContext(self._db, result[0]) if result else None
62+ "SELECT operation_id, secure_id, message_type, timestamp "
63+ "FROM message_context WHERE operation_id=?", (operation_id,))
64+ row = cursor.fetchone()
65+ return MessageContext(self._db, *row) if row else None
66
67 @with_cursor
68 def all_operation_ids(self, cursor):
69
70=== modified file 'landscape/broker/tests/test_exchange.py'
71--- landscape/broker/tests/test_exchange.py 2010-04-30 10:48:32 +0000
72+++ landscape/broker/tests/test_exchange.py 2010-04-30 15:54:23 +0000
73@@ -1,11 +1,9 @@
74-import os
75 from landscape import SERVER_API, CLIENT_API
76 from landscape.lib.persist import Persist
77 from landscape.lib.hashlib import md5
78 from landscape.lib.fetch import fetch_async
79 from landscape.schema import Message, Int
80 from landscape.broker.exchange import get_accepted_types_diff, MessageExchange
81-from landscape.broker.exchangestore import ExchangeStore
82 from landscape.broker.transport import FakeTransport
83 from landscape.broker.store import MessageStore
84 from landscape.broker.ping import Pinger
263. By Muharem Hrnjadovic

Thomas' review comment #6

264. By Muharem Hrnjadovic

Made code more succinct

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'landscape/broker/config.py'
2--- landscape/broker/config.py 2010-01-06 15:11:24 +0000
3+++ landscape/broker/config.py 2010-04-30 19:04:26 +0000
4@@ -18,6 +18,10 @@
5 self._original_http_proxy = os.environ.get("http_proxy")
6 self._original_https_proxy = os.environ.get("https_proxy")
7
8+ @property
9+ def exchange_store_path(self):
10+ return os.path.join(self.data_path, "exchange.database")
11+
12 def make_parser(self):
13 """Parser factory for broker-specific options.
14
15
16=== modified file 'landscape/broker/exchange.py'
17--- landscape/broker/exchange.py 2010-01-22 11:51:19 +0000
18+++ landscape/broker/exchange.py 2010-04-30 19:04:26 +0000
19@@ -22,6 +22,7 @@
20 plugin_name = "message-exchange"
21
22 def __init__(self, reactor, store, transport, registration_info,
23+ exchange_store,
24 exchange_interval=60*60,
25 urgent_exchange_interval=10,
26 monitor_interval=None,
27@@ -52,6 +53,7 @@
28 self._client_accepted_types = set()
29 self._client_accepted_types_hash = None
30 self._message_handlers = {}
31+ self._exchange_store = exchange_store
32
33 self.register_message("accepted-types", self._handle_accepted_types)
34 self.register_message("resynchronize", self._handle_resynchronize)
35@@ -63,6 +65,30 @@
36 """Return a binary tuple with urgent and normal exchange intervals."""
37 return (self._urgent_exchange_interval, self._exchange_interval)
38
39+ def _message_is_obsolete(self, message):
40+ """Returns C{True} if message is obsolete.
41+
42+ A message is considered obsolete if the secure ID changed since it was
43+ received.
44+ """
45+ if 'operation-id' not in message:
46+ return False
47+
48+ operation_id = message['operation-id']
49+ context = self._exchange_store.get_message_context(operation_id)
50+ if context is None:
51+ logging.warning(
52+ "No message context for message with operation-id: %s"
53+ % operation_id)
54+ return False
55+
56+ # Compare the current secure ID with the one that was in effect when
57+ # the request message was received.
58+ result = self._registration_info.secure_id != context.secure_id
59+ context.remove()
60+
61+ return result
62+
63 def send(self, message, urgent=False):
64 """Include a message to be sent in an exchange.
65
66@@ -71,6 +97,13 @@
67
68 @param message: Same as in L{MessageStore.add}.
69 """
70+ if self._message_is_obsolete(message):
71+ logging.info(
72+ "Response message with operation-id %s was discarded "
73+ "because the client's secure ID has changed in the meantime"
74+ % message.get('operation-id'))
75+ return None
76+
77 if "timestamp" not in message:
78 message["timestamp"] = int(self._reactor.time())
79 message_id = self._message_store.add(message)
80@@ -300,7 +333,8 @@
81 @param result: The response got in reply to the C{payload}.
82 """
83 message_store = self._message_store
84- self._client_accepted_types_hash = result.get("client-accepted-types-hash")
85+ self._client_accepted_types_hash = result.get(
86+ "client-accepted-types-hash")
87 next_expected = result.get("next-expected-sequence")
88 old_sequence = message_store.get_sequence()
89 if next_expected is None:
90@@ -363,6 +397,13 @@
91 Any message handlers registered with L{register_message} will
92 be called.
93 """
94+ if 'operation-id' in message:
95+ # This is a message that requires a response. Store the secure ID
96+ # so we can check for obsolete results later.
97+ self._exchange_store.add_message_context(
98+ message['operation-id'], self._registration_info.secure_id,
99+ message['type'])
100+
101 self._reactor.fire("message", message)
102 # This has plan interference! but whatever.
103 if message["type"] in self._message_handlers:
104
105=== added file 'landscape/broker/exchangestore.py'
106--- landscape/broker/exchangestore.py 1970-01-01 00:00:00 +0000
107+++ landscape/broker/exchangestore.py 2010-04-30 19:04:26 +0000
108@@ -0,0 +1,110 @@
109+"""Provide access to the persistent data used by the L{MessageExchange}."""
110+import time
111+
112+try:
113+ import sqlite3
114+except ImportError:
115+ from pysqlite2 import dbapi2 as sqlite3
116+
117+from landscape.lib.store import with_cursor
118+
119+
120+class MessageContext(object):
121+ """Stores a context for incoming messages that require a response.
122+
123+ The context consists of
124+
125+ - the "operation-id" value
126+ - the secure ID that was in effect when the message was received
127+ - the message type
128+ - the time when the message was received
129+
130+ This data will be used to detect secure ID changes between the time at
131+ which the request message came in and the completion of the request.
132+ If the secure ID did change the result message is obolete and will not be
133+ sent to the server.
134+
135+ @param db: the sqlite database handle.
136+ @param id: the database key value for this instance.
137+ """
138+
139+ def __init__(self, db, operation_id, secure_id, message_type, timestamp):
140+ self._db = db
141+ self.operation_id = operation_id
142+ self.secure_id = secure_id
143+ self.message_type = message_type
144+ self.timestamp = timestamp
145+
146+ @with_cursor
147+ def remove(self, cursor):
148+ cursor.execute(
149+ "DELETE FROM message_context WHERE operation_id=?",
150+ (self.operation_id,))
151+
152+
153+class ExchangeStore(object):
154+ """Message meta data required by the L{MessageExchange}.
155+
156+ The implementation uses a SQLite database as backend, with a single table
157+ called "message_context", whose schema is defined in
158+ L{ensure_exchange_schema}.
159+
160+ @param filename: The name of the file that contains the sqlite database.
161+ """
162+ _db = None
163+
164+ def __init__(self, filename):
165+ self._filename = filename
166+
167+ def _ensure_schema(self):
168+ ensure_exchange_schema(self._db)
169+
170+ @with_cursor
171+ def add_message_context(
172+ self, cursor, operation_id, secure_id, message_type):
173+ """Add a L{MessageContext} with the given data."""
174+ params = (operation_id, secure_id, message_type, time.time())
175+ cursor.execute(
176+ "INSERT INTO message_context "
177+ " (operation_id, secure_id, message_type, timestamp) "
178+ " VALUES (?,?,?,?)", params)
179+ return MessageContext(self._db, *params)
180+
181+ @with_cursor
182+ def get_message_context(self, cursor, operation_id):
183+ """The L{MessageContext} for the given C{operation_id} or C{None}."""
184+ cursor.execute(
185+ "SELECT operation_id, secure_id, message_type, timestamp "
186+ "FROM message_context WHERE operation_id=?", (operation_id,))
187+ row = cursor.fetchone()
188+ return MessageContext(self._db, *row) if row else None
189+
190+ @with_cursor
191+ def all_operation_ids(self, cursor):
192+ """Return all operation IDs currently stored in C{message_context}."""
193+ cursor.execute("SELECT operation_id FROM message_context")
194+ result = cursor.fetchall()
195+ return [row[0] for row in result]
196+
197+
198+def ensure_exchange_schema(db):
199+ """Create all tables needed by a L{ExchangeStore}.
200+
201+ @param db: A connection to a SQLite database.
202+ """
203+ cursor = db.cursor()
204+ try:
205+ cursor.execute(
206+ "CREATE TABLE message_context"
207+ " (id INTEGER PRIMARY KEY, timestamp TIMESTAMP, "
208+ " secure_id TEXT NOT NULL, operation_id INTEGER NOT NULL, "
209+ " message_type text NOT NULL)")
210+ cursor.execute(
211+ "CREATE UNIQUE INDEX msgctx_operationid_idx ON "
212+ "message_context(operation_id)")
213+ except (sqlite3.OperationalError, sqlite3.DatabaseError):
214+ cursor.close()
215+ db.rollback()
216+ else:
217+ cursor.close()
218+ db.commit()
219
220=== modified file 'landscape/broker/service.py'
221--- landscape/broker/service.py 2010-04-23 11:37:25 +0000
222+++ landscape/broker/service.py 2010-04-30 19:04:26 +0000
223@@ -7,6 +7,7 @@
224 from landscape.broker.config import BrokerConfiguration
225 from landscape.broker.transport import HTTPTransport
226 from landscape.broker.exchange import MessageExchange
227+from landscape.broker.exchangestore import ExchangeStore
228 from landscape.broker.ping import Pinger
229 from landscape.broker.store import get_default_message_store
230 from landscape.broker.server import BrokerServer
231@@ -52,9 +53,11 @@
232 self.message_store = get_default_message_store(
233 self.persist, config.message_store_path)
234 self.identity = Identity(self.config, self.persist)
235+ exchange_store = ExchangeStore(self.config.exchange_store_path)
236 self.exchanger = MessageExchange(
237 self.reactor, self.message_store, self.transport, self.identity,
238- config.exchange_interval, config.urgent_exchange_interval)
239+ exchange_store, config.exchange_interval,
240+ config.urgent_exchange_interval)
241 self.pinger = self.pinger_factory(self.reactor, config.ping_url,
242 self.identity, self.exchanger)
243 self.registration = RegistrationHandler(
244
245=== modified file 'landscape/broker/tests/helpers.py'
246--- landscape/broker/tests/helpers.py 2010-04-07 21:00:46 +0000
247+++ landscape/broker/tests/helpers.py 2010-04-30 19:04:26 +0000
248@@ -6,6 +6,7 @@
249 from landscape.reactor import FakeReactor
250 from landscape.broker.transport import FakeTransport
251 from landscape.broker.exchange import MessageExchange
252+from landscape.broker.exchangestore import ExchangeStore
253 from landscape.broker.store import get_default_message_store
254 from landscape.broker.registration import Identity, RegistrationHandler
255 from landscape.broker.ping import Pinger
256@@ -74,9 +75,12 @@
257 test_case.transport = FakeTransport(test_case.config.url,
258 test_case.config.ssl_public_key)
259 test_case.reactor = FakeReactor()
260+ test_case.exchange_store = ExchangeStore(
261+ test_case.config.exchange_store_path)
262 test_case.exchanger = MessageExchange(
263 test_case.reactor, test_case.mstore, test_case.transport,
264- test_case.identity, test_case.config.exchange_interval,
265+ test_case.identity, test_case.exchange_store,
266+ test_case.config.exchange_interval,
267 test_case.config.urgent_exchange_interval)
268
269
270
271=== modified file 'landscape/broker/tests/test_exchange.py'
272--- landscape/broker/tests/test_exchange.py 2010-04-28 13:07:53 +0000
273+++ landscape/broker/tests/test_exchange.py 2010-04-30 19:04:26 +0000
274@@ -1,4 +1,3 @@
275-
276 from landscape import SERVER_API, CLIENT_API
277 from landscape.lib.persist import Persist
278 from landscape.lib.hashlib import md5
279@@ -22,6 +21,7 @@
280 self.mstore.add_schema(Message("empty", {}))
281 self.mstore.add_schema(Message("data", {"data": Int()}))
282 self.mstore.add_schema(Message("holdme", {}))
283+ self.identity.secure_id = 'needs-to-be-set-for-tests-to-pass'
284
285 def wait_for_exchange(self, urgent=False, factor=1, delta=0):
286 if urgent:
287@@ -240,7 +240,6 @@
288 self.mstore.add({"type": "data", "data": 2})
289 self.mstore.add({"type": "data", "data": 3})
290 # next one, server will respond with 1!
291-
292 def desynched_send_data(payload, computer_id=None, message_api=None):
293 self.transport.next_expected_sequence = 1
294 return {"next-expected-sequence": 1}
295@@ -273,7 +272,7 @@
296 """
297 transport = FakeTransport()
298 exchanger = MessageExchange(self.reactor, self.mstore, transport,
299- self.identity)
300+ self.identity, self.exchange_store)
301 exchanger.start()
302 self.wait_for_exchange(urgent=True)
303 self.assertEquals(len(transport.payloads), 1)
304@@ -390,6 +389,7 @@
305 del self.transport.exchange
306
307 exchanged = []
308+
309 def exchange_callback():
310 exchanged.append(True)
311
312@@ -473,7 +473,6 @@
313 self.assertEquals(payload.get("server-api"), "2.0")
314 self.assertEquals(self.transport.message_api, "2.0")
315
316-
317 def test_include_total_messages_none(self):
318 """
319 The payload includes the total number of messages that the client has
320@@ -500,14 +499,14 @@
321 pending.
322 """
323 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
324- self.identity, max_messages=1)
325+ self.identity, self.exchange_store,
326+ max_messages=1)
327 self.mstore.set_accepted_types(["empty"])
328 self.mstore.add({"type": "empty"})
329 self.mstore.add({"type": "empty"})
330 exchanger.exchange()
331 self.assertEquals(self.transport.payloads[0]["total-messages"], 2)
332
333-
334 def test_impending_exchange(self):
335 """
336 A reactor event is emitted shortly (10 seconds) before an exchange
337@@ -530,7 +529,8 @@
338 # fixture has an urgent exchange interval of 10 seconds, which makes
339 # testing this awkward.
340 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
341- self.identity, urgent_exchange_interval=20)
342+ self.identity, self.exchange_store,
343+ urgent_exchange_interval=20)
344 exchanger.schedule_exchange(urgent=True)
345 events = []
346 self.reactor.call_on("impending-exchange", lambda: events.append(True))
347@@ -547,7 +547,8 @@
348 before the new urgent exchange.
349 """
350 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
351- self.identity, urgent_exchange_interval=20)
352+ self.identity, self.exchange_store,
353+ urgent_exchange_interval=20)
354 events = []
355 self.reactor.call_on("impending-exchange", lambda: events.append(True))
356 # This call will:
357@@ -610,6 +611,7 @@
358 self.transport.exchange = failed_send_data
359
360 exchanged = []
361+
362 def exchange_failed_callback():
363 exchanged.append(True)
364
365@@ -700,6 +702,7 @@
366
367 def handler1(message):
368 messages.append(("one", message))
369+
370 def handler2(message):
371 messages.append(("two", message))
372
373@@ -722,6 +725,7 @@
374
375 def test_server_uuid_change_cause_event(self):
376 called = []
377+
378 def server_uuid_changed(old_uuid, new_uuid):
379 called.append((old_uuid, new_uuid))
380 self.reactor.call_on("server-uuid-changed", server_uuid_changed)
381@@ -756,6 +760,7 @@
382 the event is not emitted.
383 """
384 called = []
385+
386 def server_uuid_changed(old_uuid, new_uuid):
387 called.append((old_uuid, new_uuid))
388 self.reactor.call_on("server-uuid-changed", server_uuid_changed)
389@@ -779,6 +784,77 @@
390
391 self.assertNotIn("INFO: Server UUID changed", self.logfile.getvalue())
392
393+ def test_return_messages_have_their_context_stored(self):
394+ """
395+ Incoming messages with an 'operation-id' key will have the secure id
396+ stored in the L{ExchangeStore}.
397+ """
398+ messages = []
399+ self.exchanger.register_message("type-R", messages.append)
400+ msg = {"type": "type-R", "whatever": 5678, "operation-id": 123456}
401+ server_message = [msg]
402+ self.transport.responses.append(server_message)
403+ self.exchanger.exchange()
404+ [message] = messages
405+ self.assertIsNot(
406+ None,
407+ self.exchange_store.get_message_context(message['operation-id']))
408+ message_context = self.exchange_store.get_message_context(
409+ message['operation-id'])
410+ self.assertEquals(message_context.operation_id, 123456)
411+ self.assertEquals(message_context.message_type, "type-R")
412+
413+ def test_one_way_messages_do_not_have_their_context_stored(self):
414+ """
415+ Incoming messages without an 'operation-id' key will *not* have the
416+ secure id stored in the L{ExchangeStore}.
417+ """
418+ ids_before = self.exchange_store.all_operation_ids()
419+
420+ msg = {"type": "type-R", "whatever": 5678}
421+ server_message = [msg]
422+ self.transport.responses.append(server_message)
423+ self.exchanger.exchange()
424+
425+ ids_after = self.exchange_store.all_operation_ids()
426+ self.assertEquals(ids_before, ids_after)
427+
428+ def test_obsolete_response_messages_are_discarded(self):
429+ """
430+ An obsolete response message will be discarded as opposed to being
431+ sent to the server.
432+
433+ A response message is considered obsolete if the secure ID changed
434+ since the request message was received.
435+ """
436+ # Receive the message below from the server.
437+ msg = {"type": "type-R", "whatever": 5678, "operation-id": 234567}
438+ server_message = [msg]
439+ self.transport.responses.append(server_message)
440+ self.exchanger.exchange()
441+
442+ # Change the secure ID so that the response message gets discarded.
443+ self.identity.secure_id = 'brand-new'
444+ ids_before = self.exchange_store.all_operation_ids()
445+
446+ self.mstore.set_accepted_types(["resynchronize"])
447+ message_id = self.exchanger.send(
448+ {"type": "resynchronize", "operation-id": 234567})
449+ self.exchanger.exchange()
450+ self.assertEquals(2, len(self.transport.payloads))
451+ messages = self.transport.payloads[1]["messages"]
452+ self.assertEquals([], messages)
453+ self.assertIs(None, message_id)
454+ expected_log_entry = (
455+ "Response message with operation-id 234567 was discarded because "
456+ "the client's secure ID has changed in the meantime")
457+ self.assertTrue(expected_log_entry in self.logfile.getvalue())
458+
459+ # The MessageContext was removed after utilisation.
460+ ids_after = self.exchange_store.all_operation_ids()
461+ self.assertTrue(len(ids_after) == len(ids_before) - 1)
462+ self.assertFalse('234567' in ids_after)
463+
464
465 class AcceptedTypesMessageExchangeTest(LandscapeTest):
466
467@@ -868,7 +944,6 @@
468 self.transport.payloads[2]["client-accepted-types"],
469 sorted(["type-A"] + DEFAULT_ACCEPTED_TYPES))
470
471-
472 def test_register_message_adds_accepted_type(self):
473 """
474 Using the C{register_message} method of the exchanger causes
475
476=== added file 'landscape/broker/tests/test_exchangestore.py'
477--- landscape/broker/tests/test_exchangestore.py 1970-01-01 00:00:00 +0000
478+++ landscape/broker/tests/test_exchangestore.py 2010-04-30 19:04:26 +0000
479@@ -0,0 +1,84 @@
480+import time
481+
482+try:
483+ import sqlite3
484+except ImportError:
485+ from pysqlite2 import dbapi2 as sqlite3
486+
487+from landscape.tests.helpers import LandscapeTest
488+
489+from landscape.broker.exchangestore import ExchangeStore
490+
491+
492+class ExchangeStoreTest(LandscapeTest):
493+ """Unit tests for the C{ExchangeStore}."""
494+
495+ def setUp(self):
496+ super(ExchangeStoreTest, self).setUp()
497+
498+ self.filename = self.makeFile()
499+ self.store1 = ExchangeStore(self.filename)
500+ self.store2 = ExchangeStore(self.filename)
501+
502+ def test_add_message_context(self):
503+ """Adding a message context works correctly."""
504+ now = time.time()
505+ self.store1.add_message_context(123, 'abc', 'change-packages')
506+
507+ db = sqlite3.connect(self.store2._filename)
508+ cursor = db.cursor()
509+ cursor.execute(
510+ "SELECT operation_id, secure_id, message_type, timestamp "
511+ "FROM message_context WHERE operation_id=?", (123,))
512+ results = cursor.fetchall()
513+ self.assertEquals(1, len(results))
514+ [row] = results
515+ self.assertEquals(123, row[0])
516+ self.assertEquals('abc', row[1])
517+ self.assertEquals('change-packages', row[2])
518+ self.assertTrue(row[3] > now)
519+
520+ def test_add_message_context_with_duplicate_operation_id(self):
521+ """Only one message context with a given operation-id is permitted."""
522+ self.store1.add_message_context(123, 'abc', 'change-packages')
523+ self.assertRaises(
524+ sqlite3.IntegrityError,
525+ self.store1.add_message_context, 123, 'def', 'change-packages')
526+
527+ def test_get_message_context(self):
528+ """
529+ Accessing a C{MessageContext} with an existing C{operation-id} works.
530+ """
531+ now = time.time()
532+ self.store1.add_message_context(234, 'bcd', 'change-packages')
533+ context = self.store2.get_message_context(234)
534+ self.assertEquals(234, context.operation_id)
535+ self.assertEquals('bcd', context.secure_id)
536+ self.assertEquals('change-packages', context.message_type)
537+ self.assertTrue(context.timestamp > now)
538+
539+ def test_get_message_context_with_nonexistent_operation_id(self):
540+ """Attempts to access a C{MessageContext} with a non-existent
541+ C{operation-id} result in C{None}."""
542+ self.assertIs(None, self.store1.get_message_context(999))
543+
544+ def test_message_context_remove(self):
545+ """C{MessageContext}s are deleted correctly."""
546+ context = self.store1.add_message_context(
547+ 345, 'opq', 'change-packages')
548+ context.remove()
549+ self.assertIs(None, self.store1.get_message_context(345))
550+
551+ def test_all_operation_ids_for_empty_database(self):
552+ """
553+ Calling C{all_operation_ids} on an empty database returns an empty
554+ list.
555+ """
556+ self.assertEquals([], self.store1.all_operation_ids())
557+
558+ def test_all_operation_ids(self):
559+ """C{all_operation_ids} works correctly."""
560+ self.store1.add_message_context(456, 'cde', 'change-packages')
561+ self.assertEquals([456], self.store2.all_operation_ids())
562+ self.store2.add_message_context(567, 'def', 'change-packages')
563+ self.assertEquals([456, 567], self.store1.all_operation_ids())
564
565=== added file 'landscape/lib/store.py'
566--- landscape/lib/store.py 1970-01-01 00:00:00 +0000
567+++ landscape/lib/store.py 2010-04-30 19:04:26 +0000
568@@ -0,0 +1,39 @@
569+"""Functions used by all sqlite-backed stores."""
570+
571+try:
572+ import sqlite3
573+except ImportError:
574+ from pysqlite2 import dbapi2 as sqlite3
575+
576+
577+def with_cursor(method):
578+ """Decorator that encloses the method in a database transaction.
579+
580+ Even though SQLite is supposed to be useful in autocommit mode, we've
581+ found cases where the database continued to be locked for writing
582+ until the cursor was closed. With this in mind, instead of using
583+ the autocommit mode, we explicitly terminate transactions and enforce
584+ cursor closing with this decorator.
585+ """
586+
587+ def inner(self, *args, **kwargs):
588+ if not self._db:
589+ # Create the database connection only when we start to actually
590+ # use it. This is essentially just a workaroud of a sqlite bug
591+ # happening when 2 concurrent processes try to create the tables
592+ # around the same time, the one which fails having an incorrect
593+ # cache and not seeing the tables
594+ self._db = sqlite3.connect(self._filename)
595+ self._ensure_schema()
596+ try:
597+ cursor = self._db.cursor()
598+ try:
599+ result = method(self, cursor, *args, **kwargs)
600+ finally:
601+ cursor.close()
602+ self._db.commit()
603+ except:
604+ self._db.rollback()
605+ raise
606+ return result
607+ return inner
608
609=== modified file 'landscape/package/store.py'
610--- landscape/package/store.py 2009-11-25 09:48:00 +0000
611+++ landscape/package/store.py 2010-04-30 19:04:26 +0000
612@@ -7,6 +7,7 @@
613 from pysqlite2 import dbapi2 as sqlite3
614
615 from landscape.lib import bpickle
616+from landscape.lib.store import with_cursor
617
618
619 class UnknownHashIDRequest(Exception):
620@@ -17,39 +18,6 @@
621 """Raised when trying to add an invalid hash=>id lookaside database."""
622
623
624-def with_cursor(method):
625- """Decorator that encloses the method in a database transaction.
626-
627- Even though SQLite is supposed to be useful in autocommit mode, we've
628- found cases where the database continued to be locked for writing
629- until the cursor was closed. With this in mind, instead of using
630- the autocommit mode, we explicitly terminate transactions and enforce
631- cursor closing with this decorator.
632- """
633-
634- def inner(self, *args, **kwargs):
635- if not self._db:
636- # Create the database connection only when we start to actually
637- # use it. This is essentially just a workaroud of a sqlite bug
638- # happening when 2 concurrent processes try to create the tables
639- # around the same time, the one which fails having an incorrect
640- # cache and not seeing the tables
641- self._db = sqlite3.connect(self._filename)
642- self._ensure_schema()
643- try:
644- cursor = self._db.cursor()
645- try:
646- result = method(self, cursor, *args, **kwargs)
647- finally:
648- cursor.close()
649- self._db.commit()
650- except:
651- self._db.rollback()
652- raise
653- return result
654- return inner
655-
656-
657 class HashIdStore(object):
658 """Persist package hash=>id mappings in a file.
659

Subscribers

People subscribed via source and target branches

to all changes: