Merge lp:~blake-rouse/maas/simplestreams-file-handler-connectionwrapper into lp:~maas-committers/maas/trunk

Proposed by Blake Rouse
Status: Merged
Approved by: Blake Rouse
Approved revision: no longer in the source branch.
Merged at revision: 2838
Proposed branch: lp:~blake-rouse/maas/simplestreams-file-handler-connectionwrapper
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 274 lines (+135/-51)
3 files modified
src/maasserver/bootresources.py (+42/-16)
src/maasserver/fields.py (+4/-2)
src/maasserver/tests/test_bootresources.py (+89/-33)
To merge this branch: bzr merge lp:~blake-rouse/maas/simplestreams-file-handler-connectionwrapper
Reviewer                          Review Type   Date Requested   Status
Jeroen T. Vermeulen (community)                                  Approve
Review via email: mp+232465@code.launchpad.net

Commit message

Replaced TransactionWrapper with ConnectionWrapper.

TransactionWrapper wrapped a LargeObjectFile in its own transaction while its data was read into a StreamingHttpResponse. This works in theory, but it leaves Django's main connection inside a transaction; when a new request arrives, the transaction middleware fails to initialize because the connection is already in a transaction.

ConnectionWrapper wraps the LargeObjectFile in its own connection to the database. This lets the large object use its own transaction without interfering with incoming requests to Django.
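
The idea can be sketched outside Django as follows. This is only an illustration under stated assumptions: it is written for Python 3 and uses the standard-library sqlite3 module in place of PostgreSQL large objects, and the names DedicatedConnectionReader, make_demo_db and the blobs table are hypothetical, not part of MAAS.

```python
import os
import sqlite3
import tempfile


def make_demo_db():
    """Create a throwaway database file holding one blob (hypothetical schema)."""
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE blobs (id INTEGER PRIMARY KEY, data BLOB)")
    conn.execute("INSERT INTO blobs VALUES (1, ?)", (b"0123456789",))
    conn.commit()
    conn.close()
    return path


class DedicatedConnectionReader:
    """Iterate over a stored blob in chunks, using a private connection.

    Stand-in for the ConnectionWrapper idea: the connection is opened
    lazily on first read, is never shared, and is closed by close().
    """

    def __init__(self, db_path, blob_id, block_size=4):
        self.db_path = db_path
        self.blob_id = blob_id
        self.block_size = block_size
        self._connection = None
        self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self._connection is None:
            # Assumption: a fresh connection per reader, like the wrapper's
            # own connection in the commit message above.
            self._connection = sqlite3.connect(self.db_path)
        row = self._connection.execute(
            "SELECT substr(data, ?, ?) FROM blobs WHERE id = ?",
            (self._offset + 1, self.block_size, self.blob_id),
        ).fetchone()
        chunk = row[0] if row is not None else b""
        if not chunk:
            raise StopIteration
        self._offset += len(chunk)
        return chunk

    def close(self):
        if self._connection is not None:
            self._connection.close()
            self._connection = None
```

A caller would join the chunks, for example `b"".join(DedicatedConnectionReader(path, 1))`, and then call `close()`, much as StreamingHttpResponse closes its content iterator when the response is finished.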

Jeroen T. Vermeulen (jtv) wrote :

Looks like a sensible change — I even wonder how much need there is for a transaction at all in this case, but I don't see it doing any harm either. I'll have to take some of the Django mechanics on faith (and on its test coverage, obviously). It does make me wonder if we make the same provision when writing the same large object?

I'm approving, but with some notes for small things that I think need fixing.

.

Can you make it clear that _set_up is designed to be idempotent for lazy initialisation? If I had to maintain it under pressure, I could imagine myself missing that and breaking the idempotency.

(Pedantic note #1: the docstring for _set_up uses “Setup,” the noun, as a verb.)
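
To illustrate the kind of wording and shape I have in mind, here is a minimal Python 3 sketch. The class LazyResource, its opener argument and the open_count counter are hypothetical and exist only for this example; the point is the docstring that states the idempotency contract.

```python
class LazyResource:
    """Hypothetical holder for a resource that is opened on first use."""

    def __init__(self, opener):
        self._opener = opener
        self._handle = None
        self.open_count = 0  # only here so the example can be checked

    def _set_up(self):
        """Set up the handle if it has not been set up yet.

        Idempotent by design: this runs on every read, so repeated calls
        must never open a second handle.
        """
        if self._handle is None:
            self._handle = self._opener()
            self.open_count += 1

    def read(self):
        self._set_up()
        return self._handle
```

Calling `read()` any number of times opens the handle exactly once, and the docstring tells a maintainer that this is intentional.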

.

(Pedantic note #2: the leading comment for TestConnectionWrapper.make_file_for_client looks like the sentence got jumbled.)

.

In test_download_calls__get_new_connection:

        # The connection is not made until first access of the content. Read
        # the content so it called.
        b''.join(response.streaming_content)

Well commented, but mind the typo. :) Our usual way to force the full iteration is to use list():

        list(response.streaming_content)

The same snippet also occurs in test_download_connection_is_not_same_as_django_connections. Short as it is, it may be worth extracting into a function. That way it's a "thing" and you don't need to explain it in the tests, apart from the name and the docstring.
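
Something along these lines would do, as a rough Python 3 sketch. The name consume_stream is hypothetical, and the generator in the usage note only stands in for response.streaming_content.

```python
def consume_stream(streaming_content):
    """Force full iteration of a lazy stream and return its chunks.

    Nothing in a lazy stream runs until it is iterated, so tests that
    depend on its side effects need to exhaust it first.
    """
    return list(streaming_content)
```

With a generator that records when it runs, nothing is recorded until consume_stream is called, which is the same reason the test has to read the content before asserting on the connection.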

.

In test_download_connection_is_not_same_as_django_connections:

        # This is very import test. The connection that is used by the wrapper
        # cannot be the same as the connection using for all other webrequests,
        # as this will cause transactional errors to occur.

Could you add just a bit more background to those transactional errors? The "to occur" part is just filler to substitute for an actual reason IMHO: you can leave it out without losing anything — except then it becomes more obvious that something is missing.

(Pedantic note #3: I could have sworn that first sentence was written in Asia!)
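
For the background, the failure mode could be described with an analogy like the one below. It is a Python 3 sketch using the standard-library sqlite3 module, not Django's middleware; request_can_begin and the connection names are hypothetical.

```python
import sqlite3


def request_can_begin(request_conn):
    """Return True if a new transaction can be started on the connection."""
    try:
        request_conn.execute("BEGIN")
    except sqlite3.OperationalError:
        # The connection is already inside a transaction.
        return False
    request_conn.execute("ROLLBACK")
    return True


# Shared connection: streaming leaves a transaction open, so the next
# "request" on the same connection cannot start its own.
shared = sqlite3.connect(":memory:", isolation_level=None)
shared.execute("BEGIN")
shared_ok = request_can_begin(shared)

# Dedicated connection: streaming holds its own transaction, and the
# main connection stays free for the next "request".
dedicated = sqlite3.connect(":memory:", isolation_level=None)
dedicated.execute("BEGIN")
main = sqlite3.connect(":memory:", isolation_level=None)
main_ok = request_can_begin(main)
```

Here shared_ok ends up False and main_ok ends up True, which is the distinction the test comment should spell out.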

review: Approve

Preview Diff

=== modified file 'src/maasserver/bootresources.py'
--- src/maasserver/bootresources.py 2014-08-26 17:50:29 +0000
+++ src/maasserver/bootresources.py 2014-08-28 13:11:20 +0000
@@ -26,8 +26,10 @@
 from crochet import run_in_reactor
 from django.db import (
     close_old_connections,
+    connections,
     transaction,
     )
+from django.db.utils import load_backend
 from django.http import (
     Http404,
     HttpResponse,
@@ -102,39 +104,63 @@
         arches=['amd64'], subarches=['*'], labels=['release'])


-class TransactionWrapper:
-    """Wraps `LargeObjectFile` in transaction, so `StreamingHttpResponse`
-    can be used. Once the stream is done, then the transaction is
-    closed.
+class ConnectionWrapper:
+    """Wraps `LargeObjectFile` in a new database connection.
+
+    `StreamingHttpResponse` runs outside of django context, so connection
+    that is not shared by django is needed.
+
+    A new database connection is made at the start of the interation and is
+    closed upon close of wrapper.
     """

-    def __init__(self, largeobject):
+    def __init__(self, largeobject, alias="default"):
         self.largeobject = largeobject
-        self._atomic = None
+        self.alias = alias
+        self._connection = None
         self._stream = None

+    def _get_new_connection(self):
+        """Create new database connection."""
+        db = connections.databases[self.alias]
+        backend = load_backend(db['ENGINE'])
+        return backend.DatabaseWrapper(db, self.alias)
+
+    def _set_up(self):
+        """Sets up the connection and stream.
+
+        This uses lazy initialisation because it is called each time
+        `next` is called.
+        """
+        if self._connection is None:
+            self._connection = self._get_new_connection()
+            self._connection.connect()
+            self._connection.enter_transaction_management()
+            self._connection.set_autocommit(False)
+        if self._stream is None:
+            self._stream = self.largeobject.open(
+                'rb', connection=self._connection)
+
     def __iter__(self):
         return self

     def next(self):
-        if self._atomic is None:
-            self._atomic = transaction.atomic()
-            self._atomic.__enter__()
-        if self._stream is None:
-            self._stream = self.largeobject.open('rb')
+        self._set_up()
         data = self._stream.read(self.largeobject.block_size)
         if len(data) == 0:
             raise StopIteration
         return data

     def close(self):
+        """Close the connection and stream."""
         if self._stream is not None:
             self._stream.close()
             self._stream = None
-        if self._atomic is not None:
-            self._atomic.__exit__()
-            self._atomic = None
-            close_old_connections()
+        if self._connection is not None:
+            self._connection.commit()
+            self._connection.leave_transaction_management()
+            self._connection.close()
+            self._connection = None


 class SimpleStreamsHandler:
@@ -297,7 +323,7 @@
         except BootResourceFile.DoesNotExist:
             raise Http404()
         response = StreamingHttpResponse(
-            TransactionWrapper(rfile.largefile.content),
+            ConnectionWrapper(rfile.largefile.content),
             content_type='application/octet-stream')
         response['Content-Length'] = rfile.largefile.total_size
         return response

=== modified file 'src/maasserver/fields.py'
--- src/maasserver/fields.py 2014-08-13 21:49:35 +0000
+++ src/maasserver/fields.py 2014-08-28 13:11:20 +0000
@@ -444,9 +444,11 @@
     def __iter__(self):
         return self

-    def open(self, mode="rwb", new_file=None, using="default"):
+    def open(self, mode="rwb", new_file=None, using="default",
+             connection=None):
         """Opens the internal large object instance."""
-        connection = connections[using]
+        if connection is None:
+            connection = connections[using]
         self._lobject = connection.connection.lobject(
             self.oid, mode, 0, new_file)
         self.oid = self._lobject.oid

=== modified file 'src/maasserver/tests/test_bootresources.py'
--- src/maasserver/tests/test_bootresources.py 2014-08-27 12:26:41 +0000
+++ src/maasserver/tests/test_bootresources.py 2014-08-28 13:11:20 +0000
@@ -21,7 +21,10 @@
 from StringIO import StringIO

 from django.core.urlresolvers import reverse
-from django.db import transaction
+from django.db import (
+    connections,
+    transaction,
+    )
 from django.http import StreamingHttpResponse
 from django.test.client import Client
 from maasserver import bootresources
@@ -435,25 +438,23 @@
         version = resource_set.version
         resource_file = resource_set.files.order_by('?')[0]
         filename = resource_file.filename
-        with resource_file.largefile.content.open('rb') as stream:
-            content = stream.read()
         response = self.get_file_client(
             os, arch, subarch, series, version, filename)
         self.assertIsInstance(response, StreamingHttpResponse)
-        self.assertEqual(content, b''.join(response.streaming_content))
-
-
-class TestTransactionWrapper(MAASTestCase):
-    """Tests the use of StreamingHttpResponse(TransactionWrapper(stream)).
+
+
+class TestConnectionWrapper(TransactionTestCase):
+    """Tests the use of StreamingHttpResponse(ConnectionWrapper(stream)).

     We do not run this inside of `MAASServerTestCase` as that wraps a
-    transaction around each test. This removes that behavior so we can
-    test that the transaction is remaining open for all of the content.
+    transaction around each test. Since a new connection is created to return
+    the actual content, the transaction to create the data needs be committed.
     """

-    def test_download(self):
-        # Do the setup inside of a transaction, as we are running in a test
-        # that doesn't enable transactions per test.
+    def make_file_for_client(self):
+        # Set up the database information inside of a transaction. This is
+        # done so the information is committed. As the new connection needs
+        # to be able to access the data.
         with transaction.atomic():
             os = factory.make_name('os')
             series = factory.make_name('series')
@@ -477,26 +478,81 @@
             largefile = factory.make_large_file(content=content, size=size)
             factory.make_boot_resource_file(
                 resource_set, largefile, filename=filename, filetype=filetype)
-
-        # Outside of the transaction, we run the actual test. The client will
-        # run inside of its own transaction, but once the streaming response
-        # is returned that transaction will be closed.
-        client = Client()
-        response = client.get(
-            reverse(
-                'simplestreams_file_handler', kwargs={
-                    'os': os,
-                    'arch': arch,
-                    'subarch': subarch,
-                    'series': series,
-                    'version': version,
-                    'filename': filename,
-                    }))
-
-        # If TransactionWrapper does not work, then a ProgramError will be
-        # thrown. If it works then content will match.
-        self.assertEqual(content, b''.join(response.streaming_content))
-        self.assertTrue(largefile.content.closed)
+        return content, reverse(
+            'simplestreams_file_handler', kwargs={
+                'os': os,
+                'arch': arch,
+                'subarch': subarch,
+                'series': series,
+                'version': version,
+                'filename': filename,
+                })
+
+    def read_response(self, response):
+        """Read the streaming_content from the response.
+
+        :rtype: bytes
+        """
+        return b''.join(response.streaming_content)
+
+    def test_download_calls__get_new_connection(self):
+        content, url = self.make_file_for_client()
+        mock_get_new_connection = self.patch(
+            bootresources.ConnectionWrapper, '_get_new_connection')
+
+        client = Client()
+        response = client.get(url)
+        self.read_response(response)
+        self.assertThat(mock_get_new_connection, MockCalledOnceWith())
+
+    def test_download_connection_is_not_same_as_django_connections(self):
+        content, url = self.make_file_for_client()
+
+        class AssertConnectionWrapper(bootresources.ConnectionWrapper):
+
+            def _set_up(self):
+                super(AssertConnectionWrapper, self)._set_up()
+                # Capture the created connection
+                AssertConnectionWrapper.connection = self._connection
+
+            def close(self):
+                # Close the stream, but we don't want to close the
+                # connection as the test is testing that the connection is
+                # not the same as the connection django is using for other
+                # webrequests.
+                if self._stream is not None:
+                    self._stream.close()
+                    self._stream = None
+                self._connection = None
+
+        self.patch(
+            bootresources, 'ConnectionWrapper', AssertConnectionWrapper)
+
+        client = Client()
+        response = client.get(url)
+        self.read_response(response)
+
+        # Add cleanup to close the connection, since this was removed from
+        # AssertConnectionWrapper.close method.
+        def close():
+            conn = AssertConnectionWrapper.connection
+            conn.commit()
+            conn.leave_transaction_management()
+            conn.close()
+        self.addCleanup(close)
+
+        # The connection that is used by the wrapper cannot be the same as the
+        # connection be using for all other webrequests. Without this
+        # seperate the transactional middleware will fail to initialize,
+        # because the the connection will already be in a transaction.
+        #
+        # Note: cannot test if DatabaseWrapper != DatabaseWrapper, as it will
+        # report true, because the __eq__ operator only checks if the aliases
+        # are the same. This is checking the underlying connection is
+        # different, which is the important part.
+        self.assertNotEqual(
+            connections["default"].connection,
+            AssertConnectionWrapper.connection.connection)


 def make_product():