Merge lp:~cr3/checkbox-core/shadowd into lp:checkbox-core

Proposed by Marc Tardif
Status: Merged
Merged at revision: 18
Proposed branch: lp:~cr3/checkbox-core/shadowd
Merge into: lp:checkbox-core
Diff against target: 1021 lines (+513/-132)
14 files modified
bin/checkbox-shadowd (+30/-0)
checkbox/daemon/client.py (+8/-2)
checkbox/daemon/operation.py (+2/-2)
checkbox/daemon/server.py (+10/-7)
checkbox/daemon/subsystem.py (+4/-0)
checkbox/journal/event.py (+81/-25)
checkbox/message/attributes.py (+2/-0)
checkbox/scripts/shadowd.py (+78/-0)
checkbox/scripts/submit.py (+83/-28)
checkbox/shadowd/queue.py (+9/-12)
checkbox/shadowd/sender.py (+59/-48)
checkbox/shadowd/starter.py (+128/-0)
checkbox/shadowd/tests/test_sender.py (+10/-8)
configs/checkbox.conf (+9/-0)
To merge this branch: bzr merge lp:~cr3/checkbox-core/shadowd
Reviewer Review Type Date Requested Status
Sylvain Pineau (community) Approve
Marc Tardif Pending
Review via email: mp+104197@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Sylvain Pineau (sylvain-pineau) wrote :

I would add this function to properly close the connection in enqueue():

    def disconnect(self, sender):
        if not self.is_active:
            return

        if queue_disconnect(sender) < 0:
            raise ApplicationError("Failed to close connection.")

        self.is_active = False

You should probably also check for errors and raise ApplicationError in the reschedule command.

But both can be implemented later, so approved!

review: Approve
Revision history for this message
Marc Tardif (cr3) wrote :

I've applied both your suggestions and merged into trunk. Thanks!

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'bin/checkbox-shadowd'
2--- bin/checkbox-shadowd 1970-01-01 00:00:00 +0000
3+++ bin/checkbox-shadowd 2012-05-01 02:07:19 +0000
4@@ -0,0 +1,30 @@
5+#!/usr/bin/python
6+#
7+# Copyright (c) 2012 Canonical
8+#
9+# This file is part of Checkbox.
10+#
11+# Checkbox is free software; you can redistribute it and/or modify
12+# it under the terms of the GNU Lesser General Public License as
13+# published by the Free Software Foundation; either version 2.1 of
14+# the License, or (at your option) any later version.
15+#
16+# Checkbox is distributed in the hope that it will be useful,
17+# but WITHOUT ANY WARRANTY; without even the implied warranty of
18+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19+# GNU Lesser General Public License for more details.
20+#
21+# You should have received a copy of the GNU Lesser General Public License
22+# along with this program. If not, see <http://www.gnu.org/licenses/>.
23+#
24+import sys
25+
26+try:
27+ import _preamble
28+except ImportError:
29+ sys.exc_clear()
30+
31+from checkbox.scripts import shadowd
32+
33+
34+shadowd.run()
35
36=== modified file 'checkbox/daemon/client.py'
37--- checkbox/daemon/client.py 2012-04-24 22:43:09 +0000
38+++ checkbox/daemon/client.py 2012-05-01 02:07:19 +0000
39@@ -325,7 +325,13 @@
40 messenger.startCommand(message)
41
42 def _doLocate(self):
43- return True
44+ if self.type == ClientType.ANY:
45+ return True
46+ elif self.type == ClientType.SHADOWD:
47+ return self._getDaemonInfo()
48+ else:
49+ raise Exception(
50+ "Unsupported client type: %s" % ClientType[self.type])
51
52 def _initializeHostname(self):
53 if self._initialized_hostname:
54@@ -471,7 +477,7 @@
55 return True
56
57 def _getConnectorInfo(self):
58- if self._address and is_ipaddress(self._address)[0]:
59+ if self._address and address_is_valid(self._address):
60 self._port = address_to_port(self._address)
61 if self._port > 0:
62 logging.debug("Already have address, no info to locate.")
63
64=== modified file 'checkbox/daemon/operation.py'
65--- checkbox/daemon/operation.py 2012-04-24 22:43:09 +0000
66+++ checkbox/daemon/operation.py 2012-05-01 02:07:19 +0000
67@@ -45,8 +45,8 @@
68 "OFF_PEACEFUL",
69 "SET_PEACEFUL_SHUTDOWN",
70 "TIME_OFFSET",
71- ("SHADOWD_BASE", 100),
72- "QUEUE",
73+ ("SHADOWD_BASE", 200),
74+ "ENQUEUE",
75 "CONTINUE_CLAIM",
76 "SUSPEND_CLAIM",
77 "DEACTIVATE_CLAIM",
78
79=== modified file 'checkbox/daemon/server.py'
80--- checkbox/daemon/server.py 2012-04-23 19:36:19 +0000
81+++ checkbox/daemon/server.py 2012-05-01 02:07:19 +0000
82@@ -560,7 +560,10 @@
83 if sock_handler.function:
84 logging.info("Calling handler for sock")
85
86- result = sock_handler.function(sock)
87+ if sock_handler.data is not None:
88+ result = sock_handler.function(sock, sock_handler.data)
89+ else:
90+ result = sock_handler.function(sock)
91 logging.info("Return from handler")
92
93 elif handle_command:
94@@ -1176,11 +1179,11 @@
95 or sock_timed_out):
96 if (sock_timed_out.sock._finish_connect()
97 != StreamError.WOULDBLOCK):
98- sock_timed_out.call_handler = True
99+ sock_handler.call_handler = True
100 else:
101 if (selector.check(sock, SelectorIO.READ)
102 or sock_timed_out):
103- sock_timed_out.call_handler = True
104+ sock_handler.call_handler = True
105
106 # Scan through the pipe table to find which ones are callable
107 for pipe_handler in self._pipe_table:
108@@ -1239,7 +1242,7 @@
109
110 # Enqueue this packet into the buffers if this
111 # is a udp sock
112- if sock.get_protocol() == "udp":
113+ if sock.protocol == "udp":
114 if not sock.handle_incoming_packet():
115 # There is not yet a complete message ready
116 continue
117@@ -1287,7 +1290,7 @@
118 self._sock_udp.unserialize(string)
119
120 def _handleRequest(self, sock):
121- protocol = sock.get_protocol()
122+ protocol = sock.protocol
123 if protocol == "tcp":
124 is_tcp = True
125 elif protocol == "udp":
126@@ -1376,7 +1379,7 @@
127 return True
128
129 def _handleRequestSockTimerHandler(self, sock):
130- assert(sock.get_protocol() == "tcp")
131+ assert(sock.protocol == "tcp")
132
133 self.unregisterSock(sock)
134
135@@ -1706,7 +1709,7 @@
136 socket.IPPROTO_TCP, socket.TCP_NODELAY, 1):
137 logging.info("Failed to set TCP_NODELAY on TCP command port.")
138
139- if not sock_tcp.listen_on_port(port):
140+ if not sock_tcp.listen(port):
141 return cleanup(
142 "Failed to listen on port %d for TCP command socket.", port)
143
144
145=== modified file 'checkbox/daemon/subsystem.py'
146--- checkbox/daemon/subsystem.py 2012-04-24 22:43:09 +0000
147+++ checkbox/daemon/subsystem.py 2012-05-01 02:07:19 +0000
148@@ -126,6 +126,10 @@
149 self.addEntry(
150 SubsystemType.MASTER, SubsystemCategory.DAEMON, "MASTER")
151 self.addEntry(
152+ SubsystemType.SHADOW, SubsystemCategory.DAEMON, "SHADOW")
153+ self.addEntry(
154+ SubsystemType.SHADOWD, SubsystemCategory.DAEMON, "SHADOWD")
155+ self.addEntry(
156 SubsystemType.RUNNER, SubsystemCategory.DAEMON, "RUNNER")
157 self.addEntry(
158 SubsystemType.RUNNERD, SubsystemCategory.DAEMON, "RUNNERD")
159
160=== modified file 'checkbox/journal/event.py'
161--- checkbox/journal/event.py 2012-04-23 19:36:19 +0000
162+++ checkbox/journal/event.py 2012-05-01 02:07:19 +0000
163@@ -143,12 +143,13 @@
164
165 type = None
166
167- def __init__(self):
168+ def __init__(self, cluster=-1, proc=-1, subproc=-1):
169+ self.cluster = cluster
170+ self.proc = proc
171+ self.subproc = subproc
172+
173 self.event_clock = time.time()
174 self.event_time = time.localtime(self.event_clock)
175- self.cluster = -1
176- self.proc = -1
177- self.subproc = -1
178
179 def _initializeFromRecord(self, record):
180 self.cluster = record.evaluateInteger("cluster", -1)
181@@ -263,8 +264,8 @@
182
183 type = EventType.GENERIC
184
185- def __init__(self):
186- super(GenericEvent, self).__init__()
187+ def __init__(self, *args, **kwargs):
188+ super(GenericEvent, self).__init__(*args, **kwargs)
189 self.data = ""
190
191 def _initializeFromRecord(self, record):
192@@ -276,7 +277,9 @@
193 self.data = record.evaluateString("generic_data", "")
194
195 def _writeEvent(self, file):
196- file.write("%s\n" % self.data)
197+ if file.write("%s\n" % self.data) < 0:
198+ return False
199+
200 return True
201
202 def _readEvent(self, file):
203@@ -292,8 +295,8 @@
204
205 type = EventType.EXECUTE
206
207- def __init__(self):
208- super(ExecuteEvent, self).__init__()
209+ def __init__(self, *args, **kwargs):
210+ super(ExecuteEvent, self).__init__(*args, **kwargs)
211 self.host = ""
212
213 def _initializeFromRecord(self, record):
214@@ -305,7 +308,9 @@
215 self.host = record.evaluateString("execute_host", "")
216
217 def _writeEvent(self, file):
218- file.write("Job executing on host: %s\n" % self.host)
219+ if file.write("Job executing on host: %s\n" % self.host) < 0:
220+ return False
221+
222 return True
223
224 def _readEvent(self, file):
225@@ -326,8 +331,8 @@
226
227 type = EventType.SUSPEND
228
229- def __init__(self):
230- super(SuspendEvent, self).__init__()
231+ def __init__(self, *args, **kwargs):
232+ super(SuspendEvent, self).__init__(*args, **kwargs)
233 self.pid_count = 0
234
235 def _initializeFromRecord(self, record):
236@@ -339,8 +344,13 @@
237 self.pid_count = record.evaluateString("pid_count", 0)
238
239 def _writeEvent(self, file):
240- file.write("Job was suspended\n")
241- file.write("Number of processes suspended: %d\n" % self.pid_count)
242+ if file.write("Job was suspended\n") < 0:
243+ return False
244+
245+ if file.write(
246+ "Number of processes suspended: %d\n" % self.pid_count) < 0:
247+ return False
248+
249 return True
250
251 def _readEvent(self, file):
252@@ -363,7 +373,9 @@
253 type = EventType.RESUME
254
255 def _writeEvent(self, file):
256- file.write("Job was resumed\n")
257+ if file.write("Job was resumed\n") < 0:
258+ return False
259+
260 return True
261
262 def _readEvent(self, file):
263@@ -374,6 +386,42 @@
264 return True
265
266
267+class SubmitEvent(EventInfo):
268+
269+ __slots__ = (
270+ "host",
271+ )
272+
273+ type = EventType.SUBMIT
274+
275+ def __init__(self, *args, **kwargs):
276+ super(SubmitEvent, self).__init__(*args, **kwargs)
277+ self.host = ""
278+
279+ def _writeEvent(self, file):
280+ if file.write("Job submitted from host: %s\n" % self.host) < 0:
281+ return False
282+
283+ return True
284+
285+ def _readEvent(self, file):
286+ line = file.readline()
287+ match = re.match(r"Job submitted from host: (?P<host>[^\s]*)\n", line)
288+ if not match:
289+ return False
290+
291+ self.host = match.group("host")
292+ return True
293+
294+ def _initializeFromRecord(self, record):
295+ super(SubmitEvent, self)._initializeFromRecord(record)
296+
297+ if not record:
298+ return
299+
300+ self.host = record.evaluateString("host", "")
301+
302+
303 class TerminateEvent(EventInfo):
304
305 __slots__ = (
306@@ -395,8 +443,8 @@
307 ("total_sent_bytes", "Total Bytes Sent"),
308 ("total_received_bytes", "Total Bytes Received"))
309
310- def __init__(self):
311- super(TerminateEvent, self).__init__()
312+ def __init__(self, *args, **kwargs):
313+ super(TerminateEvent, self).__init__(*args, **kwargs)
314 self.sent_bytes = self.received_bytes = 0.0
315 self.total_sent_bytes = self.total_received_bytes = 0.0
316 self.return_value = self.signal_number = -1
317@@ -404,25 +452,33 @@
318 self.core_file = None
319
320 def _writeEvent(self, file):
321- file.write("Job terminated\n")
322+ if file.write("Job terminated\n") < 0:
323+ return False
324
325 if self.normal:
326- file.write(
327+ if file.write(
328 "\t(1) Normal termination (return value %d)\n"
329- % self.return_value)
330+ % self.return_value) < 0:
331+ return False
332 else:
333- file.write(
334+ if file.write(
335 "\t(0) Abnormal termination (signal %d)\n"
336- % self.signal_number)
337+ % self.signal_number) < 0:
338+ return False
339
340 if self.core_file:
341- file.write("\t(1) Core file in: %s\n" % self.core_file)
342+ ret = file.write("\t(1) Core file in: %s\n" % self.core_file)
343 else:
344- file.write("\t(0) No core file\n")
345+ ret = file.write("\t(0) No core file\n")
346+
347+ if ret < 0:
348+ return False
349
350 # Write bytes
351 for attribute, string in self.attribute_strings:
352- file.write("\t%.0f - %s\n" % (getattr(self, attribute), string))
353+ if file.write(
354+ "\t%.0f - %s\n" % (getattr(self, attribute), string)) < 0:
355+ return False
356
357 return True
358
359
360=== modified file 'checkbox/message/attributes.py'
361--- checkbox/message/attributes.py 2012-04-23 19:36:19 +0000
362+++ checkbox/message/attributes.py 2012-05-01 02:07:19 +0000
363@@ -43,6 +43,8 @@
364 JOB_STATE="state",
365 JOB_START_TIME="start_time",
366 JOB_EXIT_TIME="exit_time",
367+ JOB_ROOT_DIR="root_dir",
368+ JOB_WORKING_DIR="working_dir",
369
370 CLUSTER_ID="cluster_id",
371 PROC_ID="proc_id",
372
373=== added file 'checkbox/scripts/shadowd.py'
374--- checkbox/scripts/shadowd.py 1970-01-01 00:00:00 +0000
375+++ checkbox/scripts/shadowd.py 2012-05-01 02:07:19 +0000
376@@ -0,0 +1,78 @@
377+#
378+# Copyright (c) 2012 Canonical
379+#
380+# This file is part of Checkbox.
381+#
382+# Checkbox is free software; you can redistribute it and/or modify
383+# it under the terms of the GNU Lesser General Public License as
384+# published by the Free Software Foundation; either version 2.1 of
385+# the License, or (at your option) any later version.
386+#
387+# Checkbox is distributed in the hope that it will be useful,
388+# but WITHOUT ANY WARRANTY; without even the implied warranty of
389+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
390+# GNU Lesser General Public License for more details.
391+#
392+# You should have received a copy of the GNU Lesser General Public License
393+# along with this program. If not, see <http://www.gnu.org/licenses/>.
394+#
395+__metaclass__ = type
396+
397+__all__ = [
398+ "run",
399+ ]
400+
401+from optparse import OptionGroup
402+
403+from checkbox.daemon.subsystem import (
404+ SubsystemType,
405+ set_subsystem,
406+ )
407+
408+from checkbox.shadowd.starter import Starter
409+
410+from checkbox.scripts.application import Application
411+from checkbox.scripts.mixins.config import ConfigMixin
412+from checkbox.scripts.mixins.daemon import DaemonMixin
413+from checkbox.scripts.mixins.logger import LoggerMixin
414+
415+
416+class ShadowdApplication(ConfigMixin, DaemonMixin, LoggerMixin, Application):
417+
418+ # Application defaults
419+ usage = "Usage: %prog [OPTIONS]"
420+
421+ # Shadowd defaults
422+ default_name = None
423+
424+ def addOptions(self, parser):
425+ """See Application."""
426+ super(ShadowdApplication, self).addOptions(parser)
427+
428+ group = OptionGroup(parser, "Shadowd options")
429+ group.add_option("--name",
430+ default=self.default_name,
431+ help="Shadow daemon name.")
432+
433+ parser.add_option_group(group)
434+
435+ def parseOptions(self, options, args):
436+ super(ShadowdApplication, self).parseOptions(options, args)
437+
438+ self.shadow_name = options.name
439+
440+ def preProcess(self):
441+ # Set the subsystem for the daemon server.
442+ set_subsystem("SHADOWD", SubsystemType.SHADOWD)
443+
444+ super(ShadowdApplication, self).preProcess()
445+
446+ def process(self):
447+ starter = Starter()
448+ starter.initialize()
449+ starter.register()
450+
451+
452+def run():
453+ application = ShadowdApplication()
454+ application.run()
455
456=== modified file 'checkbox/scripts/submit.py'
457--- checkbox/scripts/submit.py 2012-04-24 22:43:09 +0000
458+++ checkbox/scripts/submit.py 2012-05-01 02:07:19 +0000
459@@ -22,6 +22,7 @@
460 "run",
461 ]
462
463+import os
464 import sys
465
466 from optparse import (
467@@ -29,7 +30,9 @@
468 OptionValueError,
469 )
470 from signal import SIGPIPE
471+from StringIO import StringIO
472
473+from checkbox.daemon.operation import OperationCode
474 from checkbox.daemon.subsystem import (
475 SubsystemType,
476 set_subsystem,
477@@ -40,6 +43,13 @@
478 sigaction,
479 )
480
481+from checkbox.message.attributes import Attributes
482+from checkbox.message.parser import (
483+ ParserError,
484+ ParserStream,
485+ )
486+from checkbox.message.record import Record
487+
488 from checkbox.scripts.application import (
489 Application,
490 ApplicationError,
491@@ -49,10 +59,6 @@
492
493 from checkbox.shadowd.client import Client
494 from checkbox.shadowd.queue import queue_connect
495-from checkbox.shadowd.sender import (
496- send_create_cluster,
497- send_create_process,
498- )
499
500
501 class SubmitApplication(ConfigMixin, LoggerMixin, Application):
502@@ -71,10 +77,6 @@
503 super(SubmitApplication, self).addOptions(parser)
504
505 group = OptionGroup(parser, "Submit options")
506- group.add_option("--append",
507- action="append",
508- type="string",
509- help="Append expressions to each job in the file.")
510 group.add_option("--pool",
511 metavar="HOST",
512 help="Host as the central manager to query.")
513@@ -102,9 +104,6 @@
514 else:
515 self.file = sys.stdin
516
517- # Append option
518- self.append = options.append
519-
520 if options.local and options.remote:
521 raise OptionValueError(
522 "Must not specify both --local and --remote.")
523@@ -113,7 +112,7 @@
524 elif options.remote:
525 self.name = options.remote
526
527- self.client = Client(self.name, options.pool)
528+ self.pool = options.pool
529
530 def preProcess(self):
531 # Install signal handlers.
532@@ -125,15 +124,50 @@
533 super(SubmitApplication, self).preProcess()
534
535 def process(self):
536- self.queue()
537-
538- def connect(self):
539+ jobs = self.read(self.file)
540+
541+ client = Client(self.name, self.pool)
542+ if not client.locate():
543+ raise ApplicationError("Failed to find address of local shadowd")
544+
545+ self.enqueue(client, jobs)
546+ self.reschedule(client)
547+
548+ def read(self, file):
549+ job = Record()
550+ parser = ParserStream()
551+ for line in file.readlines():
552+ # Skip over comments
553+ line = line.strip()
554+ if line[0] == "#":
555+ continue
556+
557+ if not line:
558+ if job.references:
559+ yield job
560+ job = Record()
561+
562+ try:
563+ tree = parser.parseRecord(StringIO(line))
564+ except ParserError, error:
565+ raise ApplicationError(error)
566+
567+ if not tree:
568+ raise ApplicationError("Failed to parse line: %s" % line)
569+
570+ for key, value in tree.references.iteritems():
571+ job.insert(key, value)
572+
573+ if job.references:
574+ yield job
575+
576+ def connect(self, client):
577 if self.is_active:
578 return
579
580- sock = queue_connect(self.client.address, 0)
581- if not sock:
582- name = self.client.name
583+ sender = queue_connect(client.address, 0)
584+ if not sender:
585+ name = client.name
586 if name:
587 raise ApplicationError(
588 "Failed to connect to queue manager %s" % name)
589@@ -143,21 +177,42 @@
590
591 self.is_active = True
592
593- return
594+ return sender
595
596- def queue(self, count=1):
597+ def enqueue(self, client, jobs):
598 """Place one or more copies of the jobs in the queue.
599
600 :param count: Number of times to submit the jobs to the queue.
601 """
602- sock = self.connect()
603-
604- for i in xrange(count):
605- cluster = send_create_cluster(sock)
606- if cluster < 0:
607- raise ApplicationError("Failed to create cluster.")
608-
609- process = send_create_process(sock, cluster)
610+ sender = self.connect(client)
611+ cluster = sender.createCluster()
612+ if cluster < 0:
613+ raise ApplicationError("Failed to create cluster.")
614+
615+ for job in jobs:
616+ process = sender.createProcess(cluster)
617+ if process < 0:
618+ raise ApplicationError("Failed to create process.")
619+
620+ self.setRootDir(job)
621+ self.setWorkingDir(job)
622+
623+ def reschedule(self, client):
624+ client.sendCommand(OperationCode.RESCHEDULE)
625+
626+ def setRootDir(self, job):
627+ root_dir = self.computerRootDir()
628+ job.insertString(Attributes.JOB_ROOT_DIR, root_dir)
629+
630+ def setWorkingDir(self, job):
631+ working_dir = self.computeWorkingDir(self)
632+ job.insertString(Attributes.JOB_WORKING_DIR, working_dir)
633+
634+ def computeRootDir(self):
635+ return "/"
636+
637+ def computeWorkingDir(self):
638+ return os.getcwd()
639
640
641 def run():
642
643=== modified file 'checkbox/shadowd/queue.py'
644--- checkbox/shadowd/queue.py 2012-04-24 22:43:09 +0000
645+++ checkbox/shadowd/queue.py 2012-05-01 02:07:19 +0000
646@@ -30,10 +30,7 @@
647 )
648 from checkbox.daemon.operation import OperationCode
649
650-from checkbox.shadowd.sender import (
651- send_close_connection,
652- send_close_socket,
653- )
654+from checkbox.shadowd.sender import Sender
655
656
657 def queue_connect(location, timeout=0):
658@@ -57,26 +54,26 @@
659
660 return None
661 else:
662- sock = client.startCommand(OperationCode.QUEUE, "tcp", timeout)
663+ sock = client.startCommand(OperationCode.ENQUEUE, "tcp", timeout)
664 if not sock:
665 logging.info("Failed to connect to queue manager.")
666 return None
667
668- return sock
669-
670-
671-def queue_disconnect(sock, commit=True):
672+ return Sender(sock)
673+
674+
675+def queue_disconnect(sender, commit=True):
676 """Close the connection to the shadow daemon job queue and optionally
677 commit the transaction.
678
679 :param sock: Sock object returned by `queue_connect`.
680 :param commit: Optional transaction commit, defaults to True.
681 """
682- if not sock:
683+ if not sender:
684 return False
685
686 if commit:
687- ret = send_close_connection()
688+ ret = sender.closeConnection()
689
690- send_close_socket()
691+ sender.closeSocket()
692 return ret
693
694=== modified file 'checkbox/shadowd/sender.py'
695--- checkbox/shadowd/sender.py 2012-04-24 22:43:09 +0000
696+++ checkbox/shadowd/sender.py 2012-05-01 02:07:19 +0000
697@@ -19,10 +19,7 @@
698 __metaclass__ = type
699
700 __all__ = [
701- "send_create_cluster",
702- "send_create_process",
703- "send_close_connection",
704- "send_close_socket",
705+ "Sender",
706 ]
707
708 import logging
709@@ -30,47 +27,61 @@
710 from checkbox.shadowd.operation import OperationCode
711
712
713-def send_create_cluster(sock):
714- assert(sock.putInteger(OperationCode.CREATE_CLUSTER))
715- assert(sock.putEnd())
716-
717- ret = sock.getInteger()
718- if ret < 0:
719- error = sock.getInteger()
720- logging.info("Received create cluster error: %d", error)
721-
722- assert(sock.getEnd())
723- return ret
724-
725-
726-def send_create_process(sock, cluster):
727- assert(sock.putInteger(OperationCode.CREATE_PROCESS))
728- assert(sock.putInteger(cluster))
729- assert(sock.putEnd())
730-
731- ret = sock.getInteger()
732- if ret < 0:
733- error = sock.getInteger()
734- logging.info("Received create process error: %d", error)
735-
736- assert(sock.getEnd())
737- return ret
738-
739-
740-def send_close_connection(sock):
741- assert(sock.putInteger(OperationCode.CLOSE_CONNECTION))
742- assert(sock.putEnd())
743-
744- ret = sock.getInteger()
745- if ret < 0:
746- error = sock.getInteger()
747- logging.info("Received close connection error: %d", error)
748-
749- assert(sock.getEnd())
750- return ret
751-
752-
753-def send_close_socket(sock):
754- assert(sock.putInteger(OperationCode.CLOSE_SOCKET))
755- assert(sock.putEnd())
756- return 0
757+class Sender:
758+
759+ __slots__ = (
760+ "_sock",
761+ )
762+
763+ def __init__(self, sock):
764+ self._sock = sock
765+
766+ def createCluster(self):
767+ sock = self._sock
768+
769+ assert(sock.putInteger(OperationCode.CREATE_CLUSTER))
770+ assert(sock.putEnd())
771+
772+ ret = sock.getInteger()
773+ if ret < 0:
774+ error = sock.getInteger()
775+ logging.info("Received create cluster error: %d", error)
776+
777+ assert(sock.getEnd())
778+ return ret
779+
780+ def createProcess(self, cluster):
781+ sock = self._sock
782+
783+ assert(sock.putInteger(OperationCode.CREATE_PROCESS))
784+ assert(sock.putInteger(cluster))
785+ assert(sock.putEnd())
786+
787+ ret = sock.getInteger()
788+ if ret < 0:
789+ error = sock.getInteger()
790+ logging.info("Received create process error: %d", error)
791+
792+ assert(sock.getEnd())
793+ return ret
794+
795+ def closeConnection(self):
796+ sock = self._sock
797+
798+ assert(sock.putInteger(OperationCode.CLOSE_CONNECTION))
799+ assert(sock.putEnd())
800+
801+ ret = sock.getInteger()
802+ if ret < 0:
803+ error = sock.getInteger()
804+ logging.info("Received close connection error: %d", error)
805+
806+ assert(sock.getEnd())
807+ return ret
808+
809+ def closeSocket(self):
810+ sock = self._sock
811+
812+ assert(sock.putInteger(OperationCode.CLOSE_SOCKET))
813+ assert(sock.putEnd())
814+ return 0
815
816=== added file 'checkbox/shadowd/starter.py'
817--- checkbox/shadowd/starter.py 1970-01-01 00:00:00 +0000
818+++ checkbox/shadowd/starter.py 2012-05-01 02:07:19 +0000
819@@ -0,0 +1,128 @@
820+#
821+# Copyright (c) 2012 Canonical
822+#
823+# This file is part of Checkbox.
824+#
825+# Checkbox is free software; you can redistribute it and/or modify
826+# it under the terms of the GNU Lesser General Public License as
827+# published by the Free Software Foundation; either version 2.1 of
828+# the License, or (at your option) any later version.
829+#
830+# Checkbox is distributed in the hope that it will be useful,
831+# but WITHOUT ANY WARRANTY; without even the implied warranty of
832+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
833+# GNU Lesser General Public License for more details.
834+#
835+# You should have received a copy of the GNU Lesser General Public License
836+# along with this program. If not, see <http://www.gnu.org/licenses/>.
837+#
838+__metaclass__ = type
839+
840+__all__ = [
841+ "Starter",
842+ ]
843+
844+import logging
845+
846+from checkbox.lib import param
847+from checkbox.lib.hostname import fullhostname
848+
849+from checkbox.daemon.operation import OperationCode
850+from checkbox.daemon.server import server
851+
852+from checkbox.process.timeslice import Timeslice
853+
854+
855+class Starter:
856+
857+ __slots__ = (
858+ "_name",
859+ "_spool",
860+ "_interval",
861+ "_exit_when_done",
862+ "_timer_id",
863+ "_start_delay",
864+ "_start_count",
865+ )
866+
867+ min_interval_timer_set = False
868+
869+ def __init__(self):
870+ self._name = None
871+ self._spool = None
872+ self._interval = Timeslice()
873+ self._exit_when_done = False
874+ self._timer_id = -1
875+
876+ def initialize(self):
877+ self._spool = param.get("SPOOL")
878+ if not self._spool:
879+ raise Exception("No spool directory specified in config file.")
880+
881+ self._name = param.get("SHADOWD_NAME")
882+ if not self._name:
883+ self._name = fullhostname()
884+
885+ logging.debug("Using name: %s", self._name)
886+
887+ interval = self._interval
888+ interval.default_interval = param.getInteger("SHADOWD_INTERVAL", 300)
889+ interval.max_interval = interval.default_interval
890+ interval.min_interval = param.getInteger("SHADOWD_MIN_INTERVAL", 5)
891+
892+ self._start_delay = param.getInteger("SHADOWD_START_DELAY", 5)
893+ self._start_count = param.getInteger("SHADOWD_START_COUNT", 1)
894+
895+ # TODO: initialize queue
896+
897+ def register(self):
898+ server.registerCommand(OperationCode.ENQUEUE, self._enqueue)
899+ server.registerCommand(OperationCode.RESCHEDULE, self._reschedule)
900+
901+ self.registerTimers()
902+
903+ def registerTimers(self):
904+ if self._timer_id >= 0:
905+ server.unregisterTimer(self._timer_id)
906+
907+ self._timer_id = server.registerTimer(self.timeout, 10)
908+
909+ def timeout(self):
910+ # If called too frequently, delay.
911+ self._interval.expediteNextRun()
912+ time_to_next_run = self._interval.time_to_next_run
913+ if time_to_next_run > 0:
914+ if not Starter.min_interval_timer_set:
915+ logging.debug(
916+ "Setting delay until next queue scan to %d seconds",
917+ time_to_next_run)
918+ server.resetTimer(self._timer_id, time_to_next_run, 1)
919+ Starter.min_interval_timer_set = True
920+ return
921+
922+ # TODO: Check if in walk job queue
923+
924+ Starter.min_interval_timer_set = False
925+
926+ # TODO: finish this
927+
928+ def _enqueue(self, command, stream):
929+ print 'ENQUEUE!'
930+ return 0
931+
932+ def _reschedule(self, command, stream):
933+ if stream and not stream.end():
934+ logging.info("Failed to receive end of message for RESCHEDULE.")
935+ return 0
936+
937+ if self._exit_when_done:
938+ return 0
939+
940+ self._needReschedule()
941+ return 0
942+
943+ def _needReschedule(self):
944+ self._need_reschedule = True
945+
946+ # Update the portal and request a reschedule.
947+ self.timeout()
948
949=== modified file 'checkbox/shadowd/tests/test_sender.py'
950--- checkbox/shadowd/tests/test_sender.py 2012-04-24 22:43:09 +0000
951+++ checkbox/shadowd/tests/test_sender.py 2012-05-01 02:07:19 +0000
952@@ -28,13 +28,13 @@
953 from checkbox.io.sock import SockAddress
954 from checkbox.io.socktcp import SockTCP
955
956-from checkbox.shadowd.sender import send_close_connection
957-
958-
959-class TestSendCloseConnection(ProcessMixin, TestCase):
960+from checkbox.shadowd.sender import Sender
961+
962+
963+class TestSender(ProcessMixin, TestCase):
964
965 def setUp(self):
966- super(TestSendCloseConnection, self).setUp()
967+ super(TestSender, self).setUp()
968 self.server = SockTCP()
969 self.server.bind(SockAddress.LOOPBACK)
970 self.server.listen()
971@@ -43,8 +43,10 @@
972 self.client = SockTCP()
973 self.client.connect(host, port)
974
975+ self.sender = Sender(self.client)
976+
977 def tearDown(self):
978- super(TestSendCloseConnection, self).setUp()
979+ super(TestSender, self).setUp()
980 self.client.close()
981 self.server.close()
982
983@@ -58,7 +60,7 @@
984 sock.putEnd()
985 os._exit(0)
986
987- self.assertEquals(send_close_connection(self.client), 0)
988+ self.assertEquals(self.sender.closeConnection(), 0)
989
990 def test_failure(self):
991 if not self.forkChild():
992@@ -71,4 +73,4 @@
993 sock.putEnd()
994 os._exit(0)
995
996- self.assertEquals(send_close_connection(self.client), -1)
997+ self.assertEquals(self.sender.closeConnection(), -1)
998
999=== modified file 'configs/checkbox.conf'
1000--- configs/checkbox.conf 2012-04-23 19:34:06 +0000
1001+++ configs/checkbox.conf 2012-05-01 02:07:19 +0000
1002@@ -6,6 +6,7 @@
1003
1004 ## Pathnames
1005 LOG = %(LOCAL_DIR)s/log
1006+SPOOL = %(LOCAL_DIR)s/spool
1007 BIN = %(LOCAL_DIR)s/bin
1008 EXECUTE = %(LOCAL_DIR)s/execute
1009
1010@@ -56,3 +57,11 @@
1011
1012 ## Path to the starter binary.
1013 STARTER = %(BIN)s/checkbox-runner
1014+
1015+[SHADOWD]
1016+
1017+## How often should the shadowd send an update to the portal?
1018+#INTERVAL = 300
1019+
1020+## How long should the shadowd wait between spawning each shadow?
1021+#START_DELAY = 2

Subscribers

People subscribed via source and target branches

to all changes: