Merge lp:~mwhudson/launchpad/smarter-code-import-scheduling-bug-236973 into lp:launchpad

Proposed by Michael Hudson-Doyle
Status: Merged
Approved by: Tim Penhey
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~mwhudson/launchpad/smarter-code-import-scheduling-bug-236973
Merge into: lp:launchpad
Prerequisite: lp:~mwhudson/launchpad/reduce-concurrent-job-count
Diff against target: 164 lines (+63/-13)
3 files modified
cronscripts/code-import-dispatcher.py (+1/-1)
lib/lp/codehosting/codeimport/dispatcher.py (+28/-3)
lib/lp/codehosting/codeimport/tests/test_dispatcher.py (+34/-9)
To merge this branch: bzr merge lp:~mwhudson/launchpad/smarter-code-import-scheduling-bug-236973
Reviewer Review Type Date Requested Status
Tim Penhey (community) Approve
Review via email: mp+19841@code.launchpad.net

Commit message

Make the code import dispatcher keep asking for work until none is provided.

To post a comment you must log in.
Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Hi Tim,

I think we've talked about this already: I've modified the code import dispatcher to keep asking for work until it finds none. Between requests, it sleeps for a period that depends on the machine's load.

Revision history for this message
Tim Penhey (thumper) wrote :

Looks good. I'm eager to see how this works on staging :)

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'cronscripts/code-import-dispatcher.py'
--- cronscripts/code-import-dispatcher.py 2010-02-22 01:36:30 +0000
+++ cronscripts/code-import-dispatcher.py 2010-02-23 00:34:17 +0000
@@ -37,7 +37,7 @@
37 globalErrorUtility.configure('codeimportdispatcher')37 globalErrorUtility.configure('codeimportdispatcher')
3838
39 dispatcher = CodeImportDispatcher(self.logger, self.options.max_jobs)39 dispatcher = CodeImportDispatcher(self.logger, self.options.max_jobs)
40 dispatcher.findAndDispatchJob(40 dispatcher.findAndDispatchJobs(
41 ServerProxy(config.codeimportdispatcher.codeimportscheduler_url))41 ServerProxy(config.codeimportdispatcher.codeimportscheduler_url))
4242
4343
4444
=== modified file 'lib/lp/codehosting/codeimport/dispatcher.py'
--- lib/lp/codehosting/codeimport/dispatcher.py 2010-02-22 01:36:30 +0000
+++ lib/lp/codehosting/codeimport/dispatcher.py 2010-02-23 00:34:17 +0000
@@ -16,6 +16,7 @@
16import os16import os
17import socket17import socket
18import subprocess18import subprocess
19import time
1920
20from canonical.config import config21from canonical.config import config
2122
@@ -32,13 +33,14 @@
32 worker_script = os.path.join(33 worker_script = os.path.join(
33 config.root, 'scripts', 'code-import-worker-db.py')34 config.root, 'scripts', 'code-import-worker-db.py')
3435
35 def __init__(self, logger, worker_limit):36 def __init__(self, logger, worker_limit, _sleep=time.sleep):
36 """Initialize an instance.37 """Initialize an instance.
3738
38 :param logger: A `Logger` object.39 :param logger: A `Logger` object.
39 """40 """
40 self.logger = logger41 self.logger = logger
41 self.worker_limit = worker_limit42 self.worker_limit = worker_limit
43 self._sleep = _sleep
4244
43 def getHostname(self):45 def getHostname(self):
44 """Return the hostname of this machine.46 """Return the hostname of this machine.
@@ -65,15 +67,38 @@
6567
6668
67 def findAndDispatchJob(self, scheduler_client):69 def findAndDispatchJob(self, scheduler_client):
68 """Check for and dispatch a job if necessary."""70 """Check for and dispatch a job if necessary.
71
72 :return: A boolean, true if a job was found and dispatched.
73 """
6974
70 job_id = scheduler_client.getJobForMachine(75 job_id = scheduler_client.getJobForMachine(
71 self.getHostname(), self.worker_limit)76 self.getHostname(), self.worker_limit)
7277
73 if job_id == 0:78 if job_id == 0:
74 self.logger.info("No jobs pending.")79 self.logger.info("No jobs pending.")
75 return80 return False
7681
77 self.logger.info("Dispatching job %d." % job_id)82 self.logger.info("Dispatching job %d." % job_id)
7883
79 self.dispatchJob(job_id)84 self.dispatchJob(job_id)
85 return True
86
87 def _getSleepInterval(self):
88 """How long to sleep for until asking for a new job.
89
90 The basic idea is to wait longer if the machine is more heavily
91 loaded, so that less loaded slaves get a chance to grab some jobs.
92
93 We assume worker_limit will be roughly the number of CPUs in the
94 machine, so load/worker_limit is roughly how loaded the machine is.
95 """
96 return 5*os.getloadavg()[0]/self.worker_limit
97
98 def findAndDispatchJobs(self, scheduler_client):
99 """Call findAndDispatchJob until no job is found."""
100 while True:
101 found = self.findAndDispatchJob(scheduler_client)
102 if not found:
103 break
104 self._sleep(self._getSleepInterval())
80105
=== modified file 'lib/lp/codehosting/codeimport/tests/test_dispatcher.py'
--- lib/lp/codehosting/codeimport/tests/test_dispatcher.py 2010-02-22 02:06:57 +0000
+++ lib/lp/codehosting/codeimport/tests/test_dispatcher.py 2010-02-23 00:34:17 +0000
@@ -24,11 +24,11 @@
24class StubSchedulerClient:24class StubSchedulerClient:
25 """A scheduler client that returns a pre-arranged answer."""25 """A scheduler client that returns a pre-arranged answer."""
2626
27 def __init__(self, id_to_return):27 def __init__(self, ids_to_return):
28 self.id_to_return = id_to_return28 self.ids_to_return = ids_to_return
2929
30 def getJobForMachine(self, machine, limit):30 def getJobForMachine(self, machine, limit):
31 return self.id_to_return31 return self.ids_to_return.pop(0)
3232
3333
34class MockSchedulerClient:34class MockSchedulerClient:
@@ -51,9 +51,10 @@
51 TestCase.setUp(self)51 TestCase.setUp(self)
52 self.pushConfig('codeimportdispatcher', forced_hostname='none')52 self.pushConfig('codeimportdispatcher', forced_hostname='none')
5353
54 def makeDispatcher(self, worker_limit=10):54 def makeDispatcher(self, worker_limit=10, _sleep=lambda delay: None):
55 """Make a `CodeImportDispatcher`."""55 """Make a `CodeImportDispatcher`."""
56 return CodeImportDispatcher(QuietFakeLogger(), worker_limit)56 return CodeImportDispatcher(
57 QuietFakeLogger(), worker_limit, _sleep=_sleep)
5758
58 def test_getHostname(self):59 def test_getHostname(self):
59 # By default, getHostname return the same as socket.gethostname()60 # By default, getHostname return the same as socket.gethostname()
@@ -111,16 +112,16 @@
111 calls = []112 calls = []
112 dispatcher = self.makeDispatcher()113 dispatcher = self.makeDispatcher()
113 dispatcher.dispatchJob = lambda job_id: calls.append(job_id)114 dispatcher.dispatchJob = lambda job_id: calls.append(job_id)
114 dispatcher.findAndDispatchJob(StubSchedulerClient(10))115 found = dispatcher.findAndDispatchJob(StubSchedulerClient([10]))
115 self.assertEqual([10], calls)116 self.assertEqual(([10], True), (calls, found))
116117
117 def test_findAndDispatchJob_noJobWaiting(self):118 def test_findAndDispatchJob_noJobWaiting(self):
118 # If there is no job to dispatch, then we just exit quietly.119 # If there is no job to dispatch, then we just exit quietly.
119 calls = []120 calls = []
120 dispatcher = self.makeDispatcher()121 dispatcher = self.makeDispatcher()
121 dispatcher.dispatchJob = lambda job_id: calls.append(job_id)122 dispatcher.dispatchJob = lambda job_id: calls.append(job_id)
122 dispatcher.findAndDispatchJob(StubSchedulerClient(0))123 found = dispatcher.findAndDispatchJob(StubSchedulerClient([0]))
123 self.assertEqual([], calls)124 self.assertEqual(([], False), (calls, found))
124125
125 def test_findAndDispatchJob_calls_getJobForMachine_with_limit(self):126 def test_findAndDispatchJob_calls_getJobForMachine_with_limit(self):
126 # findAndDispatchJob calls getJobForMachine on the scheduler client127 # findAndDispatchJob calls getJobForMachine on the scheduler client
@@ -133,5 +134,29 @@
133 [(dispatcher.getHostname(), worker_limit)],134 [(dispatcher.getHostname(), worker_limit)],
134 scheduler_client.calls)135 scheduler_client.calls)
135136
137 def test_findAndDispatchJobs(self):
138 # findAndDispatchJobs calls getJobForMachine on the scheduler_client,
139 # dispatching jobs, until it indicates that there are no more jobs to
140 # dispatch.
141 calls = []
142 dispatcher = self.makeDispatcher()
143 dispatcher.dispatchJob = lambda job_id: calls.append(job_id)
144 dispatcher.findAndDispatchJobs(StubSchedulerClient([10, 9, 0]))
145 self.assertEqual([10, 9], calls)
146
147 def test_findAndDispatchJobs_sleeps(self):
148 # After finding a job, findAndDispatchJobs sleeps for an interval as
149 # returned by _getSleepInterval.
150 sleep_calls = []
151 interval = self.factory.getUniqueInteger()
152 def _sleep(delay):
153 sleep_calls.append(delay)
154 dispatcher = self.makeDispatcher(_sleep=_sleep)
155 dispatcher.dispatchJob = lambda job_id: None
156 dispatcher._getSleepInterval = lambda : interval
157 dispatcher.findAndDispatchJobs(StubSchedulerClient([10, 0]))
158 self.assertEqual([interval], sleep_calls)
159
160
136def test_suite():161def test_suite():
137 return TestLoader().loadTestsFromName(__name__)162 return TestLoader().loadTestsFromName(__name__)