Merge lp:~termie/nova/rpc_multicall into lp:~hudson-openstack/nova/trunk
Status: | Merged |
---|---|
Approved by: | Vish Ishaya |
Approved revision: | 1138 |
Merged at revision: | 1116 |
Proposed branch: | lp:~termie/nova/rpc_multicall |
Merge into: | lp:~hudson-openstack/nova/trunk |
Diff against target: | 995 lines (+443/-144), 8 files modified |
nova/fakerabbit.py (+23/-8)
nova/rpc.py (+208/-71)
nova/service.py (+36/-24)
nova/test.py (+6/-3)
nova/tests/integrated/integrated_helpers.py (+1/-4)
nova/tests/test_cloud.py (+9/-17)
nova/tests/test_rpc.py (+107/-11)
nova/tests/test_service.py (+53/-6)
To merge this branch: | bzr merge lp:~termie/nova/rpc_multicall |
Related bugs: | |
Related blueprints: | No DB Messaging (High) |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Vish Ishaya (community) | Approve | | |
Ed Leafe (community) | Approve | | |
Chris Behrens (community) | Approve | | |
Jay Pipes (community) | Needs Information | | |
Review via email: mp+61686@code.launchpad.net |
Commit message
Description of the change
Adds the ability to make a call that returns multiple times (a call returning a generator). This is also based on the work in rpc-improvements, plus a bunch of fixes Vish and I worked through to get all the tests to pass, so the code is a bit all over the place.
The functionality is being added to support Vish's work on removing worker access to the database: it allows us to write multi-phase actions that yield state updates as they progress, letting the frontend update the db.
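To make the intended usage concrete, here is a minimal sketch (not code from this branch; the class, method, topic, and field names are hypothetical) of a worker method that yields state updates and a frontend caller that consumes them via rpc.multicall and writes them to the database:

from nova import db
from nova import rpc


class FakeComputeManager(object):
    # Hypothetical worker-side method: yields state updates as the
    # multi-phase action progresses instead of writing to the db itself.
    def resize_instance(self, context, instance_id):
        yield {'instance_id': instance_id, 'state': 'migrating'}
        # ... perform the actual migration work here ...
        yield {'instance_id': instance_id, 'state': 'finished'}


def resize_and_track(context, instance_id):
    # Frontend/API side: each yielded value arrives as a separate reply
    # and the caller applies it to the database.
    msg = {'method': 'resize_instance',
           'args': {'instance_id': instance_id}}
    for update in rpc.multicall(context, 'compute.some-host', msg):
        db.instance_update(context, update['instance_id'],
                           {'state': update['state']})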
Chris Behrens (cbehrens) wrote : | # |
- 1088. By Justin Shepherd
-
The tools/* directory is now included in pep8 runs. Added an opt-out system for excluding files/dirs from pep8 (using GLOBIGNORE).
- 1089. By Johannes Erdfelt
-
The XenAPI driver uses openssl as part of the nova-agent implementation to set the password for root. It uses a temporary file insecurely and unnecessarily. Change the code to write the password directly to stdin of the openssl process instead.
- 1090. By Johannes Erdfelt
-
Add new flag 'max_kernel_ramdisk_size' to specify a maximum size of kernel or ramdisk so we don't copy large files to dom0 and fill up /boot/guest
- 1091. By Anthony Young
-
This fix ensures that kpartx -d is called in the event that tune2fs fails during key injection, as it does when trying to inject a key into a windows instance. Bug #760921 is a symptom of this issue, as if kpartx -d is not called then partitions remain mapped that prevent the underlying nbd from being reused.
Couldn't think of a good way to regression test for this - any ideas?
- 1092. By John Tran
-
Added an EC2 API endpoint that'll allow import of public key. Prior, api only allowed generation of new keys.
- 1093. By Dan Prince
-
Update OSAPI v1.1 extensions so that it supports RequestExtensions. ResponseExtensions were removed since the new RequestExtension covers both use cases. This branch also removes some of the odd serialization code in the RequestExtensionController that converted dictionary objects into webob objects. RequestExtension handlers should now always return proper webob objects.
- 1094. By William Wolf
-
Get rid of old virt/images.py functions that are no longer needed. Checked for any loose calls to these functions and found none. All tests pass for me.
- 1095. By Rick Harris
-
This is the groundwork for the upcoming distributed scheduler changes. Nothing is actually wired up here, so it shouldn't break any existing code (and all tests pass).
The goals were to:
1. Define the basic distributed scheduler communication mechanism:
a. call_zone_method - how each zone can communicate with its children
b. encrypted child-blobs - how child zones can securely and statelessly report back weight and build-plan info
2. Put in hooks for advanced-filtering (hard-requirements, capabilities) as well as preferences (least-cost-scheduling)
3. Create a base set of dist-scheduler tests that we can extend as we add more functionality.
Next up will be to:
1. Add in a filtering driver
2. Add in a cost-scheduler driver
- 1096. By Eldar Nugaev
-
print information about nova-manage project problems
- 1097. By Vish Ishaya
-
Makes sure vlan creation locks so we don't race and fail to create a vlan.
- 1098. By Soren Hansen
-
Include data files for public key tests in the tarball.
- 1099. By <email address hidden>
-
Found a typo in the xenserver glance plugin that kept it from working with glance trunk. Also modified the image url to fetch from /v1/image/X instead of /image/X, as the latter returned a 300.
- 1100. By Andrey Brindeyev
-
--dhcp-lease-max=150 by default. This prevents >150 instances in one network.
Jay Pipes (jaypipes) wrote : | # |
Hey terms. Looks great overall, including Chris' additions and LIFO.
One little question, though.
37 - greenthread.sleep(0)
38 + for (queue, callback) in CONSUMERS.itervalues():
39 + item = self.get(queue)
40 + if item:
41 + callback(item)
42 + num += 1
43 + yield
44 + if limit and num == limit:
45 + raise StopIteration()
46 + greenthread.sleep(0.1)
Why did you change the sleep time from 0 to 0.1? Did you notice something lock up when it was at 0?
Just curious :) Cheers!
-jay
Chris Behrens (cbehrens) wrote : | # |
Jay: I think it may be so that there's less CPU spinning... though this is just in fakerabbit.
I'm worried about the 'time.sleep(0.01)' in MulticallWaiter, though. That's a lot of send/recvs spinning until the result actually comes into the queue...
What about this:
def wait(self):
    while not self._closed:
        try:
            result = self._results.get()
        except Exception:
            self.close()
            raise
        if isinstance(result, Exception):
            raise result
        if result == None:
            raise StopIteration
        yield result
    raise StopIteration
That removes the need for the sleep and will be less harsh on rabbit.. wait() will block until a result is received (eventlet will be polling for data). And if one happens to call wait() after close(), StopIteration is raised.
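As a standalone illustration of the difference (not nova code), a blocking get() on an eventlet queue parks the greenthread until a result is put, whereas a poll loop wakes up every 0.01s whether or not anything has arrived:

import eventlet
from eventlet import queue

results = queue.Queue()


def producer():
    # Simulate a reply arriving from rabbit some time later.
    eventlet.sleep(0.5)
    results.put('the result')


def consumer():
    # get() blocks this greenthread until producer() puts something;
    # no sleep/poll loop is needed and other greenthreads keep running.
    print(results.get())


eventlet.spawn(producer)
eventlet.spawn(consumer).wait()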
Couple other things:
1) Service.kill puts self.conn back into the pool, but it never came out of the pool in the first place. I'd just remove the 'put'. No need to use the pool for it right now, and it somewhat breaks the pool if ConsumerSet needs to reconnect to rabbit.
2) Should this code that kills the csetthread, etc, actually be in Service.stop?
3) There's no handling of reconnecting to rabbit if the connection dies. We really need a larger re-factor. But since only 'call' is using this right now, and 'call' already didn't handle reconnecting, there's not really any behavior change.
- 1101. By justinsb
-
Fix bug #744150 by starting nova-api on an unused port.
- 1102. By Renuka Apte
-
Fixes euca-attach-volume for iscsi using Xenserver
Minor changes required to xenapi functions to get correct format for volume-id, iscsi-host, etc.
- 1103. By Anne Gentle
-
Fixes some minor doc issues - misspelled flags in zones doc and also adds zones doc to an index for easier findability
- 1104. By Dave Walker
-
When adding a keypair with ec2 API that already exists, give a friendly error and no traceback in nova-api
- 1105. By termie
-
Fixes a bug related to incorrect reparsing of flags and prevents many extra reparses.
- 1106. By Chris Behrens
-
Pretty simple. We call openssl to encrypt the admin password, but the recent changes around this code forgot to strip the newline off the read from stdout.
- 1107. By Johannes Erdfelt
-
Using the root-password subcommand of the nova client results in the password being changed for the instance specified, but to a different unknown password. The patch changes nova to use the password specified in the API call.
- 1108. By Johannes Erdfelt
-
eventlet.spawn_n() expects the function and arguments, but it expects the arguments unpacked since it uses *args.
- 1109. By Ed Leafe
-
The code for getting an opaque reference to an instance assumed that there was a reference to an instance obj available when raising an exception. I changed this from raising an InstanceNotFound exception to a NotFound, as this is more appropriate for the failure, and doesn't require an instance ID.
- 1110. By Brian Lamar
-
Created new libvirt directory, moved libvirt_conn.py to libvirt/connection.py, moved libvirt templates, broke out firewall and network utilities.
- 1111. By Chris Behrens
-
During the API create call, the API would kick off a build and then loop in a greenthread waiting for the scheduler to pick a host for the instance. After API would see a host was picked, it would cast to the compute node's set_admin_password method.
The API server really should not have to do this. The password to set should be pushed along with the build request, instead. The compute node can then set the password after it detects the instance has booted. This removes a greenthread from the API server, a loop that constantly checks the DB for the host, and finally a cast to the compute node.
- 1112. By Mark Washenberger
-
Several changes designed to bring the openstack api 1.1 closer to spec
- add ram limits to the nova compute quotas
- enable injected file limits and injected file size limits to be overridden in the quota database table
- expose quota limits as absolute limits in the openstack api 1.1 limits resource
- add support for controlling 'unlimited' quotas to nova-manage
- 1113. By Alex Meade
-
Fixed the mistyped line referred to in bug 787023
termie (termie) wrote : | # |
Chris: I've just tried a few different variations of the above suggested code as well as code based on iterconsume, but all have resulted in issues of some sort or another that have been difficult to debug (either hangs or returns nothing)... the code works as it is so I am hesitant to go back down the rabbit hole. Will push newer version shortly.
- 1114. By termie
-
add support to rpc for multicall
- 1115. By termie
-
make the test more explicit
- 1116. By termie
-
add commented out unworking code for yield-based returns
- 1117. By Chris Behrens
-
Add a connection pool for rpc cast/call
Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each.
- 1118. By Chris Behrens
-
pep8 and comment fixes
- 1119. By Chris Behrens
-
convert fanout_cast to ConnectionPool
- 1120. By Chris Behrens
-
fakerabbit's declare_consumer should support more than 1 consumer. also: make fakerabbit Backend.consume be an iterator like it should be..
- 1121. By Vish Ishaya
-
fix consumers to actually be deleted and clean up cloud test
- 1122. By Chris Behrens
-
catch greenlet.GreenletExit when shutting service down
- 1123. By Chris Behrens
-
Always create Service consumers no matter if report_interval is 0
Fix tests to handle how Service loads Consumers now
- 1124. By Chris Behrens
-
Add rpc_conn_pool_size flag for the new connection pool
- 1125. By Chris Behrens
-
connection pool tests and make the pool LIFO
- 1126. By termie
-
bring back commits lost in merge
- 1127. By termie
-
almost everything working with fake_rabbit
- 1128. By termie
-
don't need to use a separate connection
- 1129. By Vish Ishaya
-
lots of fixes for rpc and extra imports
- 1130. By termie
-
make sure that using multicall on a call with a single result still functions
- 1131. By termie
-
cleanup the code for merging
- 1132. By termie
-
cleanups
- 1133. By termie
-
replace removed import
- 1134. By termie
-
don't put connection back in pool
- 1135. By termie
-
move consumerset killing into stop
- 1136. By termie
-
change the behavior of calling a multicall
Chris Behrens (cbehrens) wrote : | # |
termie: Strange. Well, I'd like to get this merged sooner rather than later, so it's probably okay for now. I have a couple of other starts to redoing this code even more. One of them uses kombu, which seems to simplify some things for us, but it still has some annoyances. More on that later.
I think there was one more thing I spotted when playing around with larger refactors that I didn't comment on here yet. I'll take a look at what you've changed here since I last looked..
Chris Behrens (cbehrens) wrote : | # |
Ah, I added a self.conn.close() to Service.stop() after killing the csetthread. Not terribly important.
Looks good to me.
Ed Leafe (ed-leafe) wrote : | # |
When testing if an object is a generator, relying on having an attribute named 'send' is weak and error-prone. It would be preferable to use:
isinstance(rval, types.GeneratorType)
to check for generators.
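A quick standalone example of why the isinstance check is more robust than looking for a 'send' attribute:

import types


def gen():
    yield 1


class Sendable(object):
    # Not a generator, but it happens to have a 'send' attribute.
    def send(self, value):
        pass


print(isinstance(gen(), types.GeneratorType))       # True
print(isinstance(Sendable(), types.GeneratorType))  # False
print(hasattr(Sendable(), 'send'))                  # True -- the false positive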
Line 239 of the diff (wait method): Logging the error should probably not only include the type of exception, but the exception message as well:
type_e = type(e)
LOG.exception(_("Exception while processing consumer: %(type_e)s: %(e)s")
              % locals())
In the call() method of nova/rpc.py (416 of the diff), is there any chance that MultiCall() will return either a) a non-list result or b) an empty list? If so the slice action will throw an error.
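For reference, indexing the last element of an empty result list raises IndexError, which is what the "if not rv: return" guard in the final version of call() (see the diff below) avoids:

results = []  # e.g. a multicall that produced no replies at all
try:
    last = results[-1]
except IndexError:
    last = None  # call() instead returns early when the list is empty
print(last)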
In nova/service.py, the declaration at line 109 is stylistically inconsistent. The declarations above it use full names ('consumer_all', 'consumer_node', etc.), while the last name is 'cset'; it should be 'consumer_set' for naming consistency. Also, the indentation of continued lines is inconsistent; since the preceding lines use the standard two indents, this last declaration should follow that convention. And while I'm nitpicking, there should not be a blank line between the first 3 and the last declarations (i.e., remove line 108).
Josh Kearney (jk0) wrote : | # |
Not that it matters much, but I always prefer doing my for loops like such (in reference to the tests near the end):
for i, x in enumerate(y):
^ This eliminates having to set i = 0 and manually incrementing it inside the loop.
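A minimal before/after of the idiom being suggested:

values = [42, 43, 44]

# Manual counter (what the tests currently do):
i = 0
for x in values:
    assert x == 42 + i
    i += 1

# enumerate() removes the manual bookkeeping:
for i, x in enumerate(values):
    assert x == 42 + i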
termie (termie) wrote : | # |
Ed and jk0, I think I've addressed all the issues you've brought up.
Chris Behrens (cbehrens) wrote : | # |
Something I spot which is a bug before your changes:
215 msg_reply(msg_id, _('No method for message: %s') % message_data)
msg_id could be None there.
Chris Behrens (cbehrens) wrote : | # |
Also: would it simplify things by removing this:
- # This will be popped off in _unpack_context
- msg_id = message_data.get('_msg_id', None)
And using ctxt.reply going forward? See: http://
RPC test passes.
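For reference, the ctxt.reply pattern Chris mentions is already exercised by the new TestReceiver.echo_three_times in the diff below; the RpcContext built by _unpack_context carries the msg_id, so a proxy method can reply directly through the context:

class TestReceiver(object):
    @staticmethod
    def echo_three_times(context, value):
        # context is an RpcContext; reply() wraps msg_reply(self.msg_id, ...)
        context.reply(value)
        context.reply(value + 1)
        context.reply(value + 2)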
termie (termie) wrote : | # |
fixed the bug, I don't particularly want to do that other change for this but you are welcome to submit such a fixup later :)
Chris Behrens (cbehrens) wrote : | # |
I'm good with that. :)
Vish Ishaya (vishvananda) wrote : | # |
I've been testing this as we go as well. I think it is good to go.
Preview Diff
1 | === modified file 'nova/fakerabbit.py' |
2 | --- nova/fakerabbit.py 2011-02-22 23:05:48 +0000 |
3 | +++ nova/fakerabbit.py 2011-05-27 00:09:30 +0000 |
4 | @@ -31,6 +31,7 @@ |
5 | |
6 | EXCHANGES = {} |
7 | QUEUES = {} |
8 | +CONSUMERS = {} |
9 | |
10 | |
11 | class Message(base.BaseMessage): |
12 | @@ -96,17 +97,29 @@ |
13 | ' key %(routing_key)s') % locals()) |
14 | EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key) |
15 | |
16 | - def declare_consumer(self, queue, callback, *args, **kwargs): |
17 | - self.current_queue = queue |
18 | - self.current_callback = callback |
19 | + def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs): |
20 | + global CONSUMERS |
21 | + LOG.debug("Adding consumer %s", consumer_tag) |
22 | + CONSUMERS[consumer_tag] = (queue, callback) |
23 | + |
24 | + def cancel(self, consumer_tag): |
25 | + global CONSUMERS |
26 | + LOG.debug("Removing consumer %s", consumer_tag) |
27 | + del CONSUMERS[consumer_tag] |
28 | |
29 | def consume(self, limit=None): |
30 | + global CONSUMERS |
31 | + num = 0 |
32 | while True: |
33 | - item = self.get(self.current_queue) |
34 | - if item: |
35 | - self.current_callback(item) |
36 | - raise StopIteration() |
37 | - greenthread.sleep(0) |
38 | + for (queue, callback) in CONSUMERS.itervalues(): |
39 | + item = self.get(queue) |
40 | + if item: |
41 | + callback(item) |
42 | + num += 1 |
43 | + yield |
44 | + if limit and num == limit: |
45 | + raise StopIteration() |
46 | + greenthread.sleep(0.1) |
47 | |
48 | def get(self, queue, no_ack=False): |
49 | global QUEUES |
50 | @@ -134,5 +147,7 @@ |
51 | def reset_all(): |
52 | global EXCHANGES |
53 | global QUEUES |
54 | + global CONSUMERS |
55 | EXCHANGES = {} |
56 | QUEUES = {} |
57 | + CONSUMERS = {} |
58 | |
59 | === modified file 'nova/rpc.py' |
60 | --- nova/rpc.py 2011-04-20 19:08:22 +0000 |
61 | +++ nova/rpc.py 2011-05-27 00:09:30 +0000 |
62 | @@ -28,12 +28,15 @@ |
63 | import sys |
64 | import time |
65 | import traceback |
66 | +import types |
67 | import uuid |
68 | |
69 | from carrot import connection as carrot_connection |
70 | from carrot import messaging |
71 | from eventlet import greenpool |
72 | -from eventlet import greenthread |
73 | +from eventlet import pools |
74 | +from eventlet import queue |
75 | +import greenlet |
76 | |
77 | from nova import context |
78 | from nova import exception |
79 | @@ -47,7 +50,10 @@ |
80 | |
81 | |
82 | FLAGS = flags.FLAGS |
83 | -flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool') |
84 | +flags.DEFINE_integer('rpc_thread_pool_size', 1024, |
85 | + 'Size of RPC thread pool') |
86 | +flags.DEFINE_integer('rpc_conn_pool_size', 30, |
87 | + 'Size of RPC connection pool') |
88 | |
89 | |
90 | class Connection(carrot_connection.BrokerConnection): |
91 | @@ -90,6 +96,22 @@ |
92 | return cls.instance() |
93 | |
94 | |
95 | +class Pool(pools.Pool): |
96 | + """Class that implements a Pool of Connections.""" |
97 | + |
98 | + # TODO(comstud): Timeout connections not used in a while |
99 | + def create(self): |
100 | + LOG.debug('Creating new connection') |
101 | + return Connection.instance(new=True) |
102 | + |
103 | +# Create a ConnectionPool to use for RPC calls. We'll order the |
104 | +# pool as a stack (LIFO), so that we can potentially loop through and |
105 | +# timeout old unused connections at some point |
106 | +ConnectionPool = Pool( |
107 | + max_size=FLAGS.rpc_conn_pool_size, |
108 | + order_as_stack=True) |
109 | + |
110 | + |
111 | class Consumer(messaging.Consumer): |
112 | """Consumer base class. |
113 | |
114 | @@ -131,7 +153,9 @@ |
115 | self.connection = Connection.recreate() |
116 | self.backend = self.connection.create_backend() |
117 | self.declare() |
118 | - super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks) |
119 | + return super(Consumer, self).fetch(no_ack, |
120 | + auto_ack, |
121 | + enable_callbacks) |
122 | if self.failed_connection: |
123 | LOG.error(_('Reconnected to queue')) |
124 | self.failed_connection = False |
125 | @@ -159,13 +183,13 @@ |
126 | self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size) |
127 | super(AdapterConsumer, self).__init__(connection=connection, |
128 | topic=topic) |
129 | - |
130 | - def receive(self, *args, **kwargs): |
131 | - self.pool.spawn_n(self._receive, *args, **kwargs) |
132 | - |
133 | - @exception.wrap_exception |
134 | - def _receive(self, message_data, message): |
135 | - """Magically looks for a method on the proxy object and calls it. |
136 | + self.register_callback(self.process_data) |
137 | + |
138 | + def process_data(self, message_data, message): |
139 | + """Consumer callback to call a method on a proxy object. |
140 | + |
141 | + Parses the message for validity and fires off a thread to call the |
142 | + proxy object method. |
143 | |
144 | Message data should be a dictionary with two keys: |
145 | method: string representing the method to call |
146 | @@ -175,8 +199,8 @@ |
147 | |
148 | """ |
149 | LOG.debug(_('received %s') % message_data) |
150 | - msg_id = message_data.pop('_msg_id', None) |
151 | - |
152 | + # This will be popped off in _unpack_context |
153 | + msg_id = message_data.get('_msg_id', None) |
154 | ctxt = _unpack_context(message_data) |
155 | |
156 | method = message_data.get('method') |
157 | @@ -188,8 +212,17 @@ |
158 | # we just log the message and send an error string |
159 | # back to the caller |
160 | LOG.warn(_('no method for message: %s') % message_data) |
161 | - msg_reply(msg_id, _('No method for message: %s') % message_data) |
162 | + if msg_id: |
163 | + msg_reply(msg_id, |
164 | + _('No method for message: %s') % message_data) |
165 | return |
166 | + self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args) |
167 | + |
168 | + @exception.wrap_exception |
169 | + def _process_data(self, msg_id, ctxt, method, args): |
170 | + """Thread that maigcally looks for a method on the proxy |
171 | + object and calls it. |
172 | + """ |
173 | |
174 | node_func = getattr(self.proxy, str(method)) |
175 | node_args = dict((str(k), v) for k, v in args.iteritems()) |
176 | @@ -197,7 +230,18 @@ |
177 | try: |
178 | rval = node_func(context=ctxt, **node_args) |
179 | if msg_id: |
180 | - msg_reply(msg_id, rval, None) |
181 | + # Check if the result was a generator |
182 | + if isinstance(rval, types.GeneratorType): |
183 | + for x in rval: |
184 | + msg_reply(msg_id, x, None) |
185 | + else: |
186 | + msg_reply(msg_id, rval, None) |
187 | + |
188 | + # This final None tells multicall that it is done. |
189 | + msg_reply(msg_id, None, None) |
190 | + elif isinstance(rval, types.GeneratorType): |
191 | + # NOTE(vish): this iterates through the generator |
192 | + list(rval) |
193 | except Exception as e: |
194 | logging.exception('Exception during message handling') |
195 | if msg_id: |
196 | @@ -205,11 +249,6 @@ |
197 | return |
198 | |
199 | |
200 | -class Publisher(messaging.Publisher): |
201 | - """Publisher base class.""" |
202 | - pass |
203 | - |
204 | - |
205 | class TopicAdapterConsumer(AdapterConsumer): |
206 | """Consumes messages on a specific topic.""" |
207 | |
208 | @@ -242,6 +281,58 @@ |
209 | topic=topic, proxy=proxy) |
210 | |
211 | |
212 | +class ConsumerSet(object): |
213 | + """Groups consumers to listen on together on a single connection.""" |
214 | + |
215 | + def __init__(self, connection, consumer_list): |
216 | + self.consumer_list = set(consumer_list) |
217 | + self.consumer_set = None |
218 | + self.enabled = True |
219 | + self.init(connection) |
220 | + |
221 | + def init(self, conn): |
222 | + if not conn: |
223 | + conn = Connection.instance(new=True) |
224 | + if self.consumer_set: |
225 | + self.consumer_set.close() |
226 | + self.consumer_set = messaging.ConsumerSet(conn) |
227 | + for consumer in self.consumer_list: |
228 | + consumer.connection = conn |
229 | + # consumer.backend is set for us |
230 | + self.consumer_set.add_consumer(consumer) |
231 | + |
232 | + def reconnect(self): |
233 | + self.init(None) |
234 | + |
235 | + def wait(self, limit=None): |
236 | + running = True |
237 | + while running: |
238 | + it = self.consumer_set.iterconsume(limit=limit) |
239 | + if not it: |
240 | + break |
241 | + while True: |
242 | + try: |
243 | + it.next() |
244 | + except StopIteration: |
245 | + return |
246 | + except greenlet.GreenletExit: |
247 | + running = False |
248 | + break |
249 | + except Exception as e: |
250 | + LOG.exception(_("Exception while processing consumer")) |
251 | + self.reconnect() |
252 | + # Break to outer loop |
253 | + break |
254 | + |
255 | + def close(self): |
256 | + self.consumer_set.close() |
257 | + |
258 | + |
259 | +class Publisher(messaging.Publisher): |
260 | + """Publisher base class.""" |
261 | + pass |
262 | + |
263 | + |
264 | class TopicPublisher(Publisher): |
265 | """Publishes messages on a specific topic.""" |
266 | |
267 | @@ -306,16 +397,18 @@ |
268 | LOG.error(_("Returning exception %s to caller"), message) |
269 | LOG.error(tb) |
270 | failure = (failure[0].__name__, str(failure[1]), tb) |
271 | - conn = Connection.instance() |
272 | - publisher = DirectPublisher(connection=conn, msg_id=msg_id) |
273 | - try: |
274 | - publisher.send({'result': reply, 'failure': failure}) |
275 | - except TypeError: |
276 | - publisher.send( |
277 | - {'result': dict((k, repr(v)) |
278 | - for k, v in reply.__dict__.iteritems()), |
279 | - 'failure': failure}) |
280 | - publisher.close() |
281 | + |
282 | + with ConnectionPool.item() as conn: |
283 | + publisher = DirectPublisher(connection=conn, msg_id=msg_id) |
284 | + try: |
285 | + publisher.send({'result': reply, 'failure': failure}) |
286 | + except TypeError: |
287 | + publisher.send( |
288 | + {'result': dict((k, repr(v)) |
289 | + for k, v in reply.__dict__.iteritems()), |
290 | + 'failure': failure}) |
291 | + |
292 | + publisher.close() |
293 | |
294 | |
295 | class RemoteError(exception.Error): |
296 | @@ -347,8 +440,9 @@ |
297 | if key.startswith('_context_'): |
298 | value = msg.pop(key) |
299 | context_dict[key[9:]] = value |
300 | + context_dict['msg_id'] = msg.pop('_msg_id', None) |
301 | LOG.debug(_('unpacked context: %s'), context_dict) |
302 | - return context.RequestContext.from_dict(context_dict) |
303 | + return RpcContext.from_dict(context_dict) |
304 | |
305 | |
306 | def _pack_context(msg, context): |
307 | @@ -360,70 +454,112 @@ |
308 | for args at some point. |
309 | |
310 | """ |
311 | - context = dict([('_context_%s' % key, value) |
312 | - for (key, value) in context.to_dict().iteritems()]) |
313 | - msg.update(context) |
314 | - |
315 | - |
316 | -def call(context, topic, msg): |
317 | - """Sends a message on a topic and wait for a response.""" |
318 | + context_d = dict([('_context_%s' % key, value) |
319 | + for (key, value) in context.to_dict().iteritems()]) |
320 | + msg.update(context_d) |
321 | + |
322 | + |
323 | +class RpcContext(context.RequestContext): |
324 | + def __init__(self, *args, **kwargs): |
325 | + msg_id = kwargs.pop('msg_id', None) |
326 | + self.msg_id = msg_id |
327 | + super(RpcContext, self).__init__(*args, **kwargs) |
328 | + |
329 | + def reply(self, *args, **kwargs): |
330 | + msg_reply(self.msg_id, *args, **kwargs) |
331 | + |
332 | + |
333 | +def multicall(context, topic, msg): |
334 | + """Make a call that returns multiple times.""" |
335 | LOG.debug(_('Making asynchronous call on %s ...'), topic) |
336 | msg_id = uuid.uuid4().hex |
337 | msg.update({'_msg_id': msg_id}) |
338 | LOG.debug(_('MSG_ID is %s') % (msg_id)) |
339 | _pack_context(msg, context) |
340 | |
341 | - class WaitMessage(object): |
342 | - def __call__(self, data, message): |
343 | - """Acks message and sets result.""" |
344 | - message.ack() |
345 | - if data['failure']: |
346 | - self.result = RemoteError(*data['failure']) |
347 | - else: |
348 | - self.result = data['result'] |
349 | - |
350 | - wait_msg = WaitMessage() |
351 | - conn = Connection.instance() |
352 | - consumer = DirectConsumer(connection=conn, msg_id=msg_id) |
353 | + con_conn = ConnectionPool.get() |
354 | + consumer = DirectConsumer(connection=con_conn, msg_id=msg_id) |
355 | + wait_msg = MulticallWaiter(consumer) |
356 | consumer.register_callback(wait_msg) |
357 | |
358 | - conn = Connection.instance() |
359 | - publisher = TopicPublisher(connection=conn, topic=topic) |
360 | + publisher = TopicPublisher(connection=con_conn, topic=topic) |
361 | publisher.send(msg) |
362 | publisher.close() |
363 | |
364 | - try: |
365 | - consumer.wait(limit=1) |
366 | - except StopIteration: |
367 | - pass |
368 | - consumer.close() |
369 | - # NOTE(termie): this is a little bit of a change from the original |
370 | - # non-eventlet code where returning a Failure |
371 | - # instance from a deferred call is very similar to |
372 | - # raising an exception |
373 | - if isinstance(wait_msg.result, Exception): |
374 | - raise wait_msg.result |
375 | - return wait_msg.result |
376 | + return wait_msg |
377 | + |
378 | + |
379 | +class MulticallWaiter(object): |
380 | + def __init__(self, consumer): |
381 | + self._consumer = consumer |
382 | + self._results = queue.Queue() |
383 | + self._closed = False |
384 | + |
385 | + def close(self): |
386 | + self._closed = True |
387 | + self._consumer.close() |
388 | + ConnectionPool.put(self._consumer.connection) |
389 | + |
390 | + def __call__(self, data, message): |
391 | + """Acks message and sets result.""" |
392 | + message.ack() |
393 | + if data['failure']: |
394 | + self._results.put(RemoteError(*data['failure'])) |
395 | + else: |
396 | + self._results.put(data['result']) |
397 | + |
398 | + def __iter__(self): |
399 | + return self.wait() |
400 | + |
401 | + def wait(self): |
402 | + while True: |
403 | + rv = None |
404 | + while rv is None and not self._closed: |
405 | + try: |
406 | + rv = self._consumer.fetch(enable_callbacks=True) |
407 | + except Exception: |
408 | + self.close() |
409 | + raise |
410 | + time.sleep(0.01) |
411 | + |
412 | + result = self._results.get() |
413 | + if isinstance(result, Exception): |
414 | + self.close() |
415 | + raise result |
416 | + if result == None: |
417 | + self.close() |
418 | + raise StopIteration |
419 | + yield result |
420 | + |
421 | + |
422 | +def call(context, topic, msg): |
423 | + """Sends a message on a topic and wait for a response.""" |
424 | + rv = multicall(context, topic, msg) |
425 | + # NOTE(vish): return the last result from the multicall |
426 | + rv = list(rv) |
427 | + if not rv: |
428 | + return |
429 | + return rv[-1] |
430 | |
431 | |
432 | def cast(context, topic, msg): |
433 | """Sends a message on a topic without waiting for a response.""" |
434 | LOG.debug(_('Making asynchronous cast on %s...'), topic) |
435 | _pack_context(msg, context) |
436 | - conn = Connection.instance() |
437 | - publisher = TopicPublisher(connection=conn, topic=topic) |
438 | - publisher.send(msg) |
439 | - publisher.close() |
440 | + with ConnectionPool.item() as conn: |
441 | + publisher = TopicPublisher(connection=conn, topic=topic) |
442 | + publisher.send(msg) |
443 | + publisher.close() |
444 | |
445 | |
446 | def fanout_cast(context, topic, msg): |
447 | """Sends a message on a fanout exchange without waiting for a response.""" |
448 | LOG.debug(_('Making asynchronous fanout cast...')) |
449 | _pack_context(msg, context) |
450 | - conn = Connection.instance() |
451 | - publisher = FanoutPublisher(topic, connection=conn) |
452 | - publisher.send(msg) |
453 | - publisher.close() |
454 | + with ConnectionPool.item() as conn: |
455 | + publisher = FanoutPublisher(topic, connection=conn) |
456 | + publisher.send(msg) |
457 | + publisher.close() |
458 | |
459 | |
460 | def generic_response(message_data, message): |
461 | @@ -459,6 +595,7 @@ |
462 | |
463 | if wait: |
464 | consumer.wait() |
465 | + consumer.close() |
466 | |
467 | |
468 | if __name__ == '__main__': |
469 | |
470 | === modified file 'nova/service.py' |
471 | --- nova/service.py 2011-05-20 04:03:15 +0000 |
472 | +++ nova/service.py 2011-05-27 00:09:30 +0000 |
473 | @@ -19,14 +19,11 @@ |
474 | |
475 | """Generic Node baseclass for all workers that run on hosts.""" |
476 | |
477 | +import greenlet |
478 | import inspect |
479 | import os |
480 | -import sys |
481 | -import time |
482 | |
483 | -from eventlet import event |
484 | from eventlet import greenthread |
485 | -from eventlet import greenpool |
486 | |
487 | from nova import context |
488 | from nova import db |
489 | @@ -91,27 +88,37 @@ |
490 | if 'nova-compute' == self.binary: |
491 | self.manager.update_available_resource(ctxt) |
492 | |
493 | - conn1 = rpc.Connection.instance(new=True) |
494 | - conn2 = rpc.Connection.instance(new=True) |
495 | - conn3 = rpc.Connection.instance(new=True) |
496 | + self.conn = rpc.Connection.instance(new=True) |
497 | + logging.debug("Creating Consumer connection for Service %s" % |
498 | + self.topic) |
499 | + |
500 | + # Share this same connection for these Consumers |
501 | + consumer_all = rpc.TopicAdapterConsumer( |
502 | + connection=self.conn, |
503 | + topic=self.topic, |
504 | + proxy=self) |
505 | + consumer_node = rpc.TopicAdapterConsumer( |
506 | + connection=self.conn, |
507 | + topic='%s.%s' % (self.topic, self.host), |
508 | + proxy=self) |
509 | + fanout = rpc.FanoutAdapterConsumer( |
510 | + connection=self.conn, |
511 | + topic=self.topic, |
512 | + proxy=self) |
513 | + consumer_set = rpc.ConsumerSet( |
514 | + connection=self.conn, |
515 | + consumer_list=[consumer_all, consumer_node, fanout]) |
516 | + |
517 | + # Wait forever, processing these consumers |
518 | + def _wait(): |
519 | + try: |
520 | + consumer_set.wait() |
521 | + finally: |
522 | + consumer_set.close() |
523 | + |
524 | + self.consumer_set_thread = greenthread.spawn(_wait) |
525 | + |
526 | if self.report_interval: |
527 | - consumer_all = rpc.TopicAdapterConsumer( |
528 | - connection=conn1, |
529 | - topic=self.topic, |
530 | - proxy=self) |
531 | - consumer_node = rpc.TopicAdapterConsumer( |
532 | - connection=conn2, |
533 | - topic='%s.%s' % (self.topic, self.host), |
534 | - proxy=self) |
535 | - fanout = rpc.FanoutAdapterConsumer( |
536 | - connection=conn3, |
537 | - topic=self.topic, |
538 | - proxy=self) |
539 | - |
540 | - self.timers.append(consumer_all.attach_to_eventlet()) |
541 | - self.timers.append(consumer_node.attach_to_eventlet()) |
542 | - self.timers.append(fanout.attach_to_eventlet()) |
543 | - |
544 | pulse = utils.LoopingCall(self.report_state) |
545 | pulse.start(interval=self.report_interval, now=False) |
546 | self.timers.append(pulse) |
547 | @@ -174,6 +181,11 @@ |
548 | logging.warn(_('Service killed that has no database entry')) |
549 | |
550 | def stop(self): |
551 | + self.consumer_set_thread.kill() |
552 | + try: |
553 | + self.consumer_set_thread.wait() |
554 | + except greenlet.GreenletExit: |
555 | + pass |
556 | for x in self.timers: |
557 | try: |
558 | x.stop() |
559 | |
560 | === modified file 'nova/test.py' |
561 | --- nova/test.py 2011-04-20 19:08:22 +0000 |
562 | +++ nova/test.py 2011-05-27 00:09:30 +0000 |
563 | @@ -31,17 +31,15 @@ |
564 | import unittest |
565 | |
566 | import mox |
567 | -import shutil |
568 | import stubout |
569 | from eventlet import greenthread |
570 | |
571 | -from nova import context |
572 | -from nova import db |
573 | from nova import fakerabbit |
574 | from nova import flags |
575 | from nova import rpc |
576 | from nova import service |
577 | from nova import wsgi |
578 | +from nova.virt import fake |
579 | |
580 | |
581 | FLAGS = flags.FLAGS |
582 | @@ -85,6 +83,7 @@ |
583 | self._monkey_patch_attach() |
584 | self._monkey_patch_wsgi() |
585 | self._original_flags = FLAGS.FlagValuesDict() |
586 | + rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size) |
587 | |
588 | def tearDown(self): |
589 | """Runs after each test method to tear down test environment.""" |
590 | @@ -99,6 +98,10 @@ |
591 | if FLAGS.fake_rabbit: |
592 | fakerabbit.reset_all() |
593 | |
594 | + if FLAGS.connection_type == 'fake': |
595 | + if hasattr(fake.FakeConnection, '_instance'): |
596 | + del fake.FakeConnection._instance |
597 | + |
598 | # Reset any overriden flags |
599 | self.reset_flags() |
600 | |
601 | |
602 | === modified file 'nova/tests/integrated/integrated_helpers.py' |
603 | --- nova/tests/integrated/integrated_helpers.py 2011-03-30 16:08:36 +0000 |
604 | +++ nova/tests/integrated/integrated_helpers.py 2011-05-27 00:09:30 +0000 |
605 | @@ -154,10 +154,7 @@ |
606 | # set up services |
607 | self.start_service('compute') |
608 | self.start_service('volume') |
609 | - # NOTE(justinsb): There's a bug here which is eluding me... |
610 | - # If we start the network_service, all is good, but then subsequent |
611 | - # tests fail: CloudTestCase.test_ajax_console in particular. |
612 | - #self.start_service('network') |
613 | + self.start_service('network') |
614 | self.start_service('scheduler') |
615 | |
616 | self._start_api_service() |
617 | |
618 | === modified file 'nova/tests/test_cloud.py' |
619 | --- nova/tests/test_cloud.py 2011-05-20 06:51:29 +0000 |
620 | +++ nova/tests/test_cloud.py 2011-05-27 00:09:30 +0000 |
621 | @@ -17,13 +17,9 @@ |
622 | # under the License. |
623 | |
624 | from base64 import b64decode |
625 | -import json |
626 | from M2Crypto import BIO |
627 | from M2Crypto import RSA |
628 | import os |
629 | -import shutil |
630 | -import tempfile |
631 | -import time |
632 | |
633 | from eventlet import greenthread |
634 | |
635 | @@ -33,12 +29,10 @@ |
636 | from nova import flags |
637 | from nova import log as logging |
638 | from nova import rpc |
639 | -from nova import service |
640 | from nova import test |
641 | from nova import utils |
642 | from nova import exception |
643 | from nova.auth import manager |
644 | -from nova.compute import power_state |
645 | from nova.api.ec2 import cloud |
646 | from nova.api.ec2 import ec2utils |
647 | from nova.image import local |
648 | @@ -79,14 +73,21 @@ |
649 | self.stubs.Set(local.LocalImageService, 'show', fake_show) |
650 | self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show) |
651 | |
652 | + # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish |
653 | + rpc_cast = rpc.cast |
654 | + |
655 | + def finish_cast(*args, **kwargs): |
656 | + rpc_cast(*args, **kwargs) |
657 | + greenthread.sleep(0.2) |
658 | + |
659 | + self.stubs.Set(rpc, 'cast', finish_cast) |
660 | + |
661 | def tearDown(self): |
662 | network_ref = db.project_get_network(self.context, |
663 | self.project.id) |
664 | db.network_disassociate(self.context, network_ref['id']) |
665 | self.manager.delete_project(self.project) |
666 | self.manager.delete_user(self.user) |
667 | - self.compute.kill() |
668 | - self.network.kill() |
669 | super(CloudTestCase, self).tearDown() |
670 | |
671 | def _create_key(self, name): |
672 | @@ -113,7 +114,6 @@ |
673 | self.cloud.describe_addresses(self.context) |
674 | self.cloud.release_address(self.context, |
675 | public_ip=address) |
676 | - greenthread.sleep(0.3) |
677 | db.floating_ip_destroy(self.context, address) |
678 | |
679 | def test_associate_disassociate_address(self): |
680 | @@ -129,12 +129,10 @@ |
681 | self.cloud.associate_address(self.context, |
682 | instance_id=ec2_id, |
683 | public_ip=address) |
684 | - greenthread.sleep(0.3) |
685 | self.cloud.disassociate_address(self.context, |
686 | public_ip=address) |
687 | self.cloud.release_address(self.context, |
688 | public_ip=address) |
689 | - greenthread.sleep(0.3) |
690 | self.network.deallocate_fixed_ip(self.context, fixed) |
691 | db.instance_destroy(self.context, inst['id']) |
692 | db.floating_ip_destroy(self.context, address) |
693 | @@ -306,31 +304,25 @@ |
694 | 'instance_type': instance_type, |
695 | 'max_count': max_count} |
696 | rv = self.cloud.run_instances(self.context, **kwargs) |
697 | - greenthread.sleep(0.3) |
698 | instance_id = rv['instancesSet'][0]['instanceId'] |
699 | output = self.cloud.get_console_output(context=self.context, |
700 | instance_id=[instance_id]) |
701 | self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT') |
702 | # TODO(soren): We need this until we can stop polling in the rpc code |
703 | # for unit tests. |
704 | - greenthread.sleep(0.3) |
705 | rv = self.cloud.terminate_instances(self.context, [instance_id]) |
706 | - greenthread.sleep(0.3) |
707 | |
708 | def test_ajax_console(self): |
709 | kwargs = {'image_id': 'ami-1'} |
710 | rv = self.cloud.run_instances(self.context, **kwargs) |
711 | instance_id = rv['instancesSet'][0]['instanceId'] |
712 | - greenthread.sleep(0.3) |
713 | output = self.cloud.get_ajax_console(context=self.context, |
714 | instance_id=[instance_id]) |
715 | self.assertEquals(output['url'], |
716 | '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url) |
717 | # TODO(soren): We need this until we can stop polling in the rpc code |
718 | # for unit tests. |
719 | - greenthread.sleep(0.3) |
720 | rv = self.cloud.terminate_instances(self.context, [instance_id]) |
721 | - greenthread.sleep(0.3) |
722 | |
723 | def test_key_generation(self): |
724 | result = self._create_key('test') |
725 | |
726 | === modified file 'nova/tests/test_rpc.py' |
727 | --- nova/tests/test_rpc.py 2011-02-23 22:41:11 +0000 |
728 | +++ nova/tests/test_rpc.py 2011-05-27 00:09:30 +0000 |
729 | @@ -31,7 +31,6 @@ |
730 | |
731 | |
732 | class RpcTestCase(test.TestCase): |
733 | - """Test cases for rpc""" |
734 | def setUp(self): |
735 | super(RpcTestCase, self).setUp() |
736 | self.conn = rpc.Connection.instance(True) |
737 | @@ -43,14 +42,55 @@ |
738 | self.context = context.get_admin_context() |
739 | |
740 | def test_call_succeed(self): |
741 | - """Get a value through rpc call""" |
742 | value = 42 |
743 | result = rpc.call(self.context, 'test', {"method": "echo", |
744 | "args": {"value": value}}) |
745 | self.assertEqual(value, result) |
746 | |
747 | + def test_call_succeed_despite_multiple_returns(self): |
748 | + value = 42 |
749 | + result = rpc.call(self.context, 'test', {"method": "echo_three_times", |
750 | + "args": {"value": value}}) |
751 | + self.assertEqual(value + 2, result) |
752 | + |
753 | + def test_call_succeed_despite_multiple_returns_yield(self): |
754 | + value = 42 |
755 | + result = rpc.call(self.context, 'test', |
756 | + {"method": "echo_three_times_yield", |
757 | + "args": {"value": value}}) |
758 | + self.assertEqual(value + 2, result) |
759 | + |
760 | + def test_multicall_succeed_once(self): |
761 | + value = 42 |
762 | + result = rpc.multicall(self.context, |
763 | + 'test', |
764 | + {"method": "echo", |
765 | + "args": {"value": value}}) |
766 | + for i, x in enumerate(result): |
767 | + if i > 0: |
768 | + self.fail('should only receive one response') |
769 | + self.assertEqual(value + i, x) |
770 | + |
771 | + def test_multicall_succeed_three_times(self): |
772 | + value = 42 |
773 | + result = rpc.multicall(self.context, |
774 | + 'test', |
775 | + {"method": "echo_three_times", |
776 | + "args": {"value": value}}) |
777 | + for i, x in enumerate(result): |
778 | + self.assertEqual(value + i, x) |
779 | + |
780 | + def test_multicall_succeed_three_times_yield(self): |
781 | + value = 42 |
782 | + result = rpc.multicall(self.context, |
783 | + 'test', |
784 | + {"method": "echo_three_times_yield", |
785 | + "args": {"value": value}}) |
786 | + for i, x in enumerate(result): |
787 | + self.assertEqual(value + i, x) |
788 | + |
789 | def test_context_passed(self): |
790 | - """Makes sure a context is passed through rpc call""" |
791 | + """Makes sure a context is passed through rpc call.""" |
792 | value = 42 |
793 | result = rpc.call(self.context, |
794 | 'test', {"method": "context", |
795 | @@ -58,11 +98,12 @@ |
796 | self.assertEqual(self.context.to_dict(), result) |
797 | |
798 | def test_call_exception(self): |
799 | - """Test that exception gets passed back properly |
800 | + """Test that exception gets passed back properly. |
801 | |
802 | rpc.call returns a RemoteError object. The value of the |
803 | exception is converted to a string, so we convert it back |
804 | to an int in the test. |
805 | + |
806 | """ |
807 | value = 42 |
808 | self.assertRaises(rpc.RemoteError, |
809 | @@ -81,7 +122,7 @@ |
810 | self.assertEqual(int(exc.value), value) |
811 | |
812 | def test_nested_calls(self): |
813 | - """Test that we can do an rpc.call inside another call""" |
814 | + """Test that we can do an rpc.call inside another call.""" |
815 | class Nested(object): |
816 | @staticmethod |
817 | def echo(context, queue, value): |
818 | @@ -108,25 +149,80 @@ |
819 | "value": value}}) |
820 | self.assertEqual(value, result) |
821 | |
822 | + def test_connectionpool_single(self): |
823 | + """Test that ConnectionPool recycles a single connection.""" |
824 | + conn1 = rpc.ConnectionPool.get() |
825 | + rpc.ConnectionPool.put(conn1) |
826 | + conn2 = rpc.ConnectionPool.get() |
827 | + rpc.ConnectionPool.put(conn2) |
828 | + self.assertEqual(conn1, conn2) |
829 | + |
830 | + def test_connectionpool_double(self): |
831 | + """Test that ConnectionPool returns and reuses separate connections. |
832 | + |
833 | + When called consecutively we should get separate connections and upon |
834 | + returning them those connections should be reused for future calls |
835 | + before generating a new connection. |
836 | + |
837 | + """ |
838 | + conn1 = rpc.ConnectionPool.get() |
839 | + conn2 = rpc.ConnectionPool.get() |
840 | + |
841 | + self.assertNotEqual(conn1, conn2) |
842 | + rpc.ConnectionPool.put(conn1) |
843 | + rpc.ConnectionPool.put(conn2) |
844 | + |
845 | + conn3 = rpc.ConnectionPool.get() |
846 | + conn4 = rpc.ConnectionPool.get() |
847 | + self.assertEqual(conn1, conn3) |
848 | + self.assertEqual(conn2, conn4) |
849 | + |
850 | + def test_connectionpool_limit(self): |
851 | + """Test connection pool limit and connection uniqueness.""" |
852 | + max_size = FLAGS.rpc_conn_pool_size |
853 | + conns = [] |
854 | + |
855 | + for i in xrange(max_size): |
856 | + conns.append(rpc.ConnectionPool.get()) |
857 | + |
858 | + self.assertFalse(rpc.ConnectionPool.free_items) |
859 | + self.assertEqual(rpc.ConnectionPool.current_size, |
860 | + rpc.ConnectionPool.max_size) |
861 | + self.assertEqual(len(set(conns)), max_size) |
862 | + |
863 | |
864 | class TestReceiver(object): |
865 | - """Simple Proxy class so the consumer has methods to call |
866 | - |
867 | - Uses static methods because we aren't actually storing any state""" |
868 | + """Simple Proxy class so the consumer has methods to call. |
869 | + |
870 | + Uses static methods because we aren't actually storing any state. |
871 | + |
872 | + """ |
873 | |
874 | @staticmethod |
875 | def echo(context, value): |
876 | - """Simply returns whatever value is sent in""" |
877 | + """Simply returns whatever value is sent in.""" |
878 | LOG.debug(_("Received %s"), value) |
879 | return value |
880 | |
881 | @staticmethod |
882 | def context(context, value): |
883 | - """Returns dictionary version of context""" |
884 | + """Returns dictionary version of context.""" |
885 | LOG.debug(_("Received %s"), context) |
886 | return context.to_dict() |
887 | |
888 | @staticmethod |
889 | + def echo_three_times(context, value): |
890 | + context.reply(value) |
891 | + context.reply(value + 1) |
892 | + context.reply(value + 2) |
893 | + |
894 | + @staticmethod |
895 | + def echo_three_times_yield(context, value): |
896 | + yield value |
897 | + yield value + 1 |
898 | + yield value + 2 |
899 | + |
900 | + @staticmethod |
901 | def fail(context, value): |
902 | - """Raises an exception with the value sent in""" |
903 | + """Raises an exception with the value sent in.""" |
904 | raise Exception(value) |
905 | |
906 | === modified file 'nova/tests/test_service.py' |
907 | --- nova/tests/test_service.py 2011-03-17 13:35:00 +0000 |
908 | +++ nova/tests/test_service.py 2011-05-27 00:09:30 +0000 |
909 | @@ -106,7 +106,10 @@ |
910 | |
911 | # NOTE(vish): Create was moved out of mox replay to make sure that |
912 | # the looping calls are created in StartService. |
913 | - app = service.Service.create(host=host, binary=binary) |
914 | + app = service.Service.create(host=host, binary=binary, topic=topic) |
915 | + |
916 | + self.mox.StubOutWithMock(service.rpc.Connection, 'instance') |
917 | + service.rpc.Connection.instance(new=mox.IgnoreArg()) |
918 | |
919 | self.mox.StubOutWithMock(rpc, |
920 | 'TopicAdapterConsumer', |
921 | @@ -114,6 +117,11 @@ |
922 | self.mox.StubOutWithMock(rpc, |
923 | 'FanoutAdapterConsumer', |
924 | use_mock_anything=True) |
925 | + |
926 | + self.mox.StubOutWithMock(rpc, |
927 | + 'ConsumerSet', |
928 | + use_mock_anything=True) |
929 | + |
930 | rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), |
931 | topic=topic, |
932 | proxy=mox.IsA(service.Service)).AndReturn( |
933 | @@ -129,9 +137,14 @@ |
934 | proxy=mox.IsA(service.Service)).AndReturn( |
935 | rpc.FanoutAdapterConsumer) |
936 | |
937 | - rpc.TopicAdapterConsumer.attach_to_eventlet() |
938 | - rpc.TopicAdapterConsumer.attach_to_eventlet() |
939 | - rpc.FanoutAdapterConsumer.attach_to_eventlet() |
940 | + def wait_func(self, limit=None): |
941 | + return None |
942 | + |
943 | + mock_cset = self.mox.CreateMock(rpc.ConsumerSet, |
944 | + {'wait': wait_func}) |
945 | + rpc.ConsumerSet(connection=mox.IgnoreArg(), |
946 | + consumer_list=mox.IsA(list)).AndReturn(mock_cset) |
947 | + wait_func(mox.IgnoreArg()) |
948 | |
949 | service_create = {'host': host, |
950 | 'binary': binary, |
951 | @@ -287,8 +300,42 @@ |
952 | # Creating mocks |
953 | self.mox.StubOutWithMock(service.rpc.Connection, 'instance') |
954 | service.rpc.Connection.instance(new=mox.IgnoreArg()) |
955 | - service.rpc.Connection.instance(new=mox.IgnoreArg()) |
956 | - service.rpc.Connection.instance(new=mox.IgnoreArg()) |
957 | + |
958 | + self.mox.StubOutWithMock(rpc, |
959 | + 'TopicAdapterConsumer', |
960 | + use_mock_anything=True) |
961 | + self.mox.StubOutWithMock(rpc, |
962 | + 'FanoutAdapterConsumer', |
963 | + use_mock_anything=True) |
964 | + |
965 | + self.mox.StubOutWithMock(rpc, |
966 | + 'ConsumerSet', |
967 | + use_mock_anything=True) |
968 | + |
969 | + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), |
970 | + topic=topic, |
971 | + proxy=mox.IsA(service.Service)).AndReturn( |
972 | + rpc.TopicAdapterConsumer) |
973 | + |
974 | + rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(), |
975 | + topic='%s.%s' % (topic, host), |
976 | + proxy=mox.IsA(service.Service)).AndReturn( |
977 | + rpc.TopicAdapterConsumer) |
978 | + |
979 | + rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(), |
980 | + topic=topic, |
981 | + proxy=mox.IsA(service.Service)).AndReturn( |
982 | + rpc.FanoutAdapterConsumer) |
983 | + |
984 | + def wait_func(self, limit=None): |
985 | + return None |
986 | + |
987 | + mock_cset = self.mox.CreateMock(rpc.ConsumerSet, |
988 | + {'wait': wait_func}) |
989 | + rpc.ConsumerSet(connection=mox.IgnoreArg(), |
990 | + consumer_list=mox.IsA(list)).AndReturn(mock_cset) |
991 | + wait_func(mox.IgnoreArg()) |
992 | + |
993 | self.mox.StubOutWithMock(serv.manager.driver, |
994 | 'update_available_resource') |
995 | serv.manager.driver.update_available_resource(mox.IgnoreArg(), host) |
Added ConnectionPool tests, and changed the Pool to be LIFO: lp:~cbehrens/nova/rpc_multicall
(See comments added around ConnectionPool in rpc.py)
One other problem I can think of here:
MulticallWaiter() will put the connection back into the pool as long as its 'close' method is called.
But what happens if an exception occurs between someone pulling results and calling .close()?
Should we put in a def __del__() in MulticallWaiter() to make absolutely sure we .put the connection back when we're done with the result?
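One possible shape of that safeguard, sketched here purely as an illustration (not part of this branch): let MulticallWaiter fall back to close() from __del__ so the connection is returned even if the caller abandons the iterator mid-stream. Since __del__ timing is best-effort, an explicit try/finally around consumption would still be the deterministic option.

class MulticallWaiter(object):
    def __init__(self, consumer):
        self._consumer = consumer
        self._closed = False

    def close(self):
        if self._closed:
            return
        self._closed = True
        self._consumer.close()
        # Return the pooled connection (ConnectionPool as defined in rpc.py).
        ConnectionPool.put(self._consumer.connection)

    def __del__(self):
        # Last-resort cleanup if the caller never reached close(), e.g.
        # because an exception interrupted iteration of the results.
        if not self._closed:
            self.close()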