Merge lp:~lifeless/python-oops-datedir-repo/less-rsync into lp:python-oops-datedir-repo

Proposed by Robert Collins
Status: Merged
Merged at revision: 20
Proposed branch: lp:~lifeless/python-oops-datedir-repo/less-rsync
Merge into: lp:python-oops-datedir-repo
Diff against target: 232 lines (+154/-3)
5 files modified
NEWS (+12/-0)
oops_datedir_repo/__init__.py (+1/-1)
oops_datedir_repo/repository.py (+62/-1)
oops_datedir_repo/tests/test_repository.py (+78/-0)
setup.py (+1/-1)
To merge this branch: bzr merge lp:~lifeless/python-oops-datedir-repo/less-rsync
Reviewer Review Type Date Requested Status
Steve Kowalik (community) code Approve
Review via email: mp+80861@code.launchpad.net

Commit message

This adds the support needed for the datedir repo on each server to be scanned and its reports uploaded to AMQP (or any other publisher).

Description of the change

This adds the support needed for the datedir repo on each server to be scanned and its reports uploaded to AMQP (or any other publisher) in the future. The glue code for this will live in a separate project, to avoid inappropriate dependencies between the repo and the AMQP publisher.

To post a comment you must log in.
Revision history for this message
Steve Kowalik (stevenk) :
review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS'
2--- NEWS 2011-10-10 02:50:20 +0000
3+++ NEWS 2011-11-01 02:13:23 +0000
4@@ -6,6 +6,18 @@
5 NEXT
6 ----
7
8+0.0.10
9+------
10+
11+* New files are written to $name.tmp and then renamed to $name, allowing
12+ readers to detect whether the file was finished or not.
13+ (Robert Collins)
14+
15+* DateDirRepo.republish(publisher) can be used to treat a DateDirRepo as the
16+ source of reports for feeding into a different publisher. This will remove
17+ reports that are successfully republished.
18+ (Robert Collins, #884551)
19+
20 0.0.9
21 -----
22
23
24=== modified file 'oops_datedir_repo/__init__.py'
25--- oops_datedir_repo/__init__.py 2011-10-10 02:51:42 +0000
26+++ oops_datedir_repo/__init__.py 2011-11-01 02:13:23 +0000
27@@ -25,7 +25,7 @@
28 # established at this point, and setup.py will use a version of next-$(revno).
29 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
30 # Otherwise it is major.minor.micro~$(revno).
31-__version__ = (0, 0, 9, 'beta', 0)
32+__version__ = (0, 0, 10, 'beta', 0)
33
34 __all__ = [
35 'DateDirRepo',
36
37=== modified file 'oops_datedir_repo/repository.py'
38--- oops_datedir_repo/repository.py 2011-10-13 02:14:59 +0000
39+++ oops_datedir_repo/repository.py 2011-11-01 02:13:23 +0000
40@@ -23,12 +23,14 @@
41 ]
42
43 import datetime
44+from functools import partial
45 from hashlib import md5
46 import os.path
47 import stat
48
49 from pytz import utc
50
51+import serializer
52 import serializer_bson
53 import serializer_rfc822
54 from uniquefileallocator import UniqueFileAllocator
55@@ -78,6 +80,10 @@
56 def publish(self, report, now=None):
57 """Write the report to disk.
58
59+ The report is written to a temporary file, and then renamed to its
60+ final location. Programs concurrently reading from a DateDirRepo
61+ should ignore files ending in .tmp.
62+
63 :param now: The datetime to use as the current time. Will be
64 determined if not supplied. Useful for testing.
65 """
66@@ -100,7 +106,8 @@
67 if self.inherit_id:
68 oopsid = report.get('id') or oopsid
69 report['id'] = oopsid
70- self.serializer.write(report, open(filename, 'wb'))
71+ self.serializer.write(report, open(filename + '.tmp', 'wb'))
72+ os.rename(filename + '.tmp', filename)
73 if self.stash_path:
74 original_report['datedir_repo_filepath'] = filename
75 # Set file permission to: rw-r--r-- (so that reports from
76@@ -110,3 +117,57 @@
77 stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
78 os.chmod(filename, wanted_permission)
79 return report['id']
80+
81+ def republish(self, publisher):
82+ """Republish the contents of the DateDirRepo to another publisher.
83+
84+ This makes it easy to treat a DateDirRepo as a backing store in message
85+ queue environments: if the message queue is down, flush to the
86+ DateDirRepo, then later pick the OOPSes up and send them to the message
87+ queue environment.
88+
89+ For instance:
90+
91+ >>> repo = DateDirRepo('.')
92+ >>> repo.publish({'some':'report'})
93+ >>> queue = []
94+ >>> def queue_publisher(report):
95+ ... queue.append(report)
96+ ... return report['id']
97+ >>> repo.republish(queue_publisher)
98+
99+ Will scan the disk and send the single found report to queue_publisher,
100+ deleting the report afterwards.
101+
102+ Empty datedir directories are automatically cleaned up, as are stale
103+ .tmp files.
104+
105+ If the publisher returns None, signalling that it did not publish the
106+ report, then the report is not deleted from disk.
107+ """
108+ two_days = datetime.timedelta(2)
109+ now = datetime.date.today()
110+ old = now - two_days
111+ for dirname in os.listdir(self.root):
112+ try:
113+ y, m, d = dirname.split('-')
114+ except ValueError:
115+ # Not a datedir
116+ continue
117+ date = datetime.date(int(y),int(m),int(d))
118+ prune = date < old
119+ dirpath = os.path.join(self.root, dirname)
120+ files = os.listdir(dirpath)
121+ if not files and prune:
122+ # Cleanup no longer needed directory.
123+ os.rmdir(dirpath)
124+ for candidate in map(partial(os.path.join, dirpath), files):
125+ if candidate.endswith('.tmp'):
126+ if prune:
127+ os.unlink(candidate)
128+ continue
129+ with file(candidate, 'rb') as report_file:
130+ report = serializer.read(report_file)
131+ oopsid = publisher(report)
132+ if oopsid:
133+ os.unlink(candidate)
134
135=== modified file 'oops_datedir_repo/tests/test_repository.py'
136--- oops_datedir_repo/tests/test_repository.py 2011-10-13 02:14:59 +0000
137+++ oops_datedir_repo/tests/test_repository.py 2011-11-01 02:13:23 +0000
138@@ -157,3 +157,81 @@
139 with open(expected_path, 'rb') as fp:
140 self.assertEqual(expected_disk_report, bson.loads(fp.read()))
141
142+ def test_republish_not_published(self):
143+ # If an OOPS being republished is not republished, it is preserved on
144+ # disk.
145+ repo = DateDirRepo(self.useFixture(TempDir()).path)
146+ now = datetime.datetime(2006, 04, 01, 00, 30, 00, tzinfo=utc)
147+ report = {'time': now}
148+ repo.publish(report, now)
149+ dir = repo.root + '/2006-04-01/'
150+ files = os.listdir(dir)
151+ expected_path = dir + files[0]
152+ oopses = []
153+ # append() returns None
154+ publisher = oopses.append
155+ repo.republish(publisher)
156+ self.assertTrue(os.path.isfile(expected_path))
157+ self.assertEqual(1, len(oopses))
158+
159+ def test_republish_ignores_current_dot_tmp_files(self):
160+ # .tmp files are in-progress writes and not to be touched.
161+ repo = DateDirRepo(self.useFixture(TempDir()).path, stash_path=True)
162+ report = {}
163+ repo.publish(report)
164+ finished_path = report['datedir_repo_filepath']
165+ inprogress_path = finished_path + '.tmp'
166+ # Move the file to a temp path, simulating an in-progress write.
167+ os.rename(finished_path, inprogress_path)
168+ oopses = []
169+ publisher = oopses.append
170+ repo.republish(publisher)
171+ self.assertTrue(os.path.isfile(inprogress_path))
172+ self.assertEqual([], oopses)
173+
174+ def test_republish_republishes_and_removes(self):
175+ # When a report is republished it is then removed from disk.
176+ repo = DateDirRepo(self.useFixture(TempDir()).path, stash_path=True)
177+ report = {}
178+ repo.publish(report)
179+ finished_path = report['datedir_repo_filepath']
180+ oopses = []
181+ def publish(report):
182+ oopses.append(report)
183+ return report['id']
184+ repo.republish(publish)
185+ self.assertFalse(os.path.isfile(finished_path))
186+ self.assertEqual(1, len(oopses))
187+
188+ def test_republish_cleans_empty_old_directories(self):
189+ # An empty old datedir directory cannot get new reports in it, so gets
190+ # cleaned up to keep the worker efficient.
191+ repo = DateDirRepo(self.useFixture(TempDir()).path)
192+ os.mkdir(repo.root + '/2006-04-12')
193+ repo.republish([].append)
194+ self.assertFalse(os.path.exists(repo.root + '/2006-04-12'))
195+
196+ def test_republish_removes_old_dot_tmp_files(self):
197+ # A .tmp file more than 24 hours old is probably never going to get
198+ # renamed into place, so we just unlink it.
199+ repo = DateDirRepo(self.useFixture(TempDir()).path)
200+ now = datetime.datetime(2006, 04, 01, 00, 30, 00, tzinfo=utc)
201+ report = {'time': now}
202+ repo.publish(report, now)
203+ dir = repo.root + '/2006-04-01/'
204+ files = os.listdir(dir)
205+ finished_path = dir + files[0]
206+ inprogress_path = finished_path + '.tmp'
207+ os.rename(finished_path, inprogress_path)
208+ oopses = []
209+ publisher = oopses.append
210+ repo.republish(publisher)
211+ self.assertFalse(os.path.isfile(inprogress_path))
212+ self.assertEqual([], oopses)
213+
214+ def test_republish_no_error_non_datedir(self):
215+ # The present of a non datedir directory in a datedir repo doesn't
216+ # break things.
217+ repo = DateDirRepo(self.useFixture(TempDir()).path)
218+ os.mkdir(repo.root + '/foo')
219+ repo.republish([].append)
220
221=== modified file 'setup.py'
222--- setup.py 2011-10-10 02:51:42 +0000
223+++ setup.py 2011-11-01 02:13:23 +0000
224@@ -22,7 +22,7 @@
225 description = file(os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
226
227 setup(name="oops_datedir_repo",
228- version="0.0.9",
229+ version="0.0.10",
230 description="OOPS disk serialisation and repository management.",
231 long_description=description,
232 maintainer="Launchpad Developers",

Subscribers

People subscribed via source and target branches

to all changes: