Merge lp:~notmyname/swift/stats_system into lp:~hudson-openstack/swift/trunk

Proposed by John Dickinson
Status: Superseded
Proposed branch: lp:~notmyname/swift/stats_system
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 677 lines (+641/-0)
7 files modified
bin/swift-account-stats-logger.py (+81/-0)
bin/swift-log-uploader (+83/-0)
etc/log-processing.conf-sample (+27/-0)
swift/common/compressed_file_reader.py (+72/-0)
swift/common/internal_proxy.py (+174/-0)
swift/stats/account_stats.py (+69/-0)
swift/stats/log_uploader.py (+135/-0)
To merge this branch: bzr merge lp:~notmyname/swift/stats_system
Reviewer: Swift Core security contacts (status: Pending)
Review via email: mp+31875@code.launchpad.net

This proposal has been superseded by a proposal from 2010-08-06.

Description of the change

Adds log_uploader and a few supporting libraries as the first part of the stats system.

lp:~notmyname/swift/stats_system updated
50. By John Dickinson

added account stats logger to stats system

51. By John Dickinson

added log_processor and a stats plugin

52. By John Dickinson

merged with trunk

53. By John Dickinson

added access log processing plugin

54. By John Dickinson

merged with trunk

55. By John Dickinson

merged with trunk

56. By John Dickinson

merged with trunk

57. By John Dickinson

merged with trunk

58. By John Dickinson

merged with trunk

59. By John Dickinson

merged with trunk

60. By John Dickinson

initial tests for the stats system

61. By John Dickinson

first test working

62. By John Dickinson

merged with trunk

63. By John Dickinson

access log parsing tests pass

64. By John Dickinson

added (working) stats tests

65. By John Dickinson

merged with trunk (utils fix)

66. By John Dickinson

updated stats binaries to be DRY compliant

67. By John Dickinson

merged with trunk

68. By John Dickinson

set up log-stats-collector as a daemon process to create csv files

69. By John Dickinson

merged with trunk

70. By John Dickinson

added execute perms to stats processor binaries

71. By John Dickinson

updated config file loading to work with paste.deploy configs

72. By John Dickinson

fixed typos

73. By John Dickinson

fixed internal proxy loading

74. By John Dickinson

made a memcache stub for the internal proxy server

75. By John Dickinson

fixed internal proxy put_container reference

76. By John Dickinson

fixed some log uploading glob patterns

77. By John Dickinson

fixed bug in calculating offsets for filename patterns

78. By John Dickinson

fixed typos in log processor

79. By John Dickinson

fixed get_data_list in log_processor

80. By John Dickinson

added some debug output

81. By John Dickinson

fixed logging and log uploading

82. By John Dickinson

fixed copy/paste errors and missing imports

83. By John Dickinson

added error handling and missing return statement

84. By John Dickinson

handled some typos and better handling of missing data in internal proxy

85. By John Dickinson

fixed tests, typos, and added error handling

86. By John Dickinson

fixed bug in account stats log processing

87. By John Dickinson

fixed listing filter in log processing

88. By John Dickinson

fixed stdout capturing for generating csv files

89. By John Dickinson

fixed replica count reporting error

90. By John Dickinson

fixed lookback in log processor

91. By John Dickinson

merged with trunk

92. By John Dickinson

merged with trunk

93. By John Dickinson

fixed tests to account for changed key name

94. By John Dickinson

fixed test bug

95. By John Dickinson

updated with changes and suggestions from code review

96. By John Dickinson

merged with changes from trunk

97. By John Dickinson

added stats overview

98. By John Dickinson

updated with changes from trunk

99. By John Dickinson

added additional docs

100. By John Dickinson

documentation clarification and pep8 fixes

101. By John Dickinson

added overview stats to the doc index

102. By John Dickinson

made long lines wrap (grr pep8)

103. By John Dickinson

merged with trunk

104. By John Dickinson

merged with trunk

105. By John Dickinson

merged with trunk

106. By John Dickinson

fixed stats docs

107. By John Dickinson

added a bad lines check to the access log parser

108. By John Dickinson

added tests for compressing file reader

109. By John Dickinson

fixed compressing file reader test

110. By John Dickinson

fixed compressing file reader test

111. By John Dickinson

improved compressing file reader test

112. By John Dickinson

fixed compressing file reader test

113. By John Dickinson

made try/except much less inclusive in access log processor

114. By John Dickinson

added keylist mapping tests and fixed other tests

115. By John Dickinson

updated stats system tests

116. By John Dickinson

merged with gholt's test stubs

117. By John Dickinson

updated SAIO instructions for the stats system

118. By John Dickinson

fixed stats system saio docs

119. By John Dickinson

fixed stats system saio docs

120. By John Dickinson

added openstack copyright/license to test_log_processor.py

121. By John Dickinson

improved logging in log processors

122. By John Dickinson

merged with trunk

123. By John Dickinson

fixed missing working directory bug in account stats

124. By John Dickinson

fixed readconf parameter that was broken with a previous merge

125. By John Dickinson

updated setup.py and saio docs for stats system

126. By John Dickinson

added readconf unit test

127. By John Dickinson

updated stats saio docs to create logs with the appropriate permissions

128. By John Dickinson

fixed bug in log processor internal proxy lazy load code

129. By John Dickinson

updated readconf test

130. By John Dickinson

updated readconf test

131. By John Dickinson

updated readconf test

132. By John Dickinson

fixed internal proxy references in log processor

133. By John Dickinson

fixed account stats filename creation

134. By John Dickinson

pep8 tomfoolery

135. By John Dickinson

moved paren

136. By John Dickinson

added lazy load of internal proxy to log processor (you were right clay)

Unmerged revisions

Preview Diff

1=== added file 'bin/swift-account-stats-logger.py'
2--- bin/swift-account-stats-logger.py 1970-01-01 00:00:00 +0000
3+++ bin/swift-account-stats-logger.py 2010-08-06 04:11:39 +0000
4@@ -0,0 +1,81 @@
5+#!/usr/bin/python
6+# Copyright (c) 2010 OpenStack, LLC.
7+#
8+# Licensed under the Apache License, Version 2.0 (the "License");
9+# you may not use this file except in compliance with the License.
10+# You may obtain a copy of the License at
11+#
12+# http://www.apache.org/licenses/LICENSE-2.0
13+#
14+# Unless required by applicable law or agreed to in writing, software
15+# distributed under the License is distributed on an "AS IS" BASIS,
16+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
17+# implied.
18+# See the License for the specific language governing permissions and
19+# limitations under the License.
20+
21+import os
22+import signal
23+import sys
24+import time
25+from ConfigParser import ConfigParser
26+
27+from swift.stats.account_stats import AccountStat
28+from swift.common import utils
29+
30+if __name__ == '__main__':
31+ if len(sys.argv) < 2:
32+ print "Usage: swift-account-stats-logger CONFIG_FILE"
33+ sys.exit(1)
34+
35+ c = ConfigParser()
36+ if not c.read(sys.argv[1]):
37+ print "Unable to read config file."
38+ sys.exit(1)
39+
40+ if c.has_section('log-processor-stats'):
41+ stats_conf = dict(c.items('log-processor-stats'))
42+ else:
43+ print "Unable to find log-processor-stats config section in %s." % \
44+ sys.argv[1]
45+ sys.exit(1)
46+
47+ # reference this from the account stats conf
48+
49+ target_dir = stats_conf.get('log_dir', '/var/log/swift')
50+ account_server_conf_loc = stats_conf.get('account_server_conf',
51+ '/etc/swift/account-server.conf')
52+ filename_format = stats_conf['source_filename_format']
53+ try:
54+ c = ConfigParser()
55+ c.read(account_server_conf_loc)
56+ account_server_conf = dict(c.items('account-server'))
57+ except:
58+ print "Unable to load account server conf from %s" % account_server_conf_loc
59+ sys.exit(1)
60+
61+ utils.drop_privileges(account_server_conf.get('user', 'swift'))
62+
63+ try:
64+ os.setsid()
65+ except OSError:
66+ pass
67+
68+ logger = utils.get_logger(stats_conf, 'swift-account-stats-logger')
69+
70+ def kill_children(*args):
71+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
72+ os.killpg(0, signal.SIGTERM)
73+ sys.exit()
74+
75+ signal.signal(signal.SIGTERM, kill_children)
76+
77+ stats = AccountStat(filename_format,
78+ target_dir,
79+ account_server_conf,
80+ logger)
81+ logger.info("Gathering account stats")
82+ start = time.time()
83+ stats.find_and_process()
84+ logger.info("Gathering account stats complete (%0.2f minutes)" %
85+ ((time.time()-start)/60))
86
87=== added file 'bin/swift-log-uploader'
88--- bin/swift-log-uploader 1970-01-01 00:00:00 +0000
89+++ bin/swift-log-uploader 2010-08-06 04:11:39 +0000
90@@ -0,0 +1,83 @@
91+#!/usr/bin/python
92+# Copyright (c) 2010 OpenStack, LLC.
93+#
94+# Licensed under the Apache License, Version 2.0 (the "License");
95+# you may not use this file except in compliance with the License.
96+# You may obtain a copy of the License at
97+#
98+# http://www.apache.org/licenses/LICENSE-2.0
99+#
100+# Unless required by applicable law or agreed to in writing, software
101+# distributed under the License is distributed on an "AS IS" BASIS,
102+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
103+# implied.
104+# See the License for the specific language governing permissions and
105+# limitations under the License.
106+
107+import os
108+import signal
109+import sys
110+import time
111+from ConfigParser import ConfigParser
112+
113+from swift.stats.log_uploader import LogUploader
114+from swift.common.utils import get_logger
115+
116+if __name__ == '__main__':
117+ if len(sys.argv) < 3:
118+ print "Usage: swift-log-uploader CONFIG_FILE plugin"
119+ sys.exit(1)
120+
121+ c = ConfigParser()
122+ if not c.read(sys.argv[1]):
123+ print "Unable to read config file."
124+ sys.exit(1)
125+
126+ if c.has_section('log-processor'):
127+ uploader_conf = dict(c.items('log-processor'))
128+ else:
129+ print "Unable to find log-processor config section in %s." % sys.argv[1]
130+ sys.exit(1)
131+
132+ plugin = sys.argv[2]
133+ section_name = 'log-processor-%s' % plugin
134+ if c.has_section(section_name):
135+ uploader_conf.update(dict(c.items(section_name)))
136+ else:
137+ print "Unable to find %s config section in %s." % (section_name,
138+ sys.argv[1])
139+ sys.exit(1)
140+
141+ try:
142+ os.setsid()
143+ except OSError:
144+ pass
145+
146+ logger = get_logger(uploader_conf, 'swift-log-uploader')
147+
148+ def kill_children(*args):
149+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
150+ os.killpg(0, signal.SIGTERM)
151+ sys.exit()
152+
153+ signal.signal(signal.SIGTERM, kill_children)
154+
155+ log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
156+ swift_account = uploader_conf['swift_account']
157+ container_name = uploader_conf['container_name']
158+ source_filename_format = uploader_conf['source_filename_format']
159+ proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
160+ '/etc/swift/proxy-server.conf')
161+ try:
162+ c = ConfigParser()
163+ c.read(proxy_server_conf_loc)
164+ proxy_server_conf = dict(c.items('proxy-server'))
165+ except:
166+ proxy_server_conf = None
167+ uploader = LogUploader(log_dir, swift_account, container_name,
168+ source_filename_format, proxy_server_conf, logger)
169+ logger.info("Uploading logs")
170+ start = time.time()
171+ uploader.upload_all_logs()
172+ logger.info("Uploading logs complete (%0.2f minutes)" %
173+ ((time.time()-start)/60))
174
175=== added file 'etc/log-processing.conf-sample'
176--- etc/log-processing.conf-sample 1970-01-01 00:00:00 +0000
177+++ etc/log-processing.conf-sample 2010-08-06 04:11:39 +0000
178@@ -0,0 +1,27 @@
179+# plugin section format is named "log-processor-<plugin>"
180+# section "log-processor" is the generic defaults (overridden by plugins)
181+
182+[log-processor]
183+# working_dir = /tmp/swift/
184+# proxy_server_conf = /etc/swift/proxy-server.conf
185+# log_facility = LOG_LOCAL0
186+# log_level = INFO
187+# lookback_hours = 120
188+# lookback_window = 120
189+
190+[log-processor-access]
191+# log_dir = /var/log/swift/
192+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
193+container_name = log_data
194+source_filename_format = %Y%m%d%H*
195+class_path = swift.stats.access_processor
196+# service ips is for client ip addresses that should be counted as servicenet
197+# service_ips =
198+
199+[log-processor-stats]
200+# log_dir = /var/log/swift/
201+swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
202+container_name = account_stats
203+source_filename_format = %Y%m%d%H*
204+class_path = swift.stats.stats_processor
205+# account_server_conf = /etc/swift/account-server.conf
206
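The layering the sample's comments describe (generic `[log-processor]` defaults overridden by each `log-processor-<plugin>` section) is a plain dict-update pattern. A minimal sketch, using Python 3's `configparser` for illustration (the era's code uses the Python 2 `ConfigParser` module, and the section contents here are abbreviated):

```python
from configparser import ConfigParser

# Abbreviated stand-in for log-processing.conf-sample.
sample = """
[log-processor]
log_level = INFO
lookback_hours = 120

[log-processor-access]
log_level = DEBUG
container_name = log_data
"""

c = ConfigParser()
c.read_string(sample)

# Start from the generic section, then let the plugin section override.
conf = dict(c.items('log-processor'))
conf.update(dict(c.items('log-processor-access')))

assert conf['log_level'] == 'DEBUG'        # overridden by the plugin
assert conf['lookback_hours'] == '120'     # inherited generic default
assert conf['container_name'] == 'log_data'
```

This is exactly how the `swift-log-uploader` script merges its `log-processor` and `log-processor-<plugin>` sections before constructing the uploader.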
207=== added file 'swift/common/compressed_file_reader.py'
208--- swift/common/compressed_file_reader.py 1970-01-01 00:00:00 +0000
209+++ swift/common/compressed_file_reader.py 2010-08-06 04:11:39 +0000
210@@ -0,0 +1,72 @@
211+# Copyright (c) 2010 OpenStack, LLC.
212+#
213+# Licensed under the Apache License, Version 2.0 (the "License");
214+# you may not use this file except in compliance with the License.
215+# You may obtain a copy of the License at
216+#
217+# http://www.apache.org/licenses/LICENSE-2.0
218+#
219+# Unless required by applicable law or agreed to in writing, software
220+# distributed under the License is distributed on an "AS IS" BASIS,
221+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
222+# implied.
223+# See the License for the specific language governing permissions and
224+# limitations under the License.
225+
226+import zlib
227+import struct
228+
229+
230+class CompressedFileReader(object):
231+ '''
232+ Wraps a file object and provides a read method that returns gzip'd data.
233+
234+ One warning: if read is called with a small value, the data returned may
235+ be bigger than the value. In this case, the "compressed" data will be
236+ bigger than the original data. To solve this, use a bigger read buffer.
237+
238+ An example use case:
239+ Given an uncompressed file on disk, provide a way to read compressed data
240+ without buffering the entire file data in memory. Using this class, an
241+ uncompressed log file could be uploaded as compressed data with chunked
242+ transfer encoding.
243+
244+ gzip header and footer code taken from the python stdlib gzip module
245+
246+ :param file_obj: File object to read from
247+ :param compresslevel: compression level
248+ '''
249+ def __init__(self, file_obj, compresslevel=9):
250+ self._f = file_obj
251+ self._compressor = zlib.compressobj(compresslevel,
252+ zlib.DEFLATED,
253+ -zlib.MAX_WBITS,
254+ zlib.DEF_MEM_LEVEL,
255+ 0)
256+ self.done = False
257+ self.first = True
258+ self.crc32 = 0
259+ self.total_size = 0
260+
261+ def read(self, *a, **kw):
262+ if self.done:
263+ return ''
264+ x = self._f.read(*a, **kw)
265+ if x:
266+ self.crc32 = zlib.crc32(x, self.crc32) & 0xffffffffL
267+ self.total_size += len(x)
268+ compressed = self._compressor.compress(x)
269+ if not compressed:
270+ compressed = self._compressor.flush(zlib.Z_SYNC_FLUSH)
271+ else:
272+ compressed = self._compressor.flush(zlib.Z_FINISH)
273+ crc32 = struct.pack("<L", self.crc32 & 0xffffffffL)
274+ size = struct.pack("<L", self.total_size & 0xffffffffL)
275+ footer = crc32 + size
276+ compressed += footer
277+ self.done = True
278+ if self.first:
279+ self.first = False
280+ header = '\037\213\010\000\000\000\000\000\002\377'
281+ compressed = header + compressed
282+ return compressed
283
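`CompressedFileReader` emits a raw deflate stream (negative `wbits` suppresses zlib framing) wrapped in a hand-built gzip header and a CRC32-plus-length footer, so the concatenated reads form a valid `.gz` payload without buffering the whole file. A Python 3 sketch of the same technique (the generator name is illustrative, not from the patch):

```python
import gzip
import struct
import zlib


def gzip_stream(chunks, compresslevel=9):
    """Yield gzip-framed data for an iterable of byte chunks.

    Mirrors the patch's approach: a fixed 10-byte gzip header, a raw
    deflate body, and a footer of CRC32 plus uncompressed length,
    both as little-endian 32-bit values per the gzip format.
    """
    compressor = zlib.compressobj(compresslevel, zlib.DEFLATED,
                                  -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    crc = 0
    size = 0
    # Same header bytes as the patch's octal '\037\213\010...\002\377'.
    yield b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff'
    for chunk in chunks:
        crc = zlib.crc32(chunk, crc) & 0xffffffff
        size += len(chunk)
        out = compressor.compress(chunk)
        if out:
            yield out
    yield compressor.flush(zlib.Z_FINISH)
    yield struct.pack('<L', crc) + struct.pack('<L', size & 0xffffffff)


# Round-trip check: the streamed output is a valid .gz payload.
data = [b'line one\n', b'line two\n' * 1000]
compressed = b''.join(gzip_stream(data))
assert gzip.decompress(compressed) == b''.join(data)
```

As the class docstring warns, short reads can return more bytes than requested; a larger read buffer keeps the compressed output smaller than the input.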
284=== added file 'swift/common/internal_proxy.py'
285--- swift/common/internal_proxy.py 1970-01-01 00:00:00 +0000
286+++ swift/common/internal_proxy.py 2010-08-06 04:11:39 +0000
287@@ -0,0 +1,174 @@
288+# Copyright (c) 2010 OpenStack, LLC.
289+#
290+# Licensed under the Apache License, Version 2.0 (the "License");
291+# you may not use this file except in compliance with the License.
292+# You may obtain a copy of the License at
293+#
294+# http://www.apache.org/licenses/LICENSE-2.0
295+#
296+# Unless required by applicable law or agreed to in writing, software
297+# distributed under the License is distributed on an "AS IS" BASIS,
298+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
299+# implied.
300+# See the License for the specific language governing permissions and
301+# limitations under the License.
302+
303+import webob
304+from urllib import quote, unquote
305+from json import loads as json_loads
306+
307+from swift.common.compressed_file_reader import CompressedFileReader
308+from swift.proxy.server import BaseApplication
309+
310+
311+class InternalProxy(object):
312+ """
313+ Set up a private instance of a proxy server that allows normal requests
314+ to be made without having to actually send the request to the proxy.
315+ This also doesn't log the requests to the normal proxy logs.
316+
317+ :param proxy_server_conf: proxy server configuration dictionary
318+ :param logger: logger to log requests to
319+ :param retries: number of times to retry each request
320+ """
321+ def __init__(self, proxy_server_conf=None, logger=None, retries=0):
322+ self.upload_app = BaseApplication(proxy_server_conf, logger)
323+ self.retries = retries
324+
325+ def upload_file(self, source_file, account, container, object_name,
326+ compress=True, content_type='application/x-gzip'):
327+ """
328+ Upload a file to cloud files.
329+
330+ :param source_file: path to or file like object to upload
331+ :param account: account to upload to
332+ :param container: container to upload to
333+ :param object_name: name of object being uploaded
334+ :param compress: if True, compresses object as it is uploaded
335+ :param content_type: content-type of object
336+ :returns: True if successful, False otherwise
337+ """
338+ log_create_pattern = '/v1/%s/%s/%s' % (account, container, object_name)
339+
340+ # create the container
341+ if not self.create_container(account, container):
342+ return False
343+
344+ # upload the file to the account
345+ req = webob.Request.blank(log_create_pattern,
346+ environ={'REQUEST_METHOD': 'PUT'},
347+ headers={'Transfer-Encoding': 'chunked'})
348+ if compress:
349+ if hasattr(source_file, 'read'):
350+ compressed_file = CompressedFileReader(source_file)
351+ else:
352+ compressed_file = CompressedFileReader(open(source_file, 'rb'))
353+ req.body_file = compressed_file
354+ else:
355+ if not hasattr(source_file, 'read'):
356+ source_file = open(source_file, 'rb')
357+ req.body_file = source_file
358+ req.account = account
359+ req.content_type = content_type
360+ req.content_length = None # to make sure we send chunked data
361+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
362+ tries = 1
363+ while (resp.status_int < 200 or resp.status_int > 299) \
364+ and tries <= self.retries:
365+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
366+ tries += 1
367+ if not (200 <= resp.status_int < 300):
368+ return False
369+ return True
370+
371+ def get_object(self, account, container, object_name):
372+ """
373+ Get object.
374+
375+ :param account: account name object is in
376+ :param container: container name object is in
377+ :param object_name: name of object to get
378+ :returns: iterator for object data
379+ """
380+ req = webob.Request.blank('/v1/%s/%s/%s' %
381+ (account, container, object_name),
382+ environ={'REQUEST_METHOD': 'GET'})
383+ req.account = account
384+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
385+ tries = 1
386+ while (resp.status_int < 200 or resp.status_int > 299) \
387+ and tries <= self.retries:
388+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
389+ tries += 1
390+ for x in resp.app_iter:
391+ yield x
392+
393+ def create_container(self, account, container):
394+ """
395+ Create container.
396+
397+ :param account: account name to put the container in
398+ :param container: container name to create
399+ :returns: True if successful, otherwise False
400+ """
401+ req = webob.Request.blank('/v1/%s/%s' % (account, container),
402+ environ={'REQUEST_METHOD': 'PUT'})
403+ req.account = account
404+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
405+ tries = 1
406+ while (resp.status_int < 200 or resp.status_int > 299) \
407+ and tries <= self.retries:
408+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
409+ tries += 1
410+ return 200 <= resp.status_int < 300
411+
412+ def get_container_list(self, account, container, marker=None, limit=None,
413+ prefix=None, delimiter=None, full_listing=True):
414+ """
415+ Get container listing.
416+
417+ :param account: account name for the container
418+ :param container: container name to get the listing of
419+ :param marker: marker query
420+ :param limit: limit to query
421+ :param prefix: prefix query
421+ :param delimiter: delimiter for query
423+ :param full_listing: if True, make enough requests to get all listings
424+ :returns: list of objects
425+ """
426+ if full_listing:
427+ rv = []
428+ listing = self.get_container_list(account, container, marker,
429+ limit, prefix, delimiter, full_listing=False)
430+ while listing:
431+ rv.extend(listing)
432+ if not delimiter:
433+ marker = listing[-1]['name']
434+ else:
435+ marker = listing[-1].get('name', listing[-1].get('subdir'))
436+ listing = self.get_container_list(account, container, marker,
437+ limit, prefix, delimiter, full_listing=False)
438+ return rv
439+ path = '/v1/%s/%s' % (account, container)
440+ qs = 'format=json'
441+ if marker:
442+ qs += '&marker=%s' % quote(marker)
443+ if limit:
444+ qs += '&limit=%d' % limit
445+ if prefix:
446+ qs += '&prefix=%s' % quote(prefix)
447+ if delimiter:
448+ qs += '&delimiter=%s' % quote(delimiter)
449+ path += '?%s' % qs
450+ req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'})
451+ req.account = account
452+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
453+ tries = 1
454+ while (resp.status_int < 200 or resp.status_int > 299) \
455+ and tries <= self.retries:
456+ resp = self.upload_app.handle_request(self.upload_app.update_request(req))
457+ tries += 1
458+ if resp.status_int == 204:
459+ return []
460+ if 200 <= resp.status_int < 300:
461+ return json_loads(resp.body)
462
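The `full_listing` branch of `get_container_list` pages through a container by re-issuing the request with the last returned name (or `subdir`, when a delimiter is in play) as the marker, until an empty page comes back. The pattern, abstracted away from the proxy internals (`fetch_page` is a stand-in for the single-request path, not part of the patch):

```python
def full_listing(fetch_page, marker=None):
    """Collect all pages of a marker-paginated listing.

    fetch_page(marker) returns one page: a list of dicts keyed by
    'name' (or 'subdir'); an empty list signals the end, as in the
    patch's loop.
    """
    results = []
    page = fetch_page(marker)
    while page:
        results.extend(page)
        last = page[-1]
        # With a delimiter, 'subdir' entries have no 'name' key.
        marker = last.get('name', last.get('subdir'))
        page = fetch_page(marker)
    return results


# Simulated backend: 5 objects served two at a time, after the marker.
objects = [{'name': 'obj%d' % i} for i in range(5)]

def fetch_page(marker):
    names = [o['name'] for o in objects]
    start = names.index(marker) + 1 if marker else 0
    return objects[start:start + 2]

assert full_listing(fetch_page) == objects
```

A 204 response maps to the empty page in the real code, which is what terminates the recursion there.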
463=== added directory 'swift/stats'
464=== added file 'swift/stats/__init__.py'
465=== added file 'swift/stats/account_stats.py'
466--- swift/stats/account_stats.py 1970-01-01 00:00:00 +0000
467+++ swift/stats/account_stats.py 2010-08-06 04:11:39 +0000
468@@ -0,0 +1,69 @@
469+# Copyright (c) 2010 OpenStack, LLC.
470+#
471+# Licensed under the Apache License, Version 2.0 (the "License");
472+# you may not use this file except in compliance with the License.
473+# You may obtain a copy of the License at
474+#
475+# http://www.apache.org/licenses/LICENSE-2.0
476+#
477+# Unless required by applicable law or agreed to in writing, software
478+# distributed under the License is distributed on an "AS IS" BASIS,
479+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
480+# implied.
481+# See the License for the specific language governing permissions and
482+# limitations under the License.
483+
484+import os
485+import time
486+
487+from swift.account.server import DATADIR as account_server_data_dir
488+from swift.common.db import AccountBroker
489+from swift.common.internal_proxy import InternalProxy
490+from swift.common.utils import renamer
491+
492+class AccountStat(object):
493+ def __init__(self, filename_format, target_dir, server_conf, logger):
494+ self.filename_format = filename_format
495+ self.target_dir = target_dir
496+ self.devices = server_conf.get('devices', '/srv/node')
497+ self.mount_check = server_conf.get('mount_check', 'true').lower() in \
498+ ('true', 't', '1', 'on', 'yes', 'y')
499+ self.logger = logger
500+
501+ def find_and_process(self):
502+ src_filename = time.strftime(self.filename_format)
503+ tmp_filename = os.path.join('/tmp', src_filename)
504+ with open(tmp_filename, 'wb') as statfile:
505+ #statfile.write('Account Name, Container Count, Object Count, Bytes Used, Created At\n')
506+ for device in os.listdir(self.devices):
507+ if self.mount_check and \
508+ not os.path.ismount(os.path.join(self.devices, device)):
509+ self.logger.error("Device %s is not mounted, skipping." %
510+ device)
511+ continue
512+ accounts = os.path.join(self.devices,
513+ device,
514+ account_server_data_dir)
515+ if not os.path.exists(accounts):
516+ self.logger.debug("Path %s does not exist, skipping." %
517+ accounts)
518+ continue
519+ for root, dirs, files in os.walk(accounts, topdown=False):
520+ for filename in files:
521+ if filename.endswith('.db'):
522+ broker = AccountBroker(os.path.join(root, filename))
523+ if not broker.is_deleted():
524+ (account_name,
525+ created_at,
526+ _, _,
527+ container_count,
528+ object_count,
529+ bytes_used,
530+ _, _) = broker.get_info()
531+ line_data = '"%s",%d,%d,%d,%s\n' % (account_name,
532+ container_count,
533+ object_count,
534+ bytes_used,
535+ created_at)
536+ statfile.write(line_data)
537+ renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
538
539=== added file 'swift/stats/log_uploader.py'
540--- swift/stats/log_uploader.py 1970-01-01 00:00:00 +0000
541+++ swift/stats/log_uploader.py 2010-08-06 04:11:39 +0000
542@@ -0,0 +1,135 @@
543+# Copyright (c) 2010 OpenStack, LLC.
544+#
545+# Licensed under the Apache License, Version 2.0 (the "License");
546+# you may not use this file except in compliance with the License.
547+# You may obtain a copy of the License at
548+#
549+# http://www.apache.org/licenses/LICENSE-2.0
550+#
551+# Unless required by applicable law or agreed to in writing, software
552+# distributed under the License is distributed on an "AS IS" BASIS,
553+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
554+# implied.
555+# See the License for the specific language governing permissions and
556+# limitations under the License.
557+
558+from __future__ import with_statement
559+import os
560+import hashlib
561+import time
562+import gzip
563+import glob
564+
565+from swift.common.internal_proxy import InternalProxy
566+
567+class LogUploader(object):
568+ '''
569+ Given a local directory, a swift account, and a container name, LogUploader
570+ will upload all files in the local directory to the given account/container.
571+ All but the newest files will be uploaded, and each file's md5 sum will be
572+ computed. The hash is used to prevent duplicate data from being uploaded
573+ multiple times in different files (ex: log lines). Since the hash is
574+ computed, it is also used as the uploaded object's etag to ensure data
575+ integrity.
576+
577+ Note that after the file is successfully uploaded, it will be unlinked.
578+
579+ The given proxy server config is used to instantiate a proxy server for
580+ the object uploads.
581+ '''
582+
583+ def __init__(self, log_dir, swift_account, container_name, filename_format,
584+ proxy_server_conf, logger):
585+ if not log_dir.endswith('/'):
586+ log_dir = log_dir + '/'
587+ self.log_dir = log_dir
588+ self.swift_account = swift_account
589+ self.container_name = container_name
590+ self.filename_format = filename_format
591+ self.internal_proxy = InternalProxy(proxy_server_conf, logger)
592+ self.logger = logger
593+
594+ def upload_all_logs(self):
595+ i = [(c,self.filename_format.index(c)) for c in '%Y %m %d %H'.split()]
596+ i.sort()
597+ year_offset = month_offset = day_offset = hour_offset = None
598+ for c, start in i:
599+ if c == '%Y':
600+ year_offset = start, start+4
601+ elif c == '%m':
602+ month_offset = start, start+2
603+ elif c == '%d':
604+ day_offset = start, start+2
605+ elif c == '%H':
606+ hour_offset = start, start+2
607+ if not (year_offset and month_offset and day_offset and hour_offset):
608+ # don't have all the parts, can't upload anything
609+ return
610+ glob_pattern = self.filename_format
611+ glob_pattern = glob_pattern.replace('%Y', '????')
612+ glob_pattern = glob_pattern.replace('%m', '??')
613+ glob_pattern = glob_pattern.replace('%d', '??')
614+ glob_pattern = glob_pattern.replace('%H', '??')
615+ filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
616+ current_hour = int(time.strftime('%H'))
617+ today = int(time.strftime('%Y%m%d'))
618+ self.internal_proxy.create_container(self.swift_account,
619+ self.container_name)
620+ for filename in filelist:
621+ try:
622+ # From the filename, we need to derive the year, month, day,
623+ # and hour for the file. These values are used in the uploaded
624+ # object's name, so they should be a reasonably accurate
625+ # representation of the time for which the data in the file was
626+ # collected. The file's last modified time is not a reliable
627+ # representation of the data in the file. For example, an old
628+ # log file (from hour A) may be uploaded or moved into the
629+ # log_dir in hour Z. The file's modified time will be for hour
630+ # Z, and therefore the object's name in the system will not
631+ # represent the data in it.
632+ # If the filename doesn't match the format, it shouldn't be
633+ # uploaded.
634+ year = filename[slice(*year_offset)]
635+ month = filename[slice(*month_offset)]
636+ day = filename[slice(*day_offset)]
637+ hour = filename[slice(*hour_offset)]
638+ except IndexError:
639+ # unexpected filename format, move on
640+ self.logger.error("Unexpected log: %s" % filename)
641+ continue
642+ if (time.time() - os.stat(filename).st_mtime) < 7200:
643+ # don't process very new logs
644+ self.logger.debug("Skipping log: %s (< 2 hours old)" % filename)
645+ continue
646+ self.upload_one_log(filename, year, month, day, hour)
647+
648+ def upload_one_log(self, filename, year, month, day, hour):
649+ if os.path.getsize(filename) == 0:
650+ self.logger.debug("Log %s is 0 length, skipping" % filename)
651+ return
652+ self.logger.debug("Processing log: %s" % filename)
653+ filehash = hashlib.md5()
654+ already_compressed = filename.endswith('.gz')
655+ opener = gzip.open if already_compressed else open
656+ f = opener(filename, 'rb')
657+ try:
658+ for line in f:
659+ # filter out bad lines here?
660+ filehash.update(line)
661+ finally:
662+ f.close()
663+ filehash = filehash.hexdigest()
664+ # By adding a hash to the filename, we ensure that uploaded files
665+ # have unique filenames and protect against uploading one file
666+ # more than one time. By using md5, we get an etag for free.
667+ target_filename = '/'.join([year, month, day, hour, filehash+'.gz'])
668+ if self.internal_proxy.upload_file(filename,
669+ self.swift_account,
670+ self.container_name,
671+ target_filename,
672+ compress=(not already_compressed)):
673+ self.logger.debug("Uploaded log %s to %s" %
674+ (filename, target_filename))
675+ os.unlink(filename)
676+ else:
677+ self.logger.error("ERROR: Upload of log %s failed!" % filename)
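`upload_all_logs` derives character offsets for `%Y`/`%m`/`%d`/`%H` from the configured filename format, then slices those positions out of each matching filename to name the uploaded object. A standalone sketch of that derivation (the helper name is illustrative); unlike the diff above, it also shifts later directives by the width earlier ones gain when expanded, which appears to be what the later "fixed bug in calculating offsets for filename patterns" revision addresses:

```python
def date_offsets(filename_format):
    """Map strftime directives to (start, end) slices in a rendered
    filename, assuming %Y expands to 4 chars and %m/%d/%H to 2.

    Directives later in the format shift as earlier ones expand
    (each two-char directive may render wider), so walk the format
    left to right and track the accumulated widening.
    """
    widths = {'%Y': 4, '%m': 2, '%d': 2, '%H': 2}
    # Sort directives by their position in the format string.
    found = sorted((filename_format.index(d), d) for d in widths
                   if d in filename_format)
    offsets = {}
    shift = 0  # how much earlier expansions have widened the name
    for pos, directive in found:
        start = pos + shift
        offsets[directive] = (start, start + widths[directive])
        shift += widths[directive] - len(directive)
    return offsets


# Using the sample config's source_filename_format with a prefix.
offs = date_offsets('access-%Y%m%d%H')
name = 'access-2010080604'
assert name[slice(*offs['%Y'])] == '2010'
assert name[slice(*offs['%m'])] == '08'
assert name[slice(*offs['%d'])] == '06'
assert name[slice(*offs['%H'])] == '04'
```

As the in-code comment explains, slicing the filename is preferred over the file's mtime because a log moved into `log_dir` late would otherwise be filed under the wrong hour.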