Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk

Proposed by John Dickinson
Status: Superseded
Proposed branch: lp:~notmyname/swift/stats_system
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1736 lines (+1615/-12)
15 files modified
bin/swift-account-stats-logger (+27/-0)
bin/swift-log-stats-collector (+27/-0)
bin/swift-log-uploader (+31/-0)
doc/source/overview_stats.rst (+79/-0)
etc/log-processing.conf-sample (+38/-0)
swift/common/compressing_file_reader.py (+71/-0)
swift/common/daemon.py (+5/-2)
swift/common/internal_proxy.py (+185/-0)
swift/common/utils.py (+17/-10)
swift/stats/access_processor.py (+226/-0)
swift/stats/account_stats.py (+100/-0)
swift/stats/log_processor.py (+413/-0)
swift/stats/log_uploader.py (+170/-0)
swift/stats/stats_processor.py (+65/-0)
test/unit/stats/test_log_processor.py (+161/-0)
To merge this branch: bzr merge lp:~notmyname/swift/stats_system
Reviewer: Swift Core
Review type: security contacts
Status: Pending
Review via email: mp+36485@code.launchpad.net

This proposal supersedes a proposal from 2010-09-22.

This proposal has been superseded by a proposal from 2010-09-29.

Description of the change

Work in progress: moving the existing internal (pre-OpenStack) Swift stats system into (OpenStack) Swift.

Revision history for this message
Chuck Thier (cthier) wrote : Posted in a previous version of this proposal

Just a couple of quick comments:

1. The bins should be refactored to be more DRY like the current bins
2. pep8 :)

Revision history for this message
John Dickinson (notmyname) wrote : Posted in a previous version of this proposal

updated to be more DRY compatible

Revision history for this message
John Dickinson (notmyname) wrote : Posted in a previous version of this proposal

merged with recent trunk updates (resolving one conflict) in order to make final merge easier

Revision history for this message
John Dickinson (notmyname) wrote : Posted in a previous version of this proposal

fixed unit test error

Revision history for this message
John Dickinson (notmyname) wrote : Posted in a previous version of this proposal

updated with issues that came up in code review

Revision history for this message
Chuck Thier (cthier) wrote :

Can we add docs on how to run this with the saio?

lp:~notmyname/swift/stats_system updated
99. By John Dickinson

added additional docs

100. By John Dickinson

documentation clarification and pep8 fixes

101. By John Dickinson

added overview stats to the doc index

102. By John Dickinson

made long lines wrap (grr pep8)

103. By John Dickinson

merged with trunk

104. By John Dickinson

merged with trunk

105. By John Dickinson

merged with trunk

106. By John Dickinson

fixed stats docs

107. By John Dickinson

added a bad lines check to the access log parser

108. By John Dickinson

added tests for compressing file reader

109. By John Dickinson

fixed compressing file reader test

110. By John Dickinson

fixed compressing file reader test

111. By John Dickinson

improved compressing file reader test

112. By John Dickinson

fixed compressing file reader test

113. By John Dickinson

made try/except much less inclusive in access log processor

114. By John Dickinson

added keylist mapping tests and fixed other tests

115. By John Dickinson

updated stats system tests

116. By John Dickinson

merged with gholt's test stubs

117. By John Dickinson

updated SAIO instructions for the stats system

118. By John Dickinson

fixed stats system saio docs

119. By John Dickinson

fixed stats system saio docs

120. By John Dickinson

added openstack copyright/license to test_log_processor.py

121. By John Dickinson

improved logging in log processors

122. By John Dickinson

merged with trunk

123. By John Dickinson

fixed missing working directory bug in account stats

124. By John Dickinson

fixed readconf parameter that was broken with a previous merge

125. By John Dickinson

updated setup.py and saio docs for stats system

126. By John Dickinson

added readconf unit test

127. By John Dickinson

updated stats saio docs to create logs with the appropriate permissions

128. By John Dickinson

fixed bug in log processor internal proxy lazy load code

129. By John Dickinson

updated readconf test

130. By John Dickinson

updated readconf test

131. By John Dickinson

updated readconf test

132. By John Dickinson

fixed internal proxy references in log processor

133. By John Dickinson

fixed account stats filename creation

134. By John Dickinson

pep8 tomfoolery

135. By John Dickinson

moved paren

136. By John Dickinson

added lazy load of internal proxy to log processor (you were right clay)

Preview Diff

1=== added file 'bin/swift-account-stats-logger'
2--- bin/swift-account-stats-logger 1970-01-01 00:00:00 +0000
3+++ bin/swift-account-stats-logger 2010-09-23 18:31:58 +0000
4@@ -0,0 +1,27 @@
5+#!/usr/bin/python
6+# Copyright (c) 2010 OpenStack, LLC.
7+#
8+# Licensed under the Apache License, Version 2.0 (the "License");
9+# you may not use this file except in compliance with the License.
10+# You may obtain a copy of the License at
11+#
12+# http://www.apache.org/licenses/LICENSE-2.0
13+#
14+# Unless required by applicable law or agreed to in writing, software
15+# distributed under the License is distributed on an "AS IS" BASIS,
16+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
17+# implied.
18+# See the License for the specific language governing permissions and
19+# limitations under the License.
20+
21+import sys
22+
23+from swift.stats.account_stats import AccountStat
24+from swift.common import utils
25+
26+if __name__ == '__main__':
27+ if len(sys.argv) < 2:
28+ print "Usage: swift-account-stats-logger CONFIG_FILE"
29+ sys.exit()
30+ stats_conf = utils.readconf(sys.argv[1], 'log-processor-stats')
31+ stats = AccountStat(stats_conf).run(once=True)
32
33=== added file 'bin/swift-log-stats-collector'
34--- bin/swift-log-stats-collector 1970-01-01 00:00:00 +0000
35+++ bin/swift-log-stats-collector 2010-09-23 18:31:58 +0000
36@@ -0,0 +1,27 @@
37+#!/usr/bin/python
38+# Copyright (c) 2010 OpenStack, LLC.
39+#
40+# Licensed under the Apache License, Version 2.0 (the "License");
41+# you may not use this file except in compliance with the License.
42+# You may obtain a copy of the License at
43+#
44+# http://www.apache.org/licenses/LICENSE-2.0
45+#
46+# Unless required by applicable law or agreed to in writing, software
47+# distributed under the License is distributed on an "AS IS" BASIS,
48+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
49+# implied.
50+# See the License for the specific language governing permissions and
51+# limitations under the License.
52+
53+import sys
54+
55+from swift.stats.log_processor import LogProcessorDaemon
56+from swift.common import utils
57+
58+if __name__ == '__main__':
59+ if len(sys.argv) < 2:
60+ print "Usage: swift-log-stats-collector CONFIG_FILE"
61+ sys.exit()
62+ conf = utils.readconf(sys.argv[1], log_name='log-stats-collector')
63+ stats = LogProcessorDaemon(conf).run(once=True)
64
65=== added file 'bin/swift-log-uploader'
66--- bin/swift-log-uploader 1970-01-01 00:00:00 +0000
67+++ bin/swift-log-uploader 2010-09-23 18:31:58 +0000
68@@ -0,0 +1,31 @@
69+#!/usr/bin/python
70+# Copyright (c) 2010 OpenStack, LLC.
71+#
72+# Licensed under the Apache License, Version 2.0 (the "License");
73+# you may not use this file except in compliance with the License.
74+# You may obtain a copy of the License at
75+#
76+# http://www.apache.org/licenses/LICENSE-2.0
77+#
78+# Unless required by applicable law or agreed to in writing, software
79+# distributed under the License is distributed on an "AS IS" BASIS,
80+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
81+# implied.
82+# See the License for the specific language governing permissions and
83+# limitations under the License.
84+
85+import sys
86+
87+from swift.stats.log_uploader import LogUploader
88+from swift.common import utils
89+
90+if __name__ == '__main__':
91+ if len(sys.argv) < 3:
92+ print "Usage: swift-log-uploader CONFIG_FILE plugin"
93+ sys.exit()
94+ uploader_conf = utils.readconf(sys.argv[1], 'log-processor')
95+ plugin = sys.argv[2]
96+ section_name = 'log-processor-%s' % plugin
97+ plugin_conf = utils.readconf(sys.argv[1], section_name)
98+ uploader_conf.update(plugin_conf)
99+ uploader = LogUploader(uploader_conf, plugin).run(once=True)
100
101=== added file 'doc/source/overview_stats.rst'
102--- doc/source/overview_stats.rst 1970-01-01 00:00:00 +0000
103+++ doc/source/overview_stats.rst 2010-09-23 18:31:58 +0000
104@@ -0,0 +1,79 @@
105+==================
106+Swift stats system
107+==================
108+
109+The swift stats system is composed of three parts: log creation, log
110+uploading, and log processing. The system handles two types of logs (access
111+and storage stats), but it can be extended to handle other types of logs.
112+
113+---------
114+Log Types
115+---------
116+
117+***********
118+Access logs
119+***********
120+
121+Access logs are the proxy server logs.
122+
123+******************
124+Storage stats logs
125+******************
126+
127+Storage logs (also referred to as stats logs) are generated by a stats system
128+process. swift-account-stats-logger runs on each account server (via cron) and
129+walks the filesystem looking for account databases. When an account database
130+is found, the logger selects the account hash, bytes_used, container_count,
131+and object_count. These values are then written out as one line in a csv file.
132+One csv file is produced for every run of swift-account-stats-logger. This
133+means that, system wide, one csv file is produced for every storage node.
134+Rackspace runs the account stats logger every hour. Therefore, in a cluster of
135+ten account servers, ten csv files are produced every hour. Also, every
136+account will have one entry for every replica in the system. On average, there
137+will be three copies of each account in the aggregate of all account stat csv
138+files created in one system-wide run.
139+
140+----------------------
141+Log Processing plugins
142+----------------------
143+
144+The swift stats system is written to allow a plugin to be defined for every
145+log type. Swift includes plugins for both access logs and storage stats logs.
146+Each plugin is responsible for defining, in a config section, where the logs
147+are stored on disk, where the logs will be stored in swift (account and
148+container), the filename format of the logs on disk, the location of the
149+plugin class definition, and any plugin-specific config values.
150+
151+The plugin class definition defines three methods. The constructor must accept
152+one argument (the dict representation of the plugin's config section). The
153+process method must accept an iterator, and the account, container, and object
154+name of the log. The keylist_mapping method accepts no parameters.
155+
156+-------------
157+Log Uploading
158+-------------
159+
160+swift-log-uploader accepts a config file and a plugin name. It finds the log
161+files on disk according to the plugin config section and uploads them to the
162+swift cluster. This means one uploader process will run on each proxy server
163+node and each account server node. To avoid uploading partially-written log
164+files, the uploader skips any file whose mtime is less than two hours old.
165+Rackspace runs this process once an hour via cron.
166+
167+--------------
168+Log Processing
169+--------------
170+
171+swift-log-stats-collector accepts a config file and generates a csv that is
172+uploaded to swift. It loads all plugins defined in the config file, generates
173+a list of all log files in swift that need to be processed, and passes an
174+iterable of the log file data to the appropriate plugin's process method. The
175+process method returns a dictionary of data in the log file keyed on (account,
176+year, month, day, hour). The log-stats-collector process then combines all
177+dictionaries from all calls to a process method into one dictionary. Key
178+collisions within each (account, year, month, day, hour) dictionary are
179+summed. Finally, the summed dictionary is mapped to the final csv values with
180+each plugin's keylist_mapping method.
181+
182+The resulting csv file has one line per (account, year, month, day, hour) for
183+all log files processed in that run of swift-log-stats-collector.
184
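
For illustration only, a minimal plugin conforming to the three-method interface described in the doc above might look like the sketch below (the class name and keys are hypothetical; the real implementations are access_processor.py and stats_processor.py later in this diff):

    class ExampleLogProcessor(object):
        '''Hypothetical plugin sketch; not part of this branch.'''

        def __init__(self, conf):
            # conf is the dict form of this plugin's config section
            self.server_name = conf.get('server_name', 'proxy')

        def process(self, obj_stream, account, container, object_name):
            # return counts keyed on (account, year, month, day, hour)
            aggr = {}
            for line in obj_stream:
                # a real plugin parses the date out of each log line
                key = (account, '2010', '09', '23', '12')
                hourly = aggr.setdefault(key, {})
                hourly['lines'] = hourly.get('lines', 0) + 1
            return aggr

        def keylist_mapping(self):
            # csv column name -> internal key (or set of keys) to sum
            return {'line_count': 'lines'}
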
185=== added file 'etc/log-processing.conf-sample'
186--- etc/log-processing.conf-sample 1970-01-01 00:00:00 +0000
187+++ etc/log-processing.conf-sample 2010-09-23 18:31:58 +0000
188@@ -0,0 +1,38 @@
189+# plugin section format is named "log-processor-<plugin>"
190+
191+[log-processor]
192+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
193+# container_name = log_processing_data
194+# proxy_server_conf = /etc/swift/proxy-server.conf
195+# log_facility = LOG_LOCAL0
196+# log_level = INFO
197+# lookback_hours = 120
198+# lookback_window = 120
199+# user = swift
200+
201+[log-processor-access]
202+# log_dir = /var/log/swift/
203+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
204+container_name = log_data
205+source_filename_format = access-%Y%m%d%H
206+# new_log_cutoff = 7200
207+# unlink_log = True
208+class_path = swift.stats.access_processor.AccessLogProcessor
209+# service ips is for client ip addresses that should be counted as servicenet
210+# service_ips =
211+# load balancer private ips is for load balancer ip addresses that should be
212+# counted as servicenet
213+# lb_private_ips =
214+# server_name = proxy
215+# user = swift
216+
217+[log-processor-stats]
218+# log_dir = /var/log/swift/
219+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
220+container_name = account_stats
221+source_filename_format = stats-%Y%m%d%H_*
222+# new_log_cutoff = 7200
223+# unlink_log = True
224+class_path = swift.stats.stats_processor.StatsLogProcessor
225+# account_server_conf = /etc/swift/account-server.conf
226+# user = swift
227\ No newline at end of file
228
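
As a usage sketch (the config path is illustrative), the uploader for a given plugin is driven from this file exactly as bin/swift-log-uploader does above: read the base [log-processor] section, overlay the plugin's section, and run once.

    from swift.common import utils
    from swift.stats.log_uploader import LogUploader

    conf_file = '/etc/swift/log-processing.conf'
    uploader_conf = utils.readconf(conf_file, 'log-processor')
    uploader_conf.update(utils.readconf(conf_file, 'log-processor-access'))
    LogUploader(uploader_conf, 'access').run(once=True)
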
229=== added file 'swift/common/compressing_file_reader.py'
230--- swift/common/compressing_file_reader.py 1970-01-01 00:00:00 +0000
231+++ swift/common/compressing_file_reader.py 2010-09-23 18:31:58 +0000
232@@ -0,0 +1,71 @@
233+# Copyright (c) 2010 OpenStack, LLC.
234+#
235+# Licensed under the Apache License, Version 2.0 (the "License");
236+# you may not use this file except in compliance with the License.
237+# You may obtain a copy of the License at
238+#
239+# http://www.apache.org/licenses/LICENSE-2.0
240+#
241+# Unless required by applicable law or agreed to in writing, software
242+# distributed under the License is distributed on an "AS IS" BASIS,
243+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
244+# implied.
245+# See the License for the specific language governing permissions and
246+# limitations under the License.
247+
248+import zlib
249+import struct
250+
251+class CompressingFileReader(object):
252+ '''
253+ Wraps a file object and provides a read method that returns gzip'd data.
254+
255+ One warning: if read is called with a small value, the data returned may
256+ be bigger than the value. In this case, the "compressed" data will be
257+ bigger than the original data. To solve this, use a bigger read buffer.
258+
259+ An example use case:
260+ Given an uncompressed file on disk, provide a way to read compressed data
261+ without buffering the entire file data in memory. Using this class, an
262+ uncompressed log file could be uploaded as compressed data with chunked
263+ transfer encoding.
264+
265+ gzip header and footer code taken from the python stdlib gzip module
266+
267+ :param file_obj: File object to read from
268+ :param compresslevel: compression level
269+ '''
270+ def __init__(self, file_obj, compresslevel=9):
271+ self._f = file_obj
272+ self._compressor = zlib.compressobj(compresslevel,
273+ zlib.DEFLATED,
274+ -zlib.MAX_WBITS,
275+ zlib.DEF_MEM_LEVEL,
276+ 0)
277+ self.done = False
278+ self.first = True
279+ self.crc32 = 0
280+ self.total_size = 0
281+
282+ def read(self, *a, **kw):
283+ if self.done:
284+ return ''
285+ x = self._f.read(*a, **kw)
286+ if x:
287+ self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
288+ self.total_size += len(x)
289+ compressed = self._compressor.compress(x)
290+ if not compressed:
291+ compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
292+ else:
293+ compressed = self._compressor.flush(zlib.Z_FINISH)
294+ crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
295+ size = struct.pack("<L", self.total_size & 0xffffffffL)
296+ footer = crc32 + size
297+ compressed += footer
298+ self.done = True
299+ if self.first:
300+ self.first = False
301+ header = '\037\213\010\000\000\000\000\000\002\377'
302+ compressed = header + compressed
303+ return compressed
304
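
A rough usage sketch (Python 2, as in the rest of this branch): wrap any file-like object, read gzip'd chunks, and round-trip them with the same decompressor that log_processor.get_object_data uses.

    import zlib
    from cStringIO import StringIO
    from swift.common.compressing_file_reader import CompressingFileReader

    data = 'some log line\n' * 1000
    reader = CompressingFileReader(StringIO(data))
    chunks = []
    while True:
        chunk = reader.read(4096)
        if not chunk:
            break
        chunks.append(chunk)
    gzipped = ''.join(chunks)
    # header + raw deflate data + crc32/size footer == a valid gzip stream
    assert zlib.decompressobj(16 + zlib.MAX_WBITS).decompress(gzipped) == data
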
305=== modified file 'swift/common/daemon.py'
306--- swift/common/daemon.py 2010-09-16 03:52:54 +0000
307+++ swift/common/daemon.py 2010-09-23 18:31:58 +0000
308@@ -34,12 +34,15 @@
309 """Override this to run forever"""
310 raise NotImplementedError('run_forever not implemented')
311
312- def run(self, once=False):
313+ def run(self, once=False, capture_stdout=True, capture_stderr=True):
314 """Run the daemon"""
315 # log uncaught exceptions
316 sys.excepthook = lambda *exc_info: \
317 self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
318- sys.stdout = sys.stderr = utils.LoggerFileObject(self.logger)
319+ if capture_stdout:
320+ sys.stdout = utils.LoggerFileObject(self.logger)
321+ if capture_stderr:
322+ sys.stderr = utils.LoggerFileObject(self.logger)
323
324 utils.drop_privileges(self.conf.get('user', 'swift'))
325
326
327=== added file 'swift/common/internal_proxy.py'
328--- swift/common/internal_proxy.py 1970-01-01 00:00:00 +0000
329+++ swift/common/internal_proxy.py 2010-09-23 18:31:58 +0000
330@@ -0,0 +1,185 @@
331+# Copyright (c) 2010 OpenStack, LLC.
332+#
333+# Licensed under the Apache License, Version 2.0 (the "License");
334+# you may not use this file except in compliance with the License.
335+# You may obtain a copy of the License at
336+#
337+# http://www.apache.org/licenses/LICENSE-2.0
338+#
339+# Unless required by applicable law or agreed to in writing, software
340+# distributed under the License is distributed on an "AS IS" BASIS,
341+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
342+# implied.
343+# See the License for the specific language governing permissions and
344+# limitations under the License.
345+
346+import webob
347+from urllib import quote, unquote
348+from json import loads as json_loads
349+
350+from swift.common.compressing_file_reader import CompressingFileReader
351+from swift.proxy.server import BaseApplication
352+
353+class MemcacheStub(object):
354+ def get(self, *a, **kw): return None
355+ def set(self, *a, **kw): return None
356+ def incr(self, *a, **kw): return 0
357+ def delete(self, *a, **kw): return None
358+ def set_multi(self, *a, **kw): return None
359+ def get_multi(self, *a, **kw): return []
360+
361+class InternalProxy(object):
362+ """
363+ Set up a private instance of a proxy server that allows normal requests
364+ to be made without having to actually send the request to the proxy.
365+ This also doesn't log the requests to the normal proxy logs.
366+
367+ :param proxy_server_conf: proxy server configuration dictionary
368+ :param logger: logger to log requests to
369+ :param retries: number of times to retry each request
370+ """
371+ def __init__(self, proxy_server_conf=None, logger=None, retries=0):
372+ self.upload_app = BaseApplication(proxy_server_conf,
373+ memcache=MemcacheStub(),
374+ logger=logger)
375+ self.retries = retries
376+
377+ def upload_file(self, source_file, account, container, object_name,
378+ compress=True, content_type='application/x-gzip',
379+ etag=None):
380+ """
381+ Upload a file to cloud files.
382+
383+ :param source_file: path to or file like object to upload
384+ :param account: account to upload to
385+ :param container: container to upload to
386+ :param object_name: name of object being uploaded
387+ :param compress: if True, compresses object as it is uploaded
388+ :param content_type: content-type of object
389+ :returns: True if successful, False otherwise
390+ """
391+ log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name)
392+
393+ # create the container
394+ if not self.create_container(account, container):
395+ return False
396+
397+ # upload the file to the account
398+ req = webob.Request.blank(log_create_pattern,
399+ environ={'REQUEST_METHOD': 'PUT'},
400+ headers={'Transfer-Encoding': 'chunked'})
401+ if compress:
402+ if hasattr(source_file, 'read'):
403+ compressed_file = CompressingFileReader(source_file)
404+ else:
405+ compressed_file = CompressingFileReader(open(source_file, 'rb'))
406+ req.body_file = compressed_file
407+ else:
408+ if not hasattr(source_file, 'read'):
409+ source_file = open(source_file, 'rb')
410+ req.body_file = source_file
411+ req.account = account
412+ req.content_type = content_type
413+ req.content_length = None # to make sure we send chunked data
414+ if etag:
415+ req.etag = etag
416+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
417+ tries = 1
418+ while (resp.status_int < 200 or resp.status_int > 299) \
419+ and tries <= self.retries:
420+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
421+ tries += 1
422+ if not (200 <= resp.status_int < 300):
423+ return False
424+ return True
425+
426+ def get_object(self, account, container, object_name):
427+ """
428+ Get object.
429+
430+ :param account: account name object is in
431+ :param container: container name object is in
432+ :param object_name: name of object to get
433+ :returns: iterator for object data
434+ """
435+ req = webob.Request.blank('/v1/%s/%s/%s' %
436+ (account, container, object_name),
437+ environ={'REQUEST_METHOD': 'GET'})
438+ req.account = account
439+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
440+ tries = 1
441+ while (resp.status_int < 200 or resp.status_int > 299) \
442+ and tries <= self.retries:
443+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
444+ tries += 1
445+ return resp.status_int, resp.app_iter
446+
447+ def create_container(self, account, container):
448+ """
449+ Create container.
450+
451+ :param account: account name to put the container in
452+ :param container: container name to create
453+ :returns: True if successful, otherwise False
454+ """
455+ req = webob.Request.blank('/v1/%s/%s' % (account, container),
456+ environ={'REQUEST_METHOD': 'PUT'})
457+ req.account = account
458+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
459+ tries = 1
460+ while (resp.status_int < 200 or resp.status_int > 299) \
461+ and tries <= self.retries:
462+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
463+ tries += 1
464+ return 200 <= resp.status_int < 300
465+
466+ def get_container_list(self, account, container, marker=None, limit=None,
467+ prefix=None, delimiter=None, full_listing=True):
468+ """
469+ Get container listing.
470+
471+ :param account: account name for the container
472+ :param container: container name to get the listing of
473+ :param marker: marker query
474+ :param limit: limit to query
475+ :param prefix: prefix query
476+ :param delimiter: delimiter for query
477+ :param full_listing: if True, make enough requests to get all listings
478+ :returns: list of objects
479+ """
480+ if full_listing:
481+ rv = []
482+ listing = self.get_container_list(account, container, marker,
483+ limit, prefix, delimiter, full_listing=False)
484+ while listing:
485+ rv.extend(listing)
486+ if not delimiter:
487+ marker = listing[-1]['name']
488+ else:
489+ marker = listing[-1].get('name', listing[-1].get('subdir'))
490+ listing = self.get_container_list(account, container, marker,
491+ limit, prefix, delimiter, full_listing=False)
492+ return rv
493+ path = '/v1/%s/%s' % (account, container)
494+ qs = 'format=json'
495+ if marker:
496+ qs += '&marker=%s' % quote(marker)
497+ if limit:
498+ qs += '&limit=%d' % limit
499+ if prefix:
500+ qs += '&prefix=%s' % quote(prefix)
501+ if delimiter:
502+ qs += '&delimiter=%s' % quote(delimiter)
503+ path += '?%s' % qs
504+ req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
505+ req.account = account
506+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
507+ tries = 1
508+ while (resp.status_int < 200 or resp.status_int > 299) \
509+ and tries <= self.retries:
510+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
511+ tries += 1
512+ if resp.status_int == 204:
513+ return []
514+ if 200 <= resp.status_int < 300:
515+ return json_loads(resp.body)
516
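
A short usage sketch (the account, container, and file names are invented): the internal proxy is built from a normal proxy server config and used for uploads and listings without touching the wire.

    from paste.deploy import appconfig
    from swift.common.internal_proxy import InternalProxy

    proxy_conf = appconfig('config:/etc/swift/proxy-server.conf',
                           name='proxy-server')
    proxy = InternalProxy(proxy_conf, retries=3)
    if proxy.upload_file('/var/log/swift/access-2010092312',
                         'AUTH_test', 'log_data',
                         '2010/09/23/12/example.gz'):
        listing = proxy.get_container_list('AUTH_test', 'log_data',
                                           prefix='2010/09/23/')
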
517=== modified file 'swift/common/utils.py'
518--- swift/common/utils.py 2010-09-23 16:09:30 +0000
519+++ swift/common/utils.py 2010-09-23 18:31:58 +0000
520@@ -552,13 +552,13 @@
521 """
522 return item_from_env(env, 'swift.cache')
523
524-
525 def readconf(conf, section_name, log_name=None):
526 """
527 Read config file and return config items as a dict
528
529 :param conf: path to config file
530- :param section_name: config section to read
531+ :param section_name: config section to read (will return all sections if
532+ not defined)
533 :param log_name: name to be used with logging (will use section_name if
534 not defined)
535 :returns: dict of config items
536@@ -567,16 +567,23 @@
537 if not c.read(conf):
538 print "Unable to read config file %s" % conf
539 sys.exit(1)
540- if c.has_section(section_name):
541- conf = dict(c.items(section_name))
542+ if section_name:
543+ if c.has_section(section_name):
544+ conf = dict(c.items(section_name))
545+ else:
546+ print "Unable to find %s config section in %s" % (section_name, conf)
547+ sys.exit(1)
548+ if "log_name" not in conf:
549+ if log_name is not None:
550+ conf['log_name'] = log_name
551+ else:
552+ conf['log_name'] = section_name
553 else:
554- print "Unable to find %s config section in %s" % (section_name, conf)
555- sys.exit(1)
556- if "log_name" not in conf:
557- if log_name is not None:
558+ conf = {}
559+ for s in c.sections():
560+ conf.update({s:dict(c.items(s))})
561+ if 'log_name' not in conf:
562 conf['log_name'] = log_name
563- else:
564- conf['log_name'] = section_name
565 return conf
566
567
568
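
A sketch of the two readconf behaviours after this change (config path illustrative): a section name returns that section's items, while a falsy section name returns every section as a nested dict, which is the form LogProcessor expects for plugin discovery.

    from swift.common.utils import readconf

    # one section -> flat dict of its items (as the bins above do)
    stats_conf = readconf('/etc/swift/log-processing.conf',
                          'log-processor-stats')
    # no section -> {'log-processor': {...}, 'log-processor-access': {...}, ...}
    all_conf = readconf('/etc/swift/log-processing.conf', None,
                        log_name='log-stats-collector')
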
569=== added directory 'swift/stats'
570=== added file 'swift/stats/__init__.py'
571=== added file 'swift/stats/access_processor.py'
572--- swift/stats/access_processor.py 1970-01-01 00:00:00 +0000
573+++ swift/stats/access_processor.py 2010-09-23 18:31:58 +0000
574@@ -0,0 +1,226 @@
575+# Copyright (c) 2010 OpenStack, LLC.
576+#
577+# Licensed under the Apache License, Version 2.0 (the "License");
578+# you may not use this file except in compliance with the License.
579+# You may obtain a copy of the License at
580+#
581+# http://www.apache.org/licenses/LICENSE-2.0
582+#
583+# Unless required by applicable law or agreed to in writing, software
584+# distributed under the License is distributed on an "AS IS" BASIS,
585+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
586+# implied.
587+# See the License for the specific language governing permissions and
588+# limitations under the License.
589+
590+import collections
591+from urllib import unquote
592+import copy
593+
594+from swift.common.utils import split_path
595+
596+month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
597+
598+
599+class AccessLogProcessor(object):
600+ """Transform proxy server access logs"""
601+
602+ def __init__(self, conf):
603+ self.server_name = conf.get('server_name', 'proxy')
604+ self.lb_private_ips = [x.strip() for x in \
605+ conf.get('lb_private_ips', '').split(',')\
606+ if x.strip()]
607+ self.service_ips = [x.strip() for x in \
608+ conf.get('service_ips', '').split(',')\
609+ if x.strip()]
610+
611+ def log_line_parser(self, raw_log):
612+ '''given a raw access log line, return a dict of the good parts'''
613+ d = {}
614+ try:
615+ (_,
616+ server,
617+ client_ip,
618+ lb_ip,
619+ timestamp,
620+ method,
621+ request,
622+ http_version,
623+ code,
624+ referrer,
625+ user_agent,
626+ auth_token,
627+ bytes_in,
628+ bytes_out,
629+ etag,
630+ trans_id,
631+ headers,
632+ processing_time) = (unquote(x) for x in raw_log[16:].split(' '))
633+ if server != self.server_name:
634+ raise ValueError('incorrect server name in log line')
635+ (version,
636+ account,
637+ container_name,
638+ object_name) = split_path(request, 2, 4, True)
639+ if container_name is not None:
640+ container_name = container_name.split('?', 1)[0]
641+ if object_name is not None:
642+ object_name = object_name.split('?', 1)[0]
643+ account = account.split('?', 1)[0]
644+ query = None
645+ if '?' in request:
646+ request, query = request.split('?', 1)
647+ args = query.split('&')
648+ # Count each query argument. This is used later to aggregate
649+ # the number of format, prefix, etc. queries.
650+ for q in args:
651+ if '=' in q:
652+ k, v = q.split('=', 1)
653+ else:
654+ k = q
655+ # Certain keys will get summed in stats reporting
656+ # (format, path, delimiter, etc.). Save a "1" here
657+ # to indicate that this request is 1 request for
658+ # its respective key.
659+ d[k] = 1
660+ except ValueError:
661+ pass
662+ else:
663+ d['client_ip'] = client_ip
664+ d['lb_ip'] = lb_ip
665+ d['method'] = method
666+ d['request'] = request
667+ if query:
668+ d['query'] = query
669+ d['http_version'] = http_version
670+ d['code'] = code
671+ d['referrer'] = referrer
672+ d['user_agent'] = user_agent
673+ d['auth_token'] = auth_token
674+ d['bytes_in'] = bytes_in
675+ d['bytes_out'] = bytes_out
676+ d['etag'] = etag
677+ d['trans_id'] = trans_id
678+ d['processing_time'] = processing_time
679+ day, month, year, hour, minute, second = timestamp.split('/')
680+ d['day'] = day
681+ month = ('%02s' % month_map.index(month)).replace(' ', '0')
682+ d['month'] = month
683+ d['year'] = year
684+ d['hour'] = hour
685+ d['minute'] = minute
686+ d['second'] = second
687+ d['tz'] = '+0000'
688+ d['account'] = account
689+ d['container_name'] = container_name
690+ d['object_name'] = object_name
691+ d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
692+ d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
693+ d['code'] = int(d['code'])
694+ return d
695+
696+ def process(self, obj_stream, account, container, object_name):
697+ '''generate hourly groupings of data from one access log file'''
698+ hourly_aggr_info = {}
699+ for line in obj_stream:
700+ line_data = self.log_line_parser(line)
701+ if not line_data:
702+ continue
703+ account = line_data['account']
704+ container_name = line_data['container_name']
705+ year = line_data['year']
706+ month = line_data['month']
707+ day = line_data['day']
708+ hour = line_data['hour']
709+ bytes_out = line_data['bytes_out']
710+ bytes_in = line_data['bytes_in']
711+ method = line_data['method']
712+ code = int(line_data['code'])
713+ object_name = line_data['object_name']
714+ client_ip = line_data['client_ip']
715+
716+ op_level = None
717+ if not container_name:
718+ op_level = 'account'
719+ elif container_name and not object_name:
720+ op_level = 'container'
721+ elif object_name:
722+ op_level = 'object'
723+
724+ aggr_key = (account, year, month, day, hour)
725+ d = hourly_aggr_info.get(aggr_key, {})
726+ if line_data['lb_ip'] in self.lb_private_ips:
727+ source = 'service'
728+ else:
729+ source = 'public'
730+
731+ if line_data['client_ip'] in self.service_ips:
732+ source = 'service'
733+
734+ d[(source, 'bytes_out')] = d.setdefault((
735+ source, 'bytes_out'), 0) + bytes_out
736+ d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
737+ bytes_in
738+
739+ d['format_query'] = d.setdefault('format_query', 0) + \
740+ line_data.get('format', 0)
741+ d['marker_query'] = d.setdefault('marker_query', 0) + \
742+ line_data.get('marker', 0)
743+ d['prefix_query'] = d.setdefault('prefix_query', 0) + \
744+ line_data.get('prefix', 0)
745+ d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \
746+ line_data.get('delimiter', 0)
747+ path = line_data.get('path', 0)
748+ d['path_query'] = d.setdefault('path_query', 0) + path
749+
750+ code = '%dxx' % (code / 100)
751+ key = (source, op_level, method, code)
752+ d[key] = d.setdefault(key, 0) + 1
753+
754+ hourly_aggr_info[aggr_key] = d
755+ return hourly_aggr_info
756+
757+ def keylist_mapping(self):
758+ source_keys = 'service public'.split()
759+ level_keys = 'account container object'.split()
760+ verb_keys = 'GET PUT POST DELETE HEAD COPY'.split()
761+ code_keys = '2xx 4xx 5xx'.split()
762+
763+ keylist_mapping = {
764+ # <db key> : <row key> or <set of row keys>
765+ 'service_bw_in': ('service', 'bytes_in'),
766+ 'service_bw_out': ('service', 'bytes_out'),
767+ 'public_bw_in': ('public', 'bytes_in'),
768+ 'public_bw_out': ('public', 'bytes_out'),
769+ 'account_requests': set(),
770+ 'container_requests': set(),
771+ 'object_requests': set(),
772+ 'service_request': set(),
773+ 'public_request': set(),
774+ 'ops_count': set(),
775+ }
776+ for verb in verb_keys:
777+ keylist_mapping[verb] = set()
778+ for code in code_keys:
779+ keylist_mapping[code] = set()
780+ for source in source_keys:
781+ for level in level_keys:
782+ for verb in verb_keys:
783+ for code in code_keys:
784+ keylist_mapping['account_requests'].add(
785+ (source, 'account', verb, code))
786+ keylist_mapping['container_requests'].add(
787+ (source, 'container', verb, code))
788+ keylist_mapping['object_requests'].add(
789+ (source, 'object', verb, code))
790+ keylist_mapping['service_request'].add(
791+ ('service', level, verb, code))
792+ keylist_mapping['public_request'].add(
793+ ('public', level, verb, code))
794+ keylist_mapping[verb].add(
795+ (source, level, verb, code))
796+ keylist_mapping[code].add(
797+ (source, level, verb, code))
798+ keylist_mapping['ops_count'].add(
799+ (source, level, verb, code))
800+ return keylist_mapping
801
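
For reference, process() returns a dict keyed on (account, year, month, day, hour); a hypothetical entry for a single public GET of an object (values invented) would look like:

    hourly_aggr_info = {
        ('AUTH_test', '2010', '09', '23', '12'): {
            ('public', 'bytes_out'): 4096,
            ('public', 'bytes_in'): 0,
            'format_query': 0,
            'marker_query': 0,
            'prefix_query': 0,
            'delimiter_query': 0,
            'path_query': 0,
            ('public', 'object', 'GET', '2xx'): 1,
        },
    }
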
802=== added file 'swift/stats/account_stats.py'
803--- swift/stats/account_stats.py 1970-01-01 00:00:00 +0000
804+++ swift/stats/account_stats.py 2010-09-23 18:31:58 +0000
805@@ -0,0 +1,100 @@
806+# Copyright (c) 2010 OpenStack, LLC.
807+#
808+# Licensed under the Apache License, Version 2.0 (the "License");
809+# you may not use this file except in compliance with the License.
810+# You may obtain a copy of the License at
811+#
812+# http://www.apache.org/licenses/LICENSE-2.0
813+#
814+# Unless required by applicable law or agreed to in writing, software
815+# distributed under the License is distributed on an "AS IS" BASIS,
816+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
817+# implied.
818+# See the License for the specific language governing permissions and
819+# limitations under the License.
820+
821+import os
822+import time
823+from paste.deploy import appconfig
824+import shutil
825+import hashlib
826+
827+from swift.account.server import DATADIR as account_server_data_dir
828+from swift.common.db import AccountBroker
829+from swift.common.internal_proxy import InternalProxy
830+from swift.common.utils import renamer, get_logger, readconf, mkdirs
831+from swift.common.constraints import check_mount
832+from swift.common.daemon import Daemon
833+
834+
835+class AccountStat(Daemon):
836+ """
837+ Extract storage stats from account databases on the account
838+ storage nodes
839+ """
840+
841+ def __init__(self, stats_conf):
842+ super(AccountStat, self).__init__(stats_conf)
843+ target_dir = stats_conf.get('log_dir', '/var/log/swift')
844+ account_server_conf_loc = stats_conf.get('account_server_conf',
845+ '/etc/swift/account-server.conf')
846+ server_conf = appconfig('config:%s' % account_server_conf_loc,
847+ name='account-server')
848+ filename_format = stats_conf['source_filename_format']
849+ self.filename_format = filename_format
850+ self.target_dir = target_dir
851+ mkdirs(self.target_dir)
852+ self.devices = server_conf.get('devices', '/srv/node')
853+ self.mount_check = server_conf.get('mount_check', 'true').lower() in \
854+ ('true', 't', '1', 'on', 'yes', 'y')
855+ self.logger = get_logger(stats_conf, 'swift-account-stats-logger')
856+
857+ def run_once(self):
858+ self.logger.info("Gathering account stats")
859+ start = time.time()
860+ self.find_and_process()
861+ self.logger.info("Gathering account stats complete (%0.2f minutes)" %
862+ ((time.time() - start) / 60))
863+
864+ def find_and_process(self):
865+ src_filename = time.strftime(self.filename_format)
866+ working_dir = os.path.join(self.target_dir, '.stats_tmp')
867+ shutil.rmtree(working_dir, ignore_errors=True)
868+ tmp_filename = os.path.join(working_dir, src_filename)
869+ hasher = hashlib.md5()
870+ with open(tmp_filename, 'wb') as statfile:
871+ # csv has the following columns:
872+ # Account Name, Container Count, Object Count, Bytes Used
873+ for device in os.listdir(self.devices):
874+ if self.mount_check and not check_mount(self.devices, device):
875+ self.logger.error("Device %s is not mounted, skipping." %
876+ device)
877+ continue
878+ accounts = os.path.join(self.devices,
879+ device,
880+ account_server_data_dir)
881+ if not os.path.exists(accounts):
882+ self.logger.debug("Path %s does not exist, skipping." %
883+ accounts)
884+ continue
885+ for root, dirs, files in os.walk(accounts, topdown=False):
886+ for filename in files:
887+ if filename.endswith('.db'):
888+ db_path = os.path.join(root, filename)
889+ broker = AccountBroker(db_path)
890+ if not broker.is_deleted():
891+ (account_name,
892+ _, _, _,
893+ container_count,
894+ object_count,
895+ bytes_used,
896+ _, _) = broker.get_info()
897+ line_data = '"%s",%d,%d,%d\n' % (
898+ account_name, container_count,
899+ object_count, bytes_used)
900+ statfile.write(line_data)
901+ hasher.update(line_data)
902+ file_hash = hasher.hexdigest()
903+ src_filename = '_'.join([src_filename, file_hash])
904+ renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
905+ shutil.rmtree(working_dir, ignore_errors=True)
906
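
Each undeleted account database contributes one csv row; a sketch of the line format written above (the numbers are invented):

    # '"<account name>",<container count>,<object count>,<bytes used>\n'
    line_data = '"%s",%d,%d,%d\n' % ('AUTH_test', 12, 1450, 73400320)
    # -> '"AUTH_test",12,1450,73400320\n'
    # the md5 of all such lines is appended to the csv filename before the
    # file is moved into the target log_dir
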
907=== added file 'swift/stats/log_processor.py'
908--- swift/stats/log_processor.py 1970-01-01 00:00:00 +0000
909+++ swift/stats/log_processor.py 2010-09-23 18:31:58 +0000
910@@ -0,0 +1,413 @@
911+# Copyright (c) 2010 OpenStack, LLC.
912+#
913+# Licensed under the Apache License, Version 2.0 (the "License");
914+# you may not use this file except in compliance with the License.
915+# You may obtain a copy of the License at
916+#
917+# http://www.apache.org/licenses/LICENSE-2.0
918+#
919+# Unless required by applicable law or agreed to in writing, software
920+# distributed under the License is distributed on an "AS IS" BASIS,
921+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
922+# implied.
923+# See the License for the specific language governing permissions and
924+# limitations under the License.
925+
926+from ConfigParser import ConfigParser
927+import zlib
928+import time
929+import datetime
930+import cStringIO
931+import collections
932+from paste.deploy import appconfig
933+import multiprocessing
934+import Queue
935+import cPickle
936+import hashlib
937+
938+from swift.common.internal_proxy import InternalProxy
939+from swift.common.exceptions import ChunkReadTimeout
940+from swift.common.utils import get_logger, readconf
941+from swift.common.daemon import Daemon
942+
943+
944+class BadFileDownload(Exception):
945+ pass
946+
947+
948+class LogProcessor(object):
949+ """Load plugins, process logs"""
950+
951+ def __init__(self, conf, logger):
952+ stats_conf = conf.get('log-processor', {})
953+
954+ if isinstance(logger, tuple):
955+ self.logger = get_logger(*logger)
956+ else:
957+ self.logger = logger
958+
959+ # load the processing plugins
960+ self.plugins = {}
961+ plugin_prefix = 'log-processor-'
962+ for section in (x for x in conf if x.startswith(plugin_prefix)):
963+ plugin_name = section[len(plugin_prefix):]
964+ plugin_conf = conf.get(section, {})
965+ self.plugins[plugin_name] = plugin_conf
966+ class_path = self.plugins[plugin_name]['class_path']
967+ import_target, class_name = class_path.rsplit('.', 1)
968+ module = __import__(import_target, fromlist=[import_target])
969+ klass = getattr(module, class_name)
970+ self.plugins[plugin_name]['instance'] = klass(plugin_conf)
971+
972+ @property
973+ def internal_proxy(self):
974+ '''Lazy load internal proxy'''
975+ if self._internal_proxy is None:
976+ proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
977+ '/etc/swift/proxy-server.conf')
978+ self.proxy_server_conf = appconfig(
979+ 'config:%s' % proxy_server_conf_loc,
980+ name='proxy-server')
981+ self._internal_proxy = InternalProxy(self.proxy_server_conf,
982+ self.logger,
983+ retries=3)
984+ return self._internal_proxy
985+
986+ def process_one_file(self, plugin_name, account, container, object_name):
987+ # get an iter of the object data
988+ compressed = object_name.endswith('.gz')
989+ stream = self.get_object_data(account, container, object_name,
990+ compressed=compressed)
991+ # look up the correct plugin and send the stream to it
992+ return self.plugins[plugin_name]['instance'].process(stream,
993+ account,
994+ container,
995+ object_name)
996+
997+ def get_data_list(self, start_date=None, end_date=None,
998+ listing_filter=None):
999+ total_list = []
1000+ for plugin_name, data in self.plugins.items():
1001+ account = data['swift_account']
1002+ container = data['container_name']
1003+ listing = self.get_container_listing(account,
1004+ container,
1005+ start_date,
1006+ end_date)
1007+ for object_name in listing:
1008+ # The items in this list end up being passed as positional
1009+ # parameters to process_one_file.
1010+ x = (plugin_name, account, container, object_name)
1011+ if x not in listing_filter:
1012+ total_list.append(x)
1013+ return total_list
1014+
1015+ def get_container_listing(self, swift_account, container_name,
1016+ start_date=None, end_date=None,
1017+ listing_filter=None):
1018+ '''
1019+ Get a container listing, filtered by start_date, end_date, and
1020+ listing_filter. Dates, if given, should be in YYYYMMDDHH format
1021+ '''
1022+ search_key = None
1023+ if start_date is not None:
1024+ date_parts = []
1025+ try:
1026+ year, start_date = start_date[:4], start_date[4:]
1027+ if year:
1028+ date_parts.append(year)
1029+ month, start_date = start_date[:2], start_date[2:]
1030+ if month:
1031+ date_parts.append(month)
1032+ day, start_date = start_date[:2], start_date[2:]
1033+ if day:
1034+ date_parts.append(day)
1035+ hour, start_date = start_date[:2], start_date[2:]
1036+ if hour:
1037+ date_parts.append(hour)
1038+ except IndexError:
1039+ pass
1040+ else:
1041+ search_key = '/'.join(date_parts)
1042+ end_key = None
1043+ if end_date is not None:
1044+ date_parts = []
1045+ try:
1046+ year, end_date = end_date[:4], end_date[4:]
1047+ if year:
1048+ date_parts.append(year)
1049+ month, end_date = end_date[:2], end_date[2:]
1050+ if month:
1051+ date_parts.append(month)
1052+ day, end_date = end_date[:2], end_date[2:]
1053+ if day:
1054+ date_parts.append(day)
1055+ hour, end_date = end_date[:2], end_date[2:]
1056+ if hour:
1057+ date_parts.append(hour)
1058+ except IndexError:
1059+ pass
1060+ else:
1061+ end_key = '/'.join(date_parts)
1062+ container_listing = self.internal_proxy.get_container_list(
1063+ swift_account,
1064+ container_name,
1065+ marker=search_key)
1066+ results = []
1067+ if container_listing is not None:
1068+ if listing_filter is None:
1069+ listing_filter = set()
1070+ for item in container_listing:
1071+ name = item['name']
1072+ if end_key and name > end_key:
1073+ break
1074+ if name not in listing_filter:
1075+ results.append(name)
1076+ return results
1077+
1078+ def get_object_data(self, swift_account, container_name, object_name,
1079+ compressed=False):
1080+ '''reads an object and yields its lines'''
1081+ code, o = self.internal_proxy.get_object(swift_account,
1082+ container_name,
1083+ object_name)
1084+ if code < 200 or code >= 300:
1085+ return
1086+ last_part = ''
1087+ last_compressed_part = ''
1088+ # magic in the following zlib.decompressobj argument is courtesy of
1089+ # Python decompressing gzip chunk-by-chunk
1090+ # http://stackoverflow.com/questions/2423866
1091+ d = zlib.decompressobj(16 + zlib.MAX_WBITS)
1092+ try:
1093+ for chunk in o:
1094+ if compressed:
1095+ try:
1096+ chunk = d.decompress(chunk)
1097+ except zlib.error:
1098+ raise BadFileDownload() # bad compressed data
1099+ parts = chunk.split('\n')
1100+ parts[0] = last_part + parts[0]
1101+ for part in parts[:-1]:
1102+ yield part
1103+ last_part = parts[-1]
1104+ if last_part:
1105+ yield last_part
1106+ except ChunkReadTimeout:
1107+ raise BadFileDownload()
1108+
1109+ def generate_keylist_mapping(self):
1110+ keylist = {}
1111+ for plugin in self.plugins:
1112+ plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
1113+ if not plugin_keylist:
1114+ continue
1115+ for k, v in plugin_keylist.items():
1116+ o = keylist.get(k)
1117+ if o:
1118+ if isinstance(o, set):
1119+ if isinstance(v, set):
1120+ o.update(v)
1121+ else:
1122+ o.update([v])
1123+ else:
1124+ o = set(o)
1125+ if isinstance(v, set):
1126+ o.update(v)
1127+ else:
1128+ o.update([v])
1129+ else:
1130+ o = v
1131+ keylist[k] = o
1132+ return keylist
1133+
1134+
1135+class LogProcessorDaemon(Daemon):
1136+ """
1137+ Gather raw log data and farm out processing to generate a csv that is
1138+ uploaded to swift.
1139+ """
1140+
1141+ def __init__(self, conf):
1142+ c = conf.get('log-processor')
1143+ super(LogProcessorDaemon, self).__init__(c)
1144+ self.total_conf = conf
1145+ self.logger = get_logger(c)
1146+ self.log_processor = LogProcessor(conf, self.logger)
1147+ self.lookback_hours = int(c.get('lookback_hours', '120'))
1148+ self.lookback_window = int(c.get('lookback_window',
1149+ str(self.lookback_hours)))
1150+ self.log_processor_account = c['swift_account']
1151+ self.log_processor_container = c.get('container_name',
1152+ 'log_processing_data')
1153+ self.worker_count = int(c.get('worker_count', '1'))
1154+
1155+ def run_once(self):
1156+ self.logger.info("Beginning log processing")
1157+ start = time.time()
1158+ if self.lookback_hours == 0:
1159+ lookback_start = None
1160+ lookback_end = None
1161+ else:
1162+ delta_hours = datetime.timedelta(hours=self.lookback_hours)
1163+ lookback_start = datetime.datetime.now() - delta_hours
1164+ lookback_start = lookback_start.strftime('%Y%m%d%H')
1165+ if self.lookback_window == 0:
1166+ lookback_end = None
1167+ else:
1168+ delta_window = datetime.timedelta(hours=self.lookback_window)
1169+ lookback_end = datetime.datetime.now() - \
1170+ delta_hours + \
1171+ delta_window
1172+ lookback_end = lookback_end.strftime('%Y%m%d%H')
1173+ self.logger.debug('lookback_start: %s' % lookback_start)
1174+ self.logger.debug('lookback_end: %s' % lookback_end)
1175+ try:
1176+ # Note: this file (or data set) will grow without bound.
1177+ # In practice, if it becomes a problem (say, after many months of
1178+ # running), one could manually prune the file to remove older
1179+ # entries. Automatically pruning on each run could be dangerous.
1180+ # There is not a good way to determine when an old entry should be
1181+ # pruned (lookback_hours could be set to anything and could change)
1182+ processed_files_stream = self.log_processor.get_object_data(
1183+ self.log_processor_account,
1184+ self.log_processor_container,
1185+ 'processed_files.pickle.gz',
1186+ compressed=True)
1187+ buf = '\n'.join(x for x in processed_files_stream)
1188+ if buf:
1189+ already_processed_files = cPickle.loads(buf)
1190+ else:
1191+ already_processed_files = set()
1192+ except:
1193+ already_processed_files = set()
1194+ self.logger.debug('found %d processed files' % \
1195+ len(already_processed_files))
1196+ logs_to_process = self.log_processor.get_data_list(lookback_start,
1197+ lookback_end,
1198+ already_processed_files)
1199+ self.logger.info('loaded %d files to process' % len(logs_to_process))
1200+ if not logs_to_process:
1201+ self.logger.info("Log processing done (%0.2f minutes)" %
1202+ ((time.time() - start) / 60))
1203+ return
1204+
1205+ # map
1206+ processor_args = (self.total_conf, self.logger)
1207+ results = multiprocess_collate(processor_args, logs_to_process,
1208+ self.worker_count)
1209+
1210+ #reduce
1211+ aggr_data = {}
1212+ processed_files = already_processed_files
1213+ for item, data in results:
1214+ # since item contains the plugin and the log name, new plugins will
1215+ # "reprocess" the file and the results will be in the final csv.
1216+ processed_files.add(item)
1217+ for k, d in data.items():
1218+ existing_data = aggr_data.get(k, {})
1219+ for i, j in d.items():
1220+ current = existing_data.get(i, 0)
1221+ # merging strategy for key collisions is addition
1222+ # processing plugins need to realize this
1223+ existing_data[i] = current + j
1224+ aggr_data[k] = existing_data
1225+
1226+ # group
1227+ # reduce a large number of keys in aggr_data[k] to a small number of
1228+ # output keys
1229+ keylist_mapping = self.log_processor.generate_keylist_mapping()
1230+ final_info = collections.defaultdict(dict)
1231+ for account, data in aggr_data.items():
1232+ for key, mapping in keylist_mapping.items():
1233+ if isinstance(mapping, (list, set)):
1234+ value = 0
1235+ for k in mapping:
1236+ try:
1237+ value += data[k]
1238+ except KeyError:
1239+ pass
1240+ else:
1241+ try:
1242+ value = data[mapping]
1243+ except KeyError:
1244+ value = 0
1245+ final_info[account][key] = value
1246+
1247+ # output
1248+ sorted_keylist_mapping = sorted(keylist_mapping)
1249+ columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
1250+ out_buf = [columns]
1251+ for (account, year, month, day, hour), d in final_info.items():
1252+ data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
1253+ row = [data_ts]
1254+ row.append('%s' % account)
1255+ for k in sorted_keylist_mapping:
1256+ row.append('%s' % d[k])
1257+ out_buf.append(','.join(row))
1258+ out_buf = '\n'.join(out_buf)
1259+ h = hashlib.md5(out_buf).hexdigest()
1260+ upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
1261+ f = cStringIO.StringIO(out_buf)
1262+ self.log_processor.internal_proxy.upload_file(f,
1263+ self.log_processor_account,
1264+ self.log_processor_container,
1265+ upload_name)
1266+
1267+ # cleanup
1268+ s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
1269+ f = cStringIO.StringIO(s)
1270+ self.log_processor.internal_proxy.upload_file(f,
1271+ self.log_processor_account,
1272+ self.log_processor_container,
1273+ 'processed_files.pickle.gz')
1274+
1275+ self.logger.info("Log processing done (%0.2f minutes)" %
1276+ ((time.time() - start) / 60))
1277+
1278+
1279+def multiprocess_collate(processor_args, logs_to_process, worker_count):
1280+ '''yield hourly data from logs_to_process'''
1281+ results = []
1282+ in_queue = multiprocessing.Queue()
1283+ out_queue = multiprocessing.Queue()
1284+ for _ in range(worker_count):
1285+ p = multiprocessing.Process(target=collate_worker,
1286+ args=(processor_args,
1287+ in_queue,
1288+ out_queue))
1289+ p.start()
1290+ results.append(p)
1291+ for x in logs_to_process:
1292+ in_queue.put(x)
1293+ for _ in range(worker_count):
1294+ in_queue.put(None)
1295+ count = 0
1296+ while True:
1297+ try:
1298+ item, data = out_queue.get_nowait()
1299+ count += 1
1300+ if data:
1301+ yield item, data
1302+ if count >= len(logs_to_process):
1303+ # this implies that one result will come from every request
1304+ break
1305+ except Queue.Empty:
1306+ time.sleep(.1)
1307+ for r in results:
1308+ r.join()
1309+
1310+
1311+def collate_worker(processor_args, in_queue, out_queue):
1312+ '''worker process for multiprocess_collate'''
1313+ p = LogProcessor(*processor_args)
1314+ while True:
1315+ try:
1316+ item = in_queue.get_nowait()
1317+ if item is None:
1318+ break
1319+ except Queue.Empty:
1320+ time.sleep(.1)
1321+ else:
1322+ ret = p.process_one_file(*item)
1323+ out_queue.put((item, ret))
1324
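
A sketch of how the final csv from run_once() above is named and uploaded (the columns shown are a small illustrative subset; the real header is data_ts,account plus every key in the keylist mapping):

    import cStringIO
    import hashlib
    import time

    out_buf = 'data_ts,account,2xx,4xx,5xx\n' \
              '2010/09/23 12:00:00,AUTH_test,1,0,0'
    h = hashlib.md5(out_buf).hexdigest()
    upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
    f = cStringIO.StringIO(out_buf)
    # uploaded via internal_proxy.upload_file() to the [log-processor]
    # swift_account/container_name, alongside processed_files.pickle.gz
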
1325=== added file 'swift/stats/log_uploader.py'
1326--- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000
1327+++ swift/stats/log_uploader.py 2010-09-23 18:31:58 +0000
1328@@ -0,0 +1,170 @@
1329+# Copyright (c) 2010 OpenStack, LLC.
1330+#
1331+# Licensed under the Apache License, Version 2.0 (the "License");
1332+# you may not use this file except in compliance with the License.
1333+# You may obtain a copy of the License at
1334+#
1335+# http://www.apache.org/licenses/LICENSE-2.0
1336+#
1337+# Unless required by applicable law or agreed to in writing, software
1338+# distributed under the License is distributed on an "AS IS" BASIS,
1339+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1340+# implied.
1341+# See the License for the specific language governing permissions and
1342+# limitations under the License.
1343+
1344+from __future__ import with_statement
1345+import os
1346+import hashlib
1347+import time
1348+import gzip
1349+import glob
1350+from paste.deploy import appconfig
1351+
1352+from swift.common.internal_proxy import InternalProxy
1353+from swift.common.daemon import Daemon
1354+from swift.common import utils
1355+
1356+
1357+class LogUploader(Daemon):
1358+ '''
1359+ Given a local directory, a swift account, and a container name, LogUploader
1360+ will upload all files in the local directory to the given account/
1361+ container. All but the newest files will be uploaded, and the files' md5
1362+ sum will be computed. The hash is used to prevent duplicate data from
1363+ being uploaded multiple times in different files (ex: log lines). Since
1364+ the hash is computed, it is also used as the uploaded object's etag to
1365+ ensure data integrity.
1366+
1367+ Note that after the file is successfully uploaded, it will be unlinked.
1368+
1369+ The given proxy server config is used to instantiate a proxy server for
1370+ the object uploads.
1371+ '''
1372+
1373+ def __init__(self, uploader_conf, plugin_name):
1374+ super(LogUploader, self).__init__(uploader_conf)
1375+ log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
1376+ swift_account = uploader_conf['swift_account']
1377+ container_name = uploader_conf['container_name']
1378+ source_filename_format = uploader_conf['source_filename_format']
1379+ proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
1380+ '/etc/swift/proxy-server.conf')
1381+ proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
1382+ name='proxy-server')
1383+ new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
1384+ unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
1385+ ('true', 'on', '1', 'yes')
1386+ self.unlink_log = unlink_log
1387+ self.new_log_cutoff = new_log_cutoff
1388+ if not log_dir.endswith('/'):
1389+ log_dir = log_dir + '/'
1390+ self.log_dir = log_dir
1391+ self.swift_account = swift_account
1392+ self.container_name = container_name
1393+ self.filename_format = source_filename_format
1394+ self.internal_proxy = InternalProxy(proxy_server_conf)
1395+ log_name = 'swift-log-uploader-%s' % plugin_name
1396+ self.logger = utils.get_logger(uploader_conf, plugin_name)
1397+
1398+ def run_once(self):
1399+ self.logger.info("Uploading logs")
1400+ start = time.time()
1401+ self.upload_all_logs()
1402+ self.logger.info("Uploading logs complete (%0.2f minutes)" %
1403+ ((time.time() - start) / 60))
1404+
1405+ def upload_all_logs(self):
1406+ i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
1407+ i.sort()
1408+ year_offset = month_offset = day_offset = hour_offset = None
1409+ base_offset = len(self.log_dir)
1410+ for start, c in i:
1411+ offset = base_offset + start
1412+ if c == '%Y':
1413+ year_offset = offset, offset + 4
1414+ # Add in the difference between len(%Y) and the expanded
1415+ # version of %Y (????). This makes sure the codes after this
1416+ # one will align properly in the final filename.
1417+ base_offset += 2
1418+ elif c == '%m':
1419+ month_offset = offset, offset + 2
1420+ elif c == '%d':
1421+ day_offset = offset, offset + 2
1422+ elif c == '%H':
1423+ hour_offset = offset, offset + 2
1424+ if not (year_offset and month_offset and day_offset and hour_offset):
1425+ # don't have all the parts, can't upload anything
1426+ return
1427+ glob_pattern = self.filename_format
1428+ glob_pattern = glob_pattern.replace('%Y', '????', 1)
1429+ glob_pattern = glob_pattern.replace('%m', '??', 1)
1430+ glob_pattern = glob_pattern.replace('%d', '??', 1)
1431+ glob_pattern = glob_pattern.replace('%H', '??', 1)
1432+ filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
1433+ current_hour = int(time.strftime('%H'))
1434+ today = int(time.strftime('%Y%m%d'))
1435+ self.internal_proxy.create_container(self.swift_account,
1436+ self.container_name)
1437+ for filename in filelist:
1438+ try:
1439+ # From the filename, we need to derive the year, month, day,
1440+ # and hour for the file. These values are used in the uploaded
1441+ # object's name, so they should be a reasonably accurate
1442+ # representation of the time for which the data in the file was
1443+ # collected. The file's last modified time is not a reliable
1444+ # representation of the data in the file. For example, an old
1445+ # log file (from hour A) may be uploaded or moved into the
1446+ # log_dir in hour Z. The file's modified time will be for hour
1447+ # Z, and therefore the object's name in the system will not
1448+ # represent the data in it.
1449+ # If the filename doesn't match the format, it shouldn't be
1450+ # uploaded.
1451+ year = filename[slice(*year_offset)]
1452+ month = filename[slice(*month_offset)]
1453+ day = filename[slice(*day_offset)]
1454+ hour = filename[slice(*hour_offset)]
1455+ except IndexError:
1456+ # unexpected filename format, move on
1457+ self.logger.error("Unexpected log: %s" % filename)
1458+ continue
1459+ if ((time.time() - os.stat(filename).st_mtime) <
1460+ self.new_log_cutoff):
1461+ # don't process very new logs
1462+ self.logger.debug(
1463+ "Skipping log: %s (< %d seconds old)" % (filename,
1464+ self.new_log_cutoff))
1465+ continue
1466+ self.upload_one_log(filename, year, month, day, hour)
1467+
1468+ def upload_one_log(self, filename, year, month, day, hour):
1469+ if os.path.getsize(filename) == 0:
1470+ self.logger.debug("Log %s is 0 length, skipping" % filename)
1471+ return
1472+ self.logger.debug("Processing log: %s" % filename)
1473+ filehash = hashlib.md5()
1474+        already_compressed = filename.endswith('.gz')
1475+ opener = gzip.open if already_compressed else open
1476+ f = opener(filename, 'rb')
1477+ try:
1478+ for line in f:
1479+ # filter out bad lines here?
1480+ filehash.update(line)
1481+ finally:
1482+ f.close()
1483+ filehash = filehash.hexdigest()
1484+ # By adding a hash to the filename, we ensure that uploaded files
1485+ # have unique filenames and protect against uploading one file
1486+ # more than one time. By using md5, we get an etag for free.
1487+ target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
1488+ if self.internal_proxy.upload_file(filename,
1489+ self.swift_account,
1490+ self.container_name,
1491+ target_filename,
1492+ compress=(not already_compressed)):
1493+ self.logger.debug("Uploaded log %s to %s" %
1494+ (filename, target_filename))
1495+ if self.unlink_log:
1496+ os.unlink(filename)
1497+ else:
1498+            self.logger.error("Upload of log %s failed!" % filename)
1499
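The offset bookkeeping in upload_all_logs above is the least obvious part of
this file. A minimal standalone sketch of the same idea, assuming a made-up
source_filename_format of 'access-%Y%m%d%H' under the default log_dir (the
format string and example filename are illustrative, not taken from the
sample config):

    import os

    log_dir = '/var/log/swift/'
    filename_format = 'access-%Y%m%d%H'   # hypothetical, for this sketch only

    # pair each strftime code with its position in the format string
    codes = sorted((filename_format.index(c), c) for c in '%Y %m %d %H'.split())
    base_offset = len(log_dir)
    offsets = {}
    for start, code in codes:
        offset = base_offset + start
        offsets[code] = (offset, offset + (4 if code == '%Y' else 2))
        if code == '%Y':
            # '%Y' is two characters in the format but four in a filename
            base_offset += 2

    filename = os.path.join(log_dir, 'access-2010070904')
    print filename[slice(*offsets['%Y'])]   # 2010
    print filename[slice(*offsets['%m'])]   # 07
    print filename[slice(*offsets['%d'])]   # 09
    print filename[slice(*offsets['%H'])]   # 04

The base_offset bump is what keeps the later codes aligned: '%Y' occupies two
characters in the format string but four in an actual filename.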
1500=== added file 'swift/stats/stats_processor.py'
1501--- swift/stats/stats_processor.py 1970-01-01 00:00:00 +0000
1502+++ swift/stats/stats_processor.py 2010-09-23 18:31:58 +0000
1503@@ -0,0 +1,65 @@
1504+# Copyright (c) 2010 OpenStack, LLC.
1505+#
1506+# Licensed under the Apache License, Version 2.0 (the "License");
1507+# you may not use this file except in compliance with the License.
1508+# You may obtain a copy of the License at
1509+#
1510+# http://www.apache.org/licenses/LICENSE-2.0
1511+#
1512+# Unless required by applicable law or agreed to in writing, software
1513+# distributed under the License is distributed on an "AS IS" BASIS,
1514+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1515+# implied.
1516+# See the License for the specific language governing permissions and
1517+# limitations under the License.
1518+
1519+
1520+class StatsLogProcessor(object):
1521+ """Transform account storage stat logs"""
1522+
1523+ def __init__(self, conf):
1524+ pass
1525+
1526+ def process(self, obj_stream, account, container, object_name):
1527+ '''generate hourly groupings of data from one stats log file'''
1528+ account_totals = {}
1529+ year, month, day, hour, _ = object_name.split('/')
1530+ for line in obj_stream:
1531+ if not line:
1532+ continue
1533+ try:
1534+ (account,
1535+ container_count,
1536+ object_count,
1537+ bytes_used) = line.split(',')
1538+ account = account.strip('"')
1539+ container_count = int(container_count.strip('"'))
1540+ object_count = int(object_count.strip('"'))
1541+ bytes_used = int(bytes_used.strip('"'))
1542+ aggr_key = (account, year, month, day, hour)
1543+ d = account_totals.get(aggr_key, {})
1544+ d['replica_count'] = d.setdefault('replica_count', 0) + 1
1545+ d['container_count'] = d.setdefault('container_count', 0) + \
1546+ container_count
1547+ d['object_count'] = d.setdefault('object_count', 0) + \
1548+ object_count
1549+ d['bytes_used'] = d.setdefault('bytes_used', 0) + \
1550+ bytes_used
1551+ account_totals[aggr_key] = d
1552+ except (IndexError, ValueError):
1553+ # bad line data
1554+ pass
1555+ return account_totals
1556+
1557+ def keylist_mapping(self):
1558+ '''
1559+ returns a dictionary of final keys mapped to source keys
1560+ '''
1561+ keylist_mapping = {
1562+ # <db key> : <row key> or <set of row keys>
1563+ 'bytes_used': 'bytes_used',
1564+ 'container_count': 'container_count',
1565+ 'object_count': 'object_count',
1566+ 'replica_count': 'replica_count',
1567+ }
1568+ return keylist_mapping
1569
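StatsLogProcessor.process expects one CSV row per account replica in the form
"account","container count","object count","bytes used". A minimal sketch of
the hourly roll-up, with an invented account name, counts, and uploaded-object
name:

    from swift.stats.stats_processor import StatsLogProcessor

    processor = StatsLogProcessor({})
    rows = ['"acct","1","2","3"', '"acct","1","2","3"']
    totals = processor.process(rows, 'account', 'container',
                               '2010/07/09/04/some-upload.gz')
    # {('acct', '2010', '07', '09', '04'): {'replica_count': 2,
    #   'container_count': 2, 'object_count': 4, 'bytes_used': 6}}

replica_count simply records how many replica rows were folded into each
hourly total.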
1570=== added directory 'test/unit/stats'
1571=== added file 'test/unit/stats/__init__.py'
1572=== added file 'test/unit/stats/test_log_processor.py'
1573--- test/unit/stats/test_log_processor.py 1970-01-01 00:00:00 +0000
1574+++ test/unit/stats/test_log_processor.py 2010-09-23 18:31:58 +0000
1575@@ -0,0 +1,161 @@
1576+import unittest
1577+
1578+from swift.stats import log_processor
1579+
1580+class DumbLogger(object):
1581+ def __getattr__(self, n):
1582+ return self.foo
1583+
1584+ def foo(self, *a, **kw):
1585+ pass
1586+
1587+class DumbInternalProxy(object):
1588+ def get_container_list(self, account, container, marker=None):
1589+ n = '2010/03/14/13/obj1'
1590+ if marker is None or n > marker:
1591+ return [{'name': n}]
1592+ else:
1593+ return []
1594+
1595+ def get_object(self, account, container, object_name):
1596+ code = 200
1597+ if object_name.endswith('.gz'):
1598+ # same data as below, compressed with gzip -9
1599+ def data():
1600+ yield '\x1f\x8b\x08'
1601+ yield '\x08"\xd79L'
1602+ yield '\x02\x03te'
1603+ yield 'st\x00\xcbO'
1604+ yield '\xca\xe2JI,I'
1605+ yield '\xe4\x02\x00O\xff'
1606+ yield '\xa3Y\t\x00\x00\x00'
1607+ else:
1608+ def data():
1609+ yield 'obj\n'
1610+ yield 'data'
1611+ return code, data()
1612+
1613+class TestLogProcessor(unittest.TestCase):
1614+
1615+ access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\
1616+ '09/Jul/2010/04/14/30 GET '\
1617+ '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
1618+ 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
1619+ '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
1620+ stats_test_line = 'account,1,2,3'
1621+    proxy_config = {'log-processor': {}}
1625+
1626+ def test_access_log_line_parser(self):
1627+ access_proxy_config = self.proxy_config
1628+ access_proxy_config.update({
1629+ 'log-processor-access': {
1630+            'source_filename_format': '%Y%m%d%H*',
1631+ 'class_path':
1632+ 'swift.stats.access_processor.AccessLogProcessor'
1633+ }})
1634+ p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
1635+ result = p.plugins['access']['instance'].log_line_parser(self.access_test_line)
1636+ self.assertEquals(result, {'code': 200,
1637+ 'processing_time': '0.0262',
1638+ 'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd',
1639+ 'month': '07',
1640+ 'second': '30',
1641+ 'year': '2010',
1642+ 'query': 'format=json&foo',
1643+ 'tz': '+0000',
1644+ 'http_version': 'HTTP/1.0',
1645+ 'object_name': 'bar',
1646+ 'etag': '-',
1647+ 'foo': 1,
1648+ 'method': 'GET',
1649+ 'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90',
1650+ 'client_ip': '1.2.3.4',
1651+ 'format': 1,
1652+ 'bytes_out': 95,
1653+ 'container_name': 'foo',
1654+ 'day': '09',
1655+ 'minute': '14',
1656+ 'account': 'acct',
1657+ 'hour': '04',
1658+ 'referrer': '-',
1659+ 'request': '/v1/acct/foo/bar',
1660+ 'user_agent': 'curl',
1661+ 'bytes_in': 6,
1662+ 'lb_ip': '4.5.6.7'})
1663+
1664+ def test_process_one_access_file(self):
1665+ access_proxy_config = self.proxy_config
1666+ access_proxy_config.update({
1667+ 'log-processor-access': {
1668+            'source_filename_format': '%Y%m%d%H*',
1669+ 'class_path':
1670+ 'swift.stats.access_processor.AccessLogProcessor'
1671+ }})
1672+ p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
1673+ def get_object_data(*a, **kw):
1674+ return [self.access_test_line]
1675+ p.get_object_data = get_object_data
1676+ result = p.process_one_file('access', 'a', 'c', 'o')
1677+ expected = {('acct', '2010', '07', '09', '04'):
1678+ {('public', 'object', 'GET', '2xx'): 1,
1679+ ('public', 'bytes_out'): 95,
1680+ 'marker_query': 0,
1681+ 'format_query': 1,
1682+ 'delimiter_query': 0,
1683+ 'path_query': 0,
1684+ ('public', 'bytes_in'): 6,
1685+ 'prefix_query': 0}}
1686+ self.assertEquals(result, expected)
1687+
1688+ def test_get_container_listing(self):
1689+ p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
1690+ p._internal_proxy = DumbInternalProxy()
1691+ result = p.get_container_listing('a', 'foo')
1692+ expected = ['2010/03/14/13/obj1']
1693+ self.assertEquals(result, expected)
1694+ result = p.get_container_listing('a', 'foo', listing_filter=expected)
1695+ expected = []
1696+ self.assertEquals(result, expected)
1697+ result = p.get_container_listing('a', 'foo', start_date='2010031412',
1698+ end_date='2010031414')
1699+ expected = ['2010/03/14/13/obj1']
1700+ self.assertEquals(result, expected)
1701+ result = p.get_container_listing('a', 'foo', start_date='2010031414')
1702+ expected = []
1703+ self.assertEquals(result, expected)
1704+ result = p.get_container_listing('a', 'foo', start_date='2010031410',
1705+ end_date='2010031412')
1706+ expected = []
1707+ self.assertEquals(result, expected)
1708+
1709+ def test_get_object_data(self):
1710+ p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
1711+ p._internal_proxy = DumbInternalProxy()
1712+ result = list(p.get_object_data('a', 'c', 'o', False))
1713+        expected = ['obj', 'data']
1714+ self.assertEquals(result, expected)
1715+ result = list(p.get_object_data('a', 'c', 'o.gz', True))
1716+ self.assertEquals(result, expected)
1717+
1718+ def test_get_stat_totals(self):
1719+ stats_proxy_config = self.proxy_config
1720+ stats_proxy_config.update({
1721+ 'log-processor-stats': {
1722+ 'class_path':
1723+ 'swift.stats.stats_processor.StatsLogProcessor'
1724+ }})
1725+ p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
1726+ p._internal_proxy = DumbInternalProxy()
1727+        def get_object_data(*a, **kw):
1728+ return [self.stats_test_line]
1729+ p.get_object_data = get_object_data
1730+ result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o')
1731+ expected = {('account', 'y', 'm', 'd', 'h'):
1732+ {'replica_count': 1,
1733+ 'object_count': 2,
1734+ 'container_count': 1,
1735+ 'bytes_used': 3}}
1736+ self.assertEquals(result, expected)
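
The test module above has no direct-run hook; if running the file by itself is
wanted, the usual guard can be appended (a convention, not something the
branch depends on):

    if __name__ == '__main__':
        unittest.main()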