Merge lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt into lp:openobject-server/6.1
- 6.1-gunicorn-signaling-vmt
- Merge into 6.1
Status: | Merged |
---|---|
Merged at revision: | 4126 |
Proposed branch: | lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt |
Merge into: | lp:openobject-server/6.1 |
Diff against target: |
353 lines (+139/-34) 10 files modified
gunicorn.conf.py (+0/-1) openerp/__init__.py (+7/-0) openerp/addons/base/ir/ir_ui_menu.py (+5/-1) openerp/addons/base/module/module.py (+2/-0) openerp/cron.py (+7/-4) openerp/modules/registry.py (+109/-0) openerp/osv/orm.py (+1/-0) openerp/service/web_services.py (+4/-0) openerp/tools/cache.py (+2/-0) openerp/wsgi/core.py (+2/-28) |
To merge this branch: | bzr merge lp:~openerp-dev/openobject-server/6.1-gunicorn-signaling-vmt |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Vo Minh Thu (community) | Approve | ||
Christophe Simonis (OpenERP) | Approve | ||
Review via email: mp+95936@code.launchpad.net |
Commit message
Description of the change
- 4019. By Vo Minh Thu
-
[IMP] multi-process: moved signaling sequences to registry creation instead of base.sql.
- 4020. By Vo Minh Thu
-
[FIX] multi-process signaling: one query instead of two (based on chs idea).
- 4021. By Vo Minh Thu
-
[FIX] removed spurious print statement.
- 4022. By Vo Minh Thu
-
[FIX] typo.
Christophe Simonis (OpenERP) (kangol) : | # |
- 4023. By Vo Minh Thu
-
[MERGE] merged trunk.
- 4024. By Vo Minh Thu
-
[IMP] signaling: call also digits_change() when caches are cleared.
- 4025. By Vo Minh Thu
-
[REV] reverted local (and mistakenly commited) changes to gunicorn.conf.py.
Vo Minh Thu (thu) wrote : | # |
My latest comment is satisfied now.
Description of the patch:
This adds a simple signaling scheme between OpenERP server instances, and also between processes (managed by, say, gunicorn). The signaling is done through the database by using two PostgreSQL sequences: one for registry invalidation, and one for cache invalidation.
The former case happens when installing a new module, the latter when a tools.ormcache is cleared.
The idea is to keep inside each process a value (one per signal) provided by the db. Whenever a registry has been changed, or a cache has been cleared inside a process, the value in the database is incremented. Also, whenever a process notices that its internal value and the value in the database are out of sync, its registry is reloaded or its caches are cleared, respectively. Each process tests its internal values against the database at the beginning of every request handling.
Preview Diff
1 | === modified file 'gunicorn.conf.py' |
2 | --- gunicorn.conf.py 2012-02-10 15:25:21 +0000 |
3 | +++ gunicorn.conf.py 2012-03-23 13:22:20 +0000 |
4 | @@ -25,7 +25,6 @@ |
5 | |
6 | # Some application-wide initialization is needed. |
7 | on_starting = openerp.wsgi.core.on_starting |
8 | -when_ready = openerp.wsgi.core.when_ready |
9 | pre_request = openerp.wsgi.core.pre_request |
10 | post_request = openerp.wsgi.core.post_request |
11 | |
12 | |
13 | === modified file 'openerp/__init__.py' |
14 | --- openerp/__init__.py 2011-09-27 16:51:33 +0000 |
15 | +++ openerp/__init__.py 2012-03-23 13:22:20 +0000 |
16 | @@ -45,5 +45,12 @@ |
17 | import workflow |
18 | import wsgi |
19 | |
20 | +# Is the server running in multi-process mode (e.g. behind Gunicorn). |
21 | +# If this is True, the processes have to communicate some events, |
22 | +# e.g. database update or cache invalidation. Each process has also |
23 | +# its own copy of the data structure and we don't need to care about |
24 | +# locks between threads. |
25 | +multi_process = False |
26 | + |
27 | # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: |
28 | |
29 | |
30 | === modified file 'openerp/addons/base/ir/ir_ui_menu.py' |
31 | --- openerp/addons/base/ir/ir_ui_menu.py 2012-02-10 08:26:37 +0000 |
32 | +++ openerp/addons/base/ir/ir_ui_menu.py 2012-03-23 13:22:20 +0000 |
33 | @@ -42,7 +42,7 @@ |
34 | |
35 | def __init__(self, *args, **kwargs): |
36 | self.cache_lock = threading.RLock() |
37 | - self.clear_cache() |
38 | + self._cache = {} |
39 | r = super(ir_ui_menu, self).__init__(*args, **kwargs) |
40 | self.pool.get('ir.model.access').register_cache_clearing_method(self._name, 'clear_cache') |
41 | return r |
42 | @@ -50,6 +50,10 @@ |
43 | def clear_cache(self): |
44 | with self.cache_lock: |
45 | # radical but this doesn't frequently happen |
46 | + if self._cache: |
47 | + # Normally this is done by openerp.tools.ormcache |
48 | + # but since we do not use it, set it by ourself. |
49 | + self.pool._any_cache_cleared = True |
50 | self._cache = {} |
51 | |
52 | def _filter_visible_menus(self, cr, uid, ids, context=None): |
53 | |
54 | === modified file 'openerp/addons/base/module/module.py' |
55 | --- openerp/addons/base/module/module.py 2012-03-01 01:47:08 +0000 |
56 | +++ openerp/addons/base/module/module.py 2012-03-23 13:22:20 +0000 |
57 | @@ -30,6 +30,7 @@ |
58 | import zipfile |
59 | import zipimport |
60 | |
61 | +import openerp |
62 | import openerp.modules as addons |
63 | import pooler |
64 | import release |
65 | @@ -344,6 +345,7 @@ |
66 | if to_install_ids: |
67 | self.button_install(cr, uid, to_install_ids, context=context) |
68 | |
69 | + openerp.modules.registry.RegistryManager.signal_registry_change(cr.dbname) |
70 | return dict(ACTION_DICT, name=_('Install')) |
71 | |
72 | def button_immediate_install(self, cr, uid, ids, context=None): |
73 | |
74 | === modified file 'openerp/cron.py' |
75 | --- openerp/cron.py 2012-01-24 11:07:30 +0000 |
76 | +++ openerp/cron.py 2012-03-23 13:22:20 +0000 |
77 | @@ -204,9 +204,12 @@ |
78 | _logger.warning("Connection pool size (%s) is set lower than max number of cron threads (%s), " |
79 | "this may cause trouble if you reach that number of parallel cron tasks.", |
80 | db_maxconn, _thread_slots) |
81 | - t = threading.Thread(target=runner, name="openerp.cron.master_thread") |
82 | - t.setDaemon(True) |
83 | - t.start() |
84 | - _logger.debug("Master cron daemon started!") |
85 | + if _thread_slots: |
86 | + t = threading.Thread(target=runner, name="openerp.cron.master_thread") |
87 | + t.setDaemon(True) |
88 | + t.start() |
89 | + _logger.debug("Master cron daemon started!") |
90 | + else: |
91 | + _logger.info("No master cron daemon (0 workers needed).") |
92 | |
93 | # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: |
94 | |
95 | === modified file 'openerp/modules/registry.py' |
96 | --- openerp/modules/registry.py 2012-01-24 12:42:52 +0000 |
97 | +++ openerp/modules/registry.py 2012-03-23 13:22:20 +0000 |
98 | @@ -51,6 +51,18 @@ |
99 | self.db_name = db_name |
100 | self.db = openerp.sql_db.db_connect(db_name) |
101 | |
102 | + # Inter-process signaling (used only when openerp.multi_process is True): |
103 | + # The `base_registry_signaling` sequence indicates the whole registry |
104 | + # must be reloaded. |
105 | + # The `base_cache_signaling sequence` indicates all caches must be |
106 | + # invalidated (i.e. cleared). |
107 | + self.base_registry_signaling_sequence = 1 |
108 | + self.base_cache_signaling_sequence = 1 |
109 | + |
110 | + # Flag indicating if at least one model cache has been cleared. |
111 | + # Useful only in a multi-process context. |
112 | + self._any_cache_cleared = False |
113 | + |
114 | cr = self.db.cursor() |
115 | has_unaccent = openerp.modules.db.has_unaccent(cr) |
116 | if openerp.tools.config['unaccent'] and not has_unaccent: |
117 | @@ -114,6 +126,36 @@ |
118 | """ |
119 | for model in self.models.itervalues(): |
120 | model.clear_caches() |
121 | + # Special case for ir_ui_menu which does not use openerp.tools.ormcache. |
122 | + ir_ui_menu = self.models.get('ir.ui.menu') |
123 | + if ir_ui_menu: |
124 | + ir_ui_menu.clear_cache() |
125 | + |
126 | + |
127 | + # Useful only in a multi-process context. |
128 | + def reset_any_cache_cleared(self): |
129 | + self._any_cache_cleared = False |
130 | + |
131 | + # Useful only in a multi-process context. |
132 | + def any_cache_cleared(self): |
133 | + return self._any_cache_cleared |
134 | + |
135 | + @classmethod |
136 | + def setup_multi_process_signaling(cls, cr): |
137 | + if not openerp.multi_process: |
138 | + return |
139 | + |
140 | + # Inter-process signaling: |
141 | + # The `base_registry_signaling` sequence indicates the whole registry |
142 | + # must be reloaded. |
143 | + # The `base_cache_signaling sequence` indicates all caches must be |
144 | + # invalidated (i.e. cleared). |
145 | + cr.execute("""SELECT sequence_name FROM information_schema.sequences WHERE sequence_name='base_registry_signaling'""") |
146 | + if not cr.fetchall(): |
147 | + cr.execute("""CREATE SEQUENCE base_registry_signaling INCREMENT BY 1 START WITH 1""") |
148 | + cr.execute("""SELECT nextval('base_registry_signaling')""") |
149 | + cr.execute("""CREATE SEQUENCE base_cache_signaling INCREMENT BY 1 START WITH 1""") |
150 | + cr.execute("""SELECT nextval('base_cache_signaling')""") |
151 | |
152 | class RegistryManager(object): |
153 | """ Model registries manager. |
154 | @@ -164,6 +206,7 @@ |
155 | |
156 | cr = registry.db.cursor() |
157 | try: |
158 | + Registry.setup_multi_process_signaling(cr) |
159 | registry.do_parent_store(cr) |
160 | registry.get('ir.actions.report.xml').register_all(cr) |
161 | cr.commit() |
162 | @@ -215,5 +258,71 @@ |
163 | if db_name in cls.registries: |
164 | cls.registries[db_name].clear_caches() |
165 | |
166 | + @classmethod |
167 | + def check_registry_signaling(cls, db_name): |
168 | + if openerp.multi_process and db_name in cls.registries: |
169 | + registry = cls.get(db_name, pooljobs=False) |
170 | + cr = registry.db.cursor() |
171 | + try: |
172 | + cr.execute(""" |
173 | + SELECT base_registry_signaling.last_value, |
174 | + base_cache_signaling.last_value |
175 | + FROM base_registry_signaling, base_cache_signaling""") |
176 | + r, c = cr.fetchone() |
177 | + # Check if the model registry must be reloaded (e.g. after the |
178 | + # database has been updated by another process). |
179 | + if registry.base_registry_signaling_sequence != r: |
180 | + _logger.info("Reloading the model registry after database signaling.") |
181 | + # Don't run the cron in the Gunicorn worker. |
182 | + registry = cls.new(db_name, pooljobs=False) |
183 | + registry.base_registry_signaling_sequence = r |
184 | + # Check if the model caches must be invalidated (e.g. after a write |
185 | + # occured on another process). Don't clear right after a registry |
186 | + # has been reload. |
187 | + elif registry.base_cache_signaling_sequence != c: |
188 | + _logger.info("Invalidating all model caches after database signaling.") |
189 | + registry.base_cache_signaling_sequence = c |
190 | + registry.clear_caches() |
191 | + registry.reset_any_cache_cleared() |
192 | + # One possible reason caches have been invalidated is the |
193 | + # use of decimal_precision.write(), in which case we need |
194 | + # to refresh fields.float columns. |
195 | + for model in registry.models.values(): |
196 | + for column in model._columns.values(): |
197 | + if hasattr(column, 'digits_change'): |
198 | + column.digits_change(cr) |
199 | + finally: |
200 | + cr.close() |
201 | + |
202 | + @classmethod |
203 | + def signal_caches_change(cls, db_name): |
204 | + if openerp.multi_process and db_name in cls.registries: |
205 | + # Check the registries if any cache has been cleared and signal it |
206 | + # through the database to other processes. |
207 | + registry = cls.get(db_name, pooljobs=False) |
208 | + if registry.any_cache_cleared(): |
209 | + _logger.info("At least one model cache has been cleared, signaling through the database.") |
210 | + cr = registry.db.cursor() |
211 | + r = 1 |
212 | + try: |
213 | + cr.execute("select nextval('base_cache_signaling')") |
214 | + r = cr.fetchone()[0] |
215 | + finally: |
216 | + cr.close() |
217 | + registry.base_cache_signaling_sequence = r |
218 | + registry.reset_any_cache_cleared() |
219 | + |
220 | + @classmethod |
221 | + def signal_registry_change(cls, db_name): |
222 | + if openerp.multi_process and db_name in cls.registries: |
223 | + registry = cls.get(db_name, pooljobs=False) |
224 | + cr = registry.db.cursor() |
225 | + r = 1 |
226 | + try: |
227 | + cr.execute("select nextval('base_registry_signaling')") |
228 | + r = cr.fetchone()[0] |
229 | + finally: |
230 | + cr.close() |
231 | + registry.base_registry_signaling_sequence = r |
232 | |
233 | # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: |
234 | |
235 | === modified file 'openerp/osv/orm.py' |
236 | --- openerp/osv/orm.py 2012-03-09 08:19:29 +0000 |
237 | +++ openerp/osv/orm.py 2012-03-23 13:22:20 +0000 |
238 | @@ -2389,6 +2389,7 @@ |
239 | try: |
240 | getattr(self, '_ormcache') |
241 | self._ormcache = {} |
242 | + self.pool._any_cache_cleared = True |
243 | except AttributeError: |
244 | pass |
245 | |
246 | |
247 | === modified file 'openerp/service/web_services.py' |
248 | --- openerp/service/web_services.py 2012-03-16 16:02:16 +0000 |
249 | +++ openerp/service/web_services.py 2012-03-23 13:22:20 +0000 |
250 | @@ -581,8 +581,10 @@ |
251 | raise NameError("Method not available %s" % method) |
252 | security.check(db,uid,passwd) |
253 | assert openerp.osv.osv.service, "The object_proxy class must be started with start_object_proxy." |
254 | + openerp.modules.registry.RegistryManager.check_registry_signaling(db) |
255 | fn = getattr(openerp.osv.osv.service, method) |
256 | res = fn(db, uid, *params) |
257 | + openerp.modules.registry.RegistryManager.signal_caches_change(db) |
258 | return res |
259 | |
260 | |
261 | @@ -662,8 +664,10 @@ |
262 | if method not in ['report', 'report_get', 'render_report']: |
263 | raise KeyError("Method not supported %s" % method) |
264 | security.check(db,uid,passwd) |
265 | + openerp.modules.registry.RegistryManager.check_registry_signaling(db) |
266 | fn = getattr(self, 'exp_' + method) |
267 | res = fn(db, uid, *params) |
268 | + openerp.modules.registry.RegistryManager.signal_caches_change(db) |
269 | return res |
270 | |
271 | def exp_render_report(self, db, uid, object, ids, datas=None, context=None): |
272 | |
273 | === modified file 'openerp/tools/cache.py' |
274 | --- openerp/tools/cache.py 2011-11-22 08:58:48 +0000 |
275 | +++ openerp/tools/cache.py 2012-03-23 13:22:20 +0000 |
276 | @@ -57,10 +57,12 @@ |
277 | try: |
278 | key = args[self.skiparg-2:] |
279 | del d[key] |
280 | + self2.pool._any_cache_cleared = True |
281 | except KeyError: |
282 | pass |
283 | else: |
284 | d.clear() |
285 | + self2.pool._any_cache_cleared = True |
286 | |
287 | class ormcache_multi(ormcache): |
288 | def __init__(self, skiparg=2, size=8192, multi=3): |
289 | |
290 | === modified file 'openerp/wsgi/core.py' |
291 | --- openerp/wsgi/core.py 2012-02-21 18:52:47 +0000 |
292 | +++ openerp/wsgi/core.py 2012-03-23 13:22:20 +0000 |
293 | @@ -447,7 +447,7 @@ |
294 | |
295 | The WSGI server can be shutdown with stop_server() below. |
296 | """ |
297 | - threading.Thread(target=serve).start() |
298 | + threading.Thread(name='WSGI server', target=serve).start() |
299 | |
300 | def stop_server(): |
301 | """ Initiate the shutdown of the WSGI server. |
302 | @@ -465,7 +465,7 @@ |
303 | def on_starting(server): |
304 | global arbiter_pid |
305 | arbiter_pid = os.getpid() # TODO check if this is true even after replacing the executable |
306 | - #openerp.tools.cache = kill_workers_cache |
307 | + openerp.multi_process = True # Yay! |
308 | openerp.netsvc.init_logger() |
309 | openerp.osv.osv.start_object_proxy() |
310 | openerp.service.web_services.start_web_services() |
311 | @@ -482,11 +482,6 @@ |
312 | Maybe you forgot to add those addons in your addons_path configuration.""" |
313 | _logger.exception('Failed to load server-wide module `%s`.%s', m, msg) |
314 | |
315 | -# Install our own signal handler on the master process. |
316 | -def when_ready(server): |
317 | - # Hijack gunicorn's SIGWINCH handling; we can choose another one. |
318 | - signal.signal(signal.SIGWINCH, make_winch_handler(server)) |
319 | - |
320 | # Install limits on virtual memory and CPU time consumption. |
321 | def pre_request(worker, req): |
322 | import os |
323 | @@ -514,30 +509,9 @@ |
324 | 'too high, rebooting the worker.') |
325 | worker.alive = False # Commit suicide after the request. |
326 | |
327 | -# Our signal handler will signal a SGIQUIT to all workers. |
328 | -def make_winch_handler(server): |
329 | - def handle_winch(sig, fram): |
330 | - server.kill_workers(signal.SIGQUIT) # This is gunicorn specific. |
331 | - return handle_winch |
332 | - |
333 | # SIGXCPU (exceeded CPU time) signal handler will raise an exception. |
334 | def time_expired(n, stack): |
335 | _logger.info('CPU time limit exceeded.') |
336 | raise Exception('CPU time limit exceeded.') # TODO one of openerp.exception |
337 | |
338 | -# Kill gracefuly the workers (e.g. because we want to clear their cache). |
339 | -# This is done by signaling a SIGWINCH to the master process, so it can be |
340 | -# called by the workers themselves. |
341 | -def kill_workers(): |
342 | - try: |
343 | - os.kill(arbiter_pid, signal.SIGWINCH) |
344 | - except OSError, e: |
345 | - if e.errno == errno.ESRCH: # no such pid |
346 | - return |
347 | - raise |
348 | - |
349 | -class kill_workers_cache(openerp.tools.ormcache): |
350 | - def clear(self, dbname, *args, **kwargs): |
351 | - kill_workers() |
352 | - |
353 | # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: |
check_registry_signaling should also refresh the fields.float columns, by calling digits_change(), when the decimal_precision module is installed.