Merge lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt into lp:openobject-server/6.1

Proposed by Vo Minh Thu
Status: Merged
Approved by: Xavier ALT
Approved revision: 4135
Merged at revision: 4184
Proposed branch: lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt
Merge into: lp:openobject-server/6.1
Diff against target: 349 lines (+278/-2)
5 files modified
openerp-cron-worker (+111/-0)
openerp/__init__.py (+1/-0)
openerp/addons/base/ir/ir_cron.py (+155/-0)
openerp/cron.py (+10/-1)
openerp/wsgi/core.py (+1/-1)
To merge this branch: bzr merge lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt
Reviewer Review Type Date Requested Status
Christophe Simonis (OpenERP) Needs Fixing
Review via email: mp+99897@code.launchpad.net
To post a comment you must log in.
4135. By Vo Minh Thu

[IMP] cron: use multi-process signaling.

Revision history for this message
Christophe Simonis (OpenERP) (kangol) wrote :

1/ What is this "for x in xrange(5):" in _run()?
2/ Why not use the existing function for listing the databases? http://bazaar.launchpad.net/~openerp/openobject-server/6.1/view/head:/openerp/service/web_services.py#L317

review: Needs Fixing
Revision history for this message
Vo Minh Thu (thu) wrote :

1. Simply to do the same thing 5 times (which accounts for 5 minutes), before refreshing the list of databases.
2. I didn't think of it, but it wouldn't work: for instance, it assumes tools.config['db_name'] is a single database name. Maybe it would be useful to refactor the code, but we can't do that in stable.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'openerp-cron-worker'
--- openerp-cron-worker 1970-01-01 00:00:00 +0000
+++ openerp-cron-worker 2012-04-02 12:05:25 +0000
@@ -0,0 +1,111 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
OpenERP cron jobs worker

This script executes OpenERP cron jobs. Normally, cron jobs are handled by the
OpenERP server, but depending on deployment needs, independent worker processes
can be used. This is especially the case when the server is run via Gunicorn.

The OpenERP cron jobs worker re-uses the openerp-server command-line options but
does not honor all of them.

Meaningful options include:

  -d, --database    comma-separated list of databases to monitor for cron jobs
                    processing. If left empty, the worker monitors all databases
                    (as listed by `psql -l`).

  --addons-path     as usual.

  --cpu-time-limit
  --virtual-memory-limit
  --virtual-memory-reset    Those three options have the same meaning as for
                            the server run with Gunicorn. The only catch: so
                            that rlimits are not enabled by default, those
                            options are honored only when --cpu-time-limit is
                            different from 60 (its default value).
"""
30
31import logging
32import os
33import signal
34import sys
35
36import openerp
37
38# Also use the `openerp` logger for the main script.
39_logger = logging.getLogger('openerp')
40
41# Variable keeping track of the number of calls to the signal handler defined
42# below. This variable is monitored by ``quit_on_signals()``.
43quit_signals_received = 0
44
# TODO copy/pasted from openerp-server
def signal_handler(sig, frame):
    """ Handle a termination signal for the cron worker.

    Every call flags ``ir_cron.quit_signal_received`` so that the polling
    loop of ``ir_cron._run()`` stops at its next check. On the first call,
    if a job is in progress, the user is told that it will be allowed to
    complete. From the second call on, the process exits at once with
    ``os._exit(0)``.

    :param sig: the signal number
    :param frame: the interrupted stack frame or None
    """
    global quit_signals_received
    quit_signals_received += 1
    import openerp.addons.base
    ir_cron_module = openerp.addons.base.ir.ir_cron
    ir_cron_module.quit_signal_received = True
    if quit_signals_received > 1:
        # Write to stderr directly: logging may already be shut down at this
        # point (NOTE(review): per the original comment, confirm).
        sys.stderr.write("Forced shutdown.\n")
        os._exit(0)
    elif ir_cron_module.job_in_progress:
        _logger.info("Waiting for the current job to complete.")
        print("Waiting for the current job to complete.")
        print("Hit Ctrl-C again to force shutdown.")

# TODO copy/pasted from openerp-server
def setup_signal_handlers():
    """ Register ``signal_handler`` for SIGINT and SIGTERM.

    On POSIX systems the handler is installed with ``signal.signal()``. On
    Windows (``os.name == 'nt'``) it is registered as a console control
    handler through ``win32api``. Nothing is done on other platforms.
    """
    if os.name == 'posix':
        # An explicit loop rather than map(): map() is lazy on Python 3 and
        # would silently register nothing when used only for side effects.
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, signal_handler)
    elif os.name == 'nt':
        import win32api
        win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)

75def list_databases():
76 import subprocess
77 p1 = subprocess.Popen(["psql", "-lAt"], stdout=subprocess.PIPE)
78 p2 = subprocess.Popen(["cut", "-f", "1", "-d", "|"], stdin=p1.stdout, stdout=subprocess.PIPE)
79 p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
80 output = p2.communicate()[0]
81 databases = output.splitlines()
82 # TODO filter out non-OpenERP databases
83 databases = [d for d in databases if d not in ['template0', 'template1', 'postgres']]
84 databases = [d for d in databases if not d.startswith('postgres')]
85 return databases
86
87if __name__ == '__main__':
88 os.environ['TZ'] = 'UTC'
89 openerp.tools.config.parse_config(sys.argv[1:])
90 config = openerp.tools.config
91 if config['log_handler'] == [':INFO']:
92 # Replace the default value, which is suitable for openerp-server.
93 config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG')
94 setup_signal_handlers()
95 openerp.modules.module.initialize_sys_path()
96 openerp.modules.loading.open_openerp_namespace()
97 openerp.netsvc.init_logger()
98 openerp.cron.enable_schedule_wakeup = False
99 openerp.multi_process = True # enable multi-process signaling
100 import openerp.addons.base
101 print "OpenERP cron jobs worker. Hit Ctrl-C to exit."
102 print "Documentation is available at the top of the `opener-cron-worker` file."
103 if config['db_name']:
104 db_names = config['db_name'].split(',')
105 print "Monitoring %s databases." % len(db_names)
106 else:
107 db_names = list_databases
108 print "Monitored databases are auto-discovered."
109 openerp.addons.base.ir.ir_cron.ir_cron._run(db_names)
110
111# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
0112
=== modified file 'openerp/__init__.py'
--- openerp/__init__.py 2012-02-08 14:28:34 +0000
+++ openerp/__init__.py 2012-04-02 12:05:25 +0000
@@ -27,6 +27,7 @@
2727
28import addons28import addons
29import conf29import conf
30import cron
30import loglevels31import loglevels
31import modules32import modules
32import netsvc33import netsvc
3334
=== modified file 'openerp/addons/base/ir/ir_cron.py'
--- openerp/addons/base/ir/ir_cron.py 2012-02-02 09:21:05 +0000
+++ openerp/addons/base/ir/ir_cron.py 2012-04-02 12:05:25 +0000
@@ -39,6 +39,14 @@
3939
40_logger = logging.getLogger(__name__)40_logger = logging.getLogger(__name__)
4141
# Set to True (e.g. by the cron worker's signal handler) to make
# ir_cron._run() leave its polling loop.
quit_signal_received = False

# True while ir_cron._run() is processing jobs, False while it sleeps between
# two polls; a signal handler can read it to know whether a job is running.
job_in_progress = True

def str2tuple(s):
    # Evaluate ``tuple(<s>)``: '(1, 2)' gives (1, 2), and an empty or None
    # value gives ().
    # NOTE(review): eval() executes arbitrary Python, so ``s`` must only come
    # from trusted data (presumably the ir_cron ``args`` field - confirm).
    expression = 'tuple(%s)' % (s if s else '')
    return eval(expression)

@@ -266,6 +274,153 @@
266 cr.commit()274 cr.commit()
267 cr.close()275 cr.close()
268276
def _process_job(self, cr, job):
    """ Run ``job`` for all its missed occurrences, then reschedule it.

    ``cr`` must hold the lock on the job row (acquired by ``_acquire_job()``).
    It is committed and closed before returning, even when an error occurs.

    Each past occurrence (``nextcall`` earlier than now) consumes one call from
    ``numbercall``; a negative ``numbercall`` is never decremented and so means
    unlimited calls. The callback runs once, or once per missed occurrence when
    ``doall`` is set. The new ``nextcall`` and ``numbercall`` are then written
    back, and the job is deactivated when no call is left.

    :param job: job to be run (as a dictionary).
    """
    try:
        current_time = datetime.now()
        next_call = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
        remaining = job['numbercall']

        has_run = False
        while next_call < current_time and remaining:
            if remaining > 0:
                remaining -= 1
            if not has_run or job['doall']:
                self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
            if remaining:
                next_call += _intervalTypes[job['interval_type']](job['interval_number'])
            has_run = True

        extra_sql = '' if remaining else ', active=False'
        query = "UPDATE ir_cron SET nextcall=%s, numbercall=%s" + extra_sql + " WHERE id=%s"
        params = (next_call.strftime(DEFAULT_SERVER_DATETIME_FORMAT), remaining, job['id'])
        cr.execute(query, params)

    finally:
        cr.commit()
        cr.close()

@classmethod
def _acquire_job(cls, db_name):
    # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
    """ Try to process one cron job.

    This selects in database all the jobs that should be processed. It then
    tries to lock each of them and, if it succeeds, runs the cron job and
    returns (if it doesn't succeed, the job is already locked by another
    process/thread taking care of it).

    A database that cannot be connected to, or that has no ``ir_cron``
    table, is logged and skipped. Any other exception is logged, except
    unexpected ``psycopg2.ProgrammingError``s, which are re-raised.

    :param db_name: name of the database to poll.
    :return: True if a job was processed, otherwise False.
    """
    db = openerp.sql_db.db_connect(db_name)
    try:
        cr = db.cursor()
    except psycopg2.OperationalError:
        # E.g. the database was dropped after the list of databases to poll
        # was computed: skip it instead of aborting the caller's loop.
        _logger.warning('Could not connect to database %s.', db_name, exc_info=True)
        return False
    try:
        # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
        cr.execute("""SELECT * FROM ir_cron
                      WHERE numbercall != 0
                          AND active AND nextcall <= (now() at time zone 'UTC')
                      ORDER BY priority""")
        for job in cr.dictfetchall():
            task_cr = cls._try_lock_job(db, job)
            if task_cr is None:
                continue
            # Got the lock on the job row, run its code
            _logger.debug('Starting job `%s`.', job['name'])
            cls._run_locked_job(db_name, task_cr, job)
            openerp.modules.registry.RegistryManager.signal_caches_change(db_name)
            return True

    except psycopg2.ProgrammingError as e:
        if e.pgcode == '42P01':
            # Class 42 - Syntax Error or Access Rule Violation; 42P01: undefined_table
            # The table ir_cron does not exist; this is probably not an OpenERP database.
            _logger.warning('Tried to poll an undefined table on database %s.', db_name)
        else:
            raise
    except Exception:
        _logger.warning('Exception in cron:', exc_info=True)

    finally:
        cr.commit()
        cr.close()

    return False

@staticmethod
def _try_lock_job(db, job):
    """ Open a new cursor on ``db`` and lock the row of ``job`` with it.

    ``FOR UPDATE NOWAIT`` is used so that a row already locked by another
    transaction is skipped instead of waited for.

    :return: the cursor holding the lock, or None when the row is already
        locked (the cursor is then closed).
    :raise: any other error raised while locking, after closing the cursor.
    """
    task_cr = db.cursor()
    acquired_lock = False
    try:
        # Try to grab an exclusive lock on the job row from within the task transaction
        task_cr.execute("""SELECT *
                           FROM ir_cron
                           WHERE id=%s
                           FOR UPDATE NOWAIT""",
                        (job['id'],), log_exceptions=False)
        acquired_lock = True
    except psycopg2.OperationalError as e:
        if e.pgcode != '55P03':
            # Unexpected OperationalError
            raise
        # Class 55: Object not in prerequisite state; 55P03: lock_not_available
        _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
    finally:
        if not acquired_lock:
            # We're exiting without the lock: the cursor is of no use anymore.
            task_cr.close()
    return task_cr if acquired_lock else None

@classmethod
def _run_locked_job(cls, db_name, task_cr, job):
    """ Run ``job`` with ``task_cr``, the cursor holding the lock on its row.

    ``task_cr`` is closed in all cases: by ``_process_job()`` once it has been
    called, or here when an error (e.g. while loading the registry) prevents
    calling it. Otherwise the cursor and its row lock would stay open.
    """
    handed_over = False
    try:
        openerp.modules.registry.RegistryManager.check_registry_signaling(db_name)
        registry = openerp.pooler.get_pool(db_name)
        model = registry[cls._name]
        handed_over = True
        model._process_job(task_cr, job)
    finally:
        if not handed_over:
            task_cr.close()

@classmethod
def _run(cls, db_names):
    """
    Class method intended to be run in a dedicated process to handle jobs.
    This polls the databases for jobs that can be run every 60 seconds.

    A poll goes through the databases one by one and processes all the jobs
    that are due in each of them. When ``db_names`` is a callable, it is
    called again every 5 polls (i.e. roughly every 5 minutes), so that
    databases created or dropped in the meantime are taken into account.

    The method returns when the module-level ``quit_signal_received`` flag is
    set. The flag is checked before each poll and after each job, so the job
    being processed is allowed to finish. The method also returns when
    ``openerp.wsgi.core.post_request()`` marks the worker as no longer alive;
    this only happens when rlimits are enabled, i.e. when the
    ``cpu_time_limit`` option is not 60.

    :param db_names: list of database names to poll or callable to
        generate such a list.
    """
    global quit_signal_received, job_in_progress

    poll_interval = 60      # seconds between the start of two polls
    polls_per_refresh = 5   # polls done before calling db_names() again
    # Small hack to re-use the openerp-server config: if cpu_time_limit
    # has not its default value, we truly want to establish limits.
    use_limits = openerp.tools.config['cpu_time_limit'] != 60

    class Worker(object):
        # Stand-in for a Gunicorn worker: post_request() sets ``alive`` to
        # False when this process should be recycled.
        alive = True

    while not quit_signal_received:
        names = db_names() if callable(db_names) else db_names
        for _ in range(polls_per_refresh):
            if quit_signal_received:
                return
            start = time.time()
            for db_name in names:
                while True:
                    if use_limits:
                        openerp.wsgi.core.pre_request('dummy', 'dummy')
                    acquired = cls._acquire_job(db_name)
                    if use_limits:
                        worker = Worker()
                        openerp.wsgi.core.post_request(worker, 'dummy', 'dummy')
                        if not worker.alive:
                            return
                    if quit_signal_received:
                        # Stop right after the current job instead of
                        # draining the remaining due jobs of this database.
                        return
                    if not acquired:
                        break
            elapsed = time.time() - start
            if elapsed > poll_interval:
                _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(elapsed))
            else:
                job_in_progress = False
                time.sleep(poll_interval - elapsed)
                job_in_progress = True

269 def update_running_cron(self, cr):424 def update_running_cron(self, cr):
270 """ Schedule as soon as possible a wake-up for this database. """425 """ Schedule as soon as possible a wake-up for this database. """
271 # Verify whether the server is already started and thus whether we need to commit426 # Verify whether the server is already started and thus whether we need to commit
272427
=== modified file 'openerp/cron.py'
--- openerp/cron.py 2012-02-13 11:53:54 +0000
+++ openerp/cron.py 2012-04-02 12:05:25 +0000
@@ -49,6 +49,13 @@
4949
50_logger = logging.getLogger(__name__)50_logger = logging.getLogger(__name__)
5151
# When False, schedule_wakeup() returns without doing anything. The
# openerp-cron-worker script sets it to False: its processes poll the
# databases for due jobs themselves, so the wake-up queue used by the
# openerp-server cron threads is not needed.
enable_schedule_wakeup = True

52# Heapq of database wake-ups. Note that 'database wake-up' meaning is in59# Heapq of database wake-ups. Note that 'database wake-up' meaning is in
53# the context of the cron management. This is not originally about loading60# the context of the cron management. This is not originally about loading
54# a database, although having the database name in the queue will61# a database, although having the database name in the queue will
@@ -135,7 +142,6 @@
135 _wakeups = []142 _wakeups = []
136 _wakeup_by_db = {}143 _wakeup_by_db = {}
137144
138
139def schedule_wakeup(timestamp, db_name):145def schedule_wakeup(timestamp, db_name):
140 """ Schedule a new wake-up for a database.146 """ Schedule a new wake-up for a database.
141147
@@ -147,6 +153,9 @@
147 :param timestamp: when the wake-up is scheduled.153 :param timestamp: when the wake-up is scheduled.
148154
149 """155 """
156 global enable_schedule_wakeup
157 if not enable_schedule_wakeup:
158 return
150 if not timestamp:159 if not timestamp:
151 return160 return
152 with _wakeups_lock:161 with _wakeups_lock:
153162
=== modified file 'openerp/wsgi/core.py'
--- openerp/wsgi/core.py 2012-03-05 16:37:30 +0000
+++ openerp/wsgi/core.py 2012-04-02 12:05:25 +0000
@@ -506,7 +506,7 @@
506 rss, vms = psutil.Process(os.getpid()).get_memory_info()506 rss, vms = psutil.Process(os.getpid()).get_memory_info()
507 if vms > config['virtual_memory_reset']:507 if vms > config['virtual_memory_reset']:
508 _logger.info('Virtual memory consumption '508 _logger.info('Virtual memory consumption '
509 'too high, rebooting the worker.')509 'too high, killing the worker.')
510 worker.alive = False # Commit suicide after the request.510 worker.alive = False # Commit suicide after the request.
511511
512# SIGXCPU (exceeded CPU time) signal handler will raise an exception.512# SIGXCPU (exceeded CPU time) signal handler will raise an exception.