Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk

Proposed by John Dickinson
Status: Superseded
Proposed branch: lp:~notmyname/swift/stats_system
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 677 lines (+641/-0)
7 files modified
bin/swift-account-stats-logger.py (+81/-0)
bin/swift-log-uploader (+83/-0)
etc/log-processing.conf-sample (+27/-0)
swift/common/compressed_file_reader.py (+72/-0)
swift/common/internal_proxy.py (+174/-0)
swift/stats/account_stats.py (+69/-0)
swift/stats/log_uploader.py (+135/-0)
To merge this branch: bzr merge lp:~notmyname/swift/stats_system
Reviewer: Swift Core (review type: security contacts), status: Pending
Review via email: mp+31875@code.launchpad.net

This proposal has been superseded by a proposal from 2010-08-06.

Description of the change

This adds log_uploader and a few supporting libraries as the first part of the stats system.

lp:~notmyname/swift/stats_system updated
50. By John Dickinson

added account stats logger to stats system

51. By John Dickinson

added log_processor and a stats plugin

52. By John Dickinson

merged with trunk

53. By John Dickinson

added access log processing plugin

54. By John Dickinson

merged with trunk

55. By John Dickinson

merged with trunk

56. By John Dickinson

merged with trunk

57. By John Dickinson

merged with trunk

58. By John Dickinson

merged with trunk

59. By John Dickinson

merged with trunk

60. By John Dickinson

initial tests for the stats system

61. By John Dickinson

first test working

62. By John Dickinson

merged with trunk

63. By John Dickinson

access log parsing tests pass

64. By John Dickinson

added (working) stats tests

65. By John Dickinson

merged with trunk (utils fix)

66. By John Dickinson

updated stats binaries to be DRY compliant

67. By John Dickinson

merged with trunk

68. By John Dickinson

set up log-stats-collector as a daemon process to create csv files

69. By John Dickinson

merged with trunk

70. By John Dickinson

added execute perms to stats processor binaries

71. By John Dickinson

updated config file loading to work with paste.deploy configs

72. By John Dickinson

fixed typos

73. By John Dickinson

fixed internal proxy loading

74. By John Dickinson

made a memcache stub for the internal proxy server

75. By John Dickinson

fixed internal proxy put_container reference

76. By John Dickinson

fixed some log uploading glob patterns

77. By John Dickinson

fixed bug in calculating offsets for filename patterns

78. By John Dickinson

fixed typos in log processor

79. By John Dickinson

fixed get_data_list in log_processor

80. By John Dickinson

added some debug output

81. By John Dickinson

fixed logging and log uploading

82. By John Dickinson

fixed copy/paste errors and missing imports

83. By John Dickinson

added error handling and missing return statement

84. By John Dickinson

handled some typos and better handling of missing data in internal proxy

85. By John Dickinson

fixed tests, typos, and added error handling

86. By John Dickinson

fixed bug in account stats log processing

87. By John Dickinson

fixed listing filter in log processing

88. By John Dickinson

fixed stdout capturing for generating csv files

89. By John Dickinson

fixed replica count reporting error

90. By John Dickinson

fixed lookback in log processor

91. By John Dickinson

merged with trunk

92. By John Dickinson

merged with trunk

93. By John Dickinson

fixed tests to account for changed key name

94. By John Dickinson

fixed test bug

95. By John Dickinson

updated with changes and suggestions from code review

96. By John Dickinson

merged with changes from trunk

97. By John Dickinson

added stats overview

98. By John Dickinson

updated with changes from trunk

99. By John Dickinson

added additional docs

100. By John Dickinson

documentation clarification and pep8 fixes

101. By John Dickinson

added overview stats to the doc index

102. By John Dickinson

made long lines wrap (grr pep8)

103. By John Dickinson

merged with trunk

104. By John Dickinson

merged with trunk

105. By John Dickinson

merged with trunk

106. By John Dickinson

fixed stats docs

107. By John Dickinson

added a bad lines check to the access log parser

108. By John Dickinson

added tests for compressing file reader

109. By John Dickinson

fixed compressing file reader test

110. By John Dickinson

fixed compressing file reader test

111. By John Dickinson

improved compressing file reader test

112. By John Dickinson

fixed compressing file reader test

113. By John Dickinson

made try/except much less inclusive in access log processor

114. By John Dickinson

added keylist mapping tests and fixed other tests

115. By John Dickinson

updated stats system tests

116. By John Dickinson

merged with gholt's test stubs

117. By John Dickinson

updated SAIO instructions for the stats system

118. By John Dickinson

fixed stats system saio docs

119. By John Dickinson

fixed stats system saio docs

120. By John Dickinson

added openstack copyright/license to test_log_processor.py

121. By John Dickinson

improved logging in log processors

122. By John Dickinson

merged with trunk

123. By John Dickinson

fixed missing working directory bug in account stats

124. By John Dickinson

fixed readconf parameter that was broken with a previous merge

125. By John Dickinson

updated setup.py and saio docs for stats system

126. By John Dickinson

added readconf unit test

127. By John Dickinson

updated stats saio docs to create logs with the appropriate permissions

128. By John Dickinson

fixed bug in log processor internal proxy lazy load code

129. By John Dickinson

updated readconf test

130. By John Dickinson

updated readconf test

131. By John Dickinson

updated readconf test

132. By John Dickinson

fixed internal proxy references in log processor

133. By John Dickinson

fixed account stats filename creation

134. By John Dickinson

pep8 tomfoolery

135. By John Dickinson

moved paren

136. By John Dickinson

added lazy load of internal proxy to log processor (you were right clay)

Preview Diff

=== added file 'bin/swift-account-stats-logger.py'
--- bin/swift-account-stats-logger.py 1970-01-01 00:00:00 +0000
+++ bin/swift-account-stats-logger.py 2010-08-06 04:11:39 +0000
@@ -0,0 +1,81 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import signal
import sys
import time
from ConfigParser import ConfigParser

from swift.stats.account_stats import AccountStat
from swift.common import utils

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print "Usage: swift-account-stats-logger CONFIG_FILE"
        sys.exit()

    c = ConfigParser()
    if not c.read(sys.argv[1]):
        print "Unable to read config file."
        sys.exit(1)

    if c.has_section('log-processor-stats'):
        stats_conf = dict(c.items('log-processor-stats'))
    else:
        print "Unable to find log-processor-stats config section in %s." % \
            sys.argv[1]
        sys.exit(1)

    # reference this from the account stats conf
    target_dir = stats_conf.get('log_dir', '/var/log/swift')
    account_server_conf_loc = stats_conf.get('account_server_conf',
                                             '/etc/swift/account-server.conf')
    filename_format = stats_conf['source_filename_format']
    try:
        c = ConfigParser()
        c.read(account_server_conf_loc)
        account_server_conf = dict(c.items('account-server'))
    except:
        print "Unable to load account server conf from %s" % \
            account_server_conf_loc
        sys.exit(1)

    utils.drop_privileges(account_server_conf.get('user', 'swift'))

    try:
        os.setsid()
    except OSError:
        pass

    logger = utils.get_logger(stats_conf, 'swift-account-stats-logger')

    def kill_children(*args):
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        os.killpg(0, signal.SIGTERM)
        sys.exit()

    signal.signal(signal.SIGTERM, kill_children)

    stats = AccountStat(filename_format,
                        target_dir,
                        account_server_conf,
                        logger)
    logger.info("Gathering account stats")
    start = time.time()
    stats.find_and_process()
    logger.info("Gathering account stats complete (%0.2f minutes)" %
                ((time.time() - start) / 60))
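For reference, a run of this script against a config like the sample below would look like this (the config path is illustrative):

    swift-account-stats-logger /etc/swift/log-processing.conf

It reads the [log-processor-stats] section, drops privileges to the account server's user, and writes one stats file per run into log_dir.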
=== added file 'bin/swift-log-uploader'
--- bin/swift-log-uploader 1970-01-01 00:00:00 +0000
+++ bin/swift-log-uploader 2010-08-06 04:11:39 +0000
@@ -0,0 +1,83 @@
#!/usr/bin/python
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import signal
import sys
import time
from ConfigParser import ConfigParser

from swift.stats.log_uploader import LogUploader
from swift.common.utils import get_logger

if __name__ == '__main__':
    if len(sys.argv) < 3:
        print "Usage: swift-log-uploader CONFIG_FILE plugin"
        sys.exit()

    c = ConfigParser()
    if not c.read(sys.argv[1]):
        print "Unable to read config file."
        sys.exit(1)

    if c.has_section('log-processor'):
        uploader_conf = dict(c.items('log-processor'))
    else:
        print "Unable to find log-processor config section in %s." % \
            sys.argv[1]
        sys.exit(1)

    plugin = sys.argv[2]
    section_name = 'log-processor-%s' % plugin
    if c.has_section(section_name):
        uploader_conf.update(dict(c.items(section_name)))
    else:
        print "Unable to find %s config section in %s." % (section_name,
                                                           sys.argv[1])
        sys.exit(1)

    try:
        os.setsid()
    except OSError:
        pass

    logger = get_logger(uploader_conf, 'swift-log-uploader')

    def kill_children(*args):
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        os.killpg(0, signal.SIGTERM)
        sys.exit()

    signal.signal(signal.SIGTERM, kill_children)

    log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
    swift_account = uploader_conf['swift_account']
    container_name = uploader_conf['container_name']
    source_filename_format = uploader_conf['source_filename_format']
    proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
                                              '/etc/swift/proxy-server.conf')
    try:
        c = ConfigParser()
        c.read(proxy_server_conf_loc)
        proxy_server_conf = dict(c.items('proxy-server'))
    except:
        proxy_server_conf = None
    uploader = LogUploader(log_dir, swift_account, container_name,
                           source_filename_format, proxy_server_conf, logger)
    logger.info("Uploading logs")
    start = time.time()
    uploader.upload_all_logs()
    logger.info("Uploading logs complete (%0.2f minutes)" %
                ((time.time() - start) / 60))
=== added file 'etc/log-processing.conf-sample'
--- etc/log-processing.conf-sample 1970-01-01 00:00:00 +0000
+++ etc/log-processing.conf-sample 2010-08-06 04:11:39 +0000
@@ -0,0 +1,27 @@
# plugin sections are named "log-processor-<plugin>"
# the "log-processor" section holds the generic defaults (overridden by plugins)

[log-processor]
# working_dir = /tmp/swift/
# proxy_server_conf = /etc/swift/proxy-server.conf
# log_facility = LOG_LOCAL0
# log_level = INFO
# lookback_hours = 120
# lookback_window = 120

[log-processor-access]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = log_data
source_filename_format = %Y%m%d%H*
class_path = swift.stats.access_processor
# service_ips lists client ip addresses that should be counted as servicenet
# service_ips =

[log-processor-stats]
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = account_stats
source_filename_format = %Y%m%d%H*
class_path = swift.stats.stats_processor
# account_server_conf = /etc/swift/account-server.conf
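As a usage sketch, the plugin name passed to swift-log-uploader picks which log-processor-<plugin> section is layered over the [log-processor] defaults, so with this sample config (config path illustrative):

    swift-log-uploader /etc/swift/log-processing.conf access
    swift-log-uploader /etc/swift/log-processing.conf stats

the first invocation uploads access logs to the log_data container and the second uploads account stats files to the account_stats container.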
=== added file 'swift/common/compressed_file_reader.py'
--- swift/common/compressed_file_reader.py 1970-01-01 00:00:00 +0000
+++ swift/common/compressed_file_reader.py 2010-08-06 04:11:39 +0000
@@ -0,0 +1,72 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import zlib
import struct


class CompressedFileReader(object):
    '''
    Wraps a file object and provides a read method that returns gzip'd data.

    One warning: if read is called with a small value, the data returned may
    be bigger than the value. In this case, the "compressed" data will be
    bigger than the original data. To solve this, use a bigger read buffer.

    An example use case:
    Given an uncompressed file on disk, provide a way to read compressed data
    without buffering the entire file data in memory. Using this class, an
    uncompressed log file could be uploaded as compressed data with chunked
    transfer encoding.

    gzip header and footer code taken from the python stdlib gzip module

    :param file_obj: File object to read from
    :param compresslevel: compression level
    '''

    def __init__(self, file_obj, compresslevel=9):
        self._f = file_obj
        self._compressor = zlib.compressobj(compresslevel,
                                            zlib.DEFLATED,
                                            -zlib.MAX_WBITS,
                                            zlib.DEF_MEM_LEVEL,
                                            0)
        self.done = False
        self.first = True
        self.crc32 = 0
        self.total_size = 0

    def read(self, *a, **kw):
        if self.done:
            return ''
        x = self._f.read(*a, **kw)
        if x:
            self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
            self.total_size += len(x)
            compressed = self._compressor.compress(x)
            if not compressed:
                compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
        else:
            compressed = self._compressor.flush(zlib.Z_FINISH)
            crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
            size = struct.pack("<L", self.total_size & 0xffffffffL)
            footer = crc32 + size
            compressed += footer
            self.done = True
        if self.first:
            self.first = False
            header = '\037\213\010\000\000\000\000\000\002\377'
            compressed = header + compressed
        return compressed
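A minimal round-trip sketch (the file path and chunk size are illustrative). Each read() returns a piece of a single gzip stream, with the header prepended to the first chunk and the crc/size footer appended to the last, so the concatenation is a valid .gz payload:

    import gzip
    from StringIO import StringIO

    from swift.common.compressed_file_reader import CompressedFileReader

    reader = CompressedFileReader(open('/var/log/swift/access.log', 'rb'))
    chunks = []
    chunk = reader.read(65536)
    while chunk:
        chunks.append(chunk)
        chunk = reader.read(65536)
    # decompressing the joined chunks recovers the original file data
    original = gzip.GzipFile(fileobj=StringIO(''.join(chunks))).read()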
=== added file 'swift/common/internal_proxy.py'
--- swift/common/internal_proxy.py 1970-01-01 00:00:00 +0000
+++ swift/common/internal_proxy.py 2010-08-06 04:11:39 +0000
@@ -0,0 +1,174 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import webob
from urllib import quote
from json import loads as json_loads

from swift.common.compressed_file_reader import CompressedFileReader
from swift.proxy.server import BaseApplication


class InternalProxy(object):
    """
    Set up a private instance of a proxy server that allows normal requests
    to be made without having to actually send the request to the proxy.
    This also doesn't log the requests to the normal proxy logs.

    :param proxy_server_conf: proxy server configuration dictionary
    :param logger: logger to log requests to
    :param retries: number of times to retry each request
    """

    def __init__(self, proxy_server_conf=None, logger=None, retries=0):
        self.upload_app = BaseApplication(proxy_server_conf, logger)
        self.retries = retries

    def upload_file(self, source_file, account, container, object_name,
                    compress=True, content_type='application/x-gzip'):
        """
        Upload a file to swift.

        :param source_file: path to or file like object to upload
        :param account: account to upload to
        :param container: container to upload to
        :param object_name: name of object being uploaded
        :param compress: if True, compresses object as it is uploaded
        :param content_type: content-type of object
        :returns: True if successful, False otherwise
        """
        log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name)

        # create the container
        if not self.create_container(account, container):
            return False

        # upload the file to the account
        req = webob.Request.blank(log_create_pattern,
                                  environ={'REQUEST_METHOD': 'PUT'},
                                  headers={'Transfer-Encoding': 'chunked'})
        if compress:
            if hasattr(source_file, 'read'):
                compressed_file = CompressedFileReader(source_file)
            else:
                compressed_file = CompressedFileReader(open(source_file, 'rb'))
            req.body_file = compressed_file
        else:
            if not hasattr(source_file, 'read'):
                source_file = open(source_file, 'rb')
            req.body_file = source_file
        req.account = account
        req.content_type = content_type
        req.content_length = None   # to make sure we send chunked data
        resp = self.upload_app.handle_request(
            self.upload_app.update_request(req))
        tries = 1
        while (resp.status_int < 200 or resp.status_int > 299) \
                and tries <= self.retries:
            resp = self.upload_app.handle_request(
                self.upload_app.update_request(req))
            tries += 1
        return 200 <= resp.status_int < 300

    def get_object(self, account, container, object_name):
        """
        Get object.

        :param account: account name object is in
        :param container: container name object is in
        :param object_name: name of object to get
        :returns: iterator for object data
        """
        req = webob.Request.blank('/v1/%s/%s/%s' %
                                  (account, container, object_name),
                                  environ={'REQUEST_METHOD': 'GET'})
        req.account = account
        resp = self.upload_app.handle_request(
            self.upload_app.update_request(req))
        tries = 1
        while (resp.status_int < 200 or resp.status_int > 299) \
                and tries <= self.retries:
            resp = self.upload_app.handle_request(
                self.upload_app.update_request(req))
            tries += 1
        for x in resp.app_iter:
            yield x

    def create_container(self, account, container):
        """
        Create container.

        :param account: account name to put the container in
        :param container: container name to create
        :returns: True if successful, otherwise False
        """
        req = webob.Request.blank('/v1/%s/%s' % (account, container),
                                  environ={'REQUEST_METHOD': 'PUT'})
        req.account = account
        resp = self.upload_app.handle_request(
            self.upload_app.update_request(req))
        tries = 1
        while (resp.status_int < 200 or resp.status_int > 299) \
                and tries <= self.retries:
            resp = self.upload_app.handle_request(
                self.upload_app.update_request(req))
            tries += 1
        return 200 <= resp.status_int < 300

    def get_container_list(self, account, container, marker=None, limit=None,
                           prefix=None, delimiter=None, full_listing=True):
        """
        Get container listing.

        :param account: account name for the container
        :param container: container name to get the listing of
        :param marker: marker query
        :param limit: limit to query
        :param prefix: prefix query
        :param delimiter: delimiter for query
        :param full_listing: if True, make enough requests to get all listings
        :returns: list of objects
        """
        if full_listing:
            rv = []
            listing = self.get_container_list(account, container, marker,
                limit, prefix, delimiter, full_listing=False)
            while listing:
                rv.extend(listing)
                if not delimiter:
                    marker = listing[-1]['name']
                else:
                    marker = listing[-1].get('name', listing[-1].get('subdir'))
                listing = self.get_container_list(account, container, marker,
                    limit, prefix, delimiter, full_listing=False)
            return rv
        path = '/v1/%s/%s' % (account, container)
        qs = 'format=json'
        if marker:
            qs += '&marker=%s' % quote(marker)
        if limit:
            qs += '&limit=%d' % limit
        if prefix:
            qs += '&prefix=%s' % quote(prefix)
        if delimiter:
            qs += '&delimiter=%s' % quote(delimiter)
        path += '?%s' % qs
        req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
        req.account = account
        resp = self.upload_app.handle_request(
            self.upload_app.update_request(req))
        tries = 1
        while (resp.status_int < 200 or resp.status_int > 299) \
                and tries <= self.retries:
            resp = self.upload_app.handle_request(
                self.upload_app.update_request(req))
            tries += 1
        if resp.status_int == 204:
            return []
        if 200 <= resp.status_int < 300:
            return json_loads(resp.body)
        return []
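A minimal usage sketch (the config path, account, container, and object names here are illustrative, not part of the change):

    from ConfigParser import ConfigParser

    from swift.common.internal_proxy import InternalProxy

    c = ConfigParser()
    c.read('/etc/swift/proxy-server.conf')
    proxy = InternalProxy(dict(c.items('proxy-server')), retries=3)
    # upload an already-compressed log, then list what's in the container
    if proxy.upload_file('/var/log/swift/access.log.gz', 'AUTH_test',
                         'log_data', '2010/08/06/04/example.gz',
                         compress=False):
        for obj in proxy.get_container_list('AUTH_test', 'log_data'):
            print obj['name']

Requests are dispatched directly to the wrapped BaseApplication, so nothing crosses the network to a running proxy server.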
=== added directory 'swift/stats'
=== added file 'swift/stats/__init__.py'
=== added file 'swift/stats/account_stats.py'
--- swift/stats/account_stats.py 1970-01-01 00:00:00 +0000
+++ swift/stats/account_stats.py 2010-08-06 04:11:39 +0000
@@ -0,0 +1,69 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time

from swift.account.server import DATADIR as account_server_data_dir
from swift.common.db import AccountBroker
from swift.common.internal_proxy import InternalProxy
from swift.common.utils import renamer


class AccountStat(object):
    def __init__(self, filename_format, target_dir, server_conf, logger):
        self.filename_format = filename_format
        self.target_dir = target_dir
        self.devices = server_conf.get('devices', '/srv/node')
        self.mount_check = server_conf.get('mount_check', 'true').lower() in \
            ('true', 't', '1', 'on', 'yes', 'y')
        self.logger = logger

    def find_and_process(self):
        src_filename = time.strftime(self.filename_format)
        tmp_filename = os.path.join('/tmp', src_filename)
        with open(tmp_filename, 'wb') as statfile:
            #statfile.write('Account Name, Container Count, Object Count, '
            #               'Bytes Used, Created At\n')
            for device in os.listdir(self.devices):
                if self.mount_check and not \
                        os.path.ismount(os.path.join(self.devices, device)):
                    self.logger.error("Device %s is not mounted, skipping." %
                                      device)
                    continue
                accounts = os.path.join(self.devices,
                                        device,
                                        account_server_data_dir)
                if not os.path.exists(accounts):
                    self.logger.debug("Path %s does not exist, skipping." %
                                      accounts)
                    continue
                for root, dirs, files in os.walk(accounts, topdown=False):
                    for filename in files:
                        if filename.endswith('.db'):
                            broker = AccountBroker(
                                os.path.join(root, filename))
                            if not broker.is_deleted():
                                (account_name, created_at, _, _,
                                 container_count, object_count, bytes_used,
                                 _, _) = broker.get_info()
                                line_data = '"%s",%d,%d,%d,%s\n' % (
                                    account_name, container_count,
                                    object_count, bytes_used, created_at)
                                statfile.write(line_data)
        renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
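Each run of find_and_process renders source_filename_format with the current time and produces one CSV file whose rows follow the format string above; an illustrative line (values made up):

    "AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31",12,3455,872652,1280984400.50217

The fields are account name, container count, object count, bytes used, and the account's created_at timestamp.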
=== added file 'swift/stats/log_uploader.py'
--- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000
+++ swift/stats/log_uploader.py 2010-08-06 04:11:39 +0000
@@ -0,0 +1,135 @@
# Copyright (c) 2010 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import with_statement
import os
import hashlib
import time
import gzip
import glob

from swift.common.internal_proxy import InternalProxy


class LogUploader(object):
    '''
    Given a local directory, a swift account, and a container name,
    LogUploader will upload all files in the local directory to the given
    account/container. All but the newest files (those less than two hours
    old) will be uploaded, and each file's md5 sum will be computed. The hash
    is used to prevent duplicate data from being uploaded multiple times in
    different files (ex: log lines). Since the hash is computed, it is also
    used as the uploaded object's etag to ensure data integrity.

    Note that after the file is successfully uploaded, it will be unlinked.

    The given proxy server config is used to instantiate a proxy server for
    the object uploads.
    '''

    def __init__(self, log_dir, swift_account, container_name,
                 filename_format, proxy_server_conf, logger):
        if not log_dir.endswith('/'):
            log_dir = log_dir + '/'
        self.log_dir = log_dir
        self.swift_account = swift_account
        self.container_name = container_name
        self.filename_format = filename_format
        self.internal_proxy = InternalProxy(proxy_server_conf, logger)
        self.logger = logger

    def upload_all_logs(self):
        try:
            i = sorted((self.filename_format.index(c), c)
                       for c in ('%Y', '%m', '%d', '%H'))
        except ValueError:
            # don't have all the parts, can't upload anything
            return
        year_offset = month_offset = day_offset = hour_offset = None
        # offsets are relative to the rendered filename, where %Y fills
        # four characters but only takes up two in the format string
        extra = 0
        for start, c in i:
            start += extra
            if c == '%Y':
                year_offset = start, start + 4
                extra += 2
            elif c == '%m':
                month_offset = start, start + 2
            elif c == '%d':
                day_offset = start, start + 2
            elif c == '%H':
                hour_offset = start, start + 2
        glob_pattern = self.filename_format
        glob_pattern = glob_pattern.replace('%Y', '????')
        glob_pattern = glob_pattern.replace('%m', '??')
        glob_pattern = glob_pattern.replace('%d', '??')
        glob_pattern = glob_pattern.replace('%H', '??')
        filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
        self.internal_proxy.create_container(self.swift_account,
                                             self.container_name)
        for filename in filelist:
            # From the filename, we need to derive the year, month, day,
            # and hour for the file. These values are used in the uploaded
            # object's name, so they should be a reasonably accurate
            # representation of the time for which the data in the file was
            # collected. The file's last modified time is not a reliable
            # representation of the data in the file. For example, an old
            # log file (from hour A) may be uploaded or moved into the
            # log_dir in hour Z. The file's modified time will be for hour
            # Z, and therefore the object's name in the system will not
            # represent the data in it.
            # If the filename doesn't match the format, it shouldn't be
            # uploaded.
            basename = os.path.basename(filename)
            year = basename[slice(*year_offset)]
            month = basename[slice(*month_offset)]
            day = basename[slice(*day_offset)]
            hour = basename[slice(*hour_offset)]
            if not (year + month + day + hour).isdigit():
                # unexpected filename format, move on
                self.logger.error("Unexpected log: %s" % filename)
                continue
            if (time.time() - os.stat(filename).st_mtime) < 7200:
                # don't process very new logs
                self.logger.debug(
                    "Skipping log: %s (< 2 hours old)" % filename)
                continue
            self.upload_one_log(filename, year, month, day, hour)

    def upload_one_log(self, filename, year, month, day, hour):
        if os.path.getsize(filename) == 0:
            self.logger.debug("Log %s is 0 length, skipping" % filename)
            return
        self.logger.debug("Processing log: %s" % filename)
        filehash = hashlib.md5()
        already_compressed = filename.endswith('.gz')
        opener = gzip.open if already_compressed else open
        f = opener(filename, 'rb')
        try:
            for line in f:
                # filter out bad lines here?
                filehash.update(line)
        finally:
            f.close()
        filehash = filehash.hexdigest()
        # By adding a hash to the filename, we ensure that uploaded files
        # have unique filenames and protect against uploading one file
        # more than one time. By using md5, we get an etag for free.
        target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
        if self.internal_proxy.upload_file(filename,
                                           self.swift_account,
                                           self.container_name,
                                           target_filename,
                                           compress=(not already_compressed)):
            self.logger.debug("Uploaded log %s to %s" %
                              (filename, target_filename))
            os.unlink(filename)
        else:
            self.logger.error("ERROR: Upload of log %s failed!" % filename)
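A small illustration of the filename handling above, assuming the sample format '%Y%m%d%H*' from the config (the log name is made up): the derived offsets are year (0, 4), month (4, 6), day (6, 8), and hour (8, 10).

    # offsets derived from '%Y%m%d%H*' as in upload_all_logs above
    name = '2010080604.access.log'  # illustrative filename
    year, month, day, hour = name[0:4], name[4:6], name[6:8], name[8:10]
    # -> '2010', '08', '06', '04'
    # upload_one_log would store it as 2010/08/06/04/<md5 of contents>.gz
    # and unlink the local file on success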