Merge lp:~eday/burrow/backend-api-cleanup into lp:burrow

Proposed by Eric Day
Status: Merged
Approved by: Eric Day
Approved revision: 11
Merged at revision: 4
Proposed branch: lp:~eday/burrow/backend-api-cleanup
Merge into: lp:burrow
Diff against target: 1944 lines (+980/-436)
17 files modified
ChangeLog (+40/-0)
bin/burrow (+154/-0)
bin/burrowd (+4/-4)
burrow/__init__.py (+6/-77)
burrow/backend/__init__.py (+52/-50)
burrow/backend/http.py (+103/-0)
burrow/backend/memory.py (+217/-99)
burrow/backend/sqlite.py (+160/-123)
burrow/client.py (+63/-0)
burrow/config.py (+1/-1)
burrow/frontend/__init__.py (+6/-6)
burrow/frontend/wsgi.py (+52/-52)
burrow/server.py (+95/-0)
etc/burrowd.conf (+9/-9)
locale/burrow.pot (+8/-8)
setup.py (+4/-1)
test/frontend/test_wsgi.py (+6/-6)
To merge this branch: bzr merge lp:~eday/burrow/backend-api-cleanup
Reviewer Review Type Date Requested Status
Burrow Core Team Pending
Review via email: mp+56510@code.launchpad.net

Description of the change

A number of cleanup tasks and client support.

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'ChangeLog'
2--- ChangeLog 2011-03-18 00:47:26 +0000
3+++ ChangeLog 2011-04-06 05:49:23 +0000
4@@ -1,3 +1,43 @@
5+2011-03-30 Eric Day <eday@oddments.org>
6+
7+ Merged burrow and burrowd packages, the interfaces for backend/client can be shared, and this will also allow modular in-process queues.
8+
9+2011-03-18 Eric Day <eday@oddments.org>
10+
11+ Started SQLite backend cleanup.
12+
13+2011-03-18 Eric Day <eday@oddments.org>
14+
15+ More backend API cleanup.
16+
17+2011-03-18 Eric Day <eday@oddments.org>
18+
19+ Further backend API cleanup, started cleanup in memory backend.
20+
21+2011-03-17 Eric Day <eday@oddments.org>
22+
23+ Removed the need for the 'queue_exists' decorator and backend methods.
24+
25+2011-03-17 Eric Day <eday@oddments.org>
26+
27+ Switched to pass attributes as a dict instead of positional arguments.
28+
29+2011-03-17 Eric Day <eday@oddments.org>
30+
31+ Switched to pass filters as a dict instead of positional arguments.
32+
33+2011-03-18 Eric Day <eday@oddments.org>
34+
35+ Started locale support.
36+
37+2011-03-17 Eric Day <eday@oddments.org>
38+
39+ Started locale support.
40+
41+2011-03-17 Eric Day <eday@oddments.org>
42+
43+ Python prototype conversion to get new trunk started.
44+
45 2011-03-17 Eric Day <eday@oddments.org>
46
47 First chunk of code from prototype. Beyond the prototype, configuration, module loading, and log handling was added.
48
49=== modified file 'bin/burrow'
50--- bin/burrow 2011-03-17 23:42:41 +0000
51+++ bin/burrow 2011-04-06 05:49:23 +0000
52@@ -16,3 +16,157 @@
53 '''
54 Burrow command line client.
55 '''
56+
57+import pwd
58+import optparse
59+import os
60+import sys
61+
62+# If ../burrow/__init__.py exists, add ../ to the Python search path so
63+# that it will override whatever may be installed in the default Python
64+# search path.
65+BASE_DIRECTORY = os.path.join(os.path.abspath(__file__), os.pardir, os.pardir)
66+BASE_DIRECTORY = os.path.normpath(BASE_DIRECTORY)
67+if os.path.exists(os.path.join(BASE_DIRECTORY, 'burrow', '__init__.py')):
68+ sys.path.insert(0, BASE_DIRECTORY)
69+
70+import burrow
71+
72+
73+class Burrow(object):
74+
75+ sections = [
76+ dict(name='Global',
77+ filters=True,
78+ args=[],
79+ commands=['delete_accounts', 'get_accounts']),
80+ dict(name='Account',
81+ account=True,
82+ filters=True,
83+ args=[],
84+ commands=['delete_queues', 'get_queues']),
85+ dict(name='Queue',
86+ account=True,
87+ filters=True,
88+ args=['queue'],
89+ commands=['delete_messages', 'get_messages', 'update_messages']),
90+ dict(name='Message',
91+ account=True,
92+ args=['queue', 'message'],
93+ commands=[
94+ 'create_message',
95+ 'delete_message',
96+ 'get_message',
97+ 'update_message'])]
98+
99+ attribute_commands = [
100+ 'update_messages',
101+ 'create_message',
102+ 'update_message']
103+
104+ stdin_commands = ['create_message']
105+
106+ def __init__(self):
107+ self.parser = optparse.OptionParser(version=burrow.__version__)
108+ self.parser.add_option('-c', '--commands', action='store_true',
109+ help=_('Print help for the available commands'))
110+ self.parser.add_option('-u', '--url', default='http://localhost:8080',
111+ help=_('Backend URL to use'))
112+ rcfile = os.path.expanduser('~')
113+ rcfile = os.path.join(rcfile, '.burrowrc')
114+ if not os.path.exists(rcfile):
115+ rcfile = None
116+ self.parser.add_option('-f', '--files', default=rcfile,
117+ help=_('Configuration file(s) to use (comma separated)'))
118+ user = pwd.getpwuid(os.getuid())[0]
119+ self.parser.add_option('-a', '--account', default=user,
120+ help=_('Account to use for queue and message commands'))
121+ self.parser.add_option('-w', '--wait',
122+ help=_('Number of seconds to wait if no messages match'))
123+
124+ attributes = optparse.OptionGroup(self.parser,
125+ _('Messages attribute options'))
126+ attributes.add_option('-t', '--ttl',
127+ help=_('TTL attribute in seconds to set for message(s)'))
128+ attributes.add_option('-H', '--hide',
129+ help=_('Hidden time attribute in seconds to set for message(s)'))
130+ self.parser.add_option_group(attributes)
131+
132+ filters = optparse.OptionGroup(self.parser, _('Filtering options'))
133+ filters.add_option('-l', '--limit',
134+ help=_('Limit the number of messages to match'))
135+ filters.add_option('-m', '--marker',
136+ help=_('Only match messages that were inserted after this id'))
137+ filters.add_option('-A', '--all', action='store_true',
138+ help=_('Match all messages, including those that are hidden'))
139+ choices = ['none', 'id', 'attributes', 'all']
140+ filters.add_option('-d', '--detail', type='choice', choices=choices,
141+ help=_('What message information to return. Options are: %s') %
142+ ', '.join(choices))
143+ self.parser.add_option_group(filters)
144+
145+ def run(self):
146+ (self.options, args) = self.parser.parse_args()
147+ if self.options.commands:
148+ self.print_help()
149+ if len(args) == 0:
150+ self.print_help(_('No command given'))
151+ if self.options.files is None:
152+ files = []
153+ else:
154+ files = self.options.files.split(', ')
155+ self.client = burrow.Client(url=self.options.url, config_files=files)
156+ for section in self.sections:
157+ if args[0] in section['commands']:
158+ self.run_command(section, args[0], args[1:])
159+ self.print_help(_('Command not found'))
160+
161+ def run_command(self, section, command, args):
162+ if len(args) != len(section['args']):
163+ self.print_help(_('Wrong number of arguments'))
164+ if section.get('account', None):
165+ args.insert(0, self.options.account)
166+ if command in self.stdin_commands:
167+ args.append(sys.stdin.read())
168+ if command in self.attribute_commands:
169+ attributes = {}
170+ if self.options.ttl is not None:
171+ attributes['ttl'] = self.options.ttl
172+ if self.options.hide is not None:
173+ attributes['hide'] = self.options.hide
174+ args.append(attributes)
175+ if section.get('filters', None):
176+ filters = {}
177+ if self.options.limit is not None:
178+ filters['limit'] = self.options.limit
179+ if self.options.marker is not None:
180+ filters['marker'] = self.options.marker
181+ if self.options.all is not None:
182+ filters['match_hidden'] = self.options.all
183+ args.append(filters)
184+ getattr(self.client.backend, command)(*args)
185+ sys.exit(0)
186+
187+ def print_help(self, message=None):
188+ if message:
189+ print message
190+ print
191+ self.parser.print_help()
192+ print
193+ for section in self.sections:
194+ print '%s commands:' % section['name']
195+ for command in section['commands']:
196+ help = ''
197+ if section.get('filters', None):
198+ help += ' [filters]'
199+ if command in self.attribute_commands:
200+ help += ' [attributes]'
201+ for arg in section['args']:
202+ help += ' <%s>' % arg
203+ print ' %s%s' % (command, help)
204+ print
205+ sys.exit(1)
206+
207+
208+if __name__ == '__main__':
209+ Burrow().run()
210
211=== modified file 'bin/burrowd'
212--- bin/burrowd 2011-03-17 23:42:41 +0000
213+++ bin/burrowd 2011-04-06 05:49:23 +0000
214@@ -20,15 +20,15 @@
215 import os
216 import sys
217
218-# If ../burrowd/__init__.py exists, add ../ to the Python search path so
219+# If ../burrow/__init__.py exists, add ../ to the Python search path so
220 # that it will override whatever may be installed in the default Python
221 # search path.
222 BASE_DIRECTORY = os.path.join(os.path.abspath(__file__), os.pardir, os.pardir)
223 BASE_DIRECTORY = os.path.normpath(BASE_DIRECTORY)
224-if os.path.exists(os.path.join(BASE_DIRECTORY, 'burrowd', '__init__.py')):
225+if os.path.exists(os.path.join(BASE_DIRECTORY, 'burrow', '__init__.py')):
226 sys.path.insert(0, BASE_DIRECTORY)
227
228-import burrowd
229+import burrow
230
231 if __name__ == '__main__':
232- burrowd.Burrowd(sys.argv[1:]).run()
233+ burrow.Server(sys.argv[1:]).run()
234
235=== removed directory 'burrow'
236=== renamed directory 'burrowd' => 'burrow'
237=== removed file 'burrow/__init__.py'
238--- burrow/__init__.py 2011-03-17 23:42:41 +0000
239+++ burrow/__init__.py 1970-01-01 00:00:00 +0000
240@@ -1,19 +0,0 @@
241-# Copyright (C) 2011 OpenStack LLC.
242-#
243-# Licensed under the Apache License, Version 2.0 (the "License");
244-# you may not use this file except in compliance with the License.
245-# You may obtain a copy of the License at
246-#
247-# http://www.apache.org/licenses/LICENSE-2.0
248-#
249-# Unless required by applicable law or agreed to in writing, software
250-# distributed under the License is distributed on an "AS IS" BASIS,
251-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
252-# See the License for the specific language governing permissions and
253-# limitations under the License.
254-
255-'''Main client module for burrow.'''
256-
257-
258-class Burrow(object):
259- pass
260
261=== modified file 'burrow/__init__.py'
262--- burrowd/__init__.py 2011-03-18 00:47:26 +0000
263+++ burrow/__init__.py 2011-04-06 05:49:23 +0000
264@@ -12,99 +12,28 @@
265 # See the License for the specific language governing permissions and
266 # limitations under the License.
267
268-'''Main server module for burrow.'''
269+'''Main module for burrow.'''
270
271-import ConfigParser
272 import gettext
273 import logging
274-import logging.config
275 import sys
276
277-import eventlet
278+from burrow.client import Client
279+from burrow.server import Server
280+import burrow.config
281
282-import burrowd.config
283+__version__ = '0.1'
284
285 # This installs the _(...) function as a built-in so all other modules
286 # don't need to.
287 gettext.install('burrow')
288
289-# Default configuration values for this module.
290-DEFAULT_BACKEND = 'burrowd.backend.sqlite'
291-DEFAULT_FRONTENDS = 'burrowd.frontend.wsgi'
292-DEFAULT_THREAD_POOL_SIZE = 1000
293-
294-
295-class Burrowd(object):
296- '''Server class for burrow.'''
297-
298- def __init__(self, config_files=[], add_default_log_handler=True):
299- '''Initialize a server using the config files from the given
300- list. This is passed directly to ConfigParser.read(), so
301- files should be in ConfigParser format. This will load all
302- frontend and backend classes from the configuration.'''
303- if len(config_files) > 0:
304- logging.config.fileConfig(config_files)
305- self._config = ConfigParser.ConfigParser()
306- self._config.read(config_files)
307- self.config = burrowd.config.Config(self._config, 'burrowd')
308- self.log = get_logger(self.config)
309- if add_default_log_handler:
310- self._add_default_log_handler()
311- self._import_backend()
312- self._import_frontends()
313-
314- def _add_default_log_handler(self):
315- '''Add a default log handler it one has not been set.'''
316- root_log = logging.getLogger()
317- if len(root_log.handlers) > 0 or len(self.log.handlers) > 0:
318- return
319- handler = logging.StreamHandler()
320- handler.setLevel(logging.DEBUG)
321- log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
322- handler.setFormatter(logging.Formatter(log_format))
323- root_log.addHandler(handler)
324-
325- def _import_backend(self):
326- '''Load backend given in the 'backend' option.'''
327- backend = self.config.get('backend', DEFAULT_BACKEND)
328- config = (self._config, backend)
329- self.backend = import_class(backend, 'Backend')(config)
330-
331- def _import_frontends(self):
332- '''Load frontends given in the 'frontends' option.'''
333- self.frontends = []
334- frontends = self.config.get('frontends', DEFAULT_FRONTENDS)
335- for frontend in frontends.split(','):
336- frontend = frontend.split(':')
337- if len(frontend) == 1:
338- frontend.append(None)
339- config = (self._config, frontend[0], frontend[1])
340- frontend = import_class(frontend[0], 'Frontend')
341- frontend = frontend(config, self.backend)
342- self.frontends.append(frontend)
343-
344- def run(self):
345- '''Create the thread pool and start the main server loop. Wait
346- for the pool to complete, but possibly run forever if the
347- frontends and backend never remove threads.'''
348- thread_pool_size = self.config.getint('thread_pool_size',
349- DEFAULT_THREAD_POOL_SIZE)
350- thread_pool = eventlet.GreenPool(size=int(thread_pool_size))
351- self.backend.run(thread_pool)
352- for frontend in self.frontends:
353- frontend.run(thread_pool)
354- self.log.info(_('Waiting for all threads to exit'))
355- try:
356- thread_pool.waitall()
357- except KeyboardInterrupt:
358- pass
359-
360
361 class Module(object):
362 '''Common module class for burrow.'''
363
364 def __init__(self, config):
365- self.config = burrowd.config.Config(*config)
366+ self.config = burrow.config.Config(*config)
367 self.log = get_logger(self.config)
368 self.log.debug(_('Module created'))
369
370
371=== modified file 'burrow/backend/__init__.py'
372--- burrowd/backend/__init__.py 2011-03-17 23:42:41 +0000
373+++ burrow/backend/__init__.py 2011-04-06 05:49:23 +0000
374@@ -12,14 +12,14 @@
375 # See the License for the specific language governing permissions and
376 # limitations under the License.
377
378-'''Backends for the burrow server.'''
379+'''Backends for burrow.'''
380
381 import eventlet
382
383-import burrowd
384-
385-
386-class Backend(burrowd.Module):
387+import burrow
388+
389+
390+class Backend(burrow.Module):
391 '''Interface that backend implementations must provide.'''
392
393 def __init__(self, config):
394@@ -27,62 +27,52 @@
395 self.queues = {}
396
397 def run(self, thread_pool):
398+ '''Run the backend. This should start any periodic tasks in
399+ separate threads and should never block.'''
400 thread_pool.spawn_n(self._clean)
401
402- def _clean(self):
403- while True:
404- self.clean()
405- eventlet.sleep(1)
406-
407- def delete_accounts(self):
408- pass
409-
410- def get_accounts(self):
411- return []
412-
413- def delete_account(self, account):
414- pass
415-
416- def get_queues(self, account):
417- return []
418-
419- def queue_exists(self, account, queue):
420- return False
421-
422- def delete_messages(self, account, queue, limit, marker, match_hidden):
423- return []
424-
425- def get_messages(self, account, queue, limit, marker, match_hidden):
426- return []
427-
428- def update_messages(self, account, queue, limit, marker, match_hidden, ttl,
429- hide):
430- return []
431-
432- def delete_message(self, account, queue, message_id):
433- return None
434-
435- def get_message(self, account, queue, message_id):
436- return None
437-
438- def put_message(self, account, queue, message_id, ttl, hide, body):
439+ def delete_accounts(self, filters={}):
440+ pass
441+
442+ def get_accounts(self, filters={}):
443+ return []
444+
445+ def delete_queues(self, account, filters={}):
446+ pass
447+
448+ def get_queues(self, account, filters={}):
449+ return []
450+
451+ def delete_messages(self, account, queue, filters={}):
452+ return []
453+
454+ def get_messages(self, account, queue, filters={}):
455+ return []
456+
457+ def update_messages(self, account, queue, attributes={}, filters={}):
458+ return []
459+
460+ def create_message(self, account, queue, message, body, attributes={}):
461 return True
462
463- def update_message(self, account, queue, message_id, ttl, hide):
464- return None
465-
466- def clean(self):
467- '''This method should remove all messages with an expired
468- TTL and make hidden messages that have an expired hide time
469- visible again.'''
470- pass
471+ def delete_message(self, account, queue, message):
472+ return None
473+
474+ def get_message(self, account, queue, message):
475+ return None
476+
477+ def update_message(self, account, queue, message, attributes={}):
478+ return None
479
480 def notify(self, account, queue):
481+ '''Notify any waiting callers that the account/queue has
482+ a visible message.'''
483 queue = '%s/%s' % (account, queue)
484 if queue in self.queues:
485 self.queues[queue].put(0)
486
487 def wait(self, account, queue, seconds):
488+ '''Wait for a message to appear in the account/queue.'''
489 queue = '%s/%s' % (account, queue)
490 if queue not in self.queues:
491 self.queues[queue] = eventlet.Queue()
492@@ -92,3 +82,15 @@
493 pass
494 if self.queues[queue].getting() == 0:
495 del self.queues[queue]
496+
497+ def clean(self):
498+ '''This method should remove all messages with an expired
499+ TTL and make hidden messages that have an expired hide time
500+ visible again.'''
501+ pass
502+
503+ def _clean(self):
504+ '''Thread to run the clean method periodically.'''
505+ while True:
506+ self.clean()
507+ eventlet.sleep(1)
508
509=== added file 'burrow/backend/http.py'
510--- burrow/backend/http.py 1970-01-01 00:00:00 +0000
511+++ burrow/backend/http.py 2011-04-06 05:49:23 +0000
512@@ -0,0 +1,103 @@
513+# Copyright (C) 2011 OpenStack LLC.
514+#
515+# Licensed under the Apache License, Version 2.0 (the "License");
516+# you may not use this file except in compliance with the License.
517+# You may obtain a copy of the License at
518+#
519+# http://www.apache.org/licenses/LICENSE-2.0
520+#
521+# Unless required by applicable law or agreed to in writing, software
522+# distributed under the License is distributed on an "AS IS" BASIS,
523+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
524+# See the License for the specific language governing permissions and
525+# limitations under the License.
526+
527+'''HTTP backend for burrow using httplib.'''
528+
529+import httplib
530+
531+import burrow.backend
532+
533+
534+class Backend(burrow.backend.Backend):
535+ '''This backend forwards all requests via HTTP using the httplib
536+ module. It is used for clients and proxies.'''
537+
538+ def __init__(self, config):
539+ super(Backend, self).__init__(config)
540+ self.server = ('localhost', 8080)
541+
542+ def delete_accounts(self, filters={}):
543+ url = self._add_parameters('/', filters=filters)
544+ self._request('DELETE', url)
545+
546+ def get_accounts(self, filters={}):
547+ url = self._add_parameters('/', filters=filters)
548+ self._request('GET', url)
549+
550+ def delete_queues(self, account, filters={}):
551+ url = self._add_parameters('/%s' % account, filters=filters)
552+ self._request('DELETE', url)
553+
554+ def get_queues(self, account, filters={}):
555+ url = self._add_parameters('/%s' % account, filters=filters)
556+ self._request('GET', url)
557+
558+ def delete_messages(self, account, queue, filters={}):
559+ url = '/%s/%s' % (account, queue)
560+ url = self._add_parameters(url, filters=filters)
561+ self._request('DELETE', url)
562+
563+ def get_messages(self, account, queue, filters={}):
564+ url = '/%s/%s' % (account, queue)
565+ url = self._add_parameters(url, filters=filters)
566+ self._request('GET', url)
567+
568+ def update_messages(self, account, queue, attributes={}, filters={}):
569+ url = '/%s/%s' % (account, queue)
570+ url = self._add_parameters(url, attributes, filters)
571+ self._request('POST', url)
572+
573+ def create_message(self, account, queue, message, body, attributes={}):
574+ url = '/%s/%s/%s' % (account, queue, message)
575+ url = self._add_parameters(url, attributes)
576+ self._request('PUT', url, body=body)
577+
578+ def delete_message(self, account, queue, message):
579+ url = '/%s/%s/%s' % (account, queue, message)
580+ self._request('DELETE', url)
581+
582+ def get_message(self, account, queue, message):
583+ url = '/%s/%s/%s' % (account, queue, message)
584+ self._request('GET', url)
585+
586+ def update_message(self, account, queue, message, attributes={}):
587+ url = '/%s/%s/%s' % (account, queue, message)
588+ url = self._add_parameters(url, attributes)
589+ self._request('POST', url)
590+
591+ def clean(self):
592+ pass
593+
594+ def _add_parameters(self, url, attributes={}, filters={}):
595+ separator = '?'
596+ for attribute in ['ttl', 'hide']:
597+ value = attributes.get(attribute, None)
598+ if value is not None:
599+ url += '%s%s=%s' % (separator, attribute, value)
600+ separator = '&'
601+ for filter in ['marker', 'limit', 'match_hidden', 'detail']:
602+ value = filters.get(filter, None)
603+ if value is not None:
604+ url += '%s%s=%s' % (separator, filter, value)
605+ separator = '&'
606+ return url
607+
608+ def _request(self, *args, **kwargs):
609+ connection = httplib.HTTPConnection(*self.server)
610+ connection.request(*args, **kwargs)
611+ response = connection.getresponse()
612+ if response.status == 200:
613+ print response.read()
614+ if response.status >= 400:
615+ print response.reason
616
617=== modified file 'burrow/backend/memory.py'
618--- burrowd/backend/memory.py 2011-03-17 23:42:41 +0000
619+++ burrow/backend/memory.py 2011-04-06 05:49:23 +0000
620@@ -12,115 +12,117 @@
621 # See the License for the specific language governing permissions and
622 # limitations under the License.
623
624-'''Memory backend for the burrow server.'''
625+'''Memory backend for burrow.'''
626
627 import time
628
629-import burrowd.backend
630-
631-
632-class Backend(burrowd.backend.Backend):
633+import burrow.backend
634+
635+
636+class Backend(burrow.backend.Backend):
637+ '''This backend stores all data using native Python data
638+ structures. It uses a linked list of tuples to store data
639+ (accounts, queues, and messages) with a dict as a secondary index
640+ into this list. This is required so we can have O(1) appends,
641+ deletes, and lookups by name, along with easy traversal starting
642+ anywhere in the list.'''
643
644 def __init__(self, config):
645 super(Backend, self).__init__(config)
646- self.accounts = {}
647-
648- def delete_accounts(self):
649- self.accounts.clear()
650-
651- def get_accounts(self):
652- return self.accounts.keys()
653-
654- def delete_account(self, account):
655- del self.accounts[account]
656-
657- def get_queues(self, account):
658- if account not in self.accounts:
659- return []
660- return self.accounts[account].keys()
661-
662- def queue_exists(self, account, queue):
663- return account in self.accounts and queue in self.accounts[account]
664-
665- def delete_messages(self, account, queue, limit, marker, match_hidden):
666- messages = self._scan_queue(account, queue, limit, marker,
667- match_hidden, delete=True)
668- if len(self.accounts[account][queue]) == 0:
669- del self.accounts[account][queue]
670- if len(self.accounts[account]) == 0:
671- del self.accounts[account]
672- return messages
673-
674- def get_messages(self, account, queue, limit, marker, match_hidden):
675- return self._scan_queue(account, queue, limit, marker, match_hidden)
676-
677- def update_messages(self, account, queue, limit, marker, match_hidden, ttl,
678- hide):
679- return self._scan_queue(account, queue, limit, marker, match_hidden,
680- ttl=ttl, hide=hide)
681-
682- def delete_message(self, account, queue, message_id):
683- for index in range(0, len(self.accounts[account][queue])):
684- message = self.accounts[account][queue][index]
685- if message['id'] == message_id:
686- del self.accounts[account][queue][index]
687- if len(self.accounts[account][queue]) == 0:
688- del self.accounts[account][queue]
689- if len(self.accounts[account]) == 0:
690- del self.accounts[account]
691- return message
692- return None
693-
694- def get_message(self, account, queue, message_id):
695- for index in range(0, len(self.accounts[account][queue])):
696- message = self.accounts[account][queue][index]
697- if message['id'] == message_id:
698- return message
699- return None
700-
701- def put_message(self, account, queue, message_id, ttl, hide, body):
702- if account not in self.accounts:
703- self.accounts[account] = {}
704- if queue not in self.accounts[account]:
705- self.accounts[account][queue] = []
706- for index in range(0, len(self.accounts[account][queue])):
707- message = self.accounts[account][queue][index]
708- if message['id'] == message_id:
709+ self.accounts = Accounts()
710+
711+ def delete_accounts(self, filters={}):
712+ self.accounts.delete(filters)
713+
714+ def get_accounts(self, filters={}):
715+ for account in self.accounts.iter(filters):
716+ yield account[0]
717+
718+ def delete_queues(self, account, filters={}):
719+ self.accounts.delete_queues(account, filters)
720+
721+ def get_queues(self, account, filters={}):
722+ for queue in self.accounts.iter_queues(account, filters):
723+ yield queue[0]
724+
725+ def delete_messages(self, account, queue, filters={}):
726+ return self._scan_queue(account, queue, filters, delete=True)
727+
728+ def get_messages(self, account, queue, filters={}):
729+ return self._scan_queue(account, queue, filters)
730+
731+ def update_messages(self, account, queue, attributes={}, filters={}):
732+ return self._scan_queue(account, queue, filters, attributes)
733+
734+ def create_message(self, account, queue, message, body, attributes={}):
735+ account, queue = self.accounts.get_queue(account, queue, True)
736+ ttl = attributes.get('ttl', 0)
737+ hide = attributes.get('hide', 0)
738+ for index in xrange(0, len(queue[3])):
739+ if queue[3][index]['id'] == message:
740+ message = queue[3][index]
741 message['ttl'] = ttl
742 message['hide'] = hide
743 message['body'] = body
744 if hide == 0:
745- self.notify(account, queue)
746+ self.notify(account[0], queue[0])
747 return False
748- message = dict(id=message_id, ttl=ttl, hide=hide, body=body)
749- self.accounts[account][queue].append(message)
750- self.notify(account, queue)
751+ message = dict(id=message, ttl=ttl, hide=hide, body=body)
752+ queue[3].append(message)
753+ self.notify(account[0], queue[0])
754 return True
755
756- def update_message(self, account, queue, message_id, ttl, hide):
757- for index in range(0, len(self.accounts[account][queue])):
758- message = self.accounts[account][queue][index]
759- if message['id'] == message_id:
760+ def delete_message(self, account, queue, message):
761+ account, queue = self.accounts.get_queue(account, queue)
762+ if queue is None:
763+ return None
764+ for index in xrange(0, len(queue[3])):
765+ if queue[3][index]['id'] == message:
766+ message = queue[3][index]
767+ del queue[3][index]
768+ if len(queue[3]) == 0:
769+ self.accounts.delete_queue(account[0], queue[0])
770+ return message
771+ return None
772+
773+ def get_message(self, account, queue, message):
774+ account, queue = self.accounts.get_queue(account, queue)
775+ if queue is None:
776+ return None
777+ for index in xrange(0, len(queue[3])):
778+ if queue[3][index]['id'] == message:
779+ return queue[3][index]
780+ return None
781+
782+ def update_message(self, account, queue, message, attributes={}):
783+ account, queue = self.accounts.get_queue(account, queue)
784+ if queue is None:
785+ return None
786+ ttl = attributes.get('ttl', None)
787+ hide = attributes.get('hide', None)
788+ for index in xrange(0, len(queue[3])):
789+ if queue[3][index]['id'] == message:
790+ message = queue[3][index]
791 if ttl is not None:
792 message['ttl'] = ttl
793 if hide is not None:
794 message['hide'] = hide
795 if hide == 0:
796- self.notify(account, queue)
797+ self.notify(account[0], queue[0])
798 return message
799 return None
800
801 def clean(self):
802 now = int(time.time())
803- for account in self.accounts.keys():
804- for queue in self.accounts[account].keys():
805+ for account in self.accounts.iter():
806+ for queue in account[3].iter():
807 index = 0
808 notify = False
809- total = len(self.accounts[account][queue])
810+ total = len(queue[3])
811 while index < total:
812- message = self.accounts[account][queue][index]
813+ message = queue[3][index]
814 if 0 < message['ttl'] <= now:
815- del self.accounts[account][queue][index]
816+ del queue[3][index]
817 total -= 1
818 else:
819 if 0 < message['hide'] <= now:
820@@ -128,30 +130,35 @@
821 notify = True
822 index += 1
823 if notify:
824- self.notify(account, queue)
825- if len(self.accounts[account][queue]) == 0:
826- del self.accounts[account][queue]
827- if len(self.accounts[account]) == 0:
828- del self.accounts[account]
829+ self.notify(account[0], queue[0])
830+ if len(queue[3]) == 0:
831+ self.accounts.delete_queue(account[0], queue[0])
832
833- def _scan_queue(self, account, queue, limit, marker, match_hidden,
834- ttl=None, hide=None, delete=False):
835+ def _scan_queue(self, account, queue, filters, attributes={},
836+ delete=False):
837+ account, queue = self.accounts.get_queue(account, queue)
838+ if queue is None:
839+ return
840 index = 0
841 notify = False
842- if marker is not None:
843+ if 'marker' in filters and filters['marker'] is not None:
844 found = False
845- for index in range(0, len(self.accounts[account][queue])):
846- message = self.accounts[account][queue][index]
847- if message['id'] == marker:
848+ for index in xrange(0, len(queue[3])):
849+ message = queue[3][index]
850+ if message['id'] == filters['marker']:
851 index += 1
852 found = True
853 break
854 if not found:
855 index = 0
856 messages = []
857- total = len(self.accounts[account][queue])
858+ total = len(queue[3])
859+ limit = filters.get('limit', None)
860+ match_hidden = filters.get('match_hidden', False)
861+ ttl = attributes.get('ttl', None)
862+ hide = attributes.get('hide', None)
863 while index < total:
864- message = self.accounts[account][queue][index]
865+ message = queue[3][index]
866 if not match_hidden and message['hide'] != 0:
867 index += 1
868 continue
869@@ -162,15 +169,126 @@
870 if hide == 0:
871 notify = True
872 if delete:
873- del self.accounts[account][queue][index]
874+ del queue[3][index]
875 total -= 1
876 else:
877 index += 1
878- messages.append(message)
879+ yield message
880 if limit:
881 limit -= 1
882 if limit == 0:
883 break
884 if notify:
885- self.notify(account, queue)
886- return messages
887+ self.notify(account[0], queue[0])
888+ if len(queue[3]) == 0:
889+ self.accounts.delete_queue(account[0], queue[0])
890+
891+
892+class IndexedTupleList(object):
893+ '''Class for managing an indexed tuple list. The tuple must be at
894+ least three elements and must reserve the first three for (name,
895+ next, previous).'''
896+
897+ def __init__(self):
898+ self.first = None
899+ self.last = None
900+ self.index = {}
901+
902+ def add(self, item):
903+ item = (item[0], None, self.last) + item[3:]
904+ if self.first is None:
905+ self.first = item
906+ self.last = item
907+ self.index[item[0]] = item
908+ return item
909+
910+ def count(self):
911+ return len(self.index)
912+
913+ def delete(self, filters):
914+ if len(filters) == 0:
915+ self.first = None
916+ self.last = None
917+ self.index.clear()
918+ return
919+ for item in self.iter(filters):
920+ self.delete_item(item[0])
921+
922+ def delete_item(self, name):
923+ if name not in self.index:
924+ return
925+ item = self.index[name][1]
926+ if item is not None:
927+ prev_item = self.index[name][2]
928+ self.index[item[0]] = (item[0], item[1], prev_item) + item[3:]
929+ item = self.index[name][2]
930+ if item is not None:
931+ next_item = self.index[name][1]
932+ self.index[item[0]] = (item[0], next_item, item[2]) + item[3:]
933+ if self.first == self.index[name]:
934+ self.first = self.index[name][1]
935+ if self.last == self.index[name]:
936+ self.last = self.index[name][2]
937+ del self.index[name]
938+
939+ def get(self, name):
940+ if name in self.index:
941+ return self.index[name]
942+ return None
943+
944+ def iter(self, filters={}):
945+ marker = filters.get('marker', None)
946+ if marker is not None and marker in self.index:
947+ item = self.index[marker]
948+ else:
949+ item = self.first
950+ limit = filters.get('limit', None)
951+ while item is not None:
952+ yield item
953+ if limit:
954+ limit -= 1
955+ if limit == 0:
956+ break
957+ item = item[1]
958+
959+
960+class Accounts(IndexedTupleList):
961+
962+ def delete_queue(self, account, queue):
963+ account = self.get(account)
964+ if account is not None:
965+ account[3].delete_item(queue)
966+ if account[3].count() == 0:
967+ self.delete_item(account[0])
968+
969+ def delete_queues(self, account, filters):
970+ account = self.get(account)
971+ if account is not None:
972+ account[3].delete(filters)
973+ if account[3].count() == 0:
974+ self.delete_item(account[0])
975+
976+ def get_queue(self, account, queue, create=False):
977+ if account in self.index:
978+ account = self.index[account]
979+ elif create:
980+ account = self.add((account, None, None, Queues()))
981+ else:
982+ return None, None
983+ return account, account[3].get(queue, create)
984+
985+ def iter_queues(self, account, filters={}):
986+ account = self.get(account)
987+ if account is not None:
988+ for queue in account[3].iter(filters):
989+ yield queue
990+
991+
992+class Queues(IndexedTupleList):
993+
994+ def get(self, queue, create=False):
995+ if queue in self.index:
996+ return self.index[queue]
997+ elif create:
998+ return self.add((queue, None, None, []))
999+ return None
1000
1001=== modified file 'burrow/backend/sqlite.py'
1002--- burrowd/backend/sqlite.py 2011-03-17 23:42:41 +0000
1003+++ burrow/backend/sqlite.py 2011-04-06 05:49:23 +0000
1004@@ -12,23 +12,24 @@
1005 # See the License for the specific language governing permissions and
1006 # limitations under the License.
1007
1008-'''Memory backend for the burrow server.'''
1009+'''SQLite backend for burrow.'''
1010
1011 import sqlite3
1012 import time
1013
1014-import burrowd.backend
1015+import burrow.backend
1016
1017 # Default configuration values for this module.
1018 DEFAULT_DATABASE = ':memory:'
1019
1020
1021-class Backend(burrowd.backend.Backend):
1022+class Backend(burrow.backend.Backend):
1023
1024 def __init__(self, config):
1025 super(Backend, self).__init__(config)
1026 database = self.config.get('database', DEFAULT_DATABASE)
1027 self.db = sqlite3.connect(database)
1028+ self.db.isolation_level = None
1029 queries = [
1030 'CREATE TABLE queues ('
1031 'account VARCHAR(255) NOT NULL,'
1032@@ -44,129 +45,97 @@
1033 for query in queries:
1034 self.db.execute(query)
1035
1036- def delete_accounts(self):
1037- self.db.execute("DELETE FROM queues")
1038- self.db.execute("DELETE FROM messages")
1039-
1040- def get_accounts(self):
1041- result = self.db.execute("SELECT account FROM queues").fetchall()
1042- return [row[0] for row in result]
1043-
1044- def delete_account(self, account):
1045- query = "SELECT rowid FROM queues WHERE account='%s'" % account
1046- result = self.db.execute(query).fetchall()
1047- if len(result) == 0:
1048+ def delete_accounts(self, filters={}):
1049+ if len(filters) == 0:
1050+ self.db.execute('DELETE FROM queues')
1051+ self.db.execute('DELETE FROM messages')
1052 return
1053- queues = [str(queue[0]) for queue in result]
1054- query = "DELETE FROM messages WHERE queue IN (%s)" % (','.join(queues))
1055- self.db.execute(query)
1056- self.db.execute("DELETE FROM queues WHERE account='%s'" % account)
1057-
1058- def get_queues(self, account):
1059- query = "SELECT queue FROM queues WHERE account='%s'" % account
1060- result = self.db.execute(query).fetchall()
1061- return [row[0] for row in result]
1062-
1063- def queue_exists(self, account, queue):
1064- query = "SELECT COUNT(*) FROM queues " \
1065- "WHERE account='%s' AND queue='%s'" % \
1066- (account, queue)
1067- result = self.db.execute(query).fetchall()
1068- if len(result) == 0:
1069- return False
1070- self.rowid = result[0][0]
1071- return True
1072-
1073- def delete_messages(self, account, queue, limit, marker, match_hidden):
1074- messages = self.get_messages(account, queue, limit, marker,
1075- match_hidden)
1076- ids = [message['id'] for message in messages]
1077- query = "DELETE FROM messages WHERE queue=%d AND name IN (%s)" % \
1078- (self.rowid, ','.join(ids))
1079- self.db.execute(query)
1080- query = "SELECT rowid FROM messages WHERE queue=%d LIMIT 1" % \
1081- self.rowid
1082- if len(self.db.execute(query).fetchall()) == 0:
1083- query = "DELETE FROM queues WHERE rowid=%d" % self.rowid
1084- self.db.execute(query)
1085- return messages
1086-
1087- def get_messages(self, account, queue, limit, marker, match_hidden):
1088- if marker is not None:
1089- query = "SELECT rowid FROM messages " \
1090- "WHERE queue=%d AND name='%s'" % \
1091- (self.rowid, marker)
1092- result = self.db.execute(query).fetchall()
1093- if len(result) == 0:
1094- marker = None
1095- else:
1096- marker = result[0][0]
1097- query = "SELECT name,ttl,hide,body FROM messages WHERE queue=%d" % \
1098- self.rowid
1099- if match_hidden is False:
1100- query += " AND hide == 0"
1101- if marker is not None:
1102- query += " AND rowid > %d" % marker
1103+
1104+ def get_accounts(self, filters={}):
1105+ query = 'SELECT DISTINCT account FROM queues'
1106+ values = tuple()
1107+ marker = filters.get('marker', None)
1108+ if marker is not None:
1109+ query += ' WHERE account > ?'
1110+ values += (marker,)
1111+ limit = filters.get('limit', None)
1112 if limit is not None:
1113- query += " LIMIT %d" % limit
1114- result = self.db.execute(query).fetchall()
1115- messages = []
1116- for row in result:
1117- messages.append(dict(id=row[0], ttl=row[1], hide=row[2],
1118- body=row[3]))
1119- return messages
1120-
1121- def update_messages(self, account, queue, limit, marker, match_hidden, ttl,
1122- hide):
1123- messages = self.get_messages(account, queue, limit, marker,
1124- match_hidden)
1125- query = "UPDATE messages SET"
1126- comma = ''
1127- if ttl is not None:
1128- query += "%s ttl=%d" % (comma, ttl)
1129- comma = ','
1130- if hide is not None:
1131- query += "%s hide=%d" % (comma, hide)
1132- comma = ','
1133- if comma == '':
1134- return (False, message)
1135- ids = []
1136- for message in messages:
1137+ query += ' LIMIT ?'
1138+ values += (limit,)
1139+ for row in self.db.execute(query, values):
1140+ yield row[0]
1141+
1142+ def delete_queues(self, account, filters={}):
1143+ query = 'SELECT rowid FROM queues WHERE account=?'
1144+ ids = []
1145+ for row in self.db.execute(query, (account,)):
1146+ ids.append(str(row[0]))
1147+ if len(ids) == 0:
1148+ return
1149+ query = 'DELETE FROM messages WHERE queue IN (%s)'
1150+ self.db.execute(query % ','.join(ids))
1151+ self.db.execute('DELETE FROM queues WHERE account=?', (account,))
1152+
1153+ def get_queues(self, account, filters={}):
1154+ query = 'SELECT queue FROM queues WHERE account=?'
1155+ for row in self.db.execute(query, (account,)):
1156+ yield row[0]
1157+
1158+ def delete_messages(self, account, queue, filters={}):
1159+ result = self._get_messages(account, queue, filters)
1160+ rowid = result.next()
1161+ ids = []
1162+ for message in result:
1163+ ids.append(message['id'])
1164+ yield message
1165+ if len(ids) == 0:
1166+ return
1167+ query = 'DELETE FROM messages WHERE queue=%d AND name IN (%s)'
1168+ self.db.execute(query % (rowid, ','.join(ids)))
1169+ query = 'SELECT rowid FROM messages WHERE queue=? LIMIT 1'
1170+ if len(self.db.execute(query, (rowid,)).fetchall()) == 0:
1171+ query = 'DELETE FROM queues WHERE rowid=?'
1172+ self.db.execute(query, (rowid,))
1173+
1174+ def get_messages(self, account, queue, filters={}):
1175+ result = self._get_messages(account, queue, filters)
1176+ rowid = result.next()
1177+ return result
1178+
1179+ def update_messages(self, account, queue, attributes={}, filters={}):
1180+ result = self._get_messages(account, queue, filters)
1181+ rowid = result.next()
1182+ ids = []
1183+ ttl = attributes.get('ttl', None)
1184+ hide = attributes.get('hide', None)
1185+ for message in result:
1186 ids.append(message['id'])
1187 if ttl is not None:
1188 message['ttl'] = ttl
1189 if hide is not None:
1190 message['hide'] = hide
1191- query += " WHERE queue=%d AND name IN (%s)" % \
1192- (self.rowid, ','.join(ids))
1193- self.db.execute(query)
1194+ yield message
1195+ if len(ids) == 0:
1196+ return
1197+ query = 'UPDATE messages SET'
1198+ comma = ''
1199+ values = tuple()
1200+ if ttl is not None:
1201+ query += '%s ttl=?' % comma
1202+ values += (ttl,)
1203+ comma = ','
1204+ if hide is not None:
1205+ query += '%s hide=?' % comma
1206+ values += (hide,)
1207+ comma = ','
1208+ if comma == '':
1209+ return
1210+ values += (rowid,)
1211+ query += ' WHERE queue=? AND name IN (%s)'
1212+ self.db.execute(query % ','.join(ids), values)
1213 self.notify(account, queue)
1214- return messages
1215-
1216- def delete_message(self, account, queue, message_id):
1217- message = self.get_message(account, queue, message_id)
1218- if message is None:
1219- return None
1220- query = "DELETE FROM messages WHERE queue=%d AND name='%s'" % \
1221- (self.rowid, message_id)
1222- self.db.execute(query)
1223- query = "SELECT rowid FROM messages WHERE queue=%d LIMIT 1" % \
1224- self.rowid
1225- if len(self.db.execute(query).fetchall()) == 0:
1226- query = "DELETE FROM queues WHERE rowid=%d" % self.rowid
1227- self.db.execute(query)
1228- return message
1229-
1230- def get_message(self, account, queue, message_id):
1231- query = "SELECT name,ttl,hide,body FROM messages " \
1232- "WHERE queue=%d AND name='%s'" % (self.rowid, message_id)
1233- result = self.db.execute(query).fetchall()
1234- if len(result) == 0:
1235- return None
1236- row = result[0]
1237- return dict(id=row[0], ttl=row[1], hide=row[2], body=row[3])
1238-
1239- def put_message(self, account, queue, message_id, ttl, hide, body):
1240+
1241+ def create_message(self, account, queue, message, body, attributes):
1242 query = "SELECT rowid FROM queues " \
1243 "WHERE account='%s' AND queue='%s'" % (account, queue)
1244 result = self.db.execute(query).fetchall()
1245@@ -176,11 +145,13 @@
1246 else:
1247 rowid = result[0][0]
1248 query = "SELECT rowid FROM messages WHERE queue=%d AND name='%s'" % \
1249- (rowid, message_id)
1250+ (rowid, message)
1251 result = self.db.execute(query).fetchall()
1252+ ttl = attributes.get('ttl', 0)
1253+ hide = attributes.get('hide', 0)
1254 if len(result) == 0:
1255 query = "INSERT INTO messages VALUES (%d, '%s', %d, %d, '%s')" % \
1256- (rowid, message_id, ttl, hide, body)
1257+ (rowid, message, ttl, hide, body)
1258 self.db.execute(query)
1259 self.notify(account, queue)
1260 return True
1261@@ -191,12 +162,45 @@
1262 self.notify(account, queue)
1263 return False
1264
1265- def update_message(self, account, queue, message_id, ttl, hide):
1266- message = self.get_message(account, queue, message_id)
1267+ def delete_message(self, account, queue, message):
1268+ rowid = self._get_queue(account, queue)
1269+ if rowid is None:
1270+ return None
1271+ message = self.get_message(account, queue, message)
1272+ if message is None:
1273+ return None
1274+ query = "DELETE FROM messages WHERE queue=%d AND name='%s'" % \
1275+ (rowid, message['id'])
1276+ self.db.execute(query)
1277+ query = "SELECT rowid FROM messages WHERE queue=%d LIMIT 1" % rowid
1278+ if len(self.db.execute(query).fetchall()) == 0:
1279+ query = "DELETE FROM queues WHERE rowid=%d" % rowid
1280+ self.db.execute(query)
1281+ return message
1282+
1283+ def get_message(self, account, queue, message):
1284+ rowid = self._get_queue(account, queue)
1285+ if rowid is None:
1286+ return None
1287+ query = "SELECT name,ttl,hide,body FROM messages " \
1288+ "WHERE queue=%d AND name='%s'" % (rowid, message)
1289+ result = self.db.execute(query).fetchall()
1290+ if len(result) == 0:
1291+ return None
1292+ row = result[0]
1293+ return dict(id=row[0], ttl=row[1], hide=row[2], body=row[3])
1294+
1295+ def update_message(self, account, queue, message, attributes):
1296+ rowid = self._get_queue(account, queue)
1297+ if rowid is None:
1298+ return None
1299+ message = self.get_message(account, queue, message)
1300 if message is None:
1301 return None
1302 query = "UPDATE messages SET"
1303 comma = ''
1304+ ttl = attributes.get('ttl', None)
1305+ hide = attributes.get('hide', None)
1306 if ttl is not None:
1307 query += "%s ttl=%d" % (comma, ttl)
1308 comma = ','
1309@@ -205,7 +209,7 @@
1310 comma = ','
1311 if comma == '':
1312 return message
1313- query += " WHERE queue=%d AND name='%s'" % (self.rowid, message_id)
1314+ query += " WHERE queue=%d AND name='%s'" % (rowid, message['id'])
1315 self.db.execute(query)
1316 if hide == 0:
1317 self.notify(account, queue)
1318@@ -248,3 +252,36 @@
1319 queue
1320 result = self.db.execute(query).fetchall()[0]
1321 self.notify(result[0], result[1])
1322+
1323+ def _get_queue(self, account, queue):
1324+ query = "SELECT COUNT(*) FROM queues " \
1325+ "WHERE account='%s' AND queue='%s'" % \
1326+ (account, queue)
1327+ result = self.db.execute(query).fetchall()
1328+ if len(result) == 0:
1329+ return None
1330+ return result[0][0]
1331+
1332+ def _get_messages(self, account, queue, filters):
1333+ rowid = self._get_queue(account, queue)
1334+ yield rowid
1335+ if rowid is None:
1336+ return
1337+ marker = None
1338+ if 'marker' in filters and filters['marker'] is not None:
1339+ query = "SELECT rowid FROM messages " \
1340+ "WHERE queue=%d AND name='%s'" % (rowid, filters['marker'])
1341+ result = self.db.execute(query).fetchall()
1342+ if len(result) > 0:
1343+ marker = result[0][0]
1344+ query = "SELECT name,ttl,hide,body FROM messages WHERE queue=%d" % \
1345+ rowid
1346+ if marker is not None:
1347+ query += " AND rowid > %d" % marker
1348+ if 'match_hidden' not in filters or filters['match_hidden'] is False:
1349+ query += " AND hide == 0"
1350+ if 'limit' in filters and filters['limit'] is not None:
1351+ query += " LIMIT %d" % filters['limit']
1352+ result = self.db.execute(query).fetchall()
1353+ for row in result:
1354+ yield dict(id=row[0], ttl=row[1], hide=row[2], body=row[3])
1355
1356=== added file 'burrow/client.py'
1357--- burrow/client.py 1970-01-01 00:00:00 +0000
1358+++ burrow/client.py 2011-04-06 05:49:23 +0000
1359@@ -0,0 +1,63 @@
1360+# Copyright (C) 2011 OpenStack LLC.
1361+#
1362+# Licensed under the Apache License, Version 2.0 (the "License");
1363+# you may not use this file except in compliance with the License.
1364+# You may obtain a copy of the License at
1365+#
1366+# http://www.apache.org/licenses/LICENSE-2.0
1367+#
1368+# Unless required by applicable law or agreed to in writing, software
1369+# distributed under the License is distributed on an "AS IS" BASIS,
1370+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1371+# See the License for the specific language governing permissions and
1372+# limitations under the License.
1373+
1374+'''Client module for burrow.'''
1375+
1376+import ConfigParser
1377+import logging
1378+import logging.config
1379+
1380+import burrow
1381+import burrow.config
1382+
1383+# Default configuration values for this module.
1384+DEFAULT_BACKEND = 'burrow.backend.http'
1385+
1386+
1387+class Client(object):
1388+ '''Client class for burrow.'''
1389+
1390+ def __init__(self, url=None, config_files=[],
1391+ add_default_log_handler=True):
1392+ '''Initialize a client using the URL and config files from the
1393+ given list. This is passed directly to ConfigParser.read(),
1394+ so files should be in ConfigParser format. This will load
1395+ the backend class from the configuration.'''
1396+ if len(config_files) > 0:
1397+ logging.config.fileConfig(config_files)
1398+ self._config = ConfigParser.ConfigParser()
1399+ self._config.read(config_files)
1400+ # TODO: Parse URL if given and overwrite any values in self._config.
1401+ self.config = burrow.config.Config(self._config, 'burrow.client')
1402+ self.log = burrow.get_logger(self.config)
1403+ if add_default_log_handler:
1404+ self._add_default_log_handler()
1405+ self._import_backend()
1406+
1407+ def _add_default_log_handler(self):
1408+ '''Add a default log handler if one has not been set.'''
1409+ root_log = logging.getLogger()
1410+ if len(root_log.handlers) > 0 or len(self.log.handlers) > 0:
1411+ return
1412+ handler = logging.StreamHandler()
1413+ handler.setLevel(logging.ERROR)
1414+ log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
1415+ handler.setFormatter(logging.Formatter(log_format))
1416+ root_log.addHandler(handler)
1417+
1418+ def _import_backend(self):
1419+ '''Load backend given in the 'backend' option.'''
1420+ backend = self.config.get('backend', DEFAULT_BACKEND)
1421+ config = (self._config, backend)
1422+ self.backend = burrow.import_class(backend, 'Backend')(config)
1423
1424=== modified file 'burrow/config.py'
1425--- burrowd/config.py 2011-03-17 23:42:41 +0000
1426+++ burrow/config.py 2011-04-06 05:49:23 +0000
1427@@ -12,7 +12,7 @@
1428 # See the License for the specific language governing permissions and
1429 # limitations under the License.
1430
1431-'''Configuration module for the burrow server.'''
1432+'''Configuration module for burrow.'''
1433
1434 import ConfigParser
1435
1436
1437=== modified file 'burrow/frontend/__init__.py'
1438--- burrowd/frontend/__init__.py 2011-03-17 23:42:41 +0000
1439+++ burrow/frontend/__init__.py 2011-04-06 05:49:23 +0000
1440@@ -12,12 +12,12 @@
1441 # See the License for the specific language governing permissions and
1442 # limitations under the License.
1443
1444-'''Frontends for the burrow server.'''
1445-
1446-import burrowd
1447-
1448-
1449-class Frontend(burrowd.Module):
1450+'''Frontends for burrow.'''
1451+
1452+import burrow
1453+
1454+
1455+class Frontend(burrow.Module):
1456 '''Interface that frontend implementations must provide.'''
1457
1458 def __init__(self, config, backend):
1459
1460=== modified file 'burrow/frontend/wsgi.py'
1461--- burrowd/frontend/wsgi.py 2011-03-17 23:42:41 +0000
1462+++ burrow/frontend/wsgi.py 2011-04-06 05:49:23 +0000
1463@@ -24,7 +24,7 @@
1464 import webob.dec
1465 import webob.exc
1466
1467-import burrowd.frontend
1468+import burrow.frontend
1469
1470 # Default configuration values for this module.
1471 DEFAULT_HOST = '0.0.0.0'
1472@@ -38,20 +38,18 @@
1473 DEFAULT_HIDE = 0
1474
1475
1476-def queue_exists(method):
1477- '''Decorator to ensure an account and queue exists. If the wait
1478- option is given, this will block until a message in the queue is
1479- ready or the timeout expires.'''
1480+def wait_on_queue(method):
1481+ '''Decorator to wait on an account/queue if the wait option is
1482+ given. This will block until a message in the queue is ready or
1483+ the timeout expires.'''
1484 def wrapper(self, req, account, queue, *args, **kwargs):
1485 wait = 0
1486 if 'wait' in req.params:
1487 wait = int(req.params['wait'])
1488 if wait > 0:
1489 wait += time.time()
1490- res = webob.exc.HTTPNotFound()
1491 while True:
1492- if self.backend.queue_exists(account, queue):
1493- res = method(self, req, account, queue, *args, **kwargs)
1494+ res = method(self, req, account, queue, *args, **kwargs)
1495 if wait == 0 or res.status_int != 404:
1496 break
1497 now = time.time()
1498@@ -63,7 +61,7 @@
1499 return wrapper
1500
1501
1502-class Frontend(burrowd.frontend.Frontend):
1503+class Frontend(burrow.frontend.Frontend):
1504
1505 def __init__(self, config, backend):
1506 super(Frontend, self).__init__(config, backend)
1507@@ -73,7 +71,7 @@
1508 mapper.connect('/', action='root')
1509 mapper.connect('/{account}', action='account')
1510 mapper.connect('/{account}/{queue}', action='queue')
1511- mapper.connect('/{account}/{queue}/{message_id}', action='message')
1512+ mapper.connect('/{account}/{queue}/{message}', action='message')
1513 self._routes = routes.middleware.RoutesMiddleware(self._route, mapper)
1514
1515 def run(self, thread_pool):
1516@@ -121,85 +119,86 @@
1517
1518 @webob.dec.wsgify
1519 def _delete_root(self, req):
1520- self.backend.delete_accounts()
1521+ filters = self._parse_filters(req)
1522+ self.backend.delete_accounts(filters)
1523 return webob.exc.HTTPNoContent()
1524
1525 @webob.dec.wsgify
1526 def _get_root(self, req):
1527- accounts = self.backend.get_accounts()
1528+ filters = self._parse_filters(req)
1529+ accounts = [account for account in self.backend.get_accounts(filters)]
1530 if len(accounts) == 0:
1531 return webob.exc.HTTPNotFound()
1532 return webob.exc.HTTPOk(body=json.dumps(accounts, indent=2))
1533
1534 @webob.dec.wsgify
1535 def _delete_account(self, req, account):
1536- self.backend.delete_account(account)
1537+ filters = self._parse_filters(req)
1538+ self.backend.delete_queues(account, filters)
1539 return webob.exc.HTTPNoContent()
1540
1541 @webob.dec.wsgify
1542 def _get_account(self, req, account):
1543- queues = self.backend.get_queues(account)
1544+ filters = self._parse_filters(req)
1545+ queues = [queue for queue in self.backend.get_queues(account, filters)]
1546 if len(queues) == 0:
1547 return webob.exc.HTTPNotFound()
1548 return webob.exc.HTTPOk(body=json.dumps(queues, indent=2))
1549
1550 @webob.dec.wsgify
1551- @queue_exists
1552+ @wait_on_queue
1553 def _delete_queue(self, req, account, queue):
1554- limit, marker, match_hidden = self._parse_filters(req)
1555- messages = self.backend.delete_messages(account, queue, limit, marker,
1556- match_hidden)
1557+ filters = self._parse_filters(req)
1558+ messages = [message for message in
1559+ self.backend.delete_messages(account, queue, filters)]
1560 return self._return_messages(req, account, queue, messages, 'none')
1561
1562 @webob.dec.wsgify
1563- @queue_exists
1564+ @wait_on_queue
1565 def _get_queue(self, req, account, queue):
1566- limit, marker, match_hidden = self._parse_filters(req)
1567- messages = self.backend.get_messages(account, queue, limit, marker,
1568- match_hidden)
1569+ filters = self._parse_filters(req)
1570+ messages = [message for message in
1571+ self.backend.get_messages(account, queue, filters)]
1572 return self._return_messages(req, account, queue, messages, 'all')
1573
1574 @webob.dec.wsgify
1575- @queue_exists
1576+ @wait_on_queue
1577 def _post_queue(self, req, account, queue):
1578- limit, marker, match_hidden = self._parse_filters(req)
1579- ttl, hide = self._parse_metadata(req)
1580- messages = self.backend.update_messages(account, queue, limit, marker,
1581- match_hidden, ttl, hide)
1582+ attributes = self._parse_attributes(req)
1583+ filters = self._parse_filters(req)
1584+ messages = [message for message in
1585+ self.backend.update_messages(account, queue, attributes, filters)]
1586 return self._return_messages(req, account, queue, messages, 'all')
1587
1588 @webob.dec.wsgify
1589- @queue_exists
1590- def _delete_message(self, req, account, queue, message_id):
1591- message = self.backend.delete_message(account, queue, message_id)
1592+ def _delete_message(self, req, account, queue, message):
1593+ message = self.backend.delete_message(account, queue, message)
1594 if message is None:
1595 return webob.exc.HTTPNotFound()
1596 return self._return_message(req, account, queue, message, 'none')
1597
1598 @webob.dec.wsgify
1599- @queue_exists
1600- def _get_message(self, req, account, queue, message_id):
1601- message = self.backend.get_message(account, queue, message_id)
1602+ def _get_message(self, req, account, queue, message):
1603+ message = self.backend.get_message(account, queue, message)
1604 if message is None:
1605 return webob.exc.HTTPNotFound()
1606 return self._return_message(req, account, queue, message, 'all')
1607
1608 @webob.dec.wsgify
1609- @queue_exists
1610- def _post_message(self, req, account, queue, message_id):
1611- ttl, hide = self._parse_metadata(req)
1612- message = self.backend.update_message(account, queue, message_id, ttl,
1613- hide)
1614+ def _post_message(self, req, account, queue, message):
1615+ attributes = self._parse_attributes(req)
1616+ message = self.backend.update_message(account, queue, message,
1617+ attributes)
1618 if message is None:
1619 return webob.exc.HTTPNotFound()
1620 return self._return_message(req, account, queue, message, 'id')
1621
1622 @webob.dec.wsgify
1623- def _put_message(self, req, account, queue, message_id):
1624- (ttl, hide) = self._parse_metadata(req, self.default_ttl,
1625- self.default_hide)
1626- if self.backend.put_message(account, queue, message_id, ttl, hide, \
1627- req.body):
1628+ def _put_message(self, req, account, queue, message):
1629+ attributes = self._parse_attributes(req, self.default_ttl,
1630+ self.default_hide)
1631+ if self.backend.create_message(account, queue, message, req.body,
1632+ attributes):
1633 return webob.exc.HTTPCreated()
1634 return webob.exc.HTTPNoContent()
1635
1636@@ -239,31 +238,32 @@
1637 return webob.exc.HTTPOk(body=json.dumps(body, indent=2))
1638
1639 def _parse_filters(self, req):
1640- limit = None
1641+ filters = {}
1642 if 'limit' in req.params:
1643- limit = int(req.params['limit'])
1644- marker = None
1645+ filters['limit'] = int(req.params['limit'])
1646 if 'marker' in req.params:
1647- marker = req.params['marker']
1648- match_hidden = False
1649+ filters['marker'] = req.params['marker']
1650 if 'hidden' in req.params and req.params['hidden'].lower() == 'true':
1651- match_hidden = True
1652- return limit, marker, match_hidden
1653+ filters['match_hidden'] = True
1654+ return filters
1655
1656- def _parse_metadata(self, req, default_ttl=None, default_hide=None):
1657+ def _parse_attributes(self, req, default_ttl=None, default_hide=None):
1658+ attributes = {}
1659 if 'ttl' in req.params:
1660 ttl = int(req.params['ttl'])
1661 else:
1662 ttl = default_ttl
1663 if ttl is not None and ttl > 0:
1664 ttl += int(time.time())
1665+ attributes['ttl'] = ttl
1666 if 'hide' in req.params:
1667 hide = int(req.params['hide'])
1668 else:
1669 hide = default_hide
1670 if hide is not None and hide > 0:
1671 hide += int(time.time())
1672- return ttl, hide
1673+ attributes['hide'] = hide
1674+ return attributes
1675
1676
1677 class WSGILog(object):
1678
1679=== added file 'burrow/server.py'
1680--- burrow/server.py 1970-01-01 00:00:00 +0000
1681+++ burrow/server.py 2011-04-06 05:49:23 +0000
1682@@ -0,0 +1,95 @@
1683+# Copyright (C) 2011 OpenStack LLC.
1684+#
1685+# Licensed under the Apache License, Version 2.0 (the "License");
1686+# you may not use this file except in compliance with the License.
1687+# You may obtain a copy of the License at
1688+#
1689+# http://www.apache.org/licenses/LICENSE-2.0
1690+#
1691+# Unless required by applicable law or agreed to in writing, software
1692+# distributed under the License is distributed on an "AS IS" BASIS,
1693+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1694+# See the License for the specific language governing permissions and
1695+# limitations under the License.
1696+
1697+'''Server module for burrow.'''
1698+
1699+import ConfigParser
1700+import logging
1701+import logging.config
1702+
1703+import eventlet
1704+
1705+import burrow
1706+import burrow.config
1707+
1708+# Default configuration values for this module.
1709+DEFAULT_BACKEND = 'burrow.backend.sqlite'
1710+DEFAULT_FRONTENDS = 'burrow.frontend.wsgi'
1711+DEFAULT_THREAD_POOL_SIZE = 1000
1712+
1713+
1714+class Server(object):
1715+ '''Server class for burrow.'''
1716+
1717+ def __init__(self, config_files=[], add_default_log_handler=True):
1718+ '''Initialize a server using the config files from the given
1719+ list. This is passed directly to ConfigParser.read(), so
1720+ files should be in ConfigParser format. This will load all
1721+ frontend and backend classes from the configuration.'''
1722+ if len(config_files) > 0:
1723+ logging.config.fileConfig(config_files)
1724+ self._config = ConfigParser.ConfigParser()
1725+ self._config.read(config_files)
1726+ self.config = burrow.config.Config(self._config, 'burrow.server')
1727+ self.log = burrow.get_logger(self.config)
1728+ if add_default_log_handler:
1729+ self._add_default_log_handler()
1730+ self._import_backend()
1731+ self._import_frontends()
1732+
1733+ def _add_default_log_handler(self):
1734+ '''Add a default log handler if one has not been set.'''
1735+ root_log = logging.getLogger()
1736+ if len(root_log.handlers) > 0 or len(self.log.handlers) > 0:
1737+ return
1738+ handler = logging.StreamHandler()
1739+ handler.setLevel(logging.DEBUG)
1740+ log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
1741+ handler.setFormatter(logging.Formatter(log_format))
1742+ root_log.addHandler(handler)
1743+
1744+ def _import_backend(self):
1745+ '''Load backend given in the 'backend' option.'''
1746+ backend = self.config.get('backend', DEFAULT_BACKEND)
1747+ config = (self._config, backend)
1748+ self.backend = burrow.import_class(backend, 'Backend')(config)
1749+
1750+ def _import_frontends(self):
1751+ '''Load frontends given in the 'frontends' option.'''
1752+ self.frontends = []
1753+ frontends = self.config.get('frontends', DEFAULT_FRONTENDS)
1754+ for frontend in frontends.split(','):
1755+ frontend = frontend.split(':')
1756+ if len(frontend) == 1:
1757+ frontend.append(None)
1758+ config = (self._config, frontend[0], frontend[1])
1759+ frontend = burrow.import_class(frontend[0], 'Frontend')
1760+ frontend = frontend(config, self.backend)
1761+ self.frontends.append(frontend)
1762+
1763+ def run(self):
1764+ '''Create the thread pool and start the main server loop. Wait
1765+ for the pool to complete, but possibly run forever if the
1766+ frontends and backend never remove threads.'''
1767+ thread_pool_size = self.config.getint('thread_pool_size',
1768+ DEFAULT_THREAD_POOL_SIZE)
1769+ thread_pool = eventlet.GreenPool(size=int(thread_pool_size))
1770+ self.backend.run(thread_pool)
1771+ for frontend in self.frontends:
1772+ frontend.run(thread_pool)
1773+ self.log.info(_('Waiting for all threads to exit'))
1774+ try:
1775+ thread_pool.waitall()
1776+ except KeyboardInterrupt:
1777+ pass
1778
1779=== modified file 'etc/burrowd.conf'
1780--- etc/burrowd.conf 2011-03-17 23:42:41 +0000
1781+++ etc/burrowd.conf 2011-04-06 05:49:23 +0000
1782@@ -1,6 +1,6 @@
1783 [DEFAULT]
1784
1785-# Log level to use. All sections below prefixed with 'burrowd' can define
1786+# Log level to use. All sections below prefixed with 'burrow' can define
1787 # this to override this default.
1788 log_level = DEBUG
1789
1790@@ -11,26 +11,26 @@
1791 default_hide = 0
1792
1793
1794-[burrowd]
1795+[burrow.server]
1796
1797 # Backend to use for storing messages.
1798-backend = burrowd.backend.sqlite
1799+backend = burrow.backend.sqlite
1800
1801 # Comma separated list of frontends to run.
1802-# frontends = burrowd.frontend.wsgi,burrowd.frontend.wsgi:ssl
1803-frontends = burrowd.frontend.wsgi
1804+# frontends = burrow.frontend.wsgi,burrow.frontend.wsgi:ssl
1805+frontends = burrow.frontend.wsgi
1806
1807 # Size of the thread pool to use for the server.
1808 thread_pool_size = 1000
1809
1810
1811-[burrowd.backend.sqlite]
1812+[burrow.backend.sqlite]
1813
1814 # Database file to use, passed to sqlite3.connect.
1815 database = :memory:
1816
1817
1818-[burrowd.frontend.wsgi]
1819+[burrow.frontend.wsgi]
1820
1821 # Host to listen on.
1822 host = 0.0.0.0
1823@@ -51,7 +51,7 @@
1824 ssl_keyfile = example.key
1825
1826 # Size of thread pool for the WSGI server. If the size is 0, use the main
1827-# burrowd thread pool.
1828+# burrow thread pool.
1829 thread_pool_size = 0
1830
1831 # Default expiration time in seconds to set for messages. This overrides
1832@@ -63,7 +63,7 @@
1833 # default_hide = 0
1834
1835
1836-[burrowd.frontend.wsgi:ssl]
1837+[burrow.frontend.wsgi:ssl]
1838
1839 # Port to listen on.
1840 port = 8443
1841
1842=== modified file 'locale/burrow.pot'
1843--- locale/burrow.pot 2011-03-18 00:47:26 +0000
1844+++ locale/burrow.pot 2011-04-06 05:49:23 +0000
1845@@ -8,7 +8,7 @@
1846 msgstr ""
1847 "Project-Id-Version: burrow 0.1\n"
1848 "Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
1849-"POT-Creation-Date: 2011-03-17 17:43-0700\n"
1850+"POT-Creation-Date: 2011-04-05 19:00-0700\n"
1851 "PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
1852 "Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
1853 "Language-Team: LANGUAGE <LL@li.org>\n"
1854@@ -17,20 +17,20 @@
1855 "Content-Transfer-Encoding: 8bit\n"
1856 "Generated-By: Babel 0.9.4\n"
1857
1858-#: burrowd/__init__.py:96
1859-msgid "Waiting for all threads to exit"
1860-msgstr ""
1861-
1862-#: burrowd/__init__.py:109
1863+#: burrow/__init__.py:38
1864 msgid "Module created"
1865 msgstr ""
1866
1867-#: burrowd/__init__.py:130
1868+#: burrow/__init__.py:59
1869 #, python-format
1870 msgid "Class %s.%s cannot be found (%s)"
1871 msgstr ""
1872
1873-#: burrowd/frontend/wsgi.py:87
1874+#: burrow/server.py:91
1875+msgid "Waiting for all threads to exit"
1876+msgstr ""
1877+
1878+#: burrow/frontend/wsgi.py:85
1879 #, python-format
1880 msgid "Listening on %s:%d"
1881 msgstr ""
1882
1883=== modified file 'setup.py'
1884--- setup.py 2011-03-17 23:42:41 +0000
1885+++ setup.py 2011-04-06 05:49:23 +0000
1886@@ -17,11 +17,14 @@
1887 from setuptools.command.sdist import sdist
1888 import os
1889 import subprocess
1890+
1891 try:
1892 from babel.messages import frontend
1893 except ImportError:
1894 frontend = None
1895
1896+import burrow
1897+
1898
1899 class local_sdist(sdist):
1900 """Customized sdist hook - builds the ChangeLog file from VC first"""
1901@@ -54,7 +57,7 @@
1902
1903 setup(
1904 name=name,
1905- version='0.1',
1906+ version=burrow.__version__,
1907 description='Burrow',
1908 license='Apache License (2.0)',
1909 author='OpenStack, LLC.',
1910
1911=== modified file 'test/frontend/test_wsgi.py'
1912--- test/frontend/test_wsgi.py 2011-03-17 23:42:41 +0000
1913+++ test/frontend/test_wsgi.py 2011-04-06 05:49:23 +0000
1914@@ -20,19 +20,19 @@
1915 import eventlet
1916 import webob
1917
1918-import burrowd.backend.memory
1919-import burrowd.backend.sqlite
1920-import burrowd.frontend.wsgi
1921+import burrow.backend.memory
1922+import burrow.backend.sqlite
1923+import burrow.frontend.wsgi
1924
1925
1926 class TestWSGIMemory(unittest.TestCase):
1927 '''Unittests for the WSGI frontend to memory backend.'''
1928- backend_class = burrowd.backend.memory.Backend
1929+ backend_class = burrow.backend.memory.Backend
1930
1931 def setUp(self):
1932 config = (ConfigParser.ConfigParser(), 'test')
1933 self.backend = self.backend_class(config)
1934- self.frontend = burrowd.frontend.wsgi.Frontend(config, self.backend)
1935+ self.frontend = burrow.frontend.wsgi.Frontend(config, self.backend)
1936 self.frontend.default_ttl = 0
1937 self._get_url('/', status=404)
1938 self._get_url('/a', status=404)
1939@@ -310,4 +310,4 @@
1940
1941 class TestWSGISQLite(TestWSGIMemory):
1942 '''Unittests for the WSGI frontend to SQLite backend.'''
1943- backend_class = burrowd.backend.sqlite.Backend
1944+ backend_class = burrow.backend.sqlite.Backend

Subscribers

People subscribed via source and target branches