Merge lp:~cr3/checkbox-core/shadowd into lp:checkbox-core
- shadowd
- Merge into trunk
Proposed by
Marc Tardif
Status: | Merged |
---|---|
Merged at revision: | 18 |
Proposed branch: | lp:~cr3/checkbox-core/shadowd |
Merge into: | lp:checkbox-core |
Diff against target: |
1021 lines (+513/-132) 14 files modified
bin/checkbox-shadowd (+30/-0) checkbox/daemon/client.py (+8/-2) checkbox/daemon/operation.py (+2/-2) checkbox/daemon/server.py (+10/-7) checkbox/daemon/subsystem.py (+4/-0) checkbox/journal/event.py (+81/-25) checkbox/message/attributes.py (+2/-0) checkbox/scripts/shadowd.py (+78/-0) checkbox/scripts/submit.py (+83/-28) checkbox/shadowd/queue.py (+9/-12) checkbox/shadowd/sender.py (+59/-48) checkbox/shadowd/starter.py (+128/-0) checkbox/shadowd/tests/test_sender.py (+10/-8) configs/checkbox.conf (+9/-0) |
To merge this branch: | bzr merge lp:~cr3/checkbox-core/shadowd |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Sylvain Pineau (community) | Approve | ||
Marc Tardif | Pending | ||
Review via email:
|
Commit message
Description of the change
To post a comment you must log in.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Marc Tardif (cr3) wrote : | # |
I've applied both your suggestions and merged into trunk. Thanks!
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added file 'bin/checkbox-shadowd' |
2 | --- bin/checkbox-shadowd 1970-01-01 00:00:00 +0000 |
3 | +++ bin/checkbox-shadowd 2012-05-01 02:07:19 +0000 |
4 | @@ -0,0 +1,30 @@ |
5 | +#!/usr/bin/python |
6 | +# |
7 | +# Copyright (c) 2012 Canonical |
8 | +# |
9 | +# This file is part of Checkbox. |
10 | +# |
11 | +# Checkbox is free software; you can redistribute it and/or modify |
12 | +# it under the terms of the GNU Lesser General Public License as |
13 | +# published by the Free Software Foundation; either version 2.1 of |
14 | +# the License, or (at your option) any later version. |
15 | +# |
16 | +# Checkbox is distributed in the hope that it will be useful, |
17 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
18 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
19 | +# GNU Lesser General Public License for more details. |
20 | +# |
21 | +# You should have received a copy of the GNU Lesser General Public License |
22 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
23 | +# |
24 | +import sys |
25 | + |
26 | +try: |
27 | + import _preamble |
28 | +except ImportError: |
29 | + sys.exc_clear() |
30 | + |
31 | +from checkbox.scripts import shadowd |
32 | + |
33 | + |
34 | +shadowd.run() |
35 | |
36 | === modified file 'checkbox/daemon/client.py' |
37 | --- checkbox/daemon/client.py 2012-04-24 22:43:09 +0000 |
38 | +++ checkbox/daemon/client.py 2012-05-01 02:07:19 +0000 |
39 | @@ -325,7 +325,13 @@ |
40 | messenger.startCommand(message) |
41 | |
42 | def _doLocate(self): |
43 | - return True |
44 | + if self.type == ClientType.ANY: |
45 | + return True |
46 | + elif self.type == ClientType.SHADOWD: |
47 | + return self._getDaemonInfo() |
48 | + else: |
49 | + raise Exception( |
50 | + "Unsupported client type: %s" % ClientType[self.type]) |
51 | |
52 | def _initializeHostname(self): |
53 | if self._initialized_hostname: |
54 | @@ -471,7 +477,7 @@ |
55 | return True |
56 | |
57 | def _getConnectorInfo(self): |
58 | - if self._address and is_ipaddress(self._address)[0]: |
59 | + if self._address and address_is_valid(self._address): |
60 | self._port = address_to_port(self._address) |
61 | if self._port > 0: |
62 | logging.debug("Already have address, no info to locate.") |
63 | |
64 | === modified file 'checkbox/daemon/operation.py' |
65 | --- checkbox/daemon/operation.py 2012-04-24 22:43:09 +0000 |
66 | +++ checkbox/daemon/operation.py 2012-05-01 02:07:19 +0000 |
67 | @@ -45,8 +45,8 @@ |
68 | "OFF_PEACEFUL", |
69 | "SET_PEACEFUL_SHUTDOWN", |
70 | "TIME_OFFSET", |
71 | - ("SHADOWD_BASE", 100), |
72 | - "QUEUE", |
73 | + ("SHADOWD_BASE", 200), |
74 | + "ENQUEUE", |
75 | "CONTINUE_CLAIM", |
76 | "SUSPEND_CLAIM", |
77 | "DEACTIVATE_CLAIM", |
78 | |
79 | === modified file 'checkbox/daemon/server.py' |
80 | --- checkbox/daemon/server.py 2012-04-23 19:36:19 +0000 |
81 | +++ checkbox/daemon/server.py 2012-05-01 02:07:19 +0000 |
82 | @@ -560,7 +560,10 @@ |
83 | if sock_handler.function: |
84 | logging.info("Calling handler for sock") |
85 | |
86 | - result = sock_handler.function(sock) |
87 | + if sock_handler.data is not None: |
88 | + result = sock_handler.function(sock, sock_handler.data) |
89 | + else: |
90 | + result = sock_handler.function(sock) |
91 | logging.info("Return from handler") |
92 | |
93 | elif handle_command: |
94 | @@ -1176,11 +1179,11 @@ |
95 | or sock_timed_out): |
96 | if (sock_timed_out.sock._finish_connect() |
97 | != StreamError.WOULDBLOCK): |
98 | - sock_timed_out.call_handler = True |
99 | + sock_handler.call_handler = True |
100 | else: |
101 | if (selector.check(sock, SelectorIO.READ) |
102 | or sock_timed_out): |
103 | - sock_timed_out.call_handler = True |
104 | + sock_handler.call_handler = True |
105 | |
106 | # Scan through the pipe table to find which ones are callable |
107 | for pipe_handler in self._pipe_table: |
108 | @@ -1239,7 +1242,7 @@ |
109 | |
110 | # Enqueue this packet into the buffers if this |
111 | # is a udp sock |
112 | - if sock.get_protocol() == "udp": |
113 | + if sock.protocol == "udp": |
114 | if not sock.handle_incoming_packet(): |
115 | # There is not yet a complete message ready |
116 | continue |
117 | @@ -1287,7 +1290,7 @@ |
118 | self._sock_udp.unserialize(string) |
119 | |
120 | def _handleRequest(self, sock): |
121 | - protocol = sock.get_protocol() |
122 | + protocol = sock.protocol |
123 | if protocol == "tcp": |
124 | is_tcp = True |
125 | elif protocol == "udp": |
126 | @@ -1376,7 +1379,7 @@ |
127 | return True |
128 | |
129 | def _handleRequestSockTimerHandler(self, sock): |
130 | - assert(sock.get_protocol() == "tcp") |
131 | + assert(sock.protocol == "tcp") |
132 | |
133 | self.unregisterSock(sock) |
134 | |
135 | @@ -1706,7 +1709,7 @@ |
136 | socket.IPPROTO_TCP, socket.TCP_NODELAY, 1): |
137 | logging.info("Failed to set TCP_NODELAY on TCP command port.") |
138 | |
139 | - if not sock_tcp.listen_on_port(port): |
140 | + if not sock_tcp.listen(port): |
141 | return cleanup( |
142 | "Failed to listen on port %d for TCP command socket.", port) |
143 | |
144 | |
145 | === modified file 'checkbox/daemon/subsystem.py' |
146 | --- checkbox/daemon/subsystem.py 2012-04-24 22:43:09 +0000 |
147 | +++ checkbox/daemon/subsystem.py 2012-05-01 02:07:19 +0000 |
148 | @@ -126,6 +126,10 @@ |
149 | self.addEntry( |
150 | SubsystemType.MASTER, SubsystemCategory.DAEMON, "MASTER") |
151 | self.addEntry( |
152 | + SubsystemType.SHADOW, SubsystemCategory.DAEMON, "SHADOW") |
153 | + self.addEntry( |
154 | + SubsystemType.SHADOWD, SubsystemCategory.DAEMON, "SHADOWD") |
155 | + self.addEntry( |
156 | SubsystemType.RUNNER, SubsystemCategory.DAEMON, "RUNNER") |
157 | self.addEntry( |
158 | SubsystemType.RUNNERD, SubsystemCategory.DAEMON, "RUNNERD") |
159 | |
160 | === modified file 'checkbox/journal/event.py' |
161 | --- checkbox/journal/event.py 2012-04-23 19:36:19 +0000 |
162 | +++ checkbox/journal/event.py 2012-05-01 02:07:19 +0000 |
163 | @@ -143,12 +143,13 @@ |
164 | |
165 | type = None |
166 | |
167 | - def __init__(self): |
168 | + def __init__(self, cluster=-1, proc=-1, subproc=-1): |
169 | + self.cluster = cluster |
170 | + self.proc = proc |
171 | + self.subproc = subproc |
172 | + |
173 | self.event_clock = time.time() |
174 | self.event_time = time.localtime(self.event_clock) |
175 | - self.cluster = -1 |
176 | - self.proc = -1 |
177 | - self.subproc = -1 |
178 | |
179 | def _initializeFromRecord(self, record): |
180 | self.cluster = record.evaluateInteger("cluster", -1) |
181 | @@ -263,8 +264,8 @@ |
182 | |
183 | type = EventType.GENERIC |
184 | |
185 | - def __init__(self): |
186 | - super(GenericEvent, self).__init__() |
187 | + def __init__(self, *args, **kwargs): |
188 | + super(GenericEvent, self).__init__(*args, **kwargs) |
189 | self.data = "" |
190 | |
191 | def _initializeFromRecord(self, record): |
192 | @@ -276,7 +277,9 @@ |
193 | self.data = record.evaluateString("generic_data", "") |
194 | |
195 | def _writeEvent(self, file): |
196 | - file.write("%s\n" % self.data) |
197 | + if file.write("%s\n" % self.data) < 0: |
198 | + return False |
199 | + |
200 | return True |
201 | |
202 | def _readEvent(self, file): |
203 | @@ -292,8 +295,8 @@ |
204 | |
205 | type = EventType.EXECUTE |
206 | |
207 | - def __init__(self): |
208 | - super(ExecuteEvent, self).__init__() |
209 | + def __init__(self, *args, **kwargs): |
210 | + super(ExecuteEvent, self).__init__(*args, **kwargs) |
211 | self.host = "" |
212 | |
213 | def _initializeFromRecord(self, record): |
214 | @@ -305,7 +308,9 @@ |
215 | self.host = record.evaluateString("execute_host", "") |
216 | |
217 | def _writeEvent(self, file): |
218 | - file.write("Job executing on host: %s\n" % self.host) |
219 | + if file.write("Job executing on host: %s\n" % self.host) < 0: |
220 | + return False |
221 | + |
222 | return True |
223 | |
224 | def _readEvent(self, file): |
225 | @@ -326,8 +331,8 @@ |
226 | |
227 | type = EventType.SUSPEND |
228 | |
229 | - def __init__(self): |
230 | - super(SuspendEvent, self).__init__() |
231 | + def __init__(self, *args, **kwargs): |
232 | + super(SuspendEvent, self).__init__(*args, **kwargs) |
233 | self.pid_count = 0 |
234 | |
235 | def _initializeFromRecord(self, record): |
236 | @@ -339,8 +344,13 @@ |
237 | self.pid_count = record.evaluateString("pid_count", 0) |
238 | |
239 | def _writeEvent(self, file): |
240 | - file.write("Job was suspended\n") |
241 | - file.write("Number of processes suspended: %d\n" % self.pid_count) |
242 | + if file.write("Job was suspended\n") < 0: |
243 | + return False |
244 | + |
245 | + if file.write( |
246 | + "Number of processes suspended: %d\n" % self.pid_count) < 0: |
247 | + return False |
248 | + |
249 | return True |
250 | |
251 | def _readEvent(self, file): |
252 | @@ -363,7 +373,9 @@ |
253 | type = EventType.RESUME |
254 | |
255 | def _writeEvent(self, file): |
256 | - file.write("Job was resumed\n") |
257 | + if file.write("Job was resumed\n") < 0: |
258 | + return False |
259 | + |
260 | return True |
261 | |
262 | def _readEvent(self, file): |
263 | @@ -374,6 +386,42 @@ |
264 | return True |
265 | |
266 | |
267 | +class SubmitEvent(EventInfo): |
268 | + |
269 | + __slots__ = ( |
270 | + "host", |
271 | + ) |
272 | + |
273 | + type = EventType.SUBMIT |
274 | + |
275 | + def __init__(self, *args, **kwargs): |
276 | + super(SubmitEvent, self).__init__(*args, **kwargs) |
277 | + self.host = "" |
278 | + |
279 | + def _writeEvent(self, file): |
280 | + if file.write("Job submitted from host: %s\n" % self.host) < 0: |
281 | + return False |
282 | + |
283 | + return True |
284 | + |
285 | + def _readEvent(self, file): |
286 | + line = file.readline() |
287 | + match = re.match(r"Job submitted from host: (?P<host>[^\s]*)\n", line) |
288 | + if not match: |
289 | + return False |
290 | + |
291 | + self.host = match.group("host") |
292 | + return True |
293 | + |
294 | + def _initializeFromRecord(self, record): |
295 | + super(SubmitEvent, self)._initializeFromRecord(record) |
296 | + |
297 | + if not record: |
298 | + return |
299 | + |
300 | + self.host = record.evaluateString("host", "") |
301 | + |
302 | + |
303 | class TerminateEvent(EventInfo): |
304 | |
305 | __slots__ = ( |
306 | @@ -395,8 +443,8 @@ |
307 | ("total_sent_bytes", "Total Bytes Sent"), |
308 | ("total_received_bytes", "Total Bytes Received")) |
309 | |
310 | - def __init__(self): |
311 | - super(TerminateEvent, self).__init__() |
312 | + def __init__(self, *args, **kwargs): |
313 | + super(TerminateEvent, self).__init__(*args, **kwargs) |
314 | self.sent_bytes = self.received_bytes = 0.0 |
315 | self.total_sent_bytes = self.total_received_bytes = 0.0 |
316 | self.return_value = self.signal_number = -1 |
317 | @@ -404,25 +452,33 @@ |
318 | self.core_file = None |
319 | |
320 | def _writeEvent(self, file): |
321 | - file.write("Job terminated\n") |
322 | + if file.write("Job terminated\n") < 0: |
323 | + return False |
324 | |
325 | if self.normal: |
326 | - file.write( |
327 | + if file.write( |
328 | "\t(1) Normal termination (return value %d)\n" |
329 | - % self.return_value) |
330 | + % self.return_value) < 0: |
331 | + return False |
332 | else: |
333 | - file.write( |
334 | + if file.write( |
335 | "\t(0) Abnormal termination (signal %d)\n" |
336 | - % self.signal_number) |
337 | + % self.signal_number) < 0: |
338 | + return False |
339 | |
340 | if self.core_file: |
341 | - file.write("\t(1) Core file in: %s\n" % self.core_file) |
342 | + ret = file.write("\t(1) Core file in: %s\n" % self.core_file) |
343 | else: |
344 | - file.write("\t(0) No core file\n") |
345 | + ret = file.write("\t(0) No core file\n") |
346 | + |
347 | + if ret < 0: |
348 | + return False |
349 | |
350 | # Write bytes |
351 | for attribute, string in self.attribute_strings: |
352 | - file.write("\t%.0f - %s\n" % (getattr(self, attribute), string)) |
353 | + if file.write( |
354 | + "\t%.0f - %s\n" % (getattr(self, attribute), string)) < 0: |
355 | + return False |
356 | |
357 | return True |
358 | |
359 | |
360 | === modified file 'checkbox/message/attributes.py' |
361 | --- checkbox/message/attributes.py 2012-04-23 19:36:19 +0000 |
362 | +++ checkbox/message/attributes.py 2012-05-01 02:07:19 +0000 |
363 | @@ -43,6 +43,8 @@ |
364 | JOB_STATE="state", |
365 | JOB_START_TIME="start_time", |
366 | JOB_EXIT_TIME="exit_time", |
367 | + JOB_ROOT_DIR="root_dir", |
368 | + JOB_WORKING_DIR="working_dir", |
369 | |
370 | CLUSTER_ID="cluster_id", |
371 | PROC_ID="proc_id", |
372 | |
373 | === added file 'checkbox/scripts/shadowd.py' |
374 | --- checkbox/scripts/shadowd.py 1970-01-01 00:00:00 +0000 |
375 | +++ checkbox/scripts/shadowd.py 2012-05-01 02:07:19 +0000 |
376 | @@ -0,0 +1,78 @@ |
377 | +# |
378 | +# Copyright (c) 2012 Canonical |
379 | +# |
380 | +# This file is part of Checkbox. |
381 | +# |
382 | +# Checkbox is free software; you can redistribute it and/or modify |
383 | +# it under the terms of the GNU Lesser General Public License as |
384 | +# published by the Free Software Foundation; either version 2.1 of |
385 | +# the License, or (at your option) any later version. |
386 | +# |
387 | +# Checkbox is distributed in the hope that it will be useful, |
388 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
389 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
390 | +# GNU Lesser General Public License for more details. |
391 | +# |
392 | +# You should have received a copy of the GNU Lesser General Public License |
393 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
394 | +# |
395 | +__metaclass__ = type |
396 | + |
397 | +__all__ = [ |
398 | + "run", |
399 | + ] |
400 | + |
401 | +from optparse import OptionGroup |
402 | + |
403 | +from checkbox.daemon.subsystem import ( |
404 | + SubsystemType, |
405 | + set_subsystem, |
406 | + ) |
407 | + |
408 | +from checkbox.shadowd.starter import Starter |
409 | + |
410 | +from checkbox.scripts.application import Application |
411 | +from checkbox.scripts.mixins.config import ConfigMixin |
412 | +from checkbox.scripts.mixins.daemon import DaemonMixin |
413 | +from checkbox.scripts.mixins.logger import LoggerMixin |
414 | + |
415 | + |
416 | +class ShadowdApplication(ConfigMixin, DaemonMixin, LoggerMixin, Application): |
417 | + |
418 | + # Application defaults |
419 | + usage = "Usage: %prog [OPTIONS]" |
420 | + |
421 | + # Shadowd defaults |
422 | + default_name = None |
423 | + |
424 | + def addOptions(self, parser): |
425 | + """See Application.""" |
426 | + super(ShadowdApplication, self).addOptions(parser) |
427 | + |
428 | + group = OptionGroup(parser, "Shadowd options") |
429 | + group.add_option("--name", |
430 | + default=self.default_name, |
431 | + help="Shadow daemon name.") |
432 | + |
433 | + parser.add_option_group(group) |
434 | + |
435 | + def parseOptions(self, options, args): |
436 | + super(ShadowdApplication, self).parseOptions(options, args) |
437 | + |
438 | + self.shadow_name = options.name |
439 | + |
440 | + def preProcess(self): |
441 | + # Set the subsystem for the daemon server. |
442 | + set_subsystem("SHADOWD", SubsystemType.SHADOWD) |
443 | + |
444 | + super(ShadowdApplication, self).preProcess() |
445 | + |
446 | + def process(self): |
447 | + starter = Starter() |
448 | + starter.initialize() |
449 | + starter.register() |
450 | + |
451 | + |
452 | +def run(): |
453 | + application = ShadowdApplication() |
454 | + application.run() |
455 | |
456 | === modified file 'checkbox/scripts/submit.py' |
457 | --- checkbox/scripts/submit.py 2012-04-24 22:43:09 +0000 |
458 | +++ checkbox/scripts/submit.py 2012-05-01 02:07:19 +0000 |
459 | @@ -22,6 +22,7 @@ |
460 | "run", |
461 | ] |
462 | |
463 | +import os |
464 | import sys |
465 | |
466 | from optparse import ( |
467 | @@ -29,7 +30,9 @@ |
468 | OptionValueError, |
469 | ) |
470 | from signal import SIGPIPE |
471 | +from StringIO import StringIO |
472 | |
473 | +from checkbox.daemon.operation import OperationCode |
474 | from checkbox.daemon.subsystem import ( |
475 | SubsystemType, |
476 | set_subsystem, |
477 | @@ -40,6 +43,13 @@ |
478 | sigaction, |
479 | ) |
480 | |
481 | +from checkbox.message.attributes import Attributes |
482 | +from checkbox.message.parser import ( |
483 | + ParserError, |
484 | + ParserStream, |
485 | + ) |
486 | +from checkbox.message.record import Record |
487 | + |
488 | from checkbox.scripts.application import ( |
489 | Application, |
490 | ApplicationError, |
491 | @@ -49,10 +59,6 @@ |
492 | |
493 | from checkbox.shadowd.client import Client |
494 | from checkbox.shadowd.queue import queue_connect |
495 | -from checkbox.shadowd.sender import ( |
496 | - send_create_cluster, |
497 | - send_create_process, |
498 | - ) |
499 | |
500 | |
501 | class SubmitApplication(ConfigMixin, LoggerMixin, Application): |
502 | @@ -71,10 +77,6 @@ |
503 | super(SubmitApplication, self).addOptions(parser) |
504 | |
505 | group = OptionGroup(parser, "Submit options") |
506 | - group.add_option("--append", |
507 | - action="append", |
508 | - type="string", |
509 | - help="Append expressions to each job in the file.") |
510 | group.add_option("--pool", |
511 | metavar="HOST", |
512 | help="Host as the central manager to query.") |
513 | @@ -102,9 +104,6 @@ |
514 | else: |
515 | self.file = sys.stdin |
516 | |
517 | - # Append option |
518 | - self.append = options.append |
519 | - |
520 | if options.local and options.remote: |
521 | raise OptionValueError( |
522 | "Must not specify both --local and --remote.") |
523 | @@ -113,7 +112,7 @@ |
524 | elif options.remote: |
525 | self.name = options.remote |
526 | |
527 | - self.client = Client(self.name, options.pool) |
528 | + self.pool = options.pool |
529 | |
530 | def preProcess(self): |
531 | # Install signal handlers. |
532 | @@ -125,15 +124,50 @@ |
533 | super(SubmitApplication, self).preProcess() |
534 | |
535 | def process(self): |
536 | - self.queue() |
537 | - |
538 | - def connect(self): |
539 | + jobs = self.read(self.file) |
540 | + |
541 | + client = Client(self.name, self.pool) |
542 | + if not client.locate(): |
543 | + raise ApplicationError("Failed to find address of local shadowd") |
544 | + |
545 | + self.enqueue(client, jobs) |
546 | + self.reschedule(client) |
547 | + |
548 | + def read(self, file): |
549 | + job = Record() |
550 | + parser = ParserStream() |
551 | + for line in file.readlines(): |
552 | + # Skip over comments |
553 | + line = line.strip() |
554 | + if line[0] == "#": |
555 | + continue |
556 | + |
557 | + if not line: |
558 | + if job.references: |
559 | + yield job |
560 | + job = Record() |
561 | + |
562 | + try: |
563 | + tree = parser.parseRecord(StringIO(line)) |
564 | + except ParserError, error: |
565 | + raise ApplicationError(error) |
566 | + |
567 | + if not tree: |
568 | + raise ApplicationError("Failed to parse line: %s" % line) |
569 | + |
570 | + for key, value in tree.references.iteritems(): |
571 | + job.insert(key, value) |
572 | + |
573 | + if job.references: |
574 | + yield job |
575 | + |
576 | + def connect(self, client): |
577 | if self.is_active: |
578 | return |
579 | |
580 | - sock = queue_connect(self.client.address, 0) |
581 | - if not sock: |
582 | - name = self.client.name |
583 | + sender = queue_connect(client.address, 0) |
584 | + if not sender: |
585 | + name = client.name |
586 | if name: |
587 | raise ApplicationError( |
588 | "Failed to connect to queue manager %s" % name) |
589 | @@ -143,21 +177,42 @@ |
590 | |
591 | self.is_active = True |
592 | |
593 | - return |
594 | + return sender |
595 | |
596 | - def queue(self, count=1): |
597 | + def enqueue(self, client, jobs): |
598 | """Place one or more copies of the jobs in the queue. |
599 | |
600 | :param count: Number of times to submit the jobs to the queue. |
601 | """ |
602 | - sock = self.connect() |
603 | - |
604 | - for i in xrange(count): |
605 | - cluster = send_create_cluster(sock) |
606 | - if cluster < 0: |
607 | - raise ApplicationError("Failed to create cluster.") |
608 | - |
609 | - process = send_create_process(sock, cluster) |
610 | + sender = self.connect(client) |
611 | + cluster = sender.createCluster() |
612 | + if cluster < 0: |
613 | + raise ApplicationError("Failed to create cluster.") |
614 | + |
615 | + for job in jobs: |
616 | + process = sender.createProcess(cluster) |
617 | + if process < 0: |
618 | + raise ApplicationError("Failed to create process.") |
619 | + |
620 | + self.setRootDir(job) |
621 | + self.setWorkingDir(job) |
622 | + |
623 | + def reschedule(self, client): |
624 | + client.sendCommand(OperationCode.RESCHEDULE) |
625 | + |
626 | + def setRootDir(self, job): |
627 | + root_dir = self.computerRootDir() |
628 | + job.insertString(Attributes.JOB_ROOT_DIR, root_dir) |
629 | + |
630 | + def setWorkingDir(self, job): |
631 | + working_dir = self.computeWorkingDir(self) |
632 | + job.insertString(Attributes.JOB_WORKING_DIR, working_dir) |
633 | + |
634 | + def computeRootDir(self): |
635 | + return "/" |
636 | + |
637 | + def computeWorkingDir(self): |
638 | + return os.getcwd() |
639 | |
640 | |
641 | def run(): |
642 | |
643 | === modified file 'checkbox/shadowd/queue.py' |
644 | --- checkbox/shadowd/queue.py 2012-04-24 22:43:09 +0000 |
645 | +++ checkbox/shadowd/queue.py 2012-05-01 02:07:19 +0000 |
646 | @@ -30,10 +30,7 @@ |
647 | ) |
648 | from checkbox.daemon.operation import OperationCode |
649 | |
650 | -from checkbox.shadowd.sender import ( |
651 | - send_close_connection, |
652 | - send_close_socket, |
653 | - ) |
654 | +from checkbox.shadowd.sender import Sender |
655 | |
656 | |
657 | def queue_connect(location, timeout=0): |
658 | @@ -57,26 +54,26 @@ |
659 | |
660 | return None |
661 | else: |
662 | - sock = client.startCommand(OperationCode.QUEUE, "tcp", timeout) |
663 | + sock = client.startCommand(OperationCode.ENQUEUE, "tcp", timeout) |
664 | if not sock: |
665 | logging.info("Failed to connect to queue manager.") |
666 | return None |
667 | |
668 | - return sock |
669 | - |
670 | - |
671 | -def queue_disconnect(sock, commit=True): |
672 | + return Sender(sock) |
673 | + |
674 | + |
675 | +def queue_disconnect(sender, commit=True): |
676 | """Close the connection to the shadow daemon job queue and optionally |
677 | commit the transaction. |
678 | |
679 | :param sock: Sock object returned by `queue_connect`. |
680 | :param commit: Optional transaction commit, defaults to True. |
681 | """ |
682 | - if not sock: |
683 | + if not sender: |
684 | return False |
685 | |
686 | if commit: |
687 | - ret = send_close_connection() |
688 | + ret = sender.closeConnection() |
689 | |
690 | - send_close_socket() |
691 | + sender.closeSocket() |
692 | return ret |
693 | |
694 | === modified file 'checkbox/shadowd/sender.py' |
695 | --- checkbox/shadowd/sender.py 2012-04-24 22:43:09 +0000 |
696 | +++ checkbox/shadowd/sender.py 2012-05-01 02:07:19 +0000 |
697 | @@ -19,10 +19,7 @@ |
698 | __metaclass__ = type |
699 | |
700 | __all__ = [ |
701 | - "send_create_cluster", |
702 | - "send_create_process", |
703 | - "send_close_connection", |
704 | - "send_close_socket", |
705 | + "Sender", |
706 | ] |
707 | |
708 | import logging |
709 | @@ -30,47 +27,61 @@ |
710 | from checkbox.shadowd.operation import OperationCode |
711 | |
712 | |
713 | -def send_create_cluster(sock): |
714 | - assert(sock.putInteger(OperationCode.CREATE_CLUSTER)) |
715 | - assert(sock.putEnd()) |
716 | - |
717 | - ret = sock.getInteger() |
718 | - if ret < 0: |
719 | - error = sock.getInteger() |
720 | - logging.info("Received create cluster error: %d", error) |
721 | - |
722 | - assert(sock.getEnd()) |
723 | - return ret |
724 | - |
725 | - |
726 | -def send_create_process(sock, cluster): |
727 | - assert(sock.putInteger(OperationCode.CREATE_PROCESS)) |
728 | - assert(sock.putInteger(cluster)) |
729 | - assert(sock.putEnd()) |
730 | - |
731 | - ret = sock.getInteger() |
732 | - if ret < 0: |
733 | - error = sock.getInteger() |
734 | - logging.info("Received create process error: %d", error) |
735 | - |
736 | - assert(sock.getEnd()) |
737 | - return ret |
738 | - |
739 | - |
740 | -def send_close_connection(sock): |
741 | - assert(sock.putInteger(OperationCode.CLOSE_CONNECTION)) |
742 | - assert(sock.putEnd()) |
743 | - |
744 | - ret = sock.getInteger() |
745 | - if ret < 0: |
746 | - error = sock.getInteger() |
747 | - logging.info("Received close connection error: %d", error) |
748 | - |
749 | - assert(sock.getEnd()) |
750 | - return ret |
751 | - |
752 | - |
753 | -def send_close_socket(sock): |
754 | - assert(sock.putInteger(OperationCode.CLOSE_SOCKET)) |
755 | - assert(sock.putEnd()) |
756 | - return 0 |
757 | +class Sender: |
758 | + |
759 | + __slots__ = ( |
760 | + "_sock", |
761 | + ) |
762 | + |
763 | + def __init__(self, sock): |
764 | + self._sock = sock |
765 | + |
766 | + def createCluster(self): |
767 | + sock = self._sock |
768 | + |
769 | + assert(sock.putInteger(OperationCode.CREATE_CLUSTER)) |
770 | + assert(sock.putEnd()) |
771 | + |
772 | + ret = sock.getInteger() |
773 | + if ret < 0: |
774 | + error = sock.getInteger() |
775 | + logging.info("Received create cluster error: %d", error) |
776 | + |
777 | + assert(sock.getEnd()) |
778 | + return ret |
779 | + |
780 | + def createProcess(self, cluster): |
781 | + sock = self._sock |
782 | + |
783 | + assert(sock.putInteger(OperationCode.CREATE_PROCESS)) |
784 | + assert(sock.putInteger(cluster)) |
785 | + assert(sock.putEnd()) |
786 | + |
787 | + ret = sock.getInteger() |
788 | + if ret < 0: |
789 | + error = sock.getInteger() |
790 | + logging.info("Received create process error: %d", error) |
791 | + |
792 | + assert(sock.getEnd()) |
793 | + return ret |
794 | + |
795 | + def closeConnection(self): |
796 | + sock = self._sock |
797 | + |
798 | + assert(sock.putInteger(OperationCode.CLOSE_CONNECTION)) |
799 | + assert(sock.putEnd()) |
800 | + |
801 | + ret = sock.getInteger() |
802 | + if ret < 0: |
803 | + error = sock.getInteger() |
804 | + logging.info("Received close connection error: %d", error) |
805 | + |
806 | + assert(sock.getEnd()) |
807 | + return ret |
808 | + |
809 | + def closeSocket(self): |
810 | + sock = self._sock |
811 | + |
812 | + assert(sock.putInteger(OperationCode.CLOSE_SOCKET)) |
813 | + assert(sock.putEnd()) |
814 | + return 0 |
815 | |
816 | === added file 'checkbox/shadowd/starter.py' |
817 | --- checkbox/shadowd/starter.py 1970-01-01 00:00:00 +0000 |
818 | +++ checkbox/shadowd/starter.py 2012-05-01 02:07:19 +0000 |
819 | @@ -0,0 +1,128 @@ |
820 | +# |
821 | +# Copyright (c) 2012 Canonical |
822 | +# |
823 | +# This file is part of Checkbox. |
824 | +# |
825 | +# Checkbox is free software; you can redistribute it and/or modify |
826 | +# it under the terms of the GNU Lesser General Public License as |
827 | +# published by the Free Software Foundation; either version 2.1 of |
828 | +# the License, or (at your option) any later version. |
829 | +# |
830 | +# Checkbox is distributed in the hope that it will be useful, |
831 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
832 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
833 | +# GNU Lesser General Public License for more details. |
834 | +# |
835 | +# You should have received a copy of the GNU Lesser General Public License |
836 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
837 | +# |
838 | +__metaclass__ = type |
839 | + |
840 | +__all__ = [ |
841 | + "Starter", |
842 | + ] |
843 | + |
844 | +import logging |
845 | + |
846 | +from checkbox.lib import param |
847 | +from checkbox.lib.hostname import fullhostname |
848 | + |
849 | +from checkbox.daemon.operation import OperationCode |
850 | +from checkbox.daemon.server import server |
851 | + |
852 | +from checkbox.process.timeslice import Timeslice |
853 | + |
854 | + |
855 | +class Starter: |
856 | + |
857 | + __slots__ = ( |
858 | + "_name", |
859 | + "_spool", |
860 | + "_interval", |
861 | + "_exit_when_done", |
862 | + "_timer_id", |
863 | + "_start_delay", |
864 | + "_start_count", |
865 | + ) |
866 | + |
867 | + min_interval_timer_set = False |
868 | + |
869 | + def __init__(self): |
870 | + self._name = None |
871 | + self._spool = None |
872 | + self._interval = Timeslice() |
873 | + self._exit_when_done = False |
874 | + self._timer_id = -1 |
875 | + |
876 | + def initialize(self): |
877 | + self._spool = param.get("SPOOL") |
878 | + if not self._spool: |
879 | + raise Exception("No spool directory specified in config file.") |
880 | + |
881 | + self._name = param.get("SHADOWD_NAME") |
882 | + if not self._name: |
883 | + self._name = fullhostname() |
884 | + |
885 | + logging.debug("Using name: %s", self._name) |
886 | + |
887 | + interval = self._interval |
888 | + interval.default_interval = param.getInteger("SHADOWD_INTERVAL", 300) |
889 | + interval.max_interval = interval.default_interval |
890 | + interval.min_interval = param.getInteger("SHADOWD_MIN_INTERVAL", 5) |
891 | + |
892 | + self._start_delay = param.getInteger("SHADOWD_START_DELAY", 5) |
893 | + self._start_count = param.getInteger("SHADOWD_START_COUNT", 1) |
894 | + |
895 | + # TODO: initialize queue |
896 | + |
897 | + def register(self): |
898 | + server.registerCommand(OperationCode.ENQUEUE, self._enqueue) |
899 | + server.registerCommand(OperationCode.RESCHEDULE, self._reschedule) |
900 | + |
901 | + self.registerTimers() |
902 | + |
903 | + def registerTimers(self): |
904 | + if self._timer_id >= 0: |
905 | + server.unregisterTimer(self._timer_id) |
906 | + |
907 | + self._timer_id = server.registerTimer(self.timeout, 10) |
908 | + |
909 | + def timeout(self): |
910 | + # If called too frequently, delay. |
911 | + self._interval.expediteNextRun() |
912 | + time_to_next_run = self._interval.time_to_next_run |
913 | + if time_to_next_run > 0: |
914 | + if not Starter.min_interval_timer_set: |
915 | + logging.debug( |
916 | + "Setting delay until next queue scan to %d seconds", |
917 | + time_to_next_run) |
918 | + server.resetTimer(self._timer_id, time_to_next_run, 1) |
919 | + Starter.min_interval_timer_set = True |
920 | + return |
921 | + |
922 | + # TODO: Check if in walk job queue |
923 | + |
924 | + Starter.min_interval_timer_set = False |
925 | + |
926 | + # TODO: finish this |
927 | + |
928 | + def _enqueue(self, command, stream): |
929 | + print 'ENQUEUE!' |
930 | + return 0 |
931 | + |
932 | + def _reschedule(self, command, stream): |
933 | + if stream and not stream.end(): |
934 | + logging.info("Failed to receive end of message for RESCHEDULE.") |
935 | + return 0 |
936 | + |
937 | + if self._exit_when_done: |
938 | + return 0 |
939 | + |
940 | + self._needReschedule() |
941 | + return 0 |
942 | + |
943 | + def _needReschedule(self): |
944 | + self._need_reschedule = True |
945 | + |
946 | + # Update the portal and request a reschedule. |
947 | + self.timeout() |
948 | |
949 | === modified file 'checkbox/shadowd/tests/test_sender.py' |
950 | --- checkbox/shadowd/tests/test_sender.py 2012-04-24 22:43:09 +0000 |
951 | +++ checkbox/shadowd/tests/test_sender.py 2012-05-01 02:07:19 +0000 |
952 | @@ -28,13 +28,13 @@ |
953 | from checkbox.io.sock import SockAddress |
954 | from checkbox.io.socktcp import SockTCP |
955 | |
956 | -from checkbox.shadowd.sender import send_close_connection |
957 | - |
958 | - |
959 | -class TestSendCloseConnection(ProcessMixin, TestCase): |
960 | +from checkbox.shadowd.sender import Sender |
961 | + |
962 | + |
963 | +class TestSender(ProcessMixin, TestCase): |
964 | |
965 | def setUp(self): |
966 | - super(TestSendCloseConnection, self).setUp() |
967 | + super(TestSender, self).setUp() |
968 | self.server = SockTCP() |
969 | self.server.bind(SockAddress.LOOPBACK) |
970 | self.server.listen() |
971 | @@ -43,8 +43,10 @@ |
972 | self.client = SockTCP() |
973 | self.client.connect(host, port) |
974 | |
975 | + self.sender = Sender(self.client) |
976 | + |
977 | def tearDown(self): |
978 | - super(TestSendCloseConnection, self).setUp() |
979 | + super(TestSender, self).setUp() |
980 | self.client.close() |
981 | self.server.close() |
982 | |
983 | @@ -58,7 +60,7 @@ |
984 | sock.putEnd() |
985 | os._exit(0) |
986 | |
987 | - self.assertEquals(send_close_connection(self.client), 0) |
988 | + self.assertEquals(self.sender.closeConnection(), 0) |
989 | |
990 | def test_failure(self): |
991 | if not self.forkChild(): |
992 | @@ -71,4 +73,4 @@ |
993 | sock.putEnd() |
994 | os._exit(0) |
995 | |
996 | - self.assertEquals(send_close_connection(self.client), -1) |
997 | + self.assertEquals(self.sender.closeConnection(), -1) |
998 | |
999 | === modified file 'configs/checkbox.conf' |
1000 | --- configs/checkbox.conf 2012-04-23 19:34:06 +0000 |
1001 | +++ configs/checkbox.conf 2012-05-01 02:07:19 +0000 |
1002 | @@ -6,6 +6,7 @@ |
1003 | |
1004 | ## Pathnames |
1005 | LOG = %(LOCAL_DIR)s/log |
1006 | +SPOOL = %(LOCAL_DIR)s/spool |
1007 | BIN = %(LOCAL_DIR)s/bin |
1008 | EXECUTE = %(LOCAL_DIR)s/execute |
1009 | |
1010 | @@ -56,3 +57,11 @@ |
1011 | |
1012 | ## Path to the starter binary. |
1013 | STARTER = %(BIN)s/checkbox-runner |
1014 | + |
1015 | +[SHADOWD] |
1016 | + |
1017 | +## How often should the shadowd send an update to the portal? |
1018 | +#INTERVAL = 300 |
1019 | + |
1020 | +## How long should the shadowd wait between spawning each shadow? |
1021 | +#START_DELAY = 2 |
I would add this function to properly close the connection in enqueue():
    def disconnect(self, sender):
        if not self.is_active:
            return

        if queue_disconnect(sender) < 0:
            raise ApplicationError("Failed to close connection.")
And probably check for errors and raise ApplicationError in the reschedule command.
But both can be implemented later, so approved!