Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk

Proposed by John Dickinson
Status: Superseded
Proposed branch: lp:~notmyname/swift/stats_system
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1571 lines (+1464/-11)
14 files modified
bin/swift-account-stats-logger (+27/-0)
bin/swift-log-stats-collector (+27/-0)
bin/swift-log-uploader (+31/-0)
etc/log-processing.conf-sample (+34/-0)
swift/common/compressed_file_reader.py (+72/-0)
swift/common/daemon.py (+5/-2)
swift/common/internal_proxy.py (+182/-0)
swift/common/utils.py (+16/-9)
swift/stats/access_processor.py (+224/-0)
swift/stats/account_stats.py (+86/-0)
swift/stats/log_processor.py (+376/-0)
swift/stats/log_uploader.py (+160/-0)
swift/stats/stats_processor.py (+63/-0)
test/unit/stats/test_log_processor.py (+161/-0)
To merge this branch: bzr merge lp:~notmyname/swift/stats_system
Reviewer: Swift Core security contacts
Status: Pending
Review via email: mp+35156@code.launchpad.net

This proposal supersedes a proposal from 2010-09-07.

This proposal has been superseded by a proposal from 2010-09-15.

Description of the change

Work in progress: moving the existing internal (pre-OpenStack) Swift stats system into (OpenStack) Swift.
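
As a rough sketch of how the pieces are meant to fit together (based on the bin scripts and sample config in the diff below; the /etc/swift/log-processing.conf path is an assumption, the sample ships as etc/log-processing.conf-sample):

    # on each account server: dump per-account stats files into log_dir
    swift-account-stats-logger /etc/swift/log-processing.conf
    # on each server with logs: upload hourly log files into Swift itself
    swift-log-uploader /etc/swift/log-processing.conf access
    swift-log-uploader /etc/swift/log-processing.conf stats
    # on one machine: process everything uploaded so far and print a CSV report
    swift-log-stats-collector /etc/swift/log-processing.conf > report.csv

Each command is intended to be run periodically (e.g. from cron); the collector keeps a processed_files.pickle.gz object in the log-processor account so logs that have already been processed are skipped on later runs.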

Revision history for this message
Chuck Thier (cthier) wrote : Posted in a previous version of this proposal

Just a couple of quick comments:

1. The bins should be refactored to be more DRY like the current bins
2. pep8 :)

Revision history for this message
John Dickinson (notmyname) wrote : Posted in a previous version of this proposal

Updated to be more DRY.

lp:~notmyname/swift/stats_system updated
70. By John Dickinson

added execute perms to stats processor binaries

71. By John Dickinson

updated config file loading to work with paste.deploy configs

72. By John Dickinson

fixed typos

73. By John Dickinson

fixed internal proxy loading

74. By John Dickinson

made a memcache stub for the internal proxy server

75. By John Dickinson

fixed internal proxy put_container reference

76. By John Dickinson

fixed some log uploading glob patterns

77. By John Dickinson

fixed bug in calculating offsets for filename patterns

78. By John Dickinson

fixed typos in log processor

79. By John Dickinson

fixed get_data_list in log_processor

80. By John Dickinson

added some debug output

81. By John Dickinson

fixed logging and log uploading

82. By John Dickinson

fixed copy/paste errors and missing imports

83. By John Dickinson

added error handling and missing return statement

84. By John Dickinson

fixed some typos and improved handling of missing data in internal proxy

85. By John Dickinson

fixed tests, typos, and added error handling

86. By John Dickinson

fixed bug in account stats log processing

87. By John Dickinson

fixed listing filter in log processing

88. By John Dickinson

fixed stdout capturing for generating csv files

89. By John Dickinson

fixed replica count reporting error

90. By John Dickinson

fixed lookback in log processor

91. By John Dickinson

merged with trunk

92. By John Dickinson

merged with trunk

93. By John Dickinson

fixed tests to account for changed key name

94. By John Dickinson

fixed test bug

95. By John Dickinson

updated with changes and suggestions from code review

96. By John Dickinson

merged with changes from trunk

97. By John Dickinson

added stats overview

98. By John Dickinson

updated with changes from trunk

99. By John Dickinson

added additional docs

100. By John Dickinson

documentation clarification and pep8 fixes

101. By John Dickinson

added overview stats to the doc index

102. By John Dickinson

made long lines wrap (grr pep8)

103. By John Dickinson

merged with trunk

104. By John Dickinson

merged with trunk

105. By John Dickinson

merged with trunk

106. By John Dickinson

fixed stats docs

107. By John Dickinson

added a bad lines check to the access log parser

108. By John Dickinson

added tests for compressing file reader

109. By John Dickinson

fixed compressing file reader test

110. By John Dickinson

fixed compressing file reader test

111. By John Dickinson

improved compressing file reader test

112. By John Dickinson

fixed compressing file reader test

113. By John Dickinson

made try/except much less inclusive in access log processor

114. By John Dickinson

added keylist mapping tests and fixed other tests

115. By John Dickinson

updated stats system tests

116. By John Dickinson

merged with gholt's test stubs

117. By John Dickinson

updated SAIO instructions for the stats system

118. By John Dickinson

fixed stats system saio docs

119. By John Dickinson

fixed stats system saio docs

120. By John Dickinson

added openstack copyright/license to test_log_processor.py

121. By John Dickinson

improved logging in log processors

122. By John Dickinson

merged with trunk

123. By John Dickinson

fixed missing working directory bug in account stats

124. By John Dickinson

fixed readconf parameter that was broken with a previous merge

125. By John Dickinson

updated setup.py and saio docs for stats system

126. By John Dickinson

added readconf unit test

127. By John Dickinson

updated stats saio docs to create logs with the appropriate permissions

128. By John Dickinson

fixed bug in log processor internal proxy lazy load code

129. By John Dickinson

updated readconf test

130. By John Dickinson

updated readconf test

131. By John Dickinson

updated readconf test

132. By John Dickinson

fixed internal proxy references in log processor

133. By John Dickinson

fixed account stats filename creation

134. By John Dickinson

pep8 tomfoolery

135. By John Dickinson

moved paren

136. By John Dickinson

added lazy load of internal proxy to log processor (you were right clay)

Preview Diff

1=== added file 'bin/swift-account-stats-logger'
2--- bin/swift-account-stats-logger 1970-01-01 00:00:00 +0000
3+++ bin/swift-account-stats-logger 2010-09-15 20:11:08 +0000
4@@ -0,0 +1,27 @@
5+#!/usr/bin/python
6+# Copyright (c) 2010 OpenStack, LLC.
7+#
8+# Licensed under the Apache License, Version 2.0 (the "License");
9+# you may not use this file except in compliance with the License.
10+# You may obtain a copy of the License at
11+#
12+# http://www.apache.org/licenses/LICENSE-2.0
13+#
14+# Unless required by applicable law or agreed to in writing, software
15+# distributed under the License is distributed on an "AS IS" BASIS,
16+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
17+# implied.
18+# See the License for the specific language governing permissions and
19+# limitations under the License.
20+
21+import sys
22+
23+from swift.stats.account_stats import AccountStat
24+from swift.common import utils
25+
26+if __name__ == '__main__':
27+ if len(sys.argv) < 2:
28+ print "Usage: swift-account-stats-logger CONFIG_FILE"
29+ sys.exit()
30+ stats_conf = utils.readconf(sys.argv[1], 'log-processor-stats')
31+ stats = AccountStat(stats_conf).run(once=True)
32
33=== added file 'bin/swift-log-stats-collector'
34--- bin/swift-log-stats-collector 1970-01-01 00:00:00 +0000
35+++ bin/swift-log-stats-collector 2010-09-15 20:11:08 +0000
36@@ -0,0 +1,27 @@
37+#!/usr/bin/python
38+# Copyright (c) 2010 OpenStack, LLC.
39+#
40+# Licensed under the Apache License, Version 2.0 (the "License");
41+# you may not use this file except in compliance with the License.
42+# You may obtain a copy of the License at
43+#
44+# http://www.apache.org/licenses/LICENSE-2.0
45+#
46+# Unless required by applicable law or agreed to in writing, software
47+# distributed under the License is distributed on an "AS IS" BASIS,
48+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
49+# implied.
50+# See the License for the specific language governing permissions and
51+# limitations under the License.
52+
53+import sys
54+
55+from swift.stats.log_processor import LogProcessorDaemon
56+from swift.common import utils
57+
58+if __name__ == '__main__':
59+ if len(sys.argv) < 2:
60+ print "Usage: swift-log-stats-collector CONFIG_FILE"
61+ sys.exit()
62+ conf = utils.readconf(sys.argv[1], log_name='log-stats-collector')
63+ stats = LogProcessorDaemon(conf).run(once=True, capture_stdout=False)
64
65=== added file 'bin/swift-log-uploader'
66--- bin/swift-log-uploader 1970-01-01 00:00:00 +0000
67+++ bin/swift-log-uploader 2010-09-15 20:11:08 +0000
68@@ -0,0 +1,31 @@
69+#!/usr/bin/python
70+# Copyright (c) 2010 OpenStack, LLC.
71+#
72+# Licensed under the Apache License, Version 2.0 (the "License");
73+# you may not use this file except in compliance with the License.
74+# You may obtain a copy of the License at
75+#
76+# http://www.apache.org/licenses/LICENSE-2.0
77+#
78+# Unless required by applicable law or agreed to in writing, software
79+# distributed under the License is distributed on an "AS IS" BASIS,
80+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
81+# implied.
82+# See the License for the specific language governing permissions and
83+# limitations under the License.
84+
85+import sys
86+
87+from swift.stats.log_uploader import LogUploader
88+from swift.common import utils
89+
90+if __name__ == '__main__':
91+ if len(sys.argv) < 3:
92+ print "Usage: swift-log-uploader CONFIG_FILE plugin"
93+ sys.exit()
94+ uploader_conf = utils.readconf(sys.argv[1], 'log-processor')
95+ plugin = sys.argv[2]
96+ section_name = 'log-processor-%s' % plugin
97+ plugin_conf = utils.readconf(sys.argv[1], section_name)
98+ uploader_conf.update(plugin_conf)
99+ uploader = LogUploader(uploader_conf, plugin).run(once=True)
100
101=== added file 'etc/log-processing.conf-sample'
102--- etc/log-processing.conf-sample 1970-01-01 00:00:00 +0000
103+++ etc/log-processing.conf-sample 2010-09-15 20:11:08 +0000
104@@ -0,0 +1,34 @@
105+# plugin section format is named "log-processor-<plugin>"
106+
107+[log-processor]
108+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
109+# container_name = log_processing_data
110+# proxy_server_conf = /etc/swift/proxy-server.conf
111+# log_facility = LOG_LOCAL0
112+# log_level = INFO
113+# lookback_hours = 120
114+# lookback_window = 120
115+# user = swift
116+
117+[log-processor-access]
118+# log_dir = /var/log/swift/
119+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
120+container_name = log_data
121+source_filename_format = access-%Y%m%d%H
122+class_path = swift.stats.access_processor.AccessLogProcessor
123+# service ips is for client ip addresses that should be counted as servicenet
124+# service_ips =
125+# load balancer private ips is for load balancer ip addresses that should be
126+# counted as servicenet
127+# lb_private_ips =
128+# server_name = proxy
129+# user = swift
130+
131+[log-processor-stats]
132+# log_dir = /var/log/swift/
133+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
134+container_name = account_stats
135+source_filename_format = stats-%Y%m%d%H
136+class_path = swift.stats.stats_processor.StatsLogProcessor
137+# account_server_conf = /etc/swift/account-server.conf
138+# user = swift
139\ No newline at end of file
140
141=== added file 'swift/common/compressed_file_reader.py'
142--- swift/common/compressed_file_reader.py 1970-01-01 00:00:00 +0000
143+++ swift/common/compressed_file_reader.py 2010-09-15 20:11:08 +0000
144@@ -0,0 +1,72 @@
145+# Copyright (c) 2010 OpenStack, LLC.
146+#
147+# Licensed under the Apache License, Version 2.0 (the "License");
148+# you may not use this file except in compliance with the License.
149+# You may obtain a copy of the License at
150+#
151+# http://www.apache.org/licenses/LICENSE-2.0
152+#
153+# Unless required by applicable law or agreed to in writing, software
154+# distributed under the License is distributed on an "AS IS" BASIS,
155+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
156+# implied.
157+# See the License for the specific language governing permissions and
158+# limitations under the License.
159+
160+import zlib
161+import struct
162+
163+
164+class CompressedFileReader(object):
165+ '''
166+ Wraps a file object and provides a read method that returns gzip'd data.
167+
168+ One warning: if read is called with a small value, the data returned may
169+ be bigger than the value. In this case, the "compressed" data will be
170+ bigger than the original data. To solve this, use a bigger read buffer.
171+
172+ An example use case:
173+ Given an uncompressed file on disk, provide a way to read compressed data
174+ without buffering the entire file data in memory. Using this class, an
175+ uncompressed log file could be uploaded as compressed data with chunked
176+ transfer encoding.
177+
178+ gzip header and footer code taken from the python stdlib gzip module
179+
180+ :param file_obj: File object to read from
181+ :param compresslevel: compression level
182+ '''
183+ def __init__(self, file_obj, compresslevel=9):
184+ self._f = file_obj
185+ self._compressor = zlib.compressobj(compresslevel,
186+ zlib.DEFLATED,
187+ -zlib.MAX_WBITS,
188+ zlib.DEF_MEM_LEVEL,
189+ 0)
190+ self.done = False
191+ self.first = True
192+ self.crc32 = 0
193+ self.total_size = 0
194+
195+ def read(self, *a, **kw):
196+ if self.done:
197+ return ''
198+ x = self._f.read(*a, **kw)
199+ if x:
200+ self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
201+ self.total_size += len(x)
202+ compressed = self._compressor.compress(x)
203+ if not compressed:
204+ compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
205+ else:
206+ compressed = self._compressor.flush(zlib.Z_FINISH)
207+ crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
208+ size = struct.pack("<L", self.total_size & 0xffffffffL)
209+ footer = crc32 + size
210+ compressed += footer
211+ self.done = True
212+ if self.first:
213+ self.first = False
214+ header = '\037\213\010\000\000\000\000\000\002\377'
215+ compressed = header + compressed
216+ return compressed
217
218=== modified file 'swift/common/daemon.py'
219--- swift/common/daemon.py 2010-08-31 23:12:59 +0000
220+++ swift/common/daemon.py 2010-09-15 20:11:08 +0000
221@@ -33,12 +33,15 @@
222 """Override this to run forever"""
223 raise NotImplementedError('run_forever not implemented')
224
225- def run(self, once=False):
226+ def run(self, once=False, capture_stdout=True, capture_stderr=True):
227 """Run the daemon"""
228 # log uncaught exceptions
229 sys.excepthook = lambda *exc_info: \
230 self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
231- sys.stdout = sys.stderr = utils.LoggerFileObject(self.logger)
232+ if capture_stdout:
233+ sys.stdout = utils.LoggerFileObject(self.logger)
234+ if capture_stderr:
235+ sys.stderr = utils.LoggerFileObject(self.logger)
236
237 utils.drop_privileges(self.conf.get('user', 'swift'))
238
239
240=== added file 'swift/common/internal_proxy.py'
241--- swift/common/internal_proxy.py 1970-01-01 00:00:00 +0000
242+++ swift/common/internal_proxy.py 2010-09-15 20:11:08 +0000
243@@ -0,0 +1,182 @@
244+# Copyright (c) 2010 OpenStack, LLC.
245+#
246+# Licensed under the Apache License, Version 2.0 (the "License");
247+# you may not use this file except in compliance with the License.
248+# You may obtain a copy of the License at
249+#
250+# http://www.apache.org/licenses/LICENSE-2.0
251+#
252+# Unless required by applicable law or agreed to in writing, software
253+# distributed under the License is distributed on an "AS IS" BASIS,
254+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
255+# implied.
256+# See the License for the specific language governing permissions and
257+# limitations under the License.
258+
259+import webob
260+from urllib import quote, unquote
261+from json import loads as json_loads
262+
263+from swift.common.compressed_file_reader import CompressedFileReader
264+from swift.proxy.server import BaseApplication
265+
266+class MemcacheStub(object):
267+ def get(self, *a, **kw): return None
268+ def set(self, *a, **kw): return None
269+ def incr(self, *a, **kw): return 0
270+ def delete(self, *a, **kw): return None
271+ def set_multi(self, *a, **kw): return None
272+ def get_multi(self, *a, **kw): return []
273+
274+class InternalProxy(object):
275+ """
276+ Set up a private instance of a proxy server that allows normal requests
277+ to be made without having to actually send the request to the proxy.
278+ This also doesn't log the requests to the normal proxy logs.
279+
280+ :param proxy_server_conf: proxy server configuration dictionary
281+ :param logger: logger to log requests to
282+ :param retries: number of times to retry each request
283+ """
284+ def __init__(self, proxy_server_conf=None, logger=None, retries=0):
285+ self.upload_app = BaseApplication(proxy_server_conf,
286+ memcache=MemcacheStub(),
287+ logger=logger)
288+ self.retries = retries
289+
290+ def upload_file(self, source_file, account, container, object_name,
291+ compress=True, content_type='application/x-gzip'):
292+ """
293+ Upload a file to cloud files.
294+
295+ :param source_file: path to or file like object to upload
296+ :param account: account to upload to
297+ :param container: container to upload to
298+ :param object_name: name of object being uploaded
299+ :param compress: if True, compresses object as it is uploaded
300+ :param content_type: content-type of object
301+ :returns: True if successful, False otherwise
302+ """
303+ log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name)
304+
305+ # create the container
306+ if not self.create_container(account, container):
307+ return False
308+
309+ # upload the file to the account
310+ req = webob.Request.blank(log_create_pattern,
311+ environ={'REQUEST_METHOD': 'PUT'},
312+ headers={'Transfer-Encoding': 'chunked'})
313+ if compress:
314+ if hasattr(source_file, 'read'):
315+ compressed_file = CompressedFileReader(source_file)
316+ else:
317+ compressed_file = CompressedFileReader(open(source_file, 'rb'))
318+ req.body_file = compressed_file
319+ else:
320+ if not hasattr(source_file, 'read'):
321+ source_file = open(source_file, 'rb')
322+ req.body_file = source_file
323+ req.account = account
324+ req.content_type = content_type
325+ req.content_length = None # to make sure we send chunked data
326+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
327+ tries = 1
328+ while (resp.status_int < 200 or resp.status_int > 299) \
329+ and tries <= self.retries:
330+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
331+ tries += 1
332+ if not (200 <= resp.status_int < 300):
333+ return False
334+ return True
335+
336+ def get_object(self, account, container, object_name):
337+ """
338+ Get object.
339+
340+ :param account: account name object is in
341+ :param container: container name object is in
342+ :param object_name: name of object to get
343+ :returns: iterator for object data
344+ """
345+ req = webob.Request.blank('/v1/%s/%s/%s' %
346+ (account, container, object_name),
347+ environ={'REQUEST_METHOD': 'GET'})
348+ req.account = account
349+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
350+ tries = 1
351+ while (resp.status_int < 200 or resp.status_int > 299) \
352+ and tries <= self.retries:
353+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
354+ tries += 1
355+ return resp.status_int, resp.app_iter
356+
357+ def create_container(self, account, container):
358+ """
359+ Create container.
360+
361+ :param account: account name to put the container in
362+ :param container: container name to create
363+ :returns: True if successful, otherwise False
364+ """
365+ req = webob.Request.blank('/v1/%s/%s' % (account, container),
366+ environ={'REQUEST_METHOD': 'PUT'})
367+ req.account = account
368+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
369+ tries = 1
370+ while (resp.status_int < 200 or resp.status_int > 299) \
371+ and tries <= self.retries:
372+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
373+ tries += 1
374+ return 200 <= resp.status_int < 300
375+
376+ def get_container_list(self, account, container, marker=None, limit=None,
377+ prefix=None, delimiter=None, full_listing=True):
378+ """
379+ Get container listing.
380+
381+ :param account: account name for the container
382+ :param container: container name to get the listing of
383+ :param marker: marker query
384+ :param limit: limit to query
385+ :param prefix: prefix query
386+ :param delimeter: delimeter for query
387+ :param full_listing: if True, make enough requests to get all listings
388+ :returns: list of objects
389+ """
390+ if full_listing:
391+ rv = []
392+ listing = self.get_container_list(account, container, marker,
393+ limit, prefix, delimiter, full_listing=False)
394+ while listing:
395+ rv.extend(listing)
396+ if not delimiter:
397+ marker = listing[-1]['name']
398+ else:
399+ marker = listing[-1].get('name', listing[-1].get('subdir'))
400+ listing = self.get_container_list(account, container, marker,
401+ limit, prefix, delimiter, full_listing=False)
402+ return rv
403+ path = '/v1/%s/%s' % (account, container)
404+ qs = 'format=json'
405+ if marker:
406+ qs += '&marker=%s' % quote(marker)
407+ if limit:
408+ qs += '&limit=%d' % limit
409+ if prefix:
410+ qs += '&prefix=%s' % quote(prefix)
411+ if delimiter:
412+ qs += '&delimiter=%s' % quote(delimiter)
413+ path += '?%s' % qs
414+ req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
415+ req.account = account
416+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
417+ tries = 1
418+ while (resp.status_int < 200 or resp.status_int > 299) \
419+ and tries <= self.retries:
420+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
421+ tries += 1
422+ if resp.status_int == 204:
423+ return []
424+ if 200 <= resp.status_int < 300:
425+ return json_loads(resp.body)
426
427=== modified file 'swift/common/utils.py'
428--- swift/common/utils.py 2010-09-02 14:58:30 +0000
429+++ swift/common/utils.py 2010-09-15 20:11:08 +0000
430@@ -530,19 +530,26 @@
431 def cache_from_env(env):
432 return item_from_env(env, 'swift.cache')
433
434-def readconf(conf, section_name, log_name=None):
435+def readconf(conf, section_name=None, log_name=None):
436 c = ConfigParser()
437 if not c.read(conf):
438 print "Unable to read config file %s" % conf
439 sys.exit(1)
440- if c.has_section(section_name):
441- conf = dict(c.items(section_name))
442+ if section_name:
443+ if c.has_section(section_name):
444+ conf = dict(c.items(section_name))
445+ else:
446+ print "Unable to find %s config section in %s" % (section_name, conf)
447+ sys.exit(1)
448+ if "log_name" not in conf:
449+ if log_name is not None:
450+ conf['log_name'] = log_name
451+ else:
452+ conf['log_name'] = section_name
453 else:
454- print "Unable to find %s config section in %s" % (section_name, conf)
455- sys.exit(1)
456- if "log_name" not in conf:
457- if log_name is not None:
458+ conf = {}
459+ for s in c.sections():
460+ conf.update({s:dict(c.items(s))})
461+ if 'log_name' not in conf:
462 conf['log_name'] = log_name
463- else:
464- conf['log_name'] = section_name
465 return conf
466
467=== added directory 'swift/stats'
468=== added file 'swift/stats/__init__.py'
469=== added file 'swift/stats/access_processor.py'
470--- swift/stats/access_processor.py 1970-01-01 00:00:00 +0000
471+++ swift/stats/access_processor.py 2010-09-15 20:11:08 +0000
472@@ -0,0 +1,224 @@
473+# Copyright (c) 2010 OpenStack, LLC.
474+#
475+# Licensed under the Apache License, Version 2.0 (the "License");
476+# you may not use this file except in compliance with the License.
477+# You may obtain a copy of the License at
478+#
479+# http://www.apache.org/licenses/LICENSE-2.0
480+#
481+# Unless required by applicable law or agreed to in writing, software
482+# distributed under the License is distributed on an "AS IS" BASIS,
483+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
484+# implied.
485+# See the License for the specific language governing permissions and
486+# limitations under the License.
487+
488+import collections
489+from urllib import unquote
490+import copy
491+
492+from swift.common.utils import split_path
493+
494+month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
495+
496+class AccessLogProcessor(object):
497+
498+ def __init__(self, conf):
499+ self.server_name = conf.get('server_name', 'proxy')
500+ self.lb_private_ips = [x.strip() for x in \
501+ conf.get('lb_private_ips', '').split(',')\
502+ if x.strip()]
503+ self.service_ips = [x.strip() for x in \
504+ conf.get('service_ips', '').split(',')\
505+ if x.strip()]
506+
507+ def log_line_parser(self, raw_log):
508+ '''given a raw access log line, return a dict of the good parts'''
509+ d = {}
510+ try:
511+ (_,
512+ server,
513+ client_ip,
514+ lb_ip,
515+ timestamp,
516+ method,
517+ request,
518+ http_version,
519+ code,
520+ referrer,
521+ user_agent,
522+ auth_token,
523+ bytes_in,
524+ bytes_out,
525+ etag,
526+ trans_id,
527+ headers,
528+ processing_time) = (unquote(x) for x in raw_log[16:].split(' '))
529+ if server != self.server_name:
530+ raise ValueError('incorrect server name in log line')
531+ (version,
532+ account,
533+ container_name,
534+ object_name) = split_path(request, 2, 4, True)
535+ if container_name is not None:
536+ container_name = container_name.split('?', 1)[0]
537+ if object_name is not None:
538+ object_name = object_name.split('?', 1)[0]
539+ account = account.split('?', 1)[0]
540+ query = None
541+ if '?' in request:
542+ request, query = request.split('?', 1)
543+ args = query.split('&')
544+ # Count each query argument. This is used later to aggregate
545+ # the number of format, prefix, etc. queries.
546+ for q in args:
547+ if '=' in q:
548+ k, v = q.split('=', 1)
549+ else:
550+ k = q
551+ # Certain keys will get summmed in stats reporting
552+ # (format, path, delimiter, etc.). Save a "1" here
553+ # to indicate that this request is 1 request for
554+ # its respective key.
555+ d[k] = 1
556+ except ValueError:
557+ pass
558+ else:
559+ d['client_ip'] = client_ip
560+ d['lb_ip'] = lb_ip
561+ d['method'] = method
562+ d['request'] = request
563+ if query:
564+ d['query'] = query
565+ d['http_version'] = http_version
566+ d['code'] = code
567+ d['referrer'] = referrer
568+ d['user_agent'] = user_agent
569+ d['auth_token'] = auth_token
570+ d['bytes_in'] = bytes_in
571+ d['bytes_out'] = bytes_out
572+ d['etag'] = etag
573+ d['trans_id'] = trans_id
574+ d['processing_time'] = processing_time
575+ day, month, year, hour, minute, second = timestamp.split('/')
576+ d['day'] = day
577+ month = ('%02s' % month_map.index(month)).replace(' ', '0')
578+ d['month'] = month
579+ d['year'] = year
580+ d['hour'] = hour
581+ d['minute'] = minute
582+ d['second'] = second
583+ d['tz'] = '+0000'
584+ d['account'] = account
585+ d['container_name'] = container_name
586+ d['object_name'] = object_name
587+ d['bytes_out'] = int(d['bytes_out'].replace('-','0'))
588+ d['bytes_in'] = int(d['bytes_in'].replace('-','0'))
589+ d['code'] = int(d['code'])
590+ return d
591+
592+ def process(self, obj_stream, account, container, object_name):
593+ '''generate hourly groupings of data from one access log file'''
594+ hourly_aggr_info = {}
595+ for line in obj_stream:
596+ line_data = self.log_line_parser(line)
597+ if not line_data:
598+ continue
599+ account = line_data['account']
600+ container_name = line_data['container_name']
601+ year = line_data['year']
602+ month = line_data['month']
603+ day = line_data['day']
604+ hour = line_data['hour']
605+ bytes_out = line_data['bytes_out']
606+ bytes_in = line_data['bytes_in']
607+ method = line_data['method']
608+ code = int(line_data['code'])
609+ object_name = line_data['object_name']
610+ client_ip = line_data['client_ip']
611+
612+ op_level = None
613+ if not container_name:
614+ op_level = 'account'
615+ elif container_name and not object_name:
616+ op_level = 'container'
617+ elif object_name:
618+ op_level = 'object'
619+
620+ aggr_key = (account, year, month, day, hour)
621+ d = hourly_aggr_info.get(aggr_key, {})
622+ if line_data['lb_ip'] in self.lb_private_ips:
623+ source = 'service'
624+ else:
625+ source = 'public'
626+
627+ if line_data['client_ip'] in self.service_ips:
628+ source = 'service'
629+
630+ d[(source, 'bytes_out')] = d.setdefault((source, 'bytes_out'), 0) + \
631+ bytes_out
632+ d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
633+ bytes_in
634+
635+ d['format_query'] = d.setdefault('format_query', 0) + \
636+ line_data.get('format', 0)
637+ d['marker_query'] = d.setdefault('marker_query', 0) + \
638+ line_data.get('marker', 0)
639+ d['prefix_query'] = d.setdefault('prefix_query', 0) + \
640+ line_data.get('prefix', 0)
641+ d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \
642+ line_data.get('delimiter', 0)
643+ path = line_data.get('path', 0)
644+ d['path_query'] = d.setdefault('path_query', 0) + path
645+
646+ code = '%dxx' % (code/100)
647+ key = (source, op_level, method, code)
648+ d[key] = d.setdefault(key, 0) + 1
649+
650+ hourly_aggr_info[aggr_key] = d
651+ return hourly_aggr_info
652+
653+ def keylist_mapping(self):
654+ source_keys = 'service public'.split()
655+ level_keys = 'account container object'.split()
656+ verb_keys = 'GET PUT POST DELETE HEAD COPY'.split()
657+ code_keys = '2xx 4xx 5xx'.split()
658+
659+ keylist_mapping = {
660+ # <db key> : <row key> or <set of row keys>
661+ 'service_bw_in': ('service', 'bytes_in'),
662+ 'service_bw_out': ('service', 'bytes_out'),
663+ 'public_bw_in': ('public', 'bytes_in'),
664+ 'public_bw_out': ('public', 'bytes_out'),
665+ 'account_requests': set(),
666+ 'container_requests': set(),
667+ 'object_requests': set(),
668+ 'service_request': set(),
669+ 'public_request': set(),
670+ 'ops_count': set(),
671+ }
672+ for verb in verb_keys:
673+ keylist_mapping[verb] = set()
674+ for code in code_keys:
675+ keylist_mapping[code] = set()
676+ for source in source_keys:
677+ for level in level_keys:
678+ for verb in verb_keys:
679+ for code in code_keys:
680+ keylist_mapping['account_requests'].add(
681+ (source, 'account', verb, code))
682+ keylist_mapping['container_requests'].add(
683+ (source, 'container', verb, code))
684+ keylist_mapping['object_requests'].add(
685+ (source, 'object', verb, code))
686+ keylist_mapping['service_request'].add(
687+ ('service', level, verb, code))
688+ keylist_mapping['public_request'].add(
689+ ('public', level, verb, code))
690+ keylist_mapping[verb].add(
691+ (source, level, verb, code))
692+ keylist_mapping[code].add(
693+ (source, level, verb, code))
694+ keylist_mapping['ops_count'].add(
695+ (source,level,verb,code))
696+ return keylist_mapping
697
698=== added file 'swift/stats/account_stats.py'
699--- swift/stats/account_stats.py 1970-01-01 00:00:00 +0000
700+++ swift/stats/account_stats.py 2010-09-15 20:11:08 +0000
701@@ -0,0 +1,86 @@
702+# Copyright (c) 2010 OpenStack, LLC.
703+#
704+# Licensed under the Apache License, Version 2.0 (the "License");
705+# you may not use this file except in compliance with the License.
706+# You may obtain a copy of the License at
707+#
708+# http://www.apache.org/licenses/LICENSE-2.0
709+#
710+# Unless required by applicable law or agreed to in writing, software
711+# distributed under the License is distributed on an "AS IS" BASIS,
712+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
713+# implied.
714+# See the License for the specific language governing permissions and
715+# limitations under the License.
716+
717+import os
718+import time
719+from paste.deploy import appconfig
720+
721+from swift.account.server import DATADIR as account_server_data_dir
722+from swift.common.db import AccountBroker
723+from swift.common.internal_proxy import InternalProxy
724+from swift.common.utils import renamer, get_logger, readconf
725+from swift.common.daemon import Daemon
726+
727+class AccountStat(Daemon):
728+ def __init__(self, stats_conf):
729+ super(AccountStat, self).__init__(stats_conf)
730+ target_dir = stats_conf.get('log_dir', '/var/log/swift')
731+ #TODO: figure out the server configs. also figure out internal_proxy
732+ account_server_conf_loc = stats_conf.get('account_server_conf',
733+ '/etc/swift/account-server.conf')
734+ server_conf = appconfig('config:%s' % account_server_conf_loc,
735+ name='account-server')
736+ filename_format = stats_conf['source_filename_format']
737+ self.filename_format = filename_format
738+ self.target_dir = target_dir
739+ self.devices = server_conf.get('devices', '/srv/node')
740+ self.mount_check = server_conf.get('mount_check', 'true').lower() in \
741+ ('true', 't', '1', 'on', 'yes', 'y')
742+ self.logger = get_logger(stats_conf, 'swift-account-stats-logger')
743+
744+ def run_once(self):
745+ self.logger.info("Gathering account stats")
746+ start = time.time()
747+ self.find_and_process()
748+ self.logger.info("Gathering account stats complete (%0.2f minutes)" %
749+ ((time.time()-start)/60))
750+
751+ def find_and_process(self):
752+ #TODO: handle a counter in the filename to prevent overwrites?
753+ src_filename = time.strftime(self.filename_format)
754+ #TODO: don't use /tmp?
755+ tmp_filename = os.path.join('/tmp', src_filename)
756+ with open(tmp_filename, 'wb') as statfile:
757+ #statfile.write('Account Name, Container Count, Object Count, Bytes Used\n')
758+ for device in os.listdir(self.devices):
759+ if self.mount_check and \
760+ not os.path.ismount(os.path.join(self.devices, device)):
761+ self.logger.error("Device %s is not mounted, skipping." %
762+ device)
763+ continue
764+ accounts = os.path.join(self.devices,
765+ device,
766+ account_server_data_dir)
767+ if not os.path.exists(accounts):
768+ self.logger.debug("Path %s does not exist, skipping." %
769+ accounts)
770+ continue
771+ for root, dirs, files in os.walk(accounts, topdown=False):
772+ for filename in files:
773+ if filename.endswith('.db'):
774+ broker = AccountBroker(os.path.join(root, filename))
775+ if not broker.is_deleted():
776+ (account_name,
777+ _, _, _,
778+ container_count,
779+ object_count,
780+ bytes_used,
781+ _, _) = broker.get_info()
782+ line_data = '"%s",%d,%d,%d\n' % (account_name,
783+ container_count,
784+ object_count,
785+ bytes_used)
786+ statfile.write(line_data)
787+ renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
788
789=== added file 'swift/stats/log_processor.py'
790--- swift/stats/log_processor.py 1970-01-01 00:00:00 +0000
791+++ swift/stats/log_processor.py 2010-09-15 20:11:08 +0000
792@@ -0,0 +1,376 @@
793+# Copyright (c) 2010 OpenStack, LLC.
794+#
795+# Licensed under the Apache License, Version 2.0 (the "License");
796+# you may not use this file except in compliance with the License.
797+# You may obtain a copy of the License at
798+#
799+# http://www.apache.org/licenses/LICENSE-2.0
800+#
801+# Unless required by applicable law or agreed to in writing, software
802+# distributed under the License is distributed on an "AS IS" BASIS,
803+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
804+# implied.
805+# See the License for the specific language governing permissions and
806+# limitations under the License.
807+
808+from ConfigParser import ConfigParser
809+import zlib
810+import time
811+import datetime
812+import cStringIO
813+import collections
814+from paste.deploy import appconfig
815+import multiprocessing
816+import Queue
817+import cPickle
818+
819+from swift.common.internal_proxy import InternalProxy
820+from swift.common.exceptions import ChunkReadTimeout
821+from swift.common.utils import get_logger, readconf
822+from swift.common.daemon import Daemon
823+
824+class BadFileDownload(Exception):
825+ pass
826+
827+class LogProcessor(object):
828+
829+ def __init__(self, conf, logger):
830+ stats_conf = conf.get('log-processor', {})
831+
832+ proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
833+ '/etc/swift/proxy-server.conf')
834+ self.proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
835+ name='proxy-server')
836+ if isinstance(logger, tuple):
837+ self.logger = get_logger(*logger)
838+ else:
839+ self.logger = logger
840+ self.internal_proxy = InternalProxy(self.proxy_server_conf,
841+ self.logger,
842+ retries=3)
843+
844+ # load the processing plugins
845+ self.plugins = {}
846+ plugin_prefix = 'log-processor-'
847+ for section in (x for x in conf if x.startswith(plugin_prefix)):
848+ plugin_name = section[len(plugin_prefix):]
849+ plugin_conf = conf.get(section, {})
850+ self.plugins[plugin_name] = plugin_conf
851+ import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
852+ module = __import__(import_target, fromlist=[import_target])
853+ klass = getattr(module, class_name)
854+ self.plugins[plugin_name]['instance'] = klass(plugin_conf)
855+
856+ def process_one_file(self, plugin_name, account, container, object_name):
857+ # get an iter of the object data
858+ compressed = object_name.endswith('.gz')
859+ stream = self.get_object_data(account, container, object_name,
860+ compressed=compressed)
861+ # look up the correct plugin and send the stream to it
862+ return self.plugins[plugin_name]['instance'].process(stream,
863+ account,
864+ container,
865+ object_name)
866+
867+ def get_data_list(self, start_date=None, end_date=None, listing_filter=None):
868+ total_list = []
869+ for name, data in self.plugins.items():
870+ account = data['swift_account']
871+ container = data['container_name']
872+ l = self.get_container_listing(account,
873+ container,
874+ start_date,
875+ end_date)
876+ for i in l:
877+ # The items in this list end up being passed as positional
878+ # parameters to process_one_file.
879+ x = (name, account, container, i)
880+ if x not in listing_filter:
881+ total_list.append(x)
882+ return total_list
883+
884+ def get_container_listing(self, swift_account, container_name, start_date=None,
885+ end_date=None, listing_filter=None):
886+ '''
887+ Get a container listing, filtered by start_date, end_date, and
888+ listing_filter. Dates, if given, should be in YYYYMMDDHH format
889+ '''
890+ search_key = None
891+ if start_date is not None:
892+ date_parts = []
893+ try:
894+ year, start_date = start_date[:4], start_date[4:]
895+ if year:
896+ date_parts.append(year)
897+ month, start_date = start_date[:2], start_date[2:]
898+ if month:
899+ date_parts.append(month)
900+ day, start_date = start_date[:2], start_date[2:]
901+ if day:
902+ date_parts.append(day)
903+ hour, start_date = start_date[:2], start_date[2:]
904+ if hour:
905+ date_parts.append(hour)
906+ except IndexError:
907+ pass
908+ else:
909+ search_key = '/'.join(date_parts)
910+ end_key = None
911+ if end_date is not None:
912+ date_parts = []
913+ try:
914+ year, end_date = end_date[:4], end_date[4:]
915+ if year:
916+ date_parts.append(year)
917+ month, end_date = end_date[:2], end_date[2:]
918+ if month:
919+ date_parts.append(month)
920+ day, end_date = end_date[:2], end_date[2:]
921+ if day:
922+ date_parts.append(day)
923+ hour, end_date = end_date[:2], end_date[2:]
924+ if hour:
925+ date_parts.append(hour)
926+ except IndexError:
927+ pass
928+ else:
929+ end_key = '/'.join(date_parts)
930+ container_listing = self.internal_proxy.get_container_list(
931+ swift_account,
932+ container_name,
933+ marker=search_key)
934+ results = []
935+ if container_listing is not None:
936+ if listing_filter is None:
937+ listing_filter = set()
938+ for item in container_listing:
939+ name = item['name']
940+ if end_key and name > end_key:
941+ break
942+ if name not in listing_filter:
943+ results.append(name)
944+ return results
945+
946+ def get_object_data(self, swift_account, container_name, object_name,
947+ compressed=False):
948+ '''reads an object and yields its lines'''
949+ code, o = self.internal_proxy.get_object(swift_account,
950+ container_name,
951+ object_name)
952+ if code < 200 or code >= 300:
953+ return
954+ last_part = ''
955+ last_compressed_part = ''
956+ # magic in the following zlib.decompressobj argument is courtesy of
957+ # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
958+ d = zlib.decompressobj(16+zlib.MAX_WBITS)
959+ try:
960+ for chunk in o:
961+ if compressed:
962+ try:
963+ chunk = d.decompress(chunk)
964+ except zlib.error:
965+ raise BadFileDownload() # bad compressed data
966+ parts = chunk.split('\n')
967+ parts[0] = last_part + parts[0]
968+ for part in parts[:-1]:
969+ yield part
970+ last_part = parts[-1]
971+ if last_part:
972+ yield last_part
973+ except ChunkReadTimeout:
974+ raise BadFileDownload()
975+
976+ def generate_keylist_mapping(self):
977+ keylist = {}
978+ for plugin in self.plugins:
979+ plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
980+ if not plugin_keylist:
981+ continue
982+ for k, v in plugin_keylist.items():
983+ o = keylist.get(k)
984+ if o:
985+ if isinstance(o, set):
986+ if isinstance(v, set):
987+ o.update(v)
988+ else:
989+ o.update([v])
990+ else:
991+ o = set(o)
992+ if isinstance(v, set):
993+ o.update(v)
994+ else:
995+ o.update([v])
996+ else:
997+ o = v
998+ keylist[k] = o
999+ return keylist
1000+
1001+
1002+class LogProcessorDaemon(Daemon):
1003+ def __init__(self, conf):
1004+ c = conf.get('log-processor')
1005+ super(LogProcessorDaemon, self).__init__(c)
1006+ self.total_conf = conf
1007+ self.logger = get_logger(c)
1008+ self.log_processor = LogProcessor(conf, self.logger)
1009+ self.lookback_hours = int(c.get('lookback_hours', '120'))
1010+ self.lookback_window = int(c.get('lookback_window',
1011+ str(self.lookback_hours)))
1012+ self.log_processor_account = c['swift_account']
1013+ self.log_processor_container = c.get('container_name',
1014+ 'log_processing_data')
1015+
1016+ def run_once(self):
1017+ self.logger.info("Beginning log processing")
1018+ start = time.time()
1019+ if self.lookback_hours == 0:
1020+ lookback_start = None
1021+ lookback_end = None
1022+ else:
1023+ lookback_start = datetime.datetime.now() - \
1024+ datetime.timedelta(hours=self.lookback_hours)
1025+ lookback_start = lookback_start.strftime('%Y%m%d%H')
1026+ if self.lookback_window == 0:
1027+ lookback_end = None
1028+ else:
1029+ lookback_end = datetime.datetime.now() - \
1030+ datetime.timedelta(hours=self.lookback_hours) + \
1031+ datetime.timedelta(hours=self.lookback_window)
1032+ lookback_end = lookback_end.strftime('%Y%m%d%H')
1033+ self.logger.debug('lookback_start: %s' % lookback_start)
1034+ self.logger.debug('lookback_end: %s' % lookback_end)
1035+ try:
1036+ processed_files_stream = self.log_processor.get_object_data(
1037+ self.log_processor_account,
1038+ self.log_processor_container,
1039+ 'processed_files.pickle.gz',
1040+ compressed=True)
1041+ buf = '\n'.join(x for x in processed_files_stream)
1042+ if buf:
1043+ already_processed_files = cPickle.loads(buf)
1044+ else:
1045+ already_processed_files = set()
1046+ except:
1047+ already_processed_files = set()
1048+ self.logger.debug('found %d processed files' % \
1049+ len(already_processed_files))
1050+ logs_to_process = self.log_processor.get_data_list(lookback_start,
1051+ lookback_end,
1052+ already_processed_files)
1053+ self.logger.info('loaded %d files to process' % len(logs_to_process))
1054+ if not logs_to_process:
1055+ self.logger.info("Log processing done (%0.2f minutes)" %
1056+ ((time.time()-start)/60))
1057+ return
1058+
1059+ # map
1060+ processor_args = (self.total_conf, self.logger)
1061+ results = multiprocess_collate(processor_args, logs_to_process)
1062+
1063+ #reduce
1064+ aggr_data = {}
1065+ processed_files = already_processed_files
1066+ for item, data in results:
1067+ # since item contains the plugin and the log name, new plugins will
1068+ # "reprocess" the file and the results will be in the final csv.
1069+ processed_files.add(item)
1070+ for k, d in data.items():
1071+ existing_data = aggr_data.get(k, {})
1072+ for i, j in d.items():
1073+ current = existing_data.get(i, 0)
1074+ # merging strategy for key collisions is addition
1075+ # processing plugins need to realize this
1076+ existing_data[i] = current + j
1077+ aggr_data[k] = existing_data
1078+
1079+ # group
1080+ # reduce a large number of keys in aggr_data[k] to a small number of
1081+ # output keys
1082+ keylist_mapping = self.log_processor.generate_keylist_mapping()
1083+ final_info = collections.defaultdict(dict)
1084+ for account, data in aggr_data.items():
1085+ for key, mapping in keylist_mapping.items():
1086+ if isinstance(mapping, (list, set)):
1087+ value = 0
1088+ for k in mapping:
1089+ try:
1090+ value += data[k]
1091+ except KeyError:
1092+ pass
1093+ else:
1094+ try:
1095+ value = data[mapping]
1096+ except KeyError:
1097+ value = 0
1098+ final_info[account][key] = value
1099+
1100+ # output
1101+ sorted_keylist_mapping = sorted(keylist_mapping)
1102+ columns = 'bill_ts,data_ts,account,' + ','.join(sorted_keylist_mapping)
1103+ print columns
1104+ for (account, year, month, day, hour), d in final_info.items():
1105+ bill_ts = ''
1106+ data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
1107+ row = [bill_ts, data_ts]
1108+ row.append('%s' % account)
1109+ for k in sorted_keylist_mapping:
1110+ row.append('%s'%d[k])
1111+ print ','.join(row)
1112+
1113+ # cleanup
1114+ s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
1115+ f = cStringIO.StringIO(s)
1116+ self.log_processor.internal_proxy.upload_file(f,
1117+ self.log_processor_account,
1118+ self.log_processor_container,
1119+ 'processed_files.pickle.gz')
1120+
1121+ self.logger.info("Log processing done (%0.2f minutes)" %
1122+ ((time.time()-start)/60))
1123+
1124+def multiprocess_collate(processor_args, logs_to_process):
1125+ '''yield hourly data from logs_to_process'''
1126+ worker_count = multiprocessing.cpu_count()
1127+ results = []
1128+ in_queue = multiprocessing.Queue()
1129+ out_queue = multiprocessing.Queue()
1130+ for _ in range(worker_count):
1131+ p = multiprocessing.Process(target=collate_worker,
1132+ args=(processor_args,
1133+ in_queue,
1134+ out_queue))
1135+ p.start()
1136+ results.append(p)
1137+ for x in logs_to_process:
1138+ in_queue.put(x)
1139+ for _ in range(worker_count):
1140+ in_queue.put(None)
1141+ count = 0
1142+ while True:
1143+ try:
1144+ item, data = out_queue.get_nowait()
1145+ count += 1
1146+ if data:
1147+ yield item, data
1148+ if count >= len(logs_to_process):
1149+ # this implies that one result will come from every request
1150+ break
1151+ except Queue.Empty:
1152+ time.sleep(.1)
1153+ for r in results:
1154+ r.join()
1155+
1156+def collate_worker(processor_args, in_queue, out_queue):
1157+ '''worker process for multiprocess_collate'''
1158+ p = LogProcessor(*processor_args)
1159+ while True:
1160+ try:
1161+ item = in_queue.get_nowait()
1162+ if item is None:
1163+ break
1164+ except Queue.Empty:
1165+ time.sleep(.1)
1166+ else:
1167+ ret = p.process_one_file(*item)
1168+ out_queue.put((item, ret))
1169\ No newline at end of file
1170
1171=== added file 'swift/stats/log_uploader.py'
1172--- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000
1173+++ swift/stats/log_uploader.py 2010-09-15 20:11:08 +0000
1174@@ -0,0 +1,160 @@
1175+# Copyright (c) 2010 OpenStack, LLC.
1176+#
1177+# Licensed under the Apache License, Version 2.0 (the "License");
1178+# you may not use this file except in compliance with the License.
1179+# You may obtain a copy of the License at
1180+#
1181+# http://www.apache.org/licenses/LICENSE-2.0
1182+#
1183+# Unless required by applicable law or agreed to in writing, software
1184+# distributed under the License is distributed on an "AS IS" BASIS,
1185+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1186+# implied.
1187+# See the License for the specific language governing permissions and
1188+# limitations under the License.
1189+
1190+from __future__ import with_statement
1191+import os
1192+import hashlib
1193+import time
1194+import gzip
1195+import glob
1196+from paste.deploy import appconfig
1197+
1198+from swift.common.internal_proxy import InternalProxy
1199+from swift.common.daemon import Daemon
1200+from swift.common import utils
1201+
1202+class LogUploader(Daemon):
1203+ '''
1204+ Given a local directory, a swift account, and a container name, LogParser
1205+ will upload all files in the local directory to the given account/container.
1206+ All but the newest files will be uploaded, and the files' md5 sum will be
1207+ computed. The hash is used to prevent duplicate data from being uploaded
1208+ multiple times in different files (ex: log lines). Since the hash is
1209+ computed, it is also used as the uploaded object's etag to ensure data
1210+ integrity.
1211+
1212+ Note that after the file is successfully uploaded, it will be unlinked.
1213+
1214+ The given proxy server config is used to instantiate a proxy server for
1215+ the object uploads.
1216+ '''
1217+
1218+ def __init__(self, uploader_conf, plugin_name):
1219+ super(LogUploader, self).__init__(uploader_conf)
1220+ log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
1221+ swift_account = uploader_conf['swift_account']
1222+ container_name = uploader_conf['container_name']
1223+ source_filename_format = uploader_conf['source_filename_format']
1224+ proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
1225+ '/etc/swift/proxy-server.conf')
1226+ proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
1227+ name='proxy-server')
1228+ if not log_dir.endswith('/'):
1229+ log_dir = log_dir + '/'
1230+ self.log_dir = log_dir
1231+ self.swift_account = swift_account
1232+ self.container_name = container_name
1233+ self.filename_format = source_filename_format
1234+ self.internal_proxy = InternalProxy(proxy_server_conf)
1235+ log_name = 'swift-log-uploader-%s' % plugin_name
1236+ self.logger = utils.get_logger(uploader_conf, plugin_name)
1237+
1238+ def run_once(self):
1239+ self.logger.info("Uploading logs")
1240+ start = time.time()
1241+ self.upload_all_logs()
1242+ self.logger.info("Uploading logs complete (%0.2f minutes)" %
1243+ ((time.time()-start)/60))
1244+
1245+ def upload_all_logs(self):
1246+ i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
1247+ i.sort()
1248+ year_offset = month_offset = day_offset = hour_offset = None
1249+ base_offset = len(self.log_dir)
1250+ for start, c in i:
1251+ offset = base_offset + start
1252+ if c == '%Y':
1253+ year_offset = offset, offset+4
1254+ # Add in the difference between len(%Y) and the expanded
1255+ # version of %Y (????). This makes sure the codes after this
1256+ # one will align properly in the final filename.
1257+ base_offset += 2
1258+ elif c == '%m':
1259+ month_offset = offset, offset+2
1260+ elif c == '%d':
1261+ day_offset = offset, offset+2
1262+ elif c == '%H':
1263+ hour_offset = offset, offset+2
1264+ if not (year_offset and month_offset and day_offset and hour_offset):
1265+ # don't have all the parts, can't upload anything
1266+ return
1267+ glob_pattern = self.filename_format
1268+ glob_pattern = glob_pattern.replace('%Y', '????', 1)
1269+ glob_pattern = glob_pattern.replace('%m', '??', 1)
1270+ glob_pattern = glob_pattern.replace('%d', '??', 1)
1271+ glob_pattern = glob_pattern.replace('%H', '??', 1)
1272+ filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
1273+ current_hour = int(time.strftime('%H'))
1274+ today = int(time.strftime('%Y%m%d'))
1275+ self.internal_proxy.create_container(self.swift_account,
1276+ self.container_name)
1277+ for filename in filelist:
1278+ try:
1279+ # From the filename, we need to derive the year, month, day,
1280+ # and hour for the file. These values are used in the uploaded
1281+ # object's name, so they should be a reasonably accurate
1282+ # representation of the time for which the data in the file was
1283+ # collected. The file's last modified time is not a reliable
1284+ # representation of the data in the file. For example, an old
1285+ # log file (from hour A) may be uploaded or moved into the
1286+ # log_dir in hour Z. The file's modified time will be for hour
1287+ # Z, and therefore the object's name in the system will not
1288+ # represent the data in it.
1289+ # If the filename doesn't match the format, it shouldn't be
1290+ # uploaded.
1291+ year = filename[slice(*year_offset)]
1292+ month = filename[slice(*month_offset)]
1293+ day = filename[slice(*day_offset)]
1294+ hour = filename[slice(*hour_offset)]
1295+ except IndexError:
1296+ # unexpected filename format, move on
1297+ self.logger.error("Unexpected log: %s" % filename)
1298+ continue
1299+ if (time.time() - os.stat(filename).st_mtime) < 7200:
1300+ # don't process very new logs
1301+ self.logger.debug("Skipping log: %s (< 2 hours old)" % filename)
1302+ continue
1303+ self.upload_one_log(filename, year, month, day, hour)
1304+
1305+ def upload_one_log(self, filename, year, month, day, hour):
1306+ if os.path.getsize(filename) == 0:
1307+ self.logger.debug("Log %s is 0 length, skipping" % filename)
1308+ return
1309+ self.logger.debug("Processing log: %s" % filename)
1310+ filehash = hashlib.md5()
1311+ already_compressed = True if filename.endswith('.gz') else False
1312+ opener = gzip.open if already_compressed else open
1313+ f = opener(filename, 'rb')
1314+ try:
1315+ for line in f:
1316+ # filter out bad lines here?
1317+ filehash.update(line)
1318+ finally:
1319+ f.close()
1320+ filehash = filehash.hexdigest()
1321+ # By adding a hash to the filename, we ensure that uploaded files
1322+ # have unique filenames and protect against uploading one file
1323+ # more than one time. By using md5, we get an etag for free.
1324+ target_filename = '/'.join([year, month, day, hour, filehash+'.gz'])
1325+ if self.internal_proxy.upload_file(filename,
1326+ self.swift_account,
1327+ self.container_name,
1328+ target_filename,
1329+ compress=(not already_compressed)):
1330+ self.logger.debug("Uploaded log %s to %s" %
1331+ (filename, target_filename))
1332+ os.unlink(filename)
1333+ else:
1334+ self.logger.error("ERROR: Upload of log %s failed!" % filename)
1335
1336=== added file 'swift/stats/stats_processor.py'
1337--- swift/stats/stats_processor.py 1970-01-01 00:00:00 +0000
1338+++ swift/stats/stats_processor.py 2010-09-15 20:11:08 +0000
1339@@ -0,0 +1,63 @@
1340+# Copyright (c) 2010 OpenStack, LLC.
1341+#
1342+# Licensed under the Apache License, Version 2.0 (the "License");
1343+# you may not use this file except in compliance with the License.
1344+# You may obtain a copy of the License at
1345+#
1346+# http://www.apache.org/licenses/LICENSE-2.0
1347+#
1348+# Unless required by applicable law or agreed to in writing, software
1349+# distributed under the License is distributed on an "AS IS" BASIS,
1350+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1351+# implied.
1352+# See the License for the specific language governing permissions and
1353+# limitations under the License.
1354+
1355+class StatsLogProcessor(object):
1356+
1357+ def __init__(self, conf):
1358+ pass
1359+
1360+ def process(self, obj_stream, account, container, object_name):
1361+ '''generate hourly groupings of data from one stats log file'''
1362+ account_totals = {}
1363+ year, month, day, hour, _ = object_name.split('/')
1364+ for line in obj_stream:
1365+ if not line:
1366+ continue
1367+ try:
1368+ (account,
1369+ container_count,
1370+ object_count,
1371+ bytes_used) = line.split(',')
1372+ account = account.strip('"')
1373+ container_count = int(container_count.strip('"'))
1374+ object_count = int(object_count.strip('"'))
1375+ bytes_used = int(bytes_used.strip('"'))
1376+ aggr_key = (account, year, month, day, hour)
1377+ d = account_totals.get(aggr_key, {})
1378+ d['replica_count'] = d.setdefault('replica_count', 0) + 1
1379+ d['container_count'] = d.setdefault('container_count', 0) + \
1380+ container_count
1381+ d['object_count'] = d.setdefault('object_count', 0) + \
1382+ object_count
1383+ d['bytes_used'] = d.setdefault('bytes_used', 0) + \
1384+ bytes_used
1385+ account_totals[aggr_key] = d
1386+ except (IndexError, ValueError):
1387+ # bad line data
1388+ pass
1389+ return account_totals
1390+
1391+ def keylist_mapping(self):
1392+ '''
1393+ returns a dictionary of final keys mapped to source keys
1394+ '''
1395+ keylist_mapping = {
1396+ # <db key> : <row key> or <set of row keys>
1397+ 'bytes_used': 'bytes_used',
1398+ 'container_count': 'container_count',
1399+ 'object_count': 'object_count',
1400+ 'replica_count': 'replica_count',
1401+ }
1402+ return keylist_mapping
1403
1404=== added directory 'test/unit/stats'
1405=== added file 'test/unit/stats/__init__.py'
1406=== added file 'test/unit/stats/test_log_processor.py'
1407--- test/unit/stats/test_log_processor.py 1970-01-01 00:00:00 +0000
1408+++ test/unit/stats/test_log_processor.py 2010-09-15 20:11:08 +0000
1409@@ -0,0 +1,161 @@
1410+import unittest
1411+
1412+from swift.stats import log_processor
1413+
1414+class DumbLogger(object):
1415+ def __getattr__(self, n):
1416+ return self.foo
1417+
1418+ def foo(self, *a, **kw):
1419+ pass
1420+
1421+class DumbInternalProxy(object):
1422+ def get_container_list(self, account, container, marker=None):
1423+ n = '2010/03/14/13/obj1'
1424+ if marker is None or n > marker:
1425+ return [{'name': n}]
1426+ else:
1427+ return []
1428+
1429+ def get_object(self, account, container, object_name):
1430+ code = 200
1431+ if object_name.endswith('.gz'):
1432+ # same data as below, compressed with gzip -9
1433+ def data():
1434+ yield '\x1f\x8b\x08'
1435+ yield '\x08"\xd79L'
1436+ yield '\x02\x03te'
1437+ yield 'st\x00\xcbO'
1438+ yield '\xca\xe2JI,I'
1439+ yield '\xe4\x02\x00O\xff'
1440+ yield '\xa3Y\t\x00\x00\x00'
1441+ else:
1442+ def data():
1443+ yield 'obj\n'
1444+ yield 'data'
1445+ return code, data()
1446+
1447+class TestLogProcessor(unittest.TestCase):
1448+
1449+ access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\
1450+ '09/Jul/2010/04/14/30 GET '\
1451+ '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
1452+ 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
1453+ '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
1454+ stats_test_line = 'account,1,2,3'
1455+ proxy_config = {'log-processor': {
1456+
1457+ }
1458+ }
1459+
1460+ def test_access_log_line_parser(self):
1461+ access_proxy_config = self.proxy_config
1462+ access_proxy_config.update({
1463+ 'log-processor-access': {
1464+ 'source_filename_format':'%Y%m%d%H*',
1465+ 'class_path':
1466+ 'swift.stats.access_processor.AccessLogProcessor'
1467+ }})
1468+ p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
1469+ result = p.plugins['access']['instance'].log_line_parser(self.access_test_line)
1470+ self.assertEquals(result, {'code': 200,
1471+ 'processing_time': '0.0262',
1472+ 'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd',
1473+ 'month': '07',
1474+ 'second': '30',
1475+ 'year': '2010',
1476+ 'query': 'format=json&foo',
1477+ 'tz': '+0000',
1478+ 'http_version': 'HTTP/1.0',
1479+ 'object_name': 'bar',
1480+ 'etag': '-',
1481+ 'foo': 1,
1482+ 'method': 'GET',
1483+ 'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90',
1484+ 'client_ip': '1.2.3.4',
1485+ 'format': 1,
1486+ 'bytes_out': 95,
1487+ 'container_name': 'foo',
1488+ 'day': '09',
1489+ 'minute': '14',
1490+ 'account': 'acct',
1491+ 'hour': '04',
1492+ 'referrer': '-',
1493+ 'request': '/v1/acct/foo/bar',
1494+ 'user_agent': 'curl',
1495+ 'bytes_in': 6,
1496+ 'lb_ip': '4.5.6.7'})
1497+
1498+ def test_process_one_access_file(self):
1499+ access_proxy_config = self.proxy_config
1500+ access_proxy_config.update({
1501+ 'log-processor-access': {
1502+ 'source_filename_format':'%Y%m%d%H*',
1503+ 'class_path':
1504+ 'swift.stats.access_processor.AccessLogProcessor'
1505+ }})
1506+ p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
1507+ def get_object_data(*a, **kw):
1508+ return [self.access_test_line]
1509+ p.get_object_data = get_object_data
1510+ result = p.process_one_file('access', 'a', 'c', 'o')
1511+ expected = {('acct', '2010', '07', '09', '04'):
1512+ {('public', 'object', 'GET', '2xx'): 1,
1513+ ('public', 'bytes_out'): 95,
1514+ 'marker_query': 0,
1515+ 'format_query': 1,
1516+ 'delimiter_query': 0,
1517+ 'path_query': 0,
1518+ ('public', 'bytes_in'): 6,
1519+ 'prefix_query': 0}}
1520+ self.assertEquals(result, expected)
1521+
1522+ def test_get_container_listing(self):
1523+ p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
1524+ p.internal_proxy = DumbInternalProxy()
1525+ result = p.get_container_listing('a', 'foo')
1526+ expected = ['2010/03/14/13/obj1']
1527+ self.assertEquals(result, expected)
1528+ result = p.get_container_listing('a', 'foo', listing_filter=expected)
1529+ expected = []
1530+ self.assertEquals(result, expected)
1531+ result = p.get_container_listing('a', 'foo', start_date='2010031412',
1532+ end_date='2010031414')
1533+ expected = ['2010/03/14/13/obj1']
1534+ self.assertEquals(result, expected)
1535+ result = p.get_container_listing('a', 'foo', start_date='2010031414')
1536+ expected = []
1537+ self.assertEquals(result, expected)
1538+ result = p.get_container_listing('a', 'foo', start_date='2010031410',
1539+ end_date='2010031412')
1540+ expected = []
1541+ self.assertEquals(result, expected)
1542+
1543+ def test_get_object_data(self):
1544+ p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
1545+ p.internal_proxy = DumbInternalProxy()
1546+ result = list(p.get_object_data('a', 'c', 'o', False))
1547+ expected = ['obj','data']
1548+ self.assertEquals(result, expected)
1549+ result = list(p.get_object_data('a', 'c', 'o.gz', True))
1550+ self.assertEquals(result, expected)
1551+
1552+ def test_get_stat_totals(self):
1553+ stats_proxy_config = self.proxy_config
1554+ stats_proxy_config.update({
1555+ 'log-processor-stats': {
1556+ 'class_path':
1557+ 'swift.stats.stats_processor.StatsLogProcessor'
1558+ }})
1559+ p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
1560+ p.internal_proxy = DumbInternalProxy()
1561+ def get_object_data(*a,**kw):
1562+ return [self.stats_test_line]
1563+ p.get_object_data = get_object_data
1564+ result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o')
1565+ expected = {'account':
1566+ {'count': 1,
1567+ 'object_count': 2,
1568+ 'container_count': 1,
1569+ 'bytes_used': 3}}
1570+ self.assertEquals(result, expected)
1571\ No newline at end of file