Merge lp:~david-goetz/swift/replicator_unit_tests into lp:~hudson-openstack/swift/trunk
Proposed by: David Goetz
Status: Merged
Approved by: clayg
Approved revision: 115
Merged at revision: 128
Proposed branch: lp:~david-goetz/swift/replicator_unit_tests
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 533 lines (+301/-117), 3 files modified
  swift/common/utils.py (+0/-1)
  swift/obj/replicator.py (+37/-30)
  test/unit/obj/test_replicator.py (+264/-86)
To merge this branch: bzr merge lp:~david-goetz/swift/replicator_unit_tests
Related bugs: none
Reviewer: clayg (status: Approve)
Review via email: mp+40230@code.launchpad.net
Commit message
Description of the change
Adding unit tests for the object replicator. Fixing a bug in hash_suffix where the list of files was modified while it was being iterated over.
114. By David Goetz: changes from code review
115. By David Goetz: adding temp dir thing
Revision history for this message
clayg (clay-gerrard):
review: Approve
Preview Diff
1 | === modified file 'swift/common/utils.py' |
2 | --- swift/common/utils.py 2010-11-01 21:47:48 +0000 |
3 | +++ swift/common/utils.py 2010-11-16 19:24:52 +0000 |
4 | @@ -29,7 +29,6 @@ |
5 | from contextlib import contextmanager |
6 | import ctypes |
7 | import ctypes.util |
8 | -import fcntl |
9 | import struct |
10 | from ConfigParser import ConfigParser, NoSectionError, NoOptionError |
11 | from tempfile import mkstemp |
12 | |
13 | === modified file 'swift/obj/replicator.py' |
14 | --- swift/obj/replicator.py 2010-10-19 01:05:54 +0000 |
15 | +++ swift/obj/replicator.py 2010-11-16 19:24:52 +0000 |
16 | @@ -61,7 +61,7 @@ |
17 | elif files: |
18 | files.sort(reverse=True) |
19 | meta = data = tomb = None |
20 | - for filename in files: |
21 | + for filename in list(files): |
22 | if not meta and filename.endswith('.meta'): |
23 | meta = filename |
24 | if not data and filename.endswith('.data'): |
25 | @@ -232,7 +232,7 @@ |
26 | """ |
27 | Execute the rsync binary to replicate a partition. |
28 | |
29 | - :returns: a tuple of (rsync exit code, rsync standard output) |
30 | + :returns: return code of rsync process. 0 is successful |
31 | """ |
32 | start_time = time.time() |
33 | ret_val = None |
34 | @@ -470,6 +470,40 @@ |
35 | self.kill_coros() |
36 | self.last_replication_count = self.replication_count |
37 | |
38 | + def collect_jobs(self): |
39 | + """ |
40 | + Returns a sorted list of jobs (dictionaries) that specify the |
41 | + partitions, nodes, etc to be rsynced. |
42 | + """ |
43 | + jobs = [] |
44 | + ips = whataremyips() |
45 | + for local_dev in [dev for dev in self.object_ring.devs |
46 | + if dev and dev['ip'] in ips and dev['port'] == self.port]: |
47 | + dev_path = join(self.devices_dir, local_dev['device']) |
48 | + obj_path = join(dev_path, 'objects') |
49 | + tmp_path = join(dev_path, 'tmp') |
50 | + if self.mount_check and not os.path.ismount(dev_path): |
51 | + self.logger.warn('%s is not mounted' % local_dev['device']) |
52 | + continue |
53 | + unlink_older_than(tmp_path, time.time() - self.reclaim_age) |
54 | + if not os.path.exists(obj_path): |
55 | + continue |
56 | + for partition in os.listdir(obj_path): |
57 | + try: |
58 | + nodes = [node for node in |
59 | + self.object_ring.get_part_nodes(int(partition)) |
60 | + if node['id'] != local_dev['id']] |
61 | + jobs.append(dict(path=join(obj_path, partition), |
62 | + nodes=nodes, delete=len(nodes) > 2, |
63 | + partition=partition)) |
64 | + except ValueError: |
65 | + continue |
66 | + random.shuffle(jobs) |
67 | + # Partititons that need to be deleted take priority |
68 | + jobs.sort(key=lambda job: not job['delete']) |
69 | + self.job_count = len(jobs) |
70 | + return jobs |
71 | + |
72 | def replicate(self): |
73 | """Run a replication pass""" |
74 | self.start = time.time() |
75 | @@ -479,38 +513,11 @@ |
76 | self.replication_count = 0 |
77 | self.last_replication_count = -1 |
78 | self.partition_times = [] |
79 | - jobs = [] |
80 | stats = eventlet.spawn(self.heartbeat) |
81 | lockup_detector = eventlet.spawn(self.detect_lockups) |
82 | try: |
83 | - ips = whataremyips() |
84 | self.run_pool = GreenPool(size=self.concurrency) |
85 | - for local_dev in [ |
86 | - dev for dev in self.object_ring.devs |
87 | - if dev and dev['ip'] in ips and dev['port'] == self.port]: |
88 | - dev_path = join(self.devices_dir, local_dev['device']) |
89 | - obj_path = join(dev_path, 'objects') |
90 | - tmp_path = join(dev_path, 'tmp') |
91 | - if self.mount_check and not os.path.ismount(dev_path): |
92 | - self.logger.warn('%s is not mounted' % local_dev['device']) |
93 | - continue |
94 | - unlink_older_than(tmp_path, time.time() - self.reclaim_age) |
95 | - if not os.path.exists(obj_path): |
96 | - continue |
97 | - for partition in os.listdir(obj_path): |
98 | - try: |
99 | - nodes = [node for node in |
100 | - self.object_ring.get_part_nodes(int(partition)) |
101 | - if node['id'] != local_dev['id']] |
102 | - jobs.append(dict(path=join(obj_path, partition), |
103 | - nodes=nodes, delete=len(nodes) > 2, |
104 | - partition=partition)) |
105 | - except ValueError: |
106 | - continue |
107 | - random.shuffle(jobs) |
108 | - # Partititons that need to be deleted take priority |
109 | - jobs.sort(key=lambda job: not job['delete']) |
110 | - self.job_count = len(jobs) |
111 | + jobs = self.collect_jobs() |
112 | for job in jobs: |
113 | if not self.check_ring(): |
114 | self.logger.info( |
115 | |
116 | === modified file 'test/unit/obj/test_replicator.py' |
117 | --- test/unit/obj/test_replicator.py 2010-08-31 23:12:59 +0000 |
118 | +++ test/unit/obj/test_replicator.py 2010-11-16 19:24:52 +0000 |
119 | @@ -22,46 +22,88 @@ |
120 | import cPickle as pickle |
121 | import logging |
122 | import fcntl |
123 | +import time |
124 | +import tempfile |
125 | from contextlib import contextmanager |
126 | +from eventlet import tpool |
127 | |
128 | from eventlet.green import subprocess |
129 | - |
130 | +from swift.common.utils import hash_path, mkdirs, normalize_timestamp |
131 | +from swift.common import ring |
132 | from swift.obj import replicator as object_replicator |
133 | -from swift.common import ring |
134 | +from swift.obj.server import DiskFile |
135 | + |
136 | |
137 | def _ips(): |
138 | - return ['127.0.0.0',] |
139 | + return ['127.0.0.0'] |
140 | object_replicator.whataremyips = _ips |
141 | |
142 | -class NullHandler(logging.Handler): |
143 | - def emit(self, record): |
144 | - pass |
145 | -null_logger = logging.getLogger("testing") |
146 | -null_logger.addHandler(NullHandler()) |
147 | + |
148 | +def mock_http_connect(status): |
149 | + |
150 | + class FakeConn(object): |
151 | + |
152 | + def __init__(self, status, *args, **kwargs): |
153 | + self.status = status |
154 | + self.reason = 'Fake' |
155 | + self.host = args[0] |
156 | + self.port = args[1] |
157 | + self.method = args[4] |
158 | + self.path = args[5] |
159 | + self.with_exc = False |
160 | + self.headers = kwargs.get('headers', {}) |
161 | + |
162 | + def getresponse(self): |
163 | + if self.with_exc: |
164 | + raise Exception('test') |
165 | + return self |
166 | + |
167 | + def getheader(self, header): |
168 | + return self.headers[header] |
169 | + |
170 | + def read(self, amt=None): |
171 | + return pickle.dumps({}) |
172 | + |
173 | + def close(self): |
174 | + return |
175 | + return lambda *args, **kwargs: FakeConn(status, *args, **kwargs) |
176 | + |
177 | +process_errors = [] |
178 | + |
179 | |
180 | class MockProcess(object): |
181 | ret_code = None |
182 | ret_log = None |
183 | + check_args = None |
184 | |
185 | class Stream(object): |
186 | + |
187 | def read(self): |
188 | return MockProcess.ret_log.next() |
189 | |
190 | def __init__(self, *args, **kwargs): |
191 | + targs = MockProcess.check_args.next() |
192 | + for targ in targs: |
193 | + if targ not in args[0]: |
194 | + process_errors.append("Invalid: %s not in %s" % (targ, |
195 | + args)) |
196 | self.stdout = self.Stream() |
197 | |
198 | def wait(self): |
199 | return self.ret_code.next() |
200 | |
201 | + |
202 | @contextmanager |
203 | def _mock_process(ret): |
204 | orig_process = subprocess.Popen |
205 | MockProcess.ret_code = (i[0] for i in ret) |
206 | MockProcess.ret_log = (i[1] for i in ret) |
207 | + MockProcess.check_args = (i[2] for i in ret) |
208 | object_replicator.subprocess.Popen = MockProcess |
209 | yield |
210 | object_replicator.subprocess.Popen = orig_process |
211 | |
212 | + |
213 | def _create_test_ring(path): |
214 | testgz = os.path.join(path, 'object.ring.gz') |
215 | intended_replica2part2dev_id = [ |
216 | @@ -90,7 +132,7 @@ |
217 | |
218 | def setUp(self): |
219 | # Setup a test ring (stolen from common/test_ring.py) |
220 | - self.testdir = os.path.join('/dev/shm', 'test_replicator') |
221 | + self.testdir = tempfile.mkdtemp() |
222 | self.devices = os.path.join(self.testdir, 'node') |
223 | rmtree(self.testdir, ignore_errors=1) |
224 | os.mkdir(self.testdir) |
225 | @@ -98,7 +140,9 @@ |
226 | os.mkdir(os.path.join(self.devices, 'sda')) |
227 | self.objects = os.path.join(self.devices, 'sda', 'objects') |
228 | os.mkdir(self.objects) |
229 | - for part in ['0','1','2', '3']: |
230 | + self.parts = {} |
231 | + for part in ['0', '1', '2', '3']: |
232 | + self.parts[part] = os.path.join(self.objects, part) |
233 | os.mkdir(os.path.join(self.objects, part)) |
234 | self.ring = _create_test_ring(self.testdir) |
235 | self.conf = dict( |
236 | @@ -107,87 +151,221 @@ |
237 | self.replicator = object_replicator.ObjectReplicator( |
238 | self.conf) |
239 | |
240 | -# def test_check_ring(self): |
241 | -# self.replicator.collect_jobs('sda', 0, self.ring) |
242 | -# self.assertTrue(self.replicator.check_ring()) |
243 | -# orig_check = self.replicator.next_check |
244 | -# self.replicator.next_check = orig_check - 30 |
245 | -# self.assertTrue(self.replicator.check_ring()) |
246 | -# self.replicator.next_check = orig_check |
247 | -# orig_ring_time = self.replicator.object_ring._mtime |
248 | -# self.replicator.object_ring._mtime = orig_ring_time - 30 |
249 | -# self.assertTrue(self.replicator.check_ring()) |
250 | -# self.replicator.next_check = orig_check - 30 |
251 | -# self.assertFalse(self.replicator.check_ring()) |
252 | -# |
253 | -# def test_collect_jobs(self): |
254 | -# self.replicator.collect_jobs('sda', 0, self.ring) |
255 | -# self.assertTrue('1' in self.replicator.parts_to_delete) |
256 | -# self.assertEquals( |
257 | -# [node['id'] for node in self.replicator.partitions['0']['nodes']], |
258 | -# [1,2]) |
259 | -# self.assertEquals( |
260 | -# [node['id'] for node in self.replicator.partitions['1']['nodes']], |
261 | -# [1,2,3]) |
262 | -# self.assertEquals( |
263 | -# [node['id'] for node in self.replicator.partitions['2']['nodes']], |
264 | -# [2,3]) |
265 | -# self.assertEquals( |
266 | -# [node['id'] for node in self.replicator.partitions['3']['nodes']], |
267 | -# [3,1]) |
268 | -# for part in ['0', '1', '2', '3']: |
269 | -# self.assertEquals(self.replicator.partitions[part]['device'], 'sda') |
270 | -# self.assertEquals(self.replicator.partitions[part]['path'], |
271 | -# self.objects) |
272 | -# |
273 | -# def test_delete_partition(self): |
274 | -# self.replicator.collect_jobs('sda', 0, self.ring) |
275 | -# part_path = os.path.join(self.objects, '1') |
276 | -# self.assertTrue(os.access(part_path, os.F_OK)) |
277 | -# self.replicator.delete_partition('1') |
278 | -# self.assertFalse(os.access(part_path, os.F_OK)) |
279 | -# |
280 | -# def test_rsync(self): |
281 | -# self.replicator.collect_jobs('sda', 0, self.ring) |
282 | -# with _mock_process([(0,''), (0,''), (0,'')]): |
283 | -# self.replicator.rsync('0') |
284 | -# |
285 | -# def test_rsync_delete_no(self): |
286 | -# self.replicator.collect_jobs('sda', 0, self.ring) |
287 | -# with _mock_process([(-1, "stuff in log"), (-1, "stuff in log"), |
288 | -# (0,''), (0,'')]): |
289 | -# self.replicator.rsync('1') |
290 | -# self.assertEquals(self.replicator.parts_to_delete['1'], |
291 | -# [False, True, True]) |
292 | -# |
293 | -# def test_rsync_delete_yes(self): |
294 | -# self.replicator.collect_jobs('sda', 0, self.ring) |
295 | -# with _mock_process([(0,''), (0,''), (0,'')]): |
296 | -# self.replicator.rsync('1') |
297 | -# self.assertEquals(self.replicator.parts_to_delete['1'], |
298 | -# [True, True, True]) |
299 | -# |
300 | -# def test_rsync_delete_yes_with_failure(self): |
301 | -# self.replicator.collect_jobs('sda', 0, self.ring) |
302 | -# with _mock_process([(-1, "stuff in log"), (0, ''), (0,''), (0,'')]): |
303 | -# self.replicator.rsync('1') |
304 | -# self.assertEquals(self.replicator.parts_to_delete['1'], |
305 | -# [True, True, True]) |
306 | -# |
307 | -# def test_rsync_failed_drive(self): |
308 | -# self.replicator.collect_jobs('sda', 0, self.ring) |
309 | -# with _mock_process([(12,'There was an error in file IO'), |
310 | -# (0,''), (0,''), (0,'')]): |
311 | -# self.replicator.rsync('1') |
312 | -# self.assertEquals(self.replicator.parts_to_delete['1'], |
313 | -# [True, True, True]) |
314 | + def tearDown(self): |
315 | + process_errors = [] |
316 | + rmtree(self.testdir, ignore_errors=1) |
317 | + |
318 | + def test_run_once(self): |
319 | + replicator = object_replicator.ObjectReplicator( |
320 | + dict(swift_dir=self.testdir, devices=self.devices, |
321 | + mount_check='false', timeout='300', stats_interval='1')) |
322 | + was_connector = object_replicator.http_connect |
323 | + object_replicator.http_connect = mock_http_connect(200) |
324 | + cur_part = '0' |
325 | + df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') |
326 | + mkdirs(df.datadir) |
327 | + f = open(os.path.join(df.datadir, |
328 | + normalize_timestamp(time.time()) + '.data'), |
329 | + 'wb') |
330 | + f.write('1234567890') |
331 | + f.close() |
332 | + ohash = hash_path('a', 'c', 'o') |
333 | + data_dir = ohash[-3:] |
334 | + whole_path_from = os.path.join(self.objects, cur_part, data_dir) |
335 | + process_arg_checker = [] |
336 | + nodes = [node for node in |
337 | + self.ring.get_part_nodes(int(cur_part)) \ |
338 | + if node['ip'] not in _ips()] |
339 | + for node in nodes: |
340 | + rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part) |
341 | + process_arg_checker.append( |
342 | + (0, '', ['rsync', whole_path_from, rsync_mod])) |
343 | + with _mock_process(process_arg_checker): |
344 | + replicator.run_once() |
345 | + self.assertFalse(process_errors) |
346 | + |
347 | + object_replicator.http_connect = was_connector |
348 | + |
349 | + def test_hash_suffix_one_file(self): |
350 | + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') |
351 | + mkdirs(df.datadir) |
352 | + f = open(os.path.join(df.datadir, |
353 | + normalize_timestamp(time.time() - 100) + '.ts'), |
354 | + 'wb') |
355 | + f.write('1234567890') |
356 | + f.close() |
357 | + ohash = hash_path('a', 'c', 'o') |
358 | + data_dir = ohash[-3:] |
359 | + whole_path_from = os.path.join(self.objects, '0', data_dir) |
360 | + object_replicator.hash_suffix(whole_path_from, 101) |
361 | + self.assertEquals(len(os.listdir(self.parts['0'])), 1) |
362 | + |
363 | + object_replicator.hash_suffix(whole_path_from, 99) |
364 | + self.assertEquals(len(os.listdir(self.parts['0'])), 0) |
365 | + |
366 | + def test_hash_suffix_multi_file_one(self): |
367 | + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') |
368 | + mkdirs(df.datadir) |
369 | + for tdiff in [1, 50, 100, 500]: |
370 | + for suff in ['.meta', '.data', '.ts']: |
371 | + f = open(os.path.join(df.datadir, |
372 | + normalize_timestamp(int(time.time()) - tdiff) + suff), |
373 | + 'wb') |
374 | + f.write('1234567890') |
375 | + f.close() |
376 | + |
377 | + ohash = hash_path('a', 'c', 'o') |
378 | + data_dir = ohash[-3:] |
379 | + whole_path_from = os.path.join(self.objects, '0', data_dir) |
380 | + hsh_path = os.listdir(whole_path_from)[0] |
381 | + whole_hsh_path = os.path.join(whole_path_from, hsh_path) |
382 | + |
383 | + object_replicator.hash_suffix(whole_path_from, 99) |
384 | + # only the tombstone should be left |
385 | + self.assertEquals(len(os.listdir(whole_hsh_path)), 1) |
386 | + |
387 | + def test_hash_suffix_multi_file_two(self): |
388 | + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') |
389 | + mkdirs(df.datadir) |
390 | + for tdiff in [1, 50, 100, 500]: |
391 | + suffs = ['.meta', '.data'] |
392 | + if tdiff > 50: |
393 | + suffs.append('.ts') |
394 | + for suff in suffs: |
395 | + f = open(os.path.join(df.datadir, |
396 | + normalize_timestamp(int(time.time()) - tdiff) + suff), |
397 | + 'wb') |
398 | + f.write('1234567890') |
399 | + f.close() |
400 | + |
401 | + ohash = hash_path('a', 'c', 'o') |
402 | + data_dir = ohash[-3:] |
403 | + whole_path_from = os.path.join(self.objects, '0', data_dir) |
404 | + hsh_path = os.listdir(whole_path_from)[0] |
405 | + whole_hsh_path = os.path.join(whole_path_from, hsh_path) |
406 | + |
407 | + object_replicator.hash_suffix(whole_path_from, 99) |
408 | + # only the meta and data should be left |
409 | + self.assertEquals(len(os.listdir(whole_hsh_path)), 2) |
410 | + |
411 | + def test_invalidate_hash(self): |
412 | + |
413 | + def assertFileData(file_path, data): |
414 | + with open(file_path, 'r') as fp: |
415 | + fdata = fp.read() |
416 | + self.assertEquals(fdata, data) |
417 | + |
418 | + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') |
419 | + mkdirs(df.datadir) |
420 | + ohash = hash_path('a', 'c', 'o') |
421 | + data_dir = ohash[-3:] |
422 | + whole_path_from = os.path.join(self.objects, '0', data_dir) |
423 | + hashes_file = os.path.join(self.objects, '0', |
424 | + object_replicator.HASH_FILE) |
425 | + # test that non existant file except caught |
426 | + self.assertEquals(object_replicator.invalidate_hash(whole_path_from), |
427 | + None) |
428 | + # test that hashes get cleared |
429 | + check_pickle_data = pickle.dumps({data_dir: None}, |
430 | + object_replicator.PICKLE_PROTOCOL) |
431 | + for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]: |
432 | + with open(hashes_file, 'wb') as fp: |
433 | + pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL) |
434 | + object_replicator.invalidate_hash(whole_path_from) |
435 | + assertFileData(hashes_file, check_pickle_data) |
436 | + |
437 | + def test_check_ring(self): |
438 | + self.assertTrue(self.replicator.check_ring()) |
439 | + orig_check = self.replicator.next_check |
440 | + self.replicator.next_check = orig_check - 30 |
441 | + self.assertTrue(self.replicator.check_ring()) |
442 | + self.replicator.next_check = orig_check |
443 | + orig_ring_time = self.replicator.object_ring._mtime |
444 | + self.replicator.object_ring._mtime = orig_ring_time - 30 |
445 | + self.assertTrue(self.replicator.check_ring()) |
446 | + self.replicator.next_check = orig_check - 30 |
447 | + self.assertFalse(self.replicator.check_ring()) |
448 | + |
449 | + def test_collect_jobs(self): |
450 | + jobs = self.replicator.collect_jobs() |
451 | + jobs_to_delete = [j for j in jobs if j['delete']] |
452 | + jobs_to_keep = [j for j in jobs if not j['delete']] |
453 | + jobs_by_part = {} |
454 | + for job in jobs: |
455 | + jobs_by_part[job['partition']] = job |
456 | + self.assertEquals(len(jobs_to_delete), 1) |
457 | + self.assertTrue('1', jobs_to_delete[0]['partition']) |
458 | + self.assertEquals( |
459 | + [node['id'] for node in jobs_by_part['0']['nodes']], [1, 2]) |
460 | + self.assertEquals( |
461 | + [node['id'] for node in jobs_by_part['1']['nodes']], [1, 2, 3]) |
462 | + self.assertEquals( |
463 | + [node['id'] for node in jobs_by_part['2']['nodes']], [2, 3]) |
464 | + self.assertEquals( |
465 | + [node['id'] for node in jobs_by_part['3']['nodes']], [3, 1]) |
466 | + for part in ['0', '1', '2', '3']: |
467 | + for node in jobs_by_part[part]['nodes']: |
468 | + self.assertEquals(node['device'], 'sda') |
469 | + self.assertEquals(jobs_by_part[part]['path'], |
470 | + os.path.join(self.objects, part)) |
471 | + |
472 | + def test_delete_partition(self): |
473 | + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o') |
474 | + mkdirs(df.datadir) |
475 | + ohash = hash_path('a', 'c', 'o') |
476 | + data_dir = ohash[-3:] |
477 | + part_path = os.path.join(self.objects, '1') |
478 | + self.assertTrue(os.access(part_path, os.F_OK)) |
479 | + self.replicator.replicate() |
480 | + self.assertFalse(os.access(part_path, os.F_OK)) |
481 | + |
482 | + def test_run_once_recover_from_failure(self): |
483 | + replicator = object_replicator.ObjectReplicator( |
484 | + dict(swift_dir=self.testdir, devices=self.devices, |
485 | + mount_check='false', timeout='300', stats_interval='1')) |
486 | + was_connector = object_replicator.http_connect |
487 | + object_replicator.http_connect = mock_http_connect(200) |
488 | + # Write some files into '1' and run replicate- they should be moved |
489 | + # to the other partitoins and then node should get deleted. |
490 | + cur_part = '1' |
491 | + df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') |
492 | + mkdirs(df.datadir) |
493 | + f = open(os.path.join(df.datadir, |
494 | + normalize_timestamp(time.time()) + '.data'), |
495 | + 'wb') |
496 | + f.write('1234567890') |
497 | + f.close() |
498 | + ohash = hash_path('a', 'c', 'o') |
499 | + data_dir = ohash[-3:] |
500 | + whole_path_from = os.path.join(self.objects, cur_part, data_dir) |
501 | + process_arg_checker = [] |
502 | + nodes = [node for node in |
503 | + self.ring.get_part_nodes(int(cur_part)) \ |
504 | + if node['ip'] not in _ips()] |
505 | + for node in nodes: |
506 | + rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part) |
507 | + process_arg_checker.append( |
508 | + (0, '', ['rsync', whole_path_from, rsync_mod])) |
509 | + self.assertTrue(os.access(os.path.join(self.objects, |
510 | + '1', data_dir, ohash), |
511 | + os.F_OK)) |
512 | + with _mock_process(process_arg_checker): |
513 | + replicator.run_once() |
514 | + self.assertFalse(process_errors) |
515 | + for i, result in [('0', True), ('1', False), |
516 | + ('2', True), ('3', True)]: |
517 | + self.assertEquals(os.access( |
518 | + os.path.join(self.objects, |
519 | + i, object_replicator.HASH_FILE), |
520 | + os.F_OK), result) |
521 | + object_replicator.http_connect = was_connector |
522 | |
523 | def test_run(self): |
524 | - with _mock_process([(0,'')]*100): |
525 | + with _mock_process([(0, '')] * 100): |
526 | self.replicator.replicate() |
527 | |
528 | def test_run_withlog(self): |
529 | - with _mock_process([(0,"stuff in log")]*100): |
530 | + with _mock_process([(0, "stuff in log")] * 100): |
531 | self.replicator.replicate() |
532 | |
533 | if __name__ == '__main__': |
There was a similar bug in proxy (lp:651598) - at the time gholt suggested that duplicating the iterable by calling out to list() was more-readable than creating a slice. I caved.
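The pattern at issue, as a minimal sketch (a hypothetical reap() helper, not the real hash_suffix body): removing entries from a list while iterating over it silently skips elements, so the loop walks a throwaway copy instead.

```python
# Sketch of the bug class fixed by "for filename in list(files)": mutating a
# list while iterating over it skips elements, so iterate over a copy.
# reap() is a hypothetical helper, not code from this branch.
def reap(files, is_expired):
    for filename in list(files):    # walk a copy...
        if is_expired(filename):
            files.remove(filename)  # ...so removals can't skip entries

files = ['a.ts', 'b.ts', 'c.data']
reap(files, lambda name: name.endswith('.ts'))
assert files == ['c.data']  # without list(), 'b.ts' would have survived
```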
collect_jobs is a good re-factoring, could probably use a doc string tho. And since you're already pep8-ing test_replicator.py:TestObjectReplicator.test_run* - do you want to go ahead and fix the "missing whitespace around operator" on 380 & 384?
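The warning referenced is pep8's usual E225 complaint; a generic illustration (not the actual content of lines 380 and 384):

```python
count = 1
total=count+1      # pep8: E225 missing whitespace around operator
total = count + 1  # the fix
```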
Do you think it would be safer to scope process_errors as a class variable on MockProcess instead of an inferred global? Is the null_logger/NullHandler stuff used/needed? Do you think the extra portability of using tempfile.mkdtemp instead of "/dev/shm" would be worth the overhead? I don't *think* Macs have a "/dev/shm"...
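One way the class-variable suggestion could look, keeping the existing check_args generator style; a sketch of the idea only, not code from this branch:

```python
class MockProcess(object):
    check_args = None  # generator of expected-argument tuples, set per test
    errors = []        # class-level collection point instead of a module global

    def __init__(self, *args, **kwargs):
        expected = MockProcess.check_args.next()  # Python 2 generator API
        for targ in expected:
            if targ not in args[0]:
                MockProcess.errors.append(
                    'Invalid: %s not in %s' % (targ, args))

# Tests would then reset and inspect it explicitly:
#   MockProcess.errors = []              # in setUp/tearDown
#   self.assertFalse(MockProcess.errors) # in the test assertion
```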
test_rsync doesn't have an assert?
Ok, that's it from me...