Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk

Proposed by John Dickinson
Status: Superseded
Proposed branch: lp:~notmyname/swift/stats_system
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1142 lines (+1090/-0)
10 files modified
bin/swift-account-stats-logger.py (+81/-0)
bin/swift-log-uploader (+83/-0)
etc/log-processing.conf-sample (+28/-0)
swift/common/compressed_file_reader.py (+72/-0)
swift/common/internal_proxy.py (+174/-0)
swift/stats/access_processor.py (+168/-0)
swift/stats/account_stats.py (+69/-0)
swift/stats/log_processor.py (+226/-0)
swift/stats/log_uploader.py (+135/-0)
swift/stats/stats_processor.py (+54/-0)
To merge this branch: bzr merge lp:~notmyname/swift/stats_system
Reviewer: Swift Core security contacts (status: Pending)
Review via email: mp+32502@code.launchpad.net

This proposal supersedes a proposal from 2010-08-06.

This proposal has been superseded by a proposal from 2010-08-14.

Description of the change

Work in progress: port of the existing internal (pre-OpenStack) Swift stats system to (OpenStack) Swift.

lp:~notmyname/swift/stats_system updated
53. By John Dickinson

added access log processing plugin

54. By John Dickinson

merged with trunk

55. By John Dickinson

merged with trunk

56. By John Dickinson

merged with trunk

57. By John Dickinson

merged with trunk

58. By John Dickinson

merged with trunk

59. By John Dickinson

merged with trunk

60. By John Dickinson

initial tests for the stats system

61. By John Dickinson

first test working

62. By John Dickinson

merged with trunk

63. By John Dickinson

access log parsing tests pass

64. By John Dickinson

added (working) stats tests

65. By John Dickinson

merged with trunk (utils fix)

66. By John Dickinson

updated stats binaries to be DRY compliant

67. By John Dickinson

merged with trunk

68. By John Dickinson

set up log-stats-collector as a daemon process to create csv files

69. By John Dickinson

merged with trunk

70. By John Dickinson

added execute perms to stats processor binaries

71. By John Dickinson

updated config file loading to work with paste.deploy configs

72. By John Dickinson

fixed typos

73. By John Dickinson

fixed internal proxy loading

74. By John Dickinson

made a memcache stub for the internal proxy server

75. By John Dickinson

fixed internal proxy put_container reference

76. By John Dickinson

fixed some log uploading glob patterns

77. By John Dickinson

fixed bug in calculating offsets for filename patterns

78. By John Dickinson

fixed typos in log processor

79. By John Dickinson

fixed get_data_list in log_processor

80. By John Dickinson

added some debug output

81. By John Dickinson

fixed logging and log uploading

82. By John Dickinson

fixed copy/paste errors and missing imports

83. By John Dickinson

added error handling and missing return statement

84. By John Dickinson

handled some typos and better handling of missing data in internal proxy

85. By John Dickinson

fixed tests, typos, and added error handling

86. By John Dickinson

fixed bug in account stats log processing

87. By John Dickinson

fixed listing filter in log processing

88. By John Dickinson

fixed stdout capturing for generating csv files

89. By John Dickinson

fixed replica count reporting error

90. By John Dickinson

fixed lookback in log processor

91. By John Dickinson

merged with trunk

92. By John Dickinson

merged with trunk

93. By John Dickinson

fixed tests to account for changed key name

94. By John Dickinson

fixed test bug

95. By John Dickinson

updated with changes and suggestions from code review

96. By John Dickinson

merged with changes from trunk

97. By John Dickinson

added stats overview

98. By John Dickinson

updated with changes from trunk

99. By John Dickinson

added additional docs

100. By John Dickinson

documentation clarification and pep8 fixes

101. By John Dickinson

added overview stats to the doc index

102. By John Dickinson

made long lines wrap (grr pep8)

103. By John Dickinson

merged with trunk

104. By John Dickinson

merged with trunk

105. By John Dickinson

merged with trunk

106. By John Dickinson

fixed stats docs

107. By John Dickinson

added a bad lines check to the access log parser

108. By John Dickinson

added tests for compressing file reader

109. By John Dickinson

fixed compressing file reader test

110. By John Dickinson

fixed compressing file reader test

111. By John Dickinson

improved compressing file reader test

112. By John Dickinson

fixed compressing file reader test

113. By John Dickinson

made try/except much less inclusive in access log processor

114. By John Dickinson

added keylist mapping tests and fixed other tests

115. By John Dickinson

updated stats system tests

116. By John Dickinson

merged with gholt's test stubs

117. By John Dickinson

updated SAIO instructions for the stats system

118. By John Dickinson

fixed stats system saio docs

119. By John Dickinson

fixed stats system saio docs

120. By John Dickinson

added openstack copyright/license to test_log_processor.py

121. By John Dickinson

improved logging in log processors

122. By John Dickinson

merged with trunk

123. By John Dickinson

fixed missing working directory bug in account stats

124. By John Dickinson

fixed readconf parameter that was broken with a previous merge

125. By John Dickinson

updated setup.py and saio docs for stats system

126. By John Dickinson

added readconf unit test

127. By John Dickinson

updated stats saio docs to create logs with the appropriate permissions

128. By John Dickinson

fixed bug in log processor internal proxy lazy load code

129. By John Dickinson

updated readconf test

130. By John Dickinson

updated readconf test

131. By John Dickinson

updated readconf test

132. By John Dickinson

fixed internal proxy references in log processor

133. By John Dickinson

fixed account stats filename creation

134. By John Dickinson

pep8 tomfoolery

135. By John Dickinson

moved paren

136. By John Dickinson

added lazy load of internal proxy to log processor (you were right clay)

Unmerged revisions

Preview Diff

1=== added file 'bin/swift-account-stats-logger.py'
2--- bin/swift-account-stats-logger.py 1970-01-01 00:00:00 +0000
3+++ bin/swift-account-stats-logger.py 2010-08-14 18:41:02 +0000
4@@ -0,0 +1,81 @@
5+#!/usr/bin/python
6+# Copyright (c) 2010 OpenStack, LLC.
7+#
8+# Licensed under the Apache License, Version 2.0 (the "License");
9+# you may not use this file except in compliance with the License.
10+# You may obtain a copy of the License at
11+#
12+# http://www.apache.org/licenses/LICENSE-2.0
13+#
14+# Unless required by applicable law or agreed to in writing, software
15+# distributed under the License is distributed on an "AS IS" BASIS,
16+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
17+# implied.
18+# See the License for the specific language governing permissions and
19+# limitations under the License.
20+
21+import os
22+import signal
23+import sys
24+import time
25+from ConfigParser import ConfigParser
26+
27+ from swift.stats.account_stats import AccountStat
28+from swift.common import utils
29+
30+if __name__ == '__main__':
31+ if len(sys.argv) < 2:
32+ print "Usage: swift-account-stats-logger CONFIG_FILE"
33+ sys.exit()
34+
35+ c = ConfigParser()
36+ if not c.read(sys.argv[1]):
37+ print "Unable to read config file."
38+ sys.exit(1)
39+
40+ if c.has_section('log-processor-stats'):
41+ stats_conf = dict(c.items('log-processor-stats'))
42+ else:
43+ print "Unable to find log-processor-stats config section in %s." % \
44+ sys.argv[1]
45+ sys.exit(1)
46+
47+ # reference this from the account stats conf
48+
49+ target_dir = stats_conf.get('log_dir', '/var/log/swift')
50+ account_server_conf_loc = stats_conf.get('account_server_conf',
51+ '/etc/swift/account-server.conf')
52+ filename_format = stats_conf['source_filename_format']
53+ try:
54+ c = ConfigParser()
55+ c.read(account_server_conf_loc)
56+ account_server_conf = dict(c.items('account-server'))
57+ except:
58+ print "Unable to load account server conf from %s" % account_server_conf_loc
59+ sys.exit(1)
60+
61+ utils.drop_privileges(account_server_conf.get('user', 'swift'))
62+
63+ try:
64+ os.setsid()
65+ except OSError:
66+ pass
67+
68+ logger = utils.get_logger(stats_conf, 'swift-account-stats-logger')
69+
70+ def kill_children(*args):
71+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
72+ os.killpg(0, signal.SIGTERM)
73+ sys.exit()
74+
75+ signal.signal(signal.SIGTERM, kill_children)
76+
77+ stats = AccountStat(filename_format,
78+ target_dir,
79+ account_server_conf,
80+ logger)
81+ logger.info("Gathering account stats")
82+ start = time.time()
83+ stats.find_and_process()
84+ logger.info("Gathering account stats complete (%0.2f minutes)" %
85+ ((time.time()-start)/60))
86
87=== added file 'bin/swift-log-uploader'
88--- bin/swift-log-uploader 1970-01-01 00:00:00 +0000
89+++ bin/swift-log-uploader 2010-08-14 18:41:02 +0000
90@@ -0,0 +1,83 @@
91+#!/usr/bin/python
92+# Copyright (c) 2010 OpenStack, LLC.
93+#
94+# Licensed under the Apache License, Version 2.0 (the "License");
95+# you may not use this file except in compliance with the License.
96+# You may obtain a copy of the License at
97+#
98+# http://www.apache.org/licenses/LICENSE-2.0
99+#
100+# Unless required by applicable law or agreed to in writing, software
101+# distributed under the License is distributed on an "AS IS" BASIS,
102+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
103+# implied.
104+# See the License for the specific language governing permissions and
105+# limitations under the License.
106+
107+import os
108+import signal
109+import sys
110+import time
111+from ConfigParser import ConfigParser
112+
113+from swift.stats.log_uploader import LogUploader
114+from swift.common.utils import get_logger
115+
116+if __name__ == '__main__':
117+ if len(sys.argv) < 3:
118+ print "Usage: swift-log-uploader CONFIG_FILE plugin"
119+ sys.exit()
120+
121+ c = ConfigParser()
122+ if not c.read(sys.argv[1]):
123+ print "Unable to read config file."
124+ sys.exit(1)
125+
126+ if c.has_section('log-processor'):
127+ uploader_conf = dict(c.items('log-processor'))
128+ else:
129+ print "Unable to find log-processor config section in %s." % sys.argv[1]
130+ sys.exit(1)
131+
132+ plugin = sys.argv[2]
133+ section_name = 'log-processor-%s' % plugin
134+ if c.has_section(section_name):
135+ uploader_conf.update(dict(c.items(section_name)))
136+ else:
137+ print "Unable to find %s config section in %s." % (section_name,
138+ sys.argv[1])
139+ sys.exit(1)
140+
141+ try:
142+ os.setsid()
143+ except OSError:
144+ pass
145+
146+ logger = get_logger(uploader_conf, 'swift-log-uploader')
147+
148+ def kill_children(*args):
149+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
150+ os.killpg(0, signal.SIGTERM)
151+ sys.exit()
152+
153+ signal.signal(signal.SIGTERM, kill_children)
154+
155+ log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
156+ swift_account = uploader_conf['swift_account']
157+ container_name = uploader_conf['container_name']
158+ source_filename_format = uploader_conf['source_filename_format']
159+ proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
160+ '/etc/swift/proxy-server.conf')
161+ try:
162+ c = ConfigParser()
163+ c.read(proxy_server_conf_loc)
164+ proxy_server_conf = dict(c.items('proxy-server'))
165+ except:
166+ proxy_server_conf = None
167+ uploader = LogUploader(log_dir, swift_account, container_name,
168+ source_filename_format, proxy_server_conf, logger)
169+ logger.info("Uploading logs")
170+ start = time.time()
171+ uploader.upload_all_logs()
172+ logger.info("Uploading logs complete (%0.2f minutes)" %
173+ ((time.time()-start)/60))
174
175=== added file 'etc/log-processing.conf-sample'
176--- etc/log-processing.conf-sample 1970-01-01 00:00:00 +0000
177+++ etc/log-processing.conf-sample 2010-08-14 18:41:02 +0000
178@@ -0,0 +1,28 @@
179+# plugin section format is named "log-processor-<plugin>"
180+# section "log-processor" is the generic defaults (overridden by plugins)
181+
182+[log-processor]
183+# working_dir = /tmp/swift/
184+# proxy_server_conf = /etc/swift/proxy-server.conf
185+# log_facility = LOG_LOCAL0
186+# log_level = INFO
187+# lookback_hours = 120
188+# lookback_window = 120
189+
190+[log-processor-access]
191+# log_dir = /var/log/swift/
192+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
193+container_name = log_data
194+source_filename_format = %Y%m%d%H*
195+class_path = swift.stats.access_processor.AccessLogProcessor
196+# service ips is for client ip addresses that should be counted as servicenet
197+# service_ips =
198+# server_name = proxy
199+
200+[log-processor-stats]
201+# log_dir = /var/log/swift/
202+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
203+container_name = account_stats
204+source_filename_format = %Y%m%d%H*
205+class_path = swift.stats.stats_processor.StatsLogProcessor
206+# account_server_conf = /etc/swift/account-server.conf
207
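The override scheme described at the top of the sample (generic [log-processor] defaults, overridden by each plugin section) can be sketched with a small merge helper. This is an illustrative Python 3 sketch; `plugin_conf` and the inline config are editorial examples, not code from the branch:

```python
from configparser import ConfigParser

conf = ConfigParser()
conf.read_string("""
[log-processor]
lookback_hours = 120

[log-processor-access]
container_name = log_data
""")

def plugin_conf(conf, plugin):
    # start from the generic [log-processor] defaults, then let the
    # plugin-specific section override or extend them
    merged = dict(conf.items('log-processor'))
    merged.update(conf.items('log-processor-%s' % plugin))
    return merged
```

The merged dict for the access plugin then carries both the generic lookback_hours and its own container_name.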
208=== added file 'swift/common/compressed_file_reader.py'
209--- swift/common/compressed_file_reader.py 1970-01-01 00:00:00 +0000
210+++ swift/common/compressed_file_reader.py 2010-08-14 18:41:02 +0000
211@@ -0,0 +1,72 @@
212+# Copyright (c) 2010 OpenStack, LLC.
213+#
214+# Licensed under the Apache License, Version 2.0 (the "License");
215+# you may not use this file except in compliance with the License.
216+# You may obtain a copy of the License at
217+#
218+# http://www.apache.org/licenses/LICENSE-2.0
219+#
220+# Unless required by applicable law or agreed to in writing, software
221+# distributed under the License is distributed on an "AS IS" BASIS,
222+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
223+# implied.
224+# See the License for the specific language governing permissions and
225+# limitations under the License.
226+
227+import zlib
228+import struct
229+
230+
231+class CompressedFileReader(object):
232+ '''
233+ Wraps a file object and provides a read method that returns gzip'd data.
234+
235+ One warning: if read is called with a small size, the data returned may
236+ be larger than requested. In that case, the "compressed" data will be
237+ larger than the original data. To avoid this, use a larger read buffer.
238+
239+ An example use case:
240+ Given an uncompressed file on disk, provide a way to read compressed data
241+ without buffering the entire file data in memory. Using this class, an
242+ uncompressed log file could be uploaded as compressed data with chunked
243+ transfer encoding.
244+
245+ gzip header and footer code taken from the python stdlib gzip module
246+
247+ :param file_obj: File object to read from
248+ :param compresslevel: compression level
249+ '''
250+ def __init__(self, file_obj, compresslevel=9):
251+ self._f = file_obj
252+ self._compressor = zlib.compressobj(compresslevel,
253+ zlib.DEFLATED,
254+ -zlib.MAX_WBITS,
255+ zlib.DEF_MEM_LEVEL,
256+ 0)
257+ self.done = False
258+ self.first = True
259+ self.crc32 = 0
260+ self.total_size = 0
261+
262+ def read(self, *a, **kw):
263+ if self.done:
264+ return ''
265+ x = self._f.read(*a, **kw)
266+ if x:
267+ self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
268+ self.total_size += len(x)
269+ compressed = self._compressor.compress(x)
270+ if not compressed:
271+ compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
272+ else:
273+ compressed = self._compressor.flush(zlib.Z_FINISH)
274+ crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
275+ size = struct.pack("<L", self.total_size & 0xffffffffL)
276+ footer = crc32 + size
277+ compressed += footer
278+ self.done = True
279+ if self.first:
280+ self.first = False
281+ header = '\037\213\010\000\000\000\000\000\002\377'
282+ compressed = header + compressed
283+ return compressed
284
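The gzip framing used by CompressedFileReader can be exercised end to end. Below is a minimal Python 3 sketch of the same technique: a raw zlib deflate stream wrapped in a hand-built gzip header and CRC32/size footer. `gzip_stream` is a hypothetical helper for illustration, not part of the proposed branch:

```python
import gzip
import struct
import zlib

def gzip_stream(chunks, compresslevel=9):
    # 10-byte gzip header: magic, deflate method, no flags/mtime,
    # max-compression hint, unknown OS (0xff), as in the class above
    yield b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff'
    compressor = zlib.compressobj(compresslevel, zlib.DEFLATED,
                                  -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    crc = 0
    size = 0
    for chunk in chunks:
        crc = zlib.crc32(chunk, crc) & 0xffffffff
        size += len(chunk)
        data = compressor.compress(chunk)
        if data:
            yield data
    # drain the deflate stream, then append the CRC32 + ISIZE footer
    yield compressor.flush(zlib.Z_FINISH)
    yield struct.pack('<LL', crc, size & 0xffffffff)

framed = b''.join(gzip_stream([b'hello ', b'world']))
roundtrip = gzip.decompress(framed)
```

Because the framing is valid gzip, any consumer (including the stdlib gzip module) can decompress the chunked output, which is what makes the chunked-transfer upload of compressed logs possible.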
285=== added file 'swift/common/internal_proxy.py'
286--- swift/common/internal_proxy.py 1970-01-01 00:00:00 +0000
287+++ swift/common/internal_proxy.py 2010-08-14 18:41:02 +0000
288@@ -0,0 +1,174 @@
289+# Copyright (c) 2010 OpenStack, LLC.
290+#
291+# Licensed under the Apache License, Version 2.0 (the "License");
292+# you may not use this file except in compliance with the License.
293+# You may obtain a copy of the License at
294+#
295+# http://www.apache.org/licenses/LICENSE-2.0
296+#
297+# Unless required by applicable law or agreed to in writing, software
298+# distributed under the License is distributed on an "AS IS" BASIS,
299+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
300+# implied.
301+# See the License for the specific language governing permissions and
302+# limitations under the License.
303+
304+import webob
305+from urllib import quote, unquote
306+from json import loads as json_loads
307+
308+from swift.common.compressed_file_reader import CompressedFileReader
309+from swift.proxy.server import BaseApplication
310+
311+
312+class InternalProxy(object):
313+ """
314+ Set up a private instance of a proxy server that allows normal requests
315+ to be made without having to actually send the request to the proxy.
316+ This also doesn't log the requests to the normal proxy logs.
317+
318+ :param proxy_server_conf: proxy server configuration dictionary
319+ :param logger: logger to log requests to
320+ :param retries: number of times to retry each request
321+ """
322+ def __init__(self, proxy_server_conf=None, logger=None, retries=0):
323+ self.upload_app = BaseApplication(proxy_server_conf, logger)
324+ self.retries = retries
325+
326+ def upload_file(self, source_file, account, container, object_name,
327+ compress=True, content_type='application/x-gzip'):
328+ """
329+ Upload a file to cloud files.
330+
331+ :param source_file: path to or file like object to upload
332+ :param account: account to upload to
333+ :param container: container to upload to
334+ :param object_name: name of object being uploaded
335+ :param compress: if True, compresses object as it is uploaded
336+ :param content_type: content-type of object
337+ :returns: True if successful, False otherwise
338+ """
339+ log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name)
340+
341+ # create the container
342+ if not self.put_container(account, container):
343+ return False
344+
345+ # upload the file to the account
346+ req = webob.Request.blank(log_create_pattern,
347+ environ={'REQUEST_METHOD': 'PUT'},
348+ headers={'Transfer-Encoding': 'chunked'})
349+ if compress:
350+ if hasattr(source_file, 'read'):
351+ compressed_file = CompressedFileReader(source_file)
352+ else:
353+ compressed_file = CompressedFileReader(open(source_file, 'rb'))
354+ req.body_file = compressed_file
355+ else:
356+ if not hasattr(source_file, 'read'):
357+ source_file = open(source_file, 'rb')
358+ req.body_file = source_file
359+ req.account = account
360+ req.content_type = content_type
361+ req.content_length = None # to make sure we send chunked data
362+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
363+ tries = 1
364+ while (resp.status_int < 200 or resp.status_int > 299) \
365+ and tries <= self.retries:
366+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
367+ tries += 1
368+ if not (200 <= resp.status_int < 300):
369+ return False
370+ return True
371+
372+ def get_object(self, account, container, object_name):
373+ """
374+ Get object.
375+
376+ :param account: account name object is in
377+ :param container: container name object is in
378+ :param object_name: name of object to get
379+ :returns: iterator for object data
380+ """
381+ req = webob.Request.blank('/v1/%s/%s/%s' %
382+ (account, container, object_name),
383+ environ={'REQUEST_METHOD': 'GET'})
384+ req.account = account
385+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
386+ tries = 1
387+ while (resp.status_int < 200 or resp.status_int > 299) \
388+ and tries <= self.retries:
389+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
390+ tries += 1
391+ for x in resp.app_iter:
392+ yield x
393+
394+ def create_container(self, account, container):
395+ """
396+ Create container.
397+
398+ :param account: account name to put the container in
399+ :param container: container name to create
400+ :returns: True if successful, otherwise False
401+ """
402+ req = webob.Request.blank('/v1/%s/%s' % (account, container),
403+ environ={'REQUEST_METHOD': 'PUT'})
404+ req.account = account
405+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
406+ tries = 1
407+ while (resp.status_int < 200 or resp.status_int > 299) \
408+ and tries <= self.retries:
409+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
410+ tries += 1
411+ return 200 <= resp.status_int < 300
412+
413+ def get_container_list(self, account, container, marker=None, limit=None,
414+ prefix=None, delimiter=None, full_listing=True):
415+ """
416+ Get container listing.
417+
418+ :param account: account name for the container
419+ :param container: container name to get the listing of
420+ :param marker: marker query
421+ :param limit: limit to query
422+ :param prefix: prefix query
423+ :param delimiter: delimiter for query
424+ :param full_listing: if True, make enough requests to get all listings
425+ :returns: list of objects
426+ """
427+ if full_listing:
428+ rv = []
429+ listing = self.get_container_list(account, container, marker,
430+ limit, prefix, delimiter, full_listing=False)
431+ while listing:
432+ rv.extend(listing)
433+ if not delimiter:
434+ marker = listing[-1]['name']
435+ else:
436+ marker = listing[-1].get('name', listing[-1].get('subdir'))
437+ listing = self.get_container_list(account, container, marker,
438+ limit, prefix, delimiter, full_listing=False)
439+ return rv
440+ path = '/v1/%s/%s' % (account, container)
441+ qs = 'format=json'
442+ if marker:
443+ qs += '&marker=%s' % quote(marker)
444+ if limit:
445+ qs += '&limit=%d' % limit
446+ if prefix:
447+ qs += '&prefix=%s' % quote(prefix)
448+ if delimiter:
449+ qs += '&delimiter=%s' % quote(delimiter)
450+ path += '?%s' % qs
451+ req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
452+ req.account = account
453+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
454+ tries = 1
455+ while (resp.status_int < 200 or resp.status_int > 299) \
456+ and tries <= self.retries:
457+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
458+ tries += 1
459+ if resp.status_int == 204:
460+ return []
461+ if 200 <= resp.status_int < 300:
462+ return json_loads(resp.body)
463
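The full_listing branch of get_container_list pages through results by re-querying with the last returned name as the next marker. A minimal sketch of that pattern against a fake paginated backend (`full_listing`, `list_page`, and `ITEMS` are illustrative names, not from the branch):

```python
ITEMS = [{'name': n} for n in ['a', 'b', 'c', 'd', 'e']]

def list_page(marker):
    # fake backend: return at most two entries after the marker
    names = [i['name'] for i in ITEMS]
    start = 0 if marker is None else names.index(marker) + 1
    return ITEMS[start:start + 2]

def full_listing(list_page, marker=None):
    # keep requesting pages until an empty one comes back,
    # advancing the marker to the last name seen each time
    results = []
    page = list_page(marker)
    while page:
        results.extend(page)
        marker = page[-1]['name']
        page = list_page(marker)
    return results
```

The real code additionally falls back to the 'subdir' key when a delimiter query returns entries without a 'name', since delimiter listings can contain plain prefixes.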
464=== added directory 'swift/stats'
465=== added file 'swift/stats/__init__.py'
466=== added file 'swift/stats/access_processor.py'
467--- swift/stats/access_processor.py 1970-01-01 00:00:00 +0000
468+++ swift/stats/access_processor.py 2010-08-14 18:41:02 +0000
469@@ -0,0 +1,168 @@
470+# Copyright (c) 2010 OpenStack, LLC.
471+#
472+# Licensed under the Apache License, Version 2.0 (the "License");
473+# you may not use this file except in compliance with the License.
474+# You may obtain a copy of the License at
475+#
476+# http://www.apache.org/licenses/LICENSE-2.0
477+#
478+# Unless required by applicable law or agreed to in writing, software
479+# distributed under the License is distributed on an "AS IS" BASIS,
480+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
481+# implied.
482+# See the License for the specific language governing permissions and
483+# limitations under the License.
484+
485+class AccessLogProcessor(object):
486+
487+ def __init__(self, conf):
488+ self.server_name = conf.get('server_name', 'proxy')
489+
490+ def _log_line_parser(self, raw_log):
491+ '''given a raw access log line, return a dict of the good parts'''
492+ d = {}
493+ try:
494+ (_,
495+ server,
496+ client_ip,
497+ lb_ip,
498+ timestamp,
499+ method,
500+ request,
501+ http_version,
502+ code,
503+ referrer,
504+ user_agent,
505+ auth_token,
506+ bytes_in,
507+ bytes_out,
508+ etag,
509+ trans_id,
510+ headers,
511+ processing_time) = (unquote(x) for x in raw_log[16:].split(' '))
512+ if server != self.server_name:
513+ raise ValueError('incorrect server name in log line')
514+ (version,
515+ account,
516+ container_name,
517+ object_name) = split_path(request, 2, 4, True)
518+ if container_name is not None:
519+ container_name = container_name.split('?', 1)[0]
520+ if object_name is not None:
521+ object_name = object_name.split('?', 1)[0]
522+ account = account.split('?', 1)[0]
523+ query = None
524+ if '?' in request:
525+ request, query = request.split('?', 1)
526+ args = query.split('&')
527+ # Count each query argument. This is used later to aggregate
528+ # the number of format, prefix, etc. queries.
529+ for q in args:
530+ if '=' in q:
531+ k, v = q.split('=', 1)
532+ else:
533+ k = q
534+ # Certain keys will get summed in stats reporting
535+ # (format, path, delimiter, etc.). Save a "1" here
536+ # to indicate that this request is 1 request for
537+ # its respective key.
538+ d[k] = 1
539+ except ValueError:
540+ pass
541+ else:
542+ d['client_ip'] = client_ip
543+ d['lb_ip'] = lb_ip
544+ d['method'] = method
545+ d['request'] = request
546+ if query:
547+ d['query'] = query
548+ d['http_version'] = http_version
549+ d['code'] = code
550+ d['referrer'] = referrer
551+ d['user_agent'] = user_agent
552+ d['auth_token'] = auth_token
553+ d['bytes_in'] = bytes_in
554+ d['bytes_out'] = bytes_out
555+ d['etag'] = etag
556+ d['trans_id'] = trans_id
557+ d['processing_time'] = processing_time
558+ day, month, year, hour, minute, second = timestamp.split('/')
559+ d['day'] = day
560+ month = ('%02s' % month_map.index(month)).replace(' ', '0')
561+ d['month'] = month
562+ d['year'] = year
563+ d['hour'] = hour
564+ d['minute'] = minute
565+ d['second'] = second
566+ d['tz'] = '+0000'
567+ d['account'] = account
568+ d['container_name'] = container_name
569+ d['object_name'] = object_name
570+ d['bytes_out'] = int(d['bytes_out'].replace('-','0'))
571+ d['bytes_in'] = int(d['bytes_in'].replace('-','0'))
572+ d['code'] = int(d['code'])
573+ return d
574+
575+ def process(self, obj_stream):
576+ '''generate hourly groupings of data from one access log file'''
577+ hourly_aggr_info = {}
578+ aggr_account_logs = {}
579+ container_line_counts = collections.defaultdict(int)
580+ log_buffer = collections.defaultdict(list)
581+ for line in obj_stream:
582+ line_data = self._log_line_parser(line)
583+ if not line_data:
584+ continue
585+ account = line_data['account']
586+ container_name = line_data['container_name']
587+ year = line_data['year']
588+ month = line_data['month']
589+ day = line_data['day']
590+ hour = line_data['hour']
591+ bytes_out = line_data['bytes_out']
592+ bytes_in = line_data['bytes_in']
593+ method = line_data['method']
594+ code = int(line_data['code'])
595+ object_name = line_data['object_name']
596+ client_ip = line_data['client_ip']
597+
598+ op_level = None
599+ if not container_name:
600+ op_level = 'account'
601+ elif container_name and not object_name:
602+ op_level = 'container'
603+ elif object_name:
604+ op_level = 'object'
605+
606+ aggr_key = (account, year, month, day, hour)
607+ d = hourly_aggr_info.get(aggr_key, {})
608+ if line_data['lb_ip'] in self.lb_private_ips:
609+ source = 'service'
610+ else:
611+ source = 'public'
612+
613+ if line_data['client_ip'] in self.service_ips:
614+ source = 'service'
615+
616+ d[(source, 'bytes_out')] = d.setdefault((source, 'bytes_out'), 0) + \
617+ bytes_out
618+ d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
619+ bytes_in
620+
621+ d['format_query'] = d.setdefault('format_query', 0) + \
622+ line_data.get('format', 0)
623+ d['marker_query'] = d.setdefault('marker_query', 0) + \
624+ line_data.get('marker', 0)
625+ d['prefix_query'] = d.setdefault('prefix_query', 0) + \
626+ line_data.get('prefix', 0)
627+ d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \
628+ line_data.get('delimiter', 0)
629+ path = line_data.get('path', 0)
630+ d['path_query'] = d.setdefault('path_query', 0) + path
631+
632+ code = '%dxx' % (code/100)
633+ key = (source, op_level, method, code)
634+ d[key] = d.setdefault(key, 0) + 1
635+
636+ hourly_aggr_info[aggr_key] = d
637+ return hourly_aggr_info, aggr_account_logs
638
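The parser marks each query argument on a request with a 1, and process later sums those flags into per-hour counters such as format_query and prefix_query. The counting idea in isolation, as a Python 3 sketch (`count_queries` is an illustrative helper, not code from the branch):

```python
from collections import defaultdict

def count_queries(request):
    # each query arg present on a request contributes a flag of 1,
    # keyed by the arg name (format, prefix, marker, ...)
    flags = {}
    if '?' in request:
        _, query = request.split('?', 1)
        for q in query.split('&'):
            flags[q.split('=', 1)[0]] = 1
    return flags

totals = defaultdict(int)
requests = ['/v1/a/c?format=json&prefix=x', '/v1/a/c?format=xml', '/v1/a/c']
for req in requests:
    for key in count_queries(req):
        totals[key] += 1
```

Summing the per-request flags across an hour's worth of lines yields the aggregate query counts reported per (account, year, month, day, hour) key.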
639=== added file 'swift/stats/account_stats.py'
640--- swift/stats/account_stats.py 1970-01-01 00:00:00 +0000
641+++ swift/stats/account_stats.py 2010-08-14 18:41:02 +0000
642@@ -0,0 +1,69 @@
643+# Copyright (c) 2010 OpenStack, LLC.
644+#
645+# Licensed under the Apache License, Version 2.0 (the "License");
646+# you may not use this file except in compliance with the License.
647+# You may obtain a copy of the License at
648+#
649+# http://www.apache.org/licenses/LICENSE-2.0
650+#
651+# Unless required by applicable law or agreed to in writing, software
652+# distributed under the License is distributed on an "AS IS" BASIS,
653+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
654+# implied.
655+# See the License for the specific language governing permissions and
656+# limitations under the License.
657+
658+import os
659+import time
660+
661+from swift.account.server import DATADIR as account_server_data_dir
662+from swift.common.db import AccountBroker
663+from swift.common.internal_proxy import InternalProxy
664+from swift.common.utils import renamer
665+
666+class AccountStat(object):
667+ def __init__(self, filename_format, target_dir, server_conf, logger):
668+ self.filename_format = filename_format
669+ self.target_dir = target_dir
670+ self.devices = server_conf.get('devices', '/srv/node')
671+ self.mount_check = server_conf.get('mount_check', 'true').lower() in \
672+ ('true', 't', '1', 'on', 'yes', 'y')
673+ self.logger = logger
674+
675+ def find_and_process(self):
676+ src_filename = time.strftime(self.filename_format)
677+ tmp_filename = os.path.join('/tmp', src_filename)
678+ with open(tmp_filename, 'wb') as statfile:
679+ #statfile.write('Account Name, Container Count, Object Count, Bytes Used, Created At\n')
680+ for device in os.listdir(self.devices):
681+ if self.mount_check and \
682+ not os.path.ismount(os.path.join(self.devices, device)):
683+ self.logger.error("Device %s is not mounted, skipping." %
684+ device)
685+ continue
686+ accounts = os.path.join(self.devices,
687+ device,
688+ account_server_data_dir)
689+ if not os.path.exists(accounts):
690+ self.logger.debug("Path %s does not exist, skipping." %
691+ accounts)
692+ continue
693+ for root, dirs, files in os.walk(accounts, topdown=False):
694+ for filename in files:
695+ if filename.endswith('.db'):
696+ broker = AccountBroker(os.path.join(root, filename))
697+ if not broker.is_deleted():
698+ (account_name,
699+ created_at,
700+ _, _,
701+ container_count,
702+ object_count,
703+ bytes_used,
704+ _, _) = broker.get_info()
705+ line_data = '"%s",%d,%d,%d,%s\n' % (account_name,
706+ container_count,
707+ object_count,
708+ bytes_used,
709+ created_at)
710+ statfile.write(line_data)
711+ renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
712
713=== added file 'swift/stats/log_processor.py'
714--- swift/stats/log_processor.py 1970-01-01 00:00:00 +0000
715+++ swift/stats/log_processor.py 2010-08-14 18:41:02 +0000
716@@ -0,0 +1,226 @@
717+# Copyright (c) 2010 OpenStack, LLC.
718+#
719+# Licensed under the Apache License, Version 2.0 (the "License");
720+# you may not use this file except in compliance with the License.
721+# You may obtain a copy of the License at
722+#
723+# http://www.apache.org/licenses/LICENSE-2.0
724+#
725+# Unless required by applicable law or agreed to in writing, software
726+# distributed under the License is distributed on an "AS IS" BASIS,
727+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
728+# implied.
729+# See the License for the specific language governing permissions and
730+# limitations under the License.
731+
+from __future__ import with_statement
+import multiprocessing
+import Queue
+import tempfile
+import time
+import zlib
+from ConfigParser import ConfigParser
+
+from swift.common.internal_proxy import InternalProxy
+from swift.common.exceptions import ChunkReadTimeout
+from swift.common.utils import get_logger
+
+
+class BadFileDownload(Exception):
+    pass
+
732+class LogProcessor(object):
733+
734+ def __init__(self, conf, logger):
735+ stats_conf = conf.get('log-processor', {})
736+
737+ working_dir = stats_conf.get('working_dir', '/tmp/swift/')
738+ if working_dir.endswith('/') and len(working_dir) > 1:
739+ working_dir = working_dir[:-1]
740+ self.working_dir = working_dir
741+ proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
742+ '/etc/swift/proxy-server.conf')
743+ try:
744+ c = ConfigParser()
745+ c.read(proxy_server_conf_loc)
746+ proxy_server_conf = dict(c.items('proxy-server'))
747+        except Exception:
748+ proxy_server_conf = None
749+ self.proxy_server_conf = proxy_server_conf
750+        if isinstance(logger, tuple):
751+            self.logger = get_logger(*logger)
752+        else:
753+            self.logger = logger
+        self.private_proxy = InternalProxy(self.proxy_server_conf,
+                                           self.logger)
754+
755+ # load the processing plugins
756+ self.plugins = {}
757+ plugin_prefix = 'log-processor-'
758+ for section in (x for x in conf if x.startswith(plugin_prefix)):
759+ plugin_name = section[len(plugin_prefix):]
760+ plugin_conf = conf.get(section, {})
761+ self.plugins[plugin_name] = plugin_conf
762+ import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
763+ module = __import__(import_target, fromlist=[import_target])
764+ klass = getattr(module, class_name)
765+ self.plugins[plugin_name]['instance'] = klass(plugin_conf)
766+
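The plugin loader above resolves a dotted `class_path` string to a class at runtime. A minimal sketch of the same mechanism, using a stdlib class as a stand-in for a real plugin (the conf value here is an assumption for illustration):

```python
# Resolve a dotted 'module.ClassName' path to an instance, as the
# log-processor plugin loader does for each log-processor-* section
plugin_conf = {'class_path': 'collections.OrderedDict'}

import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
module = __import__(import_target, fromlist=[import_target])
klass = getattr(module, class_name)
instance = klass()
print(type(instance).__name__)  # OrderedDict
```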
767+    def process_one_file(self, plugin_name, account, container, object_name):
768+        # get an iter of the object data
769+        compressed = object_name.endswith('.gz')
770+        stream = self.get_object_data(account, container, object_name,
771+                                      compressed=compressed)
772+        # look up the correct plugin and send it the stream, plus the
773+        # source object's coordinates (plugins need the object name to
+        # derive the hour the data belongs to)
+        return self.plugins[plugin_name]['instance'].process(
+            stream, account, container, object_name)
774+
775+    def get_data_list(self, start_date=None, end_date=None,
+                      listing_filter=None):
776+        total_list = []
777+        for plugin_name, plugin_conf in self.plugins.items():
778+            account = plugin_conf['swift_account']
779+            container = plugin_conf['container_name']
780+            listing = self.get_container_listing(account, container,
781+                                                 start_date, end_date,
782+                                                 listing_filter)
783+            for object_name in listing:
784+                total_list.append((plugin_name, account, container,
+                                   object_name))
785+        return total_list
785+
786+ def get_container_listing(self, swift_account, container_name, start_date=None,
787+ end_date=None, listing_filter=None):
788+ '''
789+ Get a container listing, filtered by start_date, end_date, and
790+ listing_filter. Dates, if given, should be in YYYYMMDDHH format
791+ '''
792+        def date_to_key(date):
+            # a YYYYMMDDHH string (or any prefix of it) becomes a
+            # '/'-joined listing key, e.g. '2010081418' -> '2010/08/14/18'
+            parts = []
+            for width in (4, 2, 2, 2):
+                piece, date = date[:width], date[width:]
+                if piece:
+                    parts.append(piece)
+            return '/'.join(parts)
+
+        search_key = None
+        if start_date is not None:
+            search_key = date_to_key(start_date)
+        end_key = None
+        if end_date is not None:
+            end_key = date_to_key(end_date)
832+ container_listing = self.private_proxy.get_container_list(
833+ swift_account,
834+ container_name,
835+ marker=search_key)
836+ results = []
837+ if container_listing is not None:
838+ if listing_filter is None:
839+ listing_filter = set()
840+ for item in container_listing:
841+ name = item['name']
842+ if end_key and name > end_key:
843+ break
844+ if name not in listing_filter:
845+ results.append(name)
846+ return results
847+
848+ def get_object_data(self, swift_account, container_name, object_name,
849+ compressed=False):
850+ '''reads an object and yields its lines'''
851+ o = self.private_proxy.get_object(swift_account,
852+ container_name,
853+ object_name)
854+ tmp_file = tempfile.TemporaryFile(dir=self.working_dir)
855+ with tmp_file as f:
856+            try:
857+                for chunk in o:
858+                    f.write(chunk)
859+            except ChunkReadTimeout:
860+                raise BadFileDownload()
864+ f.flush()
865+ f.seek(0) # rewind to start reading
866+            last_part = ''
868+ # magic in the following zlib.decompressobj argument is courtesy of
869+ # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
870+ d = zlib.decompressobj(16+zlib.MAX_WBITS)
871+ for chunk in iter(lambda: f.read(16384), ''):
872+ if compressed:
873+ try:
874+ chunk = d.decompress(chunk)
875+ except zlib.error:
876+ raise BadFileDownload() # bad compressed data
877+ parts = chunk.split('\n')
878+ parts[0] = last_part + parts[0]
879+ for part in parts[:-1]:
880+ yield part
881+ last_part = parts[-1]
882+ if last_part:
883+ yield last_part
884+
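The `16 + zlib.MAX_WBITS` argument above tells zlib to expect gzip framing, which allows streaming decompression without reading the whole object first. This standalone sketch exercises the same chunk-by-chunk decompression and line reassembly as get_object_data:

```python
import gzip
import io
import zlib

# Decompress gzip data chunk by chunk; wbits=16+MAX_WBITS makes zlib
# parse the gzip header/trailer (same trick as get_object_data)
blob = gzip.compress(b'alpha\nbeta\ngamma')
f = io.BytesIO(blob)
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
lines = []
last_part = b''
for chunk in iter(lambda: f.read(8), b''):
    data = d.decompress(chunk)
    parts = data.split(b'\n')
    parts[0] = last_part + parts[0]  # rejoin a line split across chunks
    lines.extend(parts[:-1])
    last_part = parts[-1]
last_part += d.flush()
if last_part:
    lines.append(last_part)  # trailing line with no newline
print(lines)  # [b'alpha', b'beta', b'gamma']
```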
885+def multiprocess_collate(processor_args,
886+ start_date=None,
887+ end_date=None,
888+ listing_filter=None):
889+ '''get listing of files and yield hourly data from them'''
890+ p = LogProcessor(*processor_args)
891+ all_files = p.get_data_list(start_date, end_date, listing_filter)
892+
893+ p.logger.info('loaded %d files to process' % len(all_files))
894+
895+ if not all_files:
896+ # no work to do
897+ return
898+
899+    worker_count = max(multiprocessing.cpu_count() - 1, 1)
900+ results = []
901+ in_queue = multiprocessing.Queue()
902+ out_queue = multiprocessing.Queue()
903+    for _ in range(worker_count):
904+        worker = multiprocessing.Process(target=collate_worker,
905+                                         args=(processor_args,
906+                                               in_queue,
907+                                               out_queue))
908+        worker.start()
909+        results.append(worker)
910+ for x in all_files:
911+ in_queue.put(x)
912+ for _ in range(worker_count):
913+ in_queue.put(None)
914+ count = 0
915+ while True:
916+ try:
917+ item, data = out_queue.get_nowait()
918+ count += 1
919+ if data:
920+ yield item, data
921+ if count >= len(all_files):
922+ # this implies that one result will come from every request
923+ break
924+ except Queue.Empty:
925+ time.sleep(.1)
926+ for r in results:
927+ r.join()
928+
929+def collate_worker(processor_args, in_queue, out_queue):
930+ '''worker process for multiprocess_collate'''
931+ p = LogProcessor(*processor_args)
932+ while True:
933+ try:
934+ item = in_queue.get_nowait()
935+ if item is None:
936+ break
937+ except Queue.Empty:
938+ time.sleep(.1)
939+        else:
940+            try:
941+                # item is (plugin_name, account, container, object_name)
+                ret = p.process_one_file(*item)
+            except Exception:
+                # a bad file must not kill the worker; report no data
+                ret = None
942+            out_queue.put((item, ret))
943\ No newline at end of file
944
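multiprocess_collate and collate_worker together form a queue/sentinel pattern: items go onto an input queue, one `None` sentinel per worker signals shutdown, and results come back on an output queue. Reduced to a toy (the doubling "work" is a stand-in for process_one_file):

```python
import multiprocessing

# Each worker pulls items until it sees the None sentinel, then exits;
# results are pushed onto a shared output queue
def worker(in_q, out_q):
    while True:
        item = in_q.get()
        if item is None:  # sentinel: no more work
            break
        out_q.put((item, item * 2))

if __name__ == '__main__':
    in_q = multiprocessing.Queue()
    out_q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(in_q, out_q))
             for _ in range(2)]
    for p in procs:
        p.start()
    for item in [1, 2, 3]:
        in_q.put(item)
    for _ in procs:
        in_q.put(None)  # one sentinel per worker
    results = sorted(out_q.get() for _ in range(3))
    for p in procs:
        p.join()
    print(results)  # [(1, 2), (2, 4), (3, 6)]
```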
945=== added file 'swift/stats/log_uploader.py'
946--- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000
947+++ swift/stats/log_uploader.py 2010-08-14 18:41:02 +0000
948@@ -0,0 +1,135 @@
949+# Copyright (c) 2010 OpenStack, LLC.
950+#
951+# Licensed under the Apache License, Version 2.0 (the "License");
952+# you may not use this file except in compliance with the License.
953+# You may obtain a copy of the License at
954+#
955+# http://www.apache.org/licenses/LICENSE-2.0
956+#
957+# Unless required by applicable law or agreed to in writing, software
958+# distributed under the License is distributed on an "AS IS" BASIS,
959+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
960+# implied.
961+# See the License for the specific language governing permissions and
962+# limitations under the License.
963+
964+from __future__ import with_statement
965+import os
966+import hashlib
967+import time
968+import gzip
969+import glob
970+
971+from swift.common.internal_proxy import InternalProxy
972+
973+class LogUploader(object):
974+ '''
975+ Given a local directory, a swift account, and a container name, LogParser
976+ will upload all files in the local directory to the given account/container.
977+ All but the newest files will be uploaded, and the files' md5 sum will be
978+ computed. The hash is used to prevent duplicate data from being uploaded
979+ multiple times in different files (ex: log lines). Since the hash is
980+ computed, it is also used as the uploaded object's etag to ensure data
981+ integrity.
982+
983+ Note that after the file is successfully uploaded, it will be unlinked.
984+
985+ The given proxy server config is used to instantiate a proxy server for
986+ the object uploads.
987+ '''
988+
989+ def __init__(self, log_dir, swift_account, container_name, filename_format,
990+ proxy_server_conf, logger):
991+ if not log_dir.endswith('/'):
992+ log_dir = log_dir + '/'
993+ self.log_dir = log_dir
994+ self.swift_account = swift_account
995+ self.container_name = container_name
996+ self.filename_format = filename_format
997+ self.internal_proxy = InternalProxy(proxy_server_conf, logger)
998+ self.logger = logger
999+
1000+ def upload_all_logs(self):
1001+        i = [(c, self.filename_format.index(c))
+             for c in '%Y %m %d %H'.split()
+             if c in self.filename_format]
1002+        i.sort(key=lambda x: x[1])  # process directives in position order
1003+        year_offset = month_offset = day_offset = hour_offset = None
+        # each directive is 2 chars in the format string, but '%Y' renders
+        # as 4 chars, so every later offset shifts right in the filename
+        shift = 0
1004+        for c, start in i:
+            start += shift
1005+            if c == '%Y':
1006+                year_offset = start, start + 4
+                shift += 2
1007+            elif c == '%m':
1008+                month_offset = start, start + 2
1009+            elif c == '%d':
1010+                day_offset = start, start + 2
1011+            elif c == '%H':
1012+                hour_offset = start, start + 2
1013+ if not (year_offset and month_offset and day_offset and hour_offset):
1014+ # don't have all the parts, can't upload anything
1015+ return
1016+ glob_pattern = self.filename_format
1017+ glob_pattern = glob_pattern.replace('%Y', '????')
1018+ glob_pattern = glob_pattern.replace('%m', '??')
1019+ glob_pattern = glob_pattern.replace('%d', '??')
1020+ glob_pattern = glob_pattern.replace('%H', '??')
1021+ filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
1022+ current_hour = int(time.strftime('%H'))
1023+ today = int(time.strftime('%Y%m%d'))
1024+ self.internal_proxy.create_container(self.swift_account,
1025+ self.container_name)
1026+ for filename in filelist:
1027+ try:
1028+ # From the filename, we need to derive the year, month, day,
1029+ # and hour for the file. These values are used in the uploaded
1030+ # object's name, so they should be a reasonably accurate
1031+ # representation of the time for which the data in the file was
1032+ # collected. The file's last modified time is not a reliable
1033+ # representation of the data in the file. For example, an old
1034+ # log file (from hour A) may be uploaded or moved into the
1035+ # log_dir in hour Z. The file's modified time will be for hour
1036+ # Z, and therefore the object's name in the system will not
1037+ # represent the data in it.
1038+ # If the filename doesn't match the format, it shouldn't be
1039+ # uploaded.
1040+                # the offsets are relative to the filename itself, but
+                # glob matched the full path, so slice the basename
+                basename = os.path.basename(filename)
+                year = basename[slice(*year_offset)]
+                month = basename[slice(*month_offset)]
+                day = basename[slice(*day_offset)]
+                hour = basename[slice(*hour_offset)]
+                if not (year + month + day + hour).isdigit():
+                    raise ValueError('filename does not match format')
1044+            except ValueError:
1045+                # unexpected filename format, move on
1046+ self.logger.error("Unexpected log: %s" % filename)
1047+ continue
1048+ if (time.time() - os.stat(filename).st_mtime) < 7200:
1049+ # don't process very new logs
1050+ self.logger.debug("Skipping log: %s (< 2 hours old)" % filename)
1051+ continue
1052+ self.upload_one_log(filename, year, month, day, hour)
1053+
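The offset bookkeeping above can be checked by hand. For a hypothetical format `access-%Y%m%d%H` (an assumption for illustration, not from the sample conf), which renders to e.g. `access-2010081418`: `%Y` sits at format index 7 but occupies 4 characters in the rendered name, so every later directive lands 2 positions further right than its format index.

```python
# Rendered filename for the assumed format 'access-%Y%m%d%H'
name = 'access-2010081418'
year_offset = (7, 11)    # '%Y' starts at format index 7, renders 4 wide
month_offset = (11, 13)  # format index 9, shifted +2 by the expanded year
day_offset = (13, 15)
hour_offset = (15, 17)
year = name[slice(*year_offset)]
month = name[slice(*month_offset)]
day = name[slice(*day_offset)]
hour = name[slice(*hour_offset)]
print(year, month, day, hour)  # 2010 08 14 18
```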
1054+ def upload_one_log(self, filename, year, month, day, hour):
1055+ if os.path.getsize(filename) == 0:
1056+ self.logger.debug("Log %s is 0 length, skipping" % filename)
1057+ return
1058+ self.logger.debug("Processing log: %s" % filename)
1059+ filehash = hashlib.md5()
1060+        already_compressed = filename.endswith('.gz')
1061+ opener = gzip.open if already_compressed else open
1062+ f = opener(filename, 'rb')
1063+ try:
1064+ for line in f:
1065+ # filter out bad lines here?
1066+ filehash.update(line)
1067+ finally:
1068+ f.close()
1069+ filehash = filehash.hexdigest()
1070+ # By adding a hash to the filename, we ensure that uploaded files
1071+ # have unique filenames and protect against uploading one file
1072+ # more than one time. By using md5, we get an etag for free.
1073+ target_filename = '/'.join([year, month, day, hour, filehash+'.gz'])
1074+ if self.internal_proxy.upload_file(filename,
1075+ self.swift_account,
1076+ self.container_name,
1077+ target_filename,
1078+ compress=(not already_compressed)):
1079+ self.logger.debug("Uploaded log %s to %s" %
1080+ (filename, target_filename))
1081+ os.unlink(filename)
1082+ else:
1083+ self.logger.error("ERROR: Upload of log %s failed!" % filename)
1084
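The hash-as-object-name idea in upload_one_log has a useful property worth making concrete: identical log content always maps to the same object name, so a re-upload overwrites rather than duplicates, and the md5 doubles as the etag. A minimal sketch (helper name and sample line are hypothetical):

```python
import hashlib

# Build the YYYY/MM/DD/HH/<md5>.gz object name from log content
def log_object_name(lines, year, month, day, hour):
    h = hashlib.md5()
    for line in lines:
        h.update(line)
    return '/'.join([year, month, day, hour, h.hexdigest() + '.gz'])

name_a = log_object_name([b'GET /v1/a 200\n'], '2010', '08', '14', '18')
name_b = log_object_name([b'GET /v1/a 200\n'], '2010', '08', '14', '18')
print(name_a == name_b)                     # True: same content, same name
print(name_a.startswith('2010/08/14/18/'))  # True
```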
1085=== added file 'swift/stats/stats_processor.py'
1086--- swift/stats/stats_processor.py 1970-01-01 00:00:00 +0000
1087+++ swift/stats/stats_processor.py 2010-08-14 18:41:02 +0000
1088@@ -0,0 +1,54 @@
1089+# Copyright (c) 2010 OpenStack, LLC.
1090+#
1091+# Licensed under the Apache License, Version 2.0 (the "License");
1092+# you may not use this file except in compliance with the License.
1093+# You may obtain a copy of the License at
1094+#
1095+# http://www.apache.org/licenses/LICENSE-2.0
1096+#
1097+# Unless required by applicable law or agreed to in writing, software
1098+# distributed under the License is distributed on an "AS IS" BASIS,
1099+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1100+# implied.
1101+# See the License for the specific language governing permissions and
1102+# limitations under the License.
1103+
1104+class StatsLogProcessor(object):
1105+
1106+ def __init__(self, conf):
1107+ pass
1108+
1109+    def process(self, obj_stream, account, container, object_name):
1110+        '''generate hourly groupings of data from one stats log file'''
1111+        account_totals = {}
1112+        year, month, day, hour, _ = object_name.split('/')
1113+        for line in obj_stream:
1114+            if not line:
1115+                continue
1116+            try:
1117+                (account_name,
1118+                 container_count,
1119+                 object_count,
1120+                 bytes_used,
1121+                 created_at) = line.split(',')
1122+                account_name = account_name.strip('"')
1123+                container_count = int(container_count.strip('"'))
1124+                object_count = int(object_count.strip('"'))
1125+                bytes_used = int(bytes_used.strip('"'))
1126+                aggr_key = (account_name, year, month, day, hour)
1127+                d = account_totals.setdefault(aggr_key, {})
1128+                d['count'] = d.get('count', 0) + 1
1129+                d['container_count'] = d.get('container_count', 0) + \
1130+                    container_count
1131+                d['object_count'] = d.get('object_count', 0) + \
1132+                    object_count
1133+                d['bytes_used'] = d.get('bytes_used', 0) + \
1134+                    bytes_used
1135+                d['created_at'] = created_at
1136+            except (IndexError, ValueError):
1137+                # bad line data
1138+                pass
1139+        return account_totals
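A toy run of the hourly aggregation above: two stats lines for the same account in the same hour collapse into one aggregate record (the fixed date parts stand in for what would be parsed from the object's YYYY/MM/DD/HH/... name):

```python
# Two account-stats CSV lines for the same account and hour
lines = ['"AUTH_test",3,42,1024,1278010000.0',
         '"AUTH_test",1,8,512,1278010000.0']
totals = {}
for line in lines:
    account, containers, objects, bytes_used, created_at = line.split(',')
    key = (account.strip('"'), '2010', '08', '14', '18')
    d = totals.setdefault(key, {'count': 0, 'container_count': 0,
                                'object_count': 0, 'bytes_used': 0})
    d['count'] += 1
    d['container_count'] += int(containers)
    d['object_count'] += int(objects)
    d['bytes_used'] += int(bytes_used)
agg = totals[('AUTH_test', '2010', '08', '14', '18')]
print(agg['object_count'], agg['bytes_used'])  # 50 1536
```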