Merge lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt into lp:openobject-server/6.1

Proposed by Vo Minh Thu
Status: Merged
Approved by: Xavier ALT
Approved revision: 4135
Merged at revision: 4184
Proposed branch: lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt
Merge into: lp:openobject-server/6.1
Diff against target: 349 lines (+278/-2)
5 files modified
openerp-cron-worker (+111/-0)
openerp/__init__.py (+1/-0)
openerp/addons/base/ir/ir_cron.py (+155/-0)
openerp/cron.py (+10/-1)
openerp/wsgi/core.py (+1/-1)
To merge this branch: bzr merge lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt
Reviewer Review Type Date Requested Status
Christophe Simonis (OpenERP) Needs Fixing
Review via email: mp+99897@code.launchpad.net
To post a comment you must log in.
4135. By Vo Minh Thu

[IMP] cron: use multi-process signaling.

Revision history for this message
Christophe Simonis (OpenERP) (kangol) wrote :

1/ What is this "for x in xrange(5):" in _run() ?
2/ why not use existing function for listing the databases ? http://bazaar.launchpad.net/~openerp/openobject-server/6.1/view/head:/openerp/service/web_services.py#L317

review: Needs Fixing
Revision history for this message
Vo Minh Thu (thu) wrote :

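1. It simply repeats the same 60-second polling round 5 times (about 5 minutes) before the list of databases is computed again, so database auto-discovery does not run every minute.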
2. I didn't think of it, but it wouldn't work: for instance, it assumes tools.config['db_name'] is a single database name. Maybe it would be useful to refactor the code, but we can't do that in stable.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'openerp-cron-worker'
2--- openerp-cron-worker 1970-01-01 00:00:00 +0000
3+++ openerp-cron-worker 2012-04-02 12:05:25 +0000
4@@ -0,0 +1,111 @@
5+#!/usr/bin/env python
6+# -*- coding: utf-8 -*-
7+
8+"""
9+OpenERP cron jobs worker
10+
11+This script executes OpenERP cron jobs. Normally, cron jobs are handled by the
12+OpenERP server but depending on deployment needs, independent worker processes
13+can be used. This is especially the case when the server is run via Gunicorn.
14+
15+OpenERP cron jobs worker re-uses openerp-server command-line options but does
16+not honor all of them.
17+
18+Meaningful options include:
19+
20+ -d, --database comma-separated list of databases to monitor for cron jobs
21+ processing. If left empty, the worker monitors all databases
22+ (given by `psql -lAt`).
23+
24+ --addons-path as usual.
25+
26+ --cpu-time-limit
27+ --virtual-memory-limit
28+ --virtual-memory-reset Those three options have the same meaning as for
29+ the server with Gunicorn. The only catch is: to
30+ avoid enabling rlimits by default, those options are
31+ honored only when --cpu-time-limit is different from
32+ 60 (its default value).
33+"""
34+
35+import logging
36+import os
37+import signal
38+import sys
39+
40+import openerp
41+
42+# Also use the `openerp` logger for the main script.
43+_logger = logging.getLogger('openerp')
44+
45+# Variable keeping track of the number of calls to the signal handler defined
46+# below; signal_handler() reads it to force an exit on the second signal.
47+quit_signals_received = 0
48+
49+# TODO copy/pasted from openerp-server
50+def signal_handler(sig, frame):
51+ """ Signal handler: ask ir_cron._run() to stop, and exit ungracefully on the second handled signal.
52+
53+ :param sig: the signal number
54+ :param frame: the interrupted stack frame or None
55+ """
56+ global quit_signals_received
57+ quit_signals_received += 1
58+ import openerp.addons.base
59+ openerp.addons.base.ir.ir_cron.quit_signal_received = True # polled by ir_cron._run() between jobs
60+ if quit_signals_received == 1 and openerp.addons.base.ir.ir_cron.job_in_progress:
61+ _logger.info("Waiting for the current job to complete.")
62+ print "Waiting for the current job to complete."
63+ print "Hit Ctrl-C again to force shutdown."
64+ if quit_signals_received > 1:
65+ # NOTE(review): copied from openerp-server, where logging.shutdown may already have run; hence stderr.
66+ sys.stderr.write("Forced shutdown.\n")
67+ os._exit(0) # immediate exit, skipping cleanup handlers (exit status 0)
68+
69+# TODO copy/pasted from openerp-server
70+def setup_signal_handlers():
71+ """ Register signal_handler() for SIGINT/SIGTERM on POSIX, or as the console control handler on Windows. """
72+ SIGNALS = map(lambda x: getattr(signal, "SIG%s" % x), "INT TERM".split())
73+ if os.name == 'posix':
74+ map(lambda sig: signal.signal(sig, signal_handler), SIGNALS) # Python 2 map() is eager: handlers get installed
75+ elif os.name == 'nt':
76+ import win32api
77+ win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
78+
79+def list_databases(): # names from `psql -lAt` (first |-separated column), minus template0/template1 and names starting with 'postgres'
80+ import subprocess
81+ p1 = subprocess.Popen(["psql", "-lAt"], stdout=subprocess.PIPE)
82+ p2 = subprocess.Popen(["cut", "-f", "1", "-d", "|"], stdin=p1.stdout, stdout=subprocess.PIPE)
83+ p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
84+ output = p2.communicate()[0] # NOTE(review): p1 is never wait()ed and a failing psql just yields an empty list
85+ databases = output.splitlines()
86+ # TODO filter out non-OpenERP databases (for now ir_cron._acquire_job() only logs a warning for them)
87+ databases = [d for d in databases if d not in ['template0', 'template1', 'postgres']]
88+ databases = [d for d in databases if not d.startswith('postgres')]
89+ return databases
90+
91+if __name__ == '__main__':
92+ os.environ['TZ'] = 'UTC'
93+ openerp.tools.config.parse_config(sys.argv[1:])
94+ config = openerp.tools.config
95+ if config['log_handler'] == [':INFO']:
96+ # Extend the default value (suited to openerp-server) with ir_cron debug logs.
97+ config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG')
98+ setup_signal_handlers()
99+ openerp.modules.module.initialize_sys_path()
100+ openerp.modules.loading.open_openerp_namespace()
101+ openerp.netsvc.init_logger()
102+ openerp.cron.enable_schedule_wakeup = False # jobs are found by polling, not by wake-ups
103+ openerp.multi_process = True # enable multi-process signaling
104+ import openerp.addons.base
105+ print "OpenERP cron jobs worker. Hit Ctrl-C to exit."
106+ print "Documentation is available at the top of the `openerp-cron-worker` file."
107+ if config['db_name']:
108+ db_names = config['db_name'].split(',')
109+ print "Monitoring %s databases." % len(db_names)
110+ else:
111+ db_names = list_databases # the callable itself: _run() calls it again periodically
112+ print "Monitored databases are auto-discovered."
113+ openerp.addons.base.ir.ir_cron.ir_cron._run(db_names)
114+
115+# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
116
117=== modified file 'openerp/__init__.py'
118--- openerp/__init__.py 2012-02-08 14:28:34 +0000
119+++ openerp/__init__.py 2012-04-02 12:05:25 +0000
120@@ -27,6 +27,7 @@
121
122 import addons
123 import conf
124+import cron
125 import loglevels
126 import modules
127 import netsvc
128
129=== modified file 'openerp/addons/base/ir/ir_cron.py'
130--- openerp/addons/base/ir/ir_cron.py 2012-02-02 09:21:05 +0000
131+++ openerp/addons/base/ir/ir_cron.py 2012-04-02 12:05:25 +0000
132@@ -39,6 +39,14 @@
133
134 _logger = logging.getLogger(__name__)
135
136+# This variable can be set by a signal handler to stop the infinite loop in
137+# ir_cron._run() (checked before each database poll and between polling rounds).
138+quit_signal_received = False
139+
140+# This variable can be checked to know if ir_cron._run() is processing a job or
141+# sleeping. It is only False while _run() sleeps between polling rounds.
142+job_in_progress = True
143+
144 def str2tuple(s):
145 return eval('tuple(%s)' % (s or ''))
146
147@@ -266,6 +274,153 @@
148 cr.commit()
149 cr.close()
150
151+ def _process_job(self, cr, job):
152+ """ Run a given job taking care of the repetition.
153+
154+ The cursor holds a lock on the job (acquired by _acquire_job()); it is committed and closed on return, even on error.
155+
156+ :param job: job to be run (as a dictionary, i.e. an ir_cron row).
157+ """
158+ try:
159+ now = datetime.now() # NOTE(review): naive local time; matches the UTC nextcall only when TZ=UTC
160+ nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT)
161+ numbercall = job['numbercall']
162+
163+ ok = False
164+ while nextcall < now and numbercall:
165+ if numbercall > 0: # a negative numbercall is never decremented, i.e. unlimited
166+ numbercall -= 1
167+ if not ok or job['doall']: # run once; replay missed calls only when doall is set
168+ self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id'])
169+ if numbercall:
170+ nextcall += _intervalTypes[job['interval_type']](job['interval_number'])
171+ ok = True
172+ addsql = ''
173+ if not numbercall:
174+ addsql = ', active=False'
175+ cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s",
176+ (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id']))
177+
178+ finally:
179+ cr.commit()
180+ cr.close()
181+
182+ @classmethod
183+ def _acquire_job(cls, db_name):
184+ # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py
185+ """ Try to process one cron job.
186+
187+ This selects in database all the jobs that should be processed. It then
188+ tries to lock each of them and, if it succeeds, run the cron job (if it
189+ doesn't succeed, it means the job was already locked to be taken care
190+ of by another thread or process) and return.
191+
192+ If a job was processed, returns True, otherwise returns False; most errors are logged, not raised.
193+ """
194+ db = openerp.sql_db.db_connect(db_name)
195+ cr = db.cursor()
196+ try:
197+ # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1.
198+ cr.execute("""SELECT * FROM ir_cron
199+ WHERE numbercall != 0
200+ AND active AND nextcall <= (now() at time zone 'UTC')
201+ ORDER BY priority""")
202+ for job in cr.dictfetchall():
203+ task_cr = db.cursor()
204+ try:
205+ # Try to grab an exclusive lock on the job row from within the task transaction
206+ acquired_lock = False
207+ task_cr.execute("""SELECT *
208+ FROM ir_cron
209+ WHERE id=%s
210+ FOR UPDATE NOWAIT""",
211+ (job['id'],), log_exceptions=False)
212+ acquired_lock = True
213+ except psycopg2.OperationalError, e:
214+ if e.pgcode == '55P03':
215+ # Class 55: Object not in prerequisite state; 55P03: lock_not_available
216+ _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name'])
217+ continue
218+ else:
219+ # Unexpected OperationalError
220+ raise
221+ finally:
222+ if not acquired_lock:
223+ # we're exiting due to an exception while acquiring the lock
224+ task_cr.close()
225+
226+ # Got the lock on the job row, run its code. NOTE(review): if the two registry calls below raise, task_cr (and its row lock) is never closed, since only _process_job() closes it.
227+ _logger.debug('Starting job `%s`.', job['name'])
228+ openerp.modules.registry.RegistryManager.check_registry_signaling(db_name)
229+ registry = openerp.pooler.get_pool(db_name)
230+ registry[cls._name]._process_job(task_cr, job)
231+ openerp.modules.registry.RegistryManager.signal_caches_change(db_name)
232+ return True
233+
234+ except psycopg2.ProgrammingError, e:
235+ if e.pgcode == '42P01':
236+ # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table
237+ # The table ir_cron does not exist; this is probably not an OpenERP database.
238+ _logger.warning('Tried to poll an undefined table on database %s.', db_name)
239+ else:
240+ raise
241+ except Exception, ex: # log and fall through to "no job processed"
242+ _logger.warning('Exception in cron:', exc_info=True)
243+
244+ finally:
245+ cr.commit()
246+ cr.close()
247+
248+ return False
249+
250+ @classmethod
251+ def _run(cls, db_names):
252+ """
253+ Class method intended to be run in a dedicated process to handle jobs.
254+ Each round drains the runnable jobs of every database, then sleeps out the rest of 60 seconds.
255+
256+ :param db_names: list of database names to poll or callable to
257+ generate such a list (called again every 5 rounds).
258+ """
259+ global quit_signal_received
260+ while not quit_signal_received:
261+ if callable(db_names):
262+ names = db_names()
263+ else:
264+ names = db_names
265+ for x in xrange(5): # 5 rounds (~5 minutes) before re-evaluating a callable db_names
266+ if quit_signal_received:
267+ return
268+ t1 = time.time()
269+ for db_name in names:
270+ while True: # keep processing jobs until none can be acquired
271+ # Small hack to re-use the openerp-server config:
272+ # If the cpu_time_limit has not its default value, we
273+ # truly want to establish limits.
274+ if openerp.tools.config['cpu_time_limit'] != 60:
275+ openerp.wsgi.core.pre_request('dummy', 'dummy')
276+ acquired = cls._acquire_job(db_name)
277+ if openerp.tools.config['cpu_time_limit'] != 60:
278+ class W(object):
279+ alive = True
280+ worker = W()
281+ openerp.wsgi.core.post_request(worker, 'dummy', 'dummy')
282+ if not worker.alive: # post_request() hit a limit (e.g. virtual memory reset): stop this worker
283+ return
284+ if not acquired:
285+ break
286+ if quit_signal_received:
287+ return
288+ t2 = time.time()
289+ t = t2 - t1
290+ global job_in_progress
291+ if t > 60:
292+ _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t))
293+ else:
294+ job_in_progress = False
295+ time.sleep(60 - t)
296+ job_in_progress = True
297+
298 def update_running_cron(self, cr):
299 """ Schedule as soon as possible a wake-up for this database. """
300 # Verify whether the server is already started and thus whether we need to commit
301
302=== modified file 'openerp/cron.py'
303--- openerp/cron.py 2012-02-13 11:53:54 +0000
304+++ openerp/cron.py 2012-04-02 12:05:25 +0000
305@@ -49,6 +49,13 @@
306
307 _logger = logging.getLogger(__name__)
308
309+# Scheduling wake-ups (see below) can be disabled when the polling process
310+# workers are used instead of the managed thread workers. (I.e. wake-ups are
311+# not used since polling is used. And polling is used when cron jobs are
312+# handled by running special processes, e.g. openerp-cron-worker, instead
313+# of the general openerp-server script.)
314+enable_schedule_wakeup = True # checked by schedule_wakeup()
315+
316 # Heapq of database wake-ups. Note that 'database wake-up' meaning is in
317 # the context of the cron management. This is not originally about loading
318 # a database, although having the database name in the queue will
319@@ -135,7 +142,6 @@
320 _wakeups = []
321 _wakeup_by_db = {}
322
323-
324 def schedule_wakeup(timestamp, db_name):
325 """ Schedule a new wake-up for a database.
326
327@@ -147,6 +153,9 @@
328 :param timestamp: when the wake-up is scheduled.
329
330 """
331+ global enable_schedule_wakeup
332+ if not enable_schedule_wakeup:
333+ return
334 if not timestamp:
335 return
336 with _wakeups_lock:
337
338=== modified file 'openerp/wsgi/core.py'
339--- openerp/wsgi/core.py 2012-03-05 16:37:30 +0000
340+++ openerp/wsgi/core.py 2012-04-02 12:05:25 +0000
341@@ -506,7 +506,7 @@
342 rss, vms = psutil.Process(os.getpid()).get_memory_info()
343 if vms > config['virtual_memory_reset']:
344 _logger.info('Virtual memory consumption '
345- 'too high, rebooting the worker.')
346+ 'too high, killing the worker.')
347 worker.alive = False # Commit suicide after the request.
348
349 # SIGXCPU (exceeded CPU time) signal handler will raise an exception.