Merge lp:~dobey/ubuntuone-client/queue-limit-3-0 into lp:ubuntuone-client/stable-3-0

Proposed by dobey
Status: Merged
Approved by: dobey
Approved revision: 1184
Merged at revision: 1185
Proposed branch: lp:~dobey/ubuntuone-client/queue-limit-3-0
Merge into: lp:ubuntuone-client/stable-3-0
Diff against target: 148 lines (+114/-2)
2 files modified
tests/syncdaemon/test_action_queue.py (+111/-0)
ubuntuone/syncdaemon/action_queue.py (+3/-2)
To merge this branch: bzr merge lp:~dobey/ubuntuone-client/queue-limit-3-0
Reviewer Review Type Date Requested Status
Zachery Bir (community) Approve
Eric Casteleijn (community) Approve
Review via email: mp+103174@code.launchpad.net

Commit message

Use the correct comparison to decide in which queue a command should be placed (LP: #978903).

To post a comment you must log in.
Revision history for this message
Eric Casteleijn (thisfred) :
review: Approve
Revision history for this message
Zachery Bir (urbanape) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_action_queue.py'
2--- tests/syncdaemon/test_action_queue.py 2012-04-09 20:07:05 +0000
3+++ tests/syncdaemon/test_action_queue.py 2012-04-24 19:00:28 +0000
4@@ -31,6 +31,7 @@
5 from __future__ import with_statement
6
7 import base64
8+import collections
9 import inspect
10 import logging
11 import operator
12@@ -120,6 +121,26 @@
13 return self.check(logger.NOTE, *msgs)
14
15
16+class FakeOffloadQueue(object):
17+ """Fake replacement for offload_queue."""
18+ def __init__(self):
19+ self.queue = collections.deque()
20+
21+ def push(self, item):
22+ """Push it."""
23+ self.queue.append(item)
24+
25+ def pop(self):
26+ """Pop it."""
27+ return self.queue.popleft()
28+
29+ def __len__(self):
30+ return len(self.queue)
31+
32+ def __getitem__(self, idx):
33+ return self.queue[idx]
34+
35+
36 class FakeMagicHash(object):
37 """Fake magic hash."""
38 _magic_hash = '666'
39@@ -5858,3 +5879,93 @@
40 self.assertEqual(called[0], ((FakeCommand, 'arg0'), {'bar': 'baz'}))
41 self.assertEqual(called[1], ((FakeCommand, 'arg1'), {'foo': 'bar'}))
42 self.assertEqual(called[2], ((FakeCommand, 'arg2'), {}))
43+
44+ def test_execute_pushing_popping(self):
45+ """Exercise the limits when pushing/popping to disk."""
46+ aq = self.action_queue
47+ aq.memory_pool_limit = 2
48+
49+ def _fake_execute(_, cmd):
50+ """Don't really execute, but store and return deferred.
51+
52+ It also handles the queue.
53+ """
54+ d = defer.Deferred()
55+ commands.append((cmd, d))
56+ aq.queue.append(cmd)
57+
58+ def remove(_):
59+ aq.queue.remove(cmd)
60+ commands.remove((cmd, d))
61+
62+ d.addCallback(remove)
63+ return d
64+
65+ commands = []
66+ self.patch(aq, '_really_execute', _fake_execute)
67+ aq.disk_queue = FakeOffloadQueue()
68+ aq.queue = []
69+ aq.commands[FakeCommand.__name__] = FakeCommand
70+
71+ # send two commands, they should be executed right away
72+ aq.execute(FakeCommand, 1)
73+ aq.execute(FakeCommand, 2)
74+ self.assertEqual(commands[0][0], 1)
75+ self.assertEqual(commands[1][0], 2)
76+
77+ # send a third and fourth commands, they should be offloaded
78+ aq.execute(FakeCommand, 3)
79+ aq.execute(FakeCommand, 4)
80+ self.assertEqual(len(commands), 2)
81+ self.assertEqual(len(aq.disk_queue), 2)
82+ self.assertEqual(aq.disk_queue[0], ('FakeCommand', (3,), {}))
83+ self.assertEqual(aq.disk_queue[1], ('FakeCommand', (4,), {}))
84+
85+ # finish command 1, it should pop and execute command 3
86+ commands[0][1].callback(True)
87+ self.assertEqual(len(commands), 2)
88+ self.assertEqual(commands[0][0], 2)
89+ self.assertEqual(commands[1][0], 3)
90+ self.assertEqual(len(aq.disk_queue), 1)
91+ self.assertEqual(aq.disk_queue[0], ('FakeCommand', (4,), {}))
92+
93+ # other command should go offload
94+ aq.execute(FakeCommand, 5)
95+ self.assertEqual(len(commands), 2)
96+ self.assertEqual(len(aq.disk_queue), 2)
97+ self.assertEqual(aq.disk_queue[0], ('FakeCommand', (4,), {}))
98+ self.assertEqual(aq.disk_queue[1], ('FakeCommand', (5,), {}))
99+
100+ # finish commands 2 and 3... 4 and 5 should go in
101+ commands[0][1].callback(True)
102+ commands[0][1].callback(True)
103+ self.assertEqual(len(commands), 2)
104+ self.assertEqual(commands[0][0], 4)
105+ self.assertEqual(commands[1][0], 5)
106+ self.assertEqual(len(aq.disk_queue), 0)
107+
108+ # even in the edge, another command should be offloaded
109+ aq.execute(FakeCommand, 6)
110+ self.assertEqual(len(commands), 2)
111+ self.assertEqual(len(aq.disk_queue), 1)
112+ self.assertEqual(aq.disk_queue[0], ('FakeCommand', (6,), {}))
113+
114+ # finish 4 and 5, we should only have 6 left
115+ commands[0][1].callback(True)
116+ commands[0][1].callback(True)
117+ self.assertEqual(len(commands), 1)
118+ self.assertEqual(commands[0][0], 6)
119+ self.assertEqual(len(aq.disk_queue), 0)
120+
121+ # one below the limit, next op should be executed right away
122+ aq.execute(FakeCommand, 7)
123+ self.assertEqual(len(commands), 2)
124+ self.assertEqual(commands[0][0], 6)
125+ self.assertEqual(commands[1][0], 7)
126+ self.assertEqual(len(aq.disk_queue), 0)
127+
128+ # finish 6 and 7, all clean
129+ commands[0][1].callback(True)
130+ commands[0][1].callback(True)
131+ self.assertEqual(len(commands), 0)
132+ self.assertEqual(len(aq.disk_queue), 0)
133
134=== modified file 'ubuntuone/syncdaemon/action_queue.py'
135--- ubuntuone/syncdaemon/action_queue.py 2012-04-09 20:08:42 +0000
136+++ ubuntuone/syncdaemon/action_queue.py 2012-04-24 19:00:28 +0000
137@@ -1095,8 +1095,9 @@
138 @defer.inlineCallbacks
139 def execute(self, command_class, *args, **kwargs):
140 """Execute a command only if there's room in memory to handle it."""
141- if len(self.queue) > self.memory_pool_limit:
142- # not enough room in memory, store it in the offloaded queue
143+ if len(self.queue) >= self.memory_pool_limit:
144+ # already in the limit, can't go further as we don't have
145+ # more room in memory, store it in the offloaded queue
146 logger.debug('offload push: %s %s %s', command_class.__name__, args, kwargs)
147 self.disk_queue.push((command_class.__name__, args, kwargs))
148 return

Subscribers

People subscribed via source and target branches

to all changes: