Merge lp:~mwhudson/launchpad/puller-job-scheduling into lp:launchpad
- puller-job-scheduling
- Merge into devel
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Jonathan Lange | ||||
Approved revision: | no longer in the source branch. | ||||
Merged at revision: | not available | ||||
Proposed branch: | lp:~mwhudson/launchpad/puller-job-scheduling | ||||
Merge into: | lp:launchpad | ||||
Diff against target: | None lines | ||||
To merge this branch: | bzr merge lp:~mwhudson/launchpad/puller-job-scheduling | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Jonathan Lange (community) | code | Approve | |
Review via email: mp+9174@code.launchpad.net |
Commit message
Description of the change
Michael Hudson-Doyle (mwhudson) wrote : | # |
Jonathan Lange (jml) wrote : | # |
On Thu, Jul 23, 2009 at 1:00 PM, Michael
Hudson<email address hidden> wrote:
> Michael Hudson has proposed merging lp:~mwhudson/launchpad/puller-job-scheduling into lp:~launchpad-pqm/launchpad/devel.
>
> Requested reviews:
> Jonathan Lange (jml)
>
> Hi jml,
>
> This long awaited branch makes the puller scheduler work in a job-style fashion.
>
> In some sense, after all the preliminary branches, it's fairly simple, applying the task infrastructure.
>
Hooray!
> A fair portion of the changes from the lines-of-diff POV are due to the fact that the cronscript doesn't take an argument any more.
>
Yeah I see.
I can't recall that we ever discussed the impact of mirroring all
types of branches with the one process. I mention it in the review
below.
> I'm not sure if the JobScheduler needs more tests, tell me what you think.
>
I don't think it does.
> === modified file 'configs/
> --- configs/
> +++ configs/
> @@ -185,6 +185,10 @@
> [rosetta]
> global_
>
> +[supermirror_
> +error_dir: /var/tmp/
> +oops_prefix: SMP
> +
If we're adding a new section, do you think we could call it something better
than supermirror_puller? branch_puller perhaps?
> === modified file 'cronscripts/
> --- cronscripts/
> +++ cronscripts/
> @@ -12,7 +12,6 @@
> from twisted.python import log as tplog
> from twisted.web.xmlrpc import Proxy
>
> -from lp.code.enums import BranchType
> from lp.codehosting.
> from canonical.config import config
> from canonical.
> @@ -40,26 +39,11 @@
> parser = OptionParser()
> logger_
> (options, arguments) = parser.parse_args()
> - which = arguments.pop(0)
> if arguments:
> parser.
> -
> - branch_type_map = {
> - 'upload': BranchType.HOSTED,
> - 'mirror': BranchType.
> - 'import': BranchType.IMPORTED
> - }
> -
> - try:
> - branch_type = branch_
> - except KeyError:
> - parser.error(
> - 'Expected one of %s, but got: %r'
> - % (branch_
> -
> - log = set_up_
> + log = set_up_
> manager = scheduler.
> - Proxy(config.
> + Proxy(config.
>
It's pleasing to see this complexity go. I guess this threatens to increase
the lag for hosted branches. Do we need to worry about this?
> reactor.
> reactor.run()
>
> === modified file 'lib/canonical/
> --- lib/canonical/
> +++ lib/canonical/
Jonathan Lange (jml) wrote : | # |
All of which combines to be needs-fixing.
Michael Hudson-Doyle (mwhudson) wrote : | # |
Jonathan Lange wrote:
> On Thu, Jul 23, 2009 at 1:00 PM, Michael
> Hudson<email address hidden> wrote:
>> Michael Hudson has proposed merging lp:~mwhudson/launchpad/puller-job-scheduling into lp:~launchpad-pqm/launchpad/devel.
>>
>> Requested reviews:
>> Jonathan Lange (jml)
>>
>> Hi jml,
>>
>> This long awaited branch makes the puller scheduler work in a job-style fashion.
>>
>> In some sense, after all the preliminary branches, it's fairly simple, applying the task infrastructure.
>>
>
> Hooray!
>
>> A fair portion of the changes from the lines-of-diff POV are due to the fact that the cronscript doesn't take an argument any more.
>>
>
> Yeah I see.
>
> I can't recall that we ever discussed the impact of mirroring all
> types of branches with the one process. I mention it in the review
> below.
Hmm, I guess we should up the limit from the current 4 per type to say,
10 across all types. I also increased the worker_timeout, which should
help the initial pull of large branches.
We should set up some graphs to measure the problem at least. It would
be easy enough to graph the number of branches waiting to be pulled, I
don't think we keep enough information currently to track how long
branches wait on average. We will when we move to a BranchJob though --
which should be a fairly easy follow up to this branch.
>> I'm not sure if the JobScheduler needs more tests, tell me what you think.
>>
>
> I don't think it does.
Cool.
>> === modified file 'configs/
>> --- configs/
>> +++ configs/
>> @@ -185,6 +185,10 @@
>> [rosetta]
>> global_
>>
>> +[supermirror_
>> +error_dir: /var/tmp/
>> +oops_prefix: SMP
>> +
>
> If we're adding a new section, do you think we could call it something better
> than supermirror_puller? branch_puller perhaps?
Probably makes sense.
>> === modified file 'cronscripts/
>> --- cronscripts/
>> +++ cronscripts/
>> @@ -12,7 +12,6 @@
>> from twisted.python import log as tplog
>> from twisted.web.xmlrpc import Proxy
>>
>> -from lp.code.enums import BranchType
>> from lp.codehosting.
>> from canonical.config import config
>> from canonical.
>> @@ -40,26 +39,11 @@
>> parser = OptionParser()
>> logger_
>> (options, arguments) = parser.parse_args()
>> - which = arguments.pop(0)
>> if arguments:
>> parser.
>> -
>> - branch_type_map = {
>> - 'upload': BranchType.HOSTED,
>> - 'mirror': BranchType.
>> - 'import': BranchType.IMPORTED
>> - }
>> -
>> - try:
>> - branch_type = branch_
>> - except KeyError:
>> - parser.error(
>> - 'Expected one of %s, but got: %r'
>> - % (branch_
>> -
>> - log = set_up_
1 | === modified file 'lib/lp/codehosting/puller/scheduler.py' |
2 | --- lib/lp/codehosting/puller/scheduler.py 2009-07-23 02:07:29 +0000 |
3 | +++ lib/lp/codehosting/puller/scheduler.py 2009-07-23 02:35:18 +0000 |
4 | @@ -451,7 +451,8 @@ |
5 | return deferred |
6 | |
7 | def run(self): |
8 | - consumer = ParallelLimitedTaskConsumer(config.supermirror.maximum_workers) |
9 | + consumer = ParallelLimitedTaskConsumer( |
10 | + config.supermirror.maximum_workers) |
11 | self.consumer = consumer |
12 | source = PollingTaskSource(10, self._poll) |
13 | deferred = consumer.consume(source) |
14 | |
15 | === modified file 'lib/lp/codehosting/puller/tests/test_acceptance.py' |
16 | --- lib/lp/codehosting/puller/tests/test_acceptance.py 2009-07-23 02:07:29 +0000 |
17 | +++ lib/lp/codehosting/puller/tests/test_acceptance.py 2009-07-23 02:35:18 +0000 |
18 | @@ -10,7 +10,6 @@ |
19 | import os |
20 | import shutil |
21 | from subprocess import PIPE, Popen |
22 | -import sys |
23 | import unittest |
24 | from urlparse import urlparse |
25 | |
26 | @@ -453,4 +452,3 @@ |
27 | |
28 | def test_suite(): |
29 | return unittest.TestLoader().loadTestsFromName(__name__) |
30 | - return unittest.TestSuite() |
31 | |
32 | === modified file 'lib/lp/codehosting/puller/tests/test_scheduler.py' |
33 | --- lib/lp/codehosting/puller/tests/test_scheduler.py 2009-07-23 02:07:29 +0000 |
34 | +++ lib/lp/codehosting/puller/tests/test_scheduler.py 2009-07-23 02:35:18 +0000 |
35 | @@ -517,7 +517,6 @@ |
36 | def setUp(self): |
37 | from twisted.internet import reactor |
38 | self.factory = ObjectFactory() |
39 | - status_client = FakePullerEndpointProxy() |
40 | self.available_oops_prefixes = set(['foo']) |
41 | self.eventHandler = self.makePullerMaster( |
42 | BranchType.HOSTED, oops_prefixes=self.available_oops_prefixes) |
43 | @@ -801,8 +800,7 @@ |
44 | |
45 | # We need to create a branch at the destination_url, so that the |
46 | # subprocess can actually create a lock. |
47 | - destination_branch = BzrDir.create_branch_convenience( |
48 | - puller_master.destination_url) |
49 | + BzrDir.create_branch_convenience(puller_master.destination_url) |
50 | |
51 | deferred = puller_master.mirror().addErrback(self._dumpError) |
52 | |
53 | @@ -911,8 +909,7 @@ |
54 | |
55 | # We need to create a branch at the destination_url, so that the |
56 | # subprocess can actually create a lock. |
57 | - destination_branch = BzrDir.create_branch_convenience( |
58 | - locking_puller_master.destination_url) |
59 | + BzrDir.create_branch_convenience(locking_puller_master.destination_url) |
60 | |
61 | # Because when the deferred returned by 'func' is done we kill the |
62 | # locking subprocess, we know that when the subprocess is done, the |
Jonathan Lange (jml) wrote : | # |
Thanks for that.
Preview Diff
1 | === modified file 'configs/development/launchpad-lazr.conf' |
2 | --- configs/development/launchpad-lazr.conf 2009-07-16 21:12:16 +0000 |
3 | +++ configs/development/launchpad-lazr.conf 2009-07-23 02:07:29 +0000 |
4 | @@ -185,6 +185,10 @@ |
5 | [rosetta] |
6 | global_suggestions_enabled: True |
7 | |
8 | +[supermirror_puller] |
9 | +error_dir: /var/tmp/codehosting.test |
10 | +oops_prefix: SMP |
11 | + |
12 | [supermirror_import_puller] |
13 | error_dir: /var/tmp/codehosting.test |
14 | oops_prefix: ISMP |
15 | |
16 | === modified file 'cronscripts/supermirror-pull.py' |
17 | --- cronscripts/supermirror-pull.py 2009-06-24 20:52:01 +0000 |
18 | +++ cronscripts/supermirror-pull.py 2009-07-23 02:07:29 +0000 |
19 | @@ -12,7 +12,6 @@ |
20 | from twisted.python import log as tplog |
21 | from twisted.web.xmlrpc import Proxy |
22 | |
23 | -from lp.code.enums import BranchType |
24 | from lp.codehosting.puller import mirror, scheduler |
25 | from canonical.config import config |
26 | from canonical.launchpad.scripts import logger_options |
27 | @@ -40,26 +39,11 @@ |
28 | parser = OptionParser() |
29 | logger_options(parser) |
30 | (options, arguments) = parser.parse_args() |
31 | - which = arguments.pop(0) |
32 | if arguments: |
33 | parser.error("Unhandled arguments %s" % repr(arguments)) |
34 | - |
35 | - branch_type_map = { |
36 | - 'upload': BranchType.HOSTED, |
37 | - 'mirror': BranchType.MIRRORED, |
38 | - 'import': BranchType.IMPORTED |
39 | - } |
40 | - |
41 | - try: |
42 | - branch_type = branch_type_map[which] |
43 | - except KeyError: |
44 | - parser.error( |
45 | - 'Expected one of %s, but got: %r' |
46 | - % (branch_type_map.keys(), which)) |
47 | - |
48 | - log = set_up_logging_for_script(options, 'supermirror_%s_puller' % which) |
49 | + log = set_up_logging_for_script(options, 'supermirror_puller') |
50 | manager = scheduler.JobScheduler( |
51 | - Proxy(config.codehosting.branch_puller_endpoint), log, branch_type) |
52 | + Proxy(config.codehosting.branch_puller_endpoint), log) |
53 | |
54 | reactor.callWhenRunning(run_mirror, log, manager) |
55 | reactor.run() |
56 | |
57 | === modified file 'lib/canonical/config/schema-lazr.conf' |
58 | --- lib/canonical/config/schema-lazr.conf 2009-07-16 21:12:16 +0000 |
59 | +++ lib/canonical/config/schema-lazr.conf 2009-07-23 02:07:29 +0000 |
60 | @@ -1513,6 +1513,17 @@ |
61 | maximum_workers: 4 |
62 | |
63 | |
64 | +[supermirror_puller] |
65 | +# See [error_reports]. |
66 | +error_dir: none |
67 | + |
68 | +# See [error_reports]. |
69 | +oops_prefix: none |
70 | + |
71 | +# See [error_reports]. |
72 | +copy_to_zlog: false |
73 | + |
74 | + |
75 | [supermirror_import_puller] |
76 | # See [error_reports]. |
77 | error_dir: none |
78 | |
79 | === modified file 'lib/lp/codehosting/puller/scheduler.py' |
80 | --- lib/lp/codehosting/puller/scheduler.py 2009-06-25 04:06:00 +0000 |
81 | +++ lib/lp/codehosting/puller/scheduler.py 2009-07-23 02:35:18 +0000 |
82 | @@ -18,7 +18,6 @@ |
83 | import os |
84 | from StringIO import StringIO |
85 | import socket |
86 | -import sys |
87 | |
88 | from twisted.internet import defer, error, reactor |
89 | from twisted.protocols.basic import NetstringReceiver, NetstringParseError |
90 | @@ -28,7 +27,8 @@ |
91 | |
92 | import canonical |
93 | from canonical.cachedproperty import cachedproperty |
94 | -from lp.codehosting.vfs import branch_id_to_path |
95 | +from canonical.twistedsupport.task import ( |
96 | + ParallelLimitedTaskConsumer, PollingTaskSource) |
97 | from lp.codehosting.puller.worker import ( |
98 | get_canonical_url_for_branch_name) |
99 | from lp.codehosting.puller import get_lock_id_for_branch_id |
100 | @@ -276,7 +276,7 @@ |
101 | 'scripts/mirror-branch.py') |
102 | protocol_class = PullerMonitorProtocol |
103 | |
104 | - def __init__(self, branch_id, source_url, unique_name, branch_type, |
105 | + def __init__(self, branch_id, source_url, unique_name, branch_type_name, |
106 | default_stacked_on_url, logger, client, |
107 | available_oops_prefixes): |
108 | """Construct a PullerMaster object. |
109 | @@ -300,10 +300,9 @@ |
110 | """ |
111 | self.branch_id = branch_id |
112 | self.source_url = source_url.strip() |
113 | - path = branch_id_to_path(branch_id) |
114 | self.destination_url = 'lp-mirrored:///%s' % (unique_name,) |
115 | self.unique_name = unique_name |
116 | - self.branch_type = branch_type |
117 | + self.branch_type_name = branch_type_name |
118 | self.default_stacked_on_url = default_stacked_on_url |
119 | self.logger = logger |
120 | self.branch_puller_endpoint = client |
121 | @@ -335,7 +334,7 @@ |
122 | command = [ |
123 | interpreter, self.path_to_script, self.source_url, |
124 | self.destination_url, str(self.branch_id), str(self.unique_name), |
125 | - self.branch_type.name, self.oops_prefix, |
126 | + self.branch_type_name, self.oops_prefix, |
127 | self.default_stacked_on_url] |
128 | self.logger.debug("executing %s", command) |
129 | env = os.environ.copy() |
130 | @@ -416,12 +415,11 @@ |
131 | branches. |
132 | """ |
133 | |
134 | - def __init__(self, branch_puller_endpoint, logger, branch_type): |
135 | + def __init__(self, branch_puller_endpoint, logger): |
136 | self.branch_puller_endpoint = branch_puller_endpoint |
137 | self.logger = logger |
138 | self.actualLock = None |
139 | - self.branch_type = branch_type |
140 | - self.name = 'branch-puller-%s' % branch_type.name.lower() |
141 | + self.name = 'branch-puller' |
142 | self.lockfilename = '/var/lock/launchpad-%s.lock' % self.name |
143 | |
144 | @cachedproperty |
145 | @@ -435,43 +433,36 @@ |
146 | return set( |
147 | [str(i) for i in range(config.supermirror.maximum_workers)]) |
148 | |
149 | - def _run(self, puller_masters): |
150 | - """Run all branches_to_mirror registered with the JobScheduler.""" |
151 | - self.logger.info('%d branches to mirror', len(puller_masters)) |
152 | - assert config.supermirror.maximum_workers is not None, ( |
153 | - "config.supermirror.maximum_workers is not defined.") |
154 | - semaphore = defer.DeferredSemaphore( |
155 | + def _turnJobTupleIntoTask(self, job_tuple): |
156 | + if len(job_tuple) == 0: |
157 | + return None |
158 | + (branch_id, pull_url, unique_name, |
159 | + default_stacked_on_url, branch_type_name) = job_tuple |
160 | + master = PullerMaster( |
161 | + branch_id, pull_url, unique_name, branch_type_name, |
162 | + default_stacked_on_url, self.logger, |
163 | + self.branch_puller_endpoint, self.available_oops_prefixes) |
164 | + return master.run |
165 | + |
166 | + def _poll(self): |
167 | + deferred = self.branch_puller_endpoint.callRemote( |
168 | + 'acquireBranchToPull') |
169 | + deferred.addCallback(self._turnJobTupleIntoTask) |
170 | + return deferred |
171 | + |
172 | + def run(self): |
173 | + consumer = ParallelLimitedTaskConsumer( |
174 | config.supermirror.maximum_workers) |
175 | - deferreds = [ |
176 | - semaphore.run(puller_master.run) |
177 | - for puller_master in puller_masters] |
178 | - deferred = defer.gatherResults(deferreds) |
179 | + self.consumer = consumer |
180 | + source = PollingTaskSource(10, self._poll) |
181 | + deferred = consumer.consume(source) |
182 | deferred.addCallback(self._finishedRunning) |
183 | return deferred |
184 | |
185 | - def run(self): |
186 | - deferred = self.branch_puller_endpoint.callRemote( |
187 | - 'getBranchPullQueue', self.branch_type.name) |
188 | - deferred.addCallback(self.getPullerMasters) |
189 | - deferred.addCallback(self._run) |
190 | - return deferred |
191 | - |
192 | def _finishedRunning(self, ignored): |
193 | self.logger.info('Mirroring complete') |
194 | return ignored |
195 | |
196 | - def getPullerMaster(self, branch_id, branch_src, unique_name, |
197 | - default_stacked_on_url): |
198 | - branch_src = branch_src.strip() |
199 | - return PullerMaster( |
200 | - branch_id, branch_src, unique_name, self.branch_type, |
201 | - default_stacked_on_url, self.logger, |
202 | - self.branch_puller_endpoint, self.available_oops_prefixes) |
203 | - |
204 | - def getPullerMasters(self, branches_to_pull): |
205 | - return [ |
206 | - self.getPullerMaster(*branch) for branch in branches_to_pull] |
207 | - |
208 | def lock(self): |
209 | self.actualLock = GlobalLock(self.lockfilename) |
210 | try: |
211 | |
212 | === modified file 'lib/lp/codehosting/puller/tests/test_acceptance.py' |
213 | --- lib/lp/codehosting/puller/tests/test_acceptance.py 2009-06-25 04:06:00 +0000 |
214 | +++ lib/lp/codehosting/puller/tests/test_acceptance.py 2009-07-23 02:35:18 +0000 |
215 | @@ -10,7 +10,6 @@ |
216 | import os |
217 | import shutil |
218 | from subprocess import PIPE, Popen |
219 | -import sys |
220 | import unittest |
221 | from urlparse import urlparse |
222 | |
223 | @@ -123,7 +122,7 @@ |
224 | output, error = process.communicate() |
225 | return process.returncode, output, error |
226 | |
227 | - def runPuller(self, branch_type): |
228 | + def runPuller(self): |
229 | """Run the puller script for the given branch type. |
230 | |
231 | :param branch_type: One of 'upload', 'mirror' or 'import' |
232 | @@ -133,7 +132,7 @@ |
233 | stdout and stderr respectively. |
234 | """ |
235 | command = [ |
236 | - '%s/bin/py' % config.root, self._puller_script, '-q', branch_type] |
237 | + '%s/bin/py' % config.root, self._puller_script, '-q'] |
238 | retcode, output, error = self.runSubprocess(command) |
239 | return command, retcode, output, error |
240 | |
241 | @@ -214,7 +213,7 @@ |
242 | db_branch = self.factory.makeAnyBranch(branch_type=BranchType.HOSTED) |
243 | transaction.commit() |
244 | self.pushBranch(db_branch) |
245 | - command, retcode, output, error = self.runPuller('upload') |
246 | + command, retcode, output, error = self.runPuller() |
247 | self.assertRanSuccessfully(command, retcode, output, error) |
248 | self.assertMirrored(db_branch) |
249 | |
250 | @@ -225,7 +224,7 @@ |
251 | transaction.commit() |
252 | pack_tree = self.make_branch_and_tree('pack', format='pack-0.92') |
253 | self.pushBranch(db_branch, tree=pack_tree) |
254 | - command, retcode, output, error = self.runPuller('upload') |
255 | + command, retcode, output, error = self.runPuller() |
256 | self.assertRanSuccessfully(command, retcode, output, error) |
257 | self.assertMirrored(db_branch) |
258 | # Then we upgrade the to a different format and ask for it to be |
259 | @@ -234,7 +233,7 @@ |
260 | transaction.begin() |
261 | db_branch.requestMirror() |
262 | transaction.commit() |
263 | - command, retcode, output, error = self.runPuller('upload') |
264 | + command, retcode, output, error = self.runPuller() |
265 | self.assertRanSuccessfully(command, retcode, output, error) |
266 | self.assertMirrored(db_branch) |
267 | |
268 | @@ -244,7 +243,7 @@ |
269 | transaction.commit() |
270 | loom_tree = self.makeLoomBranchAndTree('loom') |
271 | self.pushBranch(db_branch, tree=loom_tree) |
272 | - command, retcode, output, error = self.runPuller('upload') |
273 | + command, retcode, output, error = self.runPuller() |
274 | self.assertRanSuccessfully(command, retcode, output, error) |
275 | self.assertMirrored(db_branch) |
276 | |
277 | @@ -257,7 +256,7 @@ |
278 | branch=db_branch, person=accessing_user) |
279 | transaction.commit() |
280 | self.pushBranch(db_branch) |
281 | - command, retcode, output, error = self.runPuller('upload') |
282 | + command, retcode, output, error = self.runPuller() |
283 | self.assertRanSuccessfully(command, retcode, output, error) |
284 | self.assertMirrored(db_branch, accessing_user=accessing_user) |
285 | |
286 | @@ -267,7 +266,7 @@ |
287 | branch_type=BranchType.MIRRORED) |
288 | tree = self.setUpMirroredBranch(db_branch) |
289 | transaction.commit() |
290 | - command, retcode, output, error = self.runPuller('mirror') |
291 | + command, retcode, output, error = self.runPuller() |
292 | self.assertRanSuccessfully(command, retcode, output, error) |
293 | self.assertMirrored(db_branch, source_branch=tree.branch) |
294 | |
295 | @@ -292,13 +291,11 @@ |
296 | series.branch = default_branch |
297 | # Arrange for it to be pulled. |
298 | if branch_type == BranchType.HOSTED: |
299 | - puller_type = 'upload' |
300 | transaction.commit() |
301 | # For hosted branches, we just push it into the hosted area via |
302 | # the codehosting vfs. |
303 | self.pushBranch(default_branch) |
304 | elif branch_type == BranchType.MIRRORED: |
305 | - puller_type = 'mirror' |
306 | # For mirrored branches, we serve the branch over HTTP, point the |
307 | # database branch at this HTTP server and call requestMirror() |
308 | self.setUpMirroredBranch(default_branch, format='1.6') |
309 | @@ -308,7 +305,7 @@ |
310 | "don't know how to make a %s default branch" |
311 | % branch_type.TITLE) |
312 | # Pull it. |
313 | - command, retcode, output, error = self.runPuller(puller_type) |
314 | + command, retcode, output, error = self.runPuller() |
315 | self.assertRanSuccessfully(command, retcode, output, error) |
316 | return default_branch |
317 | |
318 | @@ -320,7 +317,7 @@ |
319 | branch_type=BranchType.MIRRORED, product=default_branch.product) |
320 | tree = self.setUpMirroredBranch(db_branch, format='1.6') |
321 | transaction.commit() |
322 | - command, retcode, output, error = self.runPuller('mirror') |
323 | + command, retcode, output, error = self.runPuller() |
324 | self.assertRanSuccessfully(command, retcode, output, error) |
325 | mirrored_branch = self.assertMirrored( |
326 | db_branch, source_branch=tree.branch) |
327 | @@ -339,7 +336,7 @@ |
328 | branch_type=BranchType.HOSTED, product=default_branch.product) |
329 | transaction.commit() |
330 | self.pushBranch(db_branch, format='1.6') |
331 | - command, retcode, output, error = self.runPuller('upload') |
332 | + command, retcode, output, error = self.runPuller() |
333 | self.assertRanSuccessfully(command, retcode, output, error) |
334 | mirrored_branch = self.assertMirrored(db_branch) |
335 | self.assertEqual( |
336 | @@ -369,7 +366,7 @@ |
337 | branch_config.set_option( |
338 | 'stacked_on_location', |
339 | 'bzr+ssh://bazaar.launchpad.dev/' + default_branch.unique_name) |
340 | - command, retcode, output, error = self.runPuller('upload') |
341 | + command, retcode, output, error = self.runPuller() |
342 | self.assertRanSuccessfully(command, retcode, output, error) |
343 | # We clear the stacking information again here so that assertMirrored |
344 | # can open the branch in the hosted area. |
345 | @@ -388,7 +385,7 @@ |
346 | |
347 | tree = self.setUpMirroredBranch(db_branch, format='1.6') |
348 | transaction.commit() |
349 | - command, retcode, output, error = self.runPuller('mirror') |
350 | + command, retcode, output, error = self.runPuller() |
351 | self.assertRanSuccessfully(command, retcode, output, error) |
352 | mirrored_branch = self.assertMirrored( |
353 | db_branch, source_branch=tree.branch) |
354 | @@ -423,26 +420,26 @@ |
355 | self.serveOverHTTP(self._getImportMirrorPort()) |
356 | |
357 | # Run the puller. |
358 | - command, retcode, output, error = self.runPuller("import") |
359 | + command, retcode, output, error = self.runPuller() |
360 | self.assertRanSuccessfully(command, retcode, output, error) |
361 | |
362 | self.assertMirrored(db_branch, source_branch=tree.branch) |
363 | |
364 | def test_mirror_empty(self): |
365 | # Run the puller on an empty pull queue. |
366 | - command, retcode, output, error = self.runPuller("upload") |
367 | + command, retcode, output, error = self.runPuller() |
368 | self.assertRanSuccessfully(command, retcode, output, error) |
369 | |
370 | def test_records_script_activity(self): |
371 | # A record gets created in the ScriptActivity table. |
372 | script_activity_set = getUtility(IScriptActivitySet) |
373 | self.assertIs( |
374 | - script_activity_set.getLastActivity("branch-puller-hosted"), |
375 | + script_activity_set.getLastActivity("branch-puller"), |
376 | None) |
377 | - self.runPuller("upload") |
378 | + self.runPuller() |
379 | transaction.abort() |
380 | self.assertIsNot( |
381 | - script_activity_set.getLastActivity("branch-puller-hosted"), |
382 | + script_activity_set.getLastActivity("branch-puller"), |
383 | None) |
384 | |
385 | # Possible tests to add: |
386 | @@ -455,4 +452,3 @@ |
387 | |
388 | def test_suite(): |
389 | return unittest.TestLoader().loadTestsFromName(__name__) |
390 | - return unittest.TestSuite() |
391 | |
392 | === modified file 'lib/lp/codehosting/puller/tests/test_scheduler.py' |
393 | --- lib/lp/codehosting/puller/tests/test_scheduler.py 2009-06-25 04:06:00 +0000 |
394 | +++ lib/lp/codehosting/puller/tests/test_scheduler.py 2009-07-23 02:35:18 +0000 |
395 | @@ -73,55 +73,31 @@ |
396 | |
397 | def setUp(self): |
398 | self.masterlock = 'master.lock' |
399 | - # We set the log level to CRITICAL so that the log messages |
400 | - # are suppressed. |
401 | - logging.basicConfig(level=logging.CRITICAL) |
402 | |
403 | def tearDown(self): |
404 | reset_logging() |
405 | - |
406 | - def makeFakeClient(self, hosted, mirrored, imported): |
407 | - return FakePullerEndpointProxy( |
408 | - {'HOSTED': hosted, 'MIRRORED': mirrored, 'IMPORTED': imported}) |
409 | - |
410 | - def makeJobScheduler(self, branch_type, branch_tuples): |
411 | - if branch_type == BranchType.HOSTED: |
412 | - client = self.makeFakeClient(branch_tuples, [], []) |
413 | - elif branch_type == BranchType.MIRRORED: |
414 | - client = self.makeFakeClient([], branch_tuples, []) |
415 | - elif branch_type == BranchType.IMPORTED: |
416 | - client = self.makeFakeClient([], [], branch_tuples) |
417 | - else: |
418 | - self.fail("Unknown branch type: %r" % (branch_type,)) |
419 | - return scheduler.JobScheduler( |
420 | - client, logging.getLogger(), branch_type) |
421 | + if os.path.exists(self.masterlock): |
422 | + os.unlink(self.masterlock) |
423 | + |
424 | + def makeJobScheduler(self): |
425 | + return scheduler.JobScheduler(None, logging.getLogger()) |
426 | |
427 | def testManagerCreatesLocks(self): |
428 | - try: |
429 | - manager = self.makeJobScheduler(BranchType.HOSTED, []) |
430 | - manager.lockfilename = self.masterlock |
431 | - manager.lock() |
432 | - self.failUnless(os.path.exists(self.masterlock)) |
433 | - manager.unlock() |
434 | - finally: |
435 | - self._removeLockFile() |
436 | + manager = self.makeJobScheduler() |
437 | + manager.lockfilename = self.masterlock |
438 | + manager.lock() |
439 | + self.failUnless(os.path.exists(self.masterlock)) |
440 | + manager.unlock() |
441 | |
442 | def testManagerEnforcesLocks(self): |
443 | - try: |
444 | - manager = self.makeJobScheduler(BranchType.HOSTED, []) |
445 | - manager.lockfilename = self.masterlock |
446 | - manager.lock() |
447 | - anothermanager = self.makeJobScheduler(BranchType.HOSTED, []) |
448 | - anothermanager.lockfilename = self.masterlock |
449 | - self.assertRaises(scheduler.LockError, anothermanager.lock) |
450 | - self.failUnless(os.path.exists(self.masterlock)) |
451 | - manager.unlock() |
452 | - finally: |
453 | - self._removeLockFile() |
454 | - |
455 | - def _removeLockFile(self): |
456 | - if os.path.exists(self.masterlock): |
457 | - os.unlink(self.masterlock) |
458 | + manager = self.makeJobScheduler() |
459 | + manager.lockfilename = self.masterlock |
460 | + manager.lock() |
461 | + anothermanager = self.makeJobScheduler() |
462 | + anothermanager.lockfilename = self.masterlock |
463 | + self.assertRaises(scheduler.LockError, anothermanager.lock) |
464 | + self.failUnless(os.path.exists(self.masterlock)) |
465 | + manager.unlock() |
466 | |
467 | |
468 | class TestPullerWireProtocol(TrialTestCase): |
469 | @@ -541,7 +517,6 @@ |
470 | def setUp(self): |
471 | from twisted.internet import reactor |
472 | self.factory = ObjectFactory() |
473 | - status_client = FakePullerEndpointProxy() |
474 | self.available_oops_prefixes = set(['foo']) |
475 | self.eventHandler = self.makePullerMaster( |
476 | BranchType.HOSTED, oops_prefixes=self.available_oops_prefixes) |
477 | @@ -825,8 +800,7 @@ |
478 | |
479 | # We need to create a branch at the destination_url, so that the |
480 | # subprocess can actually create a lock. |
481 | - destination_branch = BzrDir.create_branch_convenience( |
482 | - puller_master.destination_url) |
483 | + BzrDir.create_branch_convenience(puller_master.destination_url) |
484 | |
485 | deferred = puller_master.mirror().addErrback(self._dumpError) |
486 | |
487 | @@ -935,8 +909,7 @@ |
488 | |
489 | # We need to create a branch at the destination_url, so that the |
490 | # subprocess can actually create a lock. |
491 | - destination_branch = BzrDir.create_branch_convenience( |
492 | - locking_puller_master.destination_url) |
493 | + BzrDir.create_branch_convenience(locking_puller_master.destination_url) |
494 | |
495 | # Because when the deferred returned by 'func' is done we kill the |
496 | # locking subprocess, we know that when the subprocess is done, the |
Hi jml,
This long awaited branch makes the puller scheduler work in a job-style fashion.
In some sense, after all the preliminary branches, it's fairly simple, applying the task infrastructure.
A fair portion of the changes from the lines-of-diff POV are due to the fact that the cronscript doesn't take an argument any more.
I'm not sure if the JobScheduler needs more tests, tell me what you think.
Cheers,
mwh