Merge lp:~termie/nova/rpc_multicall into lp:~hudson-openstack/nova/trunk

Proposed by termie
Status: Merged
Approved by: Vish Ishaya
Approved revision: 1138
Merged at revision: 1116
Proposed branch: lp:~termie/nova/rpc_multicall
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 995 lines (+443/-144)
8 files modified
nova/fakerabbit.py (+23/-8)
nova/rpc.py (+208/-71)
nova/service.py (+36/-24)
nova/test.py (+6/-3)
nova/tests/integrated/integrated_helpers.py (+1/-4)
nova/tests/test_cloud.py (+9/-17)
nova/tests/test_rpc.py (+107/-11)
nova/tests/test_service.py (+53/-6)
To merge this branch: bzr merge lp:~termie/nova/rpc_multicall
Reviewer Review Type Date Requested Status
Vish Ishaya (community) Approve
Ed Leafe (community) Approve
Chris Behrens (community) Approve
Jay Pipes (community) Needs Information
Review via email: mp+61686@code.launchpad.net

Description of the change

Adds the ability to make a call that returns multiple times (a call returning a generator). This is also based on the work in rpc-improvements, plus a bunch of fixes Vish and I worked through to get all the tests to pass, so the code is a bit all over the place.

The functionality is being added to support Vish's work on removing worker access to the database; it allows us to write multi-phase actions that yield state updates as they progress, letting the frontend update the db.
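
For a sense of the new interface, here is a minimal sketch modeled on the new rpc tests in the diff below; it assumes a consumer is already listening on the 'test' topic with a proxy exposing the echo_three_times_yield method from TestReceiver:

    from nova import context
    from nova import rpc

    ctxt = context.get_admin_context()

    # rpc.call still returns a single (final) result; rpc.multicall returns
    # an iterable, and each value the remote method yields (or passes to
    # context.reply) arrives as a separate result, in order.
    results = rpc.multicall(ctxt, 'test',
                            {"method": "echo_three_times_yield",
                             "args": {"value": 42}})
    for value in results:
        print value  # 42, then 43, then 44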

Revision history for this message
Chris Behrens (cbehrens) wrote :

Added ConnectionPool tests, and changed the Pool to be LIFO: lp:~cbehrens/nova/rpc_multicall
(See comments added around ConnectionPool in rpc.py)

One other problem I can think of here:

MulticallWaiter() will put the connection back into the pool as long as its 'close' method is called.
But what happens if an exception occurs between someone pulling results and calling .close()?

Should we put in a def __del__() in MulticallWaiter() to make absolutely sure we .put the connection back when we're done with the result?
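
A minimal sketch of the safeguard being asked about, purely hypothetical and not part of this branch, added as an extra method on MulticallWaiter:

    def __del__(self):
        # Hypothetical safety net: if the caller abandons the iterator
        # without reaching close(), e.g. because an exception interrupted
        # result iteration, still return the pooled connection exactly once.
        if not self._closed:
            try:
                self.close()
            except Exception:
                # Never let __del__ raise; dropping the connection is
                # preferable to masking the original error during GC.
                pass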

lp:~termie/nova/rpc_multicall updated
1088. By Justin Shepherd

The tools/* directory is now included in pep8 runs. Added an opt-out system for excluding files/dirs from pep8 (using GLOBIGNORE).

1089. By Johannes Erdfelt

The XenAPI driver uses openssl as part of the nova-agent implementation to set the password for root. It uses a temporary file insecurely and unnecessarily. Change the code to write the password directly to stdin of the openssl process instead.

1090. By Johannes Erdfelt

Add new flag 'max_kernel_ramdisk_size' to specify a maximum size of kernel or ramdisk so we don't copy large files to dom0 and fill up /boot/guest

1091. By Anthony Young

This fix ensures that kpartx -d is called in the event that tune2fs fails during key injection, as it does when trying to inject a key into a windows instance. Bug #760921 is a symptom of this issue: if kpartx -d is not called, partitions remain mapped and prevent the underlying nbd from being reused.

Couldn't think of a good way to regression test for this - any ideas?

1092. By John Tran

Added an EC2 API endpoint that allows import of a public key. Previously, the API only allowed generation of new keys.

1093. By Dan Prince

Update OSAPI v1.1 extensions so that it supports RequestExtensions. ResponseExtensions were removed since the new RequestExtension covers both use cases. This branch also removes some of the odd serialization code in the RequestExtensionController that converted dictionary objects into webob objects. RequestExtension handlers should now always return proper webob objects.

1094. By William Wolf

Get rid of old virt/images.py functions that are no longer needed. Checked for any loose calls to these functions and found none. All tests pass for me.

1095. By Rick Harris

This is the groundwork for the upcoming distributed scheduler changes. Nothing is actually wired up here, so it shouldn't break any existing code (and all tests pass).

The goals were to:

1. Define the basic distributed scheduler communication mechanism:
   a. call_zone_method - how each zone can communicate with its children

   b. encrypted child-blobs - how child zones can securely and statelessly report back weight and build-plan info

2. Put in hooks for advanced-filtering (hard-requirements, capabilities) as well as preferences (least-cost-scheduling)

3. Create a base set of dist-scheduler tests that we can extend as we add more functionality.

Next up will be to:

1. Add in a filtering driver

2. Add in a cost-scheduler driver

1096. By Eldar Nugaev

Print information about nova-manage project problems.

1097. By Vish Ishaya

Makes sure vlan creation locks so we don't race and fail to create a vlan.

1098. By Soren Hansen

Include data files for public key tests in the tarball.

1099. By <email address hidden>

Found a typo in the xenserver glance plugin that doesn't work with glance trunk. Also modified the image URL to fetch from /v1/image/X instead of /image/X, as the latter returned a 300.

1100. By Andrey Brindeyev

--dhcp-lease-max=150 by default. This prevents >150 instances in one network.

Revision history for this message
Jay Pipes (jaypipes) wrote :

Hey terms. Looks great overall, including Chris' additions and LIFO.

One little question, though.

37 - greenthread.sleep(0)
38 + for (queue, callback) in CONSUMERS.itervalues():
39 + item = self.get(queue)
40 + if item:
41 + callback(item)
42 + num += 1
43 + yield
44 + if limit and num == limit:
45 + raise StopIteration()
46 + greenthread.sleep(0.1)

Why did you change the sleep time from 0 to 0.1? Did you notice something lock up when it was at 0?

Just curious :) Cheers!
-jay

review: Needs Information
Revision history for this message
Chris Behrens (cbehrens) wrote :

Jay: I think it may be so that there's less CPU spinning... though this is just in fakerabbit.

I'm worried about the 'time.sleep(0.01)' in MulticallWaiter, though. That's a lot of send/recvs spinning until the result actually comes into the queue...

What about this:

    def wait(self):
        while not self._closed:
            try:
                self._consumer.wait(limit=1)
            except Exception:
                self.close()
                raise

            result = self._results.get()
            if isinstance(result, Exception):
                self.close()
                raise result
            if result == None:
                self.close()
                raise StopIteration
            yield result
        raise StopIteration

That removes the need for the sleep and will be less harsh on rabbit.. wait() will block until a result is received (eventlet will be polling for data). And if one happens to call wait() after close(), StopIteration is raised.

Couple other things:

1) Service.kill puts self.conn back into the pool, but it never came out of the pool in the first place. I'd just remove the 'put'. No need to use the pool for it right now, and it somewhat breaks the pool if ConsumerSet needs to reconnect to rabbit.
2) Should this code that kills the csetthread, etc, actually be in Service.stop?
3) There's no handling of reconnecting to rabbit if the connection dies. We really need a larger re-factor. But since only 'call' is using this right now, and 'call' already didn't handle reconnecting, there's not really any behavior change.

lp:~termie/nova/rpc_multicall updated
1101. By justinsb

Fix bug #744150 by starting nova-api on an unused port.

1102. By Renuka Apte

Fixes euca-attach-volume for iscsi using Xenserver

Minor changes required to xenapi functions to get correct format for volume-id, iscsi-host, etc.

1103. By Anne Gentle

Fixes some minor doc issues - misspelled flags in zones doc and also adds zones doc to an index for easier findability

1104. By Dave Walker

When adding a keypair that already exists via the EC2 API, give a friendly error and no traceback in nova-api

1105. By termie

Fixes a bug related to incorrect reparsing of flags and prevents many extra reparses.

1106. By Chris Behrens

Pretty simple. We call openssl to encrypt the admin password, but the recent changes around this code forgot to strip the newline off the read from stdout.

1107. By Johannes Erdfelt

Using the root-password subcommand of the nova client results in the password being changed for the instance specified, but to a different unknown password. The patch changes nova to use the password specified in the API call.

1108. By Johannes Erdfelt

eventlet.spawn_n() takes the function and its arguments, but the arguments must be passed unpacked since it uses *args.

1109. By Ed Leafe

The code for getting an opaque reference to an instance assumed that there was a reference to an instance obj available when raising an exception. I changed this from raising an InstanceNotFound exception to a NotFound, as this is more appropriate for the failure, and doesn't require an instance ID.

1110. By Brian Lamar

Created new libvirt directory, moved libvirt_conn.py to libvirt/connection.py, moved libvirt templates, broke out firewall and network utilities.

1111. By Chris Behrens

During the API create call, the API would kick off a build and then loop in a greenthread waiting for the scheduler to pick a host for the instance. After API would see a host was picked, it would cast to the compute node's set_admin_password method.

The API server really should not have to do this. The password to set should be pushed along with the build request, instead. The compute node can then set the password after it detects the instance has booted. This removes a greenthread from the API server, a loop that constantly checks the DB for the host, and finally a cast to the compute node.

1112. By Mark Washenberger

Several changes designed to bring the openstack api 1.1 closer to spec
- add ram limits to the nova compute quotas
- enable injected file limits and injected file size limits to be overridden in the quota database table
- expose quota limits as absolute limits in the openstack api 1.1 limits resource
- add support for controlling 'unlimited' quotas to nova-manage

1113. By Alex Meade

Fixed the mistyped line referred to in bug 787023

Revision history for this message
termie (termie) wrote :

Chris: I've just tried a few different variations of the above suggested code, as well as code based on iterconsume, but all have resulted in issues of some sort or another that have been difficult to debug (either hanging or returning nothing)... the code works as it is, so I am hesitant to go back down the rabbit hole. Will push a newer version shortly.

lp:~termie/nova/rpc_multicall updated
1114. By termie

add support to rpc for multicall

1115. By termie

make the test more explicit

1116. By termie

add commented out unworking code for yield-based returns

1117. By Chris Behrens

Add a connection pool for rpc cast/call
Use the same rabbit connection for all topic listening and wait to be notified vs doing a 0.1 second poll for each.

1118. By Chris Behrens

pep8 and comment fixes

1119. By Chris Behrens

convert fanout_cast to ConnectionPool

1120. By Chris Behrens

fakerabbit's declare_consumer should support more than 1 consumer. also: make fakerabbit Backend.consume be an iterator like it should be..

1121. By Vish Ishaya

fix consumers to actually be deleted and clean up cloud test

1122. By Chris Behrens

catch greenlet.GreenletExit when shutting service down

1123. By Chris Behrens

Always create Service consumers no matter if report_interval is 0
Fix tests to handle how Service loads Consumers now

1124. By Chris Behrens

Add rpc_conn_pool_size flag for the new connection pool

1125. By Chris Behrens

connection pool tests and make the pool LIFO

1126. By termie

bring back commits lost in merge

1127. By termie

almost everything working with fake_rabbit

1128. By termie

don't need to use a separate connection

1129. By Vish Ishaya

lots of fixes for rpc and extra imports

1130. By termie

make sure that using multicall on a call with a single result still functions

1131. By termie

cleanup the code for merging

1132. By termie

cleanups

1133. By termie

replace removed import

1134. By termie

don't put connection back in pool

1135. By termie

move consumerset killing into stop

1136. By termie

change the behavior of calling a multicall

Revision history for this message
Chris Behrens (cbehrens) wrote :

termie: Strange. Well, I'd like to get this merged sooner rather than later, so it's probably okay for now. I have a couple of other starts at redoing this code even more. One of them uses kombu, which seems to simplify some things for us, but it still has some annoyances. More on that later.

I think there was one more thing I spotted when playing around with larger refactors that I didn't comment on here yet. I'll take a look at what you've changed here since I last looked.

Revision history for this message
Chris Behrens (cbehrens) wrote :

Ah, I added a self.conn.close() to Service.stop() after killing the csetthread. Not terribly important.

Looks good to me.

Revision history for this message
Ed Leafe (ed-leafe) wrote :

When testing if an object is a generator, relying on having an attribute named 'send' is weak and error-prone. It would be preferable to use:
   isinstance(rval, types.GeneratorType)
to check for generators.
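
A quick illustration of why the duck-typed check is fragile (the names here are only for demonstration):

    import types

    def gen():
        yield 1

    class FakeSocket(object):
        def send(self, data):
            pass

    # hasattr matches anything with a send() method, not just generators:
    hasattr(gen(), 'send')                         # True
    hasattr(FakeSocket(), 'send')                  # True, but not a generator
    isinstance(gen(), types.GeneratorType)         # True
    isinstance(FakeSocket(), types.GeneratorType)  # False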

Line 239 of the diff (wait method): Logging the error should probably not only include the type of exception, but the exception message as well:
    type_e = type(e)
    LOG.error(_("Received exception %(e)s (%(type_e)s while processing consumer")
            % locals())

In the call() method of nova/rpc.py (416 of the diff), is there any chance that MultiCall() will return either a) a non-list result or b) an empty list? If so, the slice action will throw an error.

In nova/service.py, the declaration at line 109 is stylistically inconsistent. The declarations above it use full names ('consumer_all', 'consumer_node', etc.), while the last name is 'cset'; it should be 'consumer_set' for naming consistency. Also, the indentation of continued lines is inconsistent; since the preceding lines use the standard two indents, this last declaration should follow that convention. And while I'm nitpicking, there should not be a blank line between the first 3 and the last declarations (i.e., remove line 108).

review: Needs Fixing
Revision history for this message
Josh Kearney (jk0) wrote :

Not that it matters much, but I always prefer doing my for loops like so (in reference to the tests near the end):

for i, x in enumerate(y):

^ This eliminates having to set i = 0 and manually increment it inside the loop.
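
As a generic illustration of the pattern (not code from the branch):

    values = ['a', 'b', 'c']

    # Manual counter:
    i = 0
    for x in values:
        print i, x
        i += 1

    # With enumerate, the counter is maintained for you:
    for i, x in enumerate(values):
        print i, x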

lp:~termie/nova/rpc_multicall updated
1137. By termie

changes per review

Revision history for this message
termie (termie) wrote :

Ed and jk0, I think I've addressed all the issues you've brought up.

Revision history for this message
Chris Behrens (cbehrens) wrote :

Something I spotted which is a bug that predates your changes:

215 msg_reply(msg_id, _('No method for message: %s') % message_data)

msg_id could be None there.

Revision history for this message
Chris Behrens (cbehrens) wrote :

Also: would it simplify things to remove this:

- # This will be popped off in _unpack_context
- msg_id = message_data.get('_msg_id', None)

And using ctxt.reply going forward? See: http://paste.openstack.org/show/1433/

RPC test passes.
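
The concrete version is in the linked paste and is not reproduced here; as a rough sketch of the idea, based on the RpcContext class added in this branch, the "no method" path in process_data could look something like:

    # _unpack_context already pops '_msg_id' into the RpcContext, so
    # process_data would not read it separately and replies would go
    # through the context:
    ctxt = _unpack_context(message_data)
    method = message_data.get('method')
    if not method:
        LOG.warn(_('no method for message: %s') % message_data)
        if ctxt.msg_id:
            ctxt.reply(_('No method for message: %s') % message_data)
        return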

lp:~termie/nova/rpc_multicall updated
1138. By termie

fix a minor bug unrelated to this change

Revision history for this message
termie (termie) wrote :

Fixed the bug; I don't particularly want to do that other change for this, but you are welcome to submit such a fixup later :)

Revision history for this message
Chris Behrens (cbehrens) wrote :

I'm good with that. :)

review: Approve
Revision history for this message
Ed Leafe (ed-leafe) wrote :

Fixes look good.

review: Approve
Revision history for this message
Vish Ishaya (vishvananda) wrote :

I've been testing this as we go as well. I think it is good to go.

review: Approve

Preview Diff

1=== modified file 'nova/fakerabbit.py'
2--- nova/fakerabbit.py 2011-02-22 23:05:48 +0000
3+++ nova/fakerabbit.py 2011-05-27 00:09:30 +0000
4@@ -31,6 +31,7 @@
5
6 EXCHANGES = {}
7 QUEUES = {}
8+CONSUMERS = {}
9
10
11 class Message(base.BaseMessage):
12@@ -96,17 +97,29 @@
13 ' key %(routing_key)s') % locals())
14 EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
15
16- def declare_consumer(self, queue, callback, *args, **kwargs):
17- self.current_queue = queue
18- self.current_callback = callback
19+ def declare_consumer(self, queue, callback, consumer_tag, *args, **kwargs):
20+ global CONSUMERS
21+ LOG.debug("Adding consumer %s", consumer_tag)
22+ CONSUMERS[consumer_tag] = (queue, callback)
23+
24+ def cancel(self, consumer_tag):
25+ global CONSUMERS
26+ LOG.debug("Removing consumer %s", consumer_tag)
27+ del CONSUMERS[consumer_tag]
28
29 def consume(self, limit=None):
30+ global CONSUMERS
31+ num = 0
32 while True:
33- item = self.get(self.current_queue)
34- if item:
35- self.current_callback(item)
36- raise StopIteration()
37- greenthread.sleep(0)
38+ for (queue, callback) in CONSUMERS.itervalues():
39+ item = self.get(queue)
40+ if item:
41+ callback(item)
42+ num += 1
43+ yield
44+ if limit and num == limit:
45+ raise StopIteration()
46+ greenthread.sleep(0.1)
47
48 def get(self, queue, no_ack=False):
49 global QUEUES
50@@ -134,5 +147,7 @@
51 def reset_all():
52 global EXCHANGES
53 global QUEUES
54+ global CONSUMERS
55 EXCHANGES = {}
56 QUEUES = {}
57+ CONSUMERS = {}
58
59=== modified file 'nova/rpc.py'
60--- nova/rpc.py 2011-04-20 19:08:22 +0000
61+++ nova/rpc.py 2011-05-27 00:09:30 +0000
62@@ -28,12 +28,15 @@
63 import sys
64 import time
65 import traceback
66+import types
67 import uuid
68
69 from carrot import connection as carrot_connection
70 from carrot import messaging
71 from eventlet import greenpool
72-from eventlet import greenthread
73+from eventlet import pools
74+from eventlet import queue
75+import greenlet
76
77 from nova import context
78 from nova import exception
79@@ -47,7 +50,10 @@
80
81
82 FLAGS = flags.FLAGS
83-flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool')
84+flags.DEFINE_integer('rpc_thread_pool_size', 1024,
85+ 'Size of RPC thread pool')
86+flags.DEFINE_integer('rpc_conn_pool_size', 30,
87+ 'Size of RPC connection pool')
88
89
90 class Connection(carrot_connection.BrokerConnection):
91@@ -90,6 +96,22 @@
92 return cls.instance()
93
94
95+class Pool(pools.Pool):
96+ """Class that implements a Pool of Connections."""
97+
98+ # TODO(comstud): Timeout connections not used in a while
99+ def create(self):
100+ LOG.debug('Creating new connection')
101+ return Connection.instance(new=True)
102+
103+# Create a ConnectionPool to use for RPC calls. We'll order the
104+# pool as a stack (LIFO), so that we can potentially loop through and
105+# timeout old unused connections at some point
106+ConnectionPool = Pool(
107+ max_size=FLAGS.rpc_conn_pool_size,
108+ order_as_stack=True)
109+
110+
111 class Consumer(messaging.Consumer):
112 """Consumer base class.
113
114@@ -131,7 +153,9 @@
115 self.connection = Connection.recreate()
116 self.backend = self.connection.create_backend()
117 self.declare()
118- super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
119+ return super(Consumer, self).fetch(no_ack,
120+ auto_ack,
121+ enable_callbacks)
122 if self.failed_connection:
123 LOG.error(_('Reconnected to queue'))
124 self.failed_connection = False
125@@ -159,13 +183,13 @@
126 self.pool = greenpool.GreenPool(FLAGS.rpc_thread_pool_size)
127 super(AdapterConsumer, self).__init__(connection=connection,
128 topic=topic)
129-
130- def receive(self, *args, **kwargs):
131- self.pool.spawn_n(self._receive, *args, **kwargs)
132-
133- @exception.wrap_exception
134- def _receive(self, message_data, message):
135- """Magically looks for a method on the proxy object and calls it.
136+ self.register_callback(self.process_data)
137+
138+ def process_data(self, message_data, message):
139+ """Consumer callback to call a method on a proxy object.
140+
141+ Parses the message for validity and fires off a thread to call the
142+ proxy object method.
143
144 Message data should be a dictionary with two keys:
145 method: string representing the method to call
146@@ -175,8 +199,8 @@
147
148 """
149 LOG.debug(_('received %s') % message_data)
150- msg_id = message_data.pop('_msg_id', None)
151-
152+ # This will be popped off in _unpack_context
153+ msg_id = message_data.get('_msg_id', None)
154 ctxt = _unpack_context(message_data)
155
156 method = message_data.get('method')
157@@ -188,8 +212,17 @@
158 # we just log the message and send an error string
159 # back to the caller
160 LOG.warn(_('no method for message: %s') % message_data)
161- msg_reply(msg_id, _('No method for message: %s') % message_data)
162+ if msg_id:
163+ msg_reply(msg_id,
164+ _('No method for message: %s') % message_data)
165 return
166+ self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
167+
168+ @exception.wrap_exception
169+ def _process_data(self, msg_id, ctxt, method, args):
170+ """Thread that maigcally looks for a method on the proxy
171+ object and calls it.
172+ """
173
174 node_func = getattr(self.proxy, str(method))
175 node_args = dict((str(k), v) for k, v in args.iteritems())
176@@ -197,7 +230,18 @@
177 try:
178 rval = node_func(context=ctxt, **node_args)
179 if msg_id:
180- msg_reply(msg_id, rval, None)
181+ # Check if the result was a generator
182+ if isinstance(rval, types.GeneratorType):
183+ for x in rval:
184+ msg_reply(msg_id, x, None)
185+ else:
186+ msg_reply(msg_id, rval, None)
187+
188+ # This final None tells multicall that it is done.
189+ msg_reply(msg_id, None, None)
190+ elif isinstance(rval, types.GeneratorType):
191+ # NOTE(vish): this iterates through the generator
192+ list(rval)
193 except Exception as e:
194 logging.exception('Exception during message handling')
195 if msg_id:
196@@ -205,11 +249,6 @@
197 return
198
199
200-class Publisher(messaging.Publisher):
201- """Publisher base class."""
202- pass
203-
204-
205 class TopicAdapterConsumer(AdapterConsumer):
206 """Consumes messages on a specific topic."""
207
208@@ -242,6 +281,58 @@
209 topic=topic, proxy=proxy)
210
211
212+class ConsumerSet(object):
213+ """Groups consumers to listen on together on a single connection."""
214+
215+ def __init__(self, connection, consumer_list):
216+ self.consumer_list = set(consumer_list)
217+ self.consumer_set = None
218+ self.enabled = True
219+ self.init(connection)
220+
221+ def init(self, conn):
222+ if not conn:
223+ conn = Connection.instance(new=True)
224+ if self.consumer_set:
225+ self.consumer_set.close()
226+ self.consumer_set = messaging.ConsumerSet(conn)
227+ for consumer in self.consumer_list:
228+ consumer.connection = conn
229+ # consumer.backend is set for us
230+ self.consumer_set.add_consumer(consumer)
231+
232+ def reconnect(self):
233+ self.init(None)
234+
235+ def wait(self, limit=None):
236+ running = True
237+ while running:
238+ it = self.consumer_set.iterconsume(limit=limit)
239+ if not it:
240+ break
241+ while True:
242+ try:
243+ it.next()
244+ except StopIteration:
245+ return
246+ except greenlet.GreenletExit:
247+ running = False
248+ break
249+ except Exception as e:
250+ LOG.exception(_("Exception while processing consumer"))
251+ self.reconnect()
252+ # Break to outer loop
253+ break
254+
255+ def close(self):
256+ self.consumer_set.close()
257+
258+
259+class Publisher(messaging.Publisher):
260+ """Publisher base class."""
261+ pass
262+
263+
264 class TopicPublisher(Publisher):
265 """Publishes messages on a specific topic."""
266
267@@ -306,16 +397,18 @@
268 LOG.error(_("Returning exception %s to caller"), message)
269 LOG.error(tb)
270 failure = (failure[0].__name__, str(failure[1]), tb)
271- conn = Connection.instance()
272- publisher = DirectPublisher(connection=conn, msg_id=msg_id)
273- try:
274- publisher.send({'result': reply, 'failure': failure})
275- except TypeError:
276- publisher.send(
277- {'result': dict((k, repr(v))
278- for k, v in reply.__dict__.iteritems()),
279- 'failure': failure})
280- publisher.close()
281+
282+ with ConnectionPool.item() as conn:
283+ publisher = DirectPublisher(connection=conn, msg_id=msg_id)
284+ try:
285+ publisher.send({'result': reply, 'failure': failure})
286+ except TypeError:
287+ publisher.send(
288+ {'result': dict((k, repr(v))
289+ for k, v in reply.__dict__.iteritems()),
290+ 'failure': failure})
291+
292+ publisher.close()
293
294
295 class RemoteError(exception.Error):
296@@ -347,8 +440,9 @@
297 if key.startswith('_context_'):
298 value = msg.pop(key)
299 context_dict[key[9:]] = value
300+ context_dict['msg_id'] = msg.pop('_msg_id', None)
301 LOG.debug(_('unpacked context: %s'), context_dict)
302- return context.RequestContext.from_dict(context_dict)
303+ return RpcContext.from_dict(context_dict)
304
305
306 def _pack_context(msg, context):
307@@ -360,70 +454,112 @@
308 for args at some point.
309
310 """
311- context = dict([('_context_%s' % key, value)
312- for (key, value) in context.to_dict().iteritems()])
313- msg.update(context)
314-
315-
316-def call(context, topic, msg):
317- """Sends a message on a topic and wait for a response."""
318+ context_d = dict([('_context_%s' % key, value)
319+ for (key, value) in context.to_dict().iteritems()])
320+ msg.update(context_d)
321+
322+
323+class RpcContext(context.RequestContext):
324+ def __init__(self, *args, **kwargs):
325+ msg_id = kwargs.pop('msg_id', None)
326+ self.msg_id = msg_id
327+ super(RpcContext, self).__init__(*args, **kwargs)
328+
329+ def reply(self, *args, **kwargs):
330+ msg_reply(self.msg_id, *args, **kwargs)
331+
332+
333+def multicall(context, topic, msg):
334+ """Make a call that returns multiple times."""
335 LOG.debug(_('Making asynchronous call on %s ...'), topic)
336 msg_id = uuid.uuid4().hex
337 msg.update({'_msg_id': msg_id})
338 LOG.debug(_('MSG_ID is %s') % (msg_id))
339 _pack_context(msg, context)
340
341- class WaitMessage(object):
342- def __call__(self, data, message):
343- """Acks message and sets result."""
344- message.ack()
345- if data['failure']:
346- self.result = RemoteError(*data['failure'])
347- else:
348- self.result = data['result']
349-
350- wait_msg = WaitMessage()
351- conn = Connection.instance()
352- consumer = DirectConsumer(connection=conn, msg_id=msg_id)
353+ con_conn = ConnectionPool.get()
354+ consumer = DirectConsumer(connection=con_conn, msg_id=msg_id)
355+ wait_msg = MulticallWaiter(consumer)
356 consumer.register_callback(wait_msg)
357
358- conn = Connection.instance()
359- publisher = TopicPublisher(connection=conn, topic=topic)
360+ publisher = TopicPublisher(connection=con_conn, topic=topic)
361 publisher.send(msg)
362 publisher.close()
363
364- try:
365- consumer.wait(limit=1)
366- except StopIteration:
367- pass
368- consumer.close()
369- # NOTE(termie): this is a little bit of a change from the original
370- # non-eventlet code where returning a Failure
371- # instance from a deferred call is very similar to
372- # raising an exception
373- if isinstance(wait_msg.result, Exception):
374- raise wait_msg.result
375- return wait_msg.result
376+ return wait_msg
377+
378+
379+class MulticallWaiter(object):
380+ def __init__(self, consumer):
381+ self._consumer = consumer
382+ self._results = queue.Queue()
383+ self._closed = False
384+
385+ def close(self):
386+ self._closed = True
387+ self._consumer.close()
388+ ConnectionPool.put(self._consumer.connection)
389+
390+ def __call__(self, data, message):
391+ """Acks message and sets result."""
392+ message.ack()
393+ if data['failure']:
394+ self._results.put(RemoteError(*data['failure']))
395+ else:
396+ self._results.put(data['result'])
397+
398+ def __iter__(self):
399+ return self.wait()
400+
401+ def wait(self):
402+ while True:
403+ rv = None
404+ while rv is None and not self._closed:
405+ try:
406+ rv = self._consumer.fetch(enable_callbacks=True)
407+ except Exception:
408+ self.close()
409+ raise
410+ time.sleep(0.01)
411+
412+ result = self._results.get()
413+ if isinstance(result, Exception):
414+ self.close()
415+ raise result
416+ if result == None:
417+ self.close()
418+ raise StopIteration
419+ yield result
420+
421+
422+def call(context, topic, msg):
423+ """Sends a message on a topic and wait for a response."""
424+ rv = multicall(context, topic, msg)
425+ # NOTE(vish): return the last result from the multicall
426+ rv = list(rv)
427+ if not rv:
428+ return
429+ return rv[-1]
430
431
432 def cast(context, topic, msg):
433 """Sends a message on a topic without waiting for a response."""
434 LOG.debug(_('Making asynchronous cast on %s...'), topic)
435 _pack_context(msg, context)
436- conn = Connection.instance()
437- publisher = TopicPublisher(connection=conn, topic=topic)
438- publisher.send(msg)
439- publisher.close()
440+ with ConnectionPool.item() as conn:
441+ publisher = TopicPublisher(connection=conn, topic=topic)
442+ publisher.send(msg)
443+ publisher.close()
444
445
446 def fanout_cast(context, topic, msg):
447 """Sends a message on a fanout exchange without waiting for a response."""
448 LOG.debug(_('Making asynchronous fanout cast...'))
449 _pack_context(msg, context)
450- conn = Connection.instance()
451- publisher = FanoutPublisher(topic, connection=conn)
452- publisher.send(msg)
453- publisher.close()
454+ with ConnectionPool.item() as conn:
455+ publisher = FanoutPublisher(topic, connection=conn)
456+ publisher.send(msg)
457+ publisher.close()
458
459
460 def generic_response(message_data, message):
461@@ -459,6 +595,7 @@
462
463 if wait:
464 consumer.wait()
465+ consumer.close()
466
467
468 if __name__ == '__main__':
469
470=== modified file 'nova/service.py'
471--- nova/service.py 2011-05-20 04:03:15 +0000
472+++ nova/service.py 2011-05-27 00:09:30 +0000
473@@ -19,14 +19,11 @@
474
475 """Generic Node baseclass for all workers that run on hosts."""
476
477+import greenlet
478 import inspect
479 import os
480-import sys
481-import time
482
483-from eventlet import event
484 from eventlet import greenthread
485-from eventlet import greenpool
486
487 from nova import context
488 from nova import db
489@@ -91,27 +88,37 @@
490 if 'nova-compute' == self.binary:
491 self.manager.update_available_resource(ctxt)
492
493- conn1 = rpc.Connection.instance(new=True)
494- conn2 = rpc.Connection.instance(new=True)
495- conn3 = rpc.Connection.instance(new=True)
496+ self.conn = rpc.Connection.instance(new=True)
497+ logging.debug("Creating Consumer connection for Service %s" %
498+ self.topic)
499+
500+ # Share this same connection for these Consumers
501+ consumer_all = rpc.TopicAdapterConsumer(
502+ connection=self.conn,
503+ topic=self.topic,
504+ proxy=self)
505+ consumer_node = rpc.TopicAdapterConsumer(
506+ connection=self.conn,
507+ topic='%s.%s' % (self.topic, self.host),
508+ proxy=self)
509+ fanout = rpc.FanoutAdapterConsumer(
510+ connection=self.conn,
511+ topic=self.topic,
512+ proxy=self)
513+ consumer_set = rpc.ConsumerSet(
514+ connection=self.conn,
515+ consumer_list=[consumer_all, consumer_node, fanout])
516+
517+ # Wait forever, processing these consumers
518+ def _wait():
519+ try:
520+ consumer_set.wait()
521+ finally:
522+ consumer_set.close()
523+
524+ self.consumer_set_thread = greenthread.spawn(_wait)
525+
526 if self.report_interval:
527- consumer_all = rpc.TopicAdapterConsumer(
528- connection=conn1,
529- topic=self.topic,
530- proxy=self)
531- consumer_node = rpc.TopicAdapterConsumer(
532- connection=conn2,
533- topic='%s.%s' % (self.topic, self.host),
534- proxy=self)
535- fanout = rpc.FanoutAdapterConsumer(
536- connection=conn3,
537- topic=self.topic,
538- proxy=self)
539-
540- self.timers.append(consumer_all.attach_to_eventlet())
541- self.timers.append(consumer_node.attach_to_eventlet())
542- self.timers.append(fanout.attach_to_eventlet())
543-
544 pulse = utils.LoopingCall(self.report_state)
545 pulse.start(interval=self.report_interval, now=False)
546 self.timers.append(pulse)
547@@ -174,6 +181,11 @@
548 logging.warn(_('Service killed that has no database entry'))
549
550 def stop(self):
551+ self.consumer_set_thread.kill()
552+ try:
553+ self.consumer_set_thread.wait()
554+ except greenlet.GreenletExit:
555+ pass
556 for x in self.timers:
557 try:
558 x.stop()
559
560=== modified file 'nova/test.py'
561--- nova/test.py 2011-04-20 19:08:22 +0000
562+++ nova/test.py 2011-05-27 00:09:30 +0000
563@@ -31,17 +31,15 @@
564 import unittest
565
566 import mox
567-import shutil
568 import stubout
569 from eventlet import greenthread
570
571-from nova import context
572-from nova import db
573 from nova import fakerabbit
574 from nova import flags
575 from nova import rpc
576 from nova import service
577 from nova import wsgi
578+from nova.virt import fake
579
580
581 FLAGS = flags.FLAGS
582@@ -85,6 +83,7 @@
583 self._monkey_patch_attach()
584 self._monkey_patch_wsgi()
585 self._original_flags = FLAGS.FlagValuesDict()
586+ rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
587
588 def tearDown(self):
589 """Runs after each test method to tear down test environment."""
590@@ -99,6 +98,10 @@
591 if FLAGS.fake_rabbit:
592 fakerabbit.reset_all()
593
594+ if FLAGS.connection_type == 'fake':
595+ if hasattr(fake.FakeConnection, '_instance'):
596+ del fake.FakeConnection._instance
597+
598 # Reset any overriden flags
599 self.reset_flags()
600
601
602=== modified file 'nova/tests/integrated/integrated_helpers.py'
603--- nova/tests/integrated/integrated_helpers.py 2011-03-30 16:08:36 +0000
604+++ nova/tests/integrated/integrated_helpers.py 2011-05-27 00:09:30 +0000
605@@ -154,10 +154,7 @@
606 # set up services
607 self.start_service('compute')
608 self.start_service('volume')
609- # NOTE(justinsb): There's a bug here which is eluding me...
610- # If we start the network_service, all is good, but then subsequent
611- # tests fail: CloudTestCase.test_ajax_console in particular.
612- #self.start_service('network')
613+ self.start_service('network')
614 self.start_service('scheduler')
615
616 self._start_api_service()
617
618=== modified file 'nova/tests/test_cloud.py'
619--- nova/tests/test_cloud.py 2011-05-20 06:51:29 +0000
620+++ nova/tests/test_cloud.py 2011-05-27 00:09:30 +0000
621@@ -17,13 +17,9 @@
622 # under the License.
623
624 from base64 import b64decode
625-import json
626 from M2Crypto import BIO
627 from M2Crypto import RSA
628 import os
629-import shutil
630-import tempfile
631-import time
632
633 from eventlet import greenthread
634
635@@ -33,12 +29,10 @@
636 from nova import flags
637 from nova import log as logging
638 from nova import rpc
639-from nova import service
640 from nova import test
641 from nova import utils
642 from nova import exception
643 from nova.auth import manager
644-from nova.compute import power_state
645 from nova.api.ec2 import cloud
646 from nova.api.ec2 import ec2utils
647 from nova.image import local
648@@ -79,14 +73,21 @@
649 self.stubs.Set(local.LocalImageService, 'show', fake_show)
650 self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
651
652+ # NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
653+ rpc_cast = rpc.cast
654+
655+ def finish_cast(*args, **kwargs):
656+ rpc_cast(*args, **kwargs)
657+ greenthread.sleep(0.2)
658+
659+ self.stubs.Set(rpc, 'cast', finish_cast)
660+
661 def tearDown(self):
662 network_ref = db.project_get_network(self.context,
663 self.project.id)
664 db.network_disassociate(self.context, network_ref['id'])
665 self.manager.delete_project(self.project)
666 self.manager.delete_user(self.user)
667- self.compute.kill()
668- self.network.kill()
669 super(CloudTestCase, self).tearDown()
670
671 def _create_key(self, name):
672@@ -113,7 +114,6 @@
673 self.cloud.describe_addresses(self.context)
674 self.cloud.release_address(self.context,
675 public_ip=address)
676- greenthread.sleep(0.3)
677 db.floating_ip_destroy(self.context, address)
678
679 def test_associate_disassociate_address(self):
680@@ -129,12 +129,10 @@
681 self.cloud.associate_address(self.context,
682 instance_id=ec2_id,
683 public_ip=address)
684- greenthread.sleep(0.3)
685 self.cloud.disassociate_address(self.context,
686 public_ip=address)
687 self.cloud.release_address(self.context,
688 public_ip=address)
689- greenthread.sleep(0.3)
690 self.network.deallocate_fixed_ip(self.context, fixed)
691 db.instance_destroy(self.context, inst['id'])
692 db.floating_ip_destroy(self.context, address)
693@@ -306,31 +304,25 @@
694 'instance_type': instance_type,
695 'max_count': max_count}
696 rv = self.cloud.run_instances(self.context, **kwargs)
697- greenthread.sleep(0.3)
698 instance_id = rv['instancesSet'][0]['instanceId']
699 output = self.cloud.get_console_output(context=self.context,
700 instance_id=[instance_id])
701 self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
702 # TODO(soren): We need this until we can stop polling in the rpc code
703 # for unit tests.
704- greenthread.sleep(0.3)
705 rv = self.cloud.terminate_instances(self.context, [instance_id])
706- greenthread.sleep(0.3)
707
708 def test_ajax_console(self):
709 kwargs = {'image_id': 'ami-1'}
710 rv = self.cloud.run_instances(self.context, **kwargs)
711 instance_id = rv['instancesSet'][0]['instanceId']
712- greenthread.sleep(0.3)
713 output = self.cloud.get_ajax_console(context=self.context,
714 instance_id=[instance_id])
715 self.assertEquals(output['url'],
716 '%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
717 # TODO(soren): We need this until we can stop polling in the rpc code
718 # for unit tests.
719- greenthread.sleep(0.3)
720 rv = self.cloud.terminate_instances(self.context, [instance_id])
721- greenthread.sleep(0.3)
722
723 def test_key_generation(self):
724 result = self._create_key('test')
725
726=== modified file 'nova/tests/test_rpc.py'
727--- nova/tests/test_rpc.py 2011-02-23 22:41:11 +0000
728+++ nova/tests/test_rpc.py 2011-05-27 00:09:30 +0000
729@@ -31,7 +31,6 @@
730
731
732 class RpcTestCase(test.TestCase):
733- """Test cases for rpc"""
734 def setUp(self):
735 super(RpcTestCase, self).setUp()
736 self.conn = rpc.Connection.instance(True)
737@@ -43,14 +42,55 @@
738 self.context = context.get_admin_context()
739
740 def test_call_succeed(self):
741- """Get a value through rpc call"""
742 value = 42
743 result = rpc.call(self.context, 'test', {"method": "echo",
744 "args": {"value": value}})
745 self.assertEqual(value, result)
746
747+ def test_call_succeed_despite_multiple_returns(self):
748+ value = 42
749+ result = rpc.call(self.context, 'test', {"method": "echo_three_times",
750+ "args": {"value": value}})
751+ self.assertEqual(value + 2, result)
752+
753+ def test_call_succeed_despite_multiple_returns_yield(self):
754+ value = 42
755+ result = rpc.call(self.context, 'test',
756+ {"method": "echo_three_times_yield",
757+ "args": {"value": value}})
758+ self.assertEqual(value + 2, result)
759+
760+ def test_multicall_succeed_once(self):
761+ value = 42
762+ result = rpc.multicall(self.context,
763+ 'test',
764+ {"method": "echo",
765+ "args": {"value": value}})
766+ for i, x in enumerate(result):
767+ if i > 0:
768+ self.fail('should only receive one response')
769+ self.assertEqual(value + i, x)
770+
771+ def test_multicall_succeed_three_times(self):
772+ value = 42
773+ result = rpc.multicall(self.context,
774+ 'test',
775+ {"method": "echo_three_times",
776+ "args": {"value": value}})
777+ for i, x in enumerate(result):
778+ self.assertEqual(value + i, x)
779+
780+ def test_multicall_succeed_three_times_yield(self):
781+ value = 42
782+ result = rpc.multicall(self.context,
783+ 'test',
784+ {"method": "echo_three_times_yield",
785+ "args": {"value": value}})
786+ for i, x in enumerate(result):
787+ self.assertEqual(value + i, x)
788+
789 def test_context_passed(self):
790- """Makes sure a context is passed through rpc call"""
791+ """Makes sure a context is passed through rpc call."""
792 value = 42
793 result = rpc.call(self.context,
794 'test', {"method": "context",
795@@ -58,11 +98,12 @@
796 self.assertEqual(self.context.to_dict(), result)
797
798 def test_call_exception(self):
799- """Test that exception gets passed back properly
800+ """Test that exception gets passed back properly.
801
802 rpc.call returns a RemoteError object. The value of the
803 exception is converted to a string, so we convert it back
804 to an int in the test.
805+
806 """
807 value = 42
808 self.assertRaises(rpc.RemoteError,
809@@ -81,7 +122,7 @@
810 self.assertEqual(int(exc.value), value)
811
812 def test_nested_calls(self):
813- """Test that we can do an rpc.call inside another call"""
814+ """Test that we can do an rpc.call inside another call."""
815 class Nested(object):
816 @staticmethod
817 def echo(context, queue, value):
818@@ -108,25 +149,80 @@
819 "value": value}})
820 self.assertEqual(value, result)
821
822+ def test_connectionpool_single(self):
823+ """Test that ConnectionPool recycles a single connection."""
824+ conn1 = rpc.ConnectionPool.get()
825+ rpc.ConnectionPool.put(conn1)
826+ conn2 = rpc.ConnectionPool.get()
827+ rpc.ConnectionPool.put(conn2)
828+ self.assertEqual(conn1, conn2)
829+
830+ def test_connectionpool_double(self):
831+ """Test that ConnectionPool returns and reuses separate connections.
832+
833+ When called consecutively we should get separate connections and upon
834+ returning them those connections should be reused for future calls
835+ before generating a new connection.
836+
837+ """
838+ conn1 = rpc.ConnectionPool.get()
839+ conn2 = rpc.ConnectionPool.get()
840+
841+ self.assertNotEqual(conn1, conn2)
842+ rpc.ConnectionPool.put(conn1)
843+ rpc.ConnectionPool.put(conn2)
844+
845+ conn3 = rpc.ConnectionPool.get()
846+ conn4 = rpc.ConnectionPool.get()
847+ self.assertEqual(conn1, conn3)
848+ self.assertEqual(conn2, conn4)
849+
850+ def test_connectionpool_limit(self):
851+ """Test connection pool limit and connection uniqueness."""
852+ max_size = FLAGS.rpc_conn_pool_size
853+ conns = []
854+
855+ for i in xrange(max_size):
856+ conns.append(rpc.ConnectionPool.get())
857+
858+ self.assertFalse(rpc.ConnectionPool.free_items)
859+ self.assertEqual(rpc.ConnectionPool.current_size,
860+ rpc.ConnectionPool.max_size)
861+ self.assertEqual(len(set(conns)), max_size)
862+
863
864 class TestReceiver(object):
865- """Simple Proxy class so the consumer has methods to call
866-
867- Uses static methods because we aren't actually storing any state"""
868+ """Simple Proxy class so the consumer has methods to call.
869+
870+ Uses static methods because we aren't actually storing any state.
871+
872+ """
873
874 @staticmethod
875 def echo(context, value):
876- """Simply returns whatever value is sent in"""
877+ """Simply returns whatever value is sent in."""
878 LOG.debug(_("Received %s"), value)
879 return value
880
881 @staticmethod
882 def context(context, value):
883- """Returns dictionary version of context"""
884+ """Returns dictionary version of context."""
885 LOG.debug(_("Received %s"), context)
886 return context.to_dict()
887
888 @staticmethod
889+ def echo_three_times(context, value):
890+ context.reply(value)
891+ context.reply(value + 1)
892+ context.reply(value + 2)
893+
894+ @staticmethod
895+ def echo_three_times_yield(context, value):
896+ yield value
897+ yield value + 1
898+ yield value + 2
899+
900+ @staticmethod
901 def fail(context, value):
902- """Raises an exception with the value sent in"""
903+ """Raises an exception with the value sent in."""
904 raise Exception(value)
905
906=== modified file 'nova/tests/test_service.py'
907--- nova/tests/test_service.py 2011-03-17 13:35:00 +0000
908+++ nova/tests/test_service.py 2011-05-27 00:09:30 +0000
909@@ -106,7 +106,10 @@
910
911 # NOTE(vish): Create was moved out of mox replay to make sure that
912 # the looping calls are created in StartService.
913- app = service.Service.create(host=host, binary=binary)
914+ app = service.Service.create(host=host, binary=binary, topic=topic)
915+
916+ self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
917+ service.rpc.Connection.instance(new=mox.IgnoreArg())
918
919 self.mox.StubOutWithMock(rpc,
920 'TopicAdapterConsumer',
921@@ -114,6 +117,11 @@
922 self.mox.StubOutWithMock(rpc,
923 'FanoutAdapterConsumer',
924 use_mock_anything=True)
925+
926+ self.mox.StubOutWithMock(rpc,
927+ 'ConsumerSet',
928+ use_mock_anything=True)
929+
930 rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
931 topic=topic,
932 proxy=mox.IsA(service.Service)).AndReturn(
933@@ -129,9 +137,14 @@
934 proxy=mox.IsA(service.Service)).AndReturn(
935 rpc.FanoutAdapterConsumer)
936
937- rpc.TopicAdapterConsumer.attach_to_eventlet()
938- rpc.TopicAdapterConsumer.attach_to_eventlet()
939- rpc.FanoutAdapterConsumer.attach_to_eventlet()
940+ def wait_func(self, limit=None):
941+ return None
942+
943+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
944+ {'wait': wait_func})
945+ rpc.ConsumerSet(connection=mox.IgnoreArg(),
946+ consumer_list=mox.IsA(list)).AndReturn(mock_cset)
947+ wait_func(mox.IgnoreArg())
948
949 service_create = {'host': host,
950 'binary': binary,
951@@ -287,8 +300,42 @@
952 # Creating mocks
953 self.mox.StubOutWithMock(service.rpc.Connection, 'instance')
954 service.rpc.Connection.instance(new=mox.IgnoreArg())
955- service.rpc.Connection.instance(new=mox.IgnoreArg())
956- service.rpc.Connection.instance(new=mox.IgnoreArg())
957+
958+ self.mox.StubOutWithMock(rpc,
959+ 'TopicAdapterConsumer',
960+ use_mock_anything=True)
961+ self.mox.StubOutWithMock(rpc,
962+ 'FanoutAdapterConsumer',
963+ use_mock_anything=True)
964+
965+ self.mox.StubOutWithMock(rpc,
966+ 'ConsumerSet',
967+ use_mock_anything=True)
968+
969+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
970+ topic=topic,
971+ proxy=mox.IsA(service.Service)).AndReturn(
972+ rpc.TopicAdapterConsumer)
973+
974+ rpc.TopicAdapterConsumer(connection=mox.IgnoreArg(),
975+ topic='%s.%s' % (topic, host),
976+ proxy=mox.IsA(service.Service)).AndReturn(
977+ rpc.TopicAdapterConsumer)
978+
979+ rpc.FanoutAdapterConsumer(connection=mox.IgnoreArg(),
980+ topic=topic,
981+ proxy=mox.IsA(service.Service)).AndReturn(
982+ rpc.FanoutAdapterConsumer)
983+
984+ def wait_func(self, limit=None):
985+ return None
986+
987+ mock_cset = self.mox.CreateMock(rpc.ConsumerSet,
988+ {'wait': wait_func})
989+ rpc.ConsumerSet(connection=mox.IgnoreArg(),
990+ consumer_list=mox.IsA(list)).AndReturn(mock_cset)
991+ wait_func(mox.IgnoreArg())
992+
993 self.mox.StubOutWithMock(serv.manager.driver,
994 'update_available_resource')
995 serv.manager.driver.update_available_resource(mox.IgnoreArg(), host)