Merge lp:~doanac/lava-celery/scheduler-monitor-support into lp:lava-celery

Proposed by Andy Doan
Status: Merged
Merged at revision: 27
Proposed branch: lp:~doanac/lava-celery/scheduler-monitor-support
Merge into: lp:lava-celery
Diff against target: 281 lines (+150/-50)
3 files modified
lava/celery/commands.py (+70/-3)
lava/celery/tasks.py (+79/-47)
setup.py (+1/-0)
To merge this branch: bzr merge lp:~doanac/lava-celery/scheduler-monitor-support
Reviewer Review Type Date Requested Status
Michael Hudson-Doyle (community) Approve
Review via email: mp+114090@code.launchpad.net

Description of the change

This updates the celery module a few ways:

1) improve run_command_task to take files as arguments
2) use fork/exec for run_command task
3) add a new celery-schedulermonitor command

To post a comment you must log in.
Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Yay for all that Unix fd yoga.

I think you should be a bunch more anal about errors in the child process: don't call sys.exit() but rather os._exit() and probably do it in a finally: block whose try: you enter first thing in the pid == 0 branch (going through the finally: clause in the run_command twice would be pretty confusing I think!).

You add celery-schedulermonitor to commands.__all__, not celery_schedulermonitor!

There's something about the way files are handled that makes me uneasy -- the code on the worker side doesn't do anything about directories or permissions, so I could imagine that if we started to write the json into /tmp/blah/foo.json rather than directly in /tmp/ things might break in unexpected ways. But well, maybe that just needs an XXX.

Otherwise, all looks ok!

review: Approve
25. By Andy Doan

retval is now handled in the _exec_command function

26. By Andy Doan

make _exec_command a little safer

This adds some exception handling per mwhudson's review comments to help
ensure we always properly exit the child process.

27. By Andy Doan

fix celery-schedulermonitor command entry

mwhudson caught this in the code review

Revision history for this message
Andy Doan (doanac) wrote :

I just added commits 25, 26, and 27 per your review comments. Let me know if they seem sane, and I'll proceed with the merge

Revision history for this message
Michael Hudson-Doyle (mwhudson) wrote :

Looks good! Sorry for not replying sooner, I think Canonical mail was broken for a while yesterday.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lava/celery/commands.py'
2--- lava/celery/commands.py 2012-06-26 05:05:01 +0000
3+++ lava/celery/commands.py 2012-07-11 03:19:19 +0000
4@@ -15,7 +15,8 @@
5 from lava.tool.command import Command, CommandGroup
6
7
8-__all__ = ["celeryd", "celery", "hello_world", "run_remote"]
9+__all__ = ["celeryd", "celery", "celery_schedulermonitor", "hello_world",
10+ "run_remote"]
11
12 class celeryd(Command):
13 """
14@@ -100,6 +101,7 @@
15
16 @classmethod
17 def register_arguments(cls, parser):
18+ super(run_remote, cls).register_arguments(parser)
19 parser.add_argument(
20 '-s', '--silent',
21 action="store_true",
22@@ -195,7 +197,7 @@
23 self.say("Unknown message: {0}", message)
24 message.reject()
25
26- def invoke(self):
27+ def invoke(self, arg0="lava", files=None):
28 self._configure_celery()
29 # Importing only after we call ._configure_celery()
30 from celery.messaging import establish_connection
31@@ -216,7 +218,7 @@
32 # Invoke a command with specified arguments. NOTE: that we
33 # pass queue.name here which is conveniently also the
34 # routing key. See ._setup_plumbing() for an explanation.
35- args=[queue.name, self.args.args],
36+ args=[queue.name, self.args.args, arg0, files],
37 # Routing key for the celery task workers. This routing
38 # key is different from queue.name. It is used by celery
39 # and celeryd to associate workers with tasks to perform.
40@@ -247,3 +249,68 @@
41 connection.close()
42 self.logger.debug("Remote processing done, return code is %r", self.retval)
43 return self.retval
44+
45+class celery_schedulermonitor(run_remote):
46+ """
47+ Launches a schedulermonitor task on a celery worker node.
48+ """
49+
50+ @classmethod
51+ def register_arguments(cls, parser):
52+ super(celery_schedulermonitor, cls).register_arguments(parser)
53+
54+ group = parser.add_argument_group(title="Dispatcher Configuration")
55+ group.add_argument('--settings-module',
56+ default="lava_server.settings.debian",
57+ help='The Django server settings module to use.')
58+ group.add_argument('dispatcher', metavar='dispatcher',
59+ help='The dispatcher to use like "lava-dispatch"')
60+ group.add_argument('boardname', metavar='boardname',
61+ help='The board to run the job on')
62+ group.add_argument('json_file', metavar='json_file',
63+ help='The job definition to run')
64+
65+ def _get_logfile_impl(self, logfile):
66+ self.logfile = logfile.path
67+
68+ def _get_logfile(self):
69+ os.environ["DJANGO_SETTINGS_MODULE"] = self.args.settings_module
70+ from lava_scheduler_daemon.dbjobsource import DatabaseJobSource
71+ from twisted.internet import reactor
72+
73+ source = DatabaseJobSource()
74+
75+ def run():
76+ d = source.getLogFileForJobOnBoard(self.args.boardname)
77+ d = d.addCallback(self._get_logfile_impl)
78+ d.addCallback(lambda result: reactor.stop())
79+
80+ reactor.callWhenRunning(run)
81+ reactor.run()
82+ return self.logfile
83+
84+ def invoke(self):
85+ logfile = self._get_logfile()
86+
87+ files = {}
88+ with open(self.args.json_file, 'r') as f:
89+ files[self.args.json_file] = f.read()
90+
91+ self._logfd = open(logfile, 'wb')
92+
93+ self.args.args = ['manage', 'schedulermonitor',
94+ self.args.dispatcher, self.args.boardname, self.args.json_file]
95+ super(celery_schedulermonitor, self).invoke('lava-server', files)
96+
97+ def _process_message(self, body, message):
98+ self.logger.debug("Remote worker message: %r", message.payload)
99+ if 'stdout' in message.payload:
100+ self._logfd.write(message.payload['stdout'])
101+ self._logfd.flush()
102+ elif 'stderr' in message.payload:
103+ self._logfd.write(message.payload['stderr'])
104+ self._logfd.flush()
105+ else:
106+ if 'done' in message.payload:
107+ self._logfd.close()
108+ super(celery_schedulermonitor, self)._process_message(body, message)
109
110=== modified file 'lava/celery/tasks.py'
111--- lava/celery/tasks.py 2012-03-26 14:56:09 +0000
112+++ lava/celery/tasks.py 2012-07-11 03:19:19 +0000
113@@ -5,58 +5,91 @@
114 Task definitions imported by celeryd when configured to use lava.celery.config
115 """
116
117+import os
118+import select
119 import sys
120+import traceback
121
122 from celery.messaging import establish_connection
123 from celery.task import task
124 from kombu.compat import Publisher
125-from lava.tool.main import LavaDispatcher
126
127 from lava.celery.queues import StreamExchange
128
129
130 __all__ = ["run_command"]
131
132-
133-class CloudifiedDispatcher(LavaDispatcher):
134- """
135- Dispatcher suitable to run commands and stream their output back to the
136- originator via kombu.
137- """
138-
139- def __init__(self, publisher):
140- super(CloudifiedDispatcher, self).__init__()
141- self.publisher = publisher
142-
143- def say(self, command, message, *args, **kwargs):
144- text = message.format(*args, **kwargs)
145- self.publisher.send({
146- 'command': command.get_name(),
147- 'say': text
148- })
149-
150-
151-class WriteProxy(object):
152- """
153- Simple proxy for File-like objects that redirects write() to a callback
154- """
155-
156- def __init__(self, stream, write_cb):
157- self._stream = stream
158- self._write_cb = write_cb
159-
160- def write(self, data):
161- self._write_cb(data)
162-
163- def __getattr__(self, attr):
164- return getattr(self._stream, attr)
165-
166+def _exec_command(publisher, args, arg0):
167+ """
168+ A traditional fork/exec style action that prints stdout/stderr to the
169+ publisher
170+ """
171+ (out_read, out_write) = os.pipe()
172+ (err_read, err_write) = os.pipe()
173+
174+ pid = os.fork()
175+ if pid == 0:
176+ try:
177+ os.close(out_read)
178+ os.close(err_read)
179+
180+ os.dup2(out_write, 1)
181+ os.dup2(err_write, 2)
182+ args.insert(0, arg0)
183+ os.execvp(args[0], args)
184+ print >>sys.stderr, "lava celeryd error executing ",args
185+ except:
186+ traceback.print_exc()
187+ finally:
188+ os._exit(1)
189+
190+ os.close(out_write)
191+ os.close(err_write)
192+ out_read = os.fdopen(out_read)
193+ err_read = os.fdopen(err_read)
194+
195+ readset = [out_read, err_read]
196+ while readset:
197+ rlist, wlist, xlist = select.select(readset, [], [])
198+ if out_read in rlist:
199+ data = os.read(out_read.fileno(), 1024)
200+ if data == "":
201+ out_read.close()
202+ readset.remove(out_read)
203+ publisher.send({'stdout': data})
204+ if err_read in rlist:
205+ data = os.read(err_read.fileno(), 1024)
206+ if data == "":
207+ err_read.close()
208+ readset.remove(err_read)
209+ publisher.send({'stderr': data})
210+
211+ (pid, retval) = os.waitpid(pid,0)
212+ publisher.send({'done': retval})
213+
214+def _copy_files(files):
215+ """
216+ run_command can take a dictionary of filename->content pairs that
217+ the remote side of the command needs
218+ """
219+ if files is None:
220+ return []
221+ deletes = []
222+ for k,v in files.items():
223+ # if path exists, let's assume the celery worker and lava scheduler
224+ # are running on the same local system, so we don't need to create the file
225+ if not os.path.exists(k):
226+ with open(k, 'w') as f:
227+ f.write(v)
228+ deletes.insert(0, k)
229+ return deletes
230
231 @task(acks_late=True, ignore_result=True)
232-def run_command(queue_name, args):
233+def run_command(queue_name, args, arg0="lava", files=None):
234 """
235 Run a lava-tool command with the specified arguments
236 """
237+ print "command received:", args
238 connection = establish_connection()
239 channel = connection.channel()
240 publisher = Publisher(
241@@ -64,18 +97,17 @@
242 exchange=StreamExchange,
243 routing_key=queue_name,
244 exchange_type="direct")
245- orig_stdout = sys.stdout
246- sys.stdout = WriteProxy(sys.stdout, lambda data: publisher.send({"stdout": data}))
247- orig_stderr = sys.stderr
248- sys.stderr = WriteProxy(sys.stderr, lambda data: publisher.send({"stderr": data}))
249- retval = None
250+
251 try:
252- retval = CloudifiedDispatcher(publisher).dispatch(args)
253- except SystemExit as exc:
254- retval = exc.code
255+ files = _copy_files(files)
256+ _exec_command(publisher, args, arg0)
257+ except:
258+ traceback.print_exc()
259 finally:
260- publisher.send({'done': retval})
261 publisher.close()
262 connection.close()
263- sys.stdout = orig_stdout
264- sys.stderr = orig_stderr
265+
266+ for f in files:
267+ os.unlink(f)
268+
269+ print "command complete for:",args
270
271=== modified file 'setup.py'
272--- setup.py 2012-06-26 05:05:01 +0000
273+++ setup.py 2012-07-11 03:19:19 +0000
274@@ -17,6 +17,7 @@
275 [lava.commands]
276 celery = lava.celery.commands:celery
277 celeryd = lava.celery.commands:celeryd
278+ celery-schedulermonitor = lava.celery.commands:celery_schedulermonitor
279 [lava.celery.commands]
280 run-remote= lava.celery.commands:run_remote
281 hello-world = lava.celery.commands:hello_world

Subscribers

People subscribed via source and target branches