Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk

Proposed by John Dickinson
Status: Superseded
Proposed branch: lp:~notmyname/swift/stats_system
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1571 lines (+1464/-11)
14 files modified
bin/swift-account-stats-logger (+27/-0)
bin/swift-log-stats-collector (+27/-0)
bin/swift-log-uploader (+31/-0)
etc/log-processing.conf-sample (+34/-0)
swift/common/compressed_file_reader.py (+72/-0)
swift/common/daemon.py (+5/-2)
swift/common/internal_proxy.py (+182/-0)
swift/common/utils.py (+16/-9)
swift/stats/access_processor.py (+224/-0)
swift/stats/account_stats.py (+86/-0)
swift/stats/log_processor.py (+376/-0)
swift/stats/log_uploader.py (+160/-0)
swift/stats/stats_processor.py (+63/-0)
test/unit/stats/test_log_processor.py (+161/-0)
To merge this branch: bzr merge lp:~notmyname/swift/stats_system
Reviewer: Swift Core security contacts (status: Pending)
Review via email: mp+35156@code.launchpad.net

This proposal supersedes a proposal from 2010-09-07.

This proposal has been superseded by a proposal from 2010-09-15.

Description of the change

Work in progress: move the existing internal (pre-OpenStack) Swift stats system into (OpenStack) Swift.

Revision history for this message
Chuck Thier (cthier) wrote : Posted in a previous version of this proposal

Just a couple of quick comments:

1. The bins should be refactored to be more DRY like the current bins
2. pep8 :)

Revision history for this message
John Dickinson (notmyname) wrote : Posted in a previous version of this proposal

updated to be more DRY compatible

lp:~notmyname/swift/stats_system updated
70. By John Dickinson

added execute perms to stats processor binaries

71. By John Dickinson

updated config file loading to work with paste.deploy configs

72. By John Dickinson

fixed typos

73. By John Dickinson

fixed internal proxy loading

74. By John Dickinson

made a memcache stub for the internal proxy server

75. By John Dickinson

fixed internal proxy put_container reference

76. By John Dickinson

fixed some log uploading glob patterns

77. By John Dickinson

fixed bug in calculating offsets for filename patterns

78. By John Dickinson

fixed typos in log processor

79. By John Dickinson

fixed get_data_list in log_processor

80. By John Dickinson

added some debug output

81. By John Dickinson

fixed logging and log uploading

82. By John Dickinson

fixed copy/paste errors and missing imports

83. By John Dickinson

added error handling and missing return statement

84. By John Dickinson

handled some typos and better handling of missing data in internal proxy

85. By John Dickinson

fixed tests, typos, and added error handling

86. By John Dickinson

fixed bug in account stats log processing

87. By John Dickinson

fixed listing filter in log processing

88. By John Dickinson

fixed stdout capturing for generating csv files

89. By John Dickinson

fixed replica count reporting error

90. By John Dickinson

fixed lookback in log processor

91. By John Dickinson

merged with trunk

92. By John Dickinson

merged with trunk

93. By John Dickinson

fixed tests to account for changed key name

94. By John Dickinson

fixed test bug

95. By John Dickinson

updated with changes and suggestions from code review

96. By John Dickinson

merged with changes from trunk

97. By John Dickinson

added stats overview

98. By John Dickinson

updated with changes from trunk

99. By John Dickinson

added additional docs

100. By John Dickinson

documentation clarification and pep8 fixes

101. By John Dickinson

added overview stats to the doc index

102. By John Dickinson

made long lines wrap (grr pep8)

103. By John Dickinson

merged with trunk

104. By John Dickinson

merged with trunk

105. By John Dickinson

merged with trunk

106. By John Dickinson

fixed stats docs

107. By John Dickinson

added a bad lines check to the access log parser

108. By John Dickinson

added tests for compressing file reader

109. By John Dickinson

fixed compressing file reader test

110. By John Dickinson

fixed compressing file reader test

111. By John Dickinson

improved compressing file reader test

112. By John Dickinson

fixed compressing file reader test

113. By John Dickinson

made try/except much less inclusive in access log processor

114. By John Dickinson

added keylist mapping tests and fixed other tests

115. By John Dickinson

updated stats system tests

116. By John Dickinson

merged with gholt's test stubs

117. By John Dickinson

updated SAIO instructions for the stats system

118. By John Dickinson

fixed stats system saio docs

119. By John Dickinson

fixed stats system saio docs

120. By John Dickinson

added openstack copyright/license to test_log_processor.py

121. By John Dickinson

improved logging in log processors

122. By John Dickinson

merged with trunk

123. By John Dickinson

fixed missing working directory bug in account stats

124. By John Dickinson

fixed readconf parameter that was broken with a previous merge

125. By John Dickinson

updated setup.py and saio docs for stats system

126. By John Dickinson

added readconf unit test

127. By John Dickinson

updated stats saio docs to create logs with the appropriate permissions

128. By John Dickinson

fixed bug in log processor internal proxy lazy load code

129. By John Dickinson

updated readconf test

130. By John Dickinson

updated readconf test

131. By John Dickinson

updated readconf test

132. By John Dickinson

fixed internal proxy references in log processor

133. By John Dickinson

fixed account stats filename creation

134. By John Dickinson

pep8 tomfoolery

135. By John Dickinson

moved paren

136. By John Dickinson

added lazy load of internal proxy to log processor (you were right clay)

Unmerged revisions

Preview Diff

=== added file 'bin/swift-account-stats-logger'
--- bin/swift-account-stats-logger 1970-01-01 00:00:00 +0000
+++ bin/swift-account-stats-logger 2010-09-15 20:11:08 +0000
@@ -0,0 +1,27 @@
1#!/usr/bin/python
2# Copyright (c) 2010 OpenStack, LLC.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import sys
18
19from swift.stats.account_stats import AccountStat
20from swift.common import utils
21
22if __name__ == '__main__':
23 if len(sys.argv) < 2:
24 print "Usage: swift-account-stats-logger CONFIG_FILE"
25 sys.exit()
26 stats_conf = utils.readconf(sys.argv[1], 'log-processor-stats')
27 stats = AccountStat(stats_conf).run(once=True)
=== added file 'bin/swift-log-stats-collector'
--- bin/swift-log-stats-collector 1970-01-01 00:00:00 +0000
+++ bin/swift-log-stats-collector 2010-09-15 20:11:08 +0000
@@ -0,0 +1,27 @@
1#!/usr/bin/python
2# Copyright (c) 2010 OpenStack, LLC.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import sys
18
19from swift.stats.log_processor import LogProcessorDaemon
20from swift.common import utils
21
22if __name__ == '__main__':
23 if len(sys.argv) < 2:
24 print "Usage: swift-log-stats-collector CONFIG_FILE"
25 sys.exit()
26 conf = utils.readconf(sys.argv[1], log_name='log-stats-collector')
27 stats = LogProcessorDaemon(conf).run(once=True, capture_stdout=False)
=== added file 'bin/swift-log-uploader'
--- bin/swift-log-uploader 1970-01-01 00:00:00 +0000
+++ bin/swift-log-uploader 2010-09-15 20:11:08 +0000
@@ -0,0 +1,31 @@
1#!/usr/bin/python
2# Copyright (c) 2010 OpenStack, LLC.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import sys
18
19from swift.stats.log_uploader import LogUploader
20from swift.common import utils
21
22if __name__ == '__main__':
23 if len(sys.argv) < 3:
24 print "Usage: swift-log-uploader CONFIG_FILE plugin"
25 sys.exit()
26 uploader_conf = utils.readconf(sys.argv[1], 'log-processor')
27 plugin = sys.argv[2]
28 section_name = 'log-processor-%s' % plugin
29 plugin_conf = utils.readconf(sys.argv[1], section_name)
30 uploader_conf.update(plugin_conf)
31 uploader = LogUploader(uploader_conf, plugin).run(once=True)
=== added file 'etc/log-processing.conf-sample'
--- etc/log-processing.conf-sample 1970-01-01 00:00:00 +0000
+++ etc/log-processing.conf-sample 2010-09-15 20:11:08 +0000
@@ -0,0 +1,34 @@
1# plugin section format is named "log-processor-<plugin>"
2
3[log-processor]
4swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
5# container_name = log_processing_data
6# proxy_server_conf = /etc/swift/proxy-server.conf
7# log_facility = LOG_LOCAL0
8# log_level = INFO
9# lookback_hours = 120
10# lookback_window = 120
11# user = swift
12
13[log-processor-access]
14# log_dir = /var/log/swift/
15swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
16container_name = log_data
17source_filename_format = access-%Y%m%d%H
18class_path = swift.stats.access_processor.AccessLogProcessor
19# service ips is for client ip addresses that should be counted as servicenet
20# service_ips =
21# load balancer private ips is for load balancer ip addresses that should be
22# counted as servicenet
23# lb_private_ips =
24# server_name = proxy
25# user = swift
26
27[log-processor-stats]
28# log_dir = /var/log/swift/
29swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
30container_name = account_stats
31source_filename_format = stats-%Y%m%d%H
32class_path = swift.stats.stats_processor.StatsLogProcessor
33# account_server_conf = /etc/swift/account-server.conf
34# user = swift
\ No newline at end of file
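
The sample above follows the convention that each plugin gets a section named log-processor-<plugin>, and the plugin class is imported dynamically from its class_path. A minimal sketch of that resolution step (not part of the diff; the config path is only illustrative):

    # Sketch: resolve plugin classes from a log-processing config file.
    # Mirrors the dynamic import in swift/stats/log_processor.py; the
    # path '/etc/swift/log-processing.conf' is only an assumption.
    from ConfigParser import ConfigParser

    def load_plugin_classes(conf_path):
        c = ConfigParser()
        c.read(conf_path)
        plugins = {}
        prefix = 'log-processor-'
        for section in c.sections():
            if not section.startswith(prefix):
                continue
            plugin_conf = dict(c.items(section))
            # e.g. 'swift.stats.access_processor.AccessLogProcessor'
            import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
            module = __import__(import_target, fromlist=[import_target])
            plugins[section[len(prefix):]] = getattr(module, class_name)
        return plugins

    # load_plugin_classes('/etc/swift/log-processing.conf')
    # -> {'access': AccessLogProcessor, 'stats': StatsLogProcessor}
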
=== added file 'swift/common/compressed_file_reader.py'
--- swift/common/compressed_file_reader.py 1970-01-01 00:00:00 +0000
+++ swift/common/compressed_file_reader.py 2010-09-15 20:11:08 +0000
@@ -0,0 +1,72 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import zlib
17import struct
18
19
20class CompressedFileReader(object):
21 '''
22 Wraps a file object and provides a read method that returns gzip'd data.
23
24 One warning: if read is called with a small value, the data returned may
25 be bigger than the value. In this case, the "compressed" data will be
26 bigger than the original data. To solve this, use a bigger read buffer.
27
28 An example use case:
29 Given an uncompressed file on disk, provide a way to read compressed data
30 without buffering the entire file data in memory. Using this class, an
31 uncompressed log file could be uploaded as compressed data with chunked
32 transfer encoding.
33
34 gzip header and footer code taken from the python stdlib gzip module
35
36 :param file_obj: File object to read from
37 :param compresslevel: compression level
38 '''
39 def __init__(self, file_obj, compresslevel=9):
40 self._f = file_obj
41 self._compressor = zlib.compressobj(compresslevel,
42 zlib.DEFLATED,
43 -zlib.MAX_WBITS,
44 zlib.DEF_MEM_LEVEL,
45 0)
46 self.done = False
47 self.first = True
48 self.crc32 = 0
49 self.total_size = 0
50
51 def read(self, *a, **kw):
52 if self.done:
53 return ''
54 x = self._f.read(*a, **kw)
55 if x:
56 self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
57 self.total_size += len(x)
58 compressed = self._compressor.compress(x)
59 if not compressed:
60 compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
61 else:
62 compressed = self._compressor.flush(zlib.Z_FINISH)
63 crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
64 size = struct.pack("<L", self.total_size & 0xffffffffL)
65 footer = crc32 + size
66 compressed += footer
67 self.done = True
68 if self.first:
69 self.first = False
70 header = '\037\213\010\000\000\000\000\000\002\377'
71 compressed = header + compressed
72 return compressed
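
A quick way to convince yourself the reader above emits a valid gzip stream is a round-trip check. The following sketch (not part of the diff) feeds a StringIO through CompressedFileReader in small chunks and decompresses the result with the stdlib gzip module:

    # Sketch: round-trip a buffer through CompressedFileReader and gzip.
    # Assumes this branch is installed so the new module is importable.
    import gzip
    from StringIO import StringIO

    from swift.common.compressed_file_reader import CompressedFileReader

    original = 'some log line\n' * 1000
    reader = CompressedFileReader(StringIO(original))

    compressed = ''
    while True:
        # read in modest chunks, as a chunked-transfer upload would
        chunk = reader.read(4096)
        if not chunk:
            break
        compressed += chunk

    assert gzip.GzipFile(fileobj=StringIO(compressed)).read() == original
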
=== modified file 'swift/common/daemon.py'
--- swift/common/daemon.py 2010-08-31 23:12:59 +0000
+++ swift/common/daemon.py 2010-09-15 20:11:08 +0000
@@ -33,12 +33,15 @@
         """Override this to run forever"""
         raise NotImplementedError('run_forever not implemented')
 
-    def run(self, once=False):
+    def run(self, once=False, capture_stdout=True, capture_stderr=True):
         """Run the daemon"""
         # log uncaught exceptions
         sys.excepthook = lambda *exc_info: \
             self.logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
-        sys.stdout = sys.stderr = utils.LoggerFileObject(self.logger)
+        if capture_stdout:
+            sys.stdout = utils.LoggerFileObject(self.logger)
+        if capture_stderr:
+            sys.stderr = utils.LoggerFileObject(self.logger)
 
         utils.drop_privileges(self.conf.get('user', 'swift'))
 
=== added file 'swift/common/internal_proxy.py'
--- swift/common/internal_proxy.py 1970-01-01 00:00:00 +0000
+++ swift/common/internal_proxy.py 2010-09-15 20:11:08 +0000
@@ -0,0 +1,182 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import webob
17from urllib import quote, unquote
18from json import loads as json_loads
19
20from swift.common.compressed_file_reader import CompressedFileReader
21from swift.proxy.server import BaseApplication
22
23class MemcacheStub(object):
24 def get(self, *a, **kw): return None
25 def set(self, *a, **kw): return None
26 def incr(self, *a, **kw): return 0
27 def delete(self, *a, **kw): return None
28 def set_multi(self, *a, **kw): return None
29 def get_multi(self, *a, **kw): return []
30
31class InternalProxy(object):
32 """
33 Set up a private instance of a proxy server that allows normal requests
34 to be made without having to actually send the request to the proxy.
35 This also doesn't log the requests to the normal proxy logs.
36
37 :param proxy_server_conf: proxy server configuration dictionary
38 :param logger: logger to log requests to
39 :param retries: number of times to retry each request
40 """
41 def __init__(self, proxy_server_conf=None, logger=None, retries=0):
42 self.upload_app = BaseApplication(proxy_server_conf,
43 memcache=MemcacheStub(),
44 logger=logger)
45 self.retries = retries
46
47 def upload_file(self, source_file, account, container, object_name,
48 compress=True, content_type='application/x-gzip'):
49 """
50 Upload a file to cloud files.
51
52 :param source_file: path to or file like object to upload
53 :param account: account to upload to
54 :param container: container to upload to
55 :param object_name: name of object being uploaded
56 :param compress: if True, compresses object as it is uploaded
57 :param content_type: content-type of object
58 :returns: True if successful, False otherwise
59 """
60 log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name)
61
62 # create the container
63 if not self.create_container(account, container):
64 return False
65
66 # upload the file to the account
67 req = webob.Request.blank(log_create_pattern,
68 environ={'REQUEST_METHOD': 'PUT'},
69 headers={'Transfer-Encoding': 'chunked'})
70 if compress:
71 if hasattr(source_file, 'read'):
72 compressed_file = CompressedFileReader(source_file)
73 else:
74 compressed_file = CompressedFileReader(open(source_file, 'rb'))
75 req.body_file = compressed_file
76 else:
77 if not hasattr(source_file, 'read'):
78 source_file = open(source_file, 'rb')
79 req.body_file = source_file
80 req.account = account
81 req.content_type = content_type
82 req.content_length = None # to make sure we send chunked data
83 resp = self.upload_app.handle_request(self.upload_app.update_request(req))
84 tries = 1
85 while (resp.status_int < 200 or resp.status_int > 299) \
86 and tries <= self.retries:
87 resp = self.upload_app.handle_request(self.upload_app.update_request(req))
88 tries += 1
89 if not (200 <= resp.status_int < 300):
90 return False
91 return True
92
93 def get_object(self, account, container, object_name):
94 """
95 Get object.
96
97 :param account: account name object is in
98 :param container: container name object is in
99 :param object_name: name of object to get
100 :returns: iterator for object data
101 """
102 req = webob.Request.blank('/v1/%s/%s/%s' %
103 (account, container, object_name),
104 environ={'REQUEST_METHOD': 'GET'})
105 req.account = account
106 resp = self.upload_app.handle_request(self.upload_app.update_request(req))
107 tries = 1
108 while (resp.status_int < 200 or resp.status_int > 299) \
109 and tries <= self.retries:
110 resp = self.upload_app.handle_request(self.upload_app.update_request(req))
111 tries += 1
112 return resp.status_int, resp.app_iter
113
114 def create_container(self, account, container):
115 """
116 Create container.
117
118 :param account: account name to put the container in
119 :param container: container name to create
120 :returns: True if successful, otherwise False
121 """
122 req = webob.Request.blank('/v1/%s/%s' % (account, container),
123 environ={'REQUEST_METHOD': 'PUT'})
124 req.account = account
125 resp = self.upload_app.handle_request(self.upload_app.update_request(req))
126 tries = 1
127 while (resp.status_int < 200 or resp.status_int > 299) \
128 and tries <= self.retries:
129 resp = self.upload_app.handle_request(self.upload_app.update_request(req))
130 tries += 1
131 return 200 <= resp.status_int < 300
132
133 def get_container_list(self, account, container, marker=None, limit=None,
134 prefix=None, delimiter=None, full_listing=True):
135 """
136 Get container listing.
137
138 :param account: account name for the container
139 :param container: container name to get the listing of
140 :param marker: marker query
141 :param limit: limit to query
142 :param prefix: prefix query
143 :param delimeter: delimeter for query
144 :param full_listing: if True, make enough requests to get all listings
145 :returns: list of objects
146 """
147 if full_listing:
148 rv = []
149 listing = self.get_container_list(account, container, marker,
150 limit, prefix, delimiter, full_listing=False)
151 while listing:
152 rv.extend(listing)
153 if not delimiter:
154 marker = listing[-1]['name']
155 else:
156 marker = listing[-1].get('name', listing[-1].get('subdir'))
157 listing = self.get_container_list(account, container, marker,
158 limit, prefix, delimiter, full_listing=False)
159 return rv
160 path = '/v1/%s/%s' % (account, container)
161 qs = 'format=json'
162 if marker:
163 qs += '&marker=%s' % quote(marker)
164 if limit:
165 qs += '&limit=%d' % limit
166 if prefix:
167 qs += '&prefix=%s' % quote(prefix)
168 if delimiter:
169 qs += '&delimiter=%s' % quote(delimiter)
170 path += '?%s' % qs
171 req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
172 req.account = account
173 resp = self.upload_app.handle_request(self.upload_app.update_request(req))
174 tries = 1
175 while (resp.status_int < 200 or resp.status_int > 299) \
176 and tries <= self.retries:
177 resp = self.upload_app.handle_request(self.upload_app.update_request(req))
178 tries += 1
179 if resp.status_int == 204:
180 return []
181 if 200 <= resp.status_int < 300:
182 return json_loads(resp.body)
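
For context, a hedged sketch of how a caller would use InternalProxy to push a log file into the cluster and list it back; the proxy config path, account name, and object names below are illustrative assumptions, not fixed values:

    # Sketch: drive InternalProxy directly against a local proxy config.
    from paste.deploy import appconfig
    from swift.common.internal_proxy import InternalProxy

    proxy_conf = appconfig('config:/etc/swift/proxy-server.conf',
                           name='proxy-server')
    proxy = InternalProxy(proxy_conf, retries=3)

    account = 'AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31'
    if proxy.upload_file('/var/log/swift/access-2010091504', account,
                         'log_data', '2010/09/15/04/example.gz'):
        print 'upload ok'

    # full_listing=True pages through markers until the listing is exhausted
    for item in proxy.get_container_list(account, 'log_data',
                                         prefix='2010/09/15'):
        print item['name']
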
=== modified file 'swift/common/utils.py'
--- swift/common/utils.py 2010-09-02 14:58:30 +0000
+++ swift/common/utils.py 2010-09-15 20:11:08 +0000
@@ -530,19 +530,26 @@
 def cache_from_env(env):
     return item_from_env(env, 'swift.cache')
 
-def readconf(conf, section_name, log_name=None):
+def readconf(conf, section_name=None, log_name=None):
     c = ConfigParser()
     if not c.read(conf):
         print "Unable to read config file %s" % conf
         sys.exit(1)
-    if c.has_section(section_name):
-        conf = dict(c.items(section_name))
+    if section_name:
+        if c.has_section(section_name):
+            conf = dict(c.items(section_name))
+        else:
+            print "Unable to find %s config section in %s" % (section_name, conf)
+            sys.exit(1)
+        if "log_name" not in conf:
+            if log_name is not None:
+                conf['log_name'] = log_name
+            else:
+                conf['log_name'] = section_name
     else:
-        print "Unable to find %s config section in %s" % (section_name, conf)
-        sys.exit(1)
-    if "log_name" not in conf:
-        if log_name is not None:
-            conf['log_name'] = log_name
-        else:
-            conf['log_name'] = section_name
+        conf = {}
+        for s in c.sections():
+            conf.update({s: dict(c.items(s))})
+        if 'log_name' not in conf:
+            conf['log_name'] = log_name
     return conf
 
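
With section_name now optional, readconf serves two call styles: one section flattened to a dict, or the whole file keyed by section. A short sketch of both against the sample config above (the path is only illustrative):

    # Sketch: the two call styles readconf now supports.
    from swift.common.utils import readconf

    # a single section as a flat dict; log_name falls back to the section name
    stats_conf = readconf('/etc/swift/log-processing.conf',
                          'log-processor-stats')
    # stats_conf['log_name'] == 'log-processor-stats'

    # no section name: every section, keyed by section name
    all_conf = readconf('/etc/swift/log-processing.conf',
                        log_name='log-stats-collector')
    # all_conf['log-processor-access']['class_path'], all_conf['log_name'], ...
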
=== added directory 'swift/stats'
=== added file 'swift/stats/__init__.py'
=== added file 'swift/stats/access_processor.py'
--- swift/stats/access_processor.py 1970-01-01 00:00:00 +0000
+++ swift/stats/access_processor.py 2010-09-15 20:11:08 +0000
@@ -0,0 +1,224 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import collections
17from urllib import unquote
18import copy
19
20from swift.common.utils import split_path
21
22month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()
23
24class AccessLogProcessor(object):
25
26 def __init__(self, conf):
27 self.server_name = conf.get('server_name', 'proxy')
28 self.lb_private_ips = [x.strip() for x in \
29 conf.get('lb_private_ips', '').split(',')\
30 if x.strip()]
31 self.service_ips = [x.strip() for x in \
32 conf.get('service_ips', '').split(',')\
33 if x.strip()]
34
35 def log_line_parser(self, raw_log):
36 '''given a raw access log line, return a dict of the good parts'''
37 d = {}
38 try:
39 (_,
40 server,
41 client_ip,
42 lb_ip,
43 timestamp,
44 method,
45 request,
46 http_version,
47 code,
48 referrer,
49 user_agent,
50 auth_token,
51 bytes_in,
52 bytes_out,
53 etag,
54 trans_id,
55 headers,
56 processing_time) = (unquote(x) for x in raw_log[16:].split(' '))
57 if server != self.server_name:
58 raise ValueError('incorrect server name in log line')
59 (version,
60 account,
61 container_name,
62 object_name) = split_path(request, 2, 4, True)
63 if container_name is not None:
64 container_name = container_name.split('?', 1)[0]
65 if object_name is not None:
66 object_name = object_name.split('?', 1)[0]
67 account = account.split('?', 1)[0]
68 query = None
69 if '?' in request:
70 request, query = request.split('?', 1)
71 args = query.split('&')
72 # Count each query argument. This is used later to aggregate
73 # the number of format, prefix, etc. queries.
74 for q in args:
75 if '=' in q:
76 k, v = q.split('=', 1)
77 else:
78 k = q
79 # Certain keys will get summmed in stats reporting
80 # (format, path, delimiter, etc.). Save a "1" here
81 # to indicate that this request is 1 request for
82 # its respective key.
83 d[k] = 1
84 except ValueError:
85 pass
86 else:
87 d['client_ip'] = client_ip
88 d['lb_ip'] = lb_ip
89 d['method'] = method
90 d['request'] = request
91 if query:
92 d['query'] = query
93 d['http_version'] = http_version
94 d['code'] = code
95 d['referrer'] = referrer
96 d['user_agent'] = user_agent
97 d['auth_token'] = auth_token
98 d['bytes_in'] = bytes_in
99 d['bytes_out'] = bytes_out
100 d['etag'] = etag
101 d['trans_id'] = trans_id
102 d['processing_time'] = processing_time
103 day, month, year, hour, minute, second = timestamp.split('/')
104 d['day'] = day
105 month = ('%02s' % month_map.index(month)).replace(' ', '0')
106 d['month'] = month
107 d['year'] = year
108 d['hour'] = hour
109 d['minute'] = minute
110 d['second'] = second
111 d['tz'] = '+0000'
112 d['account'] = account
113 d['container_name'] = container_name
114 d['object_name'] = object_name
115 d['bytes_out'] = int(d['bytes_out'].replace('-','0'))
116 d['bytes_in'] = int(d['bytes_in'].replace('-','0'))
117 d['code'] = int(d['code'])
118 return d
119
120 def process(self, obj_stream, account, container, object_name):
121 '''generate hourly groupings of data from one access log file'''
122 hourly_aggr_info = {}
123 for line in obj_stream:
124 line_data = self.log_line_parser(line)
125 if not line_data:
126 continue
127 account = line_data['account']
128 container_name = line_data['container_name']
129 year = line_data['year']
130 month = line_data['month']
131 day = line_data['day']
132 hour = line_data['hour']
133 bytes_out = line_data['bytes_out']
134 bytes_in = line_data['bytes_in']
135 method = line_data['method']
136 code = int(line_data['code'])
137 object_name = line_data['object_name']
138 client_ip = line_data['client_ip']
139
140 op_level = None
141 if not container_name:
142 op_level = 'account'
143 elif container_name and not object_name:
144 op_level = 'container'
145 elif object_name:
146 op_level = 'object'
147
148 aggr_key = (account, year, month, day, hour)
149 d = hourly_aggr_info.get(aggr_key, {})
150 if line_data['lb_ip'] in self.lb_private_ips:
151 source = 'service'
152 else:
153 source = 'public'
154
155 if line_data['client_ip'] in self.service_ips:
156 source = 'service'
157
158 d[(source, 'bytes_out')] = d.setdefault((source, 'bytes_out'), 0) + \
159 bytes_out
160 d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
161 bytes_in
162
163 d['format_query'] = d.setdefault('format_query', 0) + \
164 line_data.get('format', 0)
165 d['marker_query'] = d.setdefault('marker_query', 0) + \
166 line_data.get('marker', 0)
167 d['prefix_query'] = d.setdefault('prefix_query', 0) + \
168 line_data.get('prefix', 0)
169 d['delimiter_query'] = d.setdefault('delimiter_query', 0) + \
170 line_data.get('delimiter', 0)
171 path = line_data.get('path', 0)
172 d['path_query'] = d.setdefault('path_query', 0) + path
173
174 code = '%dxx' % (code/100)
175 key = (source, op_level, method, code)
176 d[key] = d.setdefault(key, 0) + 1
177
178 hourly_aggr_info[aggr_key] = d
179 return hourly_aggr_info
180
181 def keylist_mapping(self):
182 source_keys = 'service public'.split()
183 level_keys = 'account container object'.split()
184 verb_keys = 'GET PUT POST DELETE HEAD COPY'.split()
185 code_keys = '2xx 4xx 5xx'.split()
186
187 keylist_mapping = {
188 # <db key> : <row key> or <set of row keys>
189 'service_bw_in': ('service', 'bytes_in'),
190 'service_bw_out': ('service', 'bytes_out'),
191 'public_bw_in': ('public', 'bytes_in'),
192 'public_bw_out': ('public', 'bytes_out'),
193 'account_requests': set(),
194 'container_requests': set(),
195 'object_requests': set(),
196 'service_request': set(),
197 'public_request': set(),
198 'ops_count': set(),
199 }
200 for verb in verb_keys:
201 keylist_mapping[verb] = set()
202 for code in code_keys:
203 keylist_mapping[code] = set()
204 for source in source_keys:
205 for level in level_keys:
206 for verb in verb_keys:
207 for code in code_keys:
208 keylist_mapping['account_requests'].add(
209 (source, 'account', verb, code))
210 keylist_mapping['container_requests'].add(
211 (source, 'container', verb, code))
212 keylist_mapping['object_requests'].add(
213 (source, 'object', verb, code))
214 keylist_mapping['service_request'].add(
215 ('service', level, verb, code))
216 keylist_mapping['public_request'].add(
217 ('public', level, verb, code))
218 keylist_mapping[verb].add(
219 (source, level, verb, code))
220 keylist_mapping[code].add(
221 (source, level, verb, code))
222 keylist_mapping['ops_count'].add(
223 (source,level,verb,code))
224 return keylist_mapping
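
The access processor's contract is easiest to see on a single line. This sketch (mirroring the unit test further down) parses one proxy log line and aggregates it into the hourly structure process() returns:

    # Sketch: parse and aggregate one proxy access log line.
    from swift.stats.access_processor import AccessLogProcessor

    p = AccessLogProcessor({'server_name': 'proxy'})
    line = ('Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '
            '09/Jul/2010/04/14/30 GET '
            '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '
            'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '
            '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262')

    parsed = p.log_line_parser(line)
    # parsed['account'] == 'acct', parsed['code'] == 200, ...

    hourly = p.process([line], 'a', 'c', 'o')
    # {('acct', '2010', '07', '09', '04'):
    #  {('public', 'object', 'GET', '2xx'): 1,
    #   ('public', 'bytes_out'): 95, ('public', 'bytes_in'): 6, ...}}
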
=== added file 'swift/stats/account_stats.py'
--- swift/stats/account_stats.py 1970-01-01 00:00:00 +0000
+++ swift/stats/account_stats.py 2010-09-15 20:11:08 +0000
@@ -0,0 +1,86 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import os
17import time
18from paste.deploy import appconfig
19
20from swift.account.server import DATADIR as account_server_data_dir
21from swift.common.db import AccountBroker
22from swift.common.internal_proxy import InternalProxy
23from swift.common.utils import renamer, get_logger, readconf
24from swift.common.daemon import Daemon
25
26class AccountStat(Daemon):
27 def __init__(self, stats_conf):
28 super(AccountStat, self).__init__(stats_conf)
29 target_dir = stats_conf.get('log_dir', '/var/log/swift')
30 #TODO: figure out the server configs. also figure out internal_proxy
31 account_server_conf_loc = stats_conf.get('account_server_conf',
32 '/etc/swift/account-server.conf')
33 server_conf = appconfig('config:%s' % account_server_conf_loc,
34 name='account-server')
35 filename_format = stats_conf['source_filename_format']
36 self.filename_format = filename_format
37 self.target_dir = target_dir
38 self.devices = server_conf.get('devices', '/srv/node')
39 self.mount_check = server_conf.get('mount_check', 'true').lower() in \
40 ('true', 't', '1', 'on', 'yes', 'y')
41 self.logger = get_logger(stats_conf, 'swift-account-stats-logger')
42
43 def run_once(self):
44 self.logger.info("Gathering account stats")
45 start = time.time()
46 self.find_and_process()
47 self.logger.info("Gathering account stats complete (%0.2f minutes)" %
48 ((time.time()-start)/60))
49
50 def find_and_process(self):
51 #TODO: handle a counter in the filename to prevent overwrites?
52 src_filename = time.strftime(self.filename_format)
53 #TODO: don't use /tmp?
54 tmp_filename = os.path.join('/tmp', src_filename)
55 with open(tmp_filename, 'wb') as statfile:
56 #statfile.write('Account Name, Container Count, Object Count, Bytes Used\n')
57 for device in os.listdir(self.devices):
58 if self.mount_check and \
59 not os.path.ismount(os.path.join(self.devices, device)):
60 self.logger.error("Device %s is not mounted, skipping." %
61 device)
62 continue
63 accounts = os.path.join(self.devices,
64 device,
65 account_server_data_dir)
66 if not os.path.exists(accounts):
67 self.logger.debug("Path %s does not exist, skipping." %
68 accounts)
69 continue
70 for root, dirs, files in os.walk(accounts, topdown=False):
71 for filename in files:
72 if filename.endswith('.db'):
73 broker = AccountBroker(os.path.join(root, filename))
74 if not broker.is_deleted():
75 (account_name,
76 _, _, _,
77 container_count,
78 object_count,
79 bytes_used,
80 _, _) = broker.get_info()
81 line_data = '"%s",%d,%d,%d\n' % (account_name,
82 container_count,
83 object_count,
84 bytes_used)
85 statfile.write(line_data)
86 renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
=== added file 'swift/stats/log_processor.py'
--- swift/stats/log_processor.py 1970-01-01 00:00:00 +0000
+++ swift/stats/log_processor.py 2010-09-15 20:11:08 +0000
@@ -0,0 +1,376 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from ConfigParser import ConfigParser
17import zlib
18import time
19import datetime
20import cStringIO
21import collections
22from paste.deploy import appconfig
23import multiprocessing
24import Queue
25import cPickle
26
27from swift.common.internal_proxy import InternalProxy
28from swift.common.exceptions import ChunkReadTimeout
29from swift.common.utils import get_logger, readconf
30from swift.common.daemon import Daemon
31
32class BadFileDownload(Exception):
33 pass
34
35class LogProcessor(object):
36
37 def __init__(self, conf, logger):
38 stats_conf = conf.get('log-processor', {})
39
40 proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
41 '/etc/swift/proxy-server.conf')
42 self.proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
43 name='proxy-server')
44 if isinstance(logger, tuple):
45 self.logger = get_logger(*logger)
46 else:
47 self.logger = logger
48 self.internal_proxy = InternalProxy(self.proxy_server_conf,
49 self.logger,
50 retries=3)
51
52 # load the processing plugins
53 self.plugins = {}
54 plugin_prefix = 'log-processor-'
55 for section in (x for x in conf if x.startswith(plugin_prefix)):
56 plugin_name = section[len(plugin_prefix):]
57 plugin_conf = conf.get(section, {})
58 self.plugins[plugin_name] = plugin_conf
59 import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
60 module = __import__(import_target, fromlist=[import_target])
61 klass = getattr(module, class_name)
62 self.plugins[plugin_name]['instance'] = klass(plugin_conf)
63
64 def process_one_file(self, plugin_name, account, container, object_name):
65 # get an iter of the object data
66 compressed = object_name.endswith('.gz')
67 stream = self.get_object_data(account, container, object_name,
68 compressed=compressed)
69 # look up the correct plugin and send the stream to it
70 return self.plugins[plugin_name]['instance'].process(stream,
71 account,
72 container,
73 object_name)
74
75 def get_data_list(self, start_date=None, end_date=None, listing_filter=None):
76 total_list = []
77 for name, data in self.plugins.items():
78 account = data['swift_account']
79 container = data['container_name']
80 l = self.get_container_listing(account,
81 container,
82 start_date,
83 end_date)
84 for i in l:
85 # The items in this list end up being passed as positional
86 # parameters to process_one_file.
87 x = (name, account, container, i)
88 if x not in listing_filter:
89 total_list.append(x)
90 return total_list
91
92 def get_container_listing(self, swift_account, container_name, start_date=None,
93 end_date=None, listing_filter=None):
94 '''
95 Get a container listing, filtered by start_date, end_date, and
96 listing_filter. Dates, if given, should be in YYYYMMDDHH format
97 '''
98 search_key = None
99 if start_date is not None:
100 date_parts = []
101 try:
102 year, start_date = start_date[:4], start_date[4:]
103 if year:
104 date_parts.append(year)
105 month, start_date = start_date[:2], start_date[2:]
106 if month:
107 date_parts.append(month)
108 day, start_date = start_date[:2], start_date[2:]
109 if day:
110 date_parts.append(day)
111 hour, start_date = start_date[:2], start_date[2:]
112 if hour:
113 date_parts.append(hour)
114 except IndexError:
115 pass
116 else:
117 search_key = '/'.join(date_parts)
118 end_key = None
119 if end_date is not None:
120 date_parts = []
121 try:
122 year, end_date = end_date[:4], end_date[4:]
123 if year:
124 date_parts.append(year)
125 month, end_date = end_date[:2], end_date[2:]
126 if month:
127 date_parts.append(month)
128 day, end_date = end_date[:2], end_date[2:]
129 if day:
130 date_parts.append(day)
131 hour, end_date = end_date[:2], end_date[2:]
132 if hour:
133 date_parts.append(hour)
134 except IndexError:
135 pass
136 else:
137 end_key = '/'.join(date_parts)
138 container_listing = self.internal_proxy.get_container_list(
139 swift_account,
140 container_name,
141 marker=search_key)
142 results = []
143 if container_listing is not None:
144 if listing_filter is None:
145 listing_filter = set()
146 for item in container_listing:
147 name = item['name']
148 if end_key and name > end_key:
149 break
150 if name not in listing_filter:
151 results.append(name)
152 return results
153
154 def get_object_data(self, swift_account, container_name, object_name,
155 compressed=False):
156 '''reads an object and yields its lines'''
157 code, o = self.internal_proxy.get_object(swift_account,
158 container_name,
159 object_name)
160 if code < 200 or code >= 300:
161 return
162 last_part = ''
163 last_compressed_part = ''
164 # magic in the following zlib.decompressobj argument is courtesy of
165 # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
166 d = zlib.decompressobj(16+zlib.MAX_WBITS)
167 try:
168 for chunk in o:
169 if compressed:
170 try:
171 chunk = d.decompress(chunk)
172 except zlib.error:
173 raise BadFileDownload() # bad compressed data
174 parts = chunk.split('\n')
175 parts[0] = last_part + parts[0]
176 for part in parts[:-1]:
177 yield part
178 last_part = parts[-1]
179 if last_part:
180 yield last_part
181 except ChunkReadTimeout:
182 raise BadFileDownload()
183
184 def generate_keylist_mapping(self):
185 keylist = {}
186 for plugin in self.plugins:
187 plugin_keylist = self.plugins[plugin]['instance'].keylist_mapping()
188 if not plugin_keylist:
189 continue
190 for k, v in plugin_keylist.items():
191 o = keylist.get(k)
192 if o:
193 if isinstance(o, set):
194 if isinstance(v, set):
195 o.update(v)
196 else:
197 o.update([v])
198 else:
199 o = set(o)
200 if isinstance(v, set):
201 o.update(v)
202 else:
203 o.update([v])
204 else:
205 o = v
206 keylist[k] = o
207 return keylist
208
209
210class LogProcessorDaemon(Daemon):
211 def __init__(self, conf):
212 c = conf.get('log-processor')
213 super(LogProcessorDaemon, self).__init__(c)
214 self.total_conf = conf
215 self.logger = get_logger(c)
216 self.log_processor = LogProcessor(conf, self.logger)
217 self.lookback_hours = int(c.get('lookback_hours', '120'))
218 self.lookback_window = int(c.get('lookback_window',
219 str(self.lookback_hours)))
220 self.log_processor_account = c['swift_account']
221 self.log_processor_container = c.get('container_name',
222 'log_processing_data')
223
224 def run_once(self):
225 self.logger.info("Beginning log processing")
226 start = time.time()
227 if self.lookback_hours == 0:
228 lookback_start = None
229 lookback_end = None
230 else:
231 lookback_start = datetime.datetime.now() - \
232 datetime.timedelta(hours=self.lookback_hours)
233 lookback_start = lookback_start.strftime('%Y%m%d%H')
234 if self.lookback_window == 0:
235 lookback_end = None
236 else:
237 lookback_end = datetime.datetime.now() - \
238 datetime.timedelta(hours=self.lookback_hours) + \
239 datetime.timedelta(hours=self.lookback_window)
240 lookback_end = lookback_end.strftime('%Y%m%d%H')
241 self.logger.debug('lookback_start: %s' % lookback_start)
242 self.logger.debug('lookback_end: %s' % lookback_end)
243 try:
244 processed_files_stream = self.log_processor.get_object_data(
245 self.log_processor_account,
246 self.log_processor_container,
247 'processed_files.pickle.gz',
248 compressed=True)
249 buf = '\n'.join(x for x in processed_files_stream)
250 if buf:
251 already_processed_files = cPickle.loads(buf)
252 else:
253 already_processed_files = set()
254 except:
255 already_processed_files = set()
256 self.logger.debug('found %d processed files' % \
257 len(already_processed_files))
258 logs_to_process = self.log_processor.get_data_list(lookback_start,
259 lookback_end,
260 already_processed_files)
261 self.logger.info('loaded %d files to process' % len(logs_to_process))
262 if not logs_to_process:
263 self.logger.info("Log processing done (%0.2f minutes)" %
264 ((time.time()-start)/60))
265 return
266
267 # map
268 processor_args = (self.total_conf, self.logger)
269 results = multiprocess_collate(processor_args, logs_to_process)
270
271 #reduce
272 aggr_data = {}
273 processed_files = already_processed_files
274 for item, data in results:
275 # since item contains the plugin and the log name, new plugins will
276 # "reprocess" the file and the results will be in the final csv.
277 processed_files.add(item)
278 for k, d in data.items():
279 existing_data = aggr_data.get(k, {})
280 for i, j in d.items():
281 current = existing_data.get(i, 0)
282 # merging strategy for key collisions is addition
283 # processing plugins need to realize this
284 existing_data[i] = current + j
285 aggr_data[k] = existing_data
286
287 # group
288 # reduce a large number of keys in aggr_data[k] to a small number of
289 # output keys
290 keylist_mapping = self.log_processor.generate_keylist_mapping()
291 final_info = collections.defaultdict(dict)
292 for account, data in aggr_data.items():
293 for key, mapping in keylist_mapping.items():
294 if isinstance(mapping, (list, set)):
295 value = 0
296 for k in mapping:
297 try:
298 value += data[k]
299 except KeyError:
300 pass
301 else:
302 try:
303 value = data[mapping]
304 except KeyError:
305 value = 0
306 final_info[account][key] = value
307
308 # output
309 sorted_keylist_mapping = sorted(keylist_mapping)
310 columns = 'bill_ts,data_ts,account,' + ','.join(sorted_keylist_mapping)
311 print columns
312 for (account, year, month, day, hour), d in final_info.items():
313 bill_ts = ''
314 data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
315 row = [bill_ts, data_ts]
316 row.append('%s' % account)
317 for k in sorted_keylist_mapping:
318 row.append('%s'%d[k])
319 print ','.join(row)
320
321 # cleanup
322 s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
323 f = cStringIO.StringIO(s)
324 self.log_processor.internal_proxy.upload_file(f,
325 self.log_processor_account,
326 self.log_processor_container,
327 'processed_files.pickle.gz')
328
329 self.logger.info("Log processing done (%0.2f minutes)" %
330 ((time.time()-start)/60))
331
332def multiprocess_collate(processor_args, logs_to_process):
333 '''yield hourly data from logs_to_process'''
334 worker_count = multiprocessing.cpu_count()
335 results = []
336 in_queue = multiprocessing.Queue()
337 out_queue = multiprocessing.Queue()
338 for _ in range(worker_count):
339 p = multiprocessing.Process(target=collate_worker,
340 args=(processor_args,
341 in_queue,
342 out_queue))
343 p.start()
344 results.append(p)
345 for x in logs_to_process:
346 in_queue.put(x)
347 for _ in range(worker_count):
348 in_queue.put(None)
349 count = 0
350 while True:
351 try:
352 item, data = out_queue.get_nowait()
353 count += 1
354 if data:
355 yield item, data
356 if count >= len(logs_to_process):
357 # this implies that one result will come from every request
358 break
359 except Queue.Empty:
360 time.sleep(.1)
361 for r in results:
362 r.join()
363
364def collate_worker(processor_args, in_queue, out_queue):
365 '''worker process for multiprocess_collate'''
366 p = LogProcessor(*processor_args)
367 while True:
368 try:
369 item = in_queue.get_nowait()
370 if item is None:
371 break
372 except Queue.Empty:
373 time.sleep(.1)
374 else:
375 ret = p.process_one_file(*item)
376 out_queue.put((item, ret))
\ No newline at end of file
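
The lookback settings control which log objects run_once() even considers: lookback_start is lookback_hours ago and lookback_end is lookback_window hours after that. A small sketch of the same arithmetic, assuming 120 hours back and a 24-hour window:

    # Sketch: the lookback arithmetic used by LogProcessorDaemon.run_once().
    # With lookback_hours=120 and lookback_window=24, only objects named for
    # hours between 120 and 96 hours ago are listed for processing.
    import datetime

    lookback_hours = 120
    lookback_window = 24
    now = datetime.datetime.now()
    lookback_start = (now - datetime.timedelta(hours=lookback_hours)
                      ).strftime('%Y%m%d%H')
    lookback_end = (now - datetime.timedelta(hours=lookback_hours) +
                    datetime.timedelta(hours=lookback_window)
                    ).strftime('%Y%m%d%H')
    # e.g. lookback_start = '2010091004', lookback_end = '2010091104'
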
=== added file 'swift/stats/log_uploader.py'
--- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000
+++ swift/stats/log_uploader.py 2010-09-15 20:11:08 +0000
@@ -0,0 +1,160 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from __future__ import with_statement
17import os
18import hashlib
19import time
20import gzip
21import glob
22from paste.deploy import appconfig
23
24from swift.common.internal_proxy import InternalProxy
25from swift.common.daemon import Daemon
26from swift.common import utils
27
28class LogUploader(Daemon):
29 '''
30 Given a local directory, a swift account, and a container name, LogParser
31 will upload all files in the local directory to the given account/container.
32 All but the newest files will be uploaded, and the files' md5 sum will be
33 computed. The hash is used to prevent duplicate data from being uploaded
34 multiple times in different files (ex: log lines). Since the hash is
35 computed, it is also used as the uploaded object's etag to ensure data
36 integrity.
37
38 Note that after the file is successfully uploaded, it will be unlinked.
39
40 The given proxy server config is used to instantiate a proxy server for
41 the object uploads.
42 '''
43
44 def __init__(self, uploader_conf, plugin_name):
45 super(LogUploader, self).__init__(uploader_conf)
46 log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
47 swift_account = uploader_conf['swift_account']
48 container_name = uploader_conf['container_name']
49 source_filename_format = uploader_conf['source_filename_format']
50 proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
51 '/etc/swift/proxy-server.conf')
52 proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
53 name='proxy-server')
54 if not log_dir.endswith('/'):
55 log_dir = log_dir + '/'
56 self.log_dir = log_dir
57 self.swift_account = swift_account
58 self.container_name = container_name
59 self.filename_format = source_filename_format
60 self.internal_proxy = InternalProxy(proxy_server_conf)
61 log_name = 'swift-log-uploader-%s' % plugin_name
62 self.logger = utils.get_logger(uploader_conf, plugin_name)
63
64 def run_once(self):
65 self.logger.info("Uploading logs")
66 start = time.time()
67 self.upload_all_logs()
68 self.logger.info("Uploading logs complete (%0.2f minutes)" %
69 ((time.time()-start)/60))
70
71 def upload_all_logs(self):
72 i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
73 i.sort()
74 year_offset = month_offset = day_offset = hour_offset = None
75 base_offset = len(self.log_dir)
76 for start, c in i:
77 offset = base_offset + start
78 if c == '%Y':
79 year_offset = offset, offset+4
80 # Add in the difference between len(%Y) and the expanded
81 # version of %Y (????). This makes sure the codes after this
82 # one will align properly in the final filename.
83 base_offset += 2
84 elif c == '%m':
85 month_offset = offset, offset+2
86 elif c == '%d':
87 day_offset = offset, offset+2
88 elif c == '%H':
89 hour_offset = offset, offset+2
90 if not (year_offset and month_offset and day_offset and hour_offset):
91 # don't have all the parts, can't upload anything
92 return
93 glob_pattern = self.filename_format
94 glob_pattern = glob_pattern.replace('%Y', '????', 1)
95 glob_pattern = glob_pattern.replace('%m', '??', 1)
96 glob_pattern = glob_pattern.replace('%d', '??', 1)
97 glob_pattern = glob_pattern.replace('%H', '??', 1)
98 filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
99 current_hour = int(time.strftime('%H'))
100 today = int(time.strftime('%Y%m%d'))
101 self.internal_proxy.create_container(self.swift_account,
102 self.container_name)
103 for filename in filelist:
104 try:
105 # From the filename, we need to derive the year, month, day,
106 # and hour for the file. These values are used in the uploaded
107 # object's name, so they should be a reasonably accurate
108 # representation of the time for which the data in the file was
109 # collected. The file's last modified time is not a reliable
110 # representation of the data in the file. For example, an old
111 # log file (from hour A) may be uploaded or moved into the
112 # log_dir in hour Z. The file's modified time will be for hour
113 # Z, and therefore the object's name in the system will not
114 # represent the data in it.
115 # If the filename doesn't match the format, it shouldn't be
116 # uploaded.
117 year = filename[slice(*year_offset)]
118 month = filename[slice(*month_offset)]
119 day = filename[slice(*day_offset)]
120 hour = filename[slice(*hour_offset)]
121 except IndexError:
122 # unexpected filename format, move on
123 self.logger.error("Unexpected log: %s" % filename)
124 continue
125 if (time.time() - os.stat(filename).st_mtime) < 7200:
126 # don't process very new logs
127 self.logger.debug("Skipping log: %s (< 2 hours old)" % filename)
128 continue
129 self.upload_one_log(filename, year, month, day, hour)
130
131 def upload_one_log(self, filename, year, month, day, hour):
132 if os.path.getsize(filename) == 0:
133 self.logger.debug("Log %s is 0 length, skipping" % filename)
134 return
135 self.logger.debug("Processing log: %s" % filename)
136 filehash = hashlib.md5()
137 already_compressed = True if filename.endswith('.gz') else False
138 opener = gzip.open if already_compressed else open
139 f = opener(filename, 'rb')
140 try:
141 for line in f:
142 # filter out bad lines here?
143 filehash.update(line)
144 finally:
145 f.close()
146 filehash = filehash.hexdigest()
147 # By adding a hash to the filename, we ensure that uploaded files
148 # have unique filenames and protect against uploading one file
149 # more than one time. By using md5, we get an etag for free.
150 target_filename = '/'.join([year, month, day, hour, filehash+'.gz'])
151 if self.internal_proxy.upload_file(filename,
152 self.swift_account,
153 self.container_name,
154 target_filename,
155 compress=(not already_compressed)):
156 self.logger.debug("Uploaded log %s to %s" %
157 (filename, target_filename))
158 os.unlink(filename)
159 else:
160 self.logger.error("ERROR: Upload of log %s failed!" % filename)
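
The offset bookkeeping in upload_all_logs() is the subtle part: %Y expands to four characters, so every later code shifts right by two in the real filename. A standalone sketch of the same slicing, using an illustrative log_dir, format, and filename:

    # Sketch: derive year/month/day/hour from a log filename the way
    # upload_all_logs() does. log_dir, format, and filename are assumptions.
    log_dir = '/var/log/swift/'
    filename_format = 'access-%Y%m%d%H'
    filename = '/var/log/swift/access-2010091504'

    codes = sorted((filename_format.index(c), c) for c in '%Y %m %d %H'.split())
    base_offset = len(log_dir)
    offsets = {}
    for start, c in codes:
        offset = base_offset + start
        offsets[c] = (offset, offset + (4 if c == '%Y' else 2))
        if c == '%Y':
            # %Y renders as four digits, two more than the '%Y' token itself,
            # so every later code shifts right by two in the expanded filename
            base_offset += 2

    year, month, day, hour = [filename[slice(*offsets[c])]
                              for c in ('%Y', '%m', '%d', '%H')]
    # ('2010', '09', '15', '04')
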
=== added file 'swift/stats/stats_processor.py'
--- swift/stats/stats_processor.py 1970-01-01 00:00:00 +0000
+++ swift/stats/stats_processor.py 2010-09-15 20:11:08 +0000
@@ -0,0 +1,63 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16class StatsLogProcessor(object):
17
18 def __init__(self, conf):
19 pass
20
21 def process(self, obj_stream, account, container, object_name):
22 '''generate hourly groupings of data from one stats log file'''
23 account_totals = {}
24 year, month, day, hour, _ = object_name.split('/')
25 for line in obj_stream:
26 if not line:
27 continue
28 try:
29 (account,
30 container_count,
31 object_count,
32 bytes_used) = line.split(',')
33 account = account.strip('"')
34 container_count = int(container_count.strip('"'))
35 object_count = int(object_count.strip('"'))
36 bytes_used = int(bytes_used.strip('"'))
37 aggr_key = (account, year, month, day, hour)
38 d = account_totals.get(aggr_key, {})
39 d['replica_count'] = d.setdefault('replica_count', 0) + 1
40 d['container_count'] = d.setdefault('container_count', 0) + \
41 container_count
42 d['object_count'] = d.setdefault('object_count', 0) + \
43 object_count
44 d['bytes_used'] = d.setdefault('bytes_used', 0) + \
45 bytes_used
46 account_totals[aggr_key] = d
47 except (IndexError, ValueError):
48 # bad line data
49 pass
50 return account_totals
51
52 def keylist_mapping(self):
53 '''
54 returns a dictionary of final keys mapped to source keys
55 '''
56 keylist_mapping = {
57 # <db key> : <row key> or <set of row keys>
58 'bytes_used': 'bytes_used',
59 'container_count': 'container_count',
60 'object_count': 'object_count',
61 'replica_count': 'replica_count',
62 }
63 return keylist_mapping
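
StatsLogProcessor expects the CSV lines written by AccountStat and takes the hour from the uploaded object's name. A short sketch (the object name and values are illustrative):

    # Sketch: one account-stats CSV line through StatsLogProcessor.
    from swift.stats.stats_processor import StatsLogProcessor

    p = StatsLogProcessor({})
    totals = p.process(['"acct",12,42,1024'], 'a', 'c', '2010/09/15/04/stats.gz')
    # {('acct', '2010', '09', '15', '04'): {'replica_count': 1,
    #   'container_count': 12, 'object_count': 42, 'bytes_used': 1024}}
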
=== added directory 'test/unit/stats'
=== added file 'test/unit/stats/__init__.py'
=== added file 'test/unit/stats/test_log_processor.py'
--- test/unit/stats/test_log_processor.py 1970-01-01 00:00:00 +0000
+++ test/unit/stats/test_log_processor.py 2010-09-15 20:11:08 +0000
@@ -0,0 +1,161 @@
1import unittest
2
3from swift.stats import log_processor
4
5class DumbLogger(object):
6 def __getattr__(self, n):
7 return self.foo
8
9 def foo(self, *a, **kw):
10 pass
11
12class DumbInternalProxy(object):
13 def get_container_list(self, account, container, marker=None):
14 n = '2010/03/14/13/obj1'
15 if marker is None or n > marker:
16 return [{'name': n}]
17 else:
18 return []
19
20 def get_object(self, account, container, object_name):
21 code = 200
22 if object_name.endswith('.gz'):
23 # same data as below, compressed with gzip -9
24 def data():
25 yield '\x1f\x8b\x08'
26 yield '\x08"\xd79L'
27 yield '\x02\x03te'
28 yield 'st\x00\xcbO'
29 yield '\xca\xe2JI,I'
30 yield '\xe4\x02\x00O\xff'
31 yield '\xa3Y\t\x00\x00\x00'
32 else:
33 def data():
34 yield 'obj\n'
35 yield 'data'
36 return code, data()
37
38class TestLogProcessor(unittest.TestCase):
39
40 access_test_line = 'Jul 9 04:14:30 saio proxy 1.2.3.4 4.5.6.7 '\
41 '09/Jul/2010/04/14/30 GET '\
42 '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
43 'curl tk4e350daf-9338-4cc6-aabb-090e49babfbd '\
44 '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
45 stats_test_line = 'account,1,2,3'
46 proxy_config = {'log-processor': {
47
48 }
49 }
50
51 def test_access_log_line_parser(self):
52 access_proxy_config = self.proxy_config
53 access_proxy_config.update({
54 'log-processor-access': {
55 'source_filename_format':'%Y%m%d%H*',
56 'class_path':
57 'swift.stats.access_processor.AccessLogProcessor'
58 }})
59 p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
60 result = p.plugins['access']['instance'].log_line_parser(self.access_test_line)
61 self.assertEquals(result, {'code': 200,
62 'processing_time': '0.0262',
63 'auth_token': 'tk4e350daf-9338-4cc6-aabb-090e49babfbd',
64 'month': '07',
65 'second': '30',
66 'year': '2010',
67 'query': 'format=json&foo',
68 'tz': '+0000',
69 'http_version': 'HTTP/1.0',
70 'object_name': 'bar',
71 'etag': '-',
72 'foo': 1,
73 'method': 'GET',
74 'trans_id': 'txfa431231-7f07-42fd-8fc7-7da9d8cc1f90',
75 'client_ip': '1.2.3.4',
76 'format': 1,
77 'bytes_out': 95,
78 'container_name': 'foo',
79 'day': '09',
80 'minute': '14',
81 'account': 'acct',
82 'hour': '04',
83 'referrer': '-',
84 'request': '/v1/acct/foo/bar',
85 'user_agent': 'curl',
86 'bytes_in': 6,
87 'lb_ip': '4.5.6.7'})
88
89 def test_process_one_access_file(self):
90 access_proxy_config = self.proxy_config
91 access_proxy_config.update({
92 'log-processor-access': {
93 'source_filename_format':'%Y%m%d%H*',
94 'class_path':
95 'swift.stats.access_processor.AccessLogProcessor'
96 }})
97 p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
98 def get_object_data(*a, **kw):
99 return [self.access_test_line]
100 p.get_object_data = get_object_data
101 result = p.process_one_file('access', 'a', 'c', 'o')
102 expected = {('acct', '2010', '07', '09', '04'):
103 {('public', 'object', 'GET', '2xx'): 1,
104 ('public', 'bytes_out'): 95,
105 'marker_query': 0,
106 'format_query': 1,
107 'delimiter_query': 0,
108 'path_query': 0,
109 ('public', 'bytes_in'): 6,
110 'prefix_query': 0}}
111 self.assertEquals(result, expected)
112
113 def test_get_container_listing(self):
114 p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
115 p.internal_proxy = DumbInternalProxy()
116 result = p.get_container_listing('a', 'foo')
117 expected = ['2010/03/14/13/obj1']
118 self.assertEquals(result, expected)
119 result = p.get_container_listing('a', 'foo', listing_filter=expected)
120 expected = []
121 self.assertEquals(result, expected)
122 result = p.get_container_listing('a', 'foo', start_date='2010031412',
123 end_date='2010031414')
124 expected = ['2010/03/14/13/obj1']
125 self.assertEquals(result, expected)
126 result = p.get_container_listing('a', 'foo', start_date='2010031414')
127 expected = []
128 self.assertEquals(result, expected)
129 result = p.get_container_listing('a', 'foo', start_date='2010031410',
130 end_date='2010031412')
131 expected = []
132 self.assertEquals(result, expected)
133
134 def test_get_object_data(self):
135 p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
136 p.internal_proxy = DumbInternalProxy()
137 result = list(p.get_object_data('a', 'c', 'o', False))
138 expected = ['obj','data']
139 self.assertEquals(result, expected)
140 result = list(p.get_object_data('a', 'c', 'o.gz', True))
141 self.assertEquals(result, expected)
142
143 def test_get_stat_totals(self):
144 stats_proxy_config = self.proxy_config
145 stats_proxy_config.update({
146 'log-processor-stats': {
147 'class_path':
148 'swift.stats.stats_processor.StatsLogProcessor'
149 }})
150 p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
151 p.internal_proxy = DumbInternalProxy()
152 def get_object_data(*a,**kw):
153 return [self.stats_test_line]
154 p.get_object_data = get_object_data
155 result = p.process_one_file('stats', 'a', 'c', 'y/m/d/h/o')
156 expected = {'account':
157 {'count': 1,
158 'object_count': 2,
159 'container_count': 1,
160 'bytes_used': 3}}
161 self.assertEquals(result, expected)
\ No newline at end of file