Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk
Status: Superseded
Proposed branch: lp:~notmyname/swift/stats_system
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1142 lines (+1090/-0), 10 files modified
  bin/swift-account-stats-logger.py (+81/-0)
  bin/swift-log-uploader (+83/-0)
  etc/log-processing.conf-sample (+28/-0)
  swift/common/compressed_file_reader.py (+72/-0)
  swift/common/internal_proxy.py (+174/-0)
  swift/stats/access_processor.py (+168/-0)
  swift/stats/account_stats.py (+69/-0)
  swift/stats/log_processor.py (+226/-0)
  swift/stats/log_uploader.py (+135/-0)
  swift/stats/stats_processor.py (+54/-0)
To merge this branch: bzr merge lp:~notmyname/swift/stats_system
Related bugs: (none)
Related blueprints: Stats system (High)
Reviewer: Swift Core security contacts (review pending)
Review via email: mp+32502@code.launchpad.net
This proposal supersedes a proposal from 2010-08-06.
This proposal has been superseded by a proposal from 2010-08-14.
Commit message
Description of the change
Work in progress: moving the existing internal (pre-OpenStack) Swift stats system into (OpenStack) Swift.
- 53. By John Dickinson: added access log processing plugin
- 54. By John Dickinson: merged with trunk
- 55. By John Dickinson: merged with trunk
- 56. By John Dickinson: merged with trunk
- 57. By John Dickinson: merged with trunk
- 58. By John Dickinson: merged with trunk
- 59. By John Dickinson: merged with trunk
- 60. By John Dickinson: initial tests for the stats system
- 61. By John Dickinson: first test working
- 62. By John Dickinson: merged with trunk
- 63. By John Dickinson: access log parsing tests pass
- 64. By John Dickinson: added (working) stats tests
- 65. By John Dickinson: merged with trunk (utils fix)
- 66. By John Dickinson: updated stats binaries to be DRY compliant
- 67. By John Dickinson: merged with trunk
- 68. By John Dickinson: set up log-stats-collector as a daemon process to create csv files
- 69. By John Dickinson: merged with trunk
- 70. By John Dickinson: added execute perms to stats processor binaries
- 71. By John Dickinson: updated config file loading to work with paste.deploy configs
- 72. By John Dickinson: fixed typos
- 73. By John Dickinson: fixed internal proxy loading
- 74. By John Dickinson: made a memcache stub for the internal proxy server
- 75. By John Dickinson: fixed internal proxy put_container reference
- 76. By John Dickinson: fixed some log uploading glob patterns
- 77. By John Dickinson: fixed bug in calculating offsets for filename patterns
- 78. By John Dickinson: fixed typos in log processor
- 79. By John Dickinson: fixed get_data_list in log_processor
- 80. By John Dickinson: added some debug output
- 81. By John Dickinson: fixed logging and log uploading
- 82. By John Dickinson: fixed copy/paste errors and missing imports
- 83. By John Dickinson: added error handling and missing return statement
- 84. By John Dickinson: handled some typos and better handling of missing data in internal proxy
- 85. By John Dickinson: fixed tests, typos, and added error handling
- 86. By John Dickinson: fixed bug in account stats log processing
- 87. By John Dickinson: fixed listing filter in log processing
- 88. By John Dickinson: fixed stdout capturing for generating csv files
- 89. By John Dickinson: fixed replica count reporting error
- 90. By John Dickinson: fixed lookback in log processor
- 91. By John Dickinson: merged with trunk
- 92. By John Dickinson: merged with trunk
- 93. By John Dickinson: fixed tests to account for changed key name
- 94. By John Dickinson: fixed test bug
- 95. By John Dickinson: updated with changes and suggestions from code review
- 96. By John Dickinson: merged with changes from trunk
- 97. By John Dickinson: added stats overview
- 98. By John Dickinson: updated with changes from trunk
- 99. By John Dickinson: added additional docs
- 100. By John Dickinson: documentation clarification and pep8 fixes
- 101. By John Dickinson: added overview stats to the doc index
- 102. By John Dickinson: made long lines wrap (grr pep8)
- 103. By John Dickinson: merged with trunk
- 104. By John Dickinson: merged with trunk
- 105. By John Dickinson: merged with trunk
- 106. By John Dickinson: fixed stats docs
- 107. By John Dickinson: added a bad lines check to the access log parser
- 108. By John Dickinson: added tests for compressing file reader
- 109. By John Dickinson: fixed compressing file reader test
- 110. By John Dickinson: fixed compressing file reader test
- 111. By John Dickinson: improved compressing file reader test
- 112. By John Dickinson: fixed compressing file reader test
- 113. By John Dickinson: made try/except much less inclusive in access log processor
- 114. By John Dickinson: added keylist mapping tests and fixed other tests
- 115. By John Dickinson: updated stats system tests
- 116. By John Dickinson: merged with gholt's test stubs
- 117. By John Dickinson: updated SAIO instructions for the stats system
- 118. By John Dickinson: fixed stats system saio docs
- 119. By John Dickinson: fixed stats system saio docs
- 120. By John Dickinson: added openstack copyright/license to test_log_processor.py
- 121. By John Dickinson: improved logging in log processors
- 122. By John Dickinson: merged with trunk
- 123. By John Dickinson: fixed missing working directory bug in account stats
- 124. By John Dickinson: fixed readconf parameter that was broken with a previous merge
- 125. By John Dickinson: updated setup.py and saio docs for stats system
- 126. By John Dickinson: added readconf unit test
- 127. By John Dickinson: updated stats saio docs to create logs with the appropriate permissions
- 128. By John Dickinson: fixed bug in log processor internal proxy lazy load code
- 129. By John Dickinson: updated readconf test
- 130. By John Dickinson: updated readconf test
- 131. By John Dickinson: updated readconf test
- 132. By John Dickinson: fixed internal proxy references in log processor
- 133. By John Dickinson: fixed account stats filename creation
- 134. By John Dickinson: pep8 tomfoolery
- 135. By John Dickinson: moved paren
- 136. By John Dickinson: added lazy load of internal proxy to log processor (you were right clay)
Unmerged revisions
Preview Diff
1 | === added file 'bin/swift-account-stats-logger.py' |
2 | --- bin/swift-account-stats-logger.py 1970-01-01 00:00:00 +0000 |
3 | +++ bin/swift-account-stats-logger.py 2010-08-14 18:41:02 +0000 |
4 | @@ -0,0 +1,81 @@ |
5 | +#!/usr/bin/python |
6 | +# Copyright (c) 2010 OpenStack, LLC. |
7 | +# |
8 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
9 | +# you may not use this file except in compliance with the License. |
10 | +# You may obtain a copy of the License at |
11 | +# |
12 | +# http://www.apache.org/licenses/LICENSE-2.0 |
13 | +# |
14 | +# Unless required by applicable law or agreed to in writing, software |
15 | +# distributed under the License is distributed on an "AS IS" BASIS, |
16 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
17 | +# implied. |
18 | +# See the License for the specific language governing permissions and |
19 | +# limitations under the License. |
20 | + |
21 | +import os |
22 | +import signal |
23 | +import sys |
24 | +import time |
25 | +from ConfigParser import ConfigParser |
26 | + |
27 | +from swift.stats.account_stats import AccountStat |
28 | +from swift.common import utils |
29 | + |
30 | +if __name__ == '__main__': |
31 | + if len(sys.argv) < 2: |
32 | + print "Usage: swift-account-stats-logger CONFIG_FILE" |
33 | + sys.exit() |
34 | + |
35 | + c = ConfigParser() |
36 | + if not c.read(sys.argv[1]): |
37 | + print "Unable to read config file." |
38 | + sys.exit(1) |
39 | + |
40 | + if c.has_section('log-processor-stats'): |
41 | + stats_conf = dict(c.items('log-processor-stats')) |
42 | + else: |
43 | + print "Unable to find log-processor-stats config section in %s." % \ |
44 | + sys.argv[1] |
45 | + sys.exit(1) |
46 | + |
47 | + # reference this from the account stats conf |
48 | + |
49 | + target_dir = stats_conf.get('log_dir', '/var/log/swift') |
50 | + account_server_conf_loc = stats_conf.get('account_server_conf', |
51 | + '/etc/swift/account-server.conf') |
52 | + filename_format = stats_conf['source_filename_format'] |
53 | + try: |
54 | + c = ConfigParser() |
55 | + c.read(account_server_conf_loc) |
56 | + account_server_conf = dict(c.items('account-server')) |
57 | + except: |
58 | + print "Unable to load account server conf from %s" % account_server_conf_loc |
59 | + sys.exit(1) |
60 | + |
61 | + utils.drop_privileges(account_server_conf.get('user', 'swift')) |
62 | + |
63 | + try: |
64 | + os.setsid() |
65 | + except OSError: |
66 | + pass |
67 | + |
68 | + logger = utils.get_logger(stats_conf, 'swift-account-stats-logger') |
69 | + |
70 | + def kill_children(*args): |
71 | + signal.signal(signal.SIGTERM, signal.SIG_IGN) |
72 | + os.killpg(0, signal.SIGTERM) |
73 | + sys.exit() |
74 | + |
75 | + signal.signal(signal.SIGTERM, kill_children) |
76 | + |
77 | + stats = AccountStat(filename_format, |
78 | + target_dir, |
79 | + account_server_conf, |
80 | + logger) |
81 | + logger.info("Gathering account stats") |
82 | + start = time.time() |
83 | + stats.find_and_process() |
84 | + logger.info("Gathering account stats complete (%0.2f minutes)" % |
85 | + ((time.time()-start)/60)) |
86 | |
87 | === added file 'bin/swift-log-uploader' |
88 | --- bin/swift-log-uploader 1970-01-01 00:00:00 +0000 |
89 | +++ bin/swift-log-uploader 2010-08-14 18:41:02 +0000 |
90 | @@ -0,0 +1,83 @@ |
91 | +#!/usr/bin/python |
92 | +# Copyright (c) 2010 OpenStack, LLC. |
93 | +# |
94 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
95 | +# you may not use this file except in compliance with the License. |
96 | +# You may obtain a copy of the License at |
97 | +# |
98 | +# http://www.apache.org/licenses/LICENSE-2.0 |
99 | +# |
100 | +# Unless required by applicable law or agreed to in writing, software |
101 | +# distributed under the License is distributed on an "AS IS" BASIS, |
102 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
103 | +# implied. |
104 | +# See the License for the specific language governing permissions and |
105 | +# limitations under the License. |
106 | + |
107 | +import os |
108 | +import signal |
109 | +import sys |
110 | +import time |
111 | +from ConfigParser import ConfigParser |
112 | + |
113 | +from swift.stats.log_uploader import LogUploader |
114 | +from swift.common.utils import get_logger |
115 | + |
116 | +if __name__ == '__main__': |
117 | + if len(sys.argv) < 3: |
118 | + print "Usage: swift-log-uploader CONFIG_FILE plugin" |
119 | + sys.exit() |
120 | + |
121 | + c = ConfigParser() |
122 | + if not c.read(sys.argv[1]): |
123 | + print "Unable to read config file." |
124 | + sys.exit(1) |
125 | + |
126 | + if c.has_section('log-processor'): |
127 | + uploader_conf = dict(c.items('log-processor')) |
128 | + else: |
129 | + print "Unable to find log-processor config section in %s." % sys.argv[1] |
130 | + sys.exit(1) |
131 | + |
132 | + plugin = sys.argv[2] |
133 | + section_name = 'log-processor-%s' % plugin |
134 | + if c.has_section(section_name): |
135 | + uploader_conf.update(dict(c.items(section_name))) |
136 | + else: |
137 | + print "Unable to find %s config section in %s." % (section_name, |
138 | + sys.argv[1]) |
139 | + sys.exit(1) |
140 | + |
141 | + try: |
142 | + os.setsid() |
143 | + except OSError: |
144 | + pass |
145 | + |
146 | + logger = get_logger(uploader_conf, 'swift-log-uploader') |
147 | + |
148 | + def kill_children(*args): |
149 | + signal.signal(signal.SIGTERM, signal.SIG_IGN) |
150 | + os.killpg(0, signal.SIGTERM) |
151 | + sys.exit() |
152 | + |
153 | + signal.signal(signal.SIGTERM, kill_children) |
154 | + |
155 | + log_dir = uploader_conf.get('log_dir', '/var/log/swift/') |
156 | + swift_account = uploader_conf['swift_account'] |
157 | + container_name = uploader_conf['container_name'] |
158 | + source_filename_format = uploader_conf['source_filename_format'] |
159 | + proxy_server_conf_loc = uploader_conf.get('proxy_server_conf', |
160 | + '/etc/swift/proxy-server.conf') |
161 | + try: |
162 | + c = ConfigParser() |
163 | + c.read(proxy_server_conf_loc) |
164 | + proxy_server_conf = dict(c.items('proxy-server')) |
165 | + except: |
166 | + proxy_server_conf = None |
167 | + uploader = LogUploader(log_dir, swift_account, container_name, |
168 | + source_filename_format, proxy_server_conf, logger) |
169 | + logger.info("Uploading logs") |
170 | + start = time.time() |
171 | + uploader.upload_all_logs() |
172 | + logger.info("Uploading logs complete (%0.2f minutes)" % |
173 | + ((time.time()-start)/60)) |
174 | |
175 | === added file 'etc/log-processing.conf-sample' |
176 | --- etc/log-processing.conf-sample 1970-01-01 00:00:00 +0000 |
177 | +++ etc/log-processing.conf-sample 2010-08-14 18:41:02 +0000 |
178 | @@ -0,0 +1,28 @@ |
179 | +# plugin section format is named "log-processor-<plugin>" |
180 | +# section "log-processor" is the generic defaults (overridden by plugins) |
181 | + |
182 | +[log-processor] |
183 | +# working_dir = /tmp/swift/ |
184 | +# proxy_server_conf = /etc/swift/proxy-server.conf |
185 | +# log_facility = LOG_LOCAL0 |
186 | +# log_level = INFO |
187 | +# lookback_hours = 120 |
188 | +# lookback_window = 120 |
189 | + |
190 | +[log-processor-access] |
191 | +# log_dir = /var/log/swift/ |
192 | +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 |
193 | +container_name = log_data |
194 | +source_filename_format = %Y%m%d%H* |
195 | +class_path = swift.stats.access_processor.AccessLogProcessor |
196 | +# service ips is for client ip addresses that should be counted as servicenet |
197 | +# service_ips = |
198 | +# server_name = proxy |
199 | + |
200 | +[log-processor-stats] |
201 | +# log_dir = /var/log/swift/ |
202 | +swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31 |
203 | +container_name = account_stats |
204 | +source_filename_format = %Y%m%d%H* |
205 | +class_path = swift.stats.stats_processor.StatsLogProcessor |
206 | +# account_server_conf = /etc/swift/account-server.conf |
207 | |
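The sample config above is organized so that plugin sections (`log-processor-access`, `log-processor-stats`) override the generic `[log-processor]` defaults, which is the merge order the uploader script performs with ConfigParser. A minimal Python 3 sketch of that layering (section and option names come from the sample; the inline values here are illustrative):

```python
from configparser import ConfigParser

SAMPLE = """
[log-processor]
lookback_hours = 120

[log-processor-access]
container_name = log_data
class_path = swift.stats.access_processor.AccessLogProcessor
"""

c = ConfigParser()
c.read_string(SAMPLE)

# start from the generic defaults, then let the plugin section override
conf = dict(c.items('log-processor'))
conf.update(dict(c.items('log-processor-access')))
```

After the update, `conf` carries both the shared `lookback_hours` default and the access plugin's own options.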
208 | === added file 'swift/common/compressed_file_reader.py' |
209 | --- swift/common/compressed_file_reader.py 1970-01-01 00:00:00 +0000 |
210 | +++ swift/common/compressed_file_reader.py 2010-08-14 18:41:02 +0000 |
211 | @@ -0,0 +1,72 @@ |
212 | +# Copyright (c) 2010 OpenStack, LLC. |
213 | +# |
214 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
215 | +# you may not use this file except in compliance with the License. |
216 | +# You may obtain a copy of the License at |
217 | +# |
218 | +# http://www.apache.org/licenses/LICENSE-2.0 |
219 | +# |
220 | +# Unless required by applicable law or agreed to in writing, software |
221 | +# distributed under the License is distributed on an "AS IS" BASIS, |
222 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
223 | +# implied. |
224 | +# See the License for the specific language governing permissions and |
225 | +# limitations under the License. |
226 | + |
227 | +import zlib |
228 | +import struct |
229 | + |
230 | + |
231 | +class CompressedFileReader(object): |
232 | + ''' |
233 | + Wraps a file object and provides a read method that returns gzip'd data. |
234 | + |
235 | + One warning: if read is called with a small value, the data returned may |
236 | + be bigger than the value. In this case, the "compressed" data will be |
237 | + bigger than the original data. To solve this, use a bigger read buffer. |
238 | + |
239 | + An example use case: |
240 | + Given an uncompressed file on disk, provide a way to read compressed data |
241 | + without buffering the entire file data in memory. Using this class, an |
242 | + uncompressed log file could be uploaded as compressed data with chunked |
243 | + transfer encoding. |
244 | + |
245 | + gzip header and footer code taken from the python stdlib gzip module |
246 | + |
247 | + :param file_obj: File object to read from |
248 | + :param compresslevel: compression level |
249 | + ''' |
250 | + def __init__(self, file_obj, compresslevel=9): |
251 | + self._f = file_obj |
252 | + self._compressor = zlib.compressobj(compresslevel, |
253 | + zlib.DEFLATED, |
254 | + -zlib.MAX_WBITS, |
255 | + zlib.DEF_MEM_LEVEL, |
256 | + 0) |
257 | + self.done = False |
258 | + self.first = True |
259 | + self.crc32 = 0 |
260 | + self.total_size = 0 |
261 | + |
262 | + def read(self, *a, **kw): |
263 | + if self.done: |
264 | + return '' |
265 | + x = self._f.read(*a, **kw) |
266 | + if x: |
267 | + self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL |
268 | + self.total_size += len(x) |
269 | + compressed = self._compressor.compress(x) |
270 | + if not compressed: |
271 | + compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH) |
272 | + else: |
273 | + compressed = self._compressor.flush(zlib.Z_FINISH) |
274 | + crc32 = struct.pack("<L", self.crc32 & 0xffffffffL) |
275 | + size = struct.pack("<L", self.total_size & 0xffffffffL) |
276 | + footer = crc32 + size |
277 | + compressed += footer |
278 | + self.done = True |
279 | + if self.first: |
280 | + self.first = False |
281 | + header = '\037\213\010\000\000\000\000\000\002\377' |
282 | + compressed = header + compressed |
283 | + return compressed |
284 | |
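`CompressedFileReader` builds a gzip stream by hand: a fixed 10-byte gzip header, raw deflate data from `zlib.compressobj` with negative `wbits`, then a CRC32/size footer. A standalone Python 3 sketch of the same framing (not the class itself, which streams chunk by chunk), round-tripped through the stdlib `gzip` module to show the framing yields a valid gzip stream:

```python
import gzip
import struct
import zlib

def gzip_stream(data, compresslevel=9):
    """Frame raw deflate output as a gzip member, as CompressedFileReader does."""
    compressor = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS,
                                  zlib.DEF_MEM_LEVEL, 0)
    # magic, CM=deflate, no flags, mtime=0, XFL=2 (max compression), OS=unknown
    header = b'\037\213\010\000\000\000\000\000\002\377'
    body = compressor.compress(data) + compressor.flush(zlib.Z_FINISH)
    # footer: CRC32 and uncompressed size, both little-endian 32-bit
    footer = struct.pack('<L', zlib.crc32(data) & 0xffffffff) + \
             struct.pack('<L', len(data) & 0xffffffff)
    return header + body + footer

payload = b'swift access log line\n' * 100
round_tripped = gzip.decompress(gzip_stream(payload))
```

The class exists so an uncompressed log file can be uploaded compressed with chunked transfer encoding, without ever holding the whole file in memory.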
285 | === added file 'swift/common/internal_proxy.py' |
286 | --- swift/common/internal_proxy.py 1970-01-01 00:00:00 +0000 |
287 | +++ swift/common/internal_proxy.py 2010-08-14 18:41:02 +0000 |
288 | @@ -0,0 +1,174 @@ |
289 | +# Copyright (c) 2010 OpenStack, LLC. |
290 | +# |
291 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
292 | +# you may not use this file except in compliance with the License. |
293 | +# You may obtain a copy of the License at |
294 | +# |
295 | +# http://www.apache.org/licenses/LICENSE-2.0 |
296 | +# |
297 | +# Unless required by applicable law or agreed to in writing, software |
298 | +# distributed under the License is distributed on an "AS IS" BASIS, |
299 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
300 | +# implied. |
301 | +# See the License for the specific language governing permissions and |
302 | +# limitations under the License. |
303 | + |
304 | +import webob |
305 | +from urllib import quote, unquote |
306 | +from json import loads as json_loads |
307 | + |
308 | +from swift.common.compressed_file_reader import CompressedFileReader |
309 | +from swift.proxy.server import BaseApplication |
310 | + |
311 | + |
312 | +class InternalProxy(object): |
313 | + """ |
314 | + Set up a private instance of a proxy server that allows normal requests |
315 | + to be made without having to actually send the request to the proxy. |
316 | + This also doesn't log the requests to the normal proxy logs. |
317 | + |
318 | + :param proxy_server_conf: proxy server configuration dictionary |
319 | + :param logger: logger to log requests to |
320 | + :param retries: number of times to retry each request |
321 | + """ |
322 | + def __init__(self, proxy_server_conf=None, logger=None, retries=0): |
323 | + self.upload_app = BaseApplication(proxy_server_conf, logger) |
324 | + self.retries = retries |
325 | + |
326 | + def upload_file(self, source_file, account, container, object_name, |
327 | + compress=True, content_type='application/x-gzip'): |
328 | + """ |
329 | + Upload a file to cloud files. |
330 | + |
331 | + :param source_file: path to or file like object to upload |
332 | + :param account: account to upload to |
333 | + :param container: container to upload to |
334 | + :param object_name: name of object being uploaded |
335 | + :param compress: if True, compresses object as it is uploaded |
336 | + :param content_type: content-type of object |
337 | + :returns: True if successful, False otherwise |
338 | + """ |
339 | + log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name) |
340 | + |
341 | + # create the container |
342 | + if not self.create_container(account, container): |
343 | + return False |
344 | + |
345 | + # upload the file to the account |
346 | + req = webob.Request.blank(log_create_pattern, |
347 | + environ={'REQUEST_METHOD': 'PUT'}, |
348 | + headers={'Transfer-Encoding': 'chunked'}) |
349 | + if compress: |
350 | + if hasattr(source_file, 'read'): |
351 | + compressed_file = CompressedFileReader(source_file) |
352 | + else: |
353 | + compressed_file = CompressedFileReader(open(source_file, 'rb')) |
354 | + req.body_file = compressed_file |
355 | + else: |
356 | + if not hasattr(source_file, 'read'): |
357 | + source_file = open(source_file, 'rb') |
358 | + req.body_file = source_file |
359 | + req.account = account |
360 | + req.content_type = content_type |
361 | + req.content_length = None # to make sure we send chunked data |
362 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
363 | + tries = 1 |
364 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
365 | + and tries <= self.retries: |
366 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
367 | + tries += 1 |
368 | + if not (200 <= resp.status_int < 300): |
369 | + return False |
370 | + return True |
371 | + |
372 | + def get_object(self, account, container, object_name): |
373 | + """ |
374 | + Get object. |
375 | + |
376 | + :param account: account name object is in |
377 | + :param container: container name object is in |
378 | + :param object_name: name of object to get |
379 | + :returns: iterator for object data |
380 | + """ |
381 | + req = webob.Request.blank('/v1/%s/%s/%s' % |
382 | + (account, container, object_name), |
383 | + environ={'REQUEST_METHOD': 'GET'}) |
384 | + req.account = account |
385 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
386 | + tries = 1 |
387 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
388 | + and tries <= self.retries: |
389 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
390 | + tries += 1 |
391 | + for x in resp.app_iter: |
392 | + yield x |
393 | + |
394 | + def create_container(self, account, container): |
395 | + """ |
396 | + Create container. |
397 | + |
398 | + :param account: account name to put the container in |
399 | + :param container: container name to create |
400 | + :returns: True if successful, otherwise False |
401 | + """ |
402 | + req = webob.Request.blank('/v1/%s/%s' % (account, container), |
403 | + environ={'REQUEST_METHOD': 'PUT'}) |
404 | + req.account = account |
405 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
406 | + tries = 1 |
407 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
408 | + and tries <= self.retries: |
409 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
410 | + tries += 1 |
411 | + return 200 <= resp.status_int < 300 |
412 | + |
413 | + def get_container_list(self, account, container, marker=None, limit=None, |
414 | + prefix=None, delimiter=None, full_listing=True): |
415 | + """ |
416 | + Get container listing. |
417 | + |
418 | + :param account: account name for the container |
419 | + :param container: container name to get the listing of |
420 | + :param marker: marker query |
421 | + :param limit: limit to query |
422 | + :param prefix: prefix query |
423 | + :param delimiter: delimiter for query |
424 | + :param full_listing: if True, make enough requests to get all listings |
425 | + :returns: list of objects |
426 | + """ |
427 | + if full_listing: |
428 | + rv = [] |
429 | + listing = self.get_container_list(account, container, marker, |
430 | + limit, prefix, delimiter, full_listing=False) |
431 | + while listing: |
432 | + rv.extend(listing) |
433 | + if not delimiter: |
434 | + marker = listing[-1]['name'] |
435 | + else: |
436 | + marker = listing[-1].get('name', listing[-1].get('subdir')) |
437 | + listing = self.get_container_list(account, container, marker, |
438 | + limit, prefix, delimiter, full_listing=False) |
439 | + return rv |
440 | + path = '/v1/%s/%s' % (account, container) |
441 | + qs = 'format=json' |
442 | + if marker: |
443 | + qs += '&marker=%s' % quote(marker) |
444 | + if limit: |
445 | + qs += '&limit=%d' % limit |
446 | + if prefix: |
447 | + qs += '&prefix=%s' % quote(prefix) |
448 | + if delimiter: |
449 | + qs += '&delimiter=%s' % quote(delimiter) |
450 | + path += '?%s' % qs |
451 | + req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'}) |
452 | + req.account = account |
453 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
454 | + tries = 1 |
455 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
456 | + and tries <= self.retries: |
457 | + resp = self.upload_app.handle_request(self.upload_app.update_request(req)) |
458 | + tries += 1 |
459 | + if resp.status_int == 204: |
460 | + return [] |
461 | + if 200 <= resp.status_int < 300: |
462 | + return json_loads(resp.body) |
463 | |
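The `full_listing` branch of `get_container_list` pages through results by re-issuing the request with the last name seen as the marker, stopping when a page comes back empty. The same loop, sketched against a toy in-memory backend (the object names and `fetch_page` helper are hypothetical stand-ins for the proxy request):

```python
def full_listing(fetch_page):
    """Accumulate marker-paginated pages until an empty page is returned."""
    rv, marker = [], None
    page = fetch_page(marker)
    while page:
        rv.extend(page)
        marker = page[-1]['name']   # resume after the last entry seen
        page = fetch_page(marker)
    return rv

# toy backend: 5 objects, served 2 per page
objs = [{'name': 'obj%d' % i} for i in range(5)]

def fetch_page(marker, limit=2):
    names = [o['name'] for o in objs]
    start = names.index(marker) + 1 if marker else 0
    return objs[start:start + limit]

listing = full_listing(fetch_page)
```

The real method also has to handle `delimiter` queries, where the last entry may be a `subdir` rather than a `name`.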
464 | === added directory 'swift/stats' |
465 | === added file 'swift/stats/__init__.py' |
466 | === added file 'swift/stats/access_processor.py' |
467 | --- swift/stats/access_processor.py 1970-01-01 00:00:00 +0000 |
468 | +++ swift/stats/access_processor.py 2010-08-14 18:41:02 +0000 |
469 | @@ -0,0 +1,168 @@ |
470 | +# Copyright (c) 2010 OpenStack, LLC. |
471 | +# |
472 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
473 | +# you may not use this file except in compliance with the License. |
474 | +# You may obtain a copy of the License at |
475 | +# |
476 | +# http://www.apache.org/licenses/LICENSE-2.0 |
477 | +# |
478 | +# Unless required by applicable law or agreed to in writing, software |
479 | +# distributed under the License is distributed on an "AS IS" BASIS, |
480 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
481 | +# implied. |
482 | +# See the License for the specific language governing permissions and |
483 | +# limitations under the License. |
484 | + |
485 | +class AccessLogProcessor(object): |
486 | + |
487 | + def __init__(self, conf): |
488 | + self.server_name = conf.get('server_name', 'proxy') |
489 | + |
490 | + def _log_line_parser(self, raw_log): |
491 | + '''given a raw access log line, return a dict of the good parts''' |
492 | + d = {} |
493 | + try: |
494 | + (_, |
495 | + server, |
496 | + client_ip, |
497 | + lb_ip, |
498 | + timestamp, |
499 | + method, |
500 | + request, |
501 | + http_version, |
502 | + code, |
503 | + referrer, |
504 | + user_agent, |
505 | + auth_token, |
506 | + bytes_in, |
507 | + bytes_out, |
508 | + etag, |
509 | + trans_id, |
510 | + headers, |
511 | + processing_time) = (unquote(x) for x in raw_log[16:].split(' ')) |
512 | + if server != self.server_name: |
513 | + raise ValueError('incorrect server name in log line') |
514 | + (version, |
515 | + account, |
516 | + container_name, |
517 | + object_name) = split_path(request, 2, 4, True) |
518 | + if container_name is not None: |
519 | + container_name = container_name.split('?', 1)[0] |
520 | + if object_name is not None: |
521 | + object_name = object_name.split('?', 1)[0] |
522 | + account = account.split('?', 1)[0] |
523 | + query = None |
524 | + if '?' in request: |
525 | + request, query = request.split('?', 1) |
526 | + args = query.split('&') |
527 | + # Count each query argument. This is used later to aggregate |
528 | + # the number of format, prefix, etc. queries. |
529 | + for q in args: |
530 | + if '=' in q: |
531 | + k, v = q.split('=', 1) |
532 | + else: |
533 | + k = q |
534 | + # Certain keys will get summed in stats reporting |
535 | + # (format, path, delimiter, etc.). Save a "1" here |
536 | + # to indicate that this request is 1 request for |
537 | + # its respective key. |
538 | + d[k] = 1 |
539 | + except ValueError: |
540 | + pass |
541 | + else: |
542 | + d['client_ip'] = client_ip |
543 | + d['lb_ip'] = lb_ip |
544 | + d['method'] = method |
545 | + d['request'] = request |
546 | + if query: |
547 | + d['query'] = query |
548 | + d['http_version'] = http_version |
549 | + d['code'] = code |
550 | + d['referrer'] = referrer |
551 | + d['user_agent'] = user_agent |
552 | + d['auth_token'] = auth_token |
553 | + d['bytes_in'] = bytes_in |
554 | + d['bytes_out'] = bytes_out |
555 | + d['etag'] = etag |
556 | + d['trans_id'] = trans_id |
557 | + d['processing_time'] = processing_time |
558 | + day, month, year, hour, minute, second = timestamp.split('/') |
559 | + d['day'] = day |
560 | + month = ('%02s' % month_map.index(month)).replace(' ', '0') |
561 | + d['month'] = month |
562 | + d['year'] = year |
563 | + d['hour'] = hour |
564 | + d['minute'] = minute |
565 | + d['second'] = second |
566 | + d['tz'] = '+0000' |
567 | + d['account'] = account |
568 | + d['container_name'] = container_name |
569 | + d['object_name'] = object_name |
570 | + d['bytes_out'] = int(d['bytes_out'].replace('-','0')) |
571 | + d['bytes_in'] = int(d['bytes_in'].replace('-','0')) |
572 | + d['code'] = int(d['code']) |
573 | + return d |
574 | + |
575 | + def process(self, obj_stream): |
576 | + '''generate hourly groupings of data from one access log file''' |
577 | + hourly_aggr_info = {} |
578 | + aggr_account_logs = {} |
579 | + container_line_counts = collections.defaultdict(int) |
580 | + log_buffer = collections.defaultdict(list) |
581 | + for line in obj_stream: |
582 | + line_data = self._log_line_parser(line) |
583 | + if not line_data: |
584 | + continue |
585 | + account = line_data['account'] |
586 | + container_name = line_data['container_name'] |
587 | + year = line_data['year'] |
588 | + month = line_data['month'] |
589 | + day = line_data['day'] |
590 | + hour = line_data['hour'] |
591 | + bytes_out = line_data['bytes_out'] |
592 | + bytes_in = line_data['bytes_in'] |
593 | + method = line_data['method'] |
594 | + code = int(line_data['code']) |
595 | + object_name = line_data['object_name'] |
596 | + client_ip = line_data['client_ip'] |
597 | + |
598 | + op_level = None |
599 | + if not container_name: |
600 | + op_level = 'account' |
601 | + elif container_name and not object_name: |
602 | + op_level = 'container' |
603 | + elif object_name: |
604 | + op_level = 'object' |
605 | + |
606 | + aggr_key = (account, year, month, day, hour) |
607 | + d = hourly_aggr_info.get(aggr_key, {}) |
608 | + if line_data['lb_ip'] in self.lb_private_ips: |
609 | + source = 'service' |
610 | + else: |
611 | + source = 'public' |
612 | + |
613 | + if line_data['client_ip'] in self.service_ips: |
614 | + source = 'service' |
615 | + |
616 | + d[(source, 'bytes_out')] = d.setdefault((source, 'bytes_out'), 0) + \ |
617 | + bytes_out |
618 | + d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \ |
619 | + bytes_in |
620 | + |
621 | + d['format_query'] = d.setdefault('format_query', 0) + \ |
622 | + line_data.get('format', 0) |
623 | + d['marker_query'] = d.setdefault('marker_query', 0) + \ |
624 | + line_data.get('marker', 0) |
625 | + d['prefix_query'] = d.setdefault('prefix_query', 0) + \ |
626 | + line_data.get('prefix', 0) |
627 | + d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \ |
628 | + line_data.get('delimiter', 0) |
629 | + path = line_data.get('path', 0) |
630 | + d['path_query'] = d.setdefault('path_query', 0) + path |
631 | + |
632 | + code = '%dxx' % (code/100) |
633 | + key = (source, op_level, method, code) |
634 | + d[key] = d.setdefault(key, 0) + 1 |
635 | + |
636 | + hourly_aggr_info[aggr_key] = d |
637 | + return hourly_aggr_info, aggr_account_logs |
638 | |
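`process` buckets every parsed line under an `(account, year, month, day, hour)` key and accumulates counters (bytes in/out per source, per-status-class request counts) into that hour's dict. A minimal sketch of the keying and accumulation with two hypothetical pre-parsed records:

```python
from collections import defaultdict

# records as _log_line_parser would emit them (values invented for illustration)
records = [
    {'account': 'AUTH_a', 'year': '2010', 'month': '08', 'day': '14',
     'hour': '18', 'bytes_out': 120, 'code': 200},
    {'account': 'AUTH_a', 'year': '2010', 'month': '08', 'day': '14',
     'hour': '18', 'bytes_out': 80, 'code': 404},
]

hourly = defaultdict(lambda: defaultdict(int))
for r in records:
    key = (r['account'], r['year'], r['month'], r['day'], r['hour'])
    hourly[key]['bytes_out'] += r['bytes_out']
    # collapse 200/404/... into the 2xx/4xx status classes counted above
    hourly[key]['%dxx' % (r['code'] // 100)] += 1

agg = hourly[('AUTH_a', '2010', '08', '14', '18')]
```

Both records land in the same hourly bucket, so `agg` ends up with the combined byte count and one request in each status class.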
639 | === added file 'swift/stats/account_stats.py' |
640 | --- swift/stats/account_stats.py 1970-01-01 00:00:00 +0000 |
641 | +++ swift/stats/account_stats.py 2010-08-14 18:41:02 +0000 |
642 | @@ -0,0 +1,69 @@ |
643 | +# Copyright (c) 2010 OpenStack, LLC. |
644 | +# |
645 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
646 | +# you may not use this file except in compliance with the License. |
647 | +# You may obtain a copy of the License at |
648 | +# |
649 | +# http://www.apache.org/licenses/LICENSE-2.0 |
650 | +# |
651 | +# Unless required by applicable law or agreed to in writing, software |
652 | +# distributed under the License is distributed on an "AS IS" BASIS, |
653 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
654 | +# implied. |
655 | +# See the License for the specific language governing permissions and |
656 | +# limitations under the License. |
657 | + |
658 | +import os |
659 | +import time |
660 | + |
661 | +from swift.account.server import DATADIR as account_server_data_dir |
662 | +from swift.common.db import AccountBroker |
663 | +from swift.common.internal_proxy import InternalProxy |
664 | +from swift.common.utils import renamer |
665 | + |
666 | +class AccountStat(object): |
667 | + def __init__(self, filename_format, target_dir, server_conf, logger): |
668 | + self.filename_format = filename_format |
669 | + self.target_dir = target_dir |
670 | + self.devices = server_conf.get('devices', '/srv/node') |
671 | + self.mount_check = server_conf.get('mount_check', 'true').lower() in \ |
672 | + ('true', 't', '1', 'on', 'yes', 'y') |
673 | + self.logger = logger |
674 | + |
675 | + def find_and_process(self): |
676 | + src_filename = time.strftime(self.filename_format) |
677 | + tmp_filename = os.path.join('/tmp', src_filename) |
678 | + with open(tmp_filename, 'wb') as statfile: |
679 | + #statfile.write('Account Name, Container Count, Object Count, Bytes Used, Created At\n') |
680 | + for device in os.listdir(self.devices): |
681 | + if self.mount_check and \ |
682 | + not os.path.ismount(os.path.join(self.devices, device)): |
683 | + self.logger.error("Device %s is not mounted, skipping." % |
684 | + device) |
685 | + continue |
686 | + accounts = os.path.join(self.devices, |
687 | + device, |
688 | + account_server_data_dir) |
689 | + if not os.path.exists(accounts): |
690 | + self.logger.debug("Path %s does not exist, skipping." % |
691 | + accounts) |
692 | + continue |
693 | + for root, dirs, files in os.walk(accounts, topdown=False): |
694 | + for filename in files: |
695 | + if filename.endswith('.db'): |
696 | + broker = AccountBroker(os.path.join(root, filename)) |
697 | + if not broker.is_deleted(): |
698 | + (account_name, |
699 | + created_at, |
700 | + _, _, |
701 | + container_count, |
702 | + object_count, |
703 | + bytes_used, |
704 | + _, _) = broker.get_info() |
705 | + line_data = '"%s",%d,%d,%d,%s\n' % (account_name, |
706 | + container_count, |
707 | + object_count, |
708 | + bytes_used, |
709 | + created_at) |
710 | + statfile.write(line_data) |
711 | + renamer(tmp_filename, os.path.join(self.target_dir, src_filename)) |
712 | |
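Each undeleted account database contributes one CSV line of the form `"account",container_count,object_count,bytes_used,created_at`. A sketch of writing and reading back that line (the values are illustrative, and the reader assumes account names contain no commas):

```python
def format_stat_line(account, containers, objects, bytes_used, created_at):
    # One CSV row per account, matching the stat file layout above.
    return '"%s",%d,%d,%d,%s\n' % (account, containers, objects,
                                   bytes_used, created_at)

def parse_stat_line(line):
    # Inverse of format_stat_line; assumes no commas inside the name.
    account, containers, objects, bytes_used, created_at = \
        line.strip().split(',')
    return (account.strip('"'), int(containers), int(objects),
            int(bytes_used), created_at)

line = format_stat_line('AUTH_test', 3, 42, 1024, '1281809662.00000')
parsed = parse_stat_line(line)
```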
713 | === added file 'swift/stats/log_processor.py' |
714 | --- swift/stats/log_processor.py 1970-01-01 00:00:00 +0000 |
715 | +++ swift/stats/log_processor.py 2010-08-14 18:41:02 +0000 |
716 | @@ -0,0 +1,226 @@ |
717 | +# Copyright (c) 2010 OpenStack, LLC. |
718 | +# |
719 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
720 | +# you may not use this file except in compliance with the License. |
721 | +# You may obtain a copy of the License at |
722 | +# |
723 | +# http://www.apache.org/licenses/LICENSE-2.0 |
724 | +# |
725 | +# Unless required by applicable law or agreed to in writing, software |
726 | +# distributed under the License is distributed on an "AS IS" BASIS, |
727 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
728 | +# implied. |
729 | +# See the License for the specific language governing permissions and |
730 | +# limitations under the License. |
731 | + |
| +from ConfigParser import ConfigParser |
| +import multiprocessing |
| +import Queue |
| +import tempfile |
| +import time |
| +import zlib |
| + |
| +from swift.common.exceptions import ChunkReadTimeout |
| +from swift.common.internal_proxy import InternalProxy |
| +from swift.common.utils import get_logger |
| + |
| +class BadFileDownload(Exception): |
| + pass |
| + |
732 | +class LogProcessor(object): |
733 | + |
734 | + def __init__(self, conf, logger): |
735 | + stats_conf = conf.get('log-processor', {}) |
736 | + |
737 | + working_dir = stats_conf.get('working_dir', '/tmp/swift/') |
738 | + if working_dir.endswith('/') and len(working_dir) > 1: |
739 | + working_dir = working_dir[:-1] |
740 | + self.working_dir = working_dir |
741 | + proxy_server_conf_loc = stats_conf.get('proxy_server_conf', |
742 | + '/etc/swift/proxy-server.conf') |
743 | + try: |
744 | + c = ConfigParser() |
745 | + c.read(proxy_server_conf_loc) |
746 | + proxy_server_conf = dict(c.items('proxy-server')) |
747 | + except: |
748 | + proxy_server_conf = None |
749 | + self.proxy_server_conf = proxy_server_conf |
750 | + if isinstance(logger, tuple): |
751 | + self.logger = get_logger(*logger) |
752 | + else: |
753 | + self.logger = logger |
| + self.private_proxy = InternalProxy(self.proxy_server_conf, |
| + self.logger) |
754 | + |
755 | + # load the processing plugins |
756 | + self.plugins = {} |
757 | + plugin_prefix = 'log-processor-' |
758 | + for section in (x for x in conf if x.startswith(plugin_prefix)): |
759 | + plugin_name = section[len(plugin_prefix):] |
760 | + plugin_conf = conf.get(section, {}) |
761 | + self.plugins[plugin_name] = plugin_conf |
762 | + import_target, class_name = plugin_conf['class_path'].rsplit('.', 1) |
763 | + module = __import__(import_target, fromlist=[import_target]) |
764 | + klass = getattr(module, class_name) |
765 | + self.plugins[plugin_name]['instance'] = klass(plugin_conf) |
766 | + |
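The plugin loader above resolves a dotted `class_path` with `__import__` and instantiates the class with the plugin's conf section. A standalone sketch of that mechanism, using a stdlib class as a stand-in for a real processor plugin:

```python
def load_plugin(class_path, conf):
    # Split 'package.module.ClassName' into module path and class name,
    # import the module, and instantiate the class with its conf section.
    import_target, class_name = class_path.rsplit('.', 1)
    module = __import__(import_target, fromlist=[import_target])
    return getattr(module, class_name)(conf)

# collections.OrderedDict stands in for a real plugin class here
plugin = load_plugin('collections.OrderedDict', {'swift_account': 'AUTH_x'})
```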
767 | + def process_one_file(self, plugin_name, account, container, object_name): |
768 | + # get an iter of the object data |
769 | + compressed = object_name.endswith('.gz') |
770 | + stream = self.get_object_data(account, container, object_name, |
771 | + compressed=compressed) |
772 | + # look up the correct plugin and send the stream to it |
773 | + return self.plugins[plugin_name]['instance'].process( |
| + stream, account, container, object_name) |
774 | + |
775 | + def get_data_list(self, start_date=None, end_date=None, |
| + listing_filter=None): |
776 | + total_list = [] |
777 | + for name, plugin in self.plugins.items(): |
778 | + account = plugin['swift_account'] |
779 | + container = plugin['container_name'] |
780 | + listing = self.get_container_listing(account, container, start_date, |
781 | + end_date, listing_filter) |
782 | + for obj in listing: |
783 | + total_list.append((name, account, container, obj)) |
784 | + return total_list |
785 | + |
786 | + def get_container_listing(self, swift_account, container_name, start_date=None, |
787 | + end_date=None, listing_filter=None): |
788 | + ''' |
789 | + Get a container listing, filtered by start_date, end_date, and |
790 | + listing_filter. Dates, if given, should be in YYYYMMDDHH format |
791 | + ''' |
792 | + search_key = None |
793 | + if start_date is not None: |
794 | + date_parts = [] |
795 | + try: |
796 | + year, start_date = start_date[:4], start_date[4:] |
797 | + if year: |
798 | + date_parts.append(year) |
799 | + month, start_date = start_date[:2], start_date[2:] |
800 | + if month: |
801 | + date_parts.append(month) |
802 | + day, start_date = start_date[:2], start_date[2:] |
803 | + if day: |
804 | + date_parts.append(day) |
805 | + hour, start_date = start_date[:2], start_date[2:] |
806 | + if hour: |
807 | + date_parts.append(hour) |
808 | + except IndexError: |
809 | + pass |
810 | + else: |
811 | + search_key = '/'.join(date_parts) |
812 | + end_key = None |
813 | + if end_date is not None: |
814 | + date_parts = [] |
815 | + try: |
816 | + year, end_date = end_date[:4], end_date[4:] |
817 | + if year: |
818 | + date_parts.append(year) |
819 | + month, end_date = end_date[:2], end_date[2:] |
820 | + if month: |
821 | + date_parts.append(month) |
822 | + day, end_date = end_date[:2], end_date[2:] |
823 | + if day: |
824 | + date_parts.append(day) |
825 | + hour, end_date = end_date[:2], end_date[2:] |
826 | + if hour: |
827 | + date_parts.append(hour) |
828 | + except IndexError: |
829 | + pass |
830 | + else: |
831 | + end_key = '/'.join(date_parts) |
832 | + container_listing = self.private_proxy.get_container_list( |
833 | + swift_account, |
834 | + container_name, |
835 | + marker=search_key) |
836 | + results = [] |
837 | + if container_listing is not None: |
838 | + if listing_filter is None: |
839 | + listing_filter = set() |
840 | + for item in container_listing: |
841 | + name = item['name'] |
842 | + if end_key and name > end_key: |
843 | + break |
844 | + if name not in listing_filter: |
845 | + results.append(name) |
846 | + return results |
847 | + |
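`get_container_listing()` slices a `YYYYMMDDHH` string into year/month/day/hour pieces and joins them with `/` to form the listing marker, so a partial date yields a shorter prefix. A compact sketch of that conversion:

```python
def date_to_key(date_str):
    # Consume year (4 chars) then month/day/hour (2 chars each);
    # stop at the first missing piece so partial dates give
    # shorter '/'-joined prefixes.
    parts = []
    for width in (4, 2, 2, 2):
        piece, date_str = date_str[:width], date_str[width:]
        if not piece:
            break
        parts.append(piece)
    return '/'.join(parts)

full_key = date_to_key('2010081418')
partial_key = date_to_key('201008')
```

The `/`-joined keys line up with the `year/month/day/hour/hash.gz` object names the uploader creates, which is what makes a plain marker/endpoint comparison work as a date filter.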
848 | + def get_object_data(self, swift_account, container_name, object_name, |
849 | + compressed=False): |
850 | + '''reads an object and yields its lines''' |
851 | + o = self.private_proxy.get_object(swift_account, |
852 | + container_name, |
853 | + object_name) |
854 | + tmp_file = tempfile.TemporaryFile(dir=self.working_dir) |
855 | + with tmp_file as f: |
856 | + bad_file = False |
857 | + try: |
858 | + for chunk in o: |
859 | + f.write(chunk) |
860 | + except ChunkReadTimeout: |
861 | + bad_file = True |
862 | + if bad_file: |
863 | + raise BadFileDownload() |
864 | + f.flush() |
865 | + f.seek(0) # rewind to start reading |
866 | + last_part = '' |
867 | + last_compressed_part = '' |
868 | + # magic in the following zlib.decompressobj argument is courtesy of |
869 | + # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk |
870 | + d = zlib.decompressobj(16+zlib.MAX_WBITS) |
871 | + for chunk in iter(lambda: f.read(16384), ''): |
872 | + if compressed: |
873 | + try: |
874 | + chunk = d.decompress(chunk) |
875 | + except zlib.error: |
876 | + raise BadFileDownload() # bad compressed data |
877 | + parts = chunk.split('\n') |
878 | + parts[0] = last_part + parts[0] |
879 | + for part in parts[:-1]: |
880 | + yield part |
881 | + last_part = parts[-1] |
882 | + if last_part: |
883 | + yield last_part |
884 | + |
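The decompression above relies on `zlib.decompressobj(16 + zlib.MAX_WBITS)`, which makes zlib accept a gzip header, and carries any partial trailing line over between chunks. A self-contained sketch of the same chunked inflate-and-split loop (modern Python, bytes in and out):

```python
import gzip
import io
import zlib

def iter_lines(fileobj, chunk_size=64):
    # Inflate gzip data chunk by chunk; 16 + MAX_WBITS tells zlib to
    # expect a gzip header.  Partial lines are carried between chunks.
    d = zlib.decompressobj(16 + zlib.MAX_WBITS)
    last_part = b''
    for chunk in iter(lambda: fileobj.read(chunk_size), b''):
        chunk = d.decompress(chunk)
        parts = chunk.split(b'\n')
        parts[0] = last_part + parts[0]
        for part in parts[:-1]:
            yield part
        last_part = parts[-1]
    if last_part:
        yield last_part

raw = b'line one\nline two\nline three'
buf = io.BytesIO(gzip.compress(raw))
lines = list(iter_lines(buf))
```

This keeps memory bounded by the chunk size plus one line, no matter how large the log object is.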
885 | +def multiprocess_collate(processor_args, |
886 | + start_date=None, |
887 | + end_date=None, |
888 | + listing_filter=None): |
889 | + '''get listing of files and yield hourly data from them''' |
890 | + p = LogProcessor(*processor_args) |
891 | + all_files = p.get_data_list(start_date, end_date, listing_filter) |
892 | + |
893 | + p.logger.info('loaded %d files to process' % len(all_files)) |
894 | + |
895 | + if not all_files: |
896 | + # no work to do |
897 | + return |
898 | + |
899 | + worker_count = max(multiprocessing.cpu_count() - 1, 1) |
900 | + results = [] |
901 | + in_queue = multiprocessing.Queue() |
902 | + out_queue = multiprocessing.Queue() |
903 | + for _ in range(worker_count): |
904 | + p = multiprocessing.Process(target=collate_worker, |
905 | + args=(processor_args, |
906 | + in_queue, |
907 | + out_queue)) |
908 | + p.start() |
909 | + results.append(p) |
910 | + for x in all_files: |
911 | + in_queue.put(x) |
912 | + for _ in range(worker_count): |
913 | + in_queue.put(None) |
914 | + count = 0 |
915 | + while True: |
916 | + try: |
917 | + item, data = out_queue.get_nowait() |
918 | + count += 1 |
919 | + if data: |
920 | + yield item, data |
921 | + if count >= len(all_files): |
922 | + # this implies that one result will come from every request |
923 | + break |
924 | + except Queue.Empty: |
925 | + time.sleep(.1) |
926 | + for r in results: |
927 | + r.join() |
928 | + |
929 | +def collate_worker(processor_args, in_queue, out_queue): |
930 | + '''worker process for multiprocess_collate''' |
931 | + p = LogProcessor(*processor_args) |
932 | + while True: |
933 | + try: |
934 | + item = in_queue.get_nowait() |
935 | + if item is None: |
936 | + break |
937 | + except Queue.Empty: |
938 | + time.sleep(.1) |
939 | + else: |
940 | + ret = p.process_one_file(*item) |
942 | + out_queue.put((item, ret)) |
943 | \ No newline at end of file |
944 | |
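`multiprocess_collate()` hands work to `collate_worker` processes through an in-queue, signals end-of-work with one `None` sentinel per worker, and collects `(item, result)` pairs from an out-queue. The protocol can be exercised single-process with plain queues (a sketch; the real code runs the worker in `multiprocessing.Process` instances and the work items here are made up):

```python
import queue

def collate_worker(process_one, in_q, out_q):
    # Drain the in-queue until the None sentinel, pushing
    # (item, result) pairs onto the out-queue.
    while True:
        item = in_q.get()
        if item is None:        # sentinel: no more work
            break
        out_q.put((item, process_one(item)))

in_q, out_q = queue.Queue(), queue.Queue()
work = ['2010/08/14/17/a.gz', '2010/08/14/18/b.gz']
for item in work:
    in_q.put(item)
in_q.put(None)                  # one sentinel per worker
collate_worker(lambda name: {'file': name}, in_q, out_q)
results = dict(out_q.get() for _ in work)
```

The sentinel-per-worker scheme lets every worker exit cleanly once the shared in-queue is drained, without any extra coordination.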
945 | === added file 'swift/stats/log_uploader.py' |
946 | --- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000 |
947 | +++ swift/stats/log_uploader.py 2010-08-14 18:41:02 +0000 |
948 | @@ -0,0 +1,135 @@ |
949 | +# Copyright (c) 2010 OpenStack, LLC. |
950 | +# |
951 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
952 | +# you may not use this file except in compliance with the License. |
953 | +# You may obtain a copy of the License at |
954 | +# |
955 | +# http://www.apache.org/licenses/LICENSE-2.0 |
956 | +# |
957 | +# Unless required by applicable law or agreed to in writing, software |
958 | +# distributed under the License is distributed on an "AS IS" BASIS, |
959 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
960 | +# implied. |
961 | +# See the License for the specific language governing permissions and |
962 | +# limitations under the License. |
963 | + |
964 | +from __future__ import with_statement |
965 | +import os |
966 | +import hashlib |
967 | +import time |
968 | +import gzip |
969 | +import glob |
970 | + |
971 | +from swift.common.internal_proxy import InternalProxy |
972 | + |
973 | +class LogUploader(object): |
974 | + ''' |
975 | + Given a local directory, a swift account, and a container name, LogUploader |
976 | + uploads the files in the local directory to the given account/container. |
977 | + Files less than two hours old are skipped, and each file's md5 sum is |
978 | + computed. The hash is used to prevent duplicate data from being uploaded |
979 | + multiple times in different files (ex: log lines). Since the hash is |
980 | + computed, it is also used as the uploaded object's etag to ensure data |
981 | + integrity. |
982 | + |
983 | + Note that after the file is successfully uploaded, it will be unlinked. |
984 | + |
985 | + The given proxy server config is used to instantiate a proxy server for |
986 | + the object uploads. |
987 | + ''' |
988 | + |
989 | + def __init__(self, log_dir, swift_account, container_name, filename_format, |
990 | + proxy_server_conf, logger): |
991 | + if not log_dir.endswith('/'): |
992 | + log_dir = log_dir + '/' |
993 | + self.log_dir = log_dir |
994 | + self.swift_account = swift_account |
995 | + self.container_name = container_name |
996 | + self.filename_format = filename_format |
997 | + self.internal_proxy = InternalProxy(proxy_server_conf, logger) |
998 | + self.logger = logger |
999 | + |
1000 | + def upload_all_logs(self): |
1001 | + try: |
| + i = [(self.filename_format.index(c), c) |
| + for c in '%Y %m %d %H'.split()] |
1002 | + except ValueError: |
| + # the filename format is missing a date part, can't upload |
| + return |
| + i.sort() |
1003 | + year_offset = month_offset = day_offset = hour_offset = None |
| + # the glob below matches full paths, so offsets into the matched |
| + # filenames must include the log_dir prefix |
| + base_offset = len(self.log_dir) |
1004 | + for start, c in i: |
| + offset = base_offset + start |
1005 | + if c == '%Y': |
1006 | + year_offset = offset, offset+4 |
| + # %Y is 2 chars in the format but 4 in the filename; |
| + # shift the offsets of any later codes to match |
| + base_offset += 2 |
1007 | + elif c == '%m': |
1008 | + month_offset = offset, offset+2 |
1009 | + elif c == '%d': |
1010 | + day_offset = offset, offset+2 |
1011 | + elif c == '%H': |
1012 | + hour_offset = offset, offset+2 |
1013 | + if not (year_offset and month_offset and day_offset and hour_offset): |
1014 | + # don't have all the parts, can't upload anything |
1015 | + return |
1016 | + glob_pattern = self.filename_format |
1017 | + glob_pattern = glob_pattern.replace('%Y', '????') |
1018 | + glob_pattern = glob_pattern.replace('%m', '??') |
1019 | + glob_pattern = glob_pattern.replace('%d', '??') |
1020 | + glob_pattern = glob_pattern.replace('%H', '??') |
1021 | + filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern)) |
1022 | + current_hour = int(time.strftime('%H')) |
1023 | + today = int(time.strftime('%Y%m%d')) |
1024 | + self.internal_proxy.create_container(self.swift_account, |
1025 | + self.container_name) |
1026 | + for filename in filelist: |
1027 | + try: |
1028 | + # From the filename, we need to derive the year, month, day, |
1029 | + # and hour for the file. These values are used in the uploaded |
1030 | + # object's name, so they should be a reasonably accurate |
1031 | + # representation of the time for which the data in the file was |
1032 | + # collected. The file's last modified time is not a reliable |
1033 | + # representation of the data in the file. For example, an old |
1034 | + # log file (from hour A) may be uploaded or moved into the |
1035 | + # log_dir in hour Z. The file's modified time will be for hour |
1036 | + # Z, and therefore the object's name in the system will not |
1037 | + # represent the data in it. |
1038 | + # If the filename doesn't match the format, it shouldn't be |
1039 | + # uploaded. |
1040 | + year = filename[slice(*year_offset)] |
1041 | + month = filename[slice(*month_offset)] |
1042 | + day = filename[slice(*day_offset)] |
1043 | + hour = filename[slice(*hour_offset)] |
1044 | + except IndexError: |
1045 | + # unexpected filename format, move on |
1046 | + self.logger.error("Unexpected log: %s" % filename) |
1047 | + continue |
1048 | + if (time.time() - os.stat(filename).st_mtime) < 7200: |
1049 | + # don't process very new logs |
1050 | + self.logger.debug("Skipping log: %s (< 2 hours old)" % filename) |
1051 | + continue |
1052 | + self.upload_one_log(filename, year, month, day, hour) |
1053 | + |
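`upload_all_logs()` derives slice offsets for the date fields from the strftime-style filename format. One subtlety: `%Y` occupies two characters in the format but four in a rendered filename, so the offsets of any codes after it need shifting. A sketch of that computation (the format and filename are examples):

```python
def date_offsets(filename_format):
    # Locate each date code in the format, then shift later offsets to
    # account for codes that render wider than their 2-char source.
    codes = sorted((filename_format.index(c), c)
                   for c in ('%Y', '%m', '%d', '%H'))
    offsets = {}
    shift = 0
    for start, code in codes:
        width = 4 if code == '%Y' else 2
        offsets[code] = (start + shift, start + shift + width)
        shift += width - 2          # every code is 2 chars in the format
    return offsets

offs = date_offsets('access-%Y%m%d%H.log')
name = 'access-2010081418.log'
year = name[slice(*offs['%Y'])]
month = name[slice(*offs['%m'])]
day = name[slice(*offs['%d'])]
hour = name[slice(*offs['%H'])]
```

Slicing the filename by position avoids parsing dates out of arbitrary formats, at the cost of requiring all four codes to appear in the configured format.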
1054 | + def upload_one_log(self, filename, year, month, day, hour): |
1055 | + if os.path.getsize(filename) == 0: |
1056 | + self.logger.debug("Log %s is 0 length, skipping" % filename) |
1057 | + return |
1058 | + self.logger.debug("Processing log: %s" % filename) |
1059 | + filehash = hashlib.md5() |
1060 | + already_compressed = filename.endswith('.gz') |
1061 | + opener = gzip.open if already_compressed else open |
1062 | + f = opener(filename, 'rb') |
1063 | + try: |
1064 | + for line in f: |
1065 | + # filter out bad lines here? |
1066 | + filehash.update(line) |
1067 | + finally: |
1068 | + f.close() |
1069 | + filehash = filehash.hexdigest() |
1070 | + # By adding a hash to the filename, we ensure that uploaded files |
1071 | + # have unique filenames and protect against uploading one file |
1072 | + # more than one time. By using md5, we get an etag for free. |
1073 | + target_filename = '/'.join([year, month, day, hour, filehash+'.gz']) |
1074 | + if self.internal_proxy.upload_file(filename, |
1075 | + self.swift_account, |
1076 | + self.container_name, |
1077 | + target_filename, |
1078 | + compress=(not already_compressed)): |
1079 | + self.logger.debug("Uploaded log %s to %s" % |
1080 | + (filename, target_filename)) |
1081 | + os.unlink(filename) |
1082 | + else: |
1083 | + self.logger.error("ERROR: Upload of log %s failed!" % filename) |
1084 | |
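`upload_one_log()` hashes the file's contents and uses the digest both to de-duplicate uploads (identical data always maps to the same object name) and as the object's etag. A sketch of the naming step (the sample log line is made up):

```python
import hashlib

def object_name_for(year, month, day, hour, data_lines):
    # Hash the log contents; the digest de-dups repeated uploads and
    # doubles as the object's etag on upload.
    filehash = hashlib.md5()
    for line in data_lines:
        filehash.update(line)
    return '/'.join([year, month, day, hour,
                     filehash.hexdigest() + '.gz'])

name = object_name_for('2010', '08', '14', '18', [b'GET /v1/a/c/o 200\n'])
```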
1085 | === added file 'swift/stats/stats_processor.py' |
1086 | --- swift/stats/stats_processor.py 1970-01-01 00:00:00 +0000 |
1087 | +++ swift/stats/stats_processor.py 2010-08-14 18:41:02 +0000 |
1088 | @@ -0,0 +1,54 @@ |
1089 | +# Copyright (c) 2010 OpenStack, LLC. |
1090 | +# |
1091 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
1092 | +# you may not use this file except in compliance with the License. |
1093 | +# You may obtain a copy of the License at |
1094 | +# |
1095 | +# http://www.apache.org/licenses/LICENSE-2.0 |
1096 | +# |
1097 | +# Unless required by applicable law or agreed to in writing, software |
1098 | +# distributed under the License is distributed on an "AS IS" BASIS, |
1099 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
1100 | +# implied. |
1101 | +# See the License for the specific language governing permissions and |
1102 | +# limitations under the License. |
1103 | + |
1104 | +class StatsLogProcessor(object): |
1105 | + |
1106 | + def __init__(self, conf): |
1107 | + pass |
1108 | + |
1109 | + def process(self, obj_stream, account, container, object_name): |
1110 | + '''generate hourly groupings of data from one stats log file''' |
1111 | + account_totals = {} |
1112 | + year, month, day, hour, _ = object_name.split('/') |
1113 | + for line in obj_stream: |
1114 | + if not line: |
1115 | + continue |
1116 | + try: |
1117 | + (account, |
1118 | + container_count, |
1119 | + object_count, |
1120 | + bytes_used, |
1121 | + created_at) = line.split(',') |
1122 | + account = account.strip('"') |
1125 | + container_count = int(container_count.strip('"')) |
1126 | + object_count = int(object_count.strip('"')) |
1127 | + bytes_used = int(bytes_used.strip('"')) |
1128 | + aggr_key = (account, year, month, day, hour) |
1129 | + d = account_totals.get(aggr_key, {}) |
1130 | + d['count'] = d.setdefault('count', 0) + 1 |
1131 | + d['container_count'] = d.setdefault('container_count', 0) + \ |
1132 | + container_count |
1133 | + d['object_count'] = d.setdefault('object_count', 0) + \ |
1134 | + object_count |
1135 | + d['bytes_used'] = d.setdefault('bytes_used', 0) + \ |
1136 | + bytes_used |
1137 | + d['created_at'] = created_at |
1138 | + account_totals[aggr_key] = d |
1139 | + except (IndexError, ValueError): |
1140 | + # bad line data |
1141 | + pass |
1142 | + return account_totals |
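The stats processor sums each account's CSV rows for one hour into a dict keyed by `(account, year, month, day, hour)`, taking the date parts from the object's `year/month/day/hour/hash.gz` name. A standalone sketch with made-up sample lines:

```python
def process_stats(lines, object_name):
    # Sum per-account stat rows for one hour; the hour comes from the
    # uploaded object's year/month/day/hour/hash.gz name.
    account_totals = {}
    year, month, day, hour, _ = object_name.split('/')
    for line in lines:
        if not line:
            continue
        account, containers, objects, bytes_used, created_at = \
            line.split(',')
        key = (account.strip('"'), year, month, day, hour)
        d = account_totals.setdefault(
            key, {'count': 0, 'container_count': 0,
                  'object_count': 0, 'bytes_used': 0})
        d['count'] += 1
        d['container_count'] += int(containers.strip('"'))
        d['object_count'] += int(objects.strip('"'))
        d['bytes_used'] += int(bytes_used.strip('"'))
        d['created_at'] = created_at
    return account_totals

totals = process_stats(['"AUTH_test",2,10,999,1281809662',
                        '"AUTH_test",1,5,1,1281809662'],
                       '2010/08/14/18/abc.gz')
```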