Merge lp:~openerp-dev/openobject-server/trunk-gunicorn-signaling-vmt into lp:openobject-server

Proposed by Antony Lesuisse (OpenERP)
Status: Rejected
Rejected by: Vo Minh Thu
Proposed branch: lp:~openerp-dev/openobject-server/trunk-gunicorn-signaling-vmt
Merge into: lp:openobject-server
Diff against target: 352 lines (+138/-34)
10 files modified
gunicorn.conf.py (+0/-1)
openerp/__init__.py (+7/-0)
openerp/addons/base/ir/ir_ui_menu.py (+5/-1)
openerp/addons/base/module/module.py (+2/-0)
openerp/cron.py (+7/-4)
openerp/modules/registry.py (+108/-0)
openerp/osv/orm.py (+1/-0)
openerp/service/web_services.py (+4/-0)
openerp/tools/cache.py (+2/-0)
openerp/wsgi/core.py (+2/-28)
To merge this branch: bzr merge lp:~openerp-dev/openobject-server/trunk-gunicorn-signaling-vmt
Reviewer Review Type Date Requested Status
OpenERP Core Team Pending
Review via email: mp+92150@code.launchpad.net

Description of the change

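Multi-process signaling (work in progress): when the server runs in multi-process mode (e.g. behind Gunicorn), processes signal registry reloads and cache invalidations to each other through database sequences.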

To post a comment you must log in.
Revision history for this message
Vo Minh Thu (thu) wrote :

We have to add --fixes for this bug report:
https://bugs.launchpad.net/openobject-server/+bug/943794

4018. By Vo Minh Thu

[IMP] multi-process: moved signaling sequences to registry creation instead of base.sql.

4019. By Vo Minh Thu

[MERGE] merged trunk.

4020. By Vo Minh Thu

[FIX] signaling sequences: create them only if they do not exist.

Revision history for this message
Vo Minh Thu (thu) wrote :

Similar to https://code.launchpad.net/~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt/+merge/95936. Probably we can just forward port that one when it gets merged.

Revision history for this message
Vo Minh Thu (thu) wrote :

Was forward-ported from 6.1.

Unmerged revisions

4020. By Vo Minh Thu

[FIX] signaling sequences: create them only if they do not exist.

4019. By Vo Minh Thu

[MERGE] merged trunk.

4018. By Vo Minh Thu

[IMP] multi-process: moved signaling sequences to registry creation instead of base.sql.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'gunicorn.conf.py'
2--- gunicorn.conf.py 2012-02-10 15:25:21 +0000
3+++ gunicorn.conf.py 2012-03-07 12:45:23 +0000
4@@ -25,7 +25,6 @@
5
6 # Some application-wide initialization is needed.
7 on_starting = openerp.wsgi.core.on_starting
8-when_ready = openerp.wsgi.core.when_ready
9 pre_request = openerp.wsgi.core.pre_request
10 post_request = openerp.wsgi.core.post_request
11
12
13=== modified file 'openerp/__init__.py'
14--- openerp/__init__.py 2011-09-27 16:51:33 +0000
15+++ openerp/__init__.py 2012-03-07 12:45:23 +0000
16@@ -45,5 +45,12 @@
17 import workflow
18 import wsgi
19
20+# Is the server running in multi-process mode (e.g. behind Gunicorn).
21+# If this is True, the processes have to communicate some events,
22+# e.g. database update or cache invalidation. Each process also has
23+# its own copy of the data structure and we don't need to care about
24+# locks between threads.
25+multi_process = False
26+
27 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
28
29
30=== modified file 'openerp/addons/base/ir/ir_ui_menu.py'
31--- openerp/addons/base/ir/ir_ui_menu.py 2012-02-10 08:26:37 +0000
32+++ openerp/addons/base/ir/ir_ui_menu.py 2012-03-07 12:45:23 +0000
33@@ -42,7 +42,7 @@
34
35 def __init__(self, *args, **kwargs):
36 self.cache_lock = threading.RLock()
37- self.clear_cache()
38+ self._cache = {}
39 r = super(ir_ui_menu, self).__init__(*args, **kwargs)
40 self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache')
41 return r
42@@ -50,6 +50,10 @@
43 def clear_cache(self):
44 with self.cache_lock:
45 # radical but this doesn't frequently happen
46+ if self._cache:
47+ # Normally this is done by openerp.tools.ormcache
48+ # but since we do not use it, set it ourselves.
49+ self.pool._any_cache_cleared = True
50 self._cache = {}
51
52 def _filter_visible_menus(self, cr, uid, ids, context=None):
53
54=== modified file 'openerp/addons/base/module/module.py'
55--- openerp/addons/base/module/module.py 2012-02-14 15:18:46 +0000
56+++ openerp/addons/base/module/module.py 2012-03-07 12:45:23 +0000
57@@ -30,6 +30,7 @@
58 import zipfile
59 import zipimport
60
61+import openerp
62 import openerp.modules as addons
63 import pooler
64 import release
65@@ -344,6 +345,7 @@
66 if to_install_ids:
67 self.button_install(cr, uid, to_install_ids, context=context)
68
69+ openerp.modules.registry.RegistryManager.signal_registry_change(cr.dbname)
70 return dict(ACTION_DICT, name=_('Install'))
71
72 def button_immediate_install(self, cr, uid, ids, context=None):
73
74=== modified file 'openerp/cron.py'
75--- openerp/cron.py 2012-01-24 11:07:30 +0000
76+++ openerp/cron.py 2012-03-07 12:45:23 +0000
77@@ -204,9 +204,12 @@
78 _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), "
79 "this may cause trouble if you reach that number of parallel cron tasks.",
80 db_maxconn, _thread_slots)
81- t = threading.Thread(target=runner, name="openerp.cron.master_thread")
82- t.setDaemon(True)
83- t.start()
84- _logger.debug("Master cron daemon started!")
85+ if _thread_slots:
86+ t = threading.Thread(target=runner, name="openerp.cron.master_thread")
87+ t.setDaemon(True)
88+ t.start()
89+ _logger.debug("Master cron daemon started!")
90+ else:
91+ _logger.info("No master cron daemon (0 workers needed).")
92
93 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
94
95=== modified file 'openerp/modules/registry.py'
96--- openerp/modules/registry.py 2012-01-24 12:42:52 +0000
97+++ openerp/modules/registry.py 2012-03-07 12:45:23 +0000
98@@ -51,6 +51,18 @@
99 self.db_name = db_name
100 self.db = openerp.sql_db.db_connect(db_name)
101
102+ # Inter-process signaling (used only when openerp.multi_process is True):
103+ # The `base_registry_signaling` sequence indicates the whole registry
104+ # must be reloaded.
105+ # The `base_cache_signaling sequence` indicates all caches must be
106+ # invalidated (i.e. cleared).
107+ self.base_registry_signaling_sequence = 1
108+ self.base_cache_signaling_sequence = 1
109+
110+ # Flag indicating if at least one model cache has been cleared.
111+ # Useful only in a multi-process context.
112+ self._any_cache_cleared = False
113+
114 cr = self.db.cursor()
115 has_unaccent = openerp.modules.db.has_unaccent(cr)
116 if openerp.tools.config['unaccent'] and not has_unaccent:
117@@ -114,6 +126,36 @@
118 """
119 for model in self.models.itervalues():
120 model.clear_caches()
121+ # Special case for ir_ui_menu which does not use openerp.tools.ormcache.
122+ ir_ui_menu = self.models.get('ir.ui.menu')
123+ if ir_ui_menu:
124+ ir_ui_menu.clear_cache()
125+
126+
127+ # Useful only in a multi-process context.
128+ def reset_any_cache_cleared(self):
129+ self._any_cache_cleared = False
130+
131+ # Useful only in a multi-process context.
132+ def any_cache_cleared(self):
133+ return self._any_cache_cleared
134+
135+ @classmethod
136+ def setup_multi_process_signaling(cls, cr):
137+ if not openerp.multi_process:
138+ return
139+
140+ # Inter-process signaling:
141+ # The `base_registry_signaling` sequence indicates the whole registry
142+ # must be reloaded.
143+ # The `base_cache_signaling` sequence indicates all caches must be
144+ # invalidated (i.e. cleared).
145+ cr.execute("""SELECT sequence_name FROM information_schema.sequences WHERE sequence_name='base_registry_signaling'""")
146+ if not cr.fetchall():
147+ cr.execute("""CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1""")
148+ cr.execute("""SELECT nextval('base_registry_signaling')""")
149+ cr.execute("""CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1""")
150+ cr.execute("""SELECT nextval('base_cache_signaling')""")
151
152 class RegistryManager(object):
153 """ Model registries manager.
154@@ -164,6 +206,7 @@
155
156 cr = registry.db.cursor()
157 try:
158+ Registry.setup_multi_process_signaling(cr)
159 registry.do_parent_store(cr)
160 registry.get('ir.actions.report.xml').register_all(cr)
161 cr.commit()
162@@ -215,5 +258,70 @@
163 if db_name in cls.registries:
164 cls.registries[db_name].clear_caches()
165
166+ @classmethod
167+ def check_registry_signaling(cls, db_name):
168+ if openerp.multi_process and db_name in cls.registries:
169+ # Check if the model registry must be reloaded (e.g. after the
170+ # database has been updated by another process).
171+ registry = cls.get(db_name, pooljobs=False)
172+ cr = registry.db.cursor()
173+ registry_reloaded = False
174+ try:
175+ cr.execute('SELECT last_value FROM base_registry_signaling')
176+ r = cr.fetchone()[0]
177+ if registry.base_registry_signaling_sequence != r:
178+ _logger.info("Reloading the model registry after database signaling.")
179+ # Don't run the cron in the Gunicorn worker.
180+ registry = cls.new(db_name, pooljobs=False)
181+ registry.base_registry_signaling_sequence = r
182+ registry_reloaded = True
183+ finally:
184+ cr.close()
185+
186+ # Check if the model caches must be invalidated (e.g. after a write
187+ # occurred on another process). Don't clear right after a registry
188+ # has been reloaded.
189+ cr = openerp.sql_db.db_connect(db_name).cursor()
190+ try:
191+ cr.execute('SELECT last_value FROM base_cache_signaling')
192+ r = cr.fetchone()[0]
193+ if registry.base_cache_signaling_sequence != r and not registry_reloaded:
194+ _logger.info("Invalidating all model caches after database signaling.")
195+ registry.base_cache_signaling_sequence = r
196+ registry.clear_caches()
197+ registry.reset_any_cache_cleared()
198+ finally:
199+ cr.close()
200+
201+ @classmethod
202+ def signal_caches_change(cls, db_name):
203+ if openerp.multi_process and db_name in cls.registries:
204+ # Check the registries if any cache has been cleared and signal it
205+ # through the database to other processes.
206+ registry = cls.get(db_name, pooljobs=False)
207+ if registry.any_cache_cleared():
208+ _logger.info("At least one model cache has been cleare, signaling through the database.")
209+ cr = registry.db.cursor()
210+ r = 1
211+ try:
212+ cr.execute("select nextval('base_cache_signaling')")
213+ r = cr.fetchone()[0]
214+ finally:
215+ cr.close()
216+ registry.base_cache_signaling_sequence = r
217+ registry.reset_any_cache_cleared()
218+
219+ @classmethod
220+ def signal_registry_change(cls, db_name):
221+ if openerp.multi_process and db_name in cls.registries:
222+ registry = cls.get(db_name, pooljobs=False)
223+ cr = registry.db.cursor()
224+ r = 1
225+ try:
226+ cr.execute("select nextval('base_registry_signaling')")
227+ r = cr.fetchone()[0]
228+ finally:
229+ cr.close()
230+ registry.base_registry_signaling_sequence = r
231
232 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
233
234=== modified file 'openerp/osv/orm.py'
235--- openerp/osv/orm.py 2012-02-15 10:17:14 +0000
236+++ openerp/osv/orm.py 2012-03-07 12:45:23 +0000
237@@ -2389,6 +2389,7 @@
238 try:
239 getattr(self, '_ormcache')
240 self._ormcache = {}
241+ self.pool._any_cache_cleared = True
242 except AttributeError:
243 pass
244
245
246=== modified file 'openerp/service/web_services.py'
247--- openerp/service/web_services.py 2012-02-14 19:30:30 +0000
248+++ openerp/service/web_services.py 2012-03-07 12:45:23 +0000
249@@ -568,8 +568,10 @@
250 raise NameError("Method not available %s" % method)
251 security.check(db,uid,passwd)
252 assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy."
253+ openerp.modules.registry.RegistryManager.check_registry_signaling(db)
254 fn = getattr(openerp.osv.osv.service, method)
255 res = fn(db, uid, *params)
256+ openerp.modules.registry.RegistryManager.signal_caches_change(db)
257 return res
258
259
260@@ -649,8 +651,10 @@
261 if method not in ['report', 'report_get', 'render_report']:
262 raise KeyError("Method not supported %s" % method)
263 security.check(db,uid,passwd)
264+ openerp.modules.registry.RegistryManager.check_registry_signaling(db)
265 fn = getattr(self, 'exp_' + method)
266 res = fn(db, uid, *params)
267+ openerp.modules.registry.RegistryManager.signal_caches_change(db)
268 return res
269
270 def exp_render_report(self, db, uid, object, ids, datas=None, context=None):
271
272=== modified file 'openerp/tools/cache.py'
273--- openerp/tools/cache.py 2011-11-22 08:58:48 +0000
274+++ openerp/tools/cache.py 2012-03-07 12:45:23 +0000
275@@ -57,10 +57,12 @@
276 try:
277 key = args[self.skiparg-2:]
278 del d[key]
279+ self2.pool._any_cache_cleared = True
280 except KeyError:
281 pass
282 else:
283 d.clear()
284+ self2.pool._any_cache_cleared = True
285
286 class ormcache_multi(ormcache):
287 def __init__(self, skiparg=2, size=8192, multi=3):
288
289=== modified file 'openerp/wsgi/core.py'
290--- openerp/wsgi/core.py 2012-02-21 18:54:41 +0000
291+++ openerp/wsgi/core.py 2012-03-07 12:45:23 +0000
292@@ -447,7 +447,7 @@
293
294 The WSGI server can be shutdown with stop_server() below.
295 """
296- threading.Thread(target=serve).start()
297+ threading.Thread(name='WSGI server', target=serve).start()
298
299 def stop_server():
300 """ Initiate the shutdown of the WSGI server.
301@@ -465,7 +465,7 @@
302 def on_starting(server):
303 global arbiter_pid
304 arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable
305- #openerp.tools.cache = kill_workers_cache
306+ openerp.multi_process = True # Yay!
307 openerp.netsvc.init_logger()
308 openerp.osv.osv.start_object_proxy()
309 openerp.service.web_services.start_web_services()
310@@ -482,11 +482,6 @@
311 Maybe you forgot to add those addons in your addons_path configuration."""
312 _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
313
314-# Install our own signal handler on the master process.
315-def when_ready(server):
316- # Hijack gunicorn's SIGWINCH handling; we can choose another one.
317- signal.signal(signal.SIGWINCH, make_winch_handler(server))
318-
319 # Install limits on virtual memory and CPU time consumption.
320 def pre_request(worker, req):
321 import os
322@@ -514,30 +509,9 @@
323 'too high, rebooting the worker.')
324 worker.alive = False # Commit suicide after the request.
325
326-# Our signal handler will signal a SGIQUIT to all workers.
327-def make_winch_handler(server):
328- def handle_winch(sig, fram):
329- server.kill_workers(signal.SIGQUIT) # This is gunicorn specific.
330- return handle_winch
331-
332 # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
333 def time_expired(n, stack):
334 _logger.info('CPU time limit exceeded.')
335 raise Exception('CPU time limit exceeded.') # TODO one of openerp.exception
336
337-# Kill gracefuly the workers (e.g. because we want to clear their cache).
338-# This is done by signaling a SIGWINCH to the master process, so it can be
339-# called by the workers themselves.
340-def kill_workers():
341- try:
342- os.kill(arbiter_pid, signal.SIGWINCH)
343- except OSError, e:
344- if e.errno == errno.ESRCH: # no such pid
345- return
346- raise
347-
348-class kill_workers_cache(openerp.tools.ormcache):
349- def clear(self, dbname, *args, **kwargs):
350- kill_workers()
351-
352 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: