Merge lp:~kangol/openobject-server/5.0-monocron into lp:openobject-server/5.0

Proposed by Christophe Simonis (OpenERP)
Status: Merged
Merged at revision: 2123
Proposed branch: lp:~kangol/openobject-server/5.0-monocron
Merge into: lp:openobject-server/5.0
Diff against target: 98 lines (+53/-18)
1 file modified
bin/netsvc.py (+53/-18)
To merge this branch: bzr merge lp:~kangol/openobject-server/5.0-monocron
Reviewer Review Type Date Requested Status
Olivier Dony (Odoo) Approve
Stephane Wirtel (OpenERP) Pending
Review via email: mp+35649@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Stephane Wirtel (OpenERP) (stephane-openerp) wrote :

Functional: Seems to be good for me

Technical: I think you shoud use some explicit names, because dt, fn, etc... are not readable.

dt -> datetime
ts -> timestamp
fn -> function
ct -> current_thread
etc...

2124. By Christophe Simonis (OpenERP)

[IMP] netsvc.Agent: use better variable names

2125. By Christophe Simonis (OpenERP)

[IMP] netsvc.Agent: add doctrings

Revision history for this message
Olivier Dony (Odoo) (odo-openerp) wrote :

Note: a pass of spellchecking for the documentation could be useful ;-)

review: Approve
2126. By Maxime Glorieux

[IMP] netsvc.Agent: correct docstrings

2127. By Christophe Simonis (OpenERP)

[FIX] netsvc.Agent: remove tasks from __tasks_by_db

2128. By Christophe Simonis (OpenERP)

[FIX] netsvc.Agent: be nice

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'bin/netsvc.py'
2--- bin/netsvc.py 2010-09-03 16:38:14 +0000
3+++ bin/netsvc.py 2010-09-20 11:07:49 +0000
4@@ -39,6 +39,7 @@
5 import xmlrpclib
6 import release
7 from pprint import pformat
8+import heapq
9
10 SERVICES = {}
11 GROUPS = {}
12@@ -201,34 +202,68 @@
13 init_logger()
14
15 class Agent(object):
16- _timers = {}
17+ """Singleton that keeps track of cancellable tasks to run at a given
18+ timestamp.
19+ The tasks are caracterised by:
20+ * a timestamp
21+ * the database on which the task run
22+ * the function to call
23+ * the arguments and keyword arguments to pass to the function
24+
25+ Implementation details:
26+ Tasks are stored as list, allowing the cancellation by setting
27+ the timestamp to 0.
28+ A heapq is used to store tasks, so we don't need to sort
29+ tasks ourself.
30+ """
31+ __tasks = []
32+ __tasks_by_db = {}
33 _logger = Logger()
34
35- def setAlarm(self, fn, dt, db_name, *args, **kwargs):
36- wait = dt - time.time()
37- if wait > 0:
38- self._logger.notifyChannel('timers', LOG_DEBUG, "Job scheduled in %s seconds for %s.%s" % (wait, fn.im_class.__name__, fn.func_name))
39- timer = threading.Timer(wait, fn, args, kwargs)
40- timer.start()
41- self._timers.setdefault(db_name, []).append(timer)
42-
43- for db in self._timers:
44- for timer in self._timers[db]:
45- if not timer.isAlive():
46- self._timers[db].remove(timer)
47+ @classmethod
48+ def setAlarm(cls, function, timestamp, db_name, *args, **kwargs):
49+ task = [timestamp, db_name, function, args, kwargs]
50+ heapq.heappush(cls.__tasks, task)
51+ cls.__tasks_by_db.setdefault(db_name, []).append(task)
52
53 @classmethod
54 def cancel(cls, db_name):
55- """Cancel all timers for a given database. If None passed, all timers are cancelled"""
56- for db in cls._timers:
57- if db_name is None or db == db_name:
58- for timer in cls._timers[db]:
59- timer.cancel()
60+ """Cancel all tasks for a given database. If None is passed, all tasks are cancelled"""
61+ if db_name is None:
62+ cls.__tasks, cls.__tasks_by_db = [], {}
63+ else:
64+ if db_name in cls.__tasks_by_db:
65+ for task in cls.__tasks_by_db[db_name]:
66+ task[0] = 0
67
68 @classmethod
69 def quit(cls):
70 cls.cancel(None)
71
72+ @classmethod
73+ def runner(cls):
74+ """Neverending function (intended to be ran in a dedicated thread) that
75+ checks every 60 seconds tasks to run.
76+ """
77+ current_thread = threading.currentThread()
78+ while True:
79+ while cls.__tasks and cls.__tasks[0][0] < time.time():
80+ task = heapq.heappop(cls.__tasks)
81+ timestamp, dbname, function, args, kwargs = task
82+ cls.__tasks_by_db[dbname].remove(task)
83+ if not timestamp:
84+ # null timestamp -> cancelled task
85+ continue
86+ current_thread.dbname = dbname # hack hack
87+ cls._logger.notifyChannel('timers', LOG_DEBUG, "Run %s.%s(*%r, **%r)" % (function.im_class.__name__, function.func_name, args, kwargs))
88+ delattr(current_thread, 'dbname')
89+ threading.Thread(target=function, args=args, kwargs=kwargs).start()
90+ time.sleep(1)
91+ time.sleep(60)
92+
93+threading.Thread(target=Agent.runner).start()
94+
95+
96 import traceback
97
98 class xmlrpc(object):