Merge lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt into lp:openobject-server/6.1
- 6.1-here-comes-the-bogeyman-vmt
- Merge into 6.1
Proposed by
Vo Minh Thu
Status: | Merged |
---|---|
Approved by: | Xavier ALT |
Approved revision: | 4135 |
Merged at revision: | 4184 |
Proposed branch: | lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt |
Merge into: | lp:openobject-server/6.1 |
Diff against target: |
349 lines (+278/-2) 5 files modified
openerp-cron-worker (+111/-0) openerp/__init__.py (+1/-0) openerp/addons/base/ir/ir_cron.py (+155/-0) openerp/cron.py (+10/-1) openerp/wsgi/core.py (+1/-1) |
To merge this branch: | bzr merge lp:~openerp-dev/openobject-server/6.1-here-comes-the-bogeyman-vmt |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Christophe Simonis (OpenERP) | Needs Fixing | ||
Review via email: mp+99897@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
- 4135. By Vo Minh Thu
-
[IMP] cron: use multi-process signaling.
Revision history for this message
Christophe Simonis (OpenERP) (kangol) wrote : | # |
review:
Needs Fixing
Revision history for this message
Vo Minh Thu (thu) wrote : | # |
1. Simply to do 5 times the same thing (which will account for 5 minutes)
2. I didn't think of it but it wouldn't work: for instance it assumes tools.config[
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added file 'openerp-cron-worker' |
2 | --- openerp-cron-worker 1970-01-01 00:00:00 +0000 |
3 | +++ openerp-cron-worker 2012-04-02 12:05:25 +0000 |
4 | @@ -0,0 +1,111 @@ |
5 | +#!/usr/bin/env python |
6 | +# -*- coding: utf-8 -*- |
7 | + |
8 | +""" |
9 | +OpenERP cron jobs worker |
10 | + |
11 | +This script executes OpenERP cron jobs. Normally, cron jobs are handled by the |
12 | +OpenERP server but depending on deployment needs, independent worker processes |
13 | +can be used. This is especially the case when the server is run via Gunicorn. |
14 | + |
15 | +OpenERP cron jobs worker re-uses openerp-server command-line options but does |
16 | +not honor all of them. |
17 | + |
18 | +Meaningful options include: |
19 | + |
20 | + -d, --database comma-separated list of databases to monitor for cron jobs |
21 | + processing. If left empty, the worker monitors all databases |
22 | + (given by `psql -ls`). |
23 | + |
24 | + --addons-path as usual. |
25 | + |
26 | + --cpu-time-limit |
27 | + --virtual-memory-limit |
28 | + --virtual-memory-reset Those three options have the same meaning as for |
29 | + the server with Gunicorn. The only catch is: To |
30 | + not enable rlimits by default, those options are |
31 | + honored only when --cpu-time-limit is different than |
32 | + 60 (its default value). |
33 | +""" |
34 | + |
35 | +import logging |
36 | +import os |
37 | +import signal |
38 | +import sys |
39 | + |
40 | +import openerp |
41 | + |
42 | +# Also use the `openerp` logger for the main script. |
43 | +_logger = logging.getLogger('openerp') |
44 | + |
45 | +# Variable keeping track of the number of calls to the signal handler defined |
46 | +# below. This variable is monitored by ``quit_on_signals()``. |
47 | +quit_signals_received = 0 |
48 | + |
49 | +# TODO copy/pasted from openerp-server |
50 | +def signal_handler(sig, frame): |
51 | + """ Signal handler: exit ungracefully on the second handled signal. |
52 | + |
53 | + :param sig: the signal number |
54 | + :param frame: the interrupted stack frame or None |
55 | + """ |
56 | + global quit_signals_received |
57 | + quit_signals_received += 1 |
58 | + import openerp.addons.base |
59 | + openerp.addons.base.ir.ir_cron.quit_signal_received = True |
60 | + if quit_signals_received == 1 and openerp.addons.base.ir.ir_cron.job_in_progress: |
61 | + _logger.info("Waiting for the current job to complete.") |
62 | + print "Waiting for the current job to complete." |
63 | + print "Hit Ctrl-C again to force shutdown." |
64 | + if quit_signals_received > 1: |
65 | + # logging.shutdown was already called at this point. |
66 | + sys.stderr.write("Forced shutdown.\n") |
67 | + os._exit(0) |
68 | + |
69 | +# TODO copy/pasted from openerp-server |
70 | +def setup_signal_handlers(): |
71 | + """ Register the signal handler defined above. """ |
72 | + SIGNALS = map(lambda x: getattr(signal, "SIG%s" % x), "INT TERM".split()) |
73 | + if os.name == 'posix': |
74 | + map(lambda sig: signal.signal(sig, signal_handler), SIGNALS) |
75 | + elif os.name == 'nt': |
76 | + import win32api |
77 | + win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1) |
78 | + |
79 | +def list_databases(): |
80 | + import subprocess |
81 | + p1 = subprocess.Popen(["psql", "-lAt"], stdout=subprocess.PIPE) |
82 | + p2 = subprocess.Popen(["cut", "-f", "1", "-d", "|"], stdin=p1.stdout, stdout=subprocess.PIPE) |
83 | + p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. |
84 | + output = p2.communicate()[0] |
85 | + databases = output.splitlines() |
86 | + # TODO filter out non-OpenERP databases |
87 | + databases = [d for d in databases if d not in ['template0', 'template1', 'postgres']] |
88 | + databases = [d for d in databases if not d.startswith('postgres')] |
89 | + return databases |
90 | + |
91 | +if __name__ == '__main__': |
92 | + os.environ['TZ'] = 'UTC' |
93 | + openerp.tools.config.parse_config(sys.argv[1:]) |
94 | + config = openerp.tools.config |
95 | + if config['log_handler'] == [':INFO']: |
96 | + # Replace the default value, which is suitable for openerp-server. |
97 | + config['log_handler'].append('openerp.addons.base.ir.ir_cron:DEBUG') |
98 | + setup_signal_handlers() |
99 | + openerp.modules.module.initialize_sys_path() |
100 | + openerp.modules.loading.open_openerp_namespace() |
101 | + openerp.netsvc.init_logger() |
102 | + openerp.cron.enable_schedule_wakeup = False |
103 | + openerp.multi_process = True # enable multi-process signaling |
104 | + import openerp.addons.base |
105 | + print "OpenERP cron jobs worker. Hit Ctrl-C to exit." |
106 | + print "Documentation is available at the top of the `opener-cron-worker` file." |
107 | + if config['db_name']: |
108 | + db_names = config['db_name'].split(',') |
109 | + print "Monitoring %s databases." % len(db_names) |
110 | + else: |
111 | + db_names = list_databases |
112 | + print "Monitored databases are auto-discovered." |
113 | + openerp.addons.base.ir.ir_cron.ir_cron._run(db_names) |
114 | + |
115 | +# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: |
116 | |
117 | === modified file 'openerp/__init__.py' |
118 | --- openerp/__init__.py 2012-02-08 14:28:34 +0000 |
119 | +++ openerp/__init__.py 2012-04-02 12:05:25 +0000 |
120 | @@ -27,6 +27,7 @@ |
121 | |
122 | import addons |
123 | import conf |
124 | +import cron |
125 | import loglevels |
126 | import modules |
127 | import netsvc |
128 | |
129 | === modified file 'openerp/addons/base/ir/ir_cron.py' |
130 | --- openerp/addons/base/ir/ir_cron.py 2012-02-02 09:21:05 +0000 |
131 | +++ openerp/addons/base/ir/ir_cron.py 2012-04-02 12:05:25 +0000 |
132 | @@ -39,6 +39,14 @@ |
133 | |
134 | _logger = logging.getLogger(__name__) |
135 | |
136 | +# This variable can be set by a signal handler to stop the infinite loop in |
137 | +# ir_cron._run() |
138 | +quit_signal_received = False |
139 | + |
140 | +# This variable can be checked to know if ir_cron._run() is processing a job or |
141 | +# sleeping. |
142 | +job_in_progress = True |
143 | + |
144 | def str2tuple(s): |
145 | return eval('tuple(%s)' % (s or '')) |
146 | |
147 | @@ -266,6 +274,153 @@ |
148 | cr.commit() |
149 | cr.close() |
150 | |
151 | + def _process_job(self, cr, job): |
152 | + """ Run a given job taking care of the repetition. |
153 | + |
154 | + The cursor has a lock on the job (acquired by _acquire_job()). |
155 | + |
156 | + :param job: job to be run (as a dictionary). |
157 | + """ |
158 | + try: |
159 | + now = datetime.now() |
160 | + nextcall = datetime.strptime(job['nextcall'], DEFAULT_SERVER_DATETIME_FORMAT) |
161 | + numbercall = job['numbercall'] |
162 | + |
163 | + ok = False |
164 | + while nextcall < now and numbercall: |
165 | + if numbercall > 0: |
166 | + numbercall -= 1 |
167 | + if not ok or job['doall']: |
168 | + self._callback(cr, job['user_id'], job['model'], job['function'], job['args'], job['id']) |
169 | + if numbercall: |
170 | + nextcall += _intervalTypes[job['interval_type']](job['interval_number']) |
171 | + ok = True |
172 | + addsql = '' |
173 | + if not numbercall: |
174 | + addsql = ', active=False' |
175 | + cr.execute("UPDATE ir_cron SET nextcall=%s, numbercall=%s"+addsql+" WHERE id=%s", |
176 | + (nextcall.strftime(DEFAULT_SERVER_DATETIME_FORMAT), numbercall, job['id'])) |
177 | + |
178 | + finally: |
179 | + cr.commit() |
180 | + cr.close() |
181 | + |
182 | + @classmethod |
183 | + def _acquire_job(cls, db_name): |
184 | + # TODO remove 'check' argument from addons/base_action_rule/base_action_rule.py |
185 | + """ Try to process one cron job. |
186 | + |
187 | + This selects in database all the jobs that should be processed. It then |
188 | + tries to lock each of them and, if it succeeds, run the cron job (if it |
189 | + doesn't succeed, it means the job was already locked to be taken care |
190 | + of by another thread) and return. |
191 | + |
192 | + If a job was processed, returns True, otherwise returns False. |
193 | + """ |
194 | + db = openerp.sql_db.db_connect(db_name) |
195 | + cr = db.cursor() |
196 | + try: |
197 | + # Careful to compare timestamps with 'UTC' - everything is UTC as of v6.1. |
198 | + cr.execute("""SELECT * FROM ir_cron |
199 | + WHERE numbercall != 0 |
200 | + AND active AND nextcall <= (now() at time zone 'UTC') |
201 | + ORDER BY priority""") |
202 | + for job in cr.dictfetchall(): |
203 | + task_cr = db.cursor() |
204 | + try: |
205 | + # Try to grab an exclusive lock on the job row from within the task transaction |
206 | + acquired_lock = False |
207 | + task_cr.execute("""SELECT * |
208 | + FROM ir_cron |
209 | + WHERE id=%s |
210 | + FOR UPDATE NOWAIT""", |
211 | + (job['id'],), log_exceptions=False) |
212 | + acquired_lock = True |
213 | + except psycopg2.OperationalError, e: |
214 | + if e.pgcode == '55P03': |
215 | + # Class 55: Object not in prerequisite state; 55P03: lock_not_available |
216 | + _logger.debug('Another process/thread is already busy executing job `%s`, skipping it.', job['name']) |
217 | + continue |
218 | + else: |
219 | + # Unexpected OperationalError |
220 | + raise |
221 | + finally: |
222 | + if not acquired_lock: |
223 | + # we're exiting due to an exception while acquiring the lock |
224 | + task_cr.close() |
225 | + |
226 | + # Got the lock on the job row, run its code |
227 | + _logger.debug('Starting job `%s`.', job['name']) |
228 | + openerp.modules.registry.RegistryManager.check_registry_signaling(db_name) |
229 | + registry = openerp.pooler.get_pool(db_name) |
230 | + registry[cls._name]._process_job(task_cr, job) |
231 | + openerp.modules.registry.RegistryManager.signal_caches_change(db_name) |
232 | + return True |
233 | + |
234 | + except psycopg2.ProgrammingError, e: |
235 | + if e.pgcode == '42P01': |
236 | + # Class 42 — Syntax Error or Access Rule Violation; 42P01: undefined_table |
237 | + # The table ir_cron does not exist; this is probably not an OpenERP database. |
238 | + _logger.warning('Tried to poll an undefined table on database %s.', db_name) |
239 | + else: |
240 | + raise |
241 | + except Exception, ex: |
242 | + _logger.warning('Exception in cron:', exc_info=True) |
243 | + |
244 | + finally: |
245 | + cr.commit() |
246 | + cr.close() |
247 | + |
248 | + return False |
249 | + |
250 | + @classmethod |
251 | + def _run(cls, db_names): |
252 | + """ |
253 | + Class method intended to be run in a dedicated process to handle jobs. |
254 | + This polls the database for jobs that can be run every 60 seconds. |
255 | + |
256 | + :param db_names: list of database names to poll or callable to |
257 | + generate such a list. |
258 | + """ |
259 | + global quit_signal_received |
260 | + while not quit_signal_received: |
261 | + if callable(db_names): |
262 | + names = db_names() |
263 | + else: |
264 | + names = db_names |
265 | + for x in xrange(5): |
266 | + if quit_signal_received: |
267 | + return |
268 | + t1 = time.time() |
269 | + for db_name in names: |
270 | + while True: |
271 | + # Small hack to re-use the openerp-server config: |
272 | + # If the cpu_time_limit has not its default value, we |
273 | + # truly want to establish limits. |
274 | + if openerp.tools.config['cpu_time_limit'] != 60: |
275 | + openerp.wsgi.core.pre_request('dummy', 'dummy') |
276 | + acquired = cls._acquire_job(db_name) |
277 | + if openerp.tools.config['cpu_time_limit'] != 60: |
278 | + class W(object): |
279 | + alive = True |
280 | + worker = W() |
281 | + openerp.wsgi.core.post_request(worker, 'dummy', 'dummy') |
282 | + if not worker.alive: |
283 | + return |
284 | + if not acquired: |
285 | + break |
286 | + if quit_signal_received: |
287 | + return |
288 | + t2 = time.time() |
289 | + t = t2 - t1 |
290 | + global job_in_progress |
291 | + if t > 60: |
292 | + _logger.warning('Cron worker: processing all jobs took more than 1 minute to complete (%ss.).', int(t)) |
293 | + else: |
294 | + job_in_progress = False |
295 | + time.sleep(60 - t) |
296 | + job_in_progress = True |
297 | + |
298 | def update_running_cron(self, cr): |
299 | """ Schedule as soon as possible a wake-up for this database. """ |
300 | # Verify whether the server is already started and thus whether we need to commit |
301 | |
302 | === modified file 'openerp/cron.py' |
303 | --- openerp/cron.py 2012-02-13 11:53:54 +0000 |
304 | +++ openerp/cron.py 2012-04-02 12:05:25 +0000 |
305 | @@ -49,6 +49,13 @@ |
306 | |
307 | _logger = logging.getLogger(__name__) |
308 | |
309 | +# Scheduling wake-ups (see below) can be disabled when the polling process |
310 | +# workers are used instead of the managed thread workers. (I.e. wake-ups are |
311 | +# not used since polling is used. And polling is used when the cron are |
312 | +# handled by running special processes, e.g. openerp-cron-worker, instead |
313 | +# of the general openerp-server script.) |
314 | +enable_schedule_wakeup = True |
315 | + |
316 | # Heapq of database wake-ups. Note that 'database wake-up' meaning is in |
317 | # the context of the cron management. This is not originally about loading |
318 | # a database, although having the database name in the queue will |
319 | @@ -135,7 +142,6 @@ |
320 | _wakeups = [] |
321 | _wakeup_by_db = {} |
322 | |
323 | - |
324 | def schedule_wakeup(timestamp, db_name): |
325 | """ Schedule a new wake-up for a database. |
326 | |
327 | @@ -147,6 +153,9 @@ |
328 | :param timestamp: when the wake-up is scheduled. |
329 | |
330 | """ |
331 | + global enable_schedule_wakeup |
332 | + if not enable_schedule_wakeup: |
333 | + return |
334 | if not timestamp: |
335 | return |
336 | with _wakeups_lock: |
337 | |
338 | === modified file 'openerp/wsgi/core.py' |
339 | --- openerp/wsgi/core.py 2012-03-05 16:37:30 +0000 |
340 | +++ openerp/wsgi/core.py 2012-04-02 12:05:25 +0000 |
341 | @@ -506,7 +506,7 @@ |
342 | rss, vms = psutil.Process(os.getpid()).get_memory_info() |
343 | if vms > config['virtual_memory_reset']: |
344 | _logger.info('Virtual memory consumption ' |
345 | - 'too high, rebooting the worker.') |
346 | + 'too high, killing the worker.') |
347 | worker.alive = False # Commit suicide after the request. |
348 | |
349 | # SIGXCPU (exceeded CPU time) signal handler will raise an exception. |
1/ What is this "for x in xrange(5):" in _run() ?
2/ why not use existing function for listing the databases ? http://bazaar.launchpad.net/~openerp/openobject-server/6.1/view/head:/openerp/service/web_services.py#L317