Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk
| Status: | Superseded |
|---|---|
| Proposed branch: | lp:~notmyname/swift/stats_system |
| Merge into: | lp:~hudson-openstack/swift/trunk |
| To merge this branch: | bzr merge lp:~notmyname/swift/stats_system |
| Related bugs: | |
| Related blueprints: | Stats system (High) |

Diff against target: 1571 lines (+1464/-11), 14 files modified:

- bin/swift-account-stats-logger (+27/-0)
- bin/swift-log-stats-collector (+27/-0)
- bin/swift-log-uploader (+31/-0)
- etc/log-processing.conf-sample (+34/-0)
- swift/common/compressed_file_reader.py (+72/-0)
- swift/common/daemon.py (+5/-2)
- swift/common/internal_proxy.py (+182/-0)
- swift/common/utils.py (+16/-9)
- swift/stats/access_processor.py (+224/-0)
- swift/stats/account_stats.py (+86/-0)
- swift/stats/log_processor.py (+376/-0)
- swift/stats/log_uploader.py (+160/-0)
- swift/stats/stats_processor.py (+63/-0)
- test/unit/stats/test_log_processor.py (+161/-0)

| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Swift Core security contacts | | | Pending |

Review via email: mp+35156@code.launchpad.net
This proposal supersedes a proposal from 2010-09-07.
This proposal has been superseded by a proposal from 2010-09-15.
Commit message
Description of the change
Work in progress: moving the existing internal (pre-OpenStack) Swift stats system into (OpenStack) Swift.
Chuck Thier (cthier) wrote (posted in a previous version of this proposal):

John Dickinson (notmyname) wrote (posted in a previous version of this proposal):

updated to be more DRY compatible
- 70. By John Dickinson: added execute perms to stats processor binaries
- 71. By John Dickinson: updated config file loading to work with paste.deploy configs
- 72. By John Dickinson: fixed typos
- 73. By John Dickinson: fixed internal proxy loading
- 74. By John Dickinson: made a memcache stub for the internal proxy server
- 75. By John Dickinson: fixed internal proxy put_container reference
- 76. By John Dickinson: fixed some log uploading glob patterns
- 77. By John Dickinson: fixed bug in calculating offsets for filename patterns
- 78. By John Dickinson: fixed typos in log processor
- 79. By John Dickinson: fixed get_data_list in log_processor
- 80. By John Dickinson: added some debug output
- 81. By John Dickinson: fixed logging and log uploading
- 82. By John Dickinson: fixed copy/paste errors and missing imports
- 83. By John Dickinson: added error handling and missing return statement
- 84. By John Dickinson: handled some typos and better handling of missing data in internal proxy
- 85. By John Dickinson: fixed tests, typos, and added error handling
- 86. By John Dickinson: fixed bug in account stats log processing
- 87. By John Dickinson: fixed listing filter in log processing
- 88. By John Dickinson: fixed stdout capturing for generating csv files
- 89. By John Dickinson: fixed replica count reporting error
- 90. By John Dickinson: fixed lookback in log processor
- 91. By John Dickinson: merged with trunk
- 92. By John Dickinson: merged with trunk
- 93. By John Dickinson: fixed tests to account for changed key name
- 94. By John Dickinson: fixed test bug
- 95. By John Dickinson: updated with changes and suggestions from code review
- 96. By John Dickinson: merged with changes from trunk
- 97. By John Dickinson: added stats overview
- 98. By John Dickinson: updated with changes from trunk
- 99. By John Dickinson: added additional docs
- 100. By John Dickinson: documentation clarification and pep8 fixes
- 101. By John Dickinson: added overview stats to the doc index
- 102. By John Dickinson: made long lines wrap (grr pep8)
- 103. By John Dickinson: merged with trunk
- 104. By John Dickinson: merged with trunk
- 105. By John Dickinson: merged with trunk
- 106. By John Dickinson: fixed stats docs
- 107. By John Dickinson: added a bad lines check to the access log parser
- 108. By John Dickinson: added tests for compressing file reader
- 109. By John Dickinson: fixed compressing file reader test
- 110. By John Dickinson: fixed compressing file reader test
- 111. By John Dickinson: improved compressing file reader test
- 112. By John Dickinson: fixed compressing file reader test
- 113. By John Dickinson: made try/except much less inclusive in access log processor
- 114. By John Dickinson: added keylist mapping tests and fixed other tests
- 115. By John Dickinson: updated stats system tests
- 116. By John Dickinson: merged with gholt's test stubs
- 117. By John Dickinson: updated SAIO instructions for the stats system
- 118. By John Dickinson: fixed stats system saio docs
- 119. By John Dickinson: fixed stats system saio docs
- 120. By John Dickinson: added openstack copyright/license to test_log_processor.py
- 121. By John Dickinson: improved logging in log processors
- 122. By John Dickinson: merged with trunk
- 123. By John Dickinson: fixed missing working directory bug in account stats
- 124. By John Dickinson: fixed readconf parameter that was broken with a previous merge
- 125. By John Dickinson: updated setup.py and saio docs for stats system
- 126. By John Dickinson: added readconf unit test
- 127. By John Dickinson: updated stats saio docs to create logs with the appropriate permissions
- 128. By John Dickinson: fixed bug in log processor internal proxy lazy load code
- 129. By John Dickinson: updated readconf test
- 130. By John Dickinson: updated readconf test
- 131. By John Dickinson: updated readconf test
- 132. By John Dickinson: fixed internal proxy references in log processor
- 133. By John Dickinson: fixed account stats filename creation
- 134. By John Dickinson: pep8 tomfoolery
- 135. By John Dickinson: moved paren
- 136. By John Dickinson: added lazy load of internal proxy to log processor (you were right clay)
Unmerged revisions
Preview Diff
=== added file 'bin/swift-account-stats-logger'
--- bin/swift-account-stats-logger	1970-01-01 00:00:00 +0000
+++ bin/swift-account-stats-logger	2010-09-15 20:11:08 +0000
@@ -0,0 +1,27 @@
+#!/usr/bin/python
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from swift.stats.account_stats import AccountStat
+from swift.common import utils
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        print "Usage: swift-account-stats-logger CONFIG_FILE"
+        sys.exit()
+    stats_conf = utils.readconf(sys.argv[1], 'log-processor-stats')
+    stats = AccountStat(stats_conf).run(once=True)
=== added file 'bin/swift-log-stats-collector'
--- bin/swift-log-stats-collector	1970-01-01 00:00:00 +0000
+++ bin/swift-log-stats-collector	2010-09-15 20:11:08 +0000
@@ -0,0 +1,27 @@
+#!/usr/bin/python
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from swift.stats.log_processor import LogProcessorDaemon
+from swift.common import utils
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        print "Usage: swift-log-stats-collector CONFIG_FILE"
+        sys.exit()
+    conf = utils.readconf(sys.argv[1], log_name='log-stats-collector')
+    stats = LogProcessorDaemon(conf).run(once=True, capture_stdout=False)
=== added file 'bin/swift-log-uploader'
--- bin/swift-log-uploader	1970-01-01 00:00:00 +0000
+++ bin/swift-log-uploader	2010-09-15 20:11:08 +0000
@@ -0,0 +1,31 @@
+#!/usr/bin/python
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from swift.stats.log_uploader import LogUploader
+from swift.common import utils
+
+if __name__ == '__main__':
+    if len(sys.argv) < 3:
+        print "Usage: swift-log-uploader CONFIG_FILE plugin"
+        sys.exit()
+    uploader_conf = utils.readconf(sys.argv[1], 'log-processor')
+    plugin = sys.argv[2]
+    section_name = 'log-processor-%s' % plugin
+    plugin_conf = utils.readconf(sys.argv[1], section_name)
+    uploader_conf.update(plugin_conf)
+    uploader = LogUploader(uploader_conf, plugin).run(once=True)
=== added file 'etc/log-processing.conf-sample'
--- etc/log-processing.conf-sample	1970-01-01 00:00:00 +0000
+++ etc/log-processing.conf-sample	2010-09-15 20:11:08 +0000
@@ -0,0 +1,34 @@
+# plugin section format is named "log-processor-<plugin>"
+
+[log-processor]
+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+# container_name = log_processing_data
+# proxy_server_conf = /etc/swift/proxy-server.conf
+# log_facility = LOG_LOCAL0
+# log_level = INFO
+# lookback_hours = 120
+# lookback_window = 120
+# user = swift
+
+[log-processor-access]
+# log_dir = /var/log/swift/
+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+container_name = log_data
+source_filename_format = access-%Y%m%d%H
+class_path = swift.stats.access_processor.AccessLogProcessor
+# service ips is for client ip addresses that should be counted as servicenet
+# service_ips =
+# load balancer private ips is for load balancer ip addresses that should be
+# counted as servicenet
+# lb_private_ips =
+# server_name = proxy
+# user = swift
+
+[log-processor-stats]
+# log_dir = /var/log/swift/
+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
+container_name = account_stats
+source_filename_format = stats-%Y%m%d%H
+class_path = swift.stats.stats_processor.StatsLogProcessor
+# account_server_conf = /etc/swift/account-server.conf
+# user = swift
\ No newline at end of file
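Editor's note: the plugin sections above are merged over the common [log-processor] section at run time, as bin/swift-log-uploader above shows. A minimal sketch of the equivalent calls, assuming this sample is installed as /etc/swift/log-processing.conf:

```python
# Sketch of what `swift-log-uploader /etc/swift/log-processing.conf access`
# does to resolve its configuration (the config path is an assumption):
from swift.common import utils

# common options shared by all plugins
uploader_conf = utils.readconf('/etc/swift/log-processing.conf',
                               'log-processor')
# plugin-specific options win on key collisions
plugin_conf = utils.readconf('/etc/swift/log-processing.conf',
                             'log-processor-access')
uploader_conf.update(plugin_conf)
# uploader_conf now carries container_name = 'log_data' and
# source_filename_format = 'access-%Y%m%d%H' from the plugin section
```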
=== added file 'swift/common/compressed_file_reader.py'
--- swift/common/compressed_file_reader.py	1970-01-01 00:00:00 +0000
+++ swift/common/compressed_file_reader.py	2010-09-15 20:11:08 +0000
@@ -0,0 +1,72 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import zlib
+import struct
+
+
+class CompressedFileReader(object):
+    '''
+    Wraps a file object and provides a read method that returns gzip'd data.
+
+    One warning: if read is called with a small value, the data returned may
+    be bigger than the value. In this case, the "compressed" data will be
+    bigger than the original data. To solve this, use a bigger read buffer.
+
+    An example use case:
+    Given an uncompressed file on disk, provide a way to read compressed data
+    without buffering the entire file data in memory. Using this class, an
+    uncompressed log file could be uploaded as compressed data with chunked
+    transfer encoding.
+
+    gzip header and footer code taken from the python stdlib gzip module
+
+    :param file_obj: File object to read from
+    :param compresslevel: compression level
+    '''
+    def __init__(self, file_obj, compresslevel=9):
+        self._f = file_obj
+        self._compressor = zlib.compressobj(compresslevel,
+                                            zlib.DEFLATED,
+                                            -zlib.MAX_WBITS,
+                                            zlib.DEF_MEM_LEVEL,
+                                            0)
+        self.done = False
+        self.first = True
+        self.crc32 = 0
+        self.total_size = 0
+
+    def read(self, *a, **kw):
+        if self.done:
+            return ''
+        x = self._f.read(*a, **kw)
+        if x:
+            self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
+            self.total_size += len(x)
+            compressed = self._compressor.compress(x)
+            if not compressed:
+                compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
+        else:
+            compressed = self._compressor.flush(zlib.Z_FINISH)
+            crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
+            size = struct.pack("<L", self.total_size & 0xffffffffL)
+            footer = crc32 + size
+            compressed += footer
+            self.done = True
+        if self.first:
+            self.first = False
+            header = '\037\213\010\000\000\000\000\000\002\377'
+            compressed = header + compressed
+        return compressed
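Editor's note: a quick sketch (not part of the diff) of the round trip this class enables: compress a file chunk by chunk and confirm the chunks concatenate into a complete gzip stream. The log path is hypothetical; the 64 KB buffer follows the docstring's advice to use a large read buffer.

```python
import gzip
import StringIO

from swift.common.compressed_file_reader import CompressedFileReader

with open('/var/log/swift/access-2010091520', 'rb') as f:  # hypothetical file
    reader = CompressedFileReader(f)
    chunks = []
    chunk = reader.read(65536)
    while chunk:
        chunks.append(chunk)
        chunk = reader.read(65536)

# header + deflate body + CRC32/length footer form a valid gzip stream,
# so the stdlib can expand it back to the original bytes
data = gzip.GzipFile(fileobj=StringIO.StringIO(''.join(chunks))).read()
```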
=== modified file 'swift/common/daemon.py'
--- swift/common/daemon.py	2010-08-31 23:12:59 +0000
+++ swift/common/daemon.py	2010-09-15 20:11:08 +0000
@@ -33,12 +33,15 @@
         """Override this to run forever"""
         raise NotImplementedError('run_forever not implemented')
 
-    def run(self, once=False):
+    def run(self, once=False, capture_stdout=True, capture_stderr=True):
         """Run the daemon"""
         # log uncaught exceptions
         sys.excepthook = lambda *exc_info: \
             self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
-        sys.stdout = sys.stderr = utils.LoggerFileObject(self.logger)
+        if capture_stdout:
+            sys.stdout = utils.LoggerFileObject(self.logger)
+        if capture_stderr:
+            sys.stderr = utils.LoggerFileObject(self.logger)
 
         utils.drop_privileges(self.conf.get('user', 'swift'))
 
=== added file 'swift/common/internal_proxy.py'
--- swift/common/internal_proxy.py	1970-01-01 00:00:00 +0000
+++ swift/common/internal_proxy.py	2010-09-15 20:11:08 +0000
@@ -0,0 +1,182 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import webob
+from urllib import quote, unquote
+from json import loads as json_loads
+
+from swift.common.compressed_file_reader import CompressedFileReader
+from swift.proxy.server import BaseApplication
+
+class MemcacheStub(object):
+    def get(self, *a, **kw): return None
+    def set(self, *a, **kw): return None
+    def incr(self, *a, **kw): return 0
+    def delete(self, *a, **kw): return None
+    def set_multi(self, *a, **kw): return None
+    def get_multi(self, *a, **kw): return []
+
+class InternalProxy(object):
+    """
+    Set up a private instance of a proxy server that allows normal requests
+    to be made without having to actually send the request to the proxy.
+    This also doesn't log the requests to the normal proxy logs.
+
+    :param proxy_server_conf: proxy server configuration dictionary
+    :param logger: logger to log requests to
+    :param retries: number of times to retry each request
+    """
+    def __init__(self, proxy_server_conf=None, logger=None, retries=0):
+        self.upload_app = BaseApplication(proxy_server_conf,
+                                          memcache=MemcacheStub(),
+                                          logger=logger)
+        self.retries = retries
+
+    def upload_file(self, source_file, account, container, object_name,
+                    compress=True, content_type='application/x-gzip'):
+        """
+        Upload a file to cloud files.
+
+        :param source_file: path to or file like object to upload
+        :param account: account to upload to
+        :param container: container to upload to
+        :param object_name: name of object being uploaded
+        :param compress: if True, compresses object as it is uploaded
+        :param content_type: content-type of object
+        :returns: True if successful, False otherwise
+        """
+        log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name)
+
+        # create the container
+        if not self.create_container(account, container):
+            return False
+
+        # upload the file to the account
+        req = webob.Request.blank(log_create_pattern,
+                                  environ={'REQUEST_METHOD': 'PUT'},
+                                  headers={'Transfer-Encoding': 'chunked'})
+        if compress:
+            if hasattr(source_file, 'read'):
+                compressed_file = CompressedFileReader(source_file)
+            else:
+                compressed_file = CompressedFileReader(open(source_file, 'rb'))
+            req.body_file = compressed_file
+        else:
+            if not hasattr(source_file, 'read'):
+                source_file = open(source_file, 'rb')
+            req.body_file = source_file
+        req.account = account
+        req.content_type = content_type
+        req.content_length = None   # to make sure we send chunked data
+        resp = self.upload_app.handle_request(self.upload_app.update_request(req))
+        tries = 1
+        while (resp.status_int < 200 or resp.status_int > 299) \
+                and tries <= self.retries:
+            resp = self.upload_app.handle_request(self.upload_app.update_request(req))
+            tries += 1
+        if not (200 <= resp.status_int < 300):
+            return False
+        return True
+
+    def get_object(self, account, container, object_name):
+        """
+        Get object.
+
+        :param account: account name object is in
+        :param container: container name object is in
+        :param object_name: name of object to get
+        :returns: iterator for object data
+        """
+        req = webob.Request.blank('/v1/%s/%s/%s' %
+                                  (account, container, object_name),
+                                  environ={'REQUEST_METHOD': 'GET'})
+        req.account = account
+        resp = self.upload_app.handle_request(self.upload_app.update_request(req))
+        tries = 1
+        while (resp.status_int < 200 or resp.status_int > 299) \
+                and tries <= self.retries:
+            resp = self.upload_app.handle_request(self.upload_app.update_request(req))
+            tries += 1
+        return resp.status_int, resp.app_iter
+
+    def create_container(self, account, container):
+        """
+        Create container.
+
+        :param account: account name to put the container in
+        :param container: container name to create
+        :returns: True if successful, otherwise False
+        """
+        req = webob.Request.blank('/v1/%s/%s' % (account, container),
+                                  environ={'REQUEST_METHOD': 'PUT'})
+        req.account = account
+        resp = self.upload_app.handle_request(self.upload_app.update_request(req))
+        tries = 1
+        while (resp.status_int < 200 or resp.status_int > 299) \
+                and tries <= self.retries:
+            resp = self.upload_app.handle_request(self.upload_app.update_request(req))
+            tries += 1
+        return 200 <= resp.status_int < 300
+
+    def get_container_list(self, account, container, marker=None, limit=None,
+                           prefix=None, delimiter=None, full_listing=True):
+        """
+        Get container listing.
+
+        :param account: account name for the container
+        :param container: container name to get the listing of
+        :param marker: marker query
+        :param limit: limit to query
+        :param prefix: prefix query
+        :param delimiter: delimiter for query
+        :param full_listing: if True, make enough requests to get all listings
+        :returns: list of objects
+        """
+        if full_listing:
+            rv = []
+            listing = self.get_container_list(account, container, marker,
+                          limit, prefix, delimiter, full_listing=False)
+            while listing:
+                rv.extend(listing)
+                if not delimiter:
+                    marker = listing[-1]['name']
+                else:
+                    marker = listing[-1].get('name', listing[-1].get('subdir'))
+                listing = self.get_container_list(account, container, marker,
+                              limit, prefix, delimiter, full_listing=False)
+            return rv
+        path = '/v1/%s/%s' % (account, container)
+        qs = 'format=json'
+        if marker:
+            qs += '&marker=%s' % quote(marker)
+        if limit:
+            qs += '&limit=%d' % limit
+        if prefix:
+            qs += '&prefix=%s' % quote(prefix)
+        if delimiter:
+            qs += '&delimiter=%s' % quote(delimiter)
+        path += '?%s' % qs
+        req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
+        req.account = account
+        resp = self.upload_app.handle_request(self.upload_app.update_request(req))
+        tries = 1
+        while (resp.status_int < 200 or resp.status_int > 299) \
+                and tries <= self.retries:
+            resp = self.upload_app.handle_request(self.upload_app.update_request(req))
+            tries += 1
+        if resp.status_int == 204:
+            return []
+        if 200 <= resp.status_int < 300:
+            return json_loads(resp.body)
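Editor's note: a minimal sketch (not part of the branch) of driving InternalProxy the way the stats system does. The account, container, and file names are borrowed from the sample config above and the file path is hypothetical; the proxy config loading mirrors what log_processor.py below does.

```python
from paste.deploy import appconfig

from swift.common.internal_proxy import InternalProxy

proxy_conf = appconfig('config:/etc/swift/proxy-server.conf',
                       name='proxy-server')
p = InternalProxy(proxy_conf, retries=3)
account = 'AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31'

# upload_file creates the container if needed, then PUTs the file with
# chunked transfer encoding, gzipping on the fly via CompressedFileReader
if p.upload_file('/var/log/swift/access-2010091520',  # hypothetical file
                 account, 'log_data', 'access-2010091520.gz'):
    for item in p.get_container_list(account, 'log_data'):
        print item['name']
```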
=== modified file 'swift/common/utils.py'
--- swift/common/utils.py	2010-09-02 14:58:30 +0000
+++ swift/common/utils.py	2010-09-15 20:11:08 +0000
@@ -530,19 +530,26 @@
 def cache_from_env(env):
     return item_from_env(env, 'swift.cache')
 
-def readconf(conf, section_name, log_name=None):
+def readconf(conf, section_name=None, log_name=None):
     c = ConfigParser()
     if not c.read(conf):
         print "Unable to read config file %s" % conf
         sys.exit(1)
-    if c.has_section(section_name):
-        conf = dict(c.items(section_name))
+    if section_name:
+        if c.has_section(section_name):
+            conf = dict(c.items(section_name))
+        else:
+            print "Unable to find %s config section in %s" % (section_name, conf)
+            sys.exit(1)
+        if "log_name" not in conf:
+            if log_name is not None:
+                conf['log_name'] = log_name
+            else:
+                conf['log_name'] = section_name
     else:
-        print "Unable to find %s config section in %s" % (section_name, conf)
-        sys.exit(1)
-    if "log_name" not in conf:
-        if log_name is not None:
-            conf['log_name'] = log_name
-        else:
-            conf['log_name'] = section_name
+        conf = {}
+        for s in c.sections():
+            conf.update({s: dict(c.items(s))})
+        if 'log_name' not in conf:
+            conf['log_name'] = log_name
     return conf
 
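Editor's note: this change makes section_name optional. With a section, readconf returns that section's options as a flat dict; without one, it returns a dict of dicts keyed by section name, which is how LogProcessor below discovers its plugin sections. A short sketch against the sample config above (config path assumed):

```python
from swift.common.utils import readconf

# one section -> flat dict of that section's options
stats_conf = readconf('/etc/swift/log-processing.conf',
                      'log-processor-stats')

# no section -> {section_name: {option: value}} for the whole file
conf = readconf('/etc/swift/log-processing.conf')
plugin_sections = [s for s in conf if s.startswith('log-processor-')]
# -> ['log-processor-access', 'log-processor-stats'] for the sample file
```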
=== added directory 'swift/stats'
=== added file 'swift/stats/__init__.py'
=== added file 'swift/stats/access_processor.py'
--- swift/stats/access_processor.py	1970-01-01 00:00:00 +0000
+++ swift/stats/access_processor.py	2010-09-15 20:11:08 +0000
@@ -0,0 +1,224 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+from urllib import unquote
+import copy
+
+from swift.common.utils import split_path
+
+month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
+
+class AccessLogProcessor(object):
+
+    def __init__(self, conf):
+        self.server_name = conf.get('server_name', 'proxy')
+        self.lb_private_ips = [x.strip() for x in \
+                               conf.get('lb_private_ips', '').split(',')\
+                               if x.strip()]
+        self.service_ips = [x.strip() for x in \
+                            conf.get('service_ips', '').split(',')\
+                            if x.strip()]
+
+    def log_line_parser(self, raw_log):
+        '''given a raw access log line, return a dict of the good parts'''
+        d = {}
+        try:
+            (_,
+             server,
+             client_ip,
+             lb_ip,
+             timestamp,
+             method,
+             request,
+             http_version,
+             code,
+             referrer,
+             user_agent,
+             auth_token,
+             bytes_in,
+             bytes_out,
+             etag,
+             trans_id,
+             headers,
+             processing_time) = (unquote(x) for x in raw_log[16:].split(' '))
+            if server != self.server_name:
+                raise ValueError('incorrect server name in log line')
+            (version,
+             account,
+             container_name,
+             object_name) = split_path(request, 2, 4, True)
+            if container_name is not None:
+                container_name = container_name.split('?', 1)[0]
+            if object_name is not None:
+                object_name = object_name.split('?', 1)[0]
+            account = account.split('?', 1)[0]
+            query = None
+            if '?' in request:
+                request, query = request.split('?', 1)
+                args = query.split('&')
+                # Count each query argument. This is used later to aggregate
+                # the number of format, prefix, etc. queries.
+                for q in args:
+                    if '=' in q:
+                        k, v = q.split('=', 1)
+                    else:
+                        k = q
+                    # Certain keys will get summed in stats reporting
+                    # (format, path, delimiter, etc.). Save a "1" here
+                    # to indicate that this request is 1 request for
+                    # its respective key.
+                    d[k] = 1
+        except ValueError:
+            pass
+        else:
+            d['client_ip'] = client_ip
+            d['lb_ip'] = lb_ip
+            d['method'] = method
+            d['request'] = request
+            if query:
+                d['query'] = query
+            d['http_version'] = http_version
+            d['code'] = code
+            d['referrer'] = referrer
+            d['user_agent'] = user_agent
+            d['auth_token'] = auth_token
+            d['bytes_in'] = bytes_in
+            d['bytes_out'] = bytes_out
+            d['etag'] = etag
+            d['trans_id'] = trans_id
+            d['processing_time'] = processing_time
+            day, month, year, hour, minute, second = timestamp.split('/')
+            d['day'] = day
+            month = ('%02s' % month_map.index(month)).replace(' ', '0')
+            d['month'] = month
+            d['year'] = year
+            d['hour'] = hour
+            d['minute'] = minute
+            d['second'] = second
+            d['tz'] = '+0000'
+            d['account'] = account
+            d['container_name'] = container_name
+            d['object_name'] = object_name
+            d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
+            d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
+            d['code'] = int(d['code'])
+        return d
+
+    def process(self, obj_stream, account, container, object_name):
+        '''generate hourly groupings of data from one access log file'''
+        hourly_aggr_info = {}
+        for line in obj_stream:
+            line_data = self.log_line_parser(line)
+            if not line_data:
+                continue
+            account = line_data['account']
+            container_name = line_data['container_name']
+            year = line_data['year']
+            month = line_data['month']
+            day = line_data['day']
+            hour = line_data['hour']
+            bytes_out = line_data['bytes_out']
+            bytes_in = line_data['bytes_in']
+            method = line_data['method']
+            code = int(line_data['code'])
+            object_name = line_data['object_name']
+            client_ip = line_data['client_ip']
+
+            op_level = None
+            if not container_name:
+                op_level = 'account'
+            elif container_name and not object_name:
+                op_level = 'container'
+            elif object_name:
+                op_level = 'object'
+
+            aggr_key = (account, year, month, day, hour)
+            d = hourly_aggr_info.get(aggr_key, {})
+            if line_data['lb_ip'] in self.lb_private_ips:
+                source = 'service'
+            else:
+                source = 'public'
+
+            if line_data['client_ip'] in self.service_ips:
+                source = 'service'
+
+            d[(source, 'bytes_out')] = d.setdefault((source, 'bytes_out'), 0) + \
+                                       bytes_out
+            d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
+                                      bytes_in
+
+            d['format_query'] = d.setdefault('format_query', 0) + \
+                                line_data.get('format', 0)
+            d['marker_query'] = d.setdefault('marker_query', 0) + \
+                                line_data.get('marker', 0)
+            d['prefix_query'] = d.setdefault('prefix_query', 0) + \
+                                line_data.get('prefix', 0)
+            d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \
+                                   line_data.get('delimiter', 0)
+            path = line_data.get('path', 0)
+            d['path_query'] = d.setdefault('path_query', 0) + path
+
+            code = '%dxx' % (code / 100)
+            key = (source, op_level, method, code)
+            d[key] = d.setdefault(key, 0) + 1
+
+            hourly_aggr_info[aggr_key] = d
+        return hourly_aggr_info
+
+    def keylist_mapping(self):
+        source_keys = 'service public'.split()
+        level_keys = 'account container object'.split()
+        verb_keys = 'GET PUT POST DELETE HEAD COPY'.split()
+        code_keys = '2xx 4xx 5xx'.split()
+
+        keylist_mapping = {
+            # <db key> : <row key> or <set of row keys>
+            'service_bw_in': ('service', 'bytes_in'),
+            'service_bw_out': ('service', 'bytes_out'),
+            'public_bw_in': ('public', 'bytes_in'),
+            'public_bw_out': ('public', 'bytes_out'),
+            'account_requests': set(),
+            'container_requests': set(),
+            'object_requests': set(),
+            'service_request': set(),
+            'public_request': set(),
+            'ops_count': set(),
+        }
+        for verb in verb_keys:
+            keylist_mapping[verb] = set()
+        for code in code_keys:
+            keylist_mapping[code] = set()
+        for source in source_keys:
+            for level in level_keys:
+                for verb in verb_keys:
+                    for code in code_keys:
+                        keylist_mapping['account_requests'].add(
+                            (source, 'account', verb, code))
+                        keylist_mapping['container_requests'].add(
+                            (source, 'container', verb, code))
+                        keylist_mapping['object_requests'].add(
+                            (source, 'object', verb, code))
+                        keylist_mapping['service_request'].add(
+                            ('service', level, verb, code))
+                        keylist_mapping['public_request'].add(
+                            ('public', level, verb, code))
+                        keylist_mapping[verb].add(
+                            (source, level, verb, code))
+                        keylist_mapping[code].add(
+                            (source, level, verb, code))
+                        keylist_mapping['ops_count'].add(
+                            (source, level, verb, code))
+        return keylist_mapping
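Editor's note: a sketch of the input log_line_parser expects, using an entirely made-up line. Per the parser above, the first 16 characters are a syslog-style timestamp prefix, followed by eighteen space-separated, URL-quoted fields (host, server, client IP, LB IP, timestamp, method, request, HTTP version, code, referrer, user agent, auth token, bytes in, bytes out, etag, transaction id, headers, processing time).

```python
from swift.stats.access_processor import AccessLogProcessor

# every value below is invented for illustration
line = ('Sep 15 20:11:08 host1 proxy 1.2.3.4 4.5.6.7 '
        '15/Sep/2010/20/11/08 GET /v1/AUTH_test/media/photo.jpg HTTP/1.0 '
        '200 - Mozilla - 42 1024 - tx123 - 0.0262')
parser = AccessLogProcessor({'server_name': 'proxy'})
parsed = parser.log_line_parser(line)
# parsed['account'] == 'AUTH_test', parsed['container_name'] == 'media',
# parsed['object_name'] == 'photo.jpg', parsed['code'] == 200,
# parsed['month'] == '09', and parsed['bytes_out'] == 1024
# (a '-' byte count would have been coerced to 0)
```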
=== added file 'swift/stats/account_stats.py'
--- swift/stats/account_stats.py	1970-01-01 00:00:00 +0000
+++ swift/stats/account_stats.py	2010-09-15 20:11:08 +0000
@@ -0,0 +1,86 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+from paste.deploy import appconfig
+
+from swift.account.server import DATADIR as account_server_data_dir
+from swift.common.db import AccountBroker
+from swift.common.internal_proxy import InternalProxy
+from swift.common.utils import renamer, get_logger, readconf
+from swift.common.daemon import Daemon
+
+class AccountStat(Daemon):
+    def __init__(self, stats_conf):
+        super(AccountStat, self).__init__(stats_conf)
+        target_dir = stats_conf.get('log_dir', '/var/log/swift')
+        #TODO: figure out the server configs. also figure out internal_proxy
+        account_server_conf_loc = stats_conf.get('account_server_conf',
+                                            '/etc/swift/account-server.conf')
+        server_conf = appconfig('config:%s' % account_server_conf_loc,
+                                name='account-server')
+        filename_format = stats_conf['source_filename_format']
+        self.filename_format = filename_format
+        self.target_dir = target_dir
+        self.devices = server_conf.get('devices', '/srv/node')
+        self.mount_check = server_conf.get('mount_check', 'true').lower() in \
+            ('true', 't', '1', 'on', 'yes', 'y')
+        self.logger = get_logger(stats_conf, 'swift-account-stats-logger')
+
+    def run_once(self):
+        self.logger.info("Gathering account stats")
+        start = time.time()
+        self.find_and_process()
+        self.logger.info("Gathering account stats complete (%0.2f minutes)" %
+                         ((time.time() - start) / 60))
+
+    def find_and_process(self):
+        #TODO: handle a counter in the filename to prevent overwrites?
+        src_filename = time.strftime(self.filename_format)
+        #TODO: don't use /tmp?
+        tmp_filename = os.path.join('/tmp', src_filename)
+        with open(tmp_filename, 'wb') as statfile:
+            #statfile.write('Account Name, Container Count, Object Count, Bytes Used\n')
+            for device in os.listdir(self.devices):
+                if self.mount_check and \
+                        not os.path.ismount(os.path.join(self.devices, device)):
+                    self.logger.error("Device %s is not mounted, skipping." %
+                                      device)
+                    continue
+                accounts = os.path.join(self.devices,
+                                        device,
+                                        account_server_data_dir)
+                if not os.path.exists(accounts):
+                    self.logger.debug("Path %s does not exist, skipping." %
+                                      accounts)
+                    continue
+                for root, dirs, files in os.walk(accounts, topdown=False):
+                    for filename in files:
+                        if filename.endswith('.db'):
+                            broker = AccountBroker(os.path.join(root, filename))
+                            if not broker.is_deleted():
+                                (account_name,
+                                 _, _, _,
+                                 container_count,
+                                 object_count,
+                                 bytes_used,
+                                 _, _) = broker.get_info()
+                                line_data = '"%s",%d,%d,%d\n' % (account_name,
+                                                                 container_count,
+                                                                 object_count,
+                                                                 bytes_used)
+                                statfile.write(line_data)
+        renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
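Editor's note: the stats file written above is plain CSV, one line per undeleted account database found under the account server's data dir; the file is later picked up by swift-log-uploader via the stats-%Y%m%d%H name format. An illustration with made-up values:

```python
# one output line per account database, exactly as find_and_process
# formats it (numbers invented for illustration)
line_data = '"%s",%d,%d,%d\n' % ('AUTH_test', 12, 1000, 5000000)
# -> '"AUTH_test",12,1000,5000000\n'
# columns: account name, container count, object count, bytes used
```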
789 | === added file 'swift/stats/log_processor.py' | |||
790 | --- swift/stats/log_processor.py 1970-01-01 00:00:00 +0000 | |||
791 | +++ swift/stats/log_processor.py 2010-09-15 20:11:08 +0000 | |||
792 | @@ -0,0 +1,376 @@ | |||
793 | 1 | # Copyright (c) 2010 OpenStack, LLC. | ||
794 | 2 | # | ||
795 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); | ||
796 | 4 | # you may not use this file except in compliance with the License. | ||
797 | 5 | # You may obtain a copy of the License at | ||
798 | 6 | # | ||
799 | 7 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
800 | 8 | # | ||
801 | 9 | # Unless required by applicable law or agreed to in writing, software | ||
802 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, | ||
803 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
804 | 12 | # implied. | ||
805 | 13 | # See the License for the specific language governing permissions and | ||
806 | 14 | # limitations under the License. | ||
807 | 15 | |||
808 | 16 | from ConfigParser import ConfigParser | ||
809 | 17 | import zlib | ||
810 | 18 | import time | ||
811 | 19 | import datetime | ||
812 | 20 | import cStringIO | ||
813 | 21 | import collections | ||
814 | 22 | from paste.deploy import appconfig | ||
815 | 23 | import multiprocessing | ||
816 | 24 | import Queue | ||
817 | 25 | import cPickle | ||
818 | 26 | |||
819 | 27 | from swift.common.internal_proxy import InternalProxy | ||
820 | 28 | from swift.common.exceptions import ChunkReadTimeout | ||
821 | 29 | from swift.common.utils import get_logger, readconf | ||
822 | 30 | from swift.common.daemon import Daemon | ||
823 | 31 | |||
824 | 32 | class BadFileDownload(Exception): | ||
825 | 33 | pass | ||
826 | 34 | |||
827 | 35 | class LogProcessor(object): | ||
828 | 36 | |||
829 | 37 | def __init__(self, conf, logger): | ||
830 | 38 | stats_conf = conf.get('log-processor', {}) | ||
831 | 39 | |||
832 | 40 | proxy_server_conf_loc = stats_conf.get('proxy_server_conf', | ||
833 | 41 | '/etc/swift/proxy-server.conf') | ||
834 | 42 | self.proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc, | ||
835 | 43 | name='proxy-server') | ||
836 | 44 | if isinstance(logger, tuple): | ||
837 | 45 | self.logger = get_logger(*logger) | ||
838 | 46 | else: | ||
839 | 47 | self.logger = logger | ||
840 | 48 | self.internal_proxy = InternalProxy(self.proxy_server_conf, | ||
841 | 49 | self.logger, | ||
842 | 50 | retries=3) | ||
843 | 51 | |||
844 | 52 | # load the processing plugins | ||
845 | 53 | self.plugins = {} | ||
846 | 54 | plugin_prefix = 'log-processor-' | ||
847 | 55 | for section in (x for x in conf if x.startswith(plugin_prefix)): | ||
848 | 56 | plugin_name = section[len(plugin_prefix):] | ||
849 | 57 | plugin_conf = conf.get(section, {}) | ||
850 | 58 | self.plugins[plugin_name] = plugin_conf | ||
851 | 59 | import_target, class_name = plugin_conf['class_path'].rsplit('.', 1) | ||
852 | 60 | module = __import__(import_target, fromlist=[import_target]) | ||
853 | 61 | klass = getattr(module, class_name) | ||
854 | 62 | self.plugins[plugin_name]['instance'] = klass(plugin_conf) | ||
855 | 63 | |||
856 | 64 | def process_one_file(self, plugin_name, account, container, object_name): | ||
857 | 65 | # get an iter of the object data | ||
858 | 66 | compressed = object_name.endswith('.gz') | ||
859 | 67 | stream = self.get_object_data(account, container, object_name, | ||
860 | 68 | compressed=compressed) | ||
861 | 69 | # look up the correct plugin and send the stream to it | ||
862 | 70 | return self.plugins[plugin_name]['instance'].process(stream, | ||
863 | 71 | account, | ||
864 | 72 | container, | ||
865 | 73 | object_name) | ||
866 | 74 | |||
867 | 75 | def get_data_list(self, start_date=None, end_date=None, listing_filter=None): | ||
868 | 76 | total_list = [] | ||
869 | 77 | for name, data in self.plugins.items(): | ||
870 | 78 | account = data['swift_account'] | ||
871 | 79 | container = data['container_name'] | ||
872 | 80 | l = self.get_container_listing(account, | ||
873 | 81 | container, | ||
874 | 82 | start_date, | ||
875 | 83 | end_date) | ||
876 | 84 | for i in l: | ||
877 | 85 | # The items in this list end up being passed as positional | ||
878 | 86 | # parameters to process_one_file. | ||
879 | 87 | x = (name, account, container, i) | ||
880 | 88 | if x not in listing_filter: | ||
881 | 89 | total_list.append(x) | ||
882 | 90 | return total_list | ||
883 | 91 | |||
884 | 92 | def get_container_listing(self, swift_account, container_name, start_date=None, | ||
885 | 93 | end_date=None, listing_filter=None): | ||
886 | 94 | ''' | ||
887 | 95 | Get a container listing, filtered by start_date, end_date, and | ||
888 | 96 | listing_filter. Dates, if given, should be in YYYYMMDDHH format | ||
889 | 97 | ''' | ||
890 | 98 | search_key = None | ||
891 | 99 | if start_date is not None: | ||
892 | 100 | date_parts = [] | ||
893 | 101 | try: | ||
894 | 102 | year, start_date = start_date[:4], start_date[4:] | ||
895 | 103 | if year: | ||
896 | 104 | date_parts.append(year) | ||
897 | 105 | month, start_date = start_date[:2], start_date[2:] | ||
898 | 106 | if month: | ||
899 | 107 | date_parts.append(month) | ||
900 | 108 | day, start_date = start_date[:2], start_date[2:] | ||
901 | 109 | if day: | ||
902 | 110 | date_parts.append(day) | ||
903 | 111 | hour, start_date = start_date[:2], start_date[2:] | ||
904 | 112 | if hour: | ||
905 | 113 | date_parts.append(hour) | ||
906 | 114 | except IndexError: | ||
907 | 115 | pass | ||
908 | 116 | else: | ||
909 | 117 | search_key = '/'.join(date_parts) | ||
910 | 118 | end_key = None | ||
911 | 119 | if end_date is not None: | ||
912 | 120 | date_parts = [] | ||
913 | 121 | try: | ||
914 | 122 | year, end_date = end_date[:4], end_date[4:] | ||
915 | 123 | if year: | ||
916 | 124 | date_parts.append(year) | ||
917 | 125 | month, end_date = end_date[:2], end_date[2:] | ||
918 | 126 | if month: | ||
919 | 127 | date_parts.append(month) | ||
920 | 128 | day, end_date = end_date[:2], end_date[2:] | ||
921 | 129 | if day: | ||
922 | 130 | date_parts.append(day) | ||
923 | 131 | hour, end_date = end_date[:2], end_date[2:] | ||
924 | 132 | if hour: | ||
925 | 133 | date_parts.append(hour) | ||
926 | 134 | except IndexError: | ||
927 | 135 | pass | ||
928 | 136 | else: | ||
929 | 137 | end_key = '/'.join(date_parts) | ||
930 | 138 | container_listing = self.internal_proxy.get_container_list( | ||
931 | 139 | swift_account, | ||
932 | 140 | container_name, | ||
933 | 141 | marker=search_key) | ||
934 | 142 | results = [] | ||
935 | 143 | if container_listing is not None: | ||
936 | 144 | if listing_filter is None: | ||
937 | 145 | listing_filter = set() | ||
938 | 146 | for item in container_listing: | ||
939 | 147 | name = item['name'] | ||
940 | 148 | if end_key and name > end_key: | ||
941 | 149 | break | ||
942 | 150 | if name not in listing_filter: | ||
943 | 151 | results.append(name) | ||
944 | 152 | return results | ||
945 | 153 | |||
946 | 154 | def get_object_data(self, swift_account, container_name, object_name, | ||
947 | 155 | compressed=False): | ||
948 | 156 | '''reads an object and yields its lines''' | ||
949 | 157 | code, o = self.internal_proxy.get_object(swift_account, | ||
950 | 158 | container_name, | ||
951 | 159 | object_name) | ||
952 | 160 | if code < 200 or code >= 300: | ||
953 | 161 | return | ||
954 | 162 | last_part = '' | ||
955 | 163 | last_compressed_part = '' | ||
956 | 164 | # magic in the following zlib.decompressobj argument is courtesy of | ||
957 | 165 | # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk | ||
958 | 166 | d = zlib.decompressobj(16+zlib.MAX_WBITS) | ||
959 | 167 | try: | ||
960 | 168 | for chunk in o: | ||
961 | 169 | if compressed: | ||
962 | 170 | try: | ||
963 | 171 | chunk = d.decompress(chunk) | ||
964 | 172 | except zlib.error: | ||
965 | 173 | raise BadFileDownload() # bad compressed data | ||
966 | 174 | parts = chunk.split('\n') | ||
967 | 175 | parts[0] = last_part + parts[0] | ||
968 | 176 | for part in parts[:-1]: | ||
969 | 177 | yield part | ||
970 | 178 | last_part = parts[-1] | ||
971 | 179 | if last_part: | ||
972 | 180 | yield last_part | ||
973 | 181 | except ChunkReadTimeout: | ||
974 | 182 | raise BadFileDownload() | ||
975 | 183 | |||
976 | 184 | def generate_keylist_mapping(self): | ||
977 | 185 | keylist = {} | ||
978 | 186 | for plugin in self.plugins: | ||
979 | 187 | plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping() | ||
980 | 188 | if not plugin_keylist: | ||
981 | 189 | continue | ||
982 | 190 | for k, v in plugin_keylist.items(): | ||
983 | 191 | o = keylist.get(k) | ||
984 | 192 | if o: | ||
985 | 193 | if isinstance(o, set): | ||
986 | 194 | if isinstance(v, set): | ||
987 | 195 | o.update(v) | ||
988 | 196 | else: | ||
989 | 197 | o.update([v]) | ||
990 | 198 | else: | ||
991 | 199 | o = set(o) | ||
992 | 200 | if isinstance(v, set): | ||
993 | 201 | o.update(v) | ||
994 | 202 | else: | ||
995 | 203 | o.update([v]) | ||
996 | 204 | else: | ||
997 | 205 | o = v | ||
998 | 206 | keylist[k] = o | ||
999 | 207 | return keylist | ||
1000 | 208 | |||
1001 | 209 | |||
1002 | 210 | class LogProcessorDaemon(Daemon): | ||
1003 | 211 | def __init__(self, conf): | ||
1004 | 212 | c = conf.get('log-processor') | ||
1005 | 213 | super(LogProcessorDaemon, self).__init__(c) | ||
1006 | 214 | self.total_conf = conf | ||
1007 | 215 | self.logger = get_logger(c) | ||
1008 | 216 | self.log_processor = LogProcessor(conf, self.logger) | ||
1009 | 217 | self.lookback_hours = int(c.get('lookback_hours', '120')) | ||
1010 | 218 | self.lookback_window = int(c.get('lookback_window', | ||
1011 | 219 | str(self.lookback_hours))) | ||
1012 | 220 | self.log_processor_account = c['swift_account'] | ||
1013 | 221 | self.log_processor_container = c.get('container_name', | ||
1014 | 222 | 'log_processing_data') | ||
1015 | 223 | |||
1016 | 224 | def run_once(self): | ||
1017 | 225 | self.logger.info("Beginning log processing") | ||
1018 | 226 | start = time.time() | ||
1019 | 227 | if self.lookback_hours == 0: | ||
1020 | 228 | lookback_start = None | ||
1021 | 229 | lookback_end = None | ||
1022 | 230 | else: | ||
1023 | 231 | lookback_start = datetime.datetime.now() - \ | ||
1024 | 232 | datetime.timedelta(hours=self.lookback_hours) | ||
1025 | 233 | lookback_start = lookback_start.strftime('%Y%m%d%H') | ||
1026 | 234 | if self.lookback_window == 0: | ||
1027 | 235 | lookback_end = None | ||
1028 | 236 | else: | ||
1029 | 237 | lookback_end = datetime.datetime.now() - \ | ||
1030 | 238 | datetime.timedelta(hours=self.lookback_hours) + \ | ||
1031 | 239 | datetime.timedelta(hours=self.lookback_window) | ||
1032 | 240 | lookback_end = lookback_end.strftime('%Y%m%d%H') | ||
1033 | 241 | self.logger.debug('lookback_start: %s' % lookback_start) | ||
1034 | 242 | self.logger.debug('lookback_end: %s' % lookback_end) | ||
1035 | 243 | try: | ||
1036 | 244 | processed_files_stream = self.log_processor.get_object_data( | ||
1037 | 245 | self.log_processor_account, | ||
1038 | 246 | self.log_processor_container, | ||
1039 | 247 | 'processed_files.pickle.gz', | ||
1040 | 248 | compressed=True) | ||
1041 | 249 | buf = '\n'.join(x for x in processed_files_stream) | ||
1042 | 250 | if buf: | ||
1043 | 251 | already_processed_files = cPickle.loads(buf) | ||
1044 | 252 | else: | ||
1045 | 253 | already_processed_files = set() | ||
1046 | 254 | except: | ||
1047 | 255 | already_processed_files = set() | ||
1048 | 256 | self.logger.debug('found %d processed files' % \ | ||
1049 | 257 | len(already_processed_files)) | ||
1050 | 258 | logs_to_process = self.log_processor.get_data_list(lookback_start, | ||
1051 | 259 | lookback_end, | ||
1052 | 260 | already_processed_files) | ||
1053 | 261 | self.logger.info('loaded %d files to process' % len(logs_to_process)) | ||
1054 | 262 | if not logs_to_process: | ||
1055 | 263 | self.logger.info("Log processing done (%0.2f minutes)" % | ||
1056 | 264 | ((time.time()-start)/60)) | ||
1057 | 265 | return | ||
1058 | 266 | |||
1059 | 267 | # map | ||
1060 | 268 | processor_args = (self.total_conf, self.logger) | ||
1061 | 269 | results = multiprocess_collate(processor_args, logs_to_process) | ||
1062 | 270 | |||
1063 | 271 | #reduce | ||
1064 | 272 | aggr_data = {} | ||
1065 | 273 | processed_files = already_processed_files | ||
1066 | 274 | for item, data in results: | ||
1067 | 275 | # since item contains the plugin and the log name, new plugins will | ||
1068 | 276 | # "reprocess" the file and the results will be in the final csv. | ||
1069 | 277 | processed_files.add(item) | ||
1070 | 278 | for k, d in data.items(): | ||
1071 | 279 | existing_data = aggr_data.get(k, {}) | ||
1072 | 280 | for i, j in d.items(): | ||
1073 | 281 | current = existing_data.get(i, 0) | ||
1074 | 282 | # the merging strategy for key collisions is addition; | ||
1075 | 283 | # processing plugins must account for this | ||
1076 | 284 | existing_data[i] = current + j | ||
1077 | 285 | aggr_data[k] = existing_data | ||
1078 | 286 | |||
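As a sketch of the reduce step above: when two processed files report data for the same aggregation key, the values are simply summed field by field. (The file contents below are hypothetical.)

    # Per-file results for the same (account, year, month, day, hour) key;
    # the merge adds values on key collision.
    file1 = {('acct', '2010', '07', '09', '04'): {'bytes_out': 10}}
    file2 = {('acct', '2010', '07', '09', '04'): {'bytes_out': 5,
                                                  'bytes_in': 2}}
    aggr_data = {}
    for data in (file1, file2):
        for k, d in data.items():
            existing_data = aggr_data.get(k, {})
            for i, j in d.items():
                existing_data[i] = existing_data.get(i, 0) + j
            aggr_data[k] = existing_data
    # aggr_data == {('acct', '2010', '07', '09', '04'):
    #               {'bytes_out': 15, 'bytes_in': 2}}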
1079 | 287 | # group | ||
1080 | 288 | # reduce a large number of keys in aggr_data[k] to a small number of | ||
1081 | 289 | # output keys | ||
1082 | 290 | keylist_mapping = self.log_processor.generate_keylist_mapping() | ||
1083 | 291 | final_info = collections.defaultdict(dict) | ||
1084 | 292 | for account, data in aggr_data.items(): | ||
1085 | 293 | for key, mapping in keylist_mapping.items(): | ||
1086 | 294 | if isinstance(mapping, (list, set)): | ||
1087 | 295 | value = 0 | ||
1088 | 296 | for k in mapping: | ||
1089 | 297 | try: | ||
1090 | 298 | value += data[k] | ||
1091 | 299 | except KeyError: | ||
1092 | 300 | pass | ||
1093 | 301 | else: | ||
1094 | 302 | try: | ||
1095 | 303 | value = data[mapping] | ||
1096 | 304 | except KeyError: | ||
1097 | 305 | value = 0 | ||
1098 | 306 | final_info[account][key] = value | ||
1099 | 307 | |||
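A small sketch of the grouping logic, assuming a hypothetical mapping in which one output key comes from a single source key and another is folded from a set of source keys (as the stats plugin's keylist_mapping later in this diff does for single keys):

    keylist_mapping = {
        'bytes_used': 'bytes_used',
        'gets': set([('public', 'object', 'GET', '2xx'),
                     ('public', 'container', 'GET', '2xx')]),
    }
    data = {'bytes_used': 3, ('public', 'object', 'GET', '2xx'): 7}
    final = {}
    for key, mapping in keylist_mapping.items():
        if isinstance(mapping, (list, set)):
            # fold several source keys into one output column
            final[key] = sum(data.get(k, 0) for k in mapping)
        else:
            final[key] = data.get(mapping, 0)
    # final == {'bytes_used': 3, 'gets': 7}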
1100 | 308 | # output | ||
1101 | 309 | sorted_keylist_mapping = sorted(keylist_mapping) | ||
1102 | 310 | columns = 'bill_ts,data_ts,account,' + ','.join(sorted_keylist_mapping) | ||
1103 | 311 | print columns | ||
1104 | 312 | for (account, year, month, day, hour), d in final_info.items(): | ||
1105 | 313 | bill_ts = '' | ||
1106 | 314 | data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour) | ||
1107 | 315 | row = [bill_ts, data_ts] | ||
1108 | 316 | row.append('%s' % account) | ||
1109 | 317 | for k in sorted_keylist_mapping: | ||
1110 | 318 | row.append('%s' % d[k]) | ||
1111 | 319 | print ','.join(row) | ||
1112 | 320 | |||
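For illustration, with only the stats plugin's keys the printed CSV might look like the following (account name and values are hypothetical; bill_ts is left empty by this code, hence the leading comma):

    bill_ts,data_ts,account,bytes_used,container_count,object_count,replica_count
    ,2010/07/09 04:00:00,AUTH_test,3,1,2,1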
1113 | 321 | # cleanup | ||
1114 | 322 | s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) | ||
1115 | 323 | f = cStringIO.StringIO(s) | ||
1116 | 324 | self.log_processor.internal_proxy.upload_file(f, | ||
1117 | 325 | self.log_processor_account, | ||
1118 | 326 | self.log_processor_container, | ||
1119 | 327 | 'processed_files.pickle.gz') | ||
1120 | 328 | |||
1121 | 329 | self.logger.info("Log processing done (%0.2f minutes)" % | ||
1122 | 330 | ((time.time()-start)/60)) | ||
1123 | 331 | |||
1124 | 332 | def multiprocess_collate(processor_args, logs_to_process): | ||
1125 | 333 | '''yield hourly data from logs_to_process''' | ||
1126 | 334 | worker_count = multiprocessing.cpu_count() | ||
1127 | 335 | results = [] | ||
1128 | 336 | in_queue = multiprocessing.Queue() | ||
1129 | 337 | out_queue = multiprocessing.Queue() | ||
1130 | 338 | for _ in range(worker_count): | ||
1131 | 339 | p = multiprocessing.Process(target=collate_worker, | ||
1132 | 340 | args=(processor_args, | ||
1133 | 341 | in_queue, | ||
1134 | 342 | out_queue)) | ||
1135 | 343 | p.start() | ||
1136 | 344 | results.append(p) | ||
1137 | 345 | for x in logs_to_process: | ||
1138 | 346 | in_queue.put(x) | ||
1139 | 347 | for _ in range(worker_count): | ||
1140 | 348 | in_queue.put(None) | ||
1141 | 349 | count = 0 | ||
1142 | 350 | while True: | ||
1143 | 351 | try: | ||
1144 | 352 | item, data = out_queue.get_nowait() | ||
1145 | 353 | count += 1 | ||
1146 | 354 | if data: | ||
1147 | 355 | yield item, data | ||
1148 | 356 | if count >= len(logs_to_process): | ||
1149 | 357 | # assumes every queued file produces exactly one result | ||
1150 | 358 | break | ||
1151 | 359 | except Queue.Empty: | ||
1152 | 360 | time.sleep(.1) | ||
1153 | 361 | for r in results: | ||
1154 | 362 | r.join() | ||
1155 | 363 | |||
1156 | 364 | def collate_worker(processor_args, in_queue, out_queue): | ||
1157 | 365 | '''worker process for multiprocess_collate''' | ||
1158 | 366 | p = LogProcessor(*processor_args) | ||
1159 | 367 | while True: | ||
1160 | 368 | try: | ||
1161 | 369 | item = in_queue.get_nowait() | ||
1162 | 370 | if item is None: | ||
1163 | 371 | break | ||
1164 | 372 | except Queue.Empty: | ||
1165 | 373 | time.sleep(.1) | ||
1166 | 374 | else: | ||
1167 | 375 | ret = p.process_one_file(*item) | ||
1168 | 376 | out_queue.put((item, ret)) | ||
1169 | 0 | \ No newline at end of file | 377 | \ No newline at end of file |
1170 | 1 | 378 | ||
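The two functions above are a standard multiprocessing fan-out with sentinel shutdown. A stripped-down, self-contained sketch of the same pattern (the doubling "work" is a stand-in for process_one_file):

    import multiprocessing

    def worker(in_queue, out_queue):
        while True:
            item = in_queue.get()
            if item is None:  # sentinel: no more work for this worker
                break
            out_queue.put((item, item * 2))  # stand-in for real processing

    if __name__ == '__main__':
        in_q = multiprocessing.Queue()
        out_q = multiprocessing.Queue()
        workers = [multiprocessing.Process(target=worker, args=(in_q, out_q))
                   for _ in range(multiprocessing.cpu_count())]
        for w in workers:
            w.start()
        for x in range(10):
            in_q.put(x)
        for _ in workers:
            in_q.put(None)  # one sentinel per worker
        results = [out_q.get() for _ in range(10)]
        for w in workers:
            w.join()

This sketch uses a blocking get; the production code instead polls with get_nowait and a short sleep so it can also count results and stop once one has arrived per queued file.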
1171 | === added file 'swift/stats/log_uploader.py' | |||
1172 | --- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000 | |||
1173 | +++ swift/stats/log_uploader.py 2010-09-15 20:11:08 +0000 | |||
1174 | @@ -0,0 +1,160 @@ | |||
1175 | 1 | # Copyright (c) 2010 OpenStack, LLC. | ||
1176 | 2 | # | ||
1177 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); | ||
1178 | 4 | # you may not use this file except in compliance with the License. | ||
1179 | 5 | # You may obtain a copy of the License at | ||
1180 | 6 | # | ||
1181 | 7 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
1182 | 8 | # | ||
1183 | 9 | # Unless required by applicable law or agreed to in writing, software | ||
1184 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, | ||
1185 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
1186 | 12 | # implied. | ||
1187 | 13 | # See the License for the specific language governing permissions and | ||
1188 | 14 | # limitations under the License. | ||
1189 | 15 | |||
1190 | 16 | from __future__ import with_statement | ||
1191 | 17 | import os | ||
1192 | 18 | import hashlib | ||
1193 | 19 | import time | ||
1194 | 20 | import gzip | ||
1195 | 21 | import glob | ||
1196 | 22 | from paste.deploy import appconfig | ||
1197 | 23 | |||
1198 | 24 | from swift.common.internal_proxy import InternalProxy | ||
1199 | 25 | from swift.common.daemon import Daemon | ||
1200 | 26 | from swift.common import utils | ||
1201 | 27 | |||
1202 | 28 | class LogUploader(Daemon): | ||
1203 | 29 | ''' | ||
1204 | 30 | Given a local directory, a swift account, and a container name, LogUploader | ||
1205 | 31 | uploads all files in the local directory to the given account/container. | ||
1206 | 32 | All files except those modified within the last two hours are uploaded, and | ||
1207 | 33 | each file's md5 sum is computed. The hash is used to prevent duplicate data | ||
1208 | 34 | from being uploaded multiple times in different files (e.g. log lines). | ||
1209 | 35 | Since the hash is computed, it is also used as the uploaded object's etag | ||
1210 | 36 | to ensure data integrity. | ||
1211 | 37 | |||
1212 | 38 | Note that after the file is successfully uploaded, it will be unlinked. | ||
1213 | 39 | |||
1214 | 40 | The given proxy server config is used to instantiate a proxy server for | ||
1215 | 41 | the object uploads. | ||
1216 | 42 | ''' | ||
1217 | 43 | |||
1218 | 44 | def __init__(self, uploader_conf, plugin_name): | ||
1219 | 45 | super(LogUploader, self).__init__(uploader_conf) | ||
1220 | 46 | log_dir = uploader_conf.get('log_dir', '/var/log/swift/') | ||
1221 | 47 | swift_account = uploader_conf['swift_account'] | ||
1222 | 48 | container_name = uploader_conf['container_name'] | ||
1223 | 49 | source_filename_format = uploader_conf['source_filename_format'] | ||
1224 | 50 | proxy_server_conf_loc = uploader_conf.get('proxy_server_conf', | ||
1225 | 51 | '/etc/swift/proxy-server.conf') | ||
1226 | 52 | proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc, | ||
1227 | 53 | name='proxy-server') | ||
1228 | 54 | if not log_dir.endswith('/'): | ||
1229 | 55 | log_dir = log_dir + '/' | ||
1230 | 56 | self.log_dir = log_dir | ||
1231 | 57 | self.swift_account = swift_account | ||
1232 | 58 | self.container_name = container_name | ||
1233 | 59 | self.filename_format = source_filename_format | ||
1234 | 60 | self.internal_proxy = InternalProxy(proxy_server_conf) | ||
1235 | 61 | log_name = 'swift-log-uploader-%s' % plugin_name | ||
1236 | 62 | self.logger = utils.get_logger(uploader_conf, log_name) | ||
1237 | 63 | |||
1238 | 64 | def run_once(self): | ||
1239 | 65 | self.logger.info("Uploading logs") | ||
1240 | 66 | start = time.time() | ||
1241 | 67 | self.upload_all_logs() | ||
1242 | 68 | self.logger.info("Uploading logs complete (%0.2f minutes)" % | ||
1243 | 69 | ((time.time()-start)/60)) | ||
1244 | 70 | |||
1245 | 71 | def upload_all_logs(self): | ||
1246 | 72 | i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split() if c in self.filename_format] | ||
1247 | 73 | i.sort() | ||
1248 | 74 | year_offset = month_offset = day_offset = hour_offset = None | ||
1249 | 75 | base_offset = len(self.log_dir) | ||
1250 | 76 | for start, c in i: | ||
1251 | 77 | offset = base_offset + start | ||
1252 | 78 | if c == '%Y': | ||
1253 | 79 | year_offset = offset, offset+4 | ||
1254 | 80 | # Add in the difference between len(%Y) and the expanded | ||
1255 | 81 | # version of %Y (????). This makes sure the codes after this | ||
1256 | 82 | # one will align properly in the final filename. | ||
1257 | 83 | base_offset += 2 | ||
1258 | 84 | elif c == '%m': | ||
1259 | 85 | month_offset = offset, offset+2 | ||
1260 | 86 | elif c == '%d': | ||
1261 | 87 | day_offset = offset, offset+2 | ||
1262 | 88 | elif c == '%H': | ||
1263 | 89 | hour_offset = offset, offset+2 | ||
1264 | 90 | if not (year_offset and month_offset and day_offset and hour_offset): | ||
1265 | 91 | # don't have all the parts, can't upload anything | ||
1266 | 92 | return | ||
1267 | 93 | glob_pattern = self.filename_format | ||
1268 | 94 | glob_pattern = glob_pattern.replace('%Y', '????', 1) | ||
1269 | 95 | glob_pattern = glob_pattern.replace('%m', '??', 1) | ||
1270 | 96 | glob_pattern = glob_pattern.replace('%d', '??', 1) | ||
1271 | 97 | glob_pattern = glob_pattern.replace('%H', '??', 1) | ||
1272 | 98 | filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern)) | ||
1273 | 99 | current_hour = int(time.strftime('%H')) | ||
1274 | 100 | today = int(time.strftime('%Y%m%d')) | ||
1275 | 101 | self.internal_proxy.create_container(self.swift_account, | ||
1276 | 102 | self.container_name) | ||
1277 | 103 | for filename in filelist: | ||
1278 | 104 | # From the filename, we need to derive the year, month, day, | ||
1279 | 105 | # and hour for the file. These values are used in the uploaded | ||
1280 | 106 | # object's name, so they should be a reasonably accurate | ||
1281 | 107 | # representation of the time for which the data in the file was | ||
1282 | 108 | # collected. The file's last modified time is not a reliable | ||
1283 | 109 | # representation of the data in the file. For example, an old | ||
1284 | 110 | # log file (from hour A) may be uploaded or moved into the | ||
1285 | 111 | # log_dir in hour Z. The file's modified time will be for hour | ||
1286 | 112 | # Z, and therefore the object's name in the system will not | ||
1287 | 113 | # represent the data in it. | ||
1288 | 114 | # If the filename doesn't match the format, it shouldn't be | ||
1289 | 115 | # uploaded. (Out-of-range string slices return '', not | ||
1290 | 116 | # IndexError, so validate the parts rather than catch.) | ||
1291 | 117 | year = filename[slice(*year_offset)] | ||
1292 | 118 | month = filename[slice(*month_offset)] | ||
1293 | 119 | day = filename[slice(*day_offset)] | ||
1294 | 120 | hour = filename[slice(*hour_offset)] | ||
1295 | 121 | if not (year.isdigit() and month.isdigit() and | ||
1296 | 122 | day.isdigit() and hour.isdigit()): | ||
1297 | 123 | # unexpected filename format, move on | ||
1298 | 124 | self.logger.error("Unexpected log: %s" % filename) | ||
1299 | 125 | if (time.time() - os.stat(filename).st_mtime) < 7200: | ||
1300 | 126 | # don't process very new logs | ||
1301 | 127 | self.logger.debug("Skipping log: %s (< 2 hours old)" % filename) | ||
1302 | 128 | continue | ||
1303 | 129 | self.upload_one_log(filename, year, month, day, hour) | ||
1304 | 130 | |||
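To see how the offset bookkeeping in upload_all_logs plays out, assume a log_dir of '/var/log/swift/' and a source_filename_format of 'access-%Y%m%d%H' (both hypothetical values):

    log_dir = '/var/log/swift/'          # len() == 15 == base_offset
    filename = log_dir + 'access-2010091514'
    # '%Y' sits at index 7 of the format; 15 + 7 == 22, and since '%Y'
    # expands from 2 to 4 characters, base_offset grows by 2 for the
    # codes that follow it.
    year_offset = (22, 26)
    month_offset = (26, 28)
    day_offset = (28, 30)
    hour_offset = (30, 32)
    assert filename[slice(*year_offset)] == '2010'
    assert filename[slice(*month_offset)] == '09'
    assert filename[slice(*day_offset)] == '15'
    assert filename[slice(*hour_offset)] == '14'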
1305 | 131 | def upload_one_log(self, filename, year, month, day, hour): | ||
1306 | 132 | if os.path.getsize(filename) == 0: | ||
1307 | 133 | self.logger.debug("Log %s is 0 length, skipping" % filename) | ||
1308 | 134 | return | ||
1309 | 135 | self.logger.debug("Processing log: %s" % filename) | ||
1310 | 136 | filehash = hashlib.md5() | ||
1311 | 137 | already_compressed = filename.endswith('.gz') | ||
1312 | 138 | opener = gzip.open if already_compressed else open | ||
1313 | 139 | f = opener(filename, 'rb') | ||
1314 | 140 | try: | ||
1315 | 141 | for line in f: | ||
1316 | 142 | # filter out bad lines here? | ||
1317 | 143 | filehash.update(line) | ||
1318 | 144 | finally: | ||
1319 | 145 | f.close() | ||
1320 | 146 | filehash = filehash.hexdigest() | ||
1321 | 147 | # By adding a hash to the filename, we ensure that uploaded files | ||
1322 | 148 | # have unique filenames and protect against uploading one file | ||
1323 | 149 | # more than one time. By using md5, we get an etag for free. | ||
1324 | 150 | target_filename = '/'.join([year, month, day, hour, filehash + '.gz']) | ||
1325 | 151 | if self.internal_proxy.upload_file(filename, | ||
1326 | 152 | self.swift_account, | ||
1327 | 153 | self.container_name, | ||
1328 | 154 | target_filename, | ||
1329 | 155 | compress=(not already_compressed)): | ||
1330 | 156 | self.logger.debug("Uploaded log %s to %s" % | ||
1331 | 157 | (filename, target_filename)) | ||
1332 | 158 | os.unlink(filename) | ||
1333 | 159 | else: | ||
1334 | 160 | self.logger.error("ERROR: Upload of log %s failed!" % filename) | ||
1335 | 0 | 161 | ||
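A sketch of how upload_one_log derives the object name, with stand-in log lines; the md5 both de-duplicates repeated uploads and serves as the object's etag:

    import hashlib

    filehash = hashlib.md5()
    for line in ['log line one\n', 'log line two\n']:  # stand-in contents
        filehash.update(line)
    target_filename = '/'.join(['2010', '09', '15', '14',
                                filehash.hexdigest() + '.gz'])
    # e.g. '2010/09/15/14/<md5hex>.gz' -- identical file contents always
    # map to the same object name, so re-uploads overwrite rather than
    # duplicate.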
1336 | === added file 'swift/stats/stats_processor.py' | |||
1337 | --- swift/stats/stats_processor.py 1970-01-01 00:00:00 +0000 | |||
1338 | +++ swift/stats/stats_processor.py 2010-09-15 20:11:08 +0000 | |||
1339 | @@ -0,0 +1,63 @@ | |||
1340 | 1 | # Copyright (c) 2010 OpenStack, LLC. | ||
1341 | 2 | # | ||
1342 | 3 | # Licensed under the Apache License, Version 2.0 (the "License"); | ||
1343 | 4 | # you may not use this file except in compliance with the License. | ||
1344 | 5 | # You may obtain a copy of the License at | ||
1345 | 6 | # | ||
1346 | 7 | # http://www.apache.org/licenses/LICENSE-2.0 | ||
1347 | 8 | # | ||
1348 | 9 | # Unless required by applicable law or agreed to in writing, software | ||
1349 | 10 | # distributed under the License is distributed on an "AS IS" BASIS, | ||
1350 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | ||
1351 | 12 | # implied. | ||
1352 | 13 | # See the License for the specific language governing permissions and | ||
1353 | 14 | # limitations under the License. | ||
1354 | 15 | |||
1355 | 16 | class StatsLogProcessor(object): | ||
1356 | 17 | |||
1357 | 18 | def __init__(self, conf): | ||
1358 | 19 | pass | ||
1359 | 20 | |||
1360 | 21 | def process(self, obj_stream, account, container, object_name): | ||
1361 | 22 | '''generate hourly groupings of data from one stats log file''' | ||
1362 | 23 | account_totals = {} | ||
1363 | 24 | year, month, day, hour, _ = object_name.split('/') | ||
1364 | 25 | for line in obj_stream: | ||
1365 | 26 | if not line: | ||
1366 | 27 | continue | ||
1367 | 28 | try: | ||
1368 | 29 | (account, | ||
1369 | 30 | container_count, | ||
1370 | 31 | object_count, | ||
1371 | 32 | bytes_used) = line.split(',') | ||
1372 | 33 | account = account.strip('"') | ||
1373 | 34 | container_count = int(container_count.strip('"')) | ||
1374 | 35 | object_count = int(object_count.strip('"')) | ||
1375 | 36 | bytes_used = int(bytes_used.strip('"')) | ||
1376 | 37 | aggr_key = (account, year, month, day, hour) | ||
1377 | 38 | d = account_totals.get(aggr_key, {}) | ||
1378 | 39 | d['replica_count'] = d.setdefault('replica_count', 0) + 1 | ||
1379 | 40 | d['container_count'] = d.setdefault('container_count', 0) + \ | ||
1380 | 41 | container_count | ||
1381 | 42 | d['object_count'] = d.setdefault('object_count', 0) + \ | ||
1382 | 43 | object_count | ||
1383 | 44 | d['bytes_used'] = d.setdefault('bytes_used', 0) + \ | ||
1384 | 45 | bytes_used | ||
1385 | 46 | account_totals[aggr_key] = d | ||
1386 | 47 | except (IndexError, ValueError): | ||
1387 | 48 | # bad line data | ||
1388 | 49 | pass | ||
1389 | 50 | return account_totals | ||
1390 | 51 | |||
1391 | 52 | def keylist_mapping(self): | ||
1392 | 53 | ''' | ||
1393 | 54 | returns a dictionary of final keys mapped to source keys | ||
1394 | 55 | ''' | ||
1395 | 56 | keylist_mapping = { | ||
1396 | 57 | # <db key> : <row key> or <set of row keys> | ||
1397 | 58 | 'bytes_used': 'bytes_used', | ||
1398 | 59 | 'container_count': 'container_count', | ||
1399 | 60 | 'object_count': 'object_count', | ||
1400 | 61 | 'replica_count': 'replica_count', | ||
1401 | 62 | } | ||
1402 | 63 | return keylist_mapping | ||
1403 | 0 | 64 | ||
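Fed one account-stats CSV line, the processor above groups by (account, year, month, day, hour) taken from the object name. A quick sketch, with hypothetical line and object-name values:

    from swift.stats.stats_processor import StatsLogProcessor

    p = StatsLogProcessor({})
    totals = p.process(['"AUTH_test","1","2","3"'],
                       'some_account', 'some_container',
                       '2010/09/15/14/stats.csv')
    # totals == {('AUTH_test', '2010', '09', '15', '14'):
    #            {'replica_count': 1, 'container_count': 1,
    #             'object_count': 2, 'bytes_used': 3}}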
1404 | === added directory 'test/unit/stats' | |||
1405 | === added file 'test/unit/stats/__init__.py' | |||
1406 | === added file 'test/unit/stats/test_log_processor.py' | |||
1407 | --- test/unit/stats/test_log_processor.py 1970-01-01 00:00:00 +0000 | |||
1408 | +++ test/unit/stats/test_log_processor.py 2010-09-15 20:11:08 +0000 | |||
1409 | @@ -0,0 +1,161 @@ | |||
1410 | 1 | import unittest | ||
1411 | 2 | |||
1412 | 3 | from swift.stats import log_processor | ||
1413 | 4 | |||
1414 | 5 | class DumbLogger(object): | ||
1415 | 6 | def __getattr__(self, n): | ||
1416 | 7 | return self.foo | ||
1417 | 8 | |||
1418 | 9 | def foo(self, *a, **kw): | ||
1419 | 10 | pass | ||
1420 | 11 | |||
1421 | 12 | class DumbInternalProxy(object): | ||
1422 | 13 | def get_container_list(self, account, container, marker=None): | ||
1423 | 14 | n = '2010/03/14/13/obj1' | ||
1424 | 15 | if marker is None or n > marker: | ||
1425 | 16 | return [{'name': n}] | ||
1426 | 17 | else: | ||
1427 | 18 | return [] | ||
1428 | 19 | |||
1429 | 20 | def get_object(self, account, container, object_name): | ||
1430 | 21 | code = 200 | ||
1431 | 22 | if object_name.endswith('.gz'): | ||
1432 | 23 | # same data as below, compressed with gzip -9 | ||
1433 | 24 | def data(): | ||
1434 | 25 | yield '\x1f\x8b\x08' | ||
1435 | 26 | yield '\x08"\xd79L' | ||
1436 | 27 | yield '\x02\x03te' | ||
1437 | 28 | yield 'st\x00\xcbO' | ||
1438 | 29 | yield '\xca\xe2JI,I' | ||
1439 | 30 | yield '\xe4\x02\x00O\xff' | ||
1440 | 31 | yield '\xa3Y\t\x00\x00\x00' | ||
1441 | 32 | else: | ||
1442 | 33 | def data(): | ||
1443 | 34 | yield 'obj\n' | ||
1444 | 35 | yield 'data' | ||
1445 | 36 | return code, data() | ||
1446 | 37 | |||
1447 | 38 | class TestLogProcessor(unittest.TestCase): | ||
1448 | 39 | |||
1449 | 40 | access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\ | ||
1450 | 41 | '09/Jul/2010/04/14/30 GET '\ | ||
1451 | 42 | '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ | ||
1452 | 43 | 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\ | ||
1453 | 44 | '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' | ||
1454 | 45 | stats_test_line = 'account,1,2,3' | ||
1455 | 46 | proxy_config = {'log-processor': { | ||
1456 | 47 | |||
1457 | 48 | } | ||
1458 | 49 | } | ||
1459 | 50 | |||
1460 | 51 | def test_access_log_line_parser(self): | ||
1461 | 52 | access_proxy_config = self.proxy_config | ||
1462 | 53 | access_proxy_config.update({ | ||
1463 | 54 | 'log-processor-access': { | ||
1464 | 55 | 'source_filename_format':'%Y%m%d%H*', | ||
1465 | 56 | 'class_path': | ||
1466 | 57 | 'swift.stats.access_processor.AccessLogProcessor' | ||
1467 | 58 | }}) | ||
1468 | 59 | p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) | ||
1469 | 60 | result = p.plugins['access']['instance'].log_line_parser(self.access_test_line) | ||
1470 | 61 | self.assertEquals(result, {'code': 200, | ||
1471 | 62 | 'processing_time': '0.0262', | ||
1472 | 63 | 'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd', | ||
1473 | 64 | 'month': '07', | ||
1474 | 65 | 'second': '30', | ||
1475 | 66 | 'year': '2010', | ||
1476 | 67 | 'query': 'format=json&foo', | ||
1477 | 68 | 'tz': '+0000', | ||
1478 | 69 | 'http_version': 'HTTP/1.0', | ||
1479 | 70 | 'object_name': 'bar', | ||
1480 | 71 | 'etag': '-', | ||
1481 | 72 | 'foo': 1, | ||
1482 | 73 | 'method': 'GET', | ||
1483 | 74 | 'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90', | ||
1484 | 75 | 'client_ip': '1.2.3.4', | ||
1485 | 76 | 'format': 1, | ||
1486 | 77 | 'bytes_out': 95, | ||
1487 | 78 | 'container_name': 'foo', | ||
1488 | 79 | 'day': '09', | ||
1489 | 80 | 'minute': '14', | ||
1490 | 81 | 'account': 'acct', | ||
1491 | 82 | 'hour': '04', | ||
1492 | 83 | 'referrer': '-', | ||
1493 | 84 | 'request': '/v1/acct/foo/bar', | ||
1494 | 85 | 'user_agent': 'curl', | ||
1495 | 86 | 'bytes_in': 6, | ||
1496 | 87 | 'lb_ip': '4.5.6.7'}) | ||
1497 | 88 | |||
1498 | 89 | def test_process_one_access_file(self): | ||
1499 | 90 | access_proxy_config = self.proxy_config | ||
1500 | 91 | access_proxy_config.update({ | ||
1501 | 92 | 'log-processor-access': { | ||
1502 | 93 | 'source_filename_format':'%Y%m%d%H*', | ||
1503 | 94 | 'class_path': | ||
1504 | 95 | 'swift.stats.access_processor.AccessLogProcessor' | ||
1505 | 96 | }}) | ||
1506 | 97 | p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) | ||
1507 | 98 | def get_object_data(*a, **kw): | ||
1508 | 99 | return [self.access_test_line] | ||
1509 | 100 | p.get_object_data = get_object_data | ||
1510 | 101 | result = p.process_one_file('access', 'a', 'c', 'o') | ||
1511 | 102 | expected = {('acct', '2010', '07', '09', '04'): | ||
1512 | 103 | {('public', 'object', 'GET', '2xx'): 1, | ||
1513 | 104 | ('public', 'bytes_out'): 95, | ||
1514 | 105 | 'marker_query': 0, | ||
1515 | 106 | 'format_query': 1, | ||
1516 | 107 | 'delimiter_query': 0, | ||
1517 | 108 | 'path_query': 0, | ||
1518 | 109 | ('public', 'bytes_in'): 6, | ||
1519 | 110 | 'prefix_query': 0}} | ||
1520 | 111 | self.assertEquals(result, expected) | ||
1521 | 112 | |||
1522 | 113 | def test_get_container_listing(self): | ||
1523 | 114 | p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) | ||
1524 | 115 | p.internal_proxy = DumbInternalProxy() | ||
1525 | 116 | result = p.get_container_listing('a', 'foo') | ||
1526 | 117 | expected = ['2010/03/14/13/obj1'] | ||
1527 | 118 | self.assertEquals(result, expected) | ||
1528 | 119 | result = p.get_container_listing('a', 'foo', listing_filter=expected) | ||
1529 | 120 | expected = [] | ||
1530 | 121 | self.assertEquals(result, expected) | ||
1531 | 122 | result = p.get_container_listing('a', 'foo', start_date='2010031412', | ||
1532 | 123 | end_date='2010031414') | ||
1533 | 124 | expected = ['2010/03/14/13/obj1'] | ||
1534 | 125 | self.assertEquals(result, expected) | ||
1535 | 126 | result = p.get_container_listing('a', 'foo', start_date='2010031414') | ||
1536 | 127 | expected = [] | ||
1537 | 128 | self.assertEquals(result, expected) | ||
1538 | 129 | result = p.get_container_listing('a', 'foo', start_date='2010031410', | ||
1539 | 130 | end_date='2010031412') | ||
1540 | 131 | expected = [] | ||
1541 | 132 | self.assertEquals(result, expected) | ||
1542 | 133 | |||
1543 | 134 | def test_get_object_data(self): | ||
1544 | 135 | p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) | ||
1545 | 136 | p.internal_proxy = DumbInternalProxy() | ||
1546 | 137 | result = list(p.get_object_data('a', 'c', 'o', False)) | ||
1547 | 138 | expected = ['obj', 'data'] | ||
1548 | 139 | self.assertEquals(result, expected) | ||
1549 | 140 | result = list(p.get_object_data('a', 'c', 'o.gz', True)) | ||
1550 | 141 | self.assertEquals(result, expected) | ||
1551 | 142 | |||
1552 | 143 | def test_get_stat_totals(self): | ||
1553 | 144 | stats_proxy_config = self.proxy_config | ||
1554 | 145 | stats_proxy_config.update({ | ||
1555 | 146 | 'log-processor-stats': { | ||
1556 | 147 | 'class_path': | ||
1557 | 148 | 'swift.stats.stats_processor.StatsLogProcessor' | ||
1558 | 149 | }}) | ||
1559 | 150 | p = log_processor.LogProcessor(stats_proxy_config, DumbLogger()) | ||
1560 | 151 | p.internal_proxy = DumbInternalProxy() | ||
1561 | 152 | def get_object_data(*a, **kw): | ||
1562 | 153 | return [self.stats_test_line] | ||
1563 | 154 | p.get_object_data = get_object_data | ||
1564 | 155 | result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o') | ||
1565 | 156 | expected = {('account', 'y', 'm', 'd', 'h'): | ||
1566 | 157 | {'replica_count': 1, | ||
1567 | 158 | 'object_count': 2, | ||
1568 | 159 | 'container_count': 1, | ||
1569 | 160 | 'bytes_used': 3}} | ||
1570 | 161 | self.assertEquals(result, expected) | ||
1571 | 0 | \ No newline at end of file | 162 | \ No newline at end of file |
Just a couple of quick comments:
1. The bins should be refactored to be more DRY, like the current bins
2. pep8 :)