Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk
Status: Superseded
Proposed branch: lp:~notmyname/swift/stats_system
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1736 lines (+1615/-12), 15 files modified

- bin/swift-account-stats-logger (+27/-0)
- bin/swift-log-stats-collector (+27/-0)
- bin/swift-log-uploader (+31/-0)
- doc/source/overview_stats.rst (+79/-0)
- etc/log-processing.conf-sample (+38/-0)
- swift/common/compressing_file_reader.py (+71/-0)
- swift/common/daemon.py (+5/-2)
- swift/common/internal_proxy.py (+185/-0)
- swift/common/utils.py (+17/-10)
- swift/stats/access_processor.py (+226/-0)
- swift/stats/account_stats.py (+100/-0)
- swift/stats/log_processor.py (+413/-0)
- swift/stats/log_uploader.py (+170/-0)
- swift/stats/stats_processor.py (+65/-0)
- test/unit/stats/test_log_processor.py (+161/-0)

To merge this branch: bzr merge lp:~notmyname/swift/stats_system
Related bugs: none
Related blueprints: Stats system (High)
Reviewer | Review Type | Date Requested | Status
---|---|---|---
Swift Core security contacts | | | Pending
Review via email: mp+36485@code.launchpad.net
This proposal supersedes a proposal from 2010-09-22.
This proposal has been superseded by a proposal from 2010-09-29.
Commit message
Description of the change
Work in progress: moving the existing internal (pre-OpenStack) Swift stats system into (OpenStack) Swift.
Chuck Thier (cthier) wrote (posted in a previous version of this proposal):

John Dickinson (notmyname) wrote (posted in a previous version of this proposal):

Updated to be more DRY-compliant.

John Dickinson (notmyname) wrote (posted in a previous version of this proposal):

Merged with recent trunk updates (resolving one conflict) to make the final merge easier.

John Dickinson (notmyname) wrote (posted in a previous version of this proposal):

Fixed a unit test error.

John Dickinson (notmyname) wrote (posted in a previous version of this proposal):

Updated to address issues that came up in code review.

Chuck Thier (cthier) wrote:

Can we add docs on how to run this with the SAIO?
- 99. By John Dickinson: added additional docs
- 100. By John Dickinson: documentation clarification and pep8 fixes
- 101. By John Dickinson: added overview stats to the doc index
- 102. By John Dickinson: made long lines wrap (grr pep8)
- 103. By John Dickinson: merged with trunk
- 104. By John Dickinson: merged with trunk
- 105. By John Dickinson: merged with trunk
- 106. By John Dickinson: fixed stats docs
- 107. By John Dickinson: added a bad lines check to the access log parser
- 108. By John Dickinson: added tests for compressing file reader
- 109. By John Dickinson: fixed compressing file reader test
- 110. By John Dickinson: fixed compressing file reader test
- 111. By John Dickinson: improved compressing file reader test
- 112. By John Dickinson: fixed compressing file reader test
- 113. By John Dickinson: made try/except much less inclusive in access log processor
- 114. By John Dickinson: added keylist mapping tests and fixed other tests
- 115. By John Dickinson: updated stats system tests
- 116. By John Dickinson: merged with gholt's test stubs
- 117. By John Dickinson: updated SAIO instructions for the stats system
- 118. By John Dickinson: fixed stats system SAIO docs
- 119. By John Dickinson: fixed stats system SAIO docs
- 120. By John Dickinson: added openstack copyright/license to test_log_processor.py
- 121. By John Dickinson: improved logging in log processors
- 122. By John Dickinson: merged with trunk
- 123. By John Dickinson: fixed missing working directory bug in account stats
- 124. By John Dickinson: fixed readconf parameter that was broken with a previous merge
- 125. By John Dickinson: updated setup.py and SAIO docs for stats system
- 126. By John Dickinson: added readconf unit test
- 127. By John Dickinson: updated stats SAIO docs to create logs with the appropriate permissions
- 128. By John Dickinson: fixed bug in log processor internal proxy lazy load code
- 129. By John Dickinson: updated readconf test
- 130. By John Dickinson: updated readconf test
- 131. By John Dickinson: updated readconf test
- 132. By John Dickinson: fixed internal proxy references in log processor
- 133. By John Dickinson: fixed account stats filename creation
- 134. By John Dickinson: pep8 tomfoolery
- 135. By John Dickinson: moved paren
- 136. By John Dickinson: added lazy load of internal proxy to log processor (you were right clay)
Unmerged revisions
Preview Diff
1 | === added file 'bin/swift-account-stats-logger' |
2 | --- bin/swift-account-stats-logger 1970-01-01 00:00:00 +0000 |
3 | +++ bin/swift-account-stats-logger 2010-09-23 18:31:58 +0000 |
4 | @@ -0,0 +1,27 @@ |
5 | +#!/usr/bin/python |
6 | +# Copyright (c) 2010 OpenStack, LLC. |
7 | +# |
8 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
9 | +# you may not use this file except in compliance with the License. |
10 | +# You may obtain a copy of the License at |
11 | +# |
12 | +# http://www.apache.org/licenses/LICENSE-2.0 |
13 | +# |
14 | +# Unless required by applicable law or agreed to in writing, software |
15 | +# distributed under the License is distributed on an "AS IS" BASIS, |
16 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
17 | +# implied. |
18 | +# See the License for the specific language governing permissions and |
19 | +# limitations under the License. |
20 | + |
21 | +import sys |
22 | + |
23 | +from swift.stats.account_stats import AccountStat |
24 | +from swift.common import utils |
25 | + |
26 | +if __name__ == '__main__': |
27 | + if len(sys.argv) < 2: |
28 | + print "Usage: swift-account-stats-logger CONFIG_FILE" |
29 | + sys.exit() |
30 | + stats_conf = utils.readconf(sys.argv[1], 'log-processor-stats') |
31 | + stats = AccountStat(stats_conf).run(once=True) |
32 | |
33 | === added file 'bin/swift-log-stats-collector' |
34 | --- bin/swift-log-stats-collector 1970-01-01 00:00:00 +0000 |
35 | +++ bin/swift-log-stats-collector 2010-09-23 18:31:58 +0000 |
36 | @@ -0,0 +1,27 @@ |
37 | +#!/usr/bin/python |
38 | +# Copyright (c) 2010 OpenStack, LLC. |
39 | +# |
40 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
41 | +# you may not use this file except in compliance with the License. |
42 | +# You may obtain a copy of the License at |
43 | +# |
44 | +# http://www.apache.org/licenses/LICENSE-2.0 |
45 | +# |
46 | +# Unless required by applicable law or agreed to in writing, software |
47 | +# distributed under the License is distributed on an "AS IS" BASIS, |
48 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
49 | +# implied. |
50 | +# See the License for the specific language governing permissions and |
51 | +# limitations under the License. |
52 | + |
53 | +import sys |
54 | + |
55 | +from swift.stats.log_processor import LogProcessorDaemon |
56 | +from swift.common import utils |
57 | + |
58 | +if __name__ == '__main__': |
59 | + if len(sys.argv) < 2: |
60 | + print "Usage: swift-log-stats-collector CONFIG_FILE" |
61 | + sys.exit() |
62 | + conf = utils.readconf(sys.argv[1], log_name='log-stats-collector') |
63 | + stats = LogProcessorDaemon(conf).run(once=True) |
64 | |
65 | === added file 'bin/swift-log-uploader' |
66 | --- bin/swift-log-uploader 1970-01-01 00:00:00 +0000 |
67 | +++ bin/swift-log-uploader 2010-09-23 18:31:58 +0000 |
68 | @@ -0,0 +1,31 @@ |
69 | +#!/usr/bin/python |
70 | +# Copyright (c) 2010 OpenStack, LLC. |
71 | +# |
72 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
73 | +# you may not use this file except in compliance with the License. |
74 | +# You may obtain a copy of the License at |
75 | +# |
76 | +# http://www.apache.org/licenses/LICENSE-2.0 |
77 | +# |
78 | +# Unless required by applicable law or agreed to in writing, software |
79 | +# distributed under the License is distributed on an "AS IS" BASIS, |
80 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
81 | +# implied. |
82 | +# See the License for the specific language governing permissions and |
83 | +# limitations under the License. |
84 | + |
85 | +import sys |
86 | + |
87 | +from swift.stats.log_uploader import LogUploader |
88 | +from swift.common import utils |
89 | + |
90 | +if __name__ == '__main__': |
91 | + if len(sys.argv) < 3: |
92 | + print "Usage: swift-log-uploader CONFIG_FILE plugin" |
93 | + sys.exit() |
94 | + uploader_conf = utils.readconf(sys.argv[1], 'log-processor') |
95 | + plugin = sys.argv[2] |
96 | + section_name = 'log-processor-%s' % plugin |
97 | + plugin_conf = utils.readconf(sys.argv[1], section_name) |
98 | + uploader_conf.update(plugin_conf) |
99 | + uploader = LogUploader(uploader_conf, plugin).run(once=True) |
100 | |
101 | === added file 'doc/source/overview_stats.rst' |
102 | --- doc/source/overview_stats.rst 1970-01-01 00:00:00 +0000 |
103 | +++ doc/source/overview_stats.rst 2010-09-23 18:31:58 +0000 |
104 | @@ -0,0 +1,79 @@ |
105 | +================== |
106 | +Swift stats system |
107 | +================== |
108 | + |
109 | +The swift stats system is composed of three parts: log creation, log |
110 | +uploading, and log processing. The system handles two types of logs (access |
111 | +and storage stats), but it can be extended to handle other types of logs. |
112 | + |
113 | +--------- |
114 | +Log Types |
115 | +--------- |
116 | + |
117 | +*********** |
118 | +Access logs |
119 | +*********** |
120 | + |
121 | +Access logs are the proxy server logs. |
122 | + |
123 | +****************** |
124 | +Storage stats logs |
125 | +****************** |
126 | + |
127 | +Storage logs (also referred to as stats logs) are generated by a stats system |
128 | +process. swift-account-stats-logger runs on each account server (via cron) and |
129 | +walks the filesystem looking for account databases. When an account database |
130 | +is found, the logger selects the account hash, bytes_used, container_count, |
131 | +and object_count. These values are then written out as one line in a csv file. |
132 | +One csv file is produced for every run of swift-account-stats-logger. This |
133 | +means that, system wide, one csv file is produced for every storage node. |
134 | +Rackspace runs the account stats logger every hour. Therefore, in a cluster of |
135 | +ten account servers, ten csv files are produced every hour. Also, every |
136 | +account will have one entry for every replica in the system. On average, there |
137 | +will be three copies of each account in the aggregate of all account stat csv |
138 | +files created in one system-wide run. |
139 | + |
140 | +---------------------- |
141 | +Log Processing plugins |
142 | +---------------------- |
143 | + |
144 | +The swift stats system is written to allow a plugin to be defined for every |
145 | +log type. Swift includes plugins for both access logs and storage stats logs. |
146 | +Each plugin is responsible for defining, in a config section, where the logs |
147 | +are stored on disk, where the logs will be stored in swift (account and |
148 | +container), the filename format of the logs on disk, the location of the |
149 | +plugin class definition, and any plugin-specific config values. |
150 | + |
151 | +The plugin class definition defines three methods. The constructor must accept |
152 | +one argument (the dict representation of the plugin's config section). The |
153 | +process method must accept an iterator, and the account, container, and object |
154 | +name of the log. The keylist_mapping method accepts no parameters. |
155 | + |
156 | +------------- |
157 | +Log Uploading |
158 | +------------- |
159 | + |
160 | +swift-log-uploader accepts a config file and a plugin name. It finds the log |
161 | +files on disk according to the plugin config section and uploads them to the |
162 | +swift cluster. This means one uploader process will run on each proxy server |
163 | +node and each account server node. To avoid uploading partially-written log |
164 | +files, the uploader skips files modified within the last two hours. |
165 | +Rackspace runs this process once an hour via cron. |
166 | + |
167 | +-------------- |
168 | +Log Processing |
169 | +-------------- |
170 | + |
171 | +swift-log-stats-collector accepts a config file and generates a csv that is |
172 | +uploaded to swift. It loads all plugins defined in the config file, generates |
173 | +a list of all log files in swift that need to be processed, and passes an |
174 | +iterable of the log file data to the appropriate plugin's process method. The |
175 | +process method returns a dictionary of data in the log file keyed on (account, |
176 | +year, month, day, hour). The log-stats-collector process then combines all |
177 | +dictionaries from all calls to a process method into one dictionary. Key |
178 | +collisions within each (account, year, month, day, hour) dictionary are |
179 | +summed. Finally, the summed dictionary is mapped to the final csv values with |
180 | +each plugin's keylist_mapping method. |
181 | + |
182 | +The resulting csv file has one line per (account, year, month, day, hour) for |
183 | +all log files processed in that run of swift-log-stats-collector. |
184 | |
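To make the plugin interface described in overview_stats.rst concrete, here is a minimal sketch of what a third-party plugin could look like. It is not part of this branch: the class name, the invented line format, and the `errors` key are hypothetical; only the three-method shape (constructor taking the config dict, `process`, `keylist_mapping`) comes from the docs above.

```python
class ErrorLogProcessor(object):
    """Hypothetical plugin showing the three-method interface.

    A [log-processor-error] config section would point class_path at
    this class; the line format parsed below is invented.
    """

    def __init__(self, conf):
        # conf is the dict form of the plugin's config section
        self.server_name = conf.get('server_name', 'proxy')

    def _parse_line(self, line):
        # Toy parser: expects "account year month day hour message" lines.
        parts = line.split()
        if len(parts) < 6:
            return None
        return dict(zip(('account', 'year', 'month', 'day', 'hour'), parts))

    def process(self, obj_stream, account, container, object_name):
        # Must return a dict keyed on (account, year, month, day, hour);
        # colliding inner keys are summed by the collector.
        hourly = {}
        for line in obj_stream:
            parsed = self._parse_line(line)
            if not parsed:
                continue
            key = (parsed['account'], parsed['year'], parsed['month'],
                   parsed['day'], parsed['hour'])
            counts = hourly.setdefault(key, {})
            counts['errors'] = counts.get('errors', 0) + 1
        return hourly

    def keylist_mapping(self):
        # Maps internal aggregation keys to final csv column names.
        return {'error_count': 'errors'}
```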
185 | === added file 'etc/log-processing.conf-sample' |
186 | --- etc/log-processing.conf-sample 1970-01-01 00:00:00 +0000 |
187 | +++ etc/log-processing.conf-sample 2010-09-23 18:31:58 +0000 |
188 | @@ -0,0 +1,38 @@ |
189 | +# plugin section format is named "log-processor-<plugin>" |
190 | + |
191 | +[log-processor] |
192 | +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 |
193 | +# container_name = log_processing_data |
194 | +# proxy_server_conf = /etc/swift/proxy-server.conf |
195 | +# log_facility = LOG_LOCAL0 |
196 | +# log_level = INFO |
197 | +# lookback_hours = 120 |
198 | +# lookback_window = 120 |
199 | +# user = swift |
200 | + |
201 | +[log-processor-access] |
202 | +# log_dir = /var/log/swift/ |
203 | +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 |
204 | +container_name = log_data |
205 | +source_filename_format = access-%Y%m%d%H |
206 | +# new_log_cutoff = 7200 |
207 | +# unlink_log = True |
208 | +class_path = swift.stats.access_processor.AccessLogProcessor |
209 | +# service ips is for client ip addresses that should be counted as servicenet |
210 | +# service_ips = |
211 | +# load balancer private ips is for load balancer ip addresses that should be |
212 | +# counted as servicenet |
213 | +# lb_private_ips = |
214 | +# server_name = proxy |
215 | +# user = swift |
216 | + |
217 | +[log-processor-stats] |
218 | +# log_dir = /var/log/swift/ |
219 | +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 |
220 | +container_name = account_stats |
221 | +source_filename_format = stats-%Y%m%d%H_* |
222 | +# new_log_cutoff = 7200 |
223 | +# unlink_log = True |
224 | +class_path = swift.stats.stats_processor.StatsLogProcessor |
225 | +# account_server_conf = /etc/swift/account-server.conf |
226 | +# user = swift |
227 | \ No newline at end of file |
228 | |
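For reference, this is how swift-log-uploader (shown earlier in the diff) combines the base [log-processor] section with a plugin's section. A minimal sketch, assuming the sample config above is installed at /etc/swift/log-processing.conf:

```python
from swift.common import utils

conf_path = '/etc/swift/log-processing.conf'  # assumed install location
plugin = 'access'  # selects the [log-processor-access] section

uploader_conf = utils.readconf(conf_path, 'log-processor')
plugin_conf = utils.readconf(conf_path, 'log-processor-%s' % plugin)
uploader_conf.update(plugin_conf)  # plugin values override the base ones

# Equivalent to: swift-log-uploader /etc/swift/log-processing.conf access
```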
229 | === added file 'swift/common/compressing_file_reader.py' |
230 | --- swift/common/compressing_file_reader.py 1970-01-01 00:00:00 +0000 |
231 | +++ swift/common/compressing_file_reader.py 2010-09-23 18:31:58 +0000 |
232 | @@ -0,0 +1,71 @@ |
233 | +# Copyright (c) 2010 OpenStack, LLC. |
234 | +# |
235 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
236 | +# you may not use this file except in compliance with the License. |
237 | +# You may obtain a copy of the License at |
238 | +# |
239 | +# http://www.apache.org/licenses/LICENSE-2.0 |
240 | +# |
241 | +# Unless required by applicable law or agreed to in writing, software |
242 | +# distributed under the License is distributed on an "AS IS" BASIS, |
243 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
244 | +# implied. |
245 | +# See the License for the specific language governing permissions and |
246 | +# limitations under the License. |
247 | + |
248 | +import zlib |
249 | +import struct |
250 | + |
251 | +class CompressingFileReader(object): |
252 | + ''' |
253 | + Wraps a file object and provides a read method that returns gzip'd data. |
254 | + |
255 | + One warning: if read is called with a small value, the data returned may |
256 | + be bigger than the value. In this case, the "compressed" data will be |
257 | + bigger than the original data. To solve this, use a bigger read buffer. |
258 | + |
259 | + An example use case: |
260 | + Given an uncompressed file on disk, provide a way to read compressed data |
261 | + without buffering the entire file data in memory. Using this class, an |
262 | + uncompressed log file could be uploaded as compressed data with chunked |
263 | + transfer encoding. |
264 | + |
265 | + gzip header and footer code taken from the python stdlib gzip module |
266 | + |
267 | + :param file_obj: File object to read from |
268 | + :param compresslevel: compression level |
269 | + ''' |
270 | + def __init__(self, file_obj, compresslevel=9): |
271 | + self._f = file_obj |
272 | + self._compressor = zlib.compressobj(compresslevel, |
273 | + zlib.DEFLATED, |
274 | + -zlib.MAX_WBITS, |
275 | + zlib.DEF_MEM_LEVEL, |
276 | + 0) |
277 | + self.done = False |
278 | + self.first = True |
279 | + self.crc32 = 0 |
280 | + self.total_size = 0 |
281 | + |
282 | + def read(self, *a, **kw): |
283 | + if self.done: |
284 | + return '' |
285 | + x = self._f.read(*a, **kw) |
286 | + if x: |
287 | + self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL |
288 | + self.total_size += len(x) |
289 | + compressed = self._compressor.compress(x) |
290 | + if not compressed: |
291 | + compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH) |
292 | + else: |
293 | + compressed = self._compressor.flush(zlib.Z_FINISH) |
294 | + crc32 = struct.pack("<L", self.crc32 & 0xffffffffL) |
295 | + size = struct.pack("<L", self.total_size & 0xffffffffL) |
296 | + footer = crc32 + size |
297 | + compressed += footer |
298 | + self.done = True |
299 | + if self.first: |
300 | + self.first = False |
301 | + header = '\037\213\010\000\000\000\000\000\002\377' |
302 | + compressed = header + compressed |
303 | + return compressed |
304 | |
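A small usage sketch for CompressingFileReader, not part of this branch: compress a file chunk by chunk and check that the concatenated output is a complete gzip stream. The log path is illustrative.

```python
import gzip
from cStringIO import StringIO

from swift.common.compressing_file_reader import CompressingFileReader

reader = CompressingFileReader(open('/var/log/swift/access-2010092300', 'rb'))
chunks = []
while True:
    chunk = reader.read(65536)  # a large buffer avoids the overhead warning
    if not chunk:
        break
    chunks.append(chunk)
gzipped = ''.join(chunks)

# The header, deflate body, and crc/size footer concatenate into a
# complete gzip stream that the stdlib can read back.
original = gzip.GzipFile(fileobj=StringIO(gzipped)).read()
```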
305 | === modified file 'swift/common/daemon.py' |
306 | --- swift/common/daemon.py 2010-09-16 03:52:54 +0000 |
307 | +++ swift/common/daemon.py 2010-09-23 18:31:58 +0000 |
308 | @@ -34,12 +34,15 @@ |
309 | """Override this to run forever""" |
310 | raise NotImplementedError('run_forever not implemented') |
311 | |
312 | - def run(self, once=False): |
313 | + def run(self, once=False, capture_stdout=True, capture_stderr=True): |
314 | """Run the daemon""" |
315 | # log uncaught exceptions |
316 | sys.excepthook = lambda *exc_info: \ |
317 | self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) |
318 | - sys.stdout = sys.stderr = utils.LoggerFileObject(self.logger) |
319 | + if capture_stdout: |
320 | + sys.stdout = utils.LoggerFileObject(self.logger) |
321 | + if capture_stderr: |
322 | + sys.stderr = utils.LoggerFileObject(self.logger) |
323 | |
324 | utils.drop_privileges(self.conf.get('user', 'swift')) |
325 | |
326 | |
327 | === added file 'swift/common/internal_proxy.py' |
328 | --- swift/common/internal_proxy.py 1970-01-01 00:00:00 +0000 |
329 | +++ swift/common/internal_proxy.py 2010-09-23 18:31:58 +0000 |
330 | @@ -0,0 +1,185 @@ |
331 | +# Copyright (c) 2010 OpenStack, LLC. |
332 | +# |
333 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
334 | +# you may not use this file except in compliance with the License. |
335 | +# You may obtain a copy of the License at |
336 | +# |
337 | +# http://www.apache.org/licenses/LICENSE-2.0 |
338 | +# |
339 | +# Unless required by applicable law or agreed to in writing, software |
340 | +# distributed under the License is distributed on an "AS IS" BASIS, |
341 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
342 | +# implied. |
343 | +# See the License for the specific language governing permissions and |
344 | +# limitations under the License. |
345 | + |
346 | +import webob |
347 | +from urllib import quote, unquote |
348 | +from json import loads as json_loads |
349 | + |
350 | +from swift.common.compressing_file_reader import CompressingFileReader |
351 | +from swift.proxy.server import BaseApplication |
352 | + |
353 | +class MemcacheStub(object): |
354 | + def get(self, *a, **kw): return None |
355 | + def set(self, *a, **kw): return None |
356 | + def incr(self, *a, **kw): return 0 |
357 | + def delete(self, *a, **kw): return None |
358 | + def set_multi(self, *a, **kw): return None |
359 | + def get_multi(self, *a, **kw): return [] |
360 | + |
361 | +class InternalProxy(object): |
362 | + """ |
363 | + Set up a private instance of a proxy server that allows normal requests |
364 | + to be made without having to actually send the request to the proxy. |
365 | + This also doesn't log the requests to the normal proxy logs. |
366 | + |
367 | + :param proxy_server_conf: proxy server configuration dictionary |
368 | + :param logger: logger to log requests to |
369 | + :param retries: number of times to retry each request |
370 | + """ |
371 | + def __init__(self, proxy_server_conf=None, logger=None, retries=0): |
372 | + self.upload_app = BaseApplication(proxy_server_conf, |
373 | + memcache=MemcacheStub(), |
374 | + logger=logger) |
375 | + self.retries = retries |
376 | + |
377 | + def upload_file(self, source_file, account, container, object_name, |
378 | + compress=True, content_type='application/x-gzip', |
379 | + etag=None): |
380 | + """ |
381 | + Upload a file to cloud files. |
382 | + |
383 | + :param source_file: path to or file like object to upload |
384 | + :param account: account to upload to |
385 | + :param container: container to upload to |
386 | + :param object_name: name of object being uploaded |
387 | + :param compress: if True, compresses object as it is uploaded |
388 | + :param content_type: content-type of object |
389 | + :returns: True if successful, False otherwise |
390 | + """ |
391 | + log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name) |
392 | + |
393 | + # create the container |
394 | + if not self.create_container(account, container): |
395 | + return False |
396 | + |
397 | + # upload the file to the account |
398 | + req = webob.Request.blank(log_create_pattern, |
399 | + environ={'REQUEST_METHOD': 'PUT'}, |
400 | + headers={'Transfer-Encoding': 'chunked'}) |
401 | + if compress: |
402 | + if hasattr(source_file, 'read'): |
403 | + compressed_file = CompressingFileReader(source_file) |
404 | + else: |
405 | + compressed_file = CompressingFileReader(open(source_file, 'rb')) |
406 | + req.body_file = compressed_file |
407 | + else: |
408 | + if not hasattr(source_file, 'read'): |
409 | + source_file = open(source_file, 'rb') |
410 | + req.body_file = source_file |
411 | + req.account = account |
412 | + req.content_type = content_type |
413 | + req.content_length = None # to make sure we send chunked data |
414 | + if etag: |
415 | + req.etag = etag |
416 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
417 | + tries = 1 |
418 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
419 | + and tries <= self.retries: |
420 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
421 | + tries += 1 |
422 | + if not (200 <= resp.status_int < 300): |
423 | + return False |
424 | + return True |
425 | + |
426 | + def get_object(self, account, container, object_name): |
427 | + """ |
428 | + Get object. |
429 | + |
430 | + :param account: account name object is in |
431 | + :param container: container name object is in |
432 | + :param object_name: name of object to get |
433 | + :returns: iterator for object data |
434 | + """ |
435 | + req = webob.Request.blank('/v1/%s/%s/%s' % |
436 | + (account, container, object_name), |
437 | + environ={'REQUEST_METHOD': 'GET'}) |
438 | + req.account = account |
439 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
440 | + tries = 1 |
441 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
442 | + and tries <= self.retries: |
443 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
444 | + tries += 1 |
445 | + return resp.status_int, resp.app_iter |
446 | + |
447 | + def create_container(self, account, container): |
448 | + """ |
449 | + Create container. |
450 | + |
451 | + :param account: account name to put the container in |
452 | + :param container: container name to create |
453 | + :returns: True if successful, otherwise False |
454 | + """ |
455 | + req = webob.Request.blank('/v1/%s/%s' % (account, container), |
456 | + environ={'REQUEST_METHOD': 'PUT'}) |
457 | + req.account = account |
458 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
459 | + tries = 1 |
460 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
461 | + and tries <= self.retries: |
462 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
463 | + tries += 1 |
464 | + return 200 <= resp.status_int < 300 |
465 | + |
466 | + def get_container_list(self, account, container, marker=None, limit=None, |
467 | + prefix=None, delimiter=None, full_listing=True): |
468 | + """ |
469 | + Get container listing. |
470 | + |
471 | + :param account: account name for the container |
472 | + :param container: container name to get the listing of |
473 | + :param marker: marker query |
474 | + :param limit: limit to query |
475 | + :param prefix: prefix query |
476 | + :param delimiter: delimiter for query |
477 | + :param full_listing: if True, make enough requests to get all listings |
478 | + :returns: list of objects |
479 | + """ |
480 | + if full_listing: |
481 | + rv = [] |
482 | + listing = self.get_container_list(account, container, marker, |
483 | + limit, prefix, delimiter, full_listing=False) |
484 | + while listing: |
485 | + rv.extend(listing) |
486 | + if not delimiter: |
487 | + marker = listing[-1]['name'] |
488 | + else: |
489 | + marker = listing[-1].get('name', listing[-1].get('subdir')) |
490 | + listing = self.get_container_list(account, container, marker, |
491 | + limit, prefix, delimiter, full_listing=False) |
492 | + return rv |
493 | + path = '/v1/%s/%s' % (account, container) |
494 | + qs = 'format=json' |
495 | + if marker: |
496 | + qs += '&marker=%s' % quote(marker) |
497 | + if limit: |
498 | + qs += '&limit=%d' % limit |
499 | + if prefix: |
500 | + qs += '&prefix=%s' % quote(prefix) |
501 | + if delimiter: |
502 | + qs += '&delimiter=%s' % quote(delimiter) |
503 | + path += '?%s' % qs |
504 | + req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'}) |
505 | + req.account = account |
506 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
507 | + tries = 1 |
508 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
509 | + and tries <= self.retries: |
510 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
511 | + tries += 1 |
512 | + if resp.status_int == 204: |
513 | + return [] |
514 | + if 200 <= resp.status_int < 300: |
515 | + return json_loads(resp.body) |
516 | |
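A sketch of driving InternalProxy directly, assuming a standard proxy-server.conf location; the account, container, and file names are illustrative, not part of this branch.

```python
from paste.deploy import appconfig

from swift.common.internal_proxy import InternalProxy

server_conf = appconfig('config:/etc/swift/proxy-server.conf',
                        name='proxy-server')
proxy = InternalProxy(server_conf, retries=3)

# upload_file creates the container if needed and returns True on success.
if proxy.upload_file('/var/log/swift/access-2010092300',
                     'AUTH_test', 'log_data', 'access-2010092300.gz'):
    for item in proxy.get_container_list('AUTH_test', 'log_data'):
        print item['name']
    status, body_iter = proxy.get_object('AUTH_test', 'log_data',
                                         'access-2010092300.gz')
```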
517 | === modified file 'swift/common/utils.py' |
518 | --- swift/common/utils.py 2010-09-23 16:09:30 +0000 |
519 | +++ swift/common/utils.py 2010-09-23 18:31:58 +0000 |
520 | @@ -552,13 +552,13 @@ |
521 | """ |
522 | return item_from_env(env, 'swift.cache') |
523 | |
524 | - |
525 | def readconf(conf, section_name, log_name=None): |
526 | """ |
527 | Read config file and return config items as a dict |
528 | |
529 | :param conf: path to config file |
530 | - :param section_name: config section to read |
531 | + :param section_name: config section to read (will return all sections if |
532 | + not defined) |
533 | :param log_name: name to be used with logging (will use section_name if |
534 | not defined) |
535 | :returns: dict of config items |
536 | @@ -567,16 +567,23 @@ |
537 | if not c.read(conf): |
538 | print "Unable to read config file %s" % conf |
539 | sys.exit(1) |
540 | - if c.has_section(section_name): |
541 | - conf = dict(c.items(section_name)) |
542 | + if section_name: |
543 | + if c.has_section(section_name): |
544 | + conf = dict(c.items(section_name)) |
545 | + else: |
546 | + print "Unable to find %s config section in %s" % (section_name, conf) |
547 | + sys.exit(1) |
548 | + if "log_name" not in conf: |
549 | + if log_name is not None: |
550 | + conf['log_name'] = log_name |
551 | + else: |
552 | + conf['log_name'] = section_name |
553 | else: |
554 | - print "Unable to find %s config section in %s" % (section_name, conf) |
555 | - sys.exit(1) |
556 | - if "log_name" not in conf: |
557 | - if log_name is not None: |
558 | + conf = {} |
559 | + for s in c.sections(): |
560 | + conf.update({s:dict(c.items(s))}) |
561 | + if 'log_name' not in conf: |
562 | conf['log_name'] = log_name |
563 | - else: |
564 | - conf['log_name'] = section_name |
565 | return conf |
566 | |
567 | |
568 | |
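To illustrate the readconf change above: with a section name it returns that section's items as a flat dict, and with section_name set to None it returns a dict of all sections keyed by section name. A sketch, assuming the sample config shown earlier:

```python
from swift.common.utils import readconf

# Single-section mode: log_name defaults to the section name.
one = readconf('/etc/swift/log-processing.conf', 'log-processor')
print one['swift_account'], one['log_name']   # ..., 'log-processor'

# All-sections mode: each section becomes a nested dict.
every = readconf('/etc/swift/log-processing.conf', None,
                 log_name='log-stats-collector')
print every['log-processor-access']['class_path']
```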
569 | === added directory 'swift/stats' |
570 | === added file 'swift/stats/__init__.py' |
571 | === added file 'swift/stats/access_processor.py' |
572 | --- swift/stats/access_processor.py 1970-01-01 00:00:00 +0000 |
573 | +++ swift/stats/access_processor.py 2010-09-23 18:31:58 +0000 |
574 | @@ -0,0 +1,226 @@ |
575 | +# Copyright (c) 2010 OpenStack, LLC. |
576 | +# |
577 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
578 | +# you may not use this file except in compliance with the License. |
579 | +# You may obtain a copy of the License at |
580 | +# |
581 | +# http://www.apache.org/licenses/LICENSE-2.0 |
582 | +# |
583 | +# Unless required by applicable law or agreed to in writing, software |
584 | +# distributed under the License is distributed on an "AS IS" BASIS, |
585 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
586 | +# implied. |
587 | +# See the License for the specific language governing permissions and |
588 | +# limitations under the License. |
589 | + |
590 | +import collections |
591 | +from urllib import unquote |
592 | +import copy |
593 | + |
594 | +from swift.common.utils import split_path |
595 | + |
596 | +month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split() |
597 | + |
598 | + |
599 | +class AccessLogProcessor(object): |
600 | + """Transform proxy server access logs""" |
601 | + |
602 | + def __init__(self, conf): |
603 | + self.server_name = conf.get('server_name', 'proxy') |
604 | + self.lb_private_ips = [x.strip() for x in \ |
605 | + conf.get('lb_private_ips', '').split(',')\ |
606 | + if x.strip()] |
607 | + self.service_ips = [x.strip() for x in \ |
608 | + conf.get('service_ips', '').split(',')\ |
609 | + if x.strip()] |
610 | + |
611 | + def log_line_parser(self, raw_log): |
612 | + '''given a raw access log line, return a dict of the good parts''' |
613 | + d = {} |
614 | + try: |
615 | + (_, |
616 | + server, |
617 | + client_ip, |
618 | + lb_ip, |
619 | + timestamp, |
620 | + method, |
621 | + request, |
622 | + http_version, |
623 | + code, |
624 | + referrer, |
625 | + user_agent, |
626 | + auth_token, |
627 | + bytes_in, |
628 | + bytes_out, |
629 | + etag, |
630 | + trans_id, |
631 | + headers, |
632 | + processing_time) = (unquote(x) for x in raw_log[16:].split(' ')) |
633 | + if server != self.server_name: |
634 | + raise ValueError('incorrect server name in log line') |
635 | + (version, |
636 | + account, |
637 | + container_name, |
638 | + object_name) = split_path(request, 2, 4, True) |
639 | + if container_name is not None: |
640 | + container_name = container_name.split('?', 1)[0] |
641 | + if object_name is not None: |
642 | + object_name = object_name.split('?', 1)[0] |
643 | + account = account.split('?', 1)[0] |
644 | + query = None |
645 | + if '?' in request: |
646 | + request, query = request.split('?', 1) |
647 | + args = query.split('&') |
648 | + # Count each query argument. This is used later to aggregate |
649 | + # the number of format, prefix, etc. queries. |
650 | + for q in args: |
651 | + if '=' in q: |
652 | + k, v = q.split('=', 1) |
653 | + else: |
654 | + k = q |
655 | + # Certain keys will get summed in stats reporting |
656 | + # (format, path, delimiter, etc.). Save a "1" here |
657 | + # to indicate that this request is 1 request for |
658 | + # its respective key. |
659 | + d[k] = 1 |
660 | + except ValueError: |
661 | + pass |
662 | + else: |
663 | + d['client_ip'] = client_ip |
664 | + d['lb_ip'] = lb_ip |
665 | + d['method'] = method |
666 | + d['request'] = request |
667 | + if query: |
668 | + d['query'] = query |
669 | + d['http_version'] = http_version |
670 | + d['code'] = code |
671 | + d['referrer'] = referrer |
672 | + d['user_agent'] = user_agent |
673 | + d['auth_token'] = auth_token |
674 | + d['bytes_in'] = bytes_in |
675 | + d['bytes_out'] = bytes_out |
676 | + d['etag'] = etag |
677 | + d['trans_id'] = trans_id |
678 | + d['processing_time'] = processing_time |
679 | + day, month, year, hour, minute, second = timestamp.split('/') |
680 | + d['day'] = day |
681 | + month = ('%02s' % month_map.index(month)).replace(' ', '0') |
682 | + d['month'] = month |
683 | + d['year'] = year |
684 | + d['hour'] = hour |
685 | + d['minute'] = minute |
686 | + d['second'] = second |
687 | + d['tz'] = '+0000' |
688 | + d['account'] = account |
689 | + d['container_name'] = container_name |
690 | + d['object_name'] = object_name |
691 | + d['bytes_out'] = int(d['bytes_out'].replace('-', '0')) |
692 | + d['bytes_in'] = int(d['bytes_in'].replace('-', '0')) |
693 | + d['code'] = int(d['code']) |
694 | + return d |
695 | + |
696 | + def process(self, obj_stream, account, container, object_name): |
697 | + '''generate hourly groupings of data from one access log file''' |
698 | + hourly_aggr_info = {} |
699 | + for line in obj_stream: |
700 | + line_data = self.log_line_parser(line) |
701 | + if not line_data: |
702 | + continue |
703 | + account = line_data['account'] |
704 | + container_name = line_data['container_name'] |
705 | + year = line_data['year'] |
706 | + month = line_data['month'] |
707 | + day = line_data['day'] |
708 | + hour = line_data['hour'] |
709 | + bytes_out = line_data['bytes_out'] |
710 | + bytes_in = line_data['bytes_in'] |
711 | + method = line_data['method'] |
712 | + code = int(line_data['code']) |
713 | + object_name = line_data['object_name'] |
714 | + client_ip = line_data['client_ip'] |
715 | + |
716 | + op_level = None |
717 | + if not container_name: |
718 | + op_level = 'account' |
719 | + elif container_name and not object_name: |
720 | + op_level = 'container' |
721 | + elif object_name: |
722 | + op_level = 'object' |
723 | + |
724 | + aggr_key = (account, year, month, day, hour) |
725 | + d = hourly_aggr_info.get(aggr_key, {}) |
726 | + if line_data['lb_ip'] in self.lb_private_ips: |
727 | + source = 'service' |
728 | + else: |
729 | + source = 'public' |
730 | + |
731 | + if line_data['client_ip'] in self.service_ips: |
732 | + source = 'service' |
733 | + |
734 | + d[(source, 'bytes_out')] = d.setdefault(( |
735 | + source, 'bytes_out'), 0) + bytes_out |
736 | + d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \ |
737 | + bytes_in |
738 | + |
739 | + d['format_query'] = d.setdefault('format_query', 0) + \ |
740 | + line_data.get('format', 0) |
741 | + d['marker_query'] = d.setdefault('marker_query', 0) + \ |
742 | + line_data.get('marker', 0) |
743 | + d['prefix_query'] = d.setdefault('prefix_query', 0) + \ |
744 | + line_data.get('prefix', 0) |
745 | + d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \ |
746 | + line_data.get('delimiter', 0) |
747 | + path = line_data.get('path', 0) |
748 | + d['path_query'] = d.setdefault('path_query', 0) + path |
749 | + |
750 | + code = '%dxx' % (code / 100) |
751 | + key = (source, op_level, method, code) |
752 | + d[key] = d.setdefault(key, 0) + 1 |
753 | + |
754 | + hourly_aggr_info[aggr_key] = d |
755 | + return hourly_aggr_info |
756 | + |
757 | + def keylist_mapping(self): |
758 | + source_keys = 'service public'.split() |
759 | + level_keys = 'account container object'.split() |
760 | + verb_keys = 'GET PUT POST DELETE HEAD COPY'.split() |
761 | + code_keys = '2xx 4xx 5xx'.split() |
762 | + |
763 | + keylist_mapping = { |
764 | + # <db key> : <row key> or <set of row keys> |
765 | + 'service_bw_in': ('service', 'bytes_in'), |
766 | + 'service_bw_out': ('service', 'bytes_out'), |
767 | + 'public_bw_in': ('public', 'bytes_in'), |
768 | + 'public_bw_out': ('public', 'bytes_out'), |
769 | + 'account_requests': set(), |
770 | + 'container_requests': set(), |
771 | + 'object_requests': set(), |
772 | + 'service_request': set(), |
773 | + 'public_request': set(), |
774 | + 'ops_count': set(), |
775 | + } |
776 | + for verb in verb_keys: |
777 | + keylist_mapping[verb] = set() |
778 | + for code in code_keys: |
779 | + keylist_mapping[code] = set() |
780 | + for source in source_keys: |
781 | + for level in level_keys: |
782 | + for verb in verb_keys: |
783 | + for code in code_keys: |
784 | + keylist_mapping['account_requests'].add( |
785 | + (source, 'account', verb, code)) |
786 | + keylist_mapping['container_requests'].add( |
787 | + (source, 'container', verb, code)) |
788 | + keylist_mapping['object_requests'].add( |
789 | + (source, 'object', verb, code)) |
790 | + keylist_mapping['service_request'].add( |
791 | + ('service', level, verb, code)) |
792 | + keylist_mapping['public_request'].add( |
793 | + ('public', level, verb, code)) |
794 | + keylist_mapping[verb].add( |
795 | + (source, level, verb, code)) |
796 | + keylist_mapping[code].add( |
797 | + (source, level, verb, code)) |
798 | + keylist_mapping['ops_count'].add( |
799 | + (source, level, verb, code)) |
800 | + return keylist_mapping |
801 | |
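The keylist mapping above is easiest to understand by inspecting it. A sketch, not part of this branch, using an empty conf dict since every AccessLogProcessor setting has a default:

```python
from swift.stats.access_processor import AccessLogProcessor

mapping = AccessLogProcessor({}).keylist_mapping()

# Bandwidth columns map to a single aggregation key...
print mapping['public_bw_out']   # ('public', 'bytes_out')

# ...while request-count columns map to sets of
# (source, level, verb, code) tuples whose counts are summed.
print ('public', 'object', 'GET', '2xx') in mapping['GET']          # True
print ('service', 'account', 'PUT', '5xx') in mapping['ops_count']  # True
```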
802 | === added file 'swift/stats/account_stats.py' |
803 | --- swift/stats/account_stats.py 1970-01-01 00:00:00 +0000 |
804 | +++ swift/stats/account_stats.py 2010-09-23 18:31:58 +0000 |
805 | @@ -0,0 +1,100 @@ |
806 | +# Copyright (c) 2010 OpenStack, LLC. |
807 | +# |
808 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
809 | +# you may not use this file except in compliance with the License. |
810 | +# You may obtain a copy of the License at |
811 | +# |
812 | +# http://www.apache.org/licenses/LICENSE-2.0 |
813 | +# |
814 | +# Unless required by applicable law or agreed to in writing, software |
815 | +# distributed under the License is distributed on an "AS IS" BASIS, |
816 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
817 | +# implied. |
818 | +# See the License for the specific language governing permissions and |
819 | +# limitations under the License. |
820 | + |
821 | +import os |
822 | +import time |
823 | +from paste.deploy import appconfig |
824 | +import shutil |
825 | +import hashlib |
826 | + |
827 | +from swift.account.server import DATADIR as account_server_data_dir |
828 | +from swift.common.db import AccountBroker |
829 | +from swift.common.internal_proxy import InternalProxy |
830 | +from swift.common.utils import renamer, get_logger, readconf, mkdirs |
831 | +from swift.common.constraints import check_mount |
832 | +from swift.common.daemon import Daemon |
833 | + |
834 | + |
835 | +class AccountStat(Daemon): |
836 | + """ |
837 | + Extract storage stats from account databases on the account |
838 | + storage nodes |
839 | + """ |
840 | + |
841 | + def __init__(self, stats_conf): |
842 | + super(AccountStat, self).__init__(stats_conf) |
843 | + target_dir = stats_conf.get('log_dir', '/var/log/swift') |
844 | + account_server_conf_loc = stats_conf.get('account_server_conf', |
845 | + '/etc/swift/account-server.conf') |
846 | + server_conf = appconfig('config:%s' % account_server_conf_loc, |
847 | + name='account-server') |
848 | + filename_format = stats_conf['source_filename_format'] |
849 | + self.filename_format = filename_format |
850 | + self.target_dir = target_dir |
851 | + mkdirs(self.target_dir) |
852 | + self.devices = server_conf.get('devices', '/srv/node') |
853 | + self.mount_check = server_conf.get('mount_check', 'true').lower() in \ |
854 | + ('true', 't', '1', 'on', 'yes', 'y') |
855 | + self.logger = get_logger(stats_conf, 'swift-account-stats-logger') |
856 | + |
857 | + def run_once(self): |
858 | + self.logger.info("Gathering account stats") |
859 | + start = time.time() |
860 | + self.find_and_process() |
861 | + self.logger.info("Gathering account stats complete (%0.2f minutes)" % |
862 | + ((time.time() - start) / 60)) |
863 | + |
864 | + def find_and_process(self): |
865 | + src_filename = time.strftime(self.filename_format) |
866 | + working_dir = os.path.join(self.target_dir, '.stats_tmp') |
867 | + shutil.rmtree(working_dir, ignore_errors=True) |
868 | + tmp_filename = os.path.join(working_dir, src_filename) |
869 | + hasher = hashlib.md5() |
870 | + with open(tmp_filename, 'wb') as statfile: |
871 | + # csv has the following columns: |
872 | + # Account Name, Container Count, Object Count, Bytes Used |
873 | + for device in os.listdir(self.devices): |
874 | + if self.mount_check and not check_mount(self.devices, device): |
875 | + self.logger.error("Device %s is not mounted, skipping." % |
876 | + device) |
877 | + continue |
878 | + accounts = os.path.join(self.devices, |
879 | + device, |
880 | + account_server_data_dir) |
881 | + if not os.path.exists(accounts): |
882 | + self.logger.debug("Path %s does not exist, skipping." % |
883 | + accounts) |
884 | + continue |
885 | + for root, dirs, files in os.walk(accounts, topdown=False): |
886 | + for filename in files: |
887 | + if filename.endswith('.db'): |
888 | + db_path = os.path.join(root, filename) |
889 | + broker = AccountBroker(db_path) |
890 | + if not broker.is_deleted(): |
891 | + (account_name, |
892 | + _, _, _, |
893 | + container_count, |
894 | + object_count, |
895 | + bytes_used, |
896 | + _, _) = broker.get_info() |
897 | + line_data = '"%s",%d,%d,%d\n' % ( |
898 | + account_name, container_count, |
899 | + object_count, bytes_used) |
900 | + statfile.write(line_data) |
901 | + hasher.update(line_data) |
902 | + file_hash = hasher.hexdigest() |
903 | + src_filename = '_'.join([src_filename, file_hash]) |
904 | + renamer(tmp_filename, os.path.join(self.target_dir, src_filename)) |
905 | + shutil.rmtree(working_dir, ignore_errors=True) |
906 | |
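The csv that AccountStat writes has the four columns noted in the find_and_process comment above. A sketch of consuming one such line; the row values are fabricated for illustration:

```python
import csv

line = '"AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31",12,408,976283'
account, containers, objects, bytes_used = csv.reader([line]).next()
print account, int(containers), int(objects), int(bytes_used)
```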
907 | === added file 'swift/stats/log_processor.py' |
908 | --- swift/stats/log_processor.py 1970-01-01 00:00:00 +0000 |
909 | +++ swift/stats/log_processor.py 2010-09-23 18:31:58 +0000 |
910 | @@ -0,0 +1,413 @@ |
911 | +# Copyright (c) 2010 OpenStack, LLC. |
912 | +# |
913 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
914 | +# you may not use this file except in compliance with the License. |
915 | +# You may obtain a copy of the License at |
916 | +# |
917 | +# http://www.apache.org/licenses/LICENSE-2.0 |
918 | +# |
919 | +# Unless required by applicable law or agreed to in writing, software |
920 | +# distributed under the License is distributed on an "AS IS" BASIS, |
921 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
922 | +# implied. |
923 | +# See the License for the specific language governing permissions and |
924 | +# limitations under the License. |
925 | + |
926 | +from ConfigParser import ConfigParser |
927 | +import zlib |
928 | +import time |
929 | +import datetime |
930 | +import cStringIO |
931 | +import collections |
932 | +from paste.deploy import appconfig |
933 | +import multiprocessing |
934 | +import Queue |
935 | +import cPickle |
936 | +import hashlib |
937 | + |
938 | +from swift.common.internal_proxy import InternalProxy |
939 | +from swift.common.exceptions import ChunkReadTimeout |
940 | +from swift.common.utils import get_logger, readconf |
941 | +from swift.common.daemon import Daemon |
942 | + |
943 | + |
944 | +class BadFileDownload(Exception): |
945 | + pass |
946 | + |
947 | + |
948 | +class LogProcessor(object): |
949 | + """Load plugins, process logs""" |
950 | + |
951 | + def __init__(self, conf, logger): |
953 | + self.stats_conf = conf.get('log-processor', {}) |
953 | + |
954 | + if isinstance(logger, tuple): |
955 | + self.logger = get_logger(*logger) |
956 | + else: |
957 | + self.logger = logger |
958 | + |
959 | + # load the processing plugins |
960 | + self.plugins = {} |
961 | + plugin_prefix = 'log-processor-' |
962 | + for section in (x for x in conf if x.startswith(plugin_prefix)): |
963 | + plugin_name = section[len(plugin_prefix):] |
964 | + plugin_conf = conf.get(section, {}) |
965 | + self.plugins[plugin_name] = plugin_conf |
966 | + class_path = self.plugins[plugin_name]['class_path'] |
967 | + import_target, class_name = class_path.rsplit('.', 1) |
968 | + module = __import__(import_target, fromlist=[import_target]) |
969 | + klass = getattr(module, class_name) |
970 | + self.plugins[plugin_name]['instance'] = klass(plugin_conf) |
971 | + |
972 | + @property |
973 | + def internal_proxy(self): |
974 | + '''Lazy load internal proxy''' |
975 | + if getattr(self, '_internal_proxy', None) is None: |
976 | + proxy_server_conf_loc = self.stats_conf.get('proxy_server_conf', |
977 | + '/etc/swift/proxy-server.conf') |
978 | + self.proxy_server_conf = appconfig( |
979 | + 'config:%s' % proxy_server_conf_loc, |
980 | + name='proxy-server') |
981 | + self._internal_proxy = InternalProxy(self.proxy_server_conf, |
982 | + self.logger, |
983 | + retries=3) |
984 | + return self._internal_proxy |
985 | + |
986 | + def process_one_file(self, plugin_name, account, container, object_name): |
987 | + # get an iter of the object data |
988 | + compressed = object_name.endswith('.gz') |
989 | + stream = self.get_object_data(account, container, object_name, |
990 | + compressed=compressed) |
991 | + # look up the correct plugin and send the stream to it |
992 | + return self.plugins[plugin_name]['instance'].process(stream, |
993 | + account, |
994 | + container, |
995 | + object_name) |
996 | + |
997 | + def get_data_list(self, start_date=None, end_date=None, |
998 | + listing_filter=None): |
999 | + total_list = [] |
1000 | + for plugin_name, data in self.plugins.items(): |
1001 | + account = data['swift_account'] |
1002 | + container = data['container_name'] |
1003 | + listing = self.get_container_listing(account, |
1004 | + container, |
1005 | + start_date, |
1006 | + end_date) |
1007 | + for object_name in listing: |
1008 | + # The items in this list end up being passed as positional |
1009 | + # parameters to process_one_file. |
1010 | + x = (plugin_name, account, container, object_name) |
1011 | + if x not in listing_filter: |
1012 | + total_list.append(x) |
1013 | + return total_list |
1014 | + |
1015 | + def get_container_listing(self, swift_account, container_name, |
1016 | + start_date=None, end_date=None, |
1017 | + listing_filter=None): |
1018 | + ''' |
1019 | + Get a container listing, filtered by start_date, end_date, and |
1020 | + listing_filter. Dates, if given, should be in YYYYMMDDHH format |
1021 | + ''' |
1022 | + search_key = None |
1023 | + if start_date is not None: |
1024 | + date_parts = [] |
1025 | + try: |
1026 | + year, start_date = start_date[:4], start_date[4:] |
1027 | + if year: |
1028 | + date_parts.append(year) |
1029 | + month, start_date = start_date[:2], start_date[2:] |
1030 | + if month: |
1031 | + date_parts.append(month) |
1032 | + day, start_date = start_date[:2], start_date[2:] |
1033 | + if day: |
1034 | + date_parts.append(day) |
1035 | + hour, start_date = start_date[:2], start_date[2:] |
1036 | + if hour: |
1037 | + date_parts.append(hour) |
1038 | + except IndexError: |
1039 | + pass |
1040 | + else: |
1041 | + search_key = '/'.join(date_parts) |
1042 | + end_key = None |
1043 | + if end_date is not None: |
1044 | + date_parts = [] |
1045 | + try: |
1046 | + year, end_date = end_date[:4], end_date[4:] |
1047 | + if year: |
1048 | + date_parts.append(year) |
1049 | + month, end_date = end_date[:2], end_date[2:] |
1050 | + if month: |
1051 | + date_parts.append(month) |
1052 | + day, end_date = end_date[:2], end_date[2:] |
1053 | + if day: |
1054 | + date_parts.append(day) |
1055 | + hour, end_date = end_date[:2], end_date[2:] |
1056 | + if hour: |
1057 | + date_parts.append(hour) |
1058 | + except IndexError: |
1059 | + pass |
1060 | + else: |
1061 | + end_key = '/'.join(date_parts) |
1062 | + container_listing = self.internal_proxy.get_container_list( |
1063 | + swift_account, |
1064 | + container_name, |
1065 | + marker=search_key) |
1066 | + results = [] |
1067 | + if container_listing is not None: |
1068 | + if listing_filter is None: |
1069 | + listing_filter = set() |
1070 | + for item in container_listing: |
1071 | + name = item['name'] |
1072 | + if end_key and name > end_key: |
1073 | + break |
1074 | + if name not in listing_filter: |
1075 | + results.append(name) |
1076 | + return results |
1077 | + |
1078 | + def get_object_data(self, swift_account, container_name, object_name, |
1079 | + compressed=False): |
1080 | + '''reads an object and yields its lines''' |
1081 | + code, o = self.internal_proxy.get_object(swift_account, |
1082 | + container_name, |
1083 | + object_name) |
1084 | + if code < 200 or code >= 300: |
1085 | + return |
1086 | + last_part = '' |
1087 | + last_compressed_part = '' |
1088 | + # magic in the following zlib.decompressobj argument is courtesy of |
1089 | + # Python decompressing gzip chunk-by-chunk |
1090 | + # http://stackoverflow.com/questions/2423866 |
1091 | + d = zlib.decompressobj(16 + zlib.MAX_WBITS) |
1092 | + try: |
1093 | + for chunk in o: |
1094 | + if compressed: |
1095 | + try: |
1096 | + chunk = d.decompress(chunk) |
1097 | + except zlib.error: |
1098 | + raise BadFileDownload() # bad compressed data |
1099 | + parts = chunk.split('\n') |
1100 | + parts[0] = last_part + parts[0] |
1101 | + for part in parts[:-1]: |
1102 | + yield part |
1103 | + last_part = parts[-1] |
1104 | + if last_part: |
1105 | + yield last_part |
1106 | + except ChunkReadTimeout: |
1107 | + raise BadFileDownload() |
1108 | + |
1109 | + def generate_keylist_mapping(self): |
1110 | + keylist = {} |
1111 | + for plugin in self.plugins: |
1112 | + plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping() |
1113 | + if not plugin_keylist: |
1114 | + continue |
1115 | + for k, v in plugin_keylist.items(): |
1116 | + o = keylist.get(k) |
1117 | + if o: |
1118 | + if isinstance(o, set): |
1119 | + if isinstance(v, set): |
1120 | + o.update(v) |
1121 | + else: |
1122 | + o.update([v]) |
1123 | + else: |
1124 | + o = set(o) |
1125 | + if isinstance(v, set): |
1126 | + o.update(v) |
1127 | + else: |
1128 | + o.update([v]) |
1129 | + else: |
1130 | + o = v |
1131 | + keylist[k] = o |
1132 | + return keylist |
1133 | + |
1134 | + |
1135 | +class LogProcessorDaemon(Daemon): |
1136 | + """ |
1137 | + Gather raw log data and farm out processing to generate a csv that is |
1138 | + uploaded to swift. |
1139 | + """ |
1140 | + |
1141 | + def __init__(self, conf): |
1142 | + c = conf.get('log-processor') |
1143 | + super(LogProcessorDaemon, self).__init__(c) |
1144 | + self.total_conf = conf |
1145 | + self.logger = get_logger(c) |
1146 | + self.log_processor = LogProcessor(conf, self.logger) |
1147 | + self.lookback_hours = int(c.get('lookback_hours', '120')) |
1148 | + self.lookback_window = int(c.get('lookback_window', |
1149 | + str(self.lookback_hours))) |
1150 | + self.log_processor_account = c['swift_account'] |
1151 | + self.log_processor_container = c.get('container_name', |
1152 | + 'log_processing_data') |
1153 | + self.worker_count = int(c.get('worker_count', '1')) |
1154 | + |
1155 | + def run_once(self): |
1156 | + self.logger.info("Beginning log processing") |
1157 | + start = time.time() |
1158 | + if self.lookback_hours == 0: |
1159 | + lookback_start = None |
1160 | + lookback_end = None |
1161 | + else: |
1162 | + delta_hours = datetime.timedelta(hours=self.lookback_hours) |
1163 | + lookback_start = datetime.datetime.now() - delta_hours |
1164 | + lookback_start = lookback_start.strftime('%Y%m%d%H') |
1165 | + if self.lookback_window == 0: |
1166 | + lookback_end = None |
1167 | + else: |
1168 | + delta_window = datetime.timedelta(hours=self.lookback_window) |
1169 | + lookback_end = datetime.datetime.now() - \ |
1170 | + delta_hours + \ |
1171 | + delta_window |
1172 | + lookback_end = lookback_end.strftime('%Y%m%d%H') |
1173 | + self.logger.debug('lookback_start: %s' % lookback_start) |
1174 | + self.logger.debug('lookback_end: %s' % lookback_end) |
1175 | + try: |
1176 | + # Note: this file (or data set) will grow without bound. |
1177 | + # In practice, if it becomes a problem (say, after many months of |
1178 | + # running), one could manually prune the file to remove older |
1179 | + # entries. Automatically pruning on each run could be dangerous. |
1180 | + # There is not a good way to determine when an old entry should be |
1181 | + # pruned (lookback_hours could be set to anything and could change) |
1182 | + processed_files_stream = self.log_processor.get_object_data( |
1183 | + self.log_processor_account, |
1184 | + self.log_processor_container, |
1185 | + 'processed_files.pickle.gz', |
1186 | + compressed=True) |
1187 | + buf = '\n'.join(x for x in processed_files_stream) |
1188 | + if buf: |
1189 | + already_processed_files = cPickle.loads(buf) |
1190 | + else: |
1191 | + already_processed_files = set() |
1192 | + except: |
1193 | + already_processed_files = set() |
1194 | + self.logger.debug('found %d processed files' % \ |
1195 | + len(already_processed_files)) |
1196 | + logs_to_process = self.log_processor.get_data_list(lookback_start, |
1197 | + lookback_end, |
1198 | + already_processed_files) |
1199 | + self.logger.info('loaded %d files to process' % len(logs_to_process)) |
1200 | + if not logs_to_process: |
1201 | + self.logger.info("Log processing done (%0.2f minutes)" % |
1202 | + ((time.time() - start) / 60)) |
1203 | + return |
1204 | + |
1205 | + # map |
1206 | + processor_args = (self.total_conf, self.logger) |
1207 | + results = multiprocess_collate(processor_args, logs_to_process, |
1208 | + self.worker_count) |
1209 | + |
1210 | + #reduce |
1211 | + aggr_data = {} |
1212 | + processed_files = already_processed_files |
1213 | + for item, data in results: |
1214 | + # since item contains the plugin and the log name, new plugins will |
1215 | + # "reprocess" the file and the results will be in the final csv. |
1216 | + processed_files.add(item) |
1217 | + for k, d in data.items(): |
1218 | + existing_data = aggr_data.get(k, {}) |
1219 | + for i, j in d.items(): |
1220 | + current = existing_data.get(i, 0) |
1221 | + # merging strategy for key collisions is addition |
1222 | + # processing plugins need to realize this |
1223 | + existing_data[i] = current + j |
1224 | + aggr_data[k] = existing_data |
1225 | + |
1226 | + # group |
1227 | + # reduce a large number of keys in aggr_data[k] to a small number of |
1228 | + # output keys |
1229 | + keylist_mapping = self.log_processor.generate_keylist_mapping() |
1230 | + final_info = collections.defaultdict(dict) |
1231 | + for account, data in aggr_data.items(): |
1232 | + for key, mapping in keylist_mapping.items(): |
1233 | + if isinstance(mapping, (list, set)): |
1234 | + value = 0 |
1235 | + for k in mapping: |
1236 | + try: |
1237 | + value += data[k] |
1238 | + except KeyError: |
1239 | + pass |
1240 | + else: |
1241 | + try: |
1242 | + value = data[mapping] |
1243 | + except KeyError: |
1244 | + value = 0 |
1245 | + final_info[account][key] = value |
1246 | + |
1247 | + # output |
1248 | + sorted_keylist_mapping = sorted(keylist_mapping) |
1249 | + columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping) |
1250 | + out_buf = [columns] |
1251 | + for (account, year, month, day, hour), d in final_info.items(): |
1252 | + data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour) |
1253 | + row = [data_ts] |
1254 | + row.append('%s' % account) |
1255 | + for k in sorted_keylist_mapping: |
1256 | + row.append('%s' % d[k]) |
1257 | + out_buf.append(','.join(row)) |
1258 | + out_buf = '\n'.join(out_buf) |
1259 | + h = hashlib.md5(out_buf).hexdigest() |
1260 | + upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h |
1261 | + f = cStringIO.StringIO(out_buf) |
1262 | + self.log_processor.internal_proxy.upload_file(f, |
1263 | + self.log_processor_account, |
1264 | + self.log_processor_container, |
1265 | + upload_name) |
1266 | + |
1267 | + # cleanup |
1268 | + s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) |
1269 | + f = cStringIO.StringIO(s) |
1270 | + self.log_processor.internal_proxy.upload_file(f, |
1271 | + self.log_processor_account, |
1272 | + self.log_processor_container, |
1273 | + 'processed_files.pickle.gz') |
1274 | + |
1275 | + self.logger.info("Log processing done (%0.2f minutes)" % |
1276 | + ((time.time() - start) / 60)) |
1277 | + |
1278 | + |
1279 | +def multiprocess_collate(processor_args, logs_to_process, worker_count): |
1280 | + '''yield hourly data from logs_to_process''' |
1281 | + workers = [] |
1282 | + in_queue = multiprocessing.Queue() |
1283 | + out_queue = multiprocessing.Queue() |
1284 | + for _ in range(worker_count): |
1285 | + p = multiprocessing.Process(target=collate_worker, |
1286 | + args=(processor_args, |
1287 | + in_queue, |
1288 | + out_queue)) |
1289 | + p.start() |
1290 | + workers.append(p) |
1291 | + for x in logs_to_process: |
1292 | + in_queue.put(x) |
1293 | + for _ in range(worker_count): |
1294 | + in_queue.put(None) |
1295 | + count = 0 |
1296 | + while True: |
1297 | + try: |
1298 | + item, data = out_queue.get_nowait() |
1299 | + count += 1 |
1300 | + if data: |
1301 | + yield item, data |
1302 | + if count >= len(logs_to_process): |
1303 | + # assumes exactly one result comes back for every input file |
1304 | + break |
1305 | + except Queue.Empty: |
1306 | + time.sleep(.1) |
1307 | + for w in workers: |
1308 | + w.join() |
1309 | + |
1310 | + |
1311 | +def collate_worker(processor_args, in_queue, out_queue): |
1312 | + '''worker process for multiprocess_collate''' |
1313 | + p = LogProcessor(*processor_args) |
1314 | + while True: |
1315 | + try: |
1316 | + item = in_queue.get_nowait() |
1317 | + if item is None: |
1318 | + break |
1319 | + except Queue.Empty: |
1320 | + time.sleep(.1) |
1321 | + else: |
1322 | + ret = p.process_one_file(*item) |
1323 | + out_queue.put((item, ret)) |
1324 | |
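The map step above fans log files out to a pool of worker processes over a pair of multiprocessing queues: one None sentinel per worker signals shutdown, and exactly one result is expected back per input file. Below is a minimal, self-contained sketch of the same pattern in Python 2 (matching the module); square_worker, collate, and the toy squaring job are illustrative stand-ins for collate_worker, multiprocess_collate, and LogProcessor.process_one_file, and the drain uses a blocking get rather than the polling loop above.

    import multiprocessing
    import time
    import Queue  # Python 2 name; 'queue' in Python 3

    def square_worker(in_queue, out_queue):
        # like collate_worker: pull items until the None sentinel arrives
        while True:
            try:
                item = in_queue.get_nowait()
                if item is None:
                    break
            except Queue.Empty:
                time.sleep(.1)
            else:
                out_queue.put((item, item * item))

    def collate(items, worker_count=2):
        # like multiprocess_collate: start workers, enqueue the work plus
        # one sentinel per worker, then drain exactly one result per item
        in_queue = multiprocessing.Queue()
        out_queue = multiprocessing.Queue()
        workers = [multiprocessing.Process(target=square_worker,
                                           args=(in_queue, out_queue))
                   for _ in range(worker_count)]
        for w in workers:
            w.start()
        for x in items:
            in_queue.put(x)
        for _ in range(worker_count):
            in_queue.put(None)
        for _ in range(len(items)):
            yield out_queue.get()  # block until a worker reports a result
        for w in workers:
            w.join()

    if __name__ == '__main__':
        print dict(collate([1, 2, 3, 4]))  # {1: 1, 2: 4, 3: 9, 4: 16}

Like the real code, this assumes every enqueued item produces exactly one result; a worker that dies mid-file would hang the drain, which is worth keeping in mind when debugging.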
1325 | === added file 'swift/stats/log_uploader.py' |
1326 | --- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000 |
1327 | +++ swift/stats/log_uploader.py 2010-09-23 18:31:58 +0000 |
1328 | @@ -0,0 +1,170 @@ |
1329 | +# Copyright (c) 2010 OpenStack, LLC. |
1330 | +# |
1331 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
1332 | +# you may not use this file except in compliance with the License. |
1333 | +# You may obtain a copy of the License at |
1334 | +# |
1335 | +# http://www.apache.org/licenses/LICENSE-2.0 |
1336 | +# |
1337 | +# Unless required by applicable law or agreed to in writing, software |
1338 | +# distributed under the License is distributed on an "AS IS" BASIS, |
1339 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
1340 | +# implied. |
1341 | +# See the License for the specific language governing permissions and |
1342 | +# limitations under the License. |
1343 | + |
1344 | +from __future__ import with_statement |
1345 | +import os |
1346 | +import hashlib |
1347 | +import time |
1348 | +import gzip |
1349 | +import glob |
1350 | +from paste.deploy import appconfig |
1351 | + |
1352 | +from swift.common.internal_proxy import InternalProxy |
1353 | +from swift.common.daemon import Daemon |
1354 | +from swift.common import utils |
1355 | + |
1356 | + |
1357 | +class LogUploader(Daemon): |
1358 | + ''' |
1359 | + Given a local directory, a swift account, and a container name, LogUploader |
1360 | + will upload all files in the local directory to the given account/ |
1361 | + container. All but the newest files will be uploaded, and each file's md5 |
1362 | + sum will be computed. The hash is used to prevent duplicate data from |
1363 | + being uploaded multiple times in different files (ex: log lines). Since |
1364 | + the hash is computed, it is also used as the uploaded object's etag to |
1365 | + ensure data integrity. |
1366 | + |
1367 | + Note that after a successful upload, the file is unlinked (if unlink_log is set). |
1368 | + |
1369 | + The given proxy server config is used to instantiate a proxy server for |
1370 | + the object uploads. |
1371 | + ''' |
1372 | + |
1373 | + def __init__(self, uploader_conf, plugin_name): |
1374 | + super(LogUploader, self).__init__(uploader_conf) |
1375 | + log_dir = uploader_conf.get('log_dir', '/var/log/swift/') |
1376 | + swift_account = uploader_conf['swift_account'] |
1377 | + container_name = uploader_conf['container_name'] |
1378 | + source_filename_format = uploader_conf['source_filename_format'] |
1379 | + proxy_server_conf_loc = uploader_conf.get('proxy_server_conf', |
1380 | + '/etc/swift/proxy-server.conf') |
1381 | + proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc, |
1382 | + name='proxy-server') |
1383 | + new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200')) |
1384 | + unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \ |
1385 | + ('true', 'on', '1', 'yes') |
1386 | + self.unlink_log = unlink_log |
1387 | + self.new_log_cutoff = new_log_cutoff |
1388 | + if not log_dir.endswith('/'): |
1389 | + log_dir = log_dir + '/' |
1390 | + self.log_dir = log_dir |
1391 | + self.swift_account = swift_account |
1392 | + self.container_name = container_name |
1393 | + self.filename_format = source_filename_format |
1394 | + self.internal_proxy = InternalProxy(proxy_server_conf) |
1395 | + log_name = 'swift-log-uploader-%s' % plugin_name |
1396 | + self.logger = utils.get_logger(uploader_conf, log_name) |
1397 | + |
1398 | + def run_once(self): |
1399 | + self.logger.info("Uploading logs") |
1400 | + start = time.time() |
1401 | + self.upload_all_logs() |
1402 | + self.logger.info("Uploading logs complete (%0.2f minutes)" % |
1403 | + ((time.time() - start) / 60)) |
1404 | + |
1405 | + def upload_all_logs(self): |
1406 | + i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split() if c in self.filename_format] |
1407 | + i.sort() |
1408 | + year_offset = month_offset = day_offset = hour_offset = None |
1409 | + base_offset = len(self.log_dir) |
1410 | + for start, c in i: |
1411 | + offset = base_offset + start |
1412 | + if c == '%Y': |
1413 | + year_offset = offset, offset + 4 |
1414 | + # Add in the difference between len(%Y) and the expanded |
1415 | + # version of %Y (????). This makes sure the codes after this |
1416 | + # one will align properly in the final filename. |
1417 | + base_offset += 2 |
1418 | + elif c == '%m': |
1419 | + month_offset = offset, offset + 2 |
1420 | + elif c == '%d': |
1421 | + day_offset = offset, offset + 2 |
1422 | + elif c == '%H': |
1423 | + hour_offset = offset, offset + 2 |
1424 | + if not (year_offset and month_offset and day_offset and hour_offset): |
1425 | + # don't have all the parts, can't upload anything |
1426 | + return |
1427 | + glob_pattern = self.filename_format |
1428 | + glob_pattern = glob_pattern.replace('%Y', '????', 1) |
1429 | + glob_pattern = glob_pattern.replace('%m', '??', 1) |
1430 | + glob_pattern = glob_pattern.replace('%d', '??', 1) |
1431 | + glob_pattern = glob_pattern.replace('%H', '??', 1) |
1432 | + filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern)) |
1433 | + current_hour = int(time.strftime('%H')) |
1434 | + today = int(time.strftime('%Y%m%d')) |
1435 | + self.internal_proxy.create_container(self.swift_account, |
1436 | + self.container_name) |
1437 | + for filename in filelist: |
1438 | + try: |
1439 | + # From the filename, we need to derive the year, month, day, |
1440 | + # and hour for the file. These values are used in the uploaded |
1441 | + # object's name, so they should be a reasonably accurate |
1442 | + # representation of the time for which the data in the file was |
1443 | + # collected. The file's last modified time is not a reliable |
1444 | + # representation of the data in the file. For example, an old |
1445 | + # log file (from hour A) may be uploaded or moved into the |
1446 | + # log_dir in hour Z. The file's modified time will be for hour |
1447 | + # Z, and therefore the object's name in the system will not |
1448 | + # represent the data in it. |
1449 | + # If the filename doesn't match the format, it shouldn't be |
1450 | + # uploaded. |
1451 | + year = filename[slice(*year_offset)] |
1452 | + month = filename[slice(*month_offset)] |
1453 | + day = filename[slice(*day_offset)] |
1454 | + hour = filename[slice(*hour_offset)] |
1455 | + except IndexError: |
1456 | + # unexpected filename format, move on |
1457 | + self.logger.error("Unexpected log: %s" % filename) |
1458 | + continue |
1459 | + if ((time.time() - os.stat(filename).st_mtime) < |
1460 | + self.new_log_cutoff): |
1461 | + # don't process very new logs |
1462 | + self.logger.debug( |
1463 | + "Skipping log: %s (< %d seconds old)" % (filename, |
1464 | + self.new_log_cutoff)) |
1465 | + continue |
1466 | + self.upload_one_log(filename, year, month, day, hour) |
1467 | + |
1468 | + def upload_one_log(self, filename, year, month, day, hour): |
1469 | + if os.path.getsize(filename) == 0: |
1470 | + self.logger.debug("Log %s is 0 length, skipping" % filename) |
1471 | + return |
1472 | + self.logger.debug("Processing log: %s" % filename) |
1473 | + filehash = hashlib.md5() |
1474 | + already_compressed = filename.endswith('.gz') |
1475 | + opener = gzip.open if already_compressed else open |
1476 | + f = opener(filename, 'rb') |
1477 | + try: |
1478 | + for line in f: |
1479 | + # filter out bad lines here? |
1480 | + filehash.update(line) |
1481 | + finally: |
1482 | + f.close() |
1483 | + filehash = filehash.hexdigest() |
1484 | + # By adding a hash to the filename, we ensure that uploaded files |
1485 | + # have unique filenames and protect against uploading one file |
1486 | + # more than one time. By using md5, we get an etag for free. |
1487 | + target_filename = '/'.join([year, month, day, hour, filehash + '.gz']) |
1488 | + if self.internal_proxy.upload_file(filename, |
1489 | + self.swift_account, |
1490 | + self.container_name, |
1491 | + target_filename, |
1492 | + compress=(not already_compressed)): |
1493 | + self.logger.debug("Uploaded log %s to %s" % |
1494 | + (filename, target_filename)) |
1495 | + if self.unlink_log: |
1496 | + os.unlink(filename) |
1497 | + else: |
1498 | + self.logger.error("Upload of log %s failed!" % filename) |
1499 | |
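To make the offset arithmetic in upload_all_logs() concrete, here is a worked example with an illustrative source_filename_format (not taken from the sample config). Every code after %Y shifts by two positions because '%Y' (two characters in the format) expands to four characters in an actual filename.

    log_dir = '/var/log/swift/'                # default log_dir above
    fmt = 'access-%Y%m%d%H'                    # illustrative format string
    name = '/var/log/swift/access-2010072316'  # a file it would match

    base = len(log_dir)
    year = name[base + 7:base + 11]    # fmt.index('%Y') == 7
    month = name[base + 11:base + 13]  # fmt.index('%m') == 9, +2 after %Y
    day = name[base + 13:base + 15]    # fmt.index('%d') == 11, +2
    hour = name[base + 15:base + 17]   # fmt.index('%H') == 13, +2
    assert (year, month, day, hour) == ('2010', '07', '23', '16')

    # the glob pattern built the same way the class builds it
    glob_pattern = fmt.replace('%Y', '????', 1).replace('%m', '??', 1) \
                      .replace('%d', '??', 1).replace('%H', '??', 1)
    assert glob_pattern == 'access-??????????'

Note that string slicing never raises IndexError, so a filename that matches the glob but holds non-digits in those positions would slip past the except clause above; a stricter check could test (year + month + day + hour).isdigit() before uploading.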
1500 | === added file 'swift/stats/stats_processor.py' |
1501 | --- swift/stats/stats_processor.py 1970-01-01 00:00:00 +0000 |
1502 | +++ swift/stats/stats_processor.py 2010-09-23 18:31:58 +0000 |
1503 | @@ -0,0 +1,65 @@ |
1504 | +# Copyright (c) 2010 OpenStack, LLC. |
1505 | +# |
1506 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
1507 | +# you may not use this file except in compliance with the License. |
1508 | +# You may obtain a copy of the License at |
1509 | +# |
1510 | +# http://www.apache.org/licenses/LICENSE-2.0 |
1511 | +# |
1512 | +# Unless required by applicable law or agreed to in writing, software |
1513 | +# distributed under the License is distributed on an "AS IS" BASIS, |
1514 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
1515 | +# implied. |
1516 | +# See the License for the specific language governing permissions and |
1517 | +# limitations under the License. |
1518 | + |
1519 | + |
1520 | +class StatsLogProcessor(object): |
1521 | + """Transform account storage stat logs""" |
1522 | + |
1523 | + def __init__(self, conf): |
1524 | + pass |
1525 | + |
1526 | + def process(self, obj_stream, account, container, object_name): |
1527 | + '''generate hourly groupings of data from one stats log file''' |
1528 | + account_totals = {} |
1529 | + year, month, day, hour, _ = object_name.split('/') |
1530 | + for line in obj_stream: |
1531 | + if not line: |
1532 | + continue |
1533 | + try: |
1534 | + (account, |
1535 | + container_count, |
1536 | + object_count, |
1537 | + bytes_used) = line.split(',') |
1538 | + account = account.strip('"') |
1539 | + container_count = int(container_count.strip('"')) |
1540 | + object_count = int(object_count.strip('"')) |
1541 | + bytes_used = int(bytes_used.strip('"')) |
1542 | + aggr_key = (account, year, month, day, hour) |
1543 | + d = account_totals.get(aggr_key, {}) |
1544 | + d['replica_count'] = d.get('replica_count', 0) + 1 |
1545 | + d['container_count'] = d.get('container_count', 0) + \ |
1546 | + container_count |
1547 | + d['object_count'] = d.get('object_count', 0) + \ |
1548 | + object_count |
1549 | + d['bytes_used'] = d.get('bytes_used', 0) + \ |
1550 | + bytes_used |
1551 | + account_totals[aggr_key] = d |
1552 | + except (IndexError, ValueError): |
1553 | + # bad line data |
1554 | + pass |
1555 | + return account_totals |
1556 | + |
1557 | + def keylist_mapping(self): |
1558 | + ''' |
1559 | + returns a dictionary of final keys mapped to source keys |
1560 | + ''' |
1561 | + keylist_mapping = { |
1562 | + # <db key> : <row key> or <set of row keys> |
1563 | + 'bytes_used': 'bytes_used', |
1564 | + 'container_count': 'container_count', |
1565 | + 'object_count': 'object_count', |
1566 | + 'replica_count': 'replica_count', |
1567 | + } |
1568 | + return keylist_mapping |
1569 | |
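To see the hourly grouping in action, here is what StatsLogProcessor.process() returns for two made-up replica rows of the same account (the object name supplies the year/month/day/hour part of the key). replica_count counts source rows while the other columns sum, which is what makes the merge-by-addition in the reduce step safe for these keys.

    from swift.stats.stats_processor import StatsLogProcessor

    lines = ['"AUTH_test","1","2","3"',
             '"AUTH_test","1","2","3"']  # two replicas of one account DB
    p = StatsLogProcessor({})
    totals = p.process(iter(lines), 'a', 'c', '2010/07/09/04/x.csv.gz')
    print totals
    # {('AUTH_test', '2010', '07', '09', '04'):
    #      {'replica_count': 2, 'container_count': 2,
    #       'object_count': 4, 'bytes_used': 6}}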
1570 | === added directory 'test/unit/stats' |
1571 | === added file 'test/unit/stats/__init__.py' |
1572 | === added file 'test/unit/stats/test_log_processor.py' |
1573 | --- test/unit/stats/test_log_processor.py 1970-01-01 00:00:00 +0000 |
1574 | +++ test/unit/stats/test_log_processor.py 2010-09-23 18:31:58 +0000 |
1575 | @@ -0,0 +1,161 @@ |
1576 | +import unittest |
1577 | + |
1578 | +from swift.stats import log_processor |
1579 | + |
1580 | +class DumbLogger(object): |
1581 | + def __getattr__(self, n): |
1582 | + return self.foo |
1583 | + |
1584 | + def foo(self, *a, **kw): |
1585 | + pass |
1586 | + |
1587 | +class DumbInternalProxy(object): |
1588 | + def get_container_list(self, account, container, marker=None): |
1589 | + n = '2010/03/14/13/obj1' |
1590 | + if marker is None or n > marker: |
1591 | + return [{'name': n}] |
1592 | + else: |
1593 | + return [] |
1594 | + |
1595 | + def get_object(self, account, container, object_name): |
1596 | + code = 200 |
1597 | + if object_name.endswith('.gz'): |
1598 | + # same data as below, compressed with gzip -9 |
1599 | + def data(): |
1600 | + yield '\x1f\x8b\x08' |
1601 | + yield '\x08"\xd79L' |
1602 | + yield '\x02\x03te' |
1603 | + yield 'st\x00\xcbO' |
1604 | + yield '\xca\xe2JI,I' |
1605 | + yield '\xe4\x02\x00O\xff' |
1606 | + yield '\xa3Y\t\x00\x00\x00' |
1607 | + else: |
1608 | + def data(): |
1609 | + yield 'obj\n' |
1610 | + yield 'data' |
1611 | + return code, data() |
1612 | + |
1613 | +class TestLogProcessor(unittest.TestCase): |
1614 | + |
1615 | + access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\ |
1616 | + '09/Jul/2010/04/14/30 GET '\ |
1617 | + '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ |
1618 | + 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\ |
1619 | + '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' |
1620 | + stats_test_line = 'account,1,2,3' |
1621 | + proxy_config = {'log-processor': { |
1622 | + |
1623 | + } |
1624 | + } |
1625 | + |
1626 | + def test_access_log_line_parser(self): |
1627 | + access_proxy_config = self.proxy_config |
1628 | + access_proxy_config.update({ |
1629 | + 'log-processor-access': { |
1630 | + 'source_filename_format': '%Y%m%d%H*', |
1631 | + 'class_path': |
1632 | + 'swift.stats.access_processor.AccessLogProcessor' |
1633 | + }}) |
1634 | + p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) |
1635 | + result = p.plugins['access']['instance'].log_line_parser(self.access_test_line) |
1636 | + self.assertEquals(result, {'code': 200, |
1637 | + 'processing_time': '0.0262', |
1638 | + 'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd', |
1639 | + 'month': '07', |
1640 | + 'second': '30', |
1641 | + 'year': '2010', |
1642 | + 'query': 'format=json&foo', |
1643 | + 'tz': '+0000', |
1644 | + 'http_version': 'HTTP/1.0', |
1645 | + 'object_name': 'bar', |
1646 | + 'etag': '-', |
1647 | + 'foo': 1, |
1648 | + 'method': 'GET', |
1649 | + 'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90', |
1650 | + 'client_ip': '1.2.3.4', |
1651 | + 'format': 1, |
1652 | + 'bytes_out': 95, |
1653 | + 'container_name': 'foo', |
1654 | + 'day': '09', |
1655 | + 'minute': '14', |
1656 | + 'account': 'acct', |
1657 | + 'hour': '04', |
1658 | + 'referrer': '-', |
1659 | + 'request': '/v1/acct/foo/bar', |
1660 | + 'user_agent': 'curl', |
1661 | + 'bytes_in': 6, |
1662 | + 'lb_ip': '4.5.6.7'}) |
1663 | + |
1664 | + def test_process_one_access_file(self): |
1665 | + access_proxy_config = self.proxy_config |
1666 | + access_proxy_config.update({ |
1667 | + 'log-processor-access': { |
1668 | + 'source_filename_format': '%Y%m%d%H*', |
1669 | + 'class_path': |
1670 | + 'swift.stats.access_processor.AccessLogProcessor' |
1671 | + }}) |
1672 | + p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) |
1673 | + def get_object_data(*a, **kw): |
1674 | + return [self.access_test_line] |
1675 | + p.get_object_data = get_object_data |
1676 | + result = p.process_one_file('access', 'a', 'c', 'o') |
1677 | + expected = {('acct', '2010', '07', '09', '04'): |
1678 | + {('public', 'object', 'GET', '2xx'): 1, |
1679 | + ('public', 'bytes_out'): 95, |
1680 | + 'marker_query': 0, |
1681 | + 'format_query': 1, |
1682 | + 'delimiter_query': 0, |
1683 | + 'path_query': 0, |
1684 | + ('public', 'bytes_in'): 6, |
1685 | + 'prefix_query': 0}} |
1686 | + self.assertEquals(result, expected) |
1687 | + |
1688 | + def test_get_container_listing(self): |
1689 | + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) |
1690 | + p._internal_proxy = DumbInternalProxy() |
1691 | + result = p.get_container_listing('a', 'foo') |
1692 | + expected = ['2010/03/14/13/obj1'] |
1693 | + self.assertEquals(result, expected) |
1694 | + result = p.get_container_listing('a', 'foo', listing_filter=expected) |
1695 | + expected = [] |
1696 | + self.assertEquals(result, expected) |
1697 | + result = p.get_container_listing('a', 'foo', start_date='2010031412', |
1698 | + end_date='2010031414') |
1699 | + expected = ['2010/03/14/13/obj1'] |
1700 | + self.assertEquals(result, expected) |
1701 | + result = p.get_container_listing('a', 'foo', start_date='2010031414') |
1702 | + expected = [] |
1703 | + self.assertEquals(result, expected) |
1704 | + result = p.get_container_listing('a', 'foo', start_date='2010031410', |
1705 | + end_date='2010031412') |
1706 | + expected = [] |
1707 | + self.assertEquals(result, expected) |
1708 | + |
1709 | + def test_get_object_data(self): |
1710 | + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) |
1711 | + p._internal_proxy = DumbInternalProxy() |
1712 | + result = list(p.get_object_data('a', 'c', 'o', False)) |
1713 | + expected = ['obj', 'data'] |
1714 | + self.assertEquals(result, expected) |
1715 | + result = list(p.get_object_data('a', 'c', 'o.gz', True)) |
1716 | + self.assertEquals(result, expected) |
1717 | + |
1718 | + def test_get_stat_totals(self): |
1719 | + stats_proxy_config = self.proxy_config |
1720 | + stats_proxy_config.update({ |
1721 | + 'log-processor-stats': { |
1722 | + 'class_path': |
1723 | + 'swift.stats.stats_processor.StatsLogProcessor' |
1724 | + }}) |
1725 | + p = log_processor.LogProcessor(stats_proxy_config, DumbLogger()) |
1726 | + p._internal_proxy = DumbInternalProxy() |
1727 | + def get_object_data(*a, **kw): |
1728 | + return [self.stats_test_line] |
1729 | + p.get_object_data = get_object_data |
1730 | + result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o') |
1731 | + expected = {('account', 'y', 'm', 'd', 'h'): |
1732 | + {'replica_count': 1, |
1733 | + 'object_count': 2, |
1734 | + 'container_count': 1, |
1735 | + 'bytes_used': 3}} |
1736 | + self.assertEquals(result, expected) |
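As an aside, the gzip fixture embedded in DumbInternalProxy.get_object can be rebuilt in memory with just the standard library. The bytes in the test also encode an mtime and the original filename ('test'), so fresh output will not be byte-identical, but it decompresses to the same data, which is all test_get_object_data relies on.

    import gzip
    import cStringIO  # Python 2; use io.BytesIO on Python 3

    buf = cStringIO.StringIO()
    f = gzip.GzipFile(fileobj=buf, mode='wb', compresslevel=9)
    f.write('obj\ndata')
    f.close()
    compressed = buf.getvalue()

    # round-trip check: the test only needs the decompressed content
    assert gzip.GzipFile(
        fileobj=cStringIO.StringIO(compressed)).read() == 'obj\ndata'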
Just a couple of quick comments:
1. The bin scripts should be refactored to be more DRY, like the current bins
2. pep8 :)