Merge lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt into lp:openobject-server/6.1

Proposed by Vo Minh Thu
Status: Merged
Merged at revision: 4126
Proposed branch: lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt
Merge into: lp:openobject-server/6.1
Diff against target: 353 lines (+139/-34)
10 files modified
gunicorn.conf.py (+0/-1)
openerp/__init__.py (+7/-0)
openerp/addons/base/ir/ir_ui_menu.py (+5/-1)
openerp/addons/base/module/module.py (+2/-0)
openerp/cron.py (+7/-4)
openerp/modules/registry.py (+109/-0)
openerp/osv/orm.py (+1/-0)
openerp/service/web_services.py (+4/-0)
openerp/tools/cache.py (+2/-0)
openerp/wsgi/core.py (+2/-28)
To merge this branch: bzr merge lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt
Reviewer | Review Type | Date Requested | Status
Vo Minh Thu (community) | | | Approve
Christophe Simonis (OpenERP) | | | Approve
Review via email: mp+95936@code.launchpad.net
4019. By Vo Minh Thu

[IMP] multi-process: moved signaling sequences to registry creation instead of base.sql.

4020. By Vo Minh Thu

[FIX] multi-process signaling: one query instead of two (based on chs idea).

4021. By Vo Minh Thu

[FIX] removed spurious print statement.

4022. By Vo Minh Thu

[FIX] typo.

Christophe Simonis (OpenERP) (kangol) :
review: Approve
Vo Minh Thu (thu) wrote :

check_registry_signaling should also refresh the fields.float columns, by calling digits_change(), when the decimal_precision module is installed.

review: Needs Fixing
4023. By Vo Minh Thu

[MERGE] merged trunk.

4024. By Vo Minh Thu

[IMP] signaling: call also digits_change() when caches are cleared.

4025. By Vo Minh Thu

[REV] reverted local (and mistakenly commited) changes to gunicorn.conf.py.

Vo Minh Thu (thu) wrote :

My latest comment has now been addressed.

Description of the patch:

This adds a simple signaling scheme between OpenERP server instances, and also between processes (managed by, say, gunicorn). The signaling is done through the database using two PostgreSQL sequences: one for registry invalidation and one for cache invalidation.

The former happens when a new module is installed, the latter when a tools.ormcache is cleared.

The idea is to keep inside each process one value per signal, as provided by the database. Whenever a process changes its registry or clears a cache, it increments the corresponding sequence in the database. Whenever a process notices that one of its internal values is out of sync with the database, it reloads its registry or clears its caches accordingly. Each process compares its internal values against the database at the beginning of every request.
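
The scheme described above can be illustrated with a small in-memory simulation. This is only a sketch under stated assumptions, not the code in the patch: FakeSequence, FakeDatabase and Worker are hypothetical names, the two PostgreSQL sequences are replaced by plain counters, and the registry reload is reduced to a counter increment plus an emptied cache.

```python
# Illustrative simulation only. FakeSequence, FakeDatabase and Worker are
# hypothetical names; they are not part of OpenERP or PostgreSQL.

class FakeSequence(object):
    """In-memory stand-in for a PostgreSQL sequence."""

    def __init__(self):
        self.last_value = 1

    def nextval(self):
        self.last_value += 1
        return self.last_value


class FakeDatabase(object):
    """Holds the two signaling counters shared by all workers."""

    def __init__(self):
        self.registry_signaling = FakeSequence()
        self.cache_signaling = FakeSequence()


class Worker(object):
    """One server process with its own registry and caches."""

    def __init__(self, db):
        self.db = db
        # Local copies of the values last seen in the database.
        self.registry_seq = db.registry_signaling.last_value
        self.cache_seq = db.cache_signaling.last_value
        self.cache = {}
        self.any_cache_cleared = False
        self.registry_reloads = 0

    def clear_cache(self):
        self.cache = {}
        self.any_cache_cleared = True

    def signal_registry_change(self):
        # Called after this worker changed the registry (e.g. module install).
        self.registry_seq = self.db.registry_signaling.nextval()

    def begin_request(self):
        # Compare the local values with the database before handling a request.
        r = self.db.registry_signaling.last_value
        c = self.db.cache_signaling.last_value
        if self.registry_seq != r:
            # Simplification: a "reload" just counts and drops the caches.
            self.registry_reloads += 1
            self.cache = {}
            self.registry_seq = r
        elif self.cache_seq != c:
            self.cache_seq = c
            self.clear_cache()
            # Clearing in response to a signal must not be signaled again.
            self.any_cache_cleared = False

    def end_request(self):
        # Tell the other workers if a cache was cleared during the request.
        if self.any_cache_cleared:
            self.cache_seq = self.db.cache_signaling.nextval()
            self.any_cache_cleared = False
```

With two Worker objects sharing one FakeDatabase, a cache cleared by the first worker during a request is noticed by the second worker on its next begin_request() call, while the first worker itself does nothing because its local value already matches the database.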

review: Approve

Preview Diff

=== modified file 'gunicorn.conf.py'
--- gunicorn.conf.py 2012-02-10 15:25:21 +0000
+++ gunicorn.conf.py 2012-03-23 13:22:20 +0000
@@ -25,7 +25,6 @@

 # Some application-wide initialization is needed.
 on_starting = openerp.wsgi.core.on_starting
-when_ready = openerp.wsgi.core.when_ready
 pre_request = openerp.wsgi.core.pre_request
 post_request = openerp.wsgi.core.post_request


=== modified file 'openerp/__init__.py'
--- openerp/__init__.py 2011-09-27 16:51:33 +0000
+++ openerp/__init__.py 2012-03-23 13:22:20 +0000
@@ -45,5 +45,12 @@
 import workflow
 import wsgi

+# Is the server running in multi-process mode (e.g. behind Gunicorn).
+# If this is True, the processes have to communicate some events,
+# e.g. database update or cache invalidation. Each process has also
+# its own copy of the data structure and we don't need to care about
+# locks between threads.
+multi_process = False
+
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:


=== modified file 'openerp/addons/base/ir/ir_ui_menu.py'
--- openerp/addons/base/ir/ir_ui_menu.py 2012-02-10 08:26:37 +0000
+++ openerp/addons/base/ir/ir_ui_menu.py 2012-03-23 13:22:20 +0000
@@ -42,7 +42,7 @@

     def __init__(self, *args, **kwargs):
         self.cache_lock = threading.RLock()
-        self.clear_cache()
+        self._cache = {}
         r = super(ir_ui_menu, self).__init__(*args, **kwargs)
         self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache')
         return r
@@ -50,6 +50,10 @@
     def clear_cache(self):
         with self.cache_lock:
             # radical but this doesn't frequently happen
+            if self._cache:
+                # Normally this is done by openerp.tools.ormcache
+                # but since we do not use it, set it by ourself.
+                self.pool._any_cache_cleared = True
             self._cache = {}

     def _filter_visible_menus(self, cr, uid, ids, context=None):

=== modified file 'openerp/addons/base/module/module.py'
--- openerp/addons/base/module/module.py 2012-03-01 01:47:08 +0000
+++ openerp/addons/base/module/module.py 2012-03-23 13:22:20 +0000
@@ -30,6 +30,7 @@
 import zipfile
 import zipimport

+import openerp
 import openerp.modules as addons
 import pooler
 import release
@@ -344,6 +345,7 @@
         if to_install_ids:
             self.button_install(cr, uid, to_install_ids, context=context)

+        openerp.modules.registry.RegistryManager.signal_registry_change(cr.dbname)
         return dict(ACTION_DICT, name=_('Install'))

     def button_immediate_install(self, cr, uid, ids, context=None):

=== modified file 'openerp/cron.py'
--- openerp/cron.py 2012-01-24 11:07:30 +0000
+++ openerp/cron.py 2012-03-23 13:22:20 +0000
@@ -204,9 +204,12 @@
         _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
                         "this may cause trouble if you reach that number of parallel cron tasks.",
                         db_maxconn, _thread_slots)
-    t = threading.Thread(target=runner, name="openerp.cron.master_thread")
-    t.setDaemon(True)
-    t.start()
-    _logger.debug("Master cron daemon started!")
+    if _thread_slots:
+        t = threading.Thread(target=runner, name="openerp.cron.master_thread")
+        t.setDaemon(True)
+        t.start()
+        _logger.debug("Master cron daemon started!")
+    else:
+        _logger.info("No master cron daemon (0 workers needed).")

 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

=== modified file 'openerp/modules/registry.py'
--- openerp/modules/registry.py 2012-01-24 12:42:52 +0000
+++ openerp/modules/registry.py 2012-03-23 13:22:20 +0000
@@ -51,6 +51,18 @@
         self.db_name = db_name
         self.db = openerp.sql_db.db_connect(db_name)

+        # Inter-process signaling (used only when openerp.multi_process is True):
+        # The `base_registry_signaling` sequence indicates the whole registry
+        # must be reloaded.
+        # The `base_cache_signaling sequence` indicates all caches must be
+        # invalidated (i.e. cleared).
+        self.base_registry_signaling_sequence = 1
+        self.base_cache_signaling_sequence = 1
+
+        # Flag indicating if at least one model cache has been cleared.
+        # Useful only in a multi-process context.
+        self._any_cache_cleared = False
+
         cr = self.db.cursor()
         has_unaccent = openerp.modules.db.has_unaccent(cr)
         if openerp.tools.config['unaccent'] and not has_unaccent:
@@ -114,6 +126,36 @@
         """
         for model in self.models.itervalues():
             model.clear_caches()
+        # Special case for ir_ui_menu which does not use openerp.tools.ormcache.
+        ir_ui_menu = self.models.get('ir.ui.menu')
+        if ir_ui_menu:
+            ir_ui_menu.clear_cache()
+
+
+    # Useful only in a multi-process context.
+    def reset_any_cache_cleared(self):
+        self._any_cache_cleared = False
+
+    # Useful only in a multi-process context.
+    def any_cache_cleared(self):
+        return self._any_cache_cleared
+
+    @classmethod
+    def setup_multi_process_signaling(cls, cr):
+        if not openerp.multi_process:
+            return
+
+        # Inter-process signaling:
+        # The `base_registry_signaling` sequence indicates the whole registry
+        # must be reloaded.
+        # The `base_cache_signaling sequence` indicates all caches must be
+        # invalidated (i.e. cleared).
+        cr.execute("""SELECT sequence_name FROM information_schema.sequences WHERE sequence_name='base_registry_signaling'""")
+        if not cr.fetchall():
+            cr.execute("""CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1""")
+            cr.execute("""SELECT nextval('base_registry_signaling')""")
+            cr.execute("""CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1""")
+            cr.execute("""SELECT nextval('base_cache_signaling')""")

 class RegistryManager(object):
     """ Model registries manager.
@@ -164,6 +206,7 @@

             cr = registry.db.cursor()
             try:
+                Registry.setup_multi_process_signaling(cr)
                 registry.do_parent_store(cr)
                 registry.get('ir.actions.report.xml').register_all(cr)
                 cr.commit()
@@ -215,5 +258,71 @@
             if db_name in cls.registries:
                 cls.registries[db_name].clear_caches()

+    @classmethod
+    def check_registry_signaling(cls, db_name):
+        if openerp.multi_process and db_name in cls.registries:
+            registry = cls.get(db_name, pooljobs=False)
+            cr = registry.db.cursor()
+            try:
+                cr.execute("""
+                    SELECT base_registry_signaling.last_value,
+                           base_cache_signaling.last_value
+                    FROM base_registry_signaling, base_cache_signaling""")
+                r, c = cr.fetchone()
+                # Check if the model registry must be reloaded (e.g. after the
+                # database has been updated by another process).
+                if registry.base_registry_signaling_sequence != r:
+                    _logger.info("Reloading the model registry after database signaling.")
+                    # Don't run the cron in the Gunicorn worker.
+                    registry = cls.new(db_name, pooljobs=False)
+                    registry.base_registry_signaling_sequence = r
+                # Check if the model caches must be invalidated (e.g. after a write
+                # occured on another process). Don't clear right after a registry
+                # has been reload.
+                elif registry.base_cache_signaling_sequence != c:
+                    _logger.info("Invalidating all model caches after database signaling.")
+                    registry.base_cache_signaling_sequence = c
+                    registry.clear_caches()
+                    registry.reset_any_cache_cleared()
+                    # One possible reason caches have been invalidated is the
+                    # use of decimal_precision.write(), in which case we need
+                    # to refresh fields.float columns.
+                    for model in registry.models.values():
+                        for column in model._columns.values():
+                            if hasattr(column, 'digits_change'):
+                                column.digits_change(cr)
+            finally:
+                cr.close()
+
+    @classmethod
+    def signal_caches_change(cls, db_name):
+        if openerp.multi_process and db_name in cls.registries:
+            # Check the registries if any cache has been cleared and signal it
+            # through the database to other processes.
+            registry = cls.get(db_name, pooljobs=False)
+            if registry.any_cache_cleared():
+                _logger.info("At least one model cache has been cleared, signaling through the database.")
+                cr = registry.db.cursor()
+                r = 1
+                try:
+                    cr.execute("select nextval('base_cache_signaling')")
+                    r = cr.fetchone()[0]
+                finally:
+                    cr.close()
+                registry.base_cache_signaling_sequence = r
+                registry.reset_any_cache_cleared()
+
+    @classmethod
+    def signal_registry_change(cls, db_name):
+        if openerp.multi_process and db_name in cls.registries:
+            registry = cls.get(db_name, pooljobs=False)
+            cr = registry.db.cursor()
+            r = 1
+            try:
+                cr.execute("select nextval('base_registry_signaling')")
+                r = cr.fetchone()[0]
+            finally:
+                cr.close()
+            registry.base_registry_signaling_sequence = r

 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:

=== modified file 'openerp/osv/orm.py'
--- openerp/osv/orm.py 2012-03-09 08:19:29 +0000
+++ openerp/osv/orm.py 2012-03-23 13:22:20 +0000
@@ -2389,6 +2389,7 @@
         try:
             getattr(self, '_ormcache')
             self._ormcache = {}
+            self.pool._any_cache_cleared = True
         except AttributeError:
             pass


=== modified file 'openerp/service/web_services.py'
--- openerp/service/web_services.py 2012-03-16 16:02:16 +0000
+++ openerp/service/web_services.py 2012-03-23 13:22:20 +0000
@@ -581,8 +581,10 @@
             raise NameError("Method not available %s" % method)
         security.check(db,uid,passwd)
         assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy."
+        openerp.modules.registry.RegistryManager.check_registry_signaling(db)
         fn = getattr(openerp.osv.osv.service, method)
         res = fn(db, uid, *params)
+        openerp.modules.registry.RegistryManager.signal_caches_change(db)
         return res


@@ -662,8 +664,10 @@
         if method not in ['report', 'report_get', 'render_report']:
             raise KeyError("Method not supported %s" % method)
         security.check(db,uid,passwd)
+        openerp.modules.registry.RegistryManager.check_registry_signaling(db)
         fn = getattr(self, 'exp_' + method)
         res = fn(db, uid, *params)
+        openerp.modules.registry.RegistryManager.signal_caches_change(db)
         return res

     def exp_render_report(self, db, uid, object, ids, datas=None, context=None):

=== modified file 'openerp/tools/cache.py'
--- openerp/tools/cache.py 2011-11-22 08:58:48 +0000
+++ openerp/tools/cache.py 2012-03-23 13:22:20 +0000
@@ -57,10 +57,12 @@
             try:
                 key = args[self.skiparg-2:]
                 del d[key]
+                self2.pool._any_cache_cleared = True
             except KeyError:
                 pass
         else:
             d.clear()
+            self2.pool._any_cache_cleared = True

 class ormcache_multi(ormcache):
     def __init__(self, skiparg=2, size=8192, multi=3):

=== modified file 'openerp/wsgi/core.py'
--- openerp/wsgi/core.py 2012-02-21 18:52:47 +0000
+++ openerp/wsgi/core.py 2012-03-23 13:22:20 +0000
@@ -447,7 +447,7 @@

     The WSGI server can be shutdown with stop_server() below.
     """
-    threading.Thread(target=serve).start()
+    threading.Thread(name='WSGI server', target=serve).start()

 def stop_server():
     """ Initiate the shutdown of the WSGI server.
@@ -465,7 +465,7 @@
 def on_starting(server):
     global arbiter_pid
     arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable
-    #openerp.tools.cache = kill_workers_cache
+    openerp.multi_process = True # Yay!
     openerp.netsvc.init_logger()
     openerp.osv.osv.start_object_proxy()
     openerp.service.web_services.start_web_services()
@@ -482,11 +482,6 @@
 Maybe you forgot to add those addons in your addons_path configuration."""
             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)

-# Install our own signal handler on the master process.
-def when_ready(server):
-    # Hijack gunicorn's SIGWINCH handling; we can choose another one.
-    signal.signal(signal.SIGWINCH, make_winch_handler(server))
-
 # Install limits on virtual memory and CPU time consumption.
 def pre_request(worker, req):
     import os
@@ -514,30 +509,9 @@
             'too high, rebooting the worker.')
         worker.alive = False # Commit suicide after the request.

-# Our signal handler will signal a SGIQUIT to all workers.
-def make_winch_handler(server):
-    def handle_winch(sig, fram):
-        server.kill_workers(signal.SIGQUIT) # This is gunicorn specific.
-    return handle_winch
-
 # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
 def time_expired(n, stack):
     _logger.info('CPU time limit exceeded.')
     raise Exception('CPU time limit exceeded.') # TODO one of openerp.exception

-# Kill gracefuly the workers (e.g. because we want to clear their cache).
-# This is done by signaling a SIGWINCH to the master process, so it can be
-# called by the workers themselves.
-def kill_workers():
-    try:
-        os.kill(arbiter_pid, signal.SIGWINCH)
-    except OSError, e:
-        if e.errno == errno.ESRCH: # no such pid
-            return
-        raise
-
-class kill_workers_cache(openerp.tools.ormcache):
-    def clear(self, dbname, *args, **kwargs):
-        kill_workers()
-
 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: