Merge lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963 into lp:divmod.org

Proposed by Jean-Paul Calderone
Status: Merged
Approved by: Tristan Seligmann
Approved revision: 2679
Merged at revision: 2694
Proposed branch: lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963
Merge into: lp:divmod.org
Diff against target: 272 lines (+174/-3)
3 files modified
Axiom/axiom/batch.py (+43/-3)
Axiom/axiom/iaxiom.py (+10/-0)
Axiom/axiom/test/test_batch.py (+121/-0)
To merge this branch: bzr merge lp:~divmod-dev/divmod.org/batch-process-start-heuristic-2963
Reviewer | Review Type | Date Requested | Status
Tristan Seligmann | (none) | (none) | Approve
Review via email: mp+87279@code.launchpad.net

Description of the change

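This changes the batch processing system to start the batch process whenever an item is added and there are remote listeners registered for it. This doesn't make the behaviour strictly correct (items that are already waiting to be processed remotely when the main process starts up are not picked up until a new item is added), but it improves it enough to make Quotient usable.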

To post a comment you must log in.
Revision history for this message
Tristan Seligmann (mithrandi) wrote :

This seems like a reasonable fix in lieu of something more complex (and I'm not sure what that would look like). Please merge.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'Axiom/axiom/batch.py'
--- Axiom/axiom/batch.py 2011-08-20 01:48:56 +0000
+++ Axiom/axiom/batch.py 2012-01-02 18:03:59 +0000
@@ -400,7 +400,8 @@
400 processor is being added to the database.400 processor is being added to the database.
401401
402 If this processor is not already scheduled to run, this will schedule402 If this processor is not already scheduled to run, this will schedule
403 it.403 it. It will also start the batch process if it is not yet running and
404 there are any registered remote listeners.
404 """405 """
405 localCount = self.store.query(406 localCount = self.store.query(
406 _ReliableListener,407 _ReliableListener,
@@ -408,9 +409,19 @@
408 _ReliableListener.style == iaxiom.LOCAL),409 _ReliableListener.style == iaxiom.LOCAL),
409 limit=1).count()410 limit=1).count()
410411
412 remoteCount = self.store.query(
413 _ReliableListener,
414 attributes.AND(_ReliableListener.processor == self,
415 _ReliableListener.style == iaxiom.REMOTE),
416 limit=1).count()
417
411 if localCount and self.scheduled is None:418 if localCount and self.scheduled is None:
412 self.scheduled = extime.Time()419 self.scheduled = extime.Time()
413 iaxiom.IScheduler(self.store).schedule(self, self.scheduled)420 iaxiom.IScheduler(self.store).schedule(self, self.scheduled)
421 if remoteCount:
422 batchService = iaxiom.IBatchService(self.store, None)
423 if batchService is not None:
424 batchService.start()
414425
415426
416427
@@ -894,7 +905,13 @@
894 """905 """
895 Controls starting, stopping, and passing messages to the system process in906 Controls starting, stopping, and passing messages to the system process in
896 charge of remote batch processing.907 charge of remote batch processing.
908
909 @ivar batchController: A reference to the L{ProcessController} for
910 interacting with the batch process, if one exists. Otherwise C{None}.
897 """911 """
912 implements(iaxiom.IBatchService)
913
914 batchController = None
898915
899 def __init__(self, store):916 def __init__(self, store):
900 self.store = store917 self.store = store
@@ -948,6 +965,11 @@
948 method=method).do)965 method=method).do)
949966
950967
968 def start(self):
969 if self.batchController is not None:
970 self.batchController.getProcess()
971
972
951 def suspend(self, storepath, storeID):973 def suspend(self, storepath, storeID):
952 return self.batchController.getProcess().addCallback(974 return self.batchController.getProcess().addCallback(
953 SuspendProcessor(storepath=storepath, storeid=storeID).do)975 SuspendProcessor(storepath=storepath, storeid=storeID).do)
@@ -977,6 +999,10 @@
977 return self.service.call(itemMethod)999 return self.service.call(itemMethod)
9781000
9791001
1002 def start(self):
1003 self.service.start()
1004
1005
980 def suspend(self, storeID):1006 def suspend(self, storeID):
981 return self.service.suspend(self.storepath, storeID)1007 return self.service.suspend(self.storepath, storeID)
9821008
@@ -987,9 +1013,23 @@
9871013
9881014
989def storeBatchServiceSpecialCase(st, pups):1015def storeBatchServiceSpecialCase(st, pups):
1016 """
1017 Adapt a L{Store} to L{IBatchService}.
1018
1019 If C{st} is a substore, return a simple wrapper that delegates to the site
1020 store's L{IBatchService} powerup. Return C{None} if C{st} has no
1021 L{BatchProcessingControllerService}.
1022 """
990 if st.parent is not None:1023 if st.parent is not None:
991 return _SubStoreBatchChannel(st)1024 try:
992 return service.IService(st).getServiceNamed("Batch Processing Controller")1025 return _SubStoreBatchChannel(st)
1026 except TypeError:
1027 return None
1028 storeService = service.IService(st)
1029 try:
1030 return storeService.getServiceNamed("Batch Processing Controller")
1031 except KeyError:
1032 return None
9931033
9941034
9951035
9961036
=== modified file 'Axiom/axiom/iaxiom.py'
--- Axiom/axiom/iaxiom.py 2009-07-07 20:35:43 +0000
+++ Axiom/axiom/iaxiom.py 2012-01-02 18:03:59 +0000
@@ -309,11 +309,18 @@
309 """309 """
310310
311311
312
312class IBatchService(Interface):313class IBatchService(Interface):
313 """314 """
314 Object which allows minimal communication with L{IReliableListener}315 Object which allows minimal communication with L{IReliableListener}
315 providers which are running remotely (that is, with the L{REMOTE} style).316 providers which are running remotely (that is, with the L{REMOTE} style).
316 """317 """
318 def start():
319 """
320 Start the remote batch process if it has not yet been started, otherwise
321 do nothing.
322 """
323
317324
318 def suspend(storeID):325 def suspend(storeID):
319 """326 """
@@ -324,6 +331,7 @@
324 @return: A Deferred which fires when the listener has been suspended.331 @return: A Deferred which fires when the listener has been suspended.
325 """332 """
326333
334
327 def resume(storeID):335 def resume(storeID):
328 """336 """
329 @type storeID: C{int}337 @type storeID: C{int}
@@ -333,6 +341,8 @@
333 @return: A Deferred which fires when the listener has been resumed.341 @return: A Deferred which fires when the listener has been resumed.
334 """342 """
335343
344
345
336class IVersion(Interface):346class IVersion(Interface):
337 """347 """
338 Object with version information for a package that creates Axiom348 Object with version information for a package that creates Axiom
339349
=== modified file 'Axiom/axiom/test/test_batch.py'
--- Axiom/axiom/test/test_batch.py 2011-08-19 23:30:29 +0000
+++ Axiom/axiom/test/test_batch.py 2012-01-02 18:03:59 +0000
@@ -531,6 +531,29 @@
531531
532532
533class RemoteTestCase(unittest.TestCase):533class RemoteTestCase(unittest.TestCase):
534 def test_noBatchService(self):
535 """
536 A L{Store} with no database directory cannot be adapted to
537 L{iaxiom.IBatchService}.
538 """
539 st = store.Store()
540 self.assertRaises(TypeError, iaxiom.IBatchService, st)
541 self.assertIdentical(
542 iaxiom.IBatchService(st, None), None)
543
544
545 def test_subStoreNoBatchService(self):
546 """
547 A user L{Store} attached to a site L{Store} with no database directory
548 cannot be adapted to L{iaxiom.IBatchService}.
549 """
550 st = store.Store(filesdir=self.mktemp())
551 ss = substore.SubStore.createNew(st, 'substore').open()
552 self.assertRaises(TypeError, iaxiom.IBatchService, ss)
553 self.assertIdentical(
554 iaxiom.IBatchService(ss, None), None)
555
556
534 def testBatchService(self):557 def testBatchService(self):
535 """558 """
536 Make sure SubStores can be adapted to L{iaxiom.IBatchService}.559 Make sure SubStores can be adapted to L{iaxiom.IBatchService}.
@@ -606,3 +629,101 @@
606 self.assertEquals(629 self.assertEquals(
607 st.query(BatchWorkItem, BatchWorkItem.value == u"processed").count(),630 st.query(BatchWorkItem, BatchWorkItem.value == u"processed").count(),
608 BATCH_WORK_UNITS)631 BATCH_WORK_UNITS)
632
633
634 def test_itemAddedStartsBatchProcess(self):
635 """
636 If there are remote-style listeners for an item source, C{itemAdded}
637 starts the batch process.
638
639 This is not completely correct. There may be items to process remotely
640 when the main process starts up, before any new items are added. This
641 is simpler to implement, but it shouldn't be taken as a reason not to
642 implement the actually correct solution.
643 """
644 st = store.Store(self.mktemp())
645 svc = service.IService(st)
646 svc.startService()
647 self.addCleanup(svc.stopService)
648
649 batchService = iaxiom.IBatchService(st)
650
651 procType = batch.processor(TestWorkUnit)
652 proc = procType(store=st)
653 listener = WorkListener(store=st)
654 proc.addReliableListener(listener, style=iaxiom.REMOTE)
655
656 # Sanity check: addReliableListener should eventually also trigger a
657 # batch process start if necessary. But we don't want to test that case
658 # here, so make sure it's not happening.
659 self.assertEquals(batchService.batchController.mode, 'stopped')
660
661 # Now trigger it to start.
662 proc.itemAdded()
663
664 # It probably won't be ready by now, but who knows.
665 self.assertIn(batchService.batchController.mode, ('starting', 'ready'))
666
667
668 def test_itemAddedBeforeStarted(self):
669 """
670 If C{itemAdded} is called before the batch service is started, the batch
671 process is not started.
672 """
673 st = store.Store(self.mktemp())
674
675 procType = batch.processor(TestWorkUnit)
676 proc = procType(store=st)
677 listener = WorkListener(store=st)
678 proc.addReliableListener(listener, style=iaxiom.REMOTE)
679
680 proc.itemAdded()
681
682 # When the service later starts, the batch service needn't start its
683 # process. Not that this would be bad. Feel free to reverse this
684 # behavior if you really want.
685 svc = service.IService(st)
686 svc.startService()
687 self.addCleanup(svc.stopService)
688
689 batchService = iaxiom.IBatchService(st)
690 self.assertEquals(batchService.batchController.mode, 'stopped')
691
692
693 def test_itemAddedWithoutBatchService(self):
694 """
695 If the store has no batch service, C{itemAdded} doesn't start the batch
696 process and also doesn't raise an exception.
697 """
698 # An in-memory store can't have a batch service.
699 st = store.Store()
700 svc = service.IService(st)
701 svc.startService()
702 self.addCleanup(svc.stopService)
703
704 procType = batch.processor(TestWorkUnit)
705 proc = procType(store=st)
706 listener = WorkListener(store=st)
707 proc.addReliableListener(listener, style=iaxiom.REMOTE)
708
709 proc.itemAdded()
710
711 # And still there should be no batch service at all.
712 self.assertIdentical(iaxiom.IBatchService(st, None), None)
713
714
715 def test_subStoreBatchServiceStart(self):
716 """
717 The substore implementation of L{IBatchService.start} starts the batch
718 process.
719 """
720 st = store.Store(self.mktemp())
721 svc = service.IService(st)
722 svc.startService()
723 self.addCleanup(svc.stopService)
724
725 ss = substore.SubStore.createNew(st, 'substore').open()
726 iaxiom.IBatchService(ss).start()
727
728 batchService = iaxiom.IBatchService(st)
729 self.assertIn(batchService.batchController.mode, ('starting', 'ready'))

Subscribers

People subscribed via source and target branches

to all changes: