Merge lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963 into lp:divmod.org

Proposed by Jean-Paul Calderone
Status: Merged
Approved by: Tristan Seligmann
Approved revision: 2679
Merged at revision: 2694
Proposed branch: lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963
Merge into: lp:divmod.org
Diff against target: 272 lines (+174/-3)
3 files modified
Axiom/axiom/batch.py (+43/-3)
Axiom/axiom/iaxiom.py (+10/-0)
Axiom/axiom/test/test_batch.py (+121/-0)
To merge this branch: bzr merge lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963
Reviewer | Review Type | Date Requested | Status
Tristan Seligmann | | | Approve
Review via email: mp+87279@code.launchpad.net

Description of the change

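This changes the batch processing system to start the batch process whenever an item is added and there are remote listeners registered for it. This doesn't make the behaviour strictly correct (there may already be items to process remotely when the main process starts up, before any new items are added), but it improves it enough to make Quotient usable.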

To post a comment you must log in.
Revision history for this message
Tristan Seligmann (mithrandi) wrote :

This seems like a reasonable fix in lieu of something more complex (and I'm not sure what that would look like). Please merge.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'Axiom/axiom/batch.py'
2--- Axiom/axiom/batch.py 2011-08-20 01:48:56 +0000
3+++ Axiom/axiom/batch.py 2012-01-02 18:03:59 +0000
4@@ -400,7 +400,8 @@
5 processor is being added to the database.
6
7 If this processor is not already scheduled to run, this will schedule
8- it.
9+ it. It will also start the batch process if it is not yet running and
10+ there are any registered remote listeners.
11 """
12 localCount = self.store.query(
13 _ReliableListener,
14@@ -408,9 +409,19 @@
15 _ReliableListener.style == iaxiom.LOCAL),
16 limit=1).count()
17
18+ remoteCount = self.store.query(
19+ _ReliableListener,
20+ attributes.AND(_ReliableListener.processor == self,
21+ _ReliableListener.style == iaxiom.REMOTE),
22+ limit=1).count()
23+
24 if localCount and self.scheduled is None:
25 self.scheduled = extime.Time()
26 iaxiom.IScheduler(self.store).schedule(self, self.scheduled)
27+ if remoteCount:
28+ batchService = iaxiom.IBatchService(self.store, None)
29+ if batchService is not None:
30+ batchService.start()
31
32
33
34@@ -894,7 +905,13 @@
35 """
36 Controls starting, stopping, and passing messages to the system process in
37 charge of remote batch processing.
38+
39+ @ivar batchController: A reference to the L{ProcessController} for
40+ interacting with the batch process, if one exists. Otherwise C{None}.
41 """
42+ implements(iaxiom.IBatchService)
43+
44+ batchController = None
45
46 def __init__(self, store):
47 self.store = store
48@@ -948,6 +965,11 @@
49 method=method).do)
50
51
52+ def start(self):
53+ if self.batchController is not None:
54+ self.batchController.getProcess()
55+
56+
57 def suspend(self, storepath, storeID):
58 return self.batchController.getProcess().addCallback(
59 SuspendProcessor(storepath=storepath, storeid=storeID).do)
60@@ -977,6 +999,10 @@
61 return self.service.call(itemMethod)
62
63
64+ def start(self):
65+ self.service.start()
66+
67+
68 def suspend(self, storeID):
69 return self.service.suspend(self.storepath, storeID)
70
71@@ -987,9 +1013,23 @@
72
73
74 def storeBatchServiceSpecialCase(st, pups):
75+ """
76+ Adapt a L{Store} to L{IBatchService}.
77+
78+ If C{st} is a substore, return a simple wrapper that delegates to the site
79+ store's L{IBatchService} powerup. Return C{None} if C{st} has no
80+ L{BatchProcessingControllerService}.
81+ """
82 if st.parent is not None:
83- return _SubStoreBatchChannel(st)
84- return service.IService(st).getServiceNamed("Batch Processing Controller")
85+ try:
86+ return _SubStoreBatchChannel(st)
87+ except TypeError:
88+ return None
89+ storeService = service.IService(st)
90+ try:
91+ return storeService.getServiceNamed("Batch Processing Controller")
92+ except KeyError:
93+ return None
94
95
96
97
98=== modified file 'Axiom/axiom/iaxiom.py'
99--- Axiom/axiom/iaxiom.py 2009-07-07 20:35:43 +0000
100+++ Axiom/axiom/iaxiom.py 2012-01-02 18:03:59 +0000
101@@ -309,11 +309,18 @@
102 """
103
104
105+
106 class IBatchService(Interface):
107 """
108 Object which allows minimal communication with L{IReliableListener}
109 providers which are running remotely (that is, with the L{REMOTE} style).
110 """
111+ def start():
112+ """
113+ Start the remote batch process if it has not yet been started, otherwise
114+ do nothing.
115+ """
116+
117
118 def suspend(storeID):
119 """
120@@ -324,6 +331,7 @@
121 @return: A Deferred which fires when the listener has been suspended.
122 """
123
124+
125 def resume(storeID):
126 """
127 @type storeID: C{int}
128@@ -333,6 +341,8 @@
129 @return: A Deferred which fires when the listener has been resumed.
130 """
131
132+
133+
134 class IVersion(Interface):
135 """
136 Object with version information for a package that creates Axiom
137
138=== modified file 'Axiom/axiom/test/test_batch.py'
139--- Axiom/axiom/test/test_batch.py 2011-08-19 23:30:29 +0000
140+++ Axiom/axiom/test/test_batch.py 2012-01-02 18:03:59 +0000
141@@ -531,6 +531,29 @@
142
143
144 class RemoteTestCase(unittest.TestCase):
145+ def test_noBatchService(self):
146+ """
147+ A L{Store} with no database directory cannot be adapted to
148+ L{iaxiom.IBatchService}.
149+ """
150+ st = store.Store()
151+ self.assertRaises(TypeError, iaxiom.IBatchService, st)
152+ self.assertIdentical(
153+ iaxiom.IBatchService(st, None), None)
154+
155+
156+ def test_subStoreNoBatchService(self):
157+ """
158+ A user L{Store} attached to a site L{Store} with no database directory
159+ cannot be adapted to L{iaxiom.IBatchService}.
160+ """
161+ st = store.Store(filesdir=self.mktemp())
162+ ss = substore.SubStore.createNew(st, 'substore').open()
163+ self.assertRaises(TypeError, iaxiom.IBatchService, ss)
164+ self.assertIdentical(
165+ iaxiom.IBatchService(ss, None), None)
166+
167+
168 def testBatchService(self):
169 """
170 Make sure SubStores can be adapted to L{iaxiom.IBatchService}.
171@@ -606,3 +629,101 @@
172 self.assertEquals(
173 st.query(BatchWorkItem, BatchWorkItem.value == u"processed").count(),
174 BATCH_WORK_UNITS)
175+
176+
177+ def test_itemAddedStartsBatchProcess(self):
178+ """
179+ If there are remote-style listeners for an item source, C{itemAdded}
180+ starts the batch process.
181+
182+ This is not completely correct. There may be items to process remotely
183+ when the main process starts up, before any new items are added. This
184+ is simpler to implement, but it shouldn't be taken as a reason not to
185+ implement the actually correct solution.
186+ """
187+ st = store.Store(self.mktemp())
188+ svc = service.IService(st)
189+ svc.startService()
190+ self.addCleanup(svc.stopService)
191+
192+ batchService = iaxiom.IBatchService(st)
193+
194+ procType = batch.processor(TestWorkUnit)
195+ proc = procType(store=st)
196+ listener = WorkListener(store=st)
197+ proc.addReliableListener(listener, style=iaxiom.REMOTE)
198+
199+ # Sanity check: addReliableListener should eventually also trigger a
200+ # batch process start if necessary. But we don't want to test that case
201+ # here, so make sure it's not happening.
202+ self.assertEquals(batchService.batchController.mode, 'stopped')
203+
204+ # Now trigger it to start.
205+ proc.itemAdded()
206+
207+ # It probably won't be ready by now, but who knows.
208+ self.assertIn(batchService.batchController.mode, ('starting', 'ready'))
209+
210+
211+ def test_itemAddedBeforeStarted(self):
212+ """
213+ If C{itemAdded} is called before the batch service is started, the batch
214+ process is not started.
215+ """
216+ st = store.Store(self.mktemp())
217+
218+ procType = batch.processor(TestWorkUnit)
219+ proc = procType(store=st)
220+ listener = WorkListener(store=st)
221+ proc.addReliableListener(listener, style=iaxiom.REMOTE)
222+
223+ proc.itemAdded()
224+
225+ # When the service later starts, the batch service needn't start its
226+ # process. Not that this would be bad. Feel free to reverse this
227+ # behavior if you really want.
228+ svc = service.IService(st)
229+ svc.startService()
230+ self.addCleanup(svc.stopService)
231+
232+ batchService = iaxiom.IBatchService(st)
233+ self.assertEquals(batchService.batchController.mode, 'stopped')
234+
235+
236+ def test_itemAddedWithoutBatchService(self):
237+ """
238+ If the store has no batch service, C{itemAdded} doesn't start the batch
239+ process and also doesn't raise an exception.
240+ """
241+ # An in-memory store can't have a batch service.
242+ st = store.Store()
243+ svc = service.IService(st)
244+ svc.startService()
245+ self.addCleanup(svc.stopService)
246+
247+ procType = batch.processor(TestWorkUnit)
248+ proc = procType(store=st)
249+ listener = WorkListener(store=st)
250+ proc.addReliableListener(listener, style=iaxiom.REMOTE)
251+
252+ proc.itemAdded()
253+
254+ # And still there should be no batch service at all.
255+ self.assertIdentical(iaxiom.IBatchService(st, None), None)
256+
257+
258+ def test_subStoreBatchServiceStart(self):
259+ """
260+ The substore implementation of L{IBatchService.start} starts the batch
261+ process.
262+ """
263+ st = store.Store(self.mktemp())
264+ svc = service.IService(st)
265+ svc.startService()
266+ self.addCleanup(svc.stopService)
267+
268+ ss = substore.SubStore.createNew(st, 'substore').open()
269+ iaxiom.IBatchService(ss).start()
270+
271+ batchService = iaxiom.IBatchService(st)
272+ self.assertIn(batchService.batchController.mode, ('starting', 'ready'))

Subscribers

People subscribed via source and target branches

to all changes: