Merge lp:~david-goetz/swift/walt into lp:~hudson-openstack/swift/trunk

Proposed by David Goetz
Status: Rejected
Rejected by: clayg
Proposed branch: lp:~david-goetz/swift/walt
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 427 lines (+400/-1)
3 files modified
bin/walt (+398/-0)
setup.py (+1/-0)
swift/common/direct_client.py (+1/-1)
To merge this branch: bzr merge lp:~david-goetz/swift/walt
Reviewer Review Type Date Requested Status
clayg Disapprove
Review via email: mp+48828@code.launchpad.net

Description of the change

Adding walt into swift.

To post a comment you must log in.
Revision history for this message
clayg (clay-gerrard) wrote :

This thing is awesome - get yourself a copy on github: https://github.com/gholt/tcod/blob/master/walt

review: Disapprove

Unmerged revisions

208. By David Goetz

merging golts changes

207. By David Goetz

merging the direct delete changes

206. By David Goetz

merging the direct delete functions

205. By David Goetz

adding walt to swift (it was better when it was underground)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'bin/walt'
2--- bin/walt 1970-01-01 00:00:00 +0000
3+++ bin/walt 2011-02-07 19:22:53 +0000
4@@ -0,0 +1,398 @@
5+#!/usr/bin/python
6+
7+import eventlet
8+eventlet.monkey_patch()
9+
10+from optparse import OptionParser
11+from os import environ
12+from random import random, randint
13+from sys import argv, exit
14+from time import time
15+from urlparse import urlparse
16+from uuid import uuid4
17+
18+from eventlet import GreenPool, sleep, Timeout
19+from eventlet.semaphore import Semaphore
20+
21+from swift.common.bufferedhttp import http_connect
22+from swift.common.client import get_auth, ClientException, Connection
23+from swift.common.direct_client import direct_get_container, \
24+ direct_head_container, retry
25+from swift.common.utils import normalize_timestamp
26+
27+default_cont_name = 'walt'
28+
29+
30+def container_names(num_containers):
31+ if num_containers:
32+ return ['%s_%d' % (default_cont_name, i)
33+ for i in range(num_containers)]
34+ else:
35+ return [default_cont_name]
36+
37+
38+def get_rand_container_name(num_containers):
39+ if num_containers:
40+ return '%s_%s' % (default_cont_name, int(random() * num_containers))
41+ return default_cont_name
42+
43+
44+def direct_put_object_c(node, part, account, container, obj, conn_timeout=5,
45+ response_timeout=15):
46+ path = '/%s/%s/%s' % (account, container, obj)
47+ with Timeout(conn_timeout):
48+ conn = http_connect(node['ip'], node['port'], node['device'], part,
49+ 'PUT', path, headers={'x-content-type': 'text/plain',
50+ 'x-etag': 'd41d8cd98f00b204e9800998ecf8427e', 'x-size': '0',
51+ 'x-timestamp': normalize_timestamp(time())})
52+ with Timeout(response_timeout):
53+ resp = conn.getresponse()
54+ resp.read()
55+ if resp.status < 200 or resp.status >= 300:
56+ raise ClientException(
57+ 'Container server %s:%s direct object PUT %s gave status %s' %
58+ (node['ip'], node['port'],
59+ repr('/%s/%s%s' % (node['device'], part, path)),
60+ resp.status),
61+ http_host=node['ip'], http_port=node['port'],
62+ http_device=node['device'], http_status=resp.status,
63+ http_reason=resp.reason)
64+ return resp.status
65+
66+
67+def direct_delete_object_c(node, part, account, container, obj, conn_timeout=5,
68+ response_timeout=15):
69+ path = '/%s/%s/%s' % (account, container, obj)
70+ with Timeout(conn_timeout):
71+ conn = http_connect(node['ip'], node['port'], node['device'], part,
72+ 'DELETE', path,
73+ headers={'x-timestamp': normalize_timestamp(time())})
74+ with Timeout(response_timeout):
75+ resp = conn.getresponse()
76+ resp.read()
77+ if resp.status < 200 or resp.status >= 300:
78+ raise ClientException(
79+ 'Container server %s:%s direct object DELETE %s gave status %s' %
80+ (node['ip'], node['port'],
81+ repr('/%s/%s%s' % (node['device'], part, path)), resp.status),
82+ http_host=node['ip'], http_port=node['port'],
83+ http_device=node['device'], http_status=resp.status,
84+ http_reason=resp.reason)
85+ return resp.status
86+
87+
88+def do_listings(left, sleep_amount, options, extra, extra2):
89+ if options.direct:
90+ while left[0] > 0:
91+ left[0] -= 1
92+ retry(direct_get_container, options.node, options.part,
93+ options.account, options.container)
94+ sleep(sleep_amount)
95+ else:
96+ conn = Connection(options.auth, options.user, options.key,
97+ preauthurl=options.preauthurl,
98+ preauthtoken=options.preauthtoken, snet=options.snet)
99+ while left[0] > 0:
100+ left[0] -= 1
101+ conn.get_container(get_rand_container_name(options.num_containers))
102+ sleep(sleep_amount)
103+
104+
105+def fill_queue_from_listings(options, queue):
106+ if options.direct:
107+ for cont_name in container_names(options.num_containers):
108+ _junk, cont_list = retry(direct_get_container, options.node,
109+ options.part, options.account, options.container)
110+ if cont_list and cont_list[1]:
111+ for obj_dict in cont_list[1]:
112+ if (cont_name, obj_dict['name']) not in queue['used']:
113+ queue['ok'].add((cont_name, obj_dict['name']))
114+ break
115+ else:
116+ conn = Connection(options.auth, options.user, options.key,
117+ preauthurl=options.preauthurl,
118+ preauthtoken=options.preauthtoken, snet=options.snet)
119+ for cont_name in container_names(options.num_containers):
120+ cont_list = conn.get_container(cont_name)
121+ if cont_list and cont_list[1]:
122+ for obj_dict in cont_list[1]:
123+ if (cont_name, obj_dict['name']) not in queue['used']:
124+ queue['ok'].add((cont_name, obj_dict['name']))
125+ break
126+
127+
128+def do_object_deletes(left, sleep_amount, options, queue, sema):
129+ if options.direct:
130+ num_sleeps = 0
131+ tried_listing = False
132+ while left[0] > 0:
133+ try:
134+ with sema:
135+ cont_name, obj_name = queue['ok'].pop()
136+ queue['used'].add((cont_name, obj_name))
137+ except KeyError:
138+ if num_sleeps > 1:
139+ if not tried_listing:
140+ with sema:
141+ fill_queue_from_listings(options, queue)
142+ tried_listing = True
143+ continue
144+ left[0] = 0
145+ print "Nothing more to delete!!"
146+ break
147+ num_sleeps += 1
148+ sleep(1)
149+ continue
150+ tried_listing = False
151+ retry(direct_delete_object_c, options.node, options.part,
152+ options.account, cont_name, obj_name)
153+ left[0] -= 1
154+ sleep(sleep_amount)
155+ else:
156+ conn = Connection(options.auth, options.user, options.key,
157+ preauthurl=options.preauthurl,
158+ preauthtoken=options.preauthtoken, snet=options.snet)
159+ num_sleeps = 0
160+ tried_listing = False
161+ while left[0] > 0:
162+ try:
163+ with sema:
164+ cont_name, obj_name = queue['ok'].pop()
165+ queue['used'].add((cont_name, obj_name))
166+ tried_listing = False
167+ conn.delete_object(cont_name, obj_name)
168+ left[0] -= 1
169+ except KeyError:
170+ if num_sleeps > 1:
171+ if not tried_listing:
172+ with sema:
173+ fill_queue_from_listings(options, queue)
174+ tried_listing = True
175+ continue
176+ left[0] = 0
177+ print "Nothing more to delete!!"
178+ break
179+ num_sleeps += 1
180+ sleep(1)
181+ continue
182+ sleep(sleep_amount)
183+
184+
185+def do_object_puts(left, sleep_amount, options, queue, sema):
186+ if options.direct:
187+ while left[0] > 0:
188+ left[0] -= 1
189+ retry(direct_put_object_c, options.node, options.part,
190+ options.account, options.container, uuid4().hex)
191+ sleep(sleep_amount)
192+ else:
193+ conn = Connection(options.auth, options.user, options.key,
194+ preauthurl=options.preauthurl,
195+ preauthtoken=options.preauthtoken, snet=options.snet)
196+ while left[0] > 0:
197+ left[0] -= 1
198+ data = '0' * options.object_size
199+ cont_name = get_rand_container_name(options.num_containers)
200+ name_ok = False
201+ while not name_ok:
202+ obj_name = uuid4().hex
203+ if obj_name not in queue['used']:
204+ name_ok = True
205+ conn.put_object(cont_name,
206+ obj_name, data, content_length=len(data))
207+ with sema:
208+ queue['ok'].add((cont_name, obj_name))
209+ sleep(sleep_amount)
210+
211+
212+def do_container_heads(left, sleep_amount, options, extra, extra2):
213+ if options.direct:
214+ while left[0] > 0:
215+ left[0] -= 1
216+ retry(direct_head_container, options.node, options.part,
217+ options.account, options.container)
218+ sleep(sleep_amount)
219+ else:
220+ conn = Connection(options.auth, options.user, options.key,
221+ preauthurl=options.preauthurl,
222+ preauthtoken=options.preauthtoken, snet=options.snet)
223+ while left[0] > 0:
224+ left[0] -= 1
225+ conn.head_container(
226+ get_rand_container_name(options.num_containers))
227+ sleep(sleep_amount)
228+
229+
230+TEST_FUNCS = [do_listings, do_object_puts,
231+ do_container_heads, do_object_deletes]
232+
233+
234+def run_test(number, options, arg, queue, sema):
235+ begin = time()
236+ iterations = 1
237+ concurrency = 1
238+ sleep_amount = 0
239+ while arg:
240+ typ = arg[0]
241+ i = 1
242+ while i < len(arg) and (arg[i].isdigit() or arg[i] == '.'):
243+ i += 1
244+ val = float(arg[1:i])
245+ arg = arg[i:]
246+ if typ == 'c':
247+ concurrency = int(val)
248+ elif typ == 'x':
249+ iterations = int(val)
250+ elif typ == 's':
251+ sleep_amount = val
252+ else:
253+ raise Exception('Unknown test argument type: %s' % repr(typ))
254+ if not options.direct:
255+ conn = Connection(options.auth, options.user, options.key,
256+ preauthurl=options.preauthurl,
257+ preauthtoken=options.preauthtoken, snet=options.snet)
258+ for cont_name in container_names(options.num_containers):
259+ conn.put_container(cont_name)
260+ gpool = GreenPool()
261+
262+ print 'Running test %s %s times at %s concurrency' % \
263+ (number, iterations, concurrency)
264+ left = [iterations]
265+ for _ in xrange(concurrency):
266+ gpool.spawn(TEST_FUNCS[number - 1], left, sleep_amount,
267+ options, queue, sema)
268+ last = 0
269+ tick = time()
270+ while left[0] > 0:
271+ while time() - tick < options.report_interval:
272+ if not left[0]:
273+ break
274+ sleep(.1)
275+ if left[0] > 0:
276+ elapsed = time() - tick
277+ persec = (iterations - left[0] - last) / elapsed
278+ print 'Test %s: %s of %s: %.03f/s for the last %.03fs' % \
279+ (number, iterations - left[0], iterations, persec, elapsed)
280+ r = [None, None] * len(TEST_FUNCS)
281+ r[(number - 1) * 2] = iterations - left[0]
282+ r[(number - 1) * 2 + 1] = persec
283+ if options.reportfile:
284+ options.reportfile.write('%s,' % time() +
285+ ','.join([v and str(v) or '' for v in r]) + '\n')
286+ last = iterations - left[0]
287+ tick = time()
288+ gpool.waitall()
289+ print 'Test %s finished: %.03fs: %.03f/s overall' % \
290+ (number, time() - begin, iterations / (time() - begin))
291+
292+
293+if __name__ == '__main__':
294+ begin = time()
295+ import gettext
296+ gettext.install('swift', unicode=1)
297+ parser = OptionParser(usage='''
298+Usage: %%prog [options] <test> [<test>] ...
299+Where <test> is: <number>[x<count>][c<concurrency>][s<sleep>]
300+ <count> is how many to do in total
301+ <concurrency> is how many to do at any given moment
302+ <sleep> is how long to wait before doing another, with the caveat
303+ that this applies at a single concurrency level
304+Available test numbers:
305+ 1 Does listing requests on the container named walt
306+ 2 Does zero byte object puts into the container named walt
307+ 3 Does head requests on the container named walt
308+ 4 Does object deletes
309+
310+If -C (multiple containers) is set then instead of hitting walt it will use a
311+randomly chosen container
312+
313+Examples:
314+ %%prog -A http://127.0.0.1:11000/v1.0 -U user -K key 1x100c10s2
315+ %%prog -D http://127.0.0.1:6041/sdb4/106861/8647f434-ce65-4295-ab35-1b921a5a4567/walt 1x100c10s2
316+
317+ This would do 100 total listings, up to 10 at any given time, and each of
318+ those ten "concurrency threads" would sleep for 2 seconds between listings.
319+'''.strip('\n') % globals())
320+ parser.add_option('-f', '--file', dest='filename', default=None,
321+ help='File name for storing CSV reports (default: None)')
322+ parser.add_option('-r', '--report-interval', dest='report_interval',
323+ default=60,
324+ help='Number of seconds between report updates')
325+ parser.add_option('-s', '--snet', action='store_true', dest='snet',
326+ default=False, help='Use SERVICENET internal network')
327+ parser.add_option('-A', '--auth', dest='auth',
328+ help='URL for obtaining an auth token')
329+ parser.add_option('-U', '--user', dest='user',
330+ help='User name for obtaining an auth token')
331+ parser.add_option('-K', '--key', dest='key',
332+ help='Key for obtaining an auth token')
333+ parser.add_option('-D', '--direct', dest='direct', default=None,
334+ help='URL for running direct container server tests; '
335+ 'the URL should be for the container itself, ex: '
336+ 'http://127.0.0.1:6041/sdb4/106861/8647f434-ce65-'
337+ '4295-ab35-1b921a5a4567/walt')
338+ parser.add_option('-C', '--num-put-containers', dest='num_put_containers',
339+ default=0,
340+ help='Number of container used (default 0 is only '
341+ 'walt) only non-Direct mode.')
342+ parser.add_option('-S', '--object-size', dest='object_size',
343+ default=0,
344+ help='Size of objects to PUT (in bytes) '
345+ 'only non-Direct mode.')
346+ parser.add_option('-N', '--container-name', dest='container_name',
347+ default='walt',
348+ help='Override default container name (walt).')
349+ args = argv[1:]
350+ if not args:
351+ args.append('-h')
352+ (options, args) = parser.parse_args(args)
353+ options.num_containers = int(options.num_put_containers)
354+ options.object_size = int(options.object_size)
355+ options.report_interval = float(options.report_interval)
356+ options.reportfile = options.filename and open(options.filename, 'w')
357+ if options.container_name:
358+ default_cont_name = options.container_name
359+ if options.reportfile:
360+ options.reportfile.write('Time,' +
361+ ','.join(['Test %s Count,Test %s Rate' % (n, n)
362+ for n in xrange(1, len(TEST_FUNCS) + 1)]) + '\n')
363+
364+ required_help = '''
365+Requires WALT_AUTH, WALT_USER, and WALT_KEY environment variables be set or
366+overridden with -A, -U, or -K. Alternatively, you may use -D for direct
367+container server testing.'''.strip('\n')
368+ if not options.direct:
369+ for attr in ('auth', 'user', 'key'):
370+ if not getattr(options, attr, None):
371+ setattr(options, attr, environ.get('WALT_%s' % attr.upper()))
372+ if not getattr(options, attr, None):
373+ exit(required_help)
374+ else:
375+ parsed = urlparse(options.direct)
376+ if ':' in parsed[1]:
377+ ip, port = parsed[1].split(':', 1)
378+ port = int(port)
379+ else:
380+ ip = parsed[1]
381+ port = 80
382+ _, device, options.part, options.account, options.container = \
383+ parsed[2].split('/')
384+ options.node = {'ip': ip, 'port': port, 'device': device}
385+
386+ if not args:
387+ parser.print_help()
388+ exit()
389+
390+ if not options.direct:
391+ options.preauthurl, options.preauthtoken = \
392+ get_auth(options.auth, options.user, options.key, snet=options.snet)
393+ gpool = GreenPool()
394+ queue = {'ok': set(), 'used': set()}
395+ sema = Semaphore()
396+ for arg in args:
397+ i = 0
398+ while i < len(arg) and arg[i].isdigit():
399+ i += 1
400+ gpool.spawn(run_test, int(arg[:i]), options, arg[i:], queue, sema)
401+ gpool.waitall()
402+ print 'All tests finished: %.03fs' % (time() - begin)
403
404=== modified file 'setup.py'
405--- setup.py 2011-01-27 00:06:20 +0000
406+++ setup.py 2011-02-07 19:22:53 +0000
407@@ -101,6 +101,7 @@
408 'bin/swauth-cleanup-tokens', 'bin/swauth-delete-account',
409 'bin/swauth-delete-user', 'bin/swauth-list', 'bin/swauth-prep',
410 'bin/swauth-set-account-service', 'bin/swift-auth-to-swauth',
411+ 'bin/walt',
412 ],
413 entry_points={
414 'paste.app_factory': [
415
416=== modified file 'swift/common/direct_client.py'
417--- swift/common/direct_client.py 2011-01-04 23:34:43 +0000
418+++ swift/common/direct_client.py 2011-02-07 19:22:53 +0000
419@@ -330,7 +330,7 @@
420 :param kwargs: keyward arguments to send to func (if retries or
421 error_log are sent, they will be deleted from kwargs
422 before sending on to func)
423- :returns: restult of func
424+ :returns: A tuple of (attempts, result of func)
425 """
426 retries = 5
427 if 'retries' in kwargs: