Merge lp:~al-maisan/landscape-client/obsolete-results-v into lp:~landscape/landscape-client/trunk

Proposed by Muharem Hrnjadovic
Status: Merged
Approved by: Thomas Herve
Approved revision: 261
Merged at revision: not available
Proposed branch: lp:~al-maisan/landscape-client/obsolete-results-v
Merge into: lp:~landscape/landscape-client/trunk
Diff against target: 658 lines (+373/-45)
9 files modified
landscape/broker/config.py (+4/-0)
landscape/broker/exchange.py (+42/-1)
landscape/broker/exchangestore.py (+110/-0)
landscape/broker/service.py (+4/-1)
landscape/broker/tests/helpers.py (+5/-1)
landscape/broker/tests/test_exchange.py (+84/-9)
landscape/broker/tests/test_exchangestore.py (+84/-0)
landscape/lib/store.py (+39/-0)
landscape/package/store.py (+1/-33)
To merge this branch: bzr merge lp:~al-maisan/landscape-client/obsolete-results-v
Reviewer Review Type Date Requested Status
Thomas Herve (community) Approve
Free Ekanayaka (community) Approve
Review via email: mp+24405@code.launchpad.net

Description of the change

Obsolete response messages will now be discarded as opposed to being sent to the server.

A response message is considered obsolete if the secure ID has changed since the request message was received.

To post a comment you must log in.
244. By Muharem Hrnjadovic

bzr ls-lint fixes

245. By Muharem Hrnjadovic

Simplified test.

246. By Muharem Hrnjadovic

Fixed doc string.

247. By Muharem Hrnjadovic

Simplified obsolete response logic.

Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Very nice work, just a few comments but overall the branch looks great to me.

[1]

+ def __init__(self, db, id):

Please add @param entries in the class docstring for these two parameters.

[2]

+ @param filename: The file that contains the sqlite database.

Jamu recently pointed that @param docstrings for __init__ method should be actually put in the class docstring, like:

class ExchangeStore(object):
    """Message meta data required by the L{MessageExchange}.

    The implementation uses a SQLite database as backend, with a single table
    called "message_context", whose schema is defined in
    L{ensure_exchange_schema}.

    @param filename: The file that contains the sqlite database.
    """

because the callable that gets actually used is the class itself (you usually don't call __init__ directly).

[3]

For consistency with other tests and for easier reading please name test cases against the tests the exercise, plus some test specific suffix. A few examples:

test_add_operationid_is_unique -> test_add_message_context_with_duplicate_operation_id
test_get_message_context_works -> test_get_message_context
test_deleting_message_contexts_works -> test_deleting_message_contexts

[4]

+ self._store = ExchangeStore(

It would be better to use dependency injection (that is usually the pattern adopted in the client) and pass an instance of ExchangeStore instead of creating it in the __init__ method. This helps testing and mocking, and make it easier in general to pass objects of other types providing the same API.

For example your test case will then have a self.exchange_store attribute (that you use in to build the MessageExchange object under test) which you can use for performing assertions instead of accessing self.exchanger._store, which is a private attribute and should be considered an implementation detail.

[5]

+ return 0

Maybe None would be safer, as 0 could be used as message id number.

[6]

+ self.assertIsNot(
+ None,
+ self.exchanger._store.get_message_context(message['operation-id']))

I think the test should also check that the created MessageContext has the expected attributes set:

+ message_context = self.exchanger._store.get_message_context(message['operation-id'])
+ self.assertEquals(message_context.operation_id, 123456)
+ self.assertEquals(message_context.message_type, "type-R")

[7]

+ self.exchanger.send({"type": "resynchronize", "operation-id": 234567})

Assuming that send() returns None in this case, the test should assert it:

+ message_id = self.exchanger.send({"type": "resynchronize", "operation-id": 234567})
+ self.assertIs(message_id, None)

[8]

+ logging.info(
+ "Response message with operation-id %s was discarded "
+ "because the client's secure ID has changed in the meantime"
+ % message.get('operation-id'))

The logging is not tested, you can use self.logfile.getvalue() for that.

review: Needs Fixing
248. By Muharem Hrnjadovic

Free's review comments [1-5]

249. By Muharem Hrnjadovic

Free's review comment #6

250. By Muharem Hrnjadovic

Free's review comment #7

251. By Muharem Hrnjadovic

Free's review comment #7

252. By Muharem Hrnjadovic

Free's review comment #8

Revision history for this message
Thomas Herve (therve) wrote :

It looks good!

[1] As it, the message context store will grow for ever. I think you should remove messages from it as soon as you add it to the message store (or discard it).

[2]
+ def _message_is_obsolete(self, message):
+ """True if message is obsolete.

Format "Returns C{True} if message is obsolete."

[3]
+class MessageContext(object):
+ """Stores the secure ID for incoming messages that require a response.

It seems to store more than that now.

[4]
+ """Accessing a C{MessageContext} with an existing
+ C{operation-id} works."""

+ """Calling C{all_operation_ids} on an empty database returns an empty
+ list."""

Please put the docstring on their own lines.

Thanks!

review: Needs Fixing
253. By Muharem Hrnjadovic

Thomas' review comment #1

254. By Muharem Hrnjadovic

Thomas' review comment #2

255. By Muharem Hrnjadovic

Thomas' review comment #3

256. By Muharem Hrnjadovic

Thomas' review comment #4

Revision history for this message
Muharem Hrnjadovic (al-maisan) wrote :
Download full text (3.5 KiB)

On 04/29/2010 05:31 PM, Free Ekanayaka wrote:
> Review: Needs Fixing
Hello Free,

I addressed all of your comments below, please review the changes. An
incremental diff is attached to this message for your convenience.

> Very nice work, just a few comments but overall the branch looks great to me.
>
> [1]
>
> + def __init__(self, db, id):
>
> Please add @param entries in the class docstring for these two parameters.
>
> [2]
>
> + @param filename: The file that contains the sqlite database.
>
> Jamu recently pointed that @param docstrings for __init__ method should be actually put in the class docstring, like:
>
> class ExchangeStore(object):
> """Message meta data required by the L{MessageExchange}.
>
> The implementation uses a SQLite database as backend, with a single table
> called "message_context", whose schema is defined in
> L{ensure_exchange_schema}.
>
> @param filename: The file that contains the sqlite database.
> """
>
> because the callable that gets actually used is the class itself (you usually don't call __init__ directly).
>
> [3]
>
> For consistency with other tests and for easier reading please name test cases against the tests the exercise, plus some test specific suffix. A few examples:
>
> test_add_operationid_is_unique -> test_add_message_context_with_duplicate_operation_id
> test_get_message_context_works -> test_get_message_context
> test_deleting_message_contexts_works -> test_deleting_message_contexts
>
> [4]
>
> + self._store = ExchangeStore(
>
> It would be better to use dependency injection (that is usually the pattern adopted in the client) and pass an instance of ExchangeStore instead of creating it in the __init__ method. This helps testing and mocking, and make it easier in general to pass objects of other types providing the same API.
>
> For example your test case will then have a self.exchange_store attribute (that you use in to build the MessageExchange object under test) which you can use for performing assertions instead of accessing self.exchanger._store, which is a private attribute and should be considered an implementation detail.
>
> [5]
>
> + return 0
>
> Maybe None would be safer, as 0 could be used as message id number.
>
> [6]
>
> + self.assertIsNot(
> + None,
> + self.exchanger._store.get_message_context(message['operation-id']))
>
> I think the test should also check that the created MessageContext has the expected attributes set:
>
> + message_context = self.exchanger._store.get_message_context(message['operation-id'])
> + self.assertEquals(message_context.operation_id, 123456)
> + self.assertEquals(message_context.message_type, "type-R")
>
> [7]
>
> + self.exchanger.send({"type": "resynchronize", "operation-id": 234567})
>
> Assuming that send() returns None in this case, the test should assert it:
>
> + message_id = self.exchanger.send({"type": "resynchronize", "operation-id": 234567})
> + self.assertIs(message_id, None)
>
> [8]
>
> + logging.info(
> + "Response message with operation-id %s was discarded "
> + ...

Read more...

=== modified file 'landscape/broker/exchange.py'
--- landscape/broker/exchange.py 2010-04-29 10:59:44 +0000
+++ landscape/broker/exchange.py 2010-04-29 15:57:29 +0000
@@ -1,12 +1,10 @@
1"""The part of the broker which deals with communications with the server."""1"""The part of the broker which deals with communications with the server."""
2import os
3import time2import time
4import logging3import logging
5from landscape.lib.hashlib import md54from landscape.lib.hashlib import md5
65
7from twisted.internet.defer import succeed6from twisted.internet.defer import succeed
87
9from landscape.broker.exchangestore import ExchangeStore
10from landscape.lib.message import got_next_expected, ANCIENT8from landscape.lib.message import got_next_expected, ANCIENT
11from landscape.log import format_delta9from landscape.log import format_delta
12from landscape import SERVER_API, CLIENT_API10from landscape import SERVER_API, CLIENT_API
@@ -23,7 +21,8 @@
2321
24 plugin_name = "message-exchange"22 plugin_name = "message-exchange"
2523
26 def __init__(self, reactor, store, transport, registration_info, data_path,24 def __init__(self, reactor, store, transport, registration_info,
25 message_context_store,
27 exchange_interval=60*60,26 exchange_interval=60*60,
28 urgent_exchange_interval=10,27 urgent_exchange_interval=10,
29 monitor_interval=None,28 monitor_interval=None,
@@ -54,8 +53,7 @@
54 self._client_accepted_types = set()53 self._client_accepted_types = set()
55 self._client_accepted_types_hash = None54 self._client_accepted_types_hash = None
56 self._message_handlers = {}55 self._message_handlers = {}
57 self._store = ExchangeStore(56 self._store = message_context_store
58 os.path.join(data_path, "exchange.database"))
5957
60 self.register_message("accepted-types", self._handle_accepted_types)58 self.register_message("accepted-types", self._handle_accepted_types)
61 self.register_message("resynchronize", self._handle_resynchronize)59 self.register_message("resynchronize", self._handle_resynchronize)
@@ -101,7 +99,7 @@
101 "Response message with operation-id %s was discarded "99 "Response message with operation-id %s was discarded "
102 "because the client's secure ID has changed in the meantime"100 "because the client's secure ID has changed in the meantime"
103 % message.get('operation-id'))101 % message.get('operation-id'))
104 return 0102 return None
105 else:103 else:
106 if "timestamp" not in message:104 if "timestamp" not in message:
107 message["timestamp"] = int(self._reactor.time())105 message["timestamp"] = int(self._reactor.time())
108106
=== modified file 'landscape/broker/exchangestore.py'
--- landscape/broker/exchangestore.py 2010-04-29 10:57:52 +0000
+++ landscape/broker/exchangestore.py 2010-04-29 15:57:29 +0000
@@ -16,6 +16,9 @@
16 which the request message came in and the completion of the request.16 which the request message came in and the completion of the request.
17 If the secure ID did change the result message is obolete and will not be17 If the secure ID did change the result message is obolete and will not be
18 sent to the server.18 sent to the server.
19
20 @param db: the sqlite database handle.
21 @param id: the database key value for this instance.
19 """22 """
2023
21 def __init__(self, db, id):24 def __init__(self, db, id):
@@ -47,13 +50,12 @@
47 The implementation uses a SQLite database as backend, with a single table50 The implementation uses a SQLite database as backend, with a single table
48 called "message_context", whose schema is defined in51 called "message_context", whose schema is defined in
49 L{ensure_exchange_schema}.52 L{ensure_exchange_schema}.
53
54 @param filename: The name of the file that contains the sqlite database.
50 """55 """
51 _db = None56 _db = None
5257
53 def __init__(self, filename):58 def __init__(self, filename):
54 """
55 @param filename: The file that contains the sqlite database.
56 """
57 self._filename = filename59 self._filename = filename
5860
59 def _ensure_schema(self):61 def _ensure_schema(self):
6062
=== modified file 'landscape/broker/service.py'
--- landscape/broker/service.py 2010-04-29 09:02:57 +0000
+++ landscape/broker/service.py 2010-04-29 15:57:29 +0000
@@ -7,6 +7,7 @@
7from landscape.broker.config import BrokerConfiguration7from landscape.broker.config import BrokerConfiguration
8from landscape.broker.transport import HTTPTransport8from landscape.broker.transport import HTTPTransport
9from landscape.broker.exchange import MessageExchange9from landscape.broker.exchange import MessageExchange
10from landscape.broker.exchangestore import ExchangeStore
10from landscape.broker.ping import Pinger11from landscape.broker.ping import Pinger
11from landscape.broker.store import get_default_message_store12from landscape.broker.store import get_default_message_store
12from landscape.broker.server import BrokerServer13from landscape.broker.server import BrokerServer
@@ -52,9 +53,11 @@
52 self.message_store = get_default_message_store(53 self.message_store = get_default_message_store(
53 self.persist, config.message_store_path)54 self.persist, config.message_store_path)
54 self.identity = Identity(self.config, self.persist)55 self.identity = Identity(self.config, self.persist)
56 exchange_store = ExchangeStore(
57 os.path.join(config.data_path, "exchange.database"))
55 self.exchanger = MessageExchange(58 self.exchanger = MessageExchange(
56 self.reactor, self.message_store, self.transport, self.identity,59 self.reactor, self.message_store, self.transport, self.identity,
57 config.data_path, config.exchange_interval,60 exchange_store, config.exchange_interval,
58 config.urgent_exchange_interval)61 config.urgent_exchange_interval)
59 self.pinger = self.pinger_factory(self.reactor, config.ping_url,62 self.pinger = self.pinger_factory(self.reactor, config.ping_url,
60 self.identity, self.exchanger)63 self.identity, self.exchanger)
6164
=== modified file 'landscape/broker/tests/helpers.py'
--- landscape/broker/tests/helpers.py 2010-04-29 09:02:57 +0000
+++ landscape/broker/tests/helpers.py 2010-04-29 15:57:29 +0000
@@ -6,6 +6,7 @@
6from landscape.reactor import FakeReactor6from landscape.reactor import FakeReactor
7from landscape.broker.transport import FakeTransport7from landscape.broker.transport import FakeTransport
8from landscape.broker.exchange import MessageExchange8from landscape.broker.exchange import MessageExchange
9from landscape.broker.exchangestore import ExchangeStore
9from landscape.broker.store import get_default_message_store10from landscape.broker.store import get_default_message_store
10from landscape.broker.registration import Identity, RegistrationHandler11from landscape.broker.registration import Identity, RegistrationHandler
11from landscape.broker.ping import Pinger12from landscape.broker.ping import Pinger
@@ -74,9 +75,11 @@
74 test_case.transport = FakeTransport(test_case.config.url,75 test_case.transport = FakeTransport(test_case.config.url,
75 test_case.config.ssl_public_key)76 test_case.config.ssl_public_key)
76 test_case.reactor = FakeReactor()77 test_case.reactor = FakeReactor()
78 exchange_store = ExchangeStore(
79 os.path.join(test_case.config.data_path, "exchange.database"))
77 test_case.exchanger = MessageExchange(80 test_case.exchanger = MessageExchange(
78 test_case.reactor, test_case.mstore, test_case.transport,81 test_case.reactor, test_case.mstore, test_case.transport,
79 test_case.identity, test_case.config.data_path,82 test_case.identity, exchange_store,
80 test_case.config.exchange_interval,83 test_case.config.exchange_interval,
81 test_case.config.urgent_exchange_interval)84 test_case.config.urgent_exchange_interval)
8285
8386
=== modified file 'landscape/broker/tests/test_exchange.py'
--- landscape/broker/tests/test_exchange.py 2010-04-29 10:56:27 +0000
+++ landscape/broker/tests/test_exchange.py 2010-04-29 16:05:18 +0000
@@ -1,9 +1,11 @@
1import os
1from landscape import SERVER_API, CLIENT_API2from landscape import SERVER_API, CLIENT_API
2from landscape.lib.persist import Persist3from landscape.lib.persist import Persist
3from landscape.lib.hashlib import md54from landscape.lib.hashlib import md5
4from landscape.lib.fetch import fetch_async5from landscape.lib.fetch import fetch_async
5from landscape.schema import Message, Int6from landscape.schema import Message, Int
6from landscape.broker.exchange import get_accepted_types_diff, MessageExchange7from landscape.broker.exchange import get_accepted_types_diff, MessageExchange
8from landscape.broker.exchangestore import ExchangeStore
7from landscape.broker.transport import FakeTransport9from landscape.broker.transport import FakeTransport
8from landscape.broker.store import MessageStore10from landscape.broker.store import MessageStore
9from landscape.broker.ping import Pinger11from landscape.broker.ping import Pinger
@@ -271,8 +273,10 @@
271 Immediately after registration, an urgent exchange should be scheduled.273 Immediately after registration, an urgent exchange should be scheduled.
272 """274 """
273 transport = FakeTransport()275 transport = FakeTransport()
276 exchange_store = ExchangeStore(
277 os.path.join(self.config.data_path, "exchange.database"))
274 exchanger = MessageExchange(self.reactor, self.mstore, transport,278 exchanger = MessageExchange(self.reactor, self.mstore, transport,
275 self.identity, self.config.data_path)279 self.identity, exchange_store)
276 exchanger.start()280 exchanger.start()
277 self.wait_for_exchange(urgent=True)281 self.wait_for_exchange(urgent=True)
278 self.assertEquals(len(transport.payloads), 1)282 self.assertEquals(len(transport.payloads), 1)
@@ -498,8 +502,10 @@
498 the total-messages is equivalent to the total number of messages502 the total-messages is equivalent to the total number of messages
499 pending.503 pending.
500 """504 """
505 exchange_store = ExchangeStore(
506 os.path.join(self.config.data_path, "exchange.database"))
501 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,507 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
502 self.identity, self.config.data_path,508 self.identity, exchange_store,
503 max_messages=1)509 max_messages=1)
504 self.mstore.set_accepted_types(["empty"])510 self.mstore.set_accepted_types(["empty"])
505 self.mstore.add({"type": "empty"})511 self.mstore.add({"type": "empty"})
@@ -528,8 +534,10 @@
528 # We create our own MessageExchange because the one set up by the text534 # We create our own MessageExchange because the one set up by the text
529 # fixture has an urgent exchange interval of 10 seconds, which makes535 # fixture has an urgent exchange interval of 10 seconds, which makes
530 # testing this awkward.536 # testing this awkward.
537 exchange_store = ExchangeStore(
538 os.path.join(self.config.data_path, "exchange.database"))
531 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,539 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
532 self.identity, self.config.data_path,540 self.identity, exchange_store,
533 urgent_exchange_interval=20)541 urgent_exchange_interval=20)
534 exchanger.schedule_exchange(urgent=True)542 exchanger.schedule_exchange(urgent=True)
535 events = []543 events = []
@@ -546,8 +554,10 @@
546 should be cancelled and a new one should be scheduled for 10 seconds554 should be cancelled and a new one should be scheduled for 10 seconds
547 before the new urgent exchange.555 before the new urgent exchange.
548 """556 """
557 exchange_store = ExchangeStore(
558 os.path.join(self.config.data_path, "exchange.database"))
549 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,559 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
550 self.identity, self.config.data_path,560 self.identity, exchange_store,
551 urgent_exchange_interval=20)561 urgent_exchange_interval=20)
552 events = []562 events = []
553 self.reactor.call_on("impending-exchange", lambda: events.append(True))563 self.reactor.call_on("impending-exchange", lambda: events.append(True))
@@ -799,6 +809,9 @@
799 self.assertIsNot(809 self.assertIsNot(
800 None,810 None,
801 self.exchanger._store.get_message_context(message['operation-id']))811 self.exchanger._store.get_message_context(message['operation-id']))
812 message_context = self.exchanger._store.get_message_context(message['operation-id'])
813 self.assertEquals(message_context.operation_id, 123456)
814 self.assertEquals(message_context.message_type, "type-R")
802815
803 def test_one_way_messages_do_not_have_their_context_stored(self):816 def test_one_way_messages_do_not_have_their_context_stored(self):
804 """817 """
@@ -833,11 +846,17 @@
833 self.identity.secure_id = 'brand-new'846 self.identity.secure_id = 'brand-new'
834847
835 self.mstore.set_accepted_types(["resynchronize"])848 self.mstore.set_accepted_types(["resynchronize"])
836 self.exchanger.send({"type": "resynchronize", "operation-id": 234567})849 message_id = self.exchanger.send(
850 {"type": "resynchronize", "operation-id": 234567})
837 self.exchanger.exchange()851 self.exchanger.exchange()
838 self.assertEquals(2, len(self.transport.payloads))852 self.assertEquals(2, len(self.transport.payloads))
839 messages = self.transport.payloads[1]["messages"]853 messages = self.transport.payloads[1]["messages"]
840 self.assertEquals([], messages)854 self.assertEquals([], messages)
855 self.assertIs(None, message_id)
856 expected_log_entry = (
857 "Response message with operation-id 234567 was discarded because "
858 "the client's secure ID has changed in the meantime")
859 self.assertTrue(expected_log_entry in self.logfile.getvalue())
841860
842861
843class AcceptedTypesMessageExchangeTest(LandscapeTest):862class AcceptedTypesMessageExchangeTest(LandscapeTest):
844863
=== modified file 'landscape/broker/tests/test_exchangestore.py'
--- landscape/broker/tests/test_exchangestore.py 2010-04-29 10:30:28 +0000
+++ landscape/broker/tests/test_exchangestore.py 2010-04-29 15:57:29 +0000
@@ -38,14 +38,14 @@
38 self.assertEquals('change-packages', row[2])38 self.assertEquals('change-packages', row[2])
39 self.assertTrue(row[3] > now)39 self.assertTrue(row[3] > now)
4040
41 def test_add_operationid_is_unique(self):41 def test_add_message_context_with_duplicate_operation_id(self):
42 """Only one message context with a given operation-id is permitted."""42 """Only one message context with a given operation-id is permitted."""
43 self.store1.add_message_context(123, 'abc', 'change-packages')43 self.store1.add_message_context(123, 'abc', 'change-packages')
44 self.assertRaises(44 self.assertRaises(
45 sqlite3.IntegrityError,45 sqlite3.IntegrityError,
46 self.store1.add_message_context, 123, 'def', 'change-packages')46 self.store1.add_message_context, 123, 'def', 'change-packages')
4747
48 def test_get_message_context_works(self):48 def test_get_message_context(self):
49 """Accessing a C{MessageContext} with an existing49 """Accessing a C{MessageContext} with an existing
50 C{operation-id} works."""50 C{operation-id} works."""
51 now = time.time()51 now = time.time()
@@ -61,7 +61,7 @@
61 C{operation-id} result in C{None}."""61 C{operation-id} result in C{None}."""
62 self.assertIs(None, self.store1.get_message_context(999))62 self.assertIs(None, self.store1.get_message_context(999))
6363
64 def test_deleting_message_contexts_works(self):64 def test_message_context_remove(self):
65 """C{MessageContext}s are deleted correctly."""65 """C{MessageContext}s are deleted correctly."""
66 context = self.store1.add_message_context(66 context = self.store1.add_message_context(
67 345, 'opq', 'change-packages')67 345, 'opq', 'change-packages')
Revision history for this message
Muharem Hrnjadovic (al-maisan) wrote :

On 04/30/2010 09:54 AM, Thomas Herve wrote:
> Review: Needs Fixing
> It looks good!
Hello Thomas,

thank you very much for the review and especially for the first catch below.
I have addressed all of your comments below, please review the changes. An
incremental diff is attached to this message for your convenience.

> [1] As it, the message context store will grow for ever. I think you should remove messages from it as soon as you add it to the message store (or discard it).
>
> [2]
> + def _message_is_obsolete(self, message):
> + """True if message is obsolete.
>
> Format "Returns C{True} if message is obsolete."
>
> [3]
> +class MessageContext(object):
> + """Stores the secure ID for incoming messages that require a response.
>
> It seems to store more than that now.
>
> [4]
> + """Accessing a C{MessageContext} with an existing
> + C{operation-id} works."""
>
>
> + """Calling C{all_operation_ids} on an empty database returns an empty
> + list."""
>
> Please put the docstring on their own lines.
>
>
> Thanks!

Best regards

--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC

=== modified file 'landscape/broker/exchange.py'
--- landscape/broker/exchange.py 2010-04-29 15:57:29 +0000
+++ landscape/broker/exchange.py 2010-04-30 08:32:27 +0000
@@ -66,7 +66,7 @@
66 return (self._urgent_exchange_interval, self._exchange_interval)66 return (self._urgent_exchange_interval, self._exchange_interval)
6767
68 def _message_is_obsolete(self, message):68 def _message_is_obsolete(self, message):
69 """True if message is obsolete.69 """Returns C{True} if message is obsolete.
7070
71 A message is considered obsolete if the secure ID changed since it was71 A message is considered obsolete if the secure ID changed since it was
72 received.72 received.
@@ -84,7 +84,10 @@
8484
85 # Compare the current secure ID with the one that was in effect when85 # Compare the current secure ID with the one that was in effect when
86 # the request message was received.86 # the request message was received.
87 return self._registration_info.secure_id != context.secure_id87 result = self._registration_info.secure_id != context.secure_id
88 context.remove()
89
90 return result
8891
89 def send(self, message, urgent=False):92 def send(self, message, urgent=False):
90 """Include a message to be sent in an exchange.93 """Include a message to be sent in an exchange.
9194
=== modified file 'landscape/broker/exchangestore.py'
--- landscape/broker/exchangestore.py 2010-04-29 15:57:29 +0000
+++ landscape/broker/exchangestore.py 2010-04-30 08:35:46 +0000
@@ -10,7 +10,14 @@
1010
1111
12class MessageContext(object):12class MessageContext(object):
13 """Stores the secure ID for incoming messages that require a response.13 """Stores a context for incoming messages that require a response.
14
15 The context consists of
16
17 - the "operation-id" value
18 - the secure ID that was in effect when the message was received
19 - the message type
20 - the time when the message was received
1421
15 This data will be used to detect secure ID changes between the time at22 This data will be used to detect secure ID changes between the time at
16 which the request message came in and the completion of the request.23 which the request message came in and the completion of the request.
1724
=== modified file 'landscape/broker/tests/test_exchange.py'
--- landscape/broker/tests/test_exchange.py 2010-04-29 16:05:18 +0000
+++ landscape/broker/tests/test_exchange.py 2010-04-30 08:29:00 +0000
@@ -844,6 +844,7 @@
844844
845 # Change the secure ID so that the response message gets discarded.845 # Change the secure ID so that the response message gets discarded.
846 self.identity.secure_id = 'brand-new'846 self.identity.secure_id = 'brand-new'
847 ids_before = self.exchanger._store.all_operation_ids()
847848
848 self.mstore.set_accepted_types(["resynchronize"])849 self.mstore.set_accepted_types(["resynchronize"])
849 message_id = self.exchanger.send(850 message_id = self.exchanger.send(
@@ -858,6 +859,11 @@
858 "the client's secure ID has changed in the meantime")859 "the client's secure ID has changed in the meantime")
859 self.assertTrue(expected_log_entry in self.logfile.getvalue())860 self.assertTrue(expected_log_entry in self.logfile.getvalue())
860861
862 # The MessageContext was removed after utilisation.
863 ids_after = self.exchanger._store.all_operation_ids()
864 self.assertTrue(len(ids_after) == len(ids_before) - 1)
865 self.assertFalse('234567' in ids_after)
866
861867
862class AcceptedTypesMessageExchangeTest(LandscapeTest):868class AcceptedTypesMessageExchangeTest(LandscapeTest):
863869
864870
=== modified file 'landscape/broker/tests/test_exchangestore.py'
--- landscape/broker/tests/test_exchangestore.py 2010-04-29 15:57:29 +0000
+++ landscape/broker/tests/test_exchangestore.py 2010-04-30 08:37:03 +0000
@@ -46,8 +46,9 @@
46 self.store1.add_message_context, 123, 'def', 'change-packages')46 self.store1.add_message_context, 123, 'def', 'change-packages')
4747
48 def test_get_message_context(self):48 def test_get_message_context(self):
49 """Accessing a C{MessageContext} with an existing49 """
50 C{operation-id} works."""50 Accessing a C{MessageContext} with an existing C{operation-id} works.
51 """
51 now = time.time()52 now = time.time()
52 self.store1.add_message_context(234, 'bcd', 'change-packages')53 self.store1.add_message_context(234, 'bcd', 'change-packages')
53 context = self.store2.get_message_context(234)54 context = self.store2.get_message_context(234)
@@ -69,8 +70,10 @@
69 self.assertIs(None, self.store1.get_message_context(345))70 self.assertIs(None, self.store1.get_message_context(345))
7071
71 def test_all_operation_ids_for_empty_database(self):72 def test_all_operation_ids_for_empty_database(self):
72 """Calling C{all_operation_ids} on an empty database returns an empty73 """
73 list."""74 Calling C{all_operation_ids} on an empty database returns an empty
75 list.
76 """
74 self.assertEquals([], self.store1.all_operation_ids())77 self.assertEquals([], self.store1.all_operation_ids())
7578
76 def test_all_operation_ids(self):79 def test_all_operation_ids(self):
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

[9]

+ message_context_store,

Please name this parameter "exchange_store", so it matches the name of the expected type (ExchangeStore) and it is shorter. Also, now that we have it, the ExchangeStore could be used to store also other types of information beside MessageContext's, so it makes sense to use a generic name for it.

+ self._store = message_context_store

Similarly please name this attribute "self._exchange_store", to make it clear which store is referring to (there is another attribute called "self._message_store", so the naming will be consistent).

[10]

+ else:
+ if "timestamp" not in message:
+ message["timestamp"] = int(self._reactor.time())
+ message_id = self._message_store.add(message)
+ if urgent:
+ self.schedule_exchange(urgent=True)
+ return message_id

I think you can save this else/indentation level and simply write:

+ if "timestamp" not in message:
+ message["timestamp"] = int(self._reactor.time())
+ message_id = self._message_store.add(message)
+ if urgent:
+ self.schedule_exchange(urgent=True)
+ return message_id

I personally like the pattern when you check a series of early-return conditions, before reaching the meat of a function/method, like

if this:
   return something
if that:
   return something_else

do_the_actual_thing

[11]

+ exchange_store = ExchangeStore(
+ os.path.join(config.data_path, "exchange.database"))

Just to avoid hard-coding this value you could add a property to l.broker.config.BrokerConfiguration, like:

@property
def exchange_store_path(self):
    return os.path.join(self.data_path, "exchange.database"))

and use it as

+ exchange_store = ExchangeStore(self.config.exchange_store_path)

[12]

+ exchange_store = ExchangeStore(
+ os.path.join(test_case.config.data_path, "exchange.database"))

Please add this object to the test_case attributes, so it can be used in tests:

+ test_case.exchange_store = ExchangeStore(
+ os.path.join(test_case.config.data_path, "exchange.database"))

after that you can avoid creating it explicitly in all the tests.

[13]

+ self.exchanger._store.get_message_context(message['operation-id']))

Once you have self.exchange_store as test attribute (see [12]) you should use it instead of accessing this private attribute.

Thanks!

Revision history for this message
Muharem Hrnjadovic (al-maisan) wrote :

Hello Free,

thank you very much for these comments, I really appreciate it.
I have taken care of all of them, please see the attached incremental
diff.

On 04/30/2010 11:14 AM, Free Ekanayaka wrote:
> [9]
>
> + message_context_store,
>
> Please name this parameter "exchange_store", so it matches the name of the expected type (ExchangeStore) and it is shorter. Also, now that we have it, the ExchangeStore could be used to store also other types of information beside MessageContext's, so it makes sense to use a generic name for it.
>
> + self._store = message_context_store
>
> Similarly please name this attribute "self._exchange_store", to make it clear which store is referring to (there is another attribute called "self._message_store", so the naming will be consistent).
>
> [10]
>
> + else:
> + if "timestamp" not in message:
> + message["timestamp"] = int(self._reactor.time())
> + message_id = self._message_store.add(message)
> + if urgent:
> + self.schedule_exchange(urgent=True)
> + return message_id
>
> I think you can save this else/indentation level and simply write:
>
> + if "timestamp" not in message:
> + message["timestamp"] = int(self._reactor.time())
> + message_id = self._message_store.add(message)
> + if urgent:
> + self.schedule_exchange(urgent=True)
> + return message_id
>
> I personally like the pattern when you check a series of early-return conditions, before reaching the meat of a function/method, like
>
> if this:
> return something
> if that:
> return something_else
>
> do_the_actual_thing
>
> [11]
>
> + exchange_store = ExchangeStore(
> + os.path.join(config.data_path, "exchange.database"))
>
> Just to avoid hard-coding this value you could add a property to l.broker.config.BrokerConfiguration, like:
>
> @property
> def exchange_store_path(self):
> return os.path.join(self.data_path, "exchange.database"))
>
> and use it as
>
> + exchange_store = ExchangeStore(self.config.exchange_store_path)
>
> [12]
>
> + exchange_store = ExchangeStore(
> + os.path.join(test_case.config.data_path, "exchange.database"))
>
> Please add this object to the test_case attributes, so it can be used in tests:
>
> + test_case.exchange_store = ExchangeStore(
> + os.path.join(test_case.config.data_path, "exchange.database"))
>
> after that you can avoid creating it explicitly in all the tests.
>
> [13]
>
> + self.exchanger._store.get_message_context(message['operation-id']))
>
> Once you have self.exchange_store as test attribute (see [12]) you should use it instead of accessing this private attribute.
>
> Thanks!

Best regards

--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC

=== modified file 'landscape/broker/config.py'
--- landscape/broker/config.py 2010-01-06 15:11:24 +0000
+++ landscape/broker/config.py 2010-04-30 10:39:07 +0000
@@ -18,6 +18,10 @@
18 self._original_http_proxy = os.environ.get("http_proxy")18 self._original_http_proxy = os.environ.get("http_proxy")
19 self._original_https_proxy = os.environ.get("https_proxy")19 self._original_https_proxy = os.environ.get("https_proxy")
2020
21 @property
22 def exchange_store_path(self):
23 return os.path.join(self.data_path, "exchange.database")
24
21 def make_parser(self):25 def make_parser(self):
22 """Parser factory for broker-specific options.26 """Parser factory for broker-specific options.
2327
2428
=== modified file 'landscape/broker/exchange.py'
--- landscape/broker/exchange.py 2010-04-30 08:32:42 +0000
+++ landscape/broker/exchange.py 2010-04-30 10:32:42 +0000
@@ -22,7 +22,7 @@
22 plugin_name = "message-exchange"22 plugin_name = "message-exchange"
2323
24 def __init__(self, reactor, store, transport, registration_info,24 def __init__(self, reactor, store, transport, registration_info,
25 message_context_store,25 exchange_store,
26 exchange_interval=60*60,26 exchange_interval=60*60,
27 urgent_exchange_interval=10,27 urgent_exchange_interval=10,
28 monitor_interval=None,28 monitor_interval=None,
@@ -53,7 +53,7 @@
53 self._client_accepted_types = set()53 self._client_accepted_types = set()
54 self._client_accepted_types_hash = None54 self._client_accepted_types_hash = None
55 self._message_handlers = {}55 self._message_handlers = {}
56 self._store = message_context_store56 self._exchange_store = exchange_store
5757
58 self.register_message("accepted-types", self._handle_accepted_types)58 self.register_message("accepted-types", self._handle_accepted_types)
59 self.register_message("resynchronize", self._handle_resynchronize)59 self.register_message("resynchronize", self._handle_resynchronize)
@@ -75,7 +75,7 @@
75 return False75 return False
7676
77 operation_id = message['operation-id']77 operation_id = message['operation-id']
78 context = self._store.get_message_context(operation_id)78 context = self._exchange_store.get_message_context(operation_id)
79 if context is None:79 if context is None:
80 logging.warning(80 logging.warning(
81 "No message context for message with operation-id: %s"81 "No message context for message with operation-id: %s"
@@ -103,13 +103,13 @@
103 "because the client's secure ID has changed in the meantime"103 "because the client's secure ID has changed in the meantime"
104 % message.get('operation-id'))104 % message.get('operation-id'))
105 return None105 return None
106 else:106
107 if "timestamp" not in message:107 if "timestamp" not in message:
108 message["timestamp"] = int(self._reactor.time())108 message["timestamp"] = int(self._reactor.time())
109 message_id = self._message_store.add(message)109 message_id = self._message_store.add(message)
110 if urgent:110 if urgent:
111 self.schedule_exchange(urgent=True)111 self.schedule_exchange(urgent=True)
112 return message_id112 return message_id
113113
114 def start(self):114 def start(self):
115 """Start scheduling exchanges. The first one will be urgent."""115 """Start scheduling exchanges. The first one will be urgent."""
@@ -400,7 +400,7 @@
400 if 'operation-id' in message:400 if 'operation-id' in message:
401 # This is a message that requires a response. Store the secure ID401 # This is a message that requires a response. Store the secure ID
402 # so we can check for obsolete results later.402 # so we can check for obsolete results later.
403 self._store.add_message_context(403 self._exchange_store.add_message_context(
404 message['operation-id'], self._registration_info.secure_id,404 message['operation-id'], self._registration_info.secure_id,
405 message['type'])405 message['type'])
406406
407407
=== modified file 'landscape/broker/service.py'
--- landscape/broker/service.py 2010-04-29 15:57:29 +0000
+++ landscape/broker/service.py 2010-04-30 10:39:43 +0000
@@ -53,8 +53,7 @@
53 self.message_store = get_default_message_store(53 self.message_store = get_default_message_store(
54 self.persist, config.message_store_path)54 self.persist, config.message_store_path)
55 self.identity = Identity(self.config, self.persist)55 self.identity = Identity(self.config, self.persist)
56 exchange_store = ExchangeStore(56 exchange_store = ExchangeStore(self.config.exchange_store_path)
57 os.path.join(config.data_path, "exchange.database"))
58 self.exchanger = MessageExchange(57 self.exchanger = MessageExchange(
59 self.reactor, self.message_store, self.transport, self.identity,58 self.reactor, self.message_store, self.transport, self.identity,
60 exchange_store, config.exchange_interval,59 exchange_store, config.exchange_interval,
6160
=== modified file 'landscape/broker/tests/helpers.py'
--- landscape/broker/tests/helpers.py 2010-04-29 15:57:29 +0000
+++ landscape/broker/tests/helpers.py 2010-04-30 10:44:26 +0000
@@ -75,11 +75,11 @@
75 test_case.transport = FakeTransport(test_case.config.url,75 test_case.transport = FakeTransport(test_case.config.url,
76 test_case.config.ssl_public_key)76 test_case.config.ssl_public_key)
77 test_case.reactor = FakeReactor()77 test_case.reactor = FakeReactor()
78 exchange_store = ExchangeStore(78 test_case.exchange_store = ExchangeStore(
79 os.path.join(test_case.config.data_path, "exchange.database"))79 os.path.join(test_case.config.data_path, "exchange.database"))
80 test_case.exchanger = MessageExchange(80 test_case.exchanger = MessageExchange(
81 test_case.reactor, test_case.mstore, test_case.transport,81 test_case.reactor, test_case.mstore, test_case.transport,
82 test_case.identity, exchange_store,82 test_case.identity, test_case.exchange_store,
83 test_case.config.exchange_interval,83 test_case.config.exchange_interval,
84 test_case.config.urgent_exchange_interval)84 test_case.config.urgent_exchange_interval)
8585
8686
=== modified file 'landscape/broker/tests/test_exchange.py'
--- landscape/broker/tests/test_exchange.py 2010-04-30 08:29:41 +0000
+++ landscape/broker/tests/test_exchange.py 2010-04-30 10:48:10 +0000
@@ -273,10 +273,8 @@
273 Immediately after registration, an urgent exchange should be scheduled.273 Immediately after registration, an urgent exchange should be scheduled.
274 """274 """
275 transport = FakeTransport()275 transport = FakeTransport()
276 exchange_store = ExchangeStore(
277 os.path.join(self.config.data_path, "exchange.database"))
278 exchanger = MessageExchange(self.reactor, self.mstore, transport,276 exchanger = MessageExchange(self.reactor, self.mstore, transport,
279 self.identity, exchange_store)277 self.identity, self.exchange_store)
280 exchanger.start()278 exchanger.start()
281 self.wait_for_exchange(urgent=True)279 self.wait_for_exchange(urgent=True)
282 self.assertEquals(len(transport.payloads), 1)280 self.assertEquals(len(transport.payloads), 1)
@@ -502,10 +500,8 @@
502 the total-messages is equivalent to the total number of messages500 the total-messages is equivalent to the total number of messages
503 pending.501 pending.
504 """502 """
505 exchange_store = ExchangeStore(
506 os.path.join(self.config.data_path, "exchange.database"))
507 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,503 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
508 self.identity, exchange_store,504 self.identity, self.exchange_store,
509 max_messages=1)505 max_messages=1)
510 self.mstore.set_accepted_types(["empty"])506 self.mstore.set_accepted_types(["empty"])
511 self.mstore.add({"type": "empty"})507 self.mstore.add({"type": "empty"})
@@ -534,10 +530,8 @@
534 # We create our own MessageExchange because the one set up by the text530 # We create our own MessageExchange because the one set up by the text
535 # fixture has an urgent exchange interval of 10 seconds, which makes531 # fixture has an urgent exchange interval of 10 seconds, which makes
536 # testing this awkward.532 # testing this awkward.
537 exchange_store = ExchangeStore(
538 os.path.join(self.config.data_path, "exchange.database"))
539 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,533 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
540 self.identity, exchange_store,534 self.identity, self.exchange_store,
541 urgent_exchange_interval=20)535 urgent_exchange_interval=20)
542 exchanger.schedule_exchange(urgent=True)536 exchanger.schedule_exchange(urgent=True)
543 events = []537 events = []
@@ -554,10 +548,8 @@
554 should be cancelled and a new one should be scheduled for 10 seconds548 should be cancelled and a new one should be scheduled for 10 seconds
555 before the new urgent exchange.549 before the new urgent exchange.
556 """550 """
557 exchange_store = ExchangeStore(
558 os.path.join(self.config.data_path, "exchange.database"))
559 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,551 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
560 self.identity, exchange_store,552 self.identity, self.exchange_store,
561 urgent_exchange_interval=20)553 urgent_exchange_interval=20)
562 events = []554 events = []
563 self.reactor.call_on("impending-exchange", lambda: events.append(True))555 self.reactor.call_on("impending-exchange", lambda: events.append(True))
@@ -808,8 +800,9 @@
808 [message] = messages800 [message] = messages
809 self.assertIsNot(801 self.assertIsNot(
810 None,802 None,
811 self.exchanger._store.get_message_context(message['operation-id']))803 self.exchange_store.get_message_context(message['operation-id']))
812 message_context = self.exchanger._store.get_message_context(message['operation-id'])804 message_context = self.exchange_store.get_message_context(
805 message['operation-id'])
813 self.assertEquals(message_context.operation_id, 123456)806 self.assertEquals(message_context.operation_id, 123456)
814 self.assertEquals(message_context.message_type, "type-R")807 self.assertEquals(message_context.message_type, "type-R")
815808
@@ -818,14 +811,14 @@
818 Incoming messages without an 'operation-id' key will *not* have the811 Incoming messages without an 'operation-id' key will *not* have the
819 secure id stored in the L{ExchangeStore}.812 secure id stored in the L{ExchangeStore}.
820 """813 """
821 ids_before = self.exchanger._store.all_operation_ids()814 ids_before = self.exchange_store.all_operation_ids()
822815
823 msg = {"type": "type-R", "whatever": 5678}816 msg = {"type": "type-R", "whatever": 5678}
824 server_message = [msg]817 server_message = [msg]
825 self.transport.responses.append(server_message)818 self.transport.responses.append(server_message)
826 self.exchanger.exchange()819 self.exchanger.exchange()
827820
828 ids_after = self.exchanger._store.all_operation_ids()821 ids_after = self.exchange_store.all_operation_ids()
829 self.assertEquals(ids_before, ids_after)822 self.assertEquals(ids_before, ids_after)
830823
831 def test_obsolete_response_messages_are_discarded(self):824 def test_obsolete_response_messages_are_discarded(self):
@@ -844,7 +837,7 @@
844837
845 # Change the secure ID so that the response message gets discarded.838 # Change the secure ID so that the response message gets discarded.
846 self.identity.secure_id = 'brand-new'839 self.identity.secure_id = 'brand-new'
847 ids_before = self.exchanger._store.all_operation_ids()840 ids_before = self.exchange_store.all_operation_ids()
848841
849 self.mstore.set_accepted_types(["resynchronize"])842 self.mstore.set_accepted_types(["resynchronize"])
850 message_id = self.exchanger.send(843 message_id = self.exchanger.send(
@@ -860,7 +853,7 @@
860 self.assertTrue(expected_log_entry in self.logfile.getvalue())853 self.assertTrue(expected_log_entry in self.logfile.getvalue())
861854
862 # The MessageContext was removed after utilisation.855 # The MessageContext was removed after utilisation.
863 ids_after = self.exchanger._store.all_operation_ids()856 ids_after = self.exchange_store.all_operation_ids()
864 self.assertTrue(len(ids_after) == len(ids_before) - 1)857 self.assertTrue(len(ids_after) == len(ids_before) - 1)
865 self.assertFalse('234567' in ids_after)858 self.assertFalse('234567' in ids_after)
866859
257. By Muharem Hrnjadovic

Free's review comments [9,10]

258. By Muharem Hrnjadovic

Free's review comment #11

259. By Muharem Hrnjadovic

Free's review comment #12

260. By Muharem Hrnjadovic

Free's review comments [12,13]

Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

The branch looks great! Thanks for addressing all the comments. +1

[14]

+ test_case.exchange_store = ExchangeStore(
+ os.path.join(test_case.config.data_path, "exchange.database"))

You can now use BrokerConfiguration.exchange_store_path:

+ test_case.exchange_store = ExchangeStore(
+ test_case.config.exchange_store_path)

review: Approve
261. By Muharem Hrnjadovic

Free's review comment #14

Revision history for this message
Thomas Herve (therve) wrote :

[5] Some pyflakes:
landscape/broker/tests/test_exchange.py:1: 'os' imported but unused
landscape/broker/tests/test_exchange.py:8: 'ExchangeStore' imported but unused

[6] get_message_context should retrieve all the data from the database in one query, and pass it to the MessageContext __init__, thus removing the select in there.

+1!

review: Approve
262. By Muharem Hrnjadovic

Thomas' review comment #5

Revision history for this message
Muharem Hrnjadovic (al-maisan) wrote :

On 04/30/2010 04:59 PM, Thomas Herve wrote:
> Review: Approve
> [5] Some pyflakes:
> landscape/broker/tests/test_exchange.py:1: 'os' imported but unused
> landscape/broker/tests/test_exchange.py:8: 'ExchangeStore' imported but unused
>
> [6] get_message_context should retrieve all the data from the database in one query, and pass it to the MessageContext __init__, thus removing the select in there.

Hello Thomas,

thanks for the suggestions above! I have revised the code accordingly.
Please see the attached incremental diff.

Best regards

--
Muharem Hrnjadovic <email address hidden>
Public key id : B2BBFCFC
Key fingerprint : A5A3 CC67 2B87 D641 103F 5602 219F 6B60 B2BB FCFC

=== modified file 'landscape/broker/exchangestore.py'
--- landscape/broker/exchangestore.py 2010-04-30 08:36:15 +0000
+++ landscape/broker/exchangestore.py 2010-04-30 18:11:31 +0000
@@ -28,27 +28,18 @@
28 @param id: the database key value for this instance.28 @param id: the database key value for this instance.
29 """29 """
3030
31 def __init__(self, db, id):31 def __init__(self, db, operation_id, secure_id, message_type, timestamp):
32 self._db = db32 self._db = db
33 self.id = id33 self.operation_id = operation_id
3434 self.secure_id = secure_id
35 cursor = db.cursor()35 self.message_type = message_type
36 try:36 self.timestamp = timestamp
37 cursor.execute(
38 "SELECT operation_id, secure_id, message_type, timestamp "
39 "FROM message_context WHERE id=?", (id,))
40 row = cursor.fetchone()
41 finally:
42 cursor.close()
43
44 self.operation_id = row[0]
45 self.secure_id = row[1]
46 self.message_type = row[2]
47 self.timestamp = row[3]
4837
49 @with_cursor38 @with_cursor
50 def remove(self, cursor):39 def remove(self, cursor):
51 cursor.execute("DELETE FROM message_context WHERE id=?", (self.id,))40 cursor.execute(
41 "DELETE FROM message_context WHERE operation_id=?",
42 (self.operation_id,))
5243
5344
54class ExchangeStore(object):45class ExchangeStore(object):
@@ -72,21 +63,21 @@
72 def add_message_context(63 def add_message_context(
73 self, cursor, operation_id, secure_id, message_type):64 self, cursor, operation_id, secure_id, message_type):
74 """Add a L{MessageContext} with the given data."""65 """Add a L{MessageContext} with the given data."""
66 params = (operation_id, secure_id, message_type, time.time())
75 cursor.execute(67 cursor.execute(
76 "INSERT INTO message_context "68 "INSERT INTO message_context "
77 " (operation_id, secure_id, message_type, timestamp) "69 " (operation_id, secure_id, message_type, timestamp) "
78 " VALUES (?,?,?,?)",70 " VALUES (?,?,?,?)", params)
79 (operation_id, secure_id, message_type, time.time()))71 return MessageContext(self._db, *params)
80 return MessageContext(self._db, cursor.lastrowid)
8172
82 @with_cursor73 @with_cursor
83 def get_message_context(self, cursor, operation_id):74 def get_message_context(self, cursor, operation_id):
84 """The L{MessageContext} for the given C{operation_id} or C{None}."""75 """The L{MessageContext} for the given C{operation_id} or C{None}."""
85 cursor.execute(76 cursor.execute(
86 "SELECT id FROM message_context WHERE operation_id=?",77 "SELECT operation_id, secure_id, message_type, timestamp "
87 (operation_id,))78 "FROM message_context WHERE operation_id=?", (operation_id,))
88 result = cursor.fetchone()79 row = cursor.fetchone()
89 return MessageContext(self._db, result[0]) if result else None80 return MessageContext(self._db, *row) if row else None
9081
91 @with_cursor82 @with_cursor
92 def all_operation_ids(self, cursor):83 def all_operation_ids(self, cursor):
9384
=== modified file 'landscape/broker/tests/test_exchange.py'
--- landscape/broker/tests/test_exchange.py 2010-04-30 10:48:32 +0000
+++ landscape/broker/tests/test_exchange.py 2010-04-30 15:54:23 +0000
@@ -1,11 +1,9 @@
1import os
2from landscape import SERVER_API, CLIENT_API1from landscape import SERVER_API, CLIENT_API
3from landscape.lib.persist import Persist2from landscape.lib.persist import Persist
4from landscape.lib.hashlib import md53from landscape.lib.hashlib import md5
5from landscape.lib.fetch import fetch_async4from landscape.lib.fetch import fetch_async
6from landscape.schema import Message, Int5from landscape.schema import Message, Int
7from landscape.broker.exchange import get_accepted_types_diff, MessageExchange6from landscape.broker.exchange import get_accepted_types_diff, MessageExchange
8from landscape.broker.exchangestore import ExchangeStore
9from landscape.broker.transport import FakeTransport7from landscape.broker.transport import FakeTransport
10from landscape.broker.store import MessageStore8from landscape.broker.store import MessageStore
11from landscape.broker.ping import Pinger9from landscape.broker.ping import Pinger
263. By Muharem Hrnjadovic

Thomas' review comment #6

264. By Muharem Hrnjadovic

Made code more succinct

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'landscape/broker/config.py'
--- landscape/broker/config.py 2010-01-06 15:11:24 +0000
+++ landscape/broker/config.py 2010-04-30 19:04:26 +0000
@@ -18,6 +18,10 @@
18 self._original_http_proxy = os.environ.get("http_proxy")18 self._original_http_proxy = os.environ.get("http_proxy")
19 self._original_https_proxy = os.environ.get("https_proxy")19 self._original_https_proxy = os.environ.get("https_proxy")
2020
21 @property
22 def exchange_store_path(self):
23 return os.path.join(self.data_path, "exchange.database")
24
21 def make_parser(self):25 def make_parser(self):
22 """Parser factory for broker-specific options.26 """Parser factory for broker-specific options.
2327
2428
=== modified file 'landscape/broker/exchange.py'
--- landscape/broker/exchange.py 2010-01-22 11:51:19 +0000
+++ landscape/broker/exchange.py 2010-04-30 19:04:26 +0000
@@ -22,6 +22,7 @@
22 plugin_name = "message-exchange"22 plugin_name = "message-exchange"
2323
24 def __init__(self, reactor, store, transport, registration_info,24 def __init__(self, reactor, store, transport, registration_info,
25 exchange_store,
25 exchange_interval=60*60,26 exchange_interval=60*60,
26 urgent_exchange_interval=10,27 urgent_exchange_interval=10,
27 monitor_interval=None,28 monitor_interval=None,
@@ -52,6 +53,7 @@
52 self._client_accepted_types = set()53 self._client_accepted_types = set()
53 self._client_accepted_types_hash = None54 self._client_accepted_types_hash = None
54 self._message_handlers = {}55 self._message_handlers = {}
56 self._exchange_store = exchange_store
5557
56 self.register_message("accepted-types", self._handle_accepted_types)58 self.register_message("accepted-types", self._handle_accepted_types)
57 self.register_message("resynchronize", self._handle_resynchronize)59 self.register_message("resynchronize", self._handle_resynchronize)
@@ -63,6 +65,30 @@
63 """Return a binary tuple with urgent and normal exchange intervals."""65 """Return a binary tuple with urgent and normal exchange intervals."""
64 return (self._urgent_exchange_interval, self._exchange_interval)66 return (self._urgent_exchange_interval, self._exchange_interval)
6567
68 def _message_is_obsolete(self, message):
69 """Returns C{True} if message is obsolete.
70
71 A message is considered obsolete if the secure ID changed since it was
72 received.
73 """
74 if 'operation-id' not in message:
75 return False
76
77 operation_id = message['operation-id']
78 context = self._exchange_store.get_message_context(operation_id)
79 if context is None:
80 logging.warning(
81 "No message context for message with operation-id: %s"
82 % operation_id)
83 return False
84
85 # Compare the current secure ID with the one that was in effect when
86 # the request message was received.
87 result = self._registration_info.secure_id != context.secure_id
88 context.remove()
89
90 return result
91
66 def send(self, message, urgent=False):92 def send(self, message, urgent=False):
67 """Include a message to be sent in an exchange.93 """Include a message to be sent in an exchange.
6894
@@ -71,6 +97,13 @@
7197
72 @param message: Same as in L{MessageStore.add}.98 @param message: Same as in L{MessageStore.add}.
73 """99 """
100 if self._message_is_obsolete(message):
101 logging.info(
102 "Response message with operation-id %s was discarded "
103 "because the client's secure ID has changed in the meantime"
104 % message.get('operation-id'))
105 return None
106
74 if "timestamp" not in message:107 if "timestamp" not in message:
75 message["timestamp"] = int(self._reactor.time())108 message["timestamp"] = int(self._reactor.time())
76 message_id = self._message_store.add(message)109 message_id = self._message_store.add(message)
@@ -300,7 +333,8 @@
300 @param result: The response got in reply to the C{payload}.333 @param result: The response got in reply to the C{payload}.
301 """334 """
302 message_store = self._message_store335 message_store = self._message_store
303 self._client_accepted_types_hash = result.get("client-accepted-types-hash")336 self._client_accepted_types_hash = result.get(
337 "client-accepted-types-hash")
304 next_expected = result.get("next-expected-sequence")338 next_expected = result.get("next-expected-sequence")
305 old_sequence = message_store.get_sequence()339 old_sequence = message_store.get_sequence()
306 if next_expected is None:340 if next_expected is None:
@@ -363,6 +397,13 @@
363 Any message handlers registered with L{register_message} will397 Any message handlers registered with L{register_message} will
364 be called.398 be called.
365 """399 """
400 if 'operation-id' in message:
401 # This is a message that requires a response. Store the secure ID
402 # so we can check for obsolete results later.
403 self._exchange_store.add_message_context(
404 message['operation-id'], self._registration_info.secure_id,
405 message['type'])
406
366 self._reactor.fire("message", message)407 self._reactor.fire("message", message)
367 # This has plan interference! but whatever.408 # This has plan interference! but whatever.
368 if message["type"] in self._message_handlers:409 if message["type"] in self._message_handlers:
369410
=== added file 'landscape/broker/exchangestore.py'
--- landscape/broker/exchangestore.py 1970-01-01 00:00:00 +0000
+++ landscape/broker/exchangestore.py 2010-04-30 19:04:26 +0000
@@ -0,0 +1,110 @@
1"""Provide access to the persistent data used by the L{MessageExchange}."""
2import time
3
4try:
5 import sqlite3
6except ImportError:
7 from pysqlite2 import dbapi2 as sqlite3
8
9from landscape.lib.store import with_cursor
10
11
12class MessageContext(object):
13 """Stores a context for incoming messages that require a response.
14
15 The context consists of
16
17 - the "operation-id" value
18 - the secure ID that was in effect when the message was received
19 - the message type
20 - the time when the message was received
21
22 This data will be used to detect secure ID changes between the time at
23 which the request message came in and the completion of the request.
24 If the secure ID did change the result message is obolete and will not be
25 sent to the server.
26
27 @param db: the sqlite database handle.
28 @param id: the database key value for this instance.
29 """
30
31 def __init__(self, db, operation_id, secure_id, message_type, timestamp):
32 self._db = db
33 self.operation_id = operation_id
34 self.secure_id = secure_id
35 self.message_type = message_type
36 self.timestamp = timestamp
37
38 @with_cursor
39 def remove(self, cursor):
40 cursor.execute(
41 "DELETE FROM message_context WHERE operation_id=?",
42 (self.operation_id,))
43
44
45class ExchangeStore(object):
46 """Message meta data required by the L{MessageExchange}.
47
48 The implementation uses a SQLite database as backend, with a single table
49 called "message_context", whose schema is defined in
50 L{ensure_exchange_schema}.
51
52 @param filename: The name of the file that contains the sqlite database.
53 """
54 _db = None
55
56 def __init__(self, filename):
57 self._filename = filename
58
59 def _ensure_schema(self):
60 ensure_exchange_schema(self._db)
61
62 @with_cursor
63 def add_message_context(
64 self, cursor, operation_id, secure_id, message_type):
65 """Add a L{MessageContext} with the given data."""
66 params = (operation_id, secure_id, message_type, time.time())
67 cursor.execute(
68 "INSERT INTO message_context "
69 " (operation_id, secure_id, message_type, timestamp) "
70 " VALUES (?,?,?,?)", params)
71 return MessageContext(self._db, *params)
72
73 @with_cursor
74 def get_message_context(self, cursor, operation_id):
75 """The L{MessageContext} for the given C{operation_id} or C{None}."""
76 cursor.execute(
77 "SELECT operation_id, secure_id, message_type, timestamp "
78 "FROM message_context WHERE operation_id=?", (operation_id,))
79 row = cursor.fetchone()
80 return MessageContext(self._db, *row) if row else None
81
82 @with_cursor
83 def all_operation_ids(self, cursor):
84 """Return all operation IDs currently stored in C{message_context}."""
85 cursor.execute("SELECT operation_id FROM message_context")
86 result = cursor.fetchall()
87 return [row[0] for row in result]
88
89
90def ensure_exchange_schema(db):
91 """Create all tables needed by a L{ExchangeStore}.
92
93 @param db: A connection to a SQLite database.
94 """
95 cursor = db.cursor()
96 try:
97 cursor.execute(
98 "CREATE TABLE message_context"
99 " (id INTEGER PRIMARY KEY, timestamp TIMESTAMP, "
100 " secure_id TEXT NOT NULL, operation_id INTEGER NOT NULL, "
101 " message_type text NOT NULL)")
102 cursor.execute(
103 "CREATE UNIQUE INDEX msgctx_operationid_idx ON "
104 "message_context(operation_id)")
105 except (sqlite3.OperationalError, sqlite3.DatabaseError):
106 cursor.close()
107 db.rollback()
108 else:
109 cursor.close()
110 db.commit()
0111
=== modified file 'landscape/broker/service.py'
--- landscape/broker/service.py 2010-04-23 11:37:25 +0000
+++ landscape/broker/service.py 2010-04-30 19:04:26 +0000
@@ -7,6 +7,7 @@
7from landscape.broker.config import BrokerConfiguration7from landscape.broker.config import BrokerConfiguration
8from landscape.broker.transport import HTTPTransport8from landscape.broker.transport import HTTPTransport
9from landscape.broker.exchange import MessageExchange9from landscape.broker.exchange import MessageExchange
10from landscape.broker.exchangestore import ExchangeStore
10from landscape.broker.ping import Pinger11from landscape.broker.ping import Pinger
11from landscape.broker.store import get_default_message_store12from landscape.broker.store import get_default_message_store
12from landscape.broker.server import BrokerServer13from landscape.broker.server import BrokerServer
@@ -52,9 +53,11 @@
52 self.message_store = get_default_message_store(53 self.message_store = get_default_message_store(
53 self.persist, config.message_store_path)54 self.persist, config.message_store_path)
54 self.identity = Identity(self.config, self.persist)55 self.identity = Identity(self.config, self.persist)
56 exchange_store = ExchangeStore(self.config.exchange_store_path)
55 self.exchanger = MessageExchange(57 self.exchanger = MessageExchange(
56 self.reactor, self.message_store, self.transport, self.identity,58 self.reactor, self.message_store, self.transport, self.identity,
57 config.exchange_interval, config.urgent_exchange_interval)59 exchange_store, config.exchange_interval,
60 config.urgent_exchange_interval)
58 self.pinger = self.pinger_factory(self.reactor, config.ping_url,61 self.pinger = self.pinger_factory(self.reactor, config.ping_url,
59 self.identity, self.exchanger)62 self.identity, self.exchanger)
60 self.registration = RegistrationHandler(63 self.registration = RegistrationHandler(
6164
=== modified file 'landscape/broker/tests/helpers.py'
--- landscape/broker/tests/helpers.py 2010-04-07 21:00:46 +0000
+++ landscape/broker/tests/helpers.py 2010-04-30 19:04:26 +0000
@@ -6,6 +6,7 @@
6from landscape.reactor import FakeReactor6from landscape.reactor import FakeReactor
7from landscape.broker.transport import FakeTransport7from landscape.broker.transport import FakeTransport
8from landscape.broker.exchange import MessageExchange8from landscape.broker.exchange import MessageExchange
9from landscape.broker.exchangestore import ExchangeStore
9from landscape.broker.store import get_default_message_store10from landscape.broker.store import get_default_message_store
10from landscape.broker.registration import Identity, RegistrationHandler11from landscape.broker.registration import Identity, RegistrationHandler
11from landscape.broker.ping import Pinger12from landscape.broker.ping import Pinger
@@ -74,9 +75,12 @@
74 test_case.transport = FakeTransport(test_case.config.url,75 test_case.transport = FakeTransport(test_case.config.url,
75 test_case.config.ssl_public_key)76 test_case.config.ssl_public_key)
76 test_case.reactor = FakeReactor()77 test_case.reactor = FakeReactor()
78 test_case.exchange_store = ExchangeStore(
79 test_case.config.exchange_store_path)
77 test_case.exchanger = MessageExchange(80 test_case.exchanger = MessageExchange(
78 test_case.reactor, test_case.mstore, test_case.transport,81 test_case.reactor, test_case.mstore, test_case.transport,
79 test_case.identity, test_case.config.exchange_interval,82 test_case.identity, test_case.exchange_store,
83 test_case.config.exchange_interval,
80 test_case.config.urgent_exchange_interval)84 test_case.config.urgent_exchange_interval)
8185
8286
8387
=== modified file 'landscape/broker/tests/test_exchange.py'
--- landscape/broker/tests/test_exchange.py 2010-04-28 13:07:53 +0000
+++ landscape/broker/tests/test_exchange.py 2010-04-30 19:04:26 +0000
@@ -1,4 +1,3 @@
1
2from landscape import SERVER_API, CLIENT_API1from landscape import SERVER_API, CLIENT_API
3from landscape.lib.persist import Persist2from landscape.lib.persist import Persist
4from landscape.lib.hashlib import md53from landscape.lib.hashlib import md5
@@ -22,6 +21,7 @@
22 self.mstore.add_schema(Message("empty", {}))21 self.mstore.add_schema(Message("empty", {}))
23 self.mstore.add_schema(Message("data", {"data": Int()}))22 self.mstore.add_schema(Message("data", {"data": Int()}))
24 self.mstore.add_schema(Message("holdme", {}))23 self.mstore.add_schema(Message("holdme", {}))
24 self.identity.secure_id = 'needs-to-be-set-for-tests-to-pass'
2525
26 def wait_for_exchange(self, urgent=False, factor=1, delta=0):26 def wait_for_exchange(self, urgent=False, factor=1, delta=0):
27 if urgent:27 if urgent:
@@ -240,7 +240,6 @@
240 self.mstore.add({"type": "data", "data": 2})240 self.mstore.add({"type": "data", "data": 2})
241 self.mstore.add({"type": "data", "data": 3})241 self.mstore.add({"type": "data", "data": 3})
242 # next one, server will respond with 1!242 # next one, server will respond with 1!
243
244 def desynched_send_data(payload, computer_id=None, message_api=None):243 def desynched_send_data(payload, computer_id=None, message_api=None):
245 self.transport.next_expected_sequence = 1244 self.transport.next_expected_sequence = 1
246 return {"next-expected-sequence": 1}245 return {"next-expected-sequence": 1}
@@ -273,7 +272,7 @@
273 """272 """
274 transport = FakeTransport()273 transport = FakeTransport()
275 exchanger = MessageExchange(self.reactor, self.mstore, transport,274 exchanger = MessageExchange(self.reactor, self.mstore, transport,
276 self.identity)275 self.identity, self.exchange_store)
277 exchanger.start()276 exchanger.start()
278 self.wait_for_exchange(urgent=True)277 self.wait_for_exchange(urgent=True)
279 self.assertEquals(len(transport.payloads), 1)278 self.assertEquals(len(transport.payloads), 1)
@@ -390,6 +389,7 @@
390 del self.transport.exchange389 del self.transport.exchange
391390
392 exchanged = []391 exchanged = []
392
393 def exchange_callback():393 def exchange_callback():
394 exchanged.append(True)394 exchanged.append(True)
395395
@@ -473,7 +473,6 @@
473 self.assertEquals(payload.get("server-api"), "2.0")473 self.assertEquals(payload.get("server-api"), "2.0")
474 self.assertEquals(self.transport.message_api, "2.0")474 self.assertEquals(self.transport.message_api, "2.0")
475475
476
477 def test_include_total_messages_none(self):476 def test_include_total_messages_none(self):
478 """477 """
479 The payload includes the total number of messages that the client has478 The payload includes the total number of messages that the client has
@@ -500,14 +499,14 @@
500 pending.499 pending.
501 """500 """
502 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,501 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
503 self.identity, max_messages=1)502 self.identity, self.exchange_store,
503 max_messages=1)
504 self.mstore.set_accepted_types(["empty"])504 self.mstore.set_accepted_types(["empty"])
505 self.mstore.add({"type": "empty"})505 self.mstore.add({"type": "empty"})
506 self.mstore.add({"type": "empty"})506 self.mstore.add({"type": "empty"})
507 exchanger.exchange()507 exchanger.exchange()
508 self.assertEquals(self.transport.payloads[0]["total-messages"], 2)508 self.assertEquals(self.transport.payloads[0]["total-messages"], 2)
509509
510
511 def test_impending_exchange(self):510 def test_impending_exchange(self):
512 """511 """
513 A reactor event is emitted shortly (10 seconds) before an exchange512 A reactor event is emitted shortly (10 seconds) before an exchange
@@ -530,7 +529,8 @@
530 # fixture has an urgent exchange interval of 10 seconds, which makes529 # fixture has an urgent exchange interval of 10 seconds, which makes
531 # testing this awkward.530 # testing this awkward.
532 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,531 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
533 self.identity, urgent_exchange_interval=20)532 self.identity, self.exchange_store,
533 urgent_exchange_interval=20)
534 exchanger.schedule_exchange(urgent=True)534 exchanger.schedule_exchange(urgent=True)
535 events = []535 events = []
536 self.reactor.call_on("impending-exchange", lambda: events.append(True))536 self.reactor.call_on("impending-exchange", lambda: events.append(True))
@@ -547,7 +547,8 @@
547 before the new urgent exchange.547 before the new urgent exchange.
548 """548 """
549 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,549 exchanger = MessageExchange(self.reactor, self.mstore, self.transport,
550 self.identity, urgent_exchange_interval=20)550 self.identity, self.exchange_store,
551 urgent_exchange_interval=20)
551 events = []552 events = []
552 self.reactor.call_on("impending-exchange", lambda: events.append(True))553 self.reactor.call_on("impending-exchange", lambda: events.append(True))
553 # This call will:554 # This call will:
@@ -610,6 +611,7 @@
610 self.transport.exchange = failed_send_data611 self.transport.exchange = failed_send_data
611612
612 exchanged = []613 exchanged = []
614
613 def exchange_failed_callback():615 def exchange_failed_callback():
614 exchanged.append(True)616 exchanged.append(True)
615617
@@ -700,6 +702,7 @@
700702
701 def handler1(message):703 def handler1(message):
702 messages.append(("one", message))704 messages.append(("one", message))
705
703 def handler2(message):706 def handler2(message):
704 messages.append(("two", message))707 messages.append(("two", message))
705708
@@ -722,6 +725,7 @@
722725
723 def test_server_uuid_change_cause_event(self):726 def test_server_uuid_change_cause_event(self):
724 called = []727 called = []
728
725 def server_uuid_changed(old_uuid, new_uuid):729 def server_uuid_changed(old_uuid, new_uuid):
726 called.append((old_uuid, new_uuid))730 called.append((old_uuid, new_uuid))
727 self.reactor.call_on("server-uuid-changed", server_uuid_changed)731 self.reactor.call_on("server-uuid-changed", server_uuid_changed)
@@ -756,6 +760,7 @@
756 the event is not emitted.760 the event is not emitted.
757 """761 """
758 called = []762 called = []
763
759 def server_uuid_changed(old_uuid, new_uuid):764 def server_uuid_changed(old_uuid, new_uuid):
760 called.append((old_uuid, new_uuid))765 called.append((old_uuid, new_uuid))
761 self.reactor.call_on("server-uuid-changed", server_uuid_changed)766 self.reactor.call_on("server-uuid-changed", server_uuid_changed)
@@ -779,6 +784,77 @@
779784
780 self.assertNotIn("INFO: Server UUID changed", self.logfile.getvalue())785 self.assertNotIn("INFO: Server UUID changed", self.logfile.getvalue())
781786
787 def test_return_messages_have_their_context_stored(self):
788 """
789 Incoming messages with an 'operation-id' key will have the secure id
790 stored in the L{ExchangeStore}.
791 """
792 messages = []
793 self.exchanger.register_message("type-R", messages.append)
794 msg = {"type": "type-R", "whatever": 5678, "operation-id": 123456}
795 server_message = [msg]
796 self.transport.responses.append(server_message)
797 self.exchanger.exchange()
798 [message] = messages
799 self.assertIsNot(
800 None,
801 self.exchange_store.get_message_context(message['operation-id']))
802 message_context = self.exchange_store.get_message_context(
803 message['operation-id'])
804 self.assertEquals(message_context.operation_id, 123456)
805 self.assertEquals(message_context.message_type, "type-R")
806
807 def test_one_way_messages_do_not_have_their_context_stored(self):
808 """
809 Incoming messages without an 'operation-id' key will *not* have the
810 secure id stored in the L{ExchangeStore}.
811 """
812 ids_before = self.exchange_store.all_operation_ids()
813
814 msg = {"type": "type-R", "whatever": 5678}
815 server_message = [msg]
816 self.transport.responses.append(server_message)
817 self.exchanger.exchange()
818
819 ids_after = self.exchange_store.all_operation_ids()
820 self.assertEquals(ids_before, ids_after)
821
822 def test_obsolete_response_messages_are_discarded(self):
823 """
824 An obsolete response message will be discarded as opposed to being
825 sent to the server.
826
827 A response message is considered obsolete if the secure ID changed
828 since the request message was received.
829 """
830 # Receive the message below from the server.
831 msg = {"type": "type-R", "whatever": 5678, "operation-id": 234567}
832 server_message = [msg]
833 self.transport.responses.append(server_message)
834 self.exchanger.exchange()
835
836 # Change the secure ID so that the response message gets discarded.
837 self.identity.secure_id = 'brand-new'
838 ids_before = self.exchange_store.all_operation_ids()
839
840 self.mstore.set_accepted_types(["resynchronize"])
841 message_id = self.exchanger.send(
842 {"type": "resynchronize", "operation-id": 234567})
843 self.exchanger.exchange()
844 self.assertEquals(2, len(self.transport.payloads))
845 messages = self.transport.payloads[1]["messages"]
846 self.assertEquals([], messages)
847 self.assertIs(None, message_id)
848 expected_log_entry = (
849 "Response message with operation-id 234567 was discarded because "
850 "the client's secure ID has changed in the meantime")
851 self.assertTrue(expected_log_entry in self.logfile.getvalue())
852
853 # The MessageContext was removed after utilisation.
854 ids_after = self.exchange_store.all_operation_ids()
855 self.assertTrue(len(ids_after) == len(ids_before) - 1)
856 self.assertFalse('234567' in ids_after)
857
782858
783class AcceptedTypesMessageExchangeTest(LandscapeTest):859class AcceptedTypesMessageExchangeTest(LandscapeTest):
784860
@@ -868,7 +944,6 @@
868 self.transport.payloads[2]["client-accepted-types"],944 self.transport.payloads[2]["client-accepted-types"],
869 sorted(["type-A"] + DEFAULT_ACCEPTED_TYPES))945 sorted(["type-A"] + DEFAULT_ACCEPTED_TYPES))
870946
871
872 def test_register_message_adds_accepted_type(self):947 def test_register_message_adds_accepted_type(self):
873 """948 """
874 Using the C{register_message} method of the exchanger causes949 Using the C{register_message} method of the exchanger causes
875950
=== added file 'landscape/broker/tests/test_exchangestore.py'
--- landscape/broker/tests/test_exchangestore.py 1970-01-01 00:00:00 +0000
+++ landscape/broker/tests/test_exchangestore.py 2010-04-30 19:04:26 +0000
@@ -0,0 +1,84 @@
1import time
2
3try:
4 import sqlite3
5except ImportError:
6 from pysqlite2 import dbapi2 as sqlite3
7
8from landscape.tests.helpers import LandscapeTest
9
10from landscape.broker.exchangestore import ExchangeStore
11
12
13class ExchangeStoreTest(LandscapeTest):
14 """Unit tests for the C{ExchangeStore}."""
15
16 def setUp(self):
17 super(ExchangeStoreTest, self).setUp()
18
19 self.filename = self.makeFile()
20 self.store1 = ExchangeStore(self.filename)
21 self.store2 = ExchangeStore(self.filename)
22
23 def test_add_message_context(self):
24 """Adding a message context works correctly."""
25 now = time.time()
26 self.store1.add_message_context(123, 'abc', 'change-packages')
27
28 db = sqlite3.connect(self.store2._filename)
29 cursor = db.cursor()
30 cursor.execute(
31 "SELECT operation_id, secure_id, message_type, timestamp "
32 "FROM message_context WHERE operation_id=?", (123,))
33 results = cursor.fetchall()
34 self.assertEquals(1, len(results))
35 [row] = results
36 self.assertEquals(123, row[0])
37 self.assertEquals('abc', row[1])
38 self.assertEquals('change-packages', row[2])
39 self.assertTrue(row[3] > now)
40
41 def test_add_message_context_with_duplicate_operation_id(self):
42 """Only one message context with a given operation-id is permitted."""
43 self.store1.add_message_context(123, 'abc', 'change-packages')
44 self.assertRaises(
45 sqlite3.IntegrityError,
46 self.store1.add_message_context, 123, 'def', 'change-packages')
47
48 def test_get_message_context(self):
49 """
50 Accessing a C{MessageContext} with an existing C{operation-id} works.
51 """
52 now = time.time()
53 self.store1.add_message_context(234, 'bcd', 'change-packages')
54 context = self.store2.get_message_context(234)
55 self.assertEquals(234, context.operation_id)
56 self.assertEquals('bcd', context.secure_id)
57 self.assertEquals('change-packages', context.message_type)
58 self.assertTrue(context.timestamp > now)
59
60 def test_get_message_context_with_nonexistent_operation_id(self):
61 """Attempts to access a C{MessageContext} with a non-existent
62 C{operation-id} result in C{None}."""
63 self.assertIs(None, self.store1.get_message_context(999))
64
65 def test_message_context_remove(self):
66 """C{MessageContext}s are deleted correctly."""
67 context = self.store1.add_message_context(
68 345, 'opq', 'change-packages')
69 context.remove()
70 self.assertIs(None, self.store1.get_message_context(345))
71
72 def test_all_operation_ids_for_empty_database(self):
73 """
74 Calling C{all_operation_ids} on an empty database returns an empty
75 list.
76 """
77 self.assertEquals([], self.store1.all_operation_ids())
78
79 def test_all_operation_ids(self):
80 """C{all_operation_ids} works correctly."""
81 self.store1.add_message_context(456, 'cde', 'change-packages')
82 self.assertEquals([456], self.store2.all_operation_ids())
83 self.store2.add_message_context(567, 'def', 'change-packages')
84 self.assertEquals([456, 567], self.store1.all_operation_ids())
085
=== added file 'landscape/lib/store.py'
--- landscape/lib/store.py 1970-01-01 00:00:00 +0000
+++ landscape/lib/store.py 2010-04-30 19:04:26 +0000
@@ -0,0 +1,39 @@
1"""Functions used by all sqlite-backed stores."""
2
3try:
4 import sqlite3
5except ImportError:
6 from pysqlite2 import dbapi2 as sqlite3
7
8
9def with_cursor(method):
10 """Decorator that encloses the method in a database transaction.
11
12 Even though SQLite is supposed to be useful in autocommit mode, we've
13 found cases where the database continued to be locked for writing
14 until the cursor was closed. With this in mind, instead of using
15 the autocommit mode, we explicitly terminate transactions and enforce
16 cursor closing with this decorator.
17 """
18
19 def inner(self, *args, **kwargs):
20 if not self._db:
21 # Create the database connection only when we start to actually
22 # use it. This is essentially just a workaroud of a sqlite bug
23 # happening when 2 concurrent processes try to create the tables
24 # around the same time, the one which fails having an incorrect
25 # cache and not seeing the tables
26 self._db = sqlite3.connect(self._filename)
27 self._ensure_schema()
28 try:
29 cursor = self._db.cursor()
30 try:
31 result = method(self, cursor, *args, **kwargs)
32 finally:
33 cursor.close()
34 self._db.commit()
35 except:
36 self._db.rollback()
37 raise
38 return result
39 return inner
040
=== modified file 'landscape/package/store.py'
--- landscape/package/store.py 2009-11-25 09:48:00 +0000
+++ landscape/package/store.py 2010-04-30 19:04:26 +0000
@@ -7,6 +7,7 @@
7 from pysqlite2 import dbapi2 as sqlite37 from pysqlite2 import dbapi2 as sqlite3
88
9from landscape.lib import bpickle9from landscape.lib import bpickle
10from landscape.lib.store import with_cursor
1011
1112
12class UnknownHashIDRequest(Exception):13class UnknownHashIDRequest(Exception):
@@ -17,39 +18,6 @@
17 """Raised when trying to add an invalid hash=>id lookaside database."""18 """Raised when trying to add an invalid hash=>id lookaside database."""
1819
1920
20def with_cursor(method):
21 """Decorator that encloses the method in a database transaction.
22
23 Even though SQLite is supposed to be useful in autocommit mode, we've
24 found cases where the database continued to be locked for writing
25 until the cursor was closed. With this in mind, instead of using
26 the autocommit mode, we explicitly terminate transactions and enforce
27 cursor closing with this decorator.
28 """
29
30 def inner(self, *args, **kwargs):
31 if not self._db:
32 # Create the database connection only when we start to actually
33 # use it. This is essentially just a workaroud of a sqlite bug
34 # happening when 2 concurrent processes try to create the tables
35 # around the same time, the one which fails having an incorrect
36 # cache and not seeing the tables
37 self._db = sqlite3.connect(self._filename)
38 self._ensure_schema()
39 try:
40 cursor = self._db.cursor()
41 try:
42 result = method(self, cursor, *args, **kwargs)
43 finally:
44 cursor.close()
45 self._db.commit()
46 except:
47 self._db.rollback()
48 raise
49 return result
50 return inner
51
52
53class HashIdStore(object):21class HashIdStore(object):
54 """Persist package hash=>id mappings in a file.22 """Persist package hash=>id mappings in a file.
5523

Subscribers

People subscribed via source and target branches

to all changes: