Merge lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt into lp:openobject-server/6.1
- 6.1-here-comes-the-bogeyman-vmt
- Merge into 6.1
Proposed by
Vo Minh Thu
Status: | Merged |
---|---|
Approved by: | Xavier ALT |
Approved revision: | 4135 |
Merged at revision: | 4184 |
Proposed branch: | lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt |
Merge into: | lp:openobject-server/6.1 |
Diff against target: |
349 lines (+278/-2) 5 files modified
openerp-cron-worker (+111/-0) openerp/__init__.py (+1/-0) openerp/addons/base/ir/ir_cron.py (+155/-0) openerp/cron.py (+10/-1) openerp/wsgi/core.py (+1/-1) |
To merge this branch: | bzr merge lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Christophe Simonis (OpenERP) | Needs Fixing | ||
Review via email: mp+99897@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
- 4135. By Vo Minh Thu
-
[IMP] cron: use multi-process signaling.
Revision history for this message
Christophe Simonis (OpenERP) (kangol) wrote : | # |
review:
Needs Fixing
Revision history for this message
Vo Minh Thu (thu) wrote : | # |
1. Simply to do the same thing 5 times (which accounts for about 5 minutes) before the list of databases is recomputed.
2. I didn't think of it but it wouldn't work: for instance it assumes tools.config[
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added file 'openerp-cron-worker' | |||
2 | --- openerp-cron-worker 1970-01-01 00:00:00 +0000 | |||
3 | +++ openerp-cron-worker 2012-04-02 12:05:25 +0000 | |||
4 | @@ -0,0 +1,111 @@ | |||
5 | 1 | #!/usr/bin/env python | ||
6 | 2 | # -*- coding: utf-8 -*- | ||
7 | 3 | |||
8 | 4 | """ | ||
9 | 5 | OpenERP cron jobs worker | ||
10 | 6 | |||
11 | 7 | This script executes OpenERP cron jobs. Normally, cron jobs are handled by the | ||
12 | 8 | OpenERP server but depending on deployment needs, independent worker processes | ||
13 | 9 | can be used. This is especially the case when the server is run via Gunicorn. | ||
14 | 10 | |||
15 | 11 | OpenERP cron jobs worker re-uses openerp-server command-line options but does | ||
16 | 12 | not honor all of them. | ||
17 | 13 | |||
18 | 14 | Meaningful options include: | ||
19 | 15 | |||
20 | 16 | -d, --database comma-separated list of databases to monitor for cron jobs | ||
21 | 17 | processing. If left empty, the worker monitors all databases | ||
22 | 18 | (given by `psql -lAt`). | ||
23 | 19 | |||
24 | 20 | --addons-path as usual. | ||
25 | 21 | |||
26 | 22 | --cpu-time-limit | ||
27 | 23 | --virtual-memory-limit | ||
28 | 24 | --virtual-memory-reset Those three options have the same meaning as for | ||
29 | 25 | the server with Gunicorn. The only catch is: to | ||
30 | 26 | avoid enabling rlimits by default, those options are | ||
31 | 27 | honored only when --cpu-time-limit is different from | ||
32 | 28 | 60 (its default value). | ||
33 | 29 | """ | ||
34 | 30 | |||
35 | 31 | import logging | ||
36 | 32 | import os | ||
37 | 33 | import signal | ||
38 | 34 | import sys | ||
39 | 35 | |||
40 | 36 | import openerp | ||
41 | 37 | |||
42 | 38 | # Also use the `openerp` logger for the main script. | ||
43 | 39 | _logger = logging.getLogger('openerp') | ||
44 | 40 | |||
45 | 41 | # Variable keeping track of the number of calls to the signal handler defined | ||
46 | 42 | # below. This variable is monitored by ``quit_on_signals()``. | ||
47 | 43 | quit_signals_received = 0 | ||
48 | 44 | |||
49 | 45 | # TODO copy/pasted from openerp-server | ||
50 | 46 | def signal_handler(sig, frame): | ||
51 | 47 | """ Signal handler: exit ungracefully on the second handled signal. | ||
52 | 48 | |||
53 | 49 | :param sig: the signal number | ||
54 | 50 | :param frame: the interrupted stack frame or None | ||
55 | 51 | """ | ||
56 | 52 | global quit_signals_received | ||
57 | 53 | quit_signals_received += 1 | ||
58 | 54 | import openerp.addons.base | ||
59 | 55 | openerp.addons.base.ir.ir_cron.quit_signal_received = True | ||
60 | 56 | if quit_signals_received == 1 and openerp.addons.base.ir.ir_cron.job_in_progress: | ||
61 | 57 | _logger.info("Waiting for the current job to complete.") | ||
62 | 58 | print "Waiting for the current job to complete." | ||
63 | 59 | print "Hit Ctrl-C again to force shutdown." | ||
64 | 60 | if quit_signals_received > 1: | ||
65 | 61 | # logging.shutdown was already called at this point. | ||
66 | 62 | sys.stderr.write("Forced shutdown.\n") | ||
67 | 63 | os._exit(0) | ||
68 | 64 | |||
69 | 65 | # TODO copy/pasted from openerp-server | ||
70 | 66 | def setup_signal_handlers(): | ||
71 | 67 | """ Register the signal handler defined above. """ | ||
72 | 68 | SIGNALS = map(lambda x: getattr(signal, "SIG%s" % x), "INT TERM".split()) | ||
73 | 69 | if os.name == 'posix': | ||
74 | 70 | map(lambda sig: signal.signal(sig, signal_handler), SIGNALS) | ||
75 | 71 | elif os.name == 'nt': | ||
76 | 72 | import win32api | ||
77 | 73 | win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1) | ||
78 | 74 | |||
79 | 75 | def list_databases(): | ||
80 | 76 | import subprocess | ||
81 | 77 | p1 = subprocess.Popen(["psql", "-lAt"], stdout=subprocess.PIPE) | ||
82 | 78 | p2 = subprocess.Popen(["cut", "-f", "1", "-d", "|"], stdin=p1.stdout, stdout=subprocess.PIPE) | ||
83 | 79 | p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. | ||
84 | 80 | output = p2.communicate()[0] | ||
85 | 81 | databases = output.splitlines() | ||
86 | 82 | # TODO filter out non-OpenERP databases | ||
87 | 83 | databases = [d for d in databases if d not in ['template0', 'template1', 'postgres']] | ||
88 | 84 | databases = [d for d in databases if not d.startswith('postgres')] | ||
89 | 85 | return databases | ||
90 | 86 | |||
91 | 87 | if __name__ == '__main__': | ||
92 | 88 | os.environ['TZ'] = 'UTC' | ||
93 | 89 | openerp.tools.config.parse_config(sys.argv[1:]) | ||
94 | 90 | config = openerp.tools.config | ||
95 | 91 | if config['log_handler'] == [':INFO']: | ||
96 | 92 | # Replace the default value, which is suitable for openerp-server. | ||
97 | 93 | config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG') | ||
98 | 94 | setup_signal_handlers() | ||
99 | 95 | openerp.modules.module.initialize_sys_path() | ||
100 | 96 | openerp.modules.loading.open_openerp_namespace() | ||
101 | 97 | openerp.netsvc.init_logger() | ||
102 | 98 | openerp.cron.enable_schedule_wakeup = False | ||
103 | 99 | openerp.multi_process = True # enable multi-process signaling | ||
104 | 100 | import openerp.addons.base | ||
105 | 101 | print "OpenERP cron jobs worker. Hit Ctrl-C to exit." | ||
106 | 102 | print "Documentation is available at the top of the `opener-cron-worker` file." | ||
107 | 103 | if config['db_name']: | ||
108 | 104 | db_names = config['db_name'].split(',') | ||
109 | 105 | print "Monitoring %s databases." % len(db_names) | ||
110 | 106 | else: | ||
111 | 107 | db_names = list_databases | ||
112 | 108 | print "Monitored databases are auto-discovered." | ||
113 | 109 | openerp.addons.base.ir.ir_cron.ir_cron._run(db_names) | ||
114 | 110 | |||
115 | 111 | # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: | ||
116 | 0 | 112 | ||
117 | === modified file 'openerp/__init__.py' | |||
118 | --- openerp/__init__.py 2012-02-08 14:28:34 +0000 | |||
119 | +++ openerp/__init__.py 2012-04-02 12:05:25 +0000 | |||
120 | @@ -27,6 +27,7 @@ | |||
121 | 27 | 27 | ||
122 | 28 | import addons | 28 | import addons |
123 | 29 | import conf | 29 | import conf |
124 | 30 | import cron | ||
125 | 30 | import loglevels | 31 | import loglevels |
126 | 31 | import modules | 32 | import modules |
127 | 32 | import netsvc | 33 | import netsvc |
128 | 33 | 34 | ||
129 | === modified file 'openerp/addons/base/ir/ir_cron.py' | |||
130 | --- openerp/addons/base/ir/ir_cron.py 2012-02-02 09:21:05 +0000 | |||
131 | +++ openerp/addons/base/ir/ir_cron.py 2012-04-02 12:05:25 +0000 | |||
132 | @@ -39,6 +39,14 @@ | |||
133 | 39 | 39 | ||
134 | 40 | _logger = logging.getLogger(__name__) | 40 | _logger = logging.getLogger(__name__) |
135 | 41 | 41 | ||
136 | 42 | # This variable can be set by a signal handler to stop the infinite loop in | ||
137 | 43 | # ir_cron._run() | ||
138 | 44 | quit_signal_received = False | ||
139 | 45 | |||
140 | 46 | # This variable can be checked to know if ir_cron._run() is processing a job or | ||
141 | 47 | # sleeping. | ||
142 | 48 | job_in_progress = True | ||
143 | 49 | |||
144 | 42 | def str2tuple(s): | 50 | def str2tuple(s): |
145 | 43 | return eval('tuple(%s)' % (s or '')) | 51 | return eval('tuple(%s)' % (s or '')) |
146 | 44 | 52 | ||
147 | @@ -266,6 +274,153 @@ | |||
148 | 266 | cr.commit() | 274 | cr.commit() |
149 | 267 | cr.close() | 275 | cr.close() |
150 | 268 | 276 | ||
151 | 277 | def _process_job(self, cr, job): | ||
152 | 278 | """ Run a given job taking care of the repetition. | ||
153 | 279 | |||
154 | 280 | The cursor has a lock on the job (aquired by _acquire_job()). | ||
155 | 281 | |||
156 | 282 | :param job: job to be run (as a dictionary). | ||
157 | 283 | """ | ||
158 | 284 | try: | ||
159 | 285 | now = datetime.now() | ||
160 | 286 | nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT) | ||
161 | 287 | numbercall = job['numbercall'] | ||
162 | 288 | |||
163 | 289 | ok = False | ||
164 | 290 | while nextcall < now and numbercall: | ||
165 | 291 | if numbercall > 0: | ||
166 | 292 | numbercall -= 1 | ||
167 | 293 | if not ok or job['doall']: | ||
168 | 294 | self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id']) | ||
169 | 295 | if numbercall: | ||
170 | 296 | nextcall += _intervalTypes[job['interval_type']](job['interval_number']) | ||
171 | 297 | ok = True | ||
172 | 298 | addsql = '' | ||
173 | 299 | if not numbercall: | ||
174 | 300 | addsql = ', active=False' | ||
175 | 301 | cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s", | ||
176 | 302 | (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id'])) | ||
177 | 303 | |||
178 | 304 | finally: | ||
179 | 305 | cr.commit() | ||
180 | 306 | cr.close() | ||
181 | 307 | |||
182 | 308 | @classmethod | ||
183 | 309 | def _acquire_job(cls, db_name): | ||
184 | 310 | # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py | ||
185 | 311 | """ Try to process one cron job. | ||
186 | 312 | |||
187 | 313 | This selects in database all the jobs that should be processed. It then | ||
188 | 314 | tries to lock each of them and, if it succeeds, run the cron job (if it | ||
189 | 315 | doesn't succeed, it means the job was already locked to be taken care | ||
190 | 316 | of by another thread) and return. | ||
191 | 317 | |||
192 | 318 | If a job was processed, returns True, otherwise returns False. | ||
193 | 319 | """ | ||
194 | 320 | db = openerp.sql_db.db_connect(db_name) | ||
195 | 321 | cr = db.cursor() | ||
196 | 322 | try: | ||
197 | 323 | # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1. | ||
198 | 324 | cr.execute("""SELECT * FROM ir_cron | ||
199 | 325 | WHERE numbercall != 0 | ||
200 | 326 | AND active AND nextcall <= (now() at time zone 'UTC') | ||
201 | 327 | ORDER BY priority""") | ||
202 | 328 | for job in cr.dictfetchall(): | ||
203 | 329 | task_cr = db.cursor() | ||
204 | 330 | try: | ||
205 | 331 | # Try to grab an exclusive lock on the job row from within the task transaction | ||
206 | 332 | acquired_lock = False | ||
207 | 333 | task_cr.execute("""SELECT * | ||
208 | 334 | FROM ir_cron | ||
209 | 335 | WHERE id=%s | ||
210 | 336 | FOR UPDATE NOWAIT""", | ||
211 | 337 | (job['id'],), log_exceptions=False) | ||
212 | 338 | acquired_lock = True | ||
213 | 339 | except psycopg2.OperationalError, e: | ||
214 | 340 | if e.pgcode == '55P03': | ||
215 | 341 | # Class 55: Object not in prerequisite state; 55P03: lock_not_available | ||
216 | 342 | _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name']) | ||
217 | 343 | continue | ||
218 | 344 | else: | ||
219 | 345 | # Unexpected OperationalError | ||
220 | 346 | raise | ||
221 | 347 | finally: | ||
222 | 348 | if not acquired_lock: | ||
223 | 349 | # we're exiting due to an exception while acquiring the lock | ||
224 | 350 | task_cr.close() | ||
225 | 351 | |||
226 | 352 | # Got the lock on the job row, run its code | ||
227 | 353 | _logger.debug('Starting job `%s`.', job['name']) | ||
228 | 354 | openerp.modules.registry.RegistryManager.check_registry_signaling(db_name) | ||
229 | 355 | registry = openerp.pooler.get_pool(db_name) | ||
230 | 356 | registry[cls._name]._process_job(task_cr, job) | ||
231 | 357 | openerp.modules.registry.RegistryManager.signal_caches_change(db_name) | ||
232 | 358 | return True | ||
233 | 359 | |||
234 | 360 | except psycopg2.ProgrammingError, e: | ||
235 | 361 | if e.pgcode == '42P01': | ||
236 | 362 | # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table | ||
237 | 363 | # The table ir_cron does not exist; this is probably not an OpenERP database. | ||
238 | 364 | _logger.warning('Tried to poll an undefined table on database %s.', db_name) | ||
239 | 365 | else: | ||
240 | 366 | raise | ||
241 | 367 | except Exception, ex: | ||
242 | 368 | _logger.warning('Exception in cron:', exc_info=True) | ||
243 | 369 | |||
244 | 370 | finally: | ||
245 | 371 | cr.commit() | ||
246 | 372 | cr.close() | ||
247 | 373 | |||
248 | 374 | return False | ||
249 | 375 | |||
250 | 376 | @classmethod | ||
251 | 377 | def _run(cls, db_names): | ||
252 | 378 | """ | ||
253 | 379 | Class method intended to be run in a dedicated process to handle jobs. | ||
254 | 380 | This polls the database for jobs that can be run every 60 seconds. | ||
255 | 381 | |||
256 | 382 | :param db_names: list of database names to poll or callable to | ||
257 | 383 | generate such a list. | ||
258 | 384 | """ | ||
259 | 385 | global quit_signal_received | ||
260 | 386 | while not quit_signal_received: | ||
261 | 387 | if callable(db_names): | ||
262 | 388 | names = db_names() | ||
263 | 389 | else: | ||
264 | 390 | names = db_names | ||
265 | 391 | for x in xrange(5): | ||
266 | 392 | if quit_signal_received: | ||
267 | 393 | return | ||
268 | 394 | t1 = time.time() | ||
269 | 395 | for db_name in names: | ||
270 | 396 | while True: | ||
271 | 397 | # Small hack to re-use the openerp-server config: | ||
272 | 398 | # If the cpu_time_limit has not its default value, we | ||
273 | 399 | # truly want to establish limits. | ||
274 | 400 | if openerp.tools.config['cpu_time_limit'] != 60: | ||
275 | 401 | openerp.wsgi.core.pre_request('dummy', 'dummy') | ||
276 | 402 | acquired = cls._acquire_job(db_name) | ||
277 | 403 | if openerp.tools.config['cpu_time_limit'] != 60: | ||
278 | 404 | class W(object): | ||
279 | 405 | alive = True | ||
280 | 406 | worker = W() | ||
281 | 407 | openerp.wsgi.core.post_request(worker, 'dummy', 'dummy') | ||
282 | 408 | if not worker.alive: | ||
283 | 409 | return | ||
284 | 410 | if not acquired: | ||
285 | 411 | break | ||
286 | 412 | if quit_signal_received: | ||
287 | 413 | return | ||
288 | 414 | t2 = time.time() | ||
289 | 415 | t = t2 - t1 | ||
290 | 416 | global job_in_progress | ||
291 | 417 | if t > 60: | ||
292 | 418 | _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t)) | ||
293 | 419 | else: | ||
294 | 420 | job_in_progress = False | ||
295 | 421 | time.sleep(60 - t) | ||
296 | 422 | job_in_progress = True | ||
297 | 423 | |||
298 | 269 | def update_running_cron(self, cr): | 424 | def update_running_cron(self, cr): |
299 | 270 | """ Schedule as soon as possible a wake-up for this database. """ | 425 | """ Schedule as soon as possible a wake-up for this database. """ |
300 | 271 | # Verify whether the server is already started and thus whether we need to commit | 426 | # Verify whether the server is already started and thus whether we need to commit |
301 | 272 | 427 | ||
302 | === modified file 'openerp/cron.py' | |||
303 | --- openerp/cron.py 2012-02-13 11:53:54 +0000 | |||
304 | +++ openerp/cron.py 2012-04-02 12:05:25 +0000 | |||
305 | @@ -49,6 +49,13 @@ | |||
306 | 49 | 49 | ||
307 | 50 | _logger = logging.getLogger(__name__) | 50 | _logger = logging.getLogger(__name__) |
308 | 51 | 51 | ||
309 | 52 | # Scheduling wake-ups (see below) can be disabled when the polling process | ||
310 | 53 | # workers are used instead of the managed thread workers. (I.e. wake-ups are | ||
311 | 54 | # not used since polling is used. And polling is used when the cron jobs are | ||
312 | 55 | # handled by running special processes, e.g. openerp-cron-worker, instead | ||
313 | 56 | # of the general openerp-server script.) | ||
314 | 57 | enable_schedule_wakeup = True | ||
315 | 58 | |||
316 | 52 | # Heapq of database wake-ups. Note that 'database wake-up' meaning is in | 59 | # Heapq of database wake-ups. Note that 'database wake-up' meaning is in |
317 | 53 | # the context of the cron management. This is not originally about loading | 60 | # the context of the cron management. This is not originally about loading |
318 | 54 | # a database, although having the database name in the queue will | 61 | # a database, although having the database name in the queue will |
319 | @@ -135,7 +142,6 @@ | |||
320 | 135 | _wakeups = [] | 142 | _wakeups = [] |
321 | 136 | _wakeup_by_db = {} | 143 | _wakeup_by_db = {} |
322 | 137 | 144 | ||
323 | 138 | |||
324 | 139 | def schedule_wakeup(timestamp, db_name): | 145 | def schedule_wakeup(timestamp, db_name): |
325 | 140 | """ Schedule a new wake-up for a database. | 146 | """ Schedule a new wake-up for a database. |
326 | 141 | 147 | ||
327 | @@ -147,6 +153,9 @@ | |||
328 | 147 | :param timestamp: when the wake-up is scheduled. | 153 | :param timestamp: when the wake-up is scheduled. |
329 | 148 | 154 | ||
330 | 149 | """ | 155 | """ |
331 | 156 | global enable_schedule_wakeup | ||
332 | 157 | if not enable_schedule_wakeup: | ||
333 | 158 | return | ||
334 | 150 | if not timestamp: | 159 | if not timestamp: |
335 | 151 | return | 160 | return |
336 | 152 | with _wakeups_lock: | 161 | with _wakeups_lock: |
337 | 153 | 162 | ||
338 | === modified file 'openerp/wsgi/core.py' | |||
339 | --- openerp/wsgi/core.py 2012-03-05 16:37:30 +0000 | |||
340 | +++ openerp/wsgi/core.py 2012-04-02 12:05:25 +0000 | |||
341 | @@ -506,7 +506,7 @@ | |||
342 | 506 | rss, vms = psutil.Process(os.getpid()).get_memory_info() | 506 | rss, vms = psutil.Process(os.getpid()).get_memory_info() |
343 | 507 | if vms > config['virtual_memory_reset']: | 507 | if vms > config['virtual_memory_reset']: |
344 | 508 | _logger.info('Virtual memory consumption ' | 508 | _logger.info('Virtual memory consumption ' |
346 | 509 | 'too high, rebooting the worker.') | 509 | 'too high, killing the worker.') |
347 | 510 | worker.alive = False # Commit suicide after the request. | 510 | worker.alive = False # Commit suicide after the request. |
348 | 511 | 511 | ||
349 | 512 | # SIGXCPU (exceeded CPU time) signal handler will raise an exception. | 512 | # SIGXCPU (exceeded CPU time) signal handler will raise an exception. |
1/ What is this "for x in xrange(5):" in _run()? http://bazaar.launchpad.net/~openerp/openobject-server/6.1/view/head:/openerp/service/web_services.py#L317
2/ Why not use the existing function for listing the databases? http://