Merge lp:~verterok/ubuntuone-client/fix-721859 into lp:ubuntuone-client

Proposed by Guillermo Gonzalez
Status: Merged
Approved by: Guillermo Gonzalez
Approved revision: 886
Merged at revision: 888
Proposed branch: lp:~verterok/ubuntuone-client/fix-721859
Merge into: lp:ubuntuone-client
Diff against target: 164 lines (+84/-3)
2 files modified
tests/syncdaemon/test_tritcask.py (+68/-0)
ubuntuone/syncdaemon/tritcask.py (+16/-3)
To merge this branch: bzr merge lp:~verterok/ubuntuone-client/fix-721859
Reviewer Review Type Date Requested Status
Facundo Batista (community) Approve
Lucio Torre (community) Approve
Review via email: mp+50830@code.launchpad.net

Commit message

Fix tritcask to handle broken header in the data file.

Description of the change

Fix tritcask to handle a broken header in the data file (live and/or immutable). If the file with the bad data is the "live" file, it is marked as immutable and a new "live" file is created.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) wrote :

+1

review: Approve
Revision history for this message
Facundo Batista (facundo) wrote :

+1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_tritcask.py'
2--- tests/syncdaemon/test_tritcask.py 2011-02-07 19:10:31 +0000
3+++ tests/syncdaemon/test_tritcask.py 2011-02-22 23:48:58 +0000
4@@ -38,6 +38,7 @@
5 INACTIVE,
6 DEAD,
7 BadCrc,
8+ BadHeader,
9 DataFile,
10 TempDataFile,
11 DeadDataFile,
12@@ -166,6 +167,28 @@
13 self.assertTrue(db.live_file.has_bad_crc, 'has_bad_crc should be True.')
14 db.shutdown()
15
16+ def test_iter_entries_bad_header_unpack(self):
17+ """Test that unpack error during iter_entries is the same as EOF."""
18+ db = Tritcask(self.base_dir)
19+ for i in range(10):
20+ db.put(i, 'foo%d' % (i,), 'bar%s' % (i,))
21+ # truncate the file at the start of the header of the last value.
22+ curr_pos = db.live_file.fd.tell()
23+ db.live_file.fd.seek(curr_pos - header_size+4)
24+ db.live_file.fd.truncate()
25+ entries = []
26+ for i, entry in enumerate(db.live_file.iter_entries()):
27+ entries.append(entry)
28+ self.assertEqual(entry[4], i)
29+ self.assertEqual(entry[5], 'foo%d' % (i,))
30+ self.assertEqual(entry[6], 'bar%d' % (i,))
31+ self.assertEqual(9, len(entries))
32+ self.assertTrue(db.live_file.has_bad_data, 'has_bad_data should be True.')
33+ self.assertTrue(self.memento.check_warning('Found corrupted header on'))
34+ self.assertTrue(self.memento.check_warning(
35+ 'the rest of the file will be ignored.'))
36+ db.shutdown()
37+
38 def test__getitem__(self):
39 """Test our slicing support."""
40 data_file = self.file_class(self.base_dir)
41@@ -247,6 +270,23 @@
42 with contextlib.closing(fmap):
43 self.assertRaises(BadCrc, data_file.read, fmap, 0)
44
45+ def test_read_bad_header(self):
46+ """Test for read method with a bad header/unpack error."""
47+ data_file = self.file_class(self.base_dir)
48+ orig_tstamp, _, _ = data_file.write(0, 'foo', 'bar')
49+ # mess with the data on disk to make this entry crc32 invalid
50+ # seek to the end of crc32+header+key
51+ data_file.close()
52+ with open(data_file.filename, 'r+b') as fd:
53+ fd.read(crc32_size+4)
54+ fd.truncate()
55+ # write a different value -> random bytes
56+ fd.write(os.urandom(header_size/2))
57+ fd.flush()
58+ fmap = mmap.mmap(fd.fileno(), 0, access=mmap.ACCESS_READ)
59+ with contextlib.closing(fmap):
60+ self.assertRaises(BadHeader, data_file.read, fmap, 0)
61+
62
63 class TempDataFileTest(DataFileTest):
64 """Tests for TempDataFile."""
65@@ -394,6 +434,33 @@
66 'the rest of the file will be ignored.'))
67 db.shutdown()
68
69+ def test_iter_entries_bad_header_unpack(self):
70+ """Test that unpack error during iter_entries is the same as EOF."""
71+ db = Tritcask(self.base_dir)
72+ for i in range(10):
73+ db.put(i, 'foo%d' % (i,), 'bar%s' % (i,))
74+ # truncate the file at the start of the header of the last value.
75+ db.live_file.fd.seek(crc32_size+header_size+8+crc32_size+4)
76+ db.live_file.fd.truncate()
77+ file_id = db.live_file.file_id
78+ db.rotate()
79+ db.shutdown()
80+ db = Tritcask(self.base_dir)
81+ entries = []
82+ for i, entry in enumerate(db._immutable[file_id].iter_entries()):
83+ entries.append(entry)
84+ self.assertEqual(entry[4], i)
85+ self.assertEqual(entry[5], 'foo%d' % (i,))
86+ self.assertEqual(entry[6], 'bar%d' % (i,))
87+ self.assertEqual(1, len(entries))
88+ self.assertTrue(db._immutable[file_id].has_bad_data,
89+ 'has_bad_data should be True.')
90+ self.assertTrue(self.memento.check_warning('Found corrupted header on'))
91+ self.assertTrue(self.memento.check_warning(
92+ 'the rest of the file will be ignored.'))
93+
94+ db.shutdown()
95+
96 def test__getitem__(self):
97 """Test our slicing support."""
98 rw_file = DataFile(self.base_dir)
99@@ -461,6 +528,7 @@
100 self.assertRaises(NotImplementedError, dead_file.read)
101
102 test_read_bad_crc = test_read
103+ test_read_bad_header = test_read
104
105 def test__open(self):
106 """Test that always fails with NotImplementedError."""
107
108=== modified file 'ubuntuone/syncdaemon/tritcask.py'
109--- ubuntuone/syncdaemon/tritcask.py 2011-02-07 19:10:31 +0000
110+++ ubuntuone/syncdaemon/tritcask.py 2011-02-22 23:48:58 +0000
111@@ -70,6 +70,9 @@
112 """A Exception for Bad CRC32."""
113
114
115+class BadHeader(Exception):
116+ """A Exception for Bad header value."""
117+
118 TritcaskEntry = namedtuple('TritcaskEntry', ['crc32', 'tstamp', 'key_sz',
119 'value_sz', 'row_type', 'key', 'value', 'value_pos'])
120
121@@ -144,6 +147,7 @@
122 If filename is None a new file is created.
123 """
124 self.has_bad_crc = False
125+ self.has_bad_data = False
126 if filename is None:
127 filename = self._get_next_file_id() + LIVE + FILE_SUFFIX
128 self.filename = os.path.join(base_path, filename)
129@@ -213,6 +217,12 @@
130 'the rest of the file will be ignored.',
131 self.file_id, current_pos)
132 raise StopIteration
133+ except BadHeader:
134+ self.has_bad_data = True
135+ logger.warning('Found corrupted header on %s at position: %s, '
136+ 'the rest of the file will be ignored.',
137+ self.file_id, current_pos)
138+ raise StopIteration
139
140 def __getitem__(self, item):
141 """__getitem__ to support slicing and *only* slicing."""
142@@ -249,8 +259,11 @@
143 if header == '' or crc32_bytes == '':
144 # reached EOF
145 raise EOFError
146- crc32 = crc32_struct.unpack(crc32_bytes)[0]
147- tstamp, key_sz, value_sz, row_type = header_struct.unpack(header)
148+ try:
149+ crc32 = crc32_struct.unpack(crc32_bytes)[0]
150+ tstamp, key_sz, value_sz, row_type = header_struct.unpack(header)
151+ except struct.error, e:
152+ raise BadHeader(e)
153 key = fmmap[current_pos:current_pos+key_sz]
154 current_pos += key_sz
155 value_pos = current_pos
156@@ -586,7 +599,7 @@
157 return False
158 # check if the file is marked with a bad crc
159 # as we shouldn't keep adding data to a broken file.
160- if self.live_file.has_bad_crc:
161+ if self.live_file.has_bad_crc or self.live_file.has_bad_data:
162 return True
163 # now check the default rotation policy
164 try:

Subscribers

People subscribed via source and target branches