Merge lp:~exarkun/ampoule/simpler-doWork-deferreds into lp:ampoule

Proposed by Jean-Paul Calderone
Status: Merged
Merged at revision: 46
Proposed branch: lp:~exarkun/ampoule/simpler-doWork-deferreds
Merge into: lp:ampoule
Diff against target: 194 lines (+44/-40)
3 files modified
ampoule/main.py (+1/-1)
ampoule/pool.py (+7/-14)
ampoule/test/test_process.py (+36/-25)
To merge this branch: bzr merge lp:~exarkun/ampoule/simpler-doWork-deferreds
Reviewer | Review Type | Date Requested | Status
dialtone | - | - | Approve
Review via email: mp+24394@code.launchpad.net

Description of the change

This started off as an attempt to simplify _cb_doWork. Doing that, though, first involved getting the tests to pass reliably. So the branch implements one minor simplification to _cb_doWork (it now returns its result rather than firing a Deferred passed in by the caller), and changes a handful of tests so that they pass reliably on my desktop.

To post a comment you must log in.
Revision history for this message
dialtone (dialtone):
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'ampoule/main.py'
2--- ampoule/main.py 2010-04-28 15:46:27 +0000
3+++ ampoule/main.py 2010-04-29 04:10:34 +0000
4@@ -189,7 +189,7 @@
5 self.uid = uid
6 self.gid = gid
7 self.usePTY = usePTY
8- self.packages = packages
9+ self.packages = ("ampoule",) + packages
10 self.childReactor = childReactor
11
12 def __repr__(self):
13
14=== modified file 'ampoule/pool.py'
15--- ampoule/pool.py 2010-04-28 16:39:58 +0000
16+++ ampoule/pool.py 2010-04-29 04:10:34 +0000
17@@ -157,8 +157,8 @@
18 """
19 if self._queue:
20 _, (d, command, kwargs) = pop(self._queue)
21- self._cb_doWork(command, _d=d, **kwargs)
22-
23+ self._cb_doWork(command, **kwargs).chainDeferred(d)
24+
25 def _handleTimeout(self, child):
26 """
27 One of the children went timeout, we need to deal with it
28@@ -197,7 +197,7 @@
29 ampChildArgs=self.ampChildArgs)
30 return self._addProcess(child, finished)
31
32- def _cb_doWork(self, command, _d=None, _timeout=None, _deadline=None,
33+ def _cb_doWork(self, command, _timeout=None, _deadline=None,
34 **kwargs):
35 """
36 Go and call the command.
37@@ -239,12 +239,7 @@
38 # the process might have received tons of calls already
39 # which would make it run more calls than what is
40 # configured to do.
41- if is_error:
42- _d.errback(result)
43- return _d
44- else:
45- _d.callback(result)
46- return _d
47+ return result
48
49 die = False
50 child = self.ready.pop()
51@@ -295,10 +290,8 @@
52
53 @param kwargs: dictionary containing the arguments for the command.
54 """
55- d = defer.Deferred()
56 if self.ready: # there are unused processes, let's use them
57- self._cb_doWork(command, _d=d, **kwargs)
58- return d
59+ return self._cb_doWork(command, **kwargs)
60 else:
61 if len(self.processes) < self.max:
62 # no unused but we can start some new ones
63@@ -309,11 +302,11 @@
64 # Process pool with min=1, max=1, recycle_after=1
65 # [call(Command) for x in xrange(BIG_NUMBER)]
66 self.startAWorker()
67- self._cb_doWork(command, _d=d, **kwargs)
68- return d
69+ return self._cb_doWork(command, **kwargs)
70 else:
71 # No one is free... just queue up and wait for a process
72 # to start and pick up the first item in the queue.
73+ d = defer.Deferred()
74 self._queue.append((count(), (d, command, kwargs)))
75 return d
76
77
78=== modified file 'ampoule/test/test_process.py'
79--- ampoule/test/test_process.py 2010-03-01 19:35:21 +0000
80+++ ampoule/test/test_process.py 2010-04-29 04:10:34 +0000
81@@ -309,12 +309,10 @@
82 def checkBootstrap(response):
83 cwd.append(response['cwd'])
84 self.assertNotEquals(cwd, os.getcwd())
85- def assertNotExists(path):
86- self.assertFalse()
87- c.callRemote(GetCWD
88- ).addCallback(checkBootstrap
89- ).addCallback(lambda _: c.callRemote(commands.Shutdown)
90- ).addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0])))
91+ d = c.callRemote(GetCWD)
92+ d.addCallback(checkBootstrap)
93+ d.addCallback(lambda _: c.callRemote(commands.Shutdown))
94+ finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0])))
95 return finished
96
97 def test_BootstrapContextInstance(self):
98@@ -325,10 +323,10 @@
99 def checkBootstrap(response):
100 cwd.append(response['cwd'])
101 self.assertTrue(cwd[0].endswith('/foo'))
102- c.callRemote(GetCWD
103- ).addCallback(checkBootstrap
104- ).addCallback(lambda _: c.callRemote(commands.Shutdown)
105- ).addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0])))
106+ d = c.callRemote(GetCWD)
107+ d.addCallback(checkBootstrap)
108+ d.addCallback(lambda _: c.callRemote(commands.Shutdown))
109+ finished.addCallback(lambda _: self.assertFalse(os.path.exists(cwd[0])))
110 return finished
111
112 def test_startAMPAndParentProtocol(self):
113@@ -500,11 +498,16 @@
114 """
115 Test that deferToAMPProcess works as expected.
116 """
117-
118+ def cleanupGlobalPool():
119+ d = pool.pp.stop()
120+ pool.pp = None
121+ return d
122+ self.addCleanup(cleanupGlobalPool)
123+
124 STRING = "CIAOOOO"
125- return pool.deferToAMPProcess(commands.Echo, data=STRING
126- ).addCallback(lambda result: self.assertEquals(result['response'], STRING)
127- ).addCallback(lambda _: pool.pp.stop())
128+ d = pool.deferToAMPProcess(commands.Echo, data=STRING)
129+ d.addCallback(self.assertEquals, {"response": STRING})
130+ return d
131
132 def test_checkStateInPool(self):
133 """
134@@ -620,6 +623,7 @@
135 MIN = 1
136 RECYCLE_AFTER = 1
137 pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER)
138+ self.addCleanup(pp.stop)
139
140 def _checks(_):
141 self.assertEquals(pp.started, True)
142@@ -634,13 +638,11 @@
143 ).addCallback(lambda response: response['pid']
144 ).addCallback(self.assertNotEquals, pid)
145
146- def finish(reason):
147- return pp.stop().addCallback(lambda _: reason)
148
149- return pp.start(
150- ).addCallback(_checks
151- ).addCallback(_checks2
152- ).addCallback(finish)
153+ d = pp.start()
154+ d.addCallback(_checks)
155+ d.addCallback(_checks2)
156+ return d
157
158 def test_recyclingWithQueueOverload(self):
159 """
160@@ -652,20 +654,29 @@
161 RECYCLE_AFTER = 10
162 CALLS = 60
163 pp = pool.ProcessPool(ampChild=PidChild, min=MIN, max=MAX, recycleAfter=RECYCLE_AFTER)
164+ self.addCleanup(pp.stop)
165
166 def _check(results):
167 s = set()
168 for succeed, response in results:
169 s.add(response['pid'])
170- self.assertEquals(len(s), MAX*math.ceil(float(CALLS)/(MAX*RECYCLE_AFTER)))
171-
172+
173+ # For the first C{MAX} calls, each is basically guaranteed to go to
174+ # a different child. After that, though, there are no guarantees.
175+ # All the rest might go to a single child, since the child to
176+ # perform a job is selected arbitrarily from the "ready" set. Fair
177+ # distribution of jobs needs to be implemented; right now it's "set
178+ # ordering" distribution of jobs.
179+ self.assertTrue(len(s) > MAX)
180+
181 def _work(_):
182 l = [pp.doWork(Pid) for x in xrange(CALLS)]
183 d = defer.DeferredList(l)
184 return d.addCallback(_check)
185- return pp.start(
186- ).addCallback(_work
187- ).addCallback(lambda _: pp.stop())
188+ d = pp.start()
189+ d.addCallback(_work)
190+ return d
191+
192
193 def test_disableProcessRecycling(self):
194 """

Subscribers

People subscribed via source and target branches