Merge lp:~david-goetz/swift/replicator_unit_tests into lp:~hudson-openstack/swift/trunk

Proposed by David Goetz
Status: Merged
Approved by: clayg
Approved revision: 115
Merged at revision: 128
Proposed branch: lp:~david-goetz/swift/replicator_unit_tests
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 533 lines (+301/-117)
3 files modified
swift/common/utils.py (+0/-1)
swift/obj/replicator.py (+37/-30)
test/unit/obj/test_replicator.py (+264/-86)
To merge this branch: bzr merge lp:~david-goetz/swift/replicator_unit_tests
Reviewer Review Type Date Requested Status
clayg Approve
Review via email: mp+40230@code.launchpad.net

Description of the change

Adding unit tests for object replicator. Fixing bug in hash_suffix.

To post a comment you must log in.
Revision history for this message
clayg (clay-gerrard) wrote :

There was a similar bug in proxy (lp:651598) - at the time gholt suggested that duplicating the iterable by calling out to list() was more-readable than creating a slice. I caved.

collect_jobs is a good re-factoring, could probably use a doc string tho. And since you're already peping test_replicator.py:TestObjectReplicator.test_run* - do you want to go ahead and fix the "missing whitespace around operator" on 380 & 384?

Do you think it would be safer to scope process_errors as a class variable on MockProcess instead of an inferred global? Is the null_logger/NullHandler stuff used/needed? Do you think the extra portability of using tempfile.mkdtemp instead of "/dev/shm" would be worth the overhead? I don't *think* mac's have a "/dev/shm"...

test_rsync doesn't have an assert?

Ok, that's it from me...

review: Needs Fixing
114. By David Goetz

changes from code review

115. By David Goetz

adding temp dir thing

Revision history for this message
clayg (clay-gerrard) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'swift/common/utils.py'
2--- swift/common/utils.py 2010-11-01 21:47:48 +0000
3+++ swift/common/utils.py 2010-11-16 19:24:52 +0000
4@@ -29,7 +29,6 @@
5 from contextlib import contextmanager
6 import ctypes
7 import ctypes.util
8-import fcntl
9 import struct
10 from ConfigParser import ConfigParser, NoSectionError, NoOptionError
11 from tempfile import mkstemp
12
13=== modified file 'swift/obj/replicator.py'
14--- swift/obj/replicator.py 2010-10-19 01:05:54 +0000
15+++ swift/obj/replicator.py 2010-11-16 19:24:52 +0000
16@@ -61,7 +61,7 @@
17 elif files:
18 files.sort(reverse=True)
19 meta = data = tomb = None
20- for filename in files:
21+ for filename in list(files):
22 if not meta and filename.endswith('.meta'):
23 meta = filename
24 if not data and filename.endswith('.data'):
25@@ -232,7 +232,7 @@
26 """
27 Execute the rsync binary to replicate a partition.
28
29- :returns: a tuple of (rsync exit code, rsync standard output)
30+ :returns: return code of rsync process. 0 is successful
31 """
32 start_time = time.time()
33 ret_val = None
34@@ -470,6 +470,40 @@
35 self.kill_coros()
36 self.last_replication_count = self.replication_count
37
38+ def collect_jobs(self):
39+ """
40+ Returns a sorted list of jobs (dictionaries) that specify the
41+ partitions, nodes, etc to be rsynced.
42+ """
43+ jobs = []
44+ ips = whataremyips()
45+ for local_dev in [dev for dev in self.object_ring.devs
46+ if dev and dev['ip'] in ips and dev['port'] == self.port]:
47+ dev_path = join(self.devices_dir, local_dev['device'])
48+ obj_path = join(dev_path, 'objects')
49+ tmp_path = join(dev_path, 'tmp')
50+ if self.mount_check and not os.path.ismount(dev_path):
51+ self.logger.warn('%s is not mounted' % local_dev['device'])
52+ continue
53+ unlink_older_than(tmp_path, time.time() - self.reclaim_age)
54+ if not os.path.exists(obj_path):
55+ continue
56+ for partition in os.listdir(obj_path):
57+ try:
58+ nodes = [node for node in
59+ self.object_ring.get_part_nodes(int(partition))
60+ if node['id'] != local_dev['id']]
61+ jobs.append(dict(path=join(obj_path, partition),
62+ nodes=nodes, delete=len(nodes) > 2,
63+ partition=partition))
64+ except ValueError:
65+ continue
66+ random.shuffle(jobs)
67+ # Partititons that need to be deleted take priority
68+ jobs.sort(key=lambda job: not job['delete'])
69+ self.job_count = len(jobs)
70+ return jobs
71+
72 def replicate(self):
73 """Run a replication pass"""
74 self.start = time.time()
75@@ -479,38 +513,11 @@
76 self.replication_count = 0
77 self.last_replication_count = -1
78 self.partition_times = []
79- jobs = []
80 stats = eventlet.spawn(self.heartbeat)
81 lockup_detector = eventlet.spawn(self.detect_lockups)
82 try:
83- ips = whataremyips()
84 self.run_pool = GreenPool(size=self.concurrency)
85- for local_dev in [
86- dev for dev in self.object_ring.devs
87- if dev and dev['ip'] in ips and dev['port'] == self.port]:
88- dev_path = join(self.devices_dir, local_dev['device'])
89- obj_path = join(dev_path, 'objects')
90- tmp_path = join(dev_path, 'tmp')
91- if self.mount_check and not os.path.ismount(dev_path):
92- self.logger.warn('%s is not mounted' % local_dev['device'])
93- continue
94- unlink_older_than(tmp_path, time.time() - self.reclaim_age)
95- if not os.path.exists(obj_path):
96- continue
97- for partition in os.listdir(obj_path):
98- try:
99- nodes = [node for node in
100- self.object_ring.get_part_nodes(int(partition))
101- if node['id'] != local_dev['id']]
102- jobs.append(dict(path=join(obj_path, partition),
103- nodes=nodes, delete=len(nodes) > 2,
104- partition=partition))
105- except ValueError:
106- continue
107- random.shuffle(jobs)
108- # Partititons that need to be deleted take priority
109- jobs.sort(key=lambda job: not job['delete'])
110- self.job_count = len(jobs)
111+ jobs = self.collect_jobs()
112 for job in jobs:
113 if not self.check_ring():
114 self.logger.info(
115
116=== modified file 'test/unit/obj/test_replicator.py'
117--- test/unit/obj/test_replicator.py 2010-08-31 23:12:59 +0000
118+++ test/unit/obj/test_replicator.py 2010-11-16 19:24:52 +0000
119@@ -22,46 +22,88 @@
120 import cPickle as pickle
121 import logging
122 import fcntl
123+import time
124+import tempfile
125 from contextlib import contextmanager
126+from eventlet import tpool
127
128 from eventlet.green import subprocess
129-
130+from swift.common.utils import hash_path, mkdirs, normalize_timestamp
131+from swift.common import ring
132 from swift.obj import replicator as object_replicator
133-from swift.common import ring
134+from swift.obj.server import DiskFile
135+
136
137 def _ips():
138- return ['127.0.0.0',]
139+ return ['127.0.0.0']
140 object_replicator.whataremyips = _ips
141
142-class NullHandler(logging.Handler):
143- def emit(self, record):
144- pass
145-null_logger = logging.getLogger("testing")
146-null_logger.addHandler(NullHandler())
147+
148+def mock_http_connect(status):
149+
150+ class FakeConn(object):
151+
152+ def __init__(self, status, *args, **kwargs):
153+ self.status = status
154+ self.reason = 'Fake'
155+ self.host = args[0]
156+ self.port = args[1]
157+ self.method = args[4]
158+ self.path = args[5]
159+ self.with_exc = False
160+ self.headers = kwargs.get('headers', {})
161+
162+ def getresponse(self):
163+ if self.with_exc:
164+ raise Exception('test')
165+ return self
166+
167+ def getheader(self, header):
168+ return self.headers[header]
169+
170+ def read(self, amt=None):
171+ return pickle.dumps({})
172+
173+ def close(self):
174+ return
175+ return lambda *args, **kwargs: FakeConn(status, *args, **kwargs)
176+
177+process_errors = []
178+
179
180 class MockProcess(object):
181 ret_code = None
182 ret_log = None
183+ check_args = None
184
185 class Stream(object):
186+
187 def read(self):
188 return MockProcess.ret_log.next()
189
190 def __init__(self, *args, **kwargs):
191+ targs = MockProcess.check_args.next()
192+ for targ in targs:
193+ if targ not in args[0]:
194+ process_errors.append("Invalid: %s not in %s" % (targ,
195+ args))
196 self.stdout = self.Stream()
197
198 def wait(self):
199 return self.ret_code.next()
200
201+
202 @contextmanager
203 def _mock_process(ret):
204 orig_process = subprocess.Popen
205 MockProcess.ret_code = (i[0] for i in ret)
206 MockProcess.ret_log = (i[1] for i in ret)
207+ MockProcess.check_args = (i[2] for i in ret)
208 object_replicator.subprocess.Popen = MockProcess
209 yield
210 object_replicator.subprocess.Popen = orig_process
211
212+
213 def _create_test_ring(path):
214 testgz = os.path.join(path, 'object.ring.gz')
215 intended_replica2part2dev_id = [
216@@ -90,7 +132,7 @@
217
218 def setUp(self):
219 # Setup a test ring (stolen from common/test_ring.py)
220- self.testdir = os.path.join('/dev/shm', 'test_replicator')
221+ self.testdir = tempfile.mkdtemp()
222 self.devices = os.path.join(self.testdir, 'node')
223 rmtree(self.testdir, ignore_errors=1)
224 os.mkdir(self.testdir)
225@@ -98,7 +140,9 @@
226 os.mkdir(os.path.join(self.devices, 'sda'))
227 self.objects = os.path.join(self.devices, 'sda', 'objects')
228 os.mkdir(self.objects)
229- for part in ['0','1','2', '3']:
230+ self.parts = {}
231+ for part in ['0', '1', '2', '3']:
232+ self.parts[part] = os.path.join(self.objects, part)
233 os.mkdir(os.path.join(self.objects, part))
234 self.ring = _create_test_ring(self.testdir)
235 self.conf = dict(
236@@ -107,87 +151,221 @@
237 self.replicator = object_replicator.ObjectReplicator(
238 self.conf)
239
240-# def test_check_ring(self):
241-# self.replicator.collect_jobs('sda', 0, self.ring)
242-# self.assertTrue(self.replicator.check_ring())
243-# orig_check = self.replicator.next_check
244-# self.replicator.next_check = orig_check - 30
245-# self.assertTrue(self.replicator.check_ring())
246-# self.replicator.next_check = orig_check
247-# orig_ring_time = self.replicator.object_ring._mtime
248-# self.replicator.object_ring._mtime = orig_ring_time - 30
249-# self.assertTrue(self.replicator.check_ring())
250-# self.replicator.next_check = orig_check - 30
251-# self.assertFalse(self.replicator.check_ring())
252-#
253-# def test_collect_jobs(self):
254-# self.replicator.collect_jobs('sda', 0, self.ring)
255-# self.assertTrue('1' in self.replicator.parts_to_delete)
256-# self.assertEquals(
257-# [node['id'] for node in self.replicator.partitions['0']['nodes']],
258-# [1,2])
259-# self.assertEquals(
260-# [node['id'] for node in self.replicator.partitions['1']['nodes']],
261-# [1,2,3])
262-# self.assertEquals(
263-# [node['id'] for node in self.replicator.partitions['2']['nodes']],
264-# [2,3])
265-# self.assertEquals(
266-# [node['id'] for node in self.replicator.partitions['3']['nodes']],
267-# [3,1])
268-# for part in ['0', '1', '2', '3']:
269-# self.assertEquals(self.replicator.partitions[part]['device'], 'sda')
270-# self.assertEquals(self.replicator.partitions[part]['path'],
271-# self.objects)
272-#
273-# def test_delete_partition(self):
274-# self.replicator.collect_jobs('sda', 0, self.ring)
275-# part_path = os.path.join(self.objects, '1')
276-# self.assertTrue(os.access(part_path, os.F_OK))
277-# self.replicator.delete_partition('1')
278-# self.assertFalse(os.access(part_path, os.F_OK))
279-#
280-# def test_rsync(self):
281-# self.replicator.collect_jobs('sda', 0, self.ring)
282-# with _mock_process([(0,''), (0,''), (0,'')]):
283-# self.replicator.rsync('0')
284-#
285-# def test_rsync_delete_no(self):
286-# self.replicator.collect_jobs('sda', 0, self.ring)
287-# with _mock_process([(-1, "stuff in log"), (-1, "stuff in log"),
288-# (0,''), (0,'')]):
289-# self.replicator.rsync('1')
290-# self.assertEquals(self.replicator.parts_to_delete['1'],
291-# [False, True, True])
292-#
293-# def test_rsync_delete_yes(self):
294-# self.replicator.collect_jobs('sda', 0, self.ring)
295-# with _mock_process([(0,''), (0,''), (0,'')]):
296-# self.replicator.rsync('1')
297-# self.assertEquals(self.replicator.parts_to_delete['1'],
298-# [True, True, True])
299-#
300-# def test_rsync_delete_yes_with_failure(self):
301-# self.replicator.collect_jobs('sda', 0, self.ring)
302-# with _mock_process([(-1, "stuff in log"), (0, ''), (0,''), (0,'')]):
303-# self.replicator.rsync('1')
304-# self.assertEquals(self.replicator.parts_to_delete['1'],
305-# [True, True, True])
306-#
307-# def test_rsync_failed_drive(self):
308-# self.replicator.collect_jobs('sda', 0, self.ring)
309-# with _mock_process([(12,'There was an error in file IO'),
310-# (0,''), (0,''), (0,'')]):
311-# self.replicator.rsync('1')
312-# self.assertEquals(self.replicator.parts_to_delete['1'],
313-# [True, True, True])
314+ def tearDown(self):
315+ process_errors = []
316+ rmtree(self.testdir, ignore_errors=1)
317+
318+ def test_run_once(self):
319+ replicator = object_replicator.ObjectReplicator(
320+ dict(swift_dir=self.testdir, devices=self.devices,
321+ mount_check='false', timeout='300', stats_interval='1'))
322+ was_connector = object_replicator.http_connect
323+ object_replicator.http_connect = mock_http_connect(200)
324+ cur_part = '0'
325+ df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
326+ mkdirs(df.datadir)
327+ f = open(os.path.join(df.datadir,
328+ normalize_timestamp(time.time()) + '.data'),
329+ 'wb')
330+ f.write('1234567890')
331+ f.close()
332+ ohash = hash_path('a', 'c', 'o')
333+ data_dir = ohash[-3:]
334+ whole_path_from = os.path.join(self.objects, cur_part, data_dir)
335+ process_arg_checker = []
336+ nodes = [node for node in
337+ self.ring.get_part_nodes(int(cur_part)) \
338+ if node['ip'] not in _ips()]
339+ for node in nodes:
340+ rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
341+ process_arg_checker.append(
342+ (0, '', ['rsync', whole_path_from, rsync_mod]))
343+ with _mock_process(process_arg_checker):
344+ replicator.run_once()
345+ self.assertFalse(process_errors)
346+
347+ object_replicator.http_connect = was_connector
348+
349+ def test_hash_suffix_one_file(self):
350+ df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
351+ mkdirs(df.datadir)
352+ f = open(os.path.join(df.datadir,
353+ normalize_timestamp(time.time() - 100) + '.ts'),
354+ 'wb')
355+ f.write('1234567890')
356+ f.close()
357+ ohash = hash_path('a', 'c', 'o')
358+ data_dir = ohash[-3:]
359+ whole_path_from = os.path.join(self.objects, '0', data_dir)
360+ object_replicator.hash_suffix(whole_path_from, 101)
361+ self.assertEquals(len(os.listdir(self.parts['0'])), 1)
362+
363+ object_replicator.hash_suffix(whole_path_from, 99)
364+ self.assertEquals(len(os.listdir(self.parts['0'])), 0)
365+
366+ def test_hash_suffix_multi_file_one(self):
367+ df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
368+ mkdirs(df.datadir)
369+ for tdiff in [1, 50, 100, 500]:
370+ for suff in ['.meta', '.data', '.ts']:
371+ f = open(os.path.join(df.datadir,
372+ normalize_timestamp(int(time.time()) - tdiff) + suff),
373+ 'wb')
374+ f.write('1234567890')
375+ f.close()
376+
377+ ohash = hash_path('a', 'c', 'o')
378+ data_dir = ohash[-3:]
379+ whole_path_from = os.path.join(self.objects, '0', data_dir)
380+ hsh_path = os.listdir(whole_path_from)[0]
381+ whole_hsh_path = os.path.join(whole_path_from, hsh_path)
382+
383+ object_replicator.hash_suffix(whole_path_from, 99)
384+ # only the tombstone should be left
385+ self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
386+
387+ def test_hash_suffix_multi_file_two(self):
388+ df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
389+ mkdirs(df.datadir)
390+ for tdiff in [1, 50, 100, 500]:
391+ suffs = ['.meta', '.data']
392+ if tdiff > 50:
393+ suffs.append('.ts')
394+ for suff in suffs:
395+ f = open(os.path.join(df.datadir,
396+ normalize_timestamp(int(time.time()) - tdiff) + suff),
397+ 'wb')
398+ f.write('1234567890')
399+ f.close()
400+
401+ ohash = hash_path('a', 'c', 'o')
402+ data_dir = ohash[-3:]
403+ whole_path_from = os.path.join(self.objects, '0', data_dir)
404+ hsh_path = os.listdir(whole_path_from)[0]
405+ whole_hsh_path = os.path.join(whole_path_from, hsh_path)
406+
407+ object_replicator.hash_suffix(whole_path_from, 99)
408+ # only the meta and data should be left
409+ self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
410+
411+ def test_invalidate_hash(self):
412+
413+ def assertFileData(file_path, data):
414+ with open(file_path, 'r') as fp:
415+ fdata = fp.read()
416+ self.assertEquals(fdata, data)
417+
418+ df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
419+ mkdirs(df.datadir)
420+ ohash = hash_path('a', 'c', 'o')
421+ data_dir = ohash[-3:]
422+ whole_path_from = os.path.join(self.objects, '0', data_dir)
423+ hashes_file = os.path.join(self.objects, '0',
424+ object_replicator.HASH_FILE)
425+ # test that non existant file except caught
426+ self.assertEquals(object_replicator.invalidate_hash(whole_path_from),
427+ None)
428+ # test that hashes get cleared
429+ check_pickle_data = pickle.dumps({data_dir: None},
430+ object_replicator.PICKLE_PROTOCOL)
431+ for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
432+ with open(hashes_file, 'wb') as fp:
433+ pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL)
434+ object_replicator.invalidate_hash(whole_path_from)
435+ assertFileData(hashes_file, check_pickle_data)
436+
437+ def test_check_ring(self):
438+ self.assertTrue(self.replicator.check_ring())
439+ orig_check = self.replicator.next_check
440+ self.replicator.next_check = orig_check - 30
441+ self.assertTrue(self.replicator.check_ring())
442+ self.replicator.next_check = orig_check
443+ orig_ring_time = self.replicator.object_ring._mtime
444+ self.replicator.object_ring._mtime = orig_ring_time - 30
445+ self.assertTrue(self.replicator.check_ring())
446+ self.replicator.next_check = orig_check - 30
447+ self.assertFalse(self.replicator.check_ring())
448+
449+ def test_collect_jobs(self):
450+ jobs = self.replicator.collect_jobs()
451+ jobs_to_delete = [j for j in jobs if j['delete']]
452+ jobs_to_keep = [j for j in jobs if not j['delete']]
453+ jobs_by_part = {}
454+ for job in jobs:
455+ jobs_by_part[job['partition']] = job
456+ self.assertEquals(len(jobs_to_delete), 1)
457+ self.assertTrue('1', jobs_to_delete[0]['partition'])
458+ self.assertEquals(
459+ [node['id'] for node in jobs_by_part['0']['nodes']], [1, 2])
460+ self.assertEquals(
461+ [node['id'] for node in jobs_by_part['1']['nodes']], [1, 2, 3])
462+ self.assertEquals(
463+ [node['id'] for node in jobs_by_part['2']['nodes']], [2, 3])
464+ self.assertEquals(
465+ [node['id'] for node in jobs_by_part['3']['nodes']], [3, 1])
466+ for part in ['0', '1', '2', '3']:
467+ for node in jobs_by_part[part]['nodes']:
468+ self.assertEquals(node['device'], 'sda')
469+ self.assertEquals(jobs_by_part[part]['path'],
470+ os.path.join(self.objects, part))
471+
472+ def test_delete_partition(self):
473+ df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
474+ mkdirs(df.datadir)
475+ ohash = hash_path('a', 'c', 'o')
476+ data_dir = ohash[-3:]
477+ part_path = os.path.join(self.objects, '1')
478+ self.assertTrue(os.access(part_path, os.F_OK))
479+ self.replicator.replicate()
480+ self.assertFalse(os.access(part_path, os.F_OK))
481+
482+ def test_run_once_recover_from_failure(self):
483+ replicator = object_replicator.ObjectReplicator(
484+ dict(swift_dir=self.testdir, devices=self.devices,
485+ mount_check='false', timeout='300', stats_interval='1'))
486+ was_connector = object_replicator.http_connect
487+ object_replicator.http_connect = mock_http_connect(200)
488+ # Write some files into '1' and run replicate- they should be moved
489+ # to the other partitoins and then node should get deleted.
490+ cur_part = '1'
491+ df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
492+ mkdirs(df.datadir)
493+ f = open(os.path.join(df.datadir,
494+ normalize_timestamp(time.time()) + '.data'),
495+ 'wb')
496+ f.write('1234567890')
497+ f.close()
498+ ohash = hash_path('a', 'c', 'o')
499+ data_dir = ohash[-3:]
500+ whole_path_from = os.path.join(self.objects, cur_part, data_dir)
501+ process_arg_checker = []
502+ nodes = [node for node in
503+ self.ring.get_part_nodes(int(cur_part)) \
504+ if node['ip'] not in _ips()]
505+ for node in nodes:
506+ rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
507+ process_arg_checker.append(
508+ (0, '', ['rsync', whole_path_from, rsync_mod]))
509+ self.assertTrue(os.access(os.path.join(self.objects,
510+ '1', data_dir, ohash),
511+ os.F_OK))
512+ with _mock_process(process_arg_checker):
513+ replicator.run_once()
514+ self.assertFalse(process_errors)
515+ for i, result in [('0', True), ('1', False),
516+ ('2', True), ('3', True)]:
517+ self.assertEquals(os.access(
518+ os.path.join(self.objects,
519+ i, object_replicator.HASH_FILE),
520+ os.F_OK), result)
521+ object_replicator.http_connect = was_connector
522
523 def test_run(self):
524- with _mock_process([(0,'')]*100):
525+ with _mock_process([(0, '')] * 100):
526 self.replicator.replicate()
527
528 def test_run_withlog(self):
529- with _mock_process([(0,"stuff in log")]*100):
530+ with _mock_process([(0, "stuff in log")] * 100):
531 self.replicator.replicate()
532
533 if __name__ == '__main__':