Merge lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt into lp:openobject-server/6.1

Proposed by Vo Minh Thu
Status: Merged
Merged at revision: 4126
Proposed branch: lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt
Merge into: lp:openobject-server/6.1
Diff against target: 353 lines (+139/-34)
10 files modified
gunicorn.conf.py (+0/-1)
openerp/__init__.py (+7/-0)
openerp/addons/base/ir/ir_ui_menu.py (+5/-1)
openerp/addons/base/module/module.py (+2/-0)
openerp/cron.py (+7/-4)
openerp/modules/registry.py (+109/-0)
openerp/osv/orm.py (+1/-0)
openerp/service/web_services.py (+4/-0)
openerp/tools/cache.py (+2/-0)
openerp/wsgi/core.py (+2/-28)
To merge this branch: bzr merge lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt
Reviewer Review Type Date Requested Status
Vo Minh Thu (community) Approve
Christophe Simonis (OpenERP) Approve
Review via email: mp+95936@code.launchpad.net
To post a comment you must log in.
4019. By Vo Minh Thu

[IMP] multi-process: moved signaling sequences to registry creation instead of base.sql.

4020. By Vo Minh Thu

[FIX] multi-process signaling: one query instead of two (based on chs idea).

4021. By Vo Minh Thu

[FIX] removed spurious print statement.

4022. By Vo Minh Thu

[FIX] typo.

Revision history for this message
Christophe Simonis (OpenERP) (kangol) :
review: Approve
Revision history for this message
Vo Minh Thu (thu) wrote :

check_registry_signaling should also refresh the fields.float columns, by calling digits_change(), when the decimal_precision module is installed.

review: Needs Fixing
4023. By Vo Minh Thu

[MERGE] merged trunk.

4024. By Vo Minh Thu

[IMP] signaling: call also digits_change() when caches are cleared.

4025. By Vo Minh Thu

[REV] reverted local (and mistakenly commited) changes to gunicorn.conf.py.

Revision history for this message
Vo Minh Thu (thu) wrote :

My latest comment is satisfied now.

Description of the patch:

This adds a simple signaling scheme between OpenERP server instances, but also between processes (managed by, say, gunicorn). The signaling is done through the database by using two PostgreSQL sequences: one for registry invalidation, and one for caches invalidation.

The former case happens when installing a new module, the later when an tools.ormcache is cleared.

The idea is to keep inside each process a value (one per signal) provided by the db. Whenever a registry has been changed, or a cache has been cleared inside a process, the value in database is incremented. Also, whenever a process notices its internal value and the database are out of sync, its registry or caches are reloaded/cleared. Each process tests its internal values against the database at the beginning of every request handling.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'gunicorn.conf.py'
--- gunicorn.conf.py 2012-02-10 15:25:21 +0000
+++ gunicorn.conf.py 2012-03-23 13:22:20 +0000
@@ -25,7 +25,6 @@
2525
26# Some application-wide initialization is needed.26# Some application-wide initialization is needed.
27on_starting = openerp.wsgi.core.on_starting27on_starting = openerp.wsgi.core.on_starting
28when_ready = openerp.wsgi.core.when_ready
29pre_request = openerp.wsgi.core.pre_request28pre_request = openerp.wsgi.core.pre_request
30post_request = openerp.wsgi.core.post_request29post_request = openerp.wsgi.core.post_request
3130
3231
=== modified file 'openerp/__init__.py'
--- openerp/__init__.py 2011-09-27 16:51:33 +0000
+++ openerp/__init__.py 2012-03-23 13:22:20 +0000
@@ -45,5 +45,12 @@
45import workflow45import workflow
46import wsgi46import wsgi
4747
48# Is the server running in multi-process mode (e.g. behind Gunicorn).
49# If this is True, the processes have to communicate some events,
50# e.g. database update or cache invalidation. Each process has also
51# its own copy of the data structure and we don't need to care about
52# locks between threads.
53multi_process = False
54
48# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:55# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
4956
5057
=== modified file 'openerp/addons/base/ir/ir_ui_menu.py'
--- openerp/addons/base/ir/ir_ui_menu.py 2012-02-10 08:26:37 +0000
+++ openerp/addons/base/ir/ir_ui_menu.py 2012-03-23 13:22:20 +0000
@@ -42,7 +42,7 @@
4242
43 def __init__(self, *args, **kwargs):43 def __init__(self, *args, **kwargs):
44 self.cache_lock = threading.RLock()44 self.cache_lock = threading.RLock()
45 self.clear_cache()45 self._cache = {}
46 r = super(ir_ui_menu, self).__init__(*args, **kwargs)46 r = super(ir_ui_menu, self).__init__(*args, **kwargs)
47 self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache')47 self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache')
48 return r48 return r
@@ -50,6 +50,10 @@
50 def clear_cache(self):50 def clear_cache(self):
51 with self.cache_lock:51 with self.cache_lock:
52 # radical but this doesn't frequently happen52 # radical but this doesn't frequently happen
53 if self._cache:
54 # Normally this is done by openerp.tools.ormcache
55 # but since we do not use it, set it by ourself.
56 self.pool._any_cache_cleared = True
53 self._cache = {}57 self._cache = {}
5458
55 def _filter_visible_menus(self, cr, uid, ids, context=None):59 def _filter_visible_menus(self, cr, uid, ids, context=None):
5660
=== modified file 'openerp/addons/base/module/module.py'
--- openerp/addons/base/module/module.py 2012-03-01 01:47:08 +0000
+++ openerp/addons/base/module/module.py 2012-03-23 13:22:20 +0000
@@ -30,6 +30,7 @@
30import zipfile30import zipfile
31import zipimport31import zipimport
3232
33import openerp
33import openerp.modules as addons34import openerp.modules as addons
34import pooler35import pooler
35import release36import release
@@ -344,6 +345,7 @@
344 if to_install_ids:345 if to_install_ids:
345 self.button_install(cr, uid, to_install_ids, context=context)346 self.button_install(cr, uid, to_install_ids, context=context)
346347
348 openerp.modules.registry.RegistryManager.signal_registry_change(cr.dbname)
347 return dict(ACTION_DICT, name=_('Install'))349 return dict(ACTION_DICT, name=_('Install'))
348350
349 def button_immediate_install(self, cr, uid, ids, context=None):351 def button_immediate_install(self, cr, uid, ids, context=None):
350352
=== modified file 'openerp/cron.py'
--- openerp/cron.py 2012-01-24 11:07:30 +0000
+++ openerp/cron.py 2012-03-23 13:22:20 +0000
@@ -204,9 +204,12 @@
204 _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "204 _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
205 "this may cause trouble if you reach that number of parallel cron tasks.",205 "this may cause trouble if you reach that number of parallel cron tasks.",
206 db_maxconn, _thread_slots)206 db_maxconn, _thread_slots)
207 t = threading.Thread(target=runner, name="openerp.cron.master_thread")207 if _thread_slots:
208 t.setDaemon(True)208 t = threading.Thread(target=runner, name="openerp.cron.master_thread")
209 t.start()209 t.setDaemon(True)
210 _logger.debug("Master cron daemon started!")210 t.start()
211 _logger.debug("Master cron daemon started!")
212 else:
213 _logger.info("No master cron daemon (0 workers needed).")
211214
212# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:215# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
213216
=== modified file 'openerp/modules/registry.py'
--- openerp/modules/registry.py 2012-01-24 12:42:52 +0000
+++ openerp/modules/registry.py 2012-03-23 13:22:20 +0000
@@ -51,6 +51,18 @@
51 self.db_name = db_name51 self.db_name = db_name
52 self.db = openerp.sql_db.db_connect(db_name)52 self.db = openerp.sql_db.db_connect(db_name)
5353
54 # Inter-process signaling (used only when openerp.multi_process is True):
55 # The `base_registry_signaling` sequence indicates the whole registry
56 # must be reloaded.
57 # The `base_cache_signaling sequence` indicates all caches must be
58 # invalidated (i.e. cleared).
59 self.base_registry_signaling_sequence = 1
60 self.base_cache_signaling_sequence = 1
61
62 # Flag indicating if at least one model cache has been cleared.
63 # Useful only in a multi-process context.
64 self._any_cache_cleared = False
65
54 cr = self.db.cursor()66 cr = self.db.cursor()
55 has_unaccent = openerp.modules.db.has_unaccent(cr)67 has_unaccent = openerp.modules.db.has_unaccent(cr)
56 if openerp.tools.config['unaccent'] and not has_unaccent:68 if openerp.tools.config['unaccent'] and not has_unaccent:
@@ -114,6 +126,36 @@
114 """126 """
115 for model in self.models.itervalues():127 for model in self.models.itervalues():
116 model.clear_caches()128 model.clear_caches()
129 # Special case for ir_ui_menu which does not use openerp.tools.ormcache.
130 ir_ui_menu = self.models.get('ir.ui.menu')
131 if ir_ui_menu:
132 ir_ui_menu.clear_cache()
133
134
135 # Useful only in a multi-process context.
136 def reset_any_cache_cleared(self):
137 self._any_cache_cleared = False
138
139 # Useful only in a multi-process context.
140 def any_cache_cleared(self):
141 return self._any_cache_cleared
142
143 @classmethod
144 def setup_multi_process_signaling(cls, cr):
145 if not openerp.multi_process:
146 return
147
148 # Inter-process signaling:
149 # The `base_registry_signaling` sequence indicates the whole registry
150 # must be reloaded.
151 # The `base_cache_signaling sequence` indicates all caches must be
152 # invalidated (i.e. cleared).
153 cr.execute("""SELECT sequence_name FROM information_schema.sequences WHERE sequence_name='base_registry_signaling'""")
154 if not cr.fetchall():
155 cr.execute("""CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1""")
156 cr.execute("""SELECT nextval('base_registry_signaling')""")
157 cr.execute("""CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1""")
158 cr.execute("""SELECT nextval('base_cache_signaling')""")
117159
118class RegistryManager(object):160class RegistryManager(object):
119 """ Model registries manager.161 """ Model registries manager.
@@ -164,6 +206,7 @@
164206
165 cr = registry.db.cursor()207 cr = registry.db.cursor()
166 try:208 try:
209 Registry.setup_multi_process_signaling(cr)
167 registry.do_parent_store(cr)210 registry.do_parent_store(cr)
168 registry.get('ir.actions.report.xml').register_all(cr)211 registry.get('ir.actions.report.xml').register_all(cr)
169 cr.commit()212 cr.commit()
@@ -215,5 +258,71 @@
215 if db_name in cls.registries:258 if db_name in cls.registries:
216 cls.registries[db_name].clear_caches()259 cls.registries[db_name].clear_caches()
217260
261 @classmethod
262 def check_registry_signaling(cls, db_name):
263 if openerp.multi_process and db_name in cls.registries:
264 registry = cls.get(db_name, pooljobs=False)
265 cr = registry.db.cursor()
266 try:
267 cr.execute("""
268 SELECT base_registry_signaling.last_value,
269 base_cache_signaling.last_value
270 FROM base_registry_signaling, base_cache_signaling""")
271 r, c = cr.fetchone()
272 # Check if the model registry must be reloaded (e.g. after the
273 # database has been updated by another process).
274 if registry.base_registry_signaling_sequence != r:
275 _logger.info("Reloading the model registry after database signaling.")
276 # Don't run the cron in the Gunicorn worker.
277 registry = cls.new(db_name, pooljobs=False)
278 registry.base_registry_signaling_sequence = r
279 # Check if the model caches must be invalidated (e.g. after a write
280 # occured on another process). Don't clear right after a registry
281 # has been reload.
282 elif registry.base_cache_signaling_sequence != c:
283 _logger.info("Invalidating all model caches after database signaling.")
284 registry.base_cache_signaling_sequence = c
285 registry.clear_caches()
286 registry.reset_any_cache_cleared()
287 # One possible reason caches have been invalidated is the
288 # use of decimal_precision.write(), in which case we need
289 # to refresh fields.float columns.
290 for model in registry.models.values():
291 for column in model._columns.values():
292 if hasattr(column, 'digits_change'):
293 column.digits_change(cr)
294 finally:
295 cr.close()
296
297 @classmethod
298 def signal_caches_change(cls, db_name):
299 if openerp.multi_process and db_name in cls.registries:
300 # Check the registries if any cache has been cleared and signal it
301 # through the database to other processes.
302 registry = cls.get(db_name, pooljobs=False)
303 if registry.any_cache_cleared():
304 _logger.info("At least one model cache has been cleared, signaling through the database.")
305 cr = registry.db.cursor()
306 r = 1
307 try:
308 cr.execute("select nextval('base_cache_signaling')")
309 r = cr.fetchone()[0]
310 finally:
311 cr.close()
312 registry.base_cache_signaling_sequence = r
313 registry.reset_any_cache_cleared()
314
315 @classmethod
316 def signal_registry_change(cls, db_name):
317 if openerp.multi_process and db_name in cls.registries:
318 registry = cls.get(db_name, pooljobs=False)
319 cr = registry.db.cursor()
320 r = 1
321 try:
322 cr.execute("select nextval('base_registry_signaling')")
323 r = cr.fetchone()[0]
324 finally:
325 cr.close()
326 registry.base_registry_signaling_sequence = r
218327
219# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:328# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
220329
=== modified file 'openerp/osv/orm.py'
--- openerp/osv/orm.py 2012-03-09 08:19:29 +0000
+++ openerp/osv/orm.py 2012-03-23 13:22:20 +0000
@@ -2389,6 +2389,7 @@
2389 try:2389 try:
2390 getattr(self, '_ormcache')2390 getattr(self, '_ormcache')
2391 self._ormcache = {}2391 self._ormcache = {}
2392 self.pool._any_cache_cleared = True
2392 except AttributeError:2393 except AttributeError:
2393 pass2394 pass
23942395
23952396
=== modified file 'openerp/service/web_services.py'
--- openerp/service/web_services.py 2012-03-16 16:02:16 +0000
+++ openerp/service/web_services.py 2012-03-23 13:22:20 +0000
@@ -581,8 +581,10 @@
581 raise NameError("Method not available %s" % method)581 raise NameError("Method not available %s" % method)
582 security.check(db,uid,passwd)582 security.check(db,uid,passwd)
583 assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy."583 assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy."
584 openerp.modules.registry.RegistryManager.check_registry_signaling(db)
584 fn = getattr(openerp.osv.osv.service, method)585 fn = getattr(openerp.osv.osv.service, method)
585 res = fn(db, uid, *params)586 res = fn(db, uid, *params)
587 openerp.modules.registry.RegistryManager.signal_caches_change(db)
586 return res588 return res
587589
588590
@@ -662,8 +664,10 @@
662 if method not in ['report', 'report_get', 'render_report']:664 if method not in ['report', 'report_get', 'render_report']:
663 raise KeyError("Method not supported %s" % method)665 raise KeyError("Method not supported %s" % method)
664 security.check(db,uid,passwd)666 security.check(db,uid,passwd)
667 openerp.modules.registry.RegistryManager.check_registry_signaling(db)
665 fn = getattr(self, 'exp_' + method)668 fn = getattr(self, 'exp_' + method)
666 res = fn(db, uid, *params)669 res = fn(db, uid, *params)
670 openerp.modules.registry.RegistryManager.signal_caches_change(db)
667 return res671 return res
668672
669 def exp_render_report(self, db, uid, object, ids, datas=None, context=None):673 def exp_render_report(self, db, uid, object, ids, datas=None, context=None):
670674
=== modified file 'openerp/tools/cache.py'
--- openerp/tools/cache.py 2011-11-22 08:58:48 +0000
+++ openerp/tools/cache.py 2012-03-23 13:22:20 +0000
@@ -57,10 +57,12 @@
57 try:57 try:
58 key = args[self.skiparg-2:]58 key = args[self.skiparg-2:]
59 del d[key]59 del d[key]
60 self2.pool._any_cache_cleared = True
60 except KeyError:61 except KeyError:
61 pass62 pass
62 else:63 else:
63 d.clear()64 d.clear()
65 self2.pool._any_cache_cleared = True
6466
65class ormcache_multi(ormcache):67class ormcache_multi(ormcache):
66 def __init__(self, skiparg=2, size=8192, multi=3):68 def __init__(self, skiparg=2, size=8192, multi=3):
6769
=== modified file 'openerp/wsgi/core.py'
--- openerp/wsgi/core.py 2012-02-21 18:52:47 +0000
+++ openerp/wsgi/core.py 2012-03-23 13:22:20 +0000
@@ -447,7 +447,7 @@
447447
448 The WSGI server can be shutdown with stop_server() below.448 The WSGI server can be shutdown with stop_server() below.
449 """449 """
450 threading.Thread(target=serve).start()450 threading.Thread(name='WSGI server', target=serve).start()
451451
452def stop_server():452def stop_server():
453 """ Initiate the shutdown of the WSGI server.453 """ Initiate the shutdown of the WSGI server.
@@ -465,7 +465,7 @@
465def on_starting(server):465def on_starting(server):
466 global arbiter_pid466 global arbiter_pid
467 arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable467 arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable
468 #openerp.tools.cache = kill_workers_cache468 openerp.multi_process = True # Yay!
469 openerp.netsvc.init_logger()469 openerp.netsvc.init_logger()
470 openerp.osv.osv.start_object_proxy()470 openerp.osv.osv.start_object_proxy()
471 openerp.service.web_services.start_web_services()471 openerp.service.web_services.start_web_services()
@@ -482,11 +482,6 @@
482Maybe you forgot to add those addons in your addons_path configuration."""482Maybe you forgot to add those addons in your addons_path configuration."""
483 _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)483 _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
484484
485# Install our own signal handler on the master process.
486def when_ready(server):
487 # Hijack gunicorn's SIGWINCH handling; we can choose another one.
488 signal.signal(signal.SIGWINCH, make_winch_handler(server))
489
490# Install limits on virtual memory and CPU time consumption.485# Install limits on virtual memory and CPU time consumption.
491def pre_request(worker, req):486def pre_request(worker, req):
492 import os487 import os
@@ -514,30 +509,9 @@
514 'too high, rebooting the worker.')509 'too high, rebooting the worker.')
515 worker.alive = False # Commit suicide after the request.510 worker.alive = False # Commit suicide after the request.
516511
517# Our signal handler will signal a SGIQUIT to all workers.
518def make_winch_handler(server):
519 def handle_winch(sig, fram):
520 server.kill_workers(signal.SIGQUIT) # This is gunicorn specific.
521 return handle_winch
522
523# SIGXCPU (exceeded CPU time) signal handler will raise an exception.512# SIGXCPU (exceeded CPU time) signal handler will raise an exception.
524def time_expired(n, stack):513def time_expired(n, stack):
525 _logger.info('CPU time limit exceeded.')514 _logger.info('CPU time limit exceeded.')
526 raise Exception('CPU time limit exceeded.') # TODO one of openerp.exception515 raise Exception('CPU time limit exceeded.') # TODO one of openerp.exception
527516
528# Kill gracefuly the workers (e.g. because we want to clear their cache).
529# This is done by signaling a SIGWINCH to the master process, so it can be
530# called by the workers themselves.
531def kill_workers():
532 try:
533 os.kill(arbiter_pid, signal.SIGWINCH)
534 except OSError, e:
535 if e.errno == errno.ESRCH: # no such pid
536 return
537 raise
538
539class kill_workers_cache(openerp.tools.ormcache):
540 def clear(self, dbname, *args, **kwargs):
541 kill_workers()
542
543# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:517# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: