Merge lp:~rharding/launchpad/nonpublic_1052659 into lp:launchpad

Proposed by Richard Harding
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 16328
Proposed branch: lp:~rharding/launchpad/nonpublic_1052659
Merge into: lp:launchpad
Diff against target: 406 lines (+235/-86)
2 files modified
lib/lp/blueprints/model/specification.py (+25/-85)
lib/lp/blueprints/tests/test_specification.py (+210/-1)
To merge this branch: bzr merge lp:~rharding/launchpad/nonpublic_1052659
Reviewer Review Type Date Requested Status
Aaron Bentley (community) Approve
Review via email: mp+136479@code.launchpad.net

Commit message

Make SpecificationSet.specifications privacy aware for the user in question.

Description of the change

= Summary =

The code had previously been changed to pull only public specifications. To
make the method privacy aware it needed to be storm-ified, and in the end most
of the query-building tools needed to do that already existed.

== Pre Implementation ==

Had a brief chat with Aaron to get a sanity check on the storm-ification.

== Implementation Notes ==

Since the original method wasn't well tested, I copied over the product specification tests on Aaron's suggestion. The tests required some love and tweaking in order to work with a SpecificationSet vs a single Product, but I added them to trunk first, with the only failing tests being those around non-public specifications. I then moved the tests over to this branch of work and they all passed on the first try.

Updated the specifications method to work via a constructed storm query adding
in the privacy tables and clauses via the existing visible_specification_query
method.

Then used the existing get_specification_filters to filter as required.

Then finally just added a quick test to make sure we get/don't get the right
specs when using the method.

== QA ==

Per the bug, when both public and non-public specs exist, the owner should see
the non-public ones they have a grant for, while a user with only public
access should not see them listed on the home page.

== Tests ==

New test in lib/lp/blueprints/tests/test_specification.py

The actual code is also exercised by tests in many other locations.

To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/blueprints/model/specification.py'
2--- lib/lp/blueprints/model/specification.py 2012-11-26 08:33:03 +0000
3+++ lib/lp/blueprints/model/specification.py 2012-11-29 15:36:27 +0000
4@@ -108,7 +108,6 @@
5 from lp.services.database.lpstorm import IStore
6 from lp.services.database.sqlbase import (
7 cursor,
8- quote,
9 SQLBase,
10 sqlvalues,
11 )
12@@ -996,11 +995,12 @@
13 :return: A DecoratedResultSet with Person precaching setup.
14 """
15 # Circular import.
16- from lp.registry.model.person import Person
17 if isinstance(clauses, basestring):
18 clauses = [SQL(clauses)]
19
20 def cache_people(rows):
21+ """DecoratedResultSet pre_iter_hook to eager load Person attributes."""
22+ from lp.registry.model.person import Person
23 # Find the people we need:
24 person_ids = set()
25 for spec in rows:
26@@ -1017,7 +1017,7 @@
27 origin.extend(validity_info["joins"])
28 columns.extend(validity_info["tables"])
29 decorators = validity_info["decorators"]
30- personset = Store.of(self).using(*origin).find(
31+ personset = IStore(Specification).using(*origin).find(
32 tuple(columns),
33 Person.id.is_in(person_ids),
34 )
35@@ -1029,7 +1029,7 @@
36 index += 1
37 decorator(person, column)
38
39- results = Store.of(self).using(*tables).find(Specification, *clauses)
40+ results = IStore(Specification).using(*tables).find(Specification, *clauses)
41 return DecoratedResultSet(results, pre_iter_hook=cache_people)
42
43 @property
44@@ -1088,100 +1088,40 @@
45
46 def specifications(self, user, sort=None, quantity=None, filter=None,
47 prejoin_people=True):
48- """See IHasSpecifications."""
49-
50- # Make a new list of the filter, so that we do not mutate what we
51- # were passed as a filter
52+ store = IStore(Specification)
53+
54+ # Take the visibility due to privacy into account.
55+ privacy_tables, clauses = visible_specification_query(user)
56+
57 if not filter:
58- # When filter is None or [] then we decide the default
59- # which for a product is to show incomplete specs
60+ # Default to showing incomplete specs
61 filter = [SpecificationFilter.INCOMPLETE]
62
63- # now look at the filter and fill in the unsaid bits
64-
65- # defaults for completeness: if nothing is said about completeness
66- # then we want to show INCOMPLETE
67- completeness = False
68- for option in [
69- SpecificationFilter.COMPLETE,
70- SpecificationFilter.INCOMPLETE]:
71- if option in filter:
72- completeness = True
73- if completeness is False:
74- filter.append(SpecificationFilter.INCOMPLETE)
75-
76- # defaults for acceptance: in this case we have nothing to do
77- # because specs are not accepted/declined against a distro
78-
79- # defaults for informationalness: we don't have to do anything
80- # because the default if nothing is said is ANY
81+ spec_clauses = get_specification_filters(filter)
82+ clauses.extend(spec_clauses)
83
84 # sort by priority descending, by default
85 if sort is None or sort == SpecificationSort.PRIORITY:
86- order = ['-priority', 'Specification.definition_status',
87- 'Specification.name']
88+ order = [Desc(Specification.priority),
89+ Specification.definition_status,
90+ Specification.name]
91+
92 elif sort == SpecificationSort.DATE:
93 if SpecificationFilter.COMPLETE in filter:
94 # if we are showing completed, we care about date completed
95- order = ['-Specification.date_completed', 'Specification.id']
96+ order = [Desc(Specification.date_completed),
97+ Specification.id]
98 else:
99 # if not specially looking for complete, we care about date
100 # registered
101- order = ['-Specification.datecreated', 'Specification.id']
102-
103- # figure out what set of specifications we are interested in. for
104- # products, we need to be able to filter on the basis of:
105- #
106- # - completeness.
107- # - informational.
108- #
109-
110- # filter out specs on inactive products
111- base = """((Specification.product IS NULL OR
112- Specification.product NOT IN
113- (SELECT Product.id FROM Product
114- WHERE Product.active IS FALSE))
115- AND Specification.information_type = 1)
116- """
117- query = base
118- # look for informational specs
119- if SpecificationFilter.INFORMATIONAL in filter:
120- query += (' AND Specification.implementation_status = %s ' %
121- quote(SpecificationImplementationStatus.INFORMATIONAL.value))
122-
123- # filter based on completion. see the implementation of
124- # Specification.is_complete() for more details
125- completeness = Specification.completeness_clause
126-
127- if SpecificationFilter.COMPLETE in filter:
128- query += ' AND ( %s ) ' % completeness
129- elif SpecificationFilter.INCOMPLETE in filter:
130- query += ' AND NOT ( %s ) ' % completeness
131-
132- # Filter for validity. If we want valid specs only then we should
133- # exclude all OBSOLETE or SUPERSEDED specs
134- if SpecificationFilter.VALID in filter:
135- # XXX: kiko 2007-02-07: this is untested and was broken.
136- query += (
137- ' AND Specification.definition_status NOT IN ( %s, %s ) ' %
138- sqlvalues(SpecificationDefinitionStatus.OBSOLETE,
139- SpecificationDefinitionStatus.SUPERSEDED))
140-
141- # ALL is the trump card
142- if SpecificationFilter.ALL in filter:
143- query = base
144-
145- # Filter for specification text
146- for constraint in filter:
147- if isinstance(constraint, basestring):
148- # a string in the filter is a text search filter
149- query += ' AND Specification.fti @@ ftq(%s) ' % quote(
150- constraint)
151-
152- results = Specification.select(query, orderBy=order, limit=quantity)
153+ order = [Desc(Specification.datecreated), Specification.id]
154+
155 if prejoin_people:
156- results = results.prejoin(['assignee', 'approver', 'drafter'])
157- return results
158+ results = self._preload_specifications_people(privacy_tables, clauses)
159+ else:
160+ results = store.using(*privacy_tables).find(
161+ Specification, *clauses)
162+ return results.order_by(*order)[:quantity]
163
164 def getByURL(self, url):
165 """See ISpecificationSet."""
166
167=== modified file 'lib/lp/blueprints/tests/test_specification.py'
168--- lib/lp/blueprints/tests/test_specification.py 2012-11-22 14:06:36 +0000
169+++ lib/lp/blueprints/tests/test_specification.py 2012-11-29 15:36:27 +0000
170@@ -5,7 +5,11 @@
171
172 __metaclass__ = type
173
174-
175+from datetime import (
176+ datetime,
177+ timedelta,
178+ )
179+import pytz
180 from storm.store import Store
181 from zope.component import (
182 getUtility,
183@@ -28,7 +32,11 @@
184 from lp.blueprints.enums import (
185 NewSpecificationDefinitionStatus,
186 SpecificationDefinitionStatus,
187+ SpecificationFilter,
188 SpecificationGoalStatus,
189+ SpecificationImplementationStatus,
190+ SpecificationPriority,
191+ SpecificationSort,
192 )
193 from lp.blueprints.errors import TargetAlreadyHasSpecification
194 from lp.blueprints.interfaces.specification import ISpecificationSet
195@@ -40,6 +48,7 @@
196 SharingPermission,
197 SpecificationSharingPolicy,
198 )
199+from lp.registry.interfaces.accesspolicy import IAccessPolicySource
200 from lp.security import (
201 AdminSpecification,
202 EditSpecificationByRelatedPeople,
203@@ -533,3 +542,203 @@
204 definition_status=SpecificationDefinitionStatus.OBSOLETE)
205 self.assertRaises(
206 AssertionError, self.specification_set.new, **args)
207+
208+
209+def list_result(context, filter=None, user=None):
210+ result = context.specifications(
211+ user, SpecificationSort.DATE, filter=filter)
212+ return list(result)
213+
214+
215+class TestSpecifications(TestCaseWithFactory):
216+
217+ layer = DatabaseFunctionalLayer
218+
219+ def setUp(self):
220+ super(TestSpecifications, self).setUp()
221+ self.date_created = datetime.now(pytz.utc)
222+
223+ def makeSpec(self, product=None, date_created=0, title=None,
224+ status=NewSpecificationDefinitionStatus.NEW,
225+ name=None, priority=None, information_type=None):
226+ blueprint = self.factory.makeSpecification(
227+ title=title, status=status, name=name, priority=priority,
228+ information_type=information_type, product=product,
229+ )
230+ removeSecurityProxy(blueprint).datecreated = (
231+ self.date_created + timedelta(date_created))
232+ return blueprint
233+
234+ def test_specifications_quantity(self):
235+ # Ensure the quantity controls the maximum number of entries.
236+ context = getUtility(ISpecificationSet)
237+ product = self.factory.makeProduct()
238+ for count in range(10):
239+ self.factory.makeSpecification(product=product)
240+ self.assertEqual(20, context.specifications(None).count())
241+ result = context.specifications(None, quantity=None).count()
242+ self.assertEqual(20, result)
243+ self.assertEqual(8, context.specifications(None, quantity=8).count())
244+ self.assertEqual(11, context.specifications(None, quantity=11).count())
245+
246+ def test_date_sort(self):
247+ # Sort on date_created.
248+ context = getUtility(ISpecificationSet)
249+ product = self.factory.makeProduct()
250+ blueprint1 = self.makeSpec(product, date_created=0)
251+ blueprint2 = self.makeSpec(product, date_created=-1)
252+ blueprint3 = self.makeSpec(product, date_created=1)
253+ result = list_result(context)
254+ self.assertEqual([blueprint3, blueprint1, blueprint2], result[0:3])
255+
256+ def test_date_sort_id(self):
257+ # date-sorting when no date varies uses object id.
258+ context = getUtility(ISpecificationSet)
259+ product = self.factory.makeProduct()
260+ blueprint1 = self.makeSpec(product)
261+ blueprint2 = self.makeSpec(product)
262+ blueprint3 = self.makeSpec(product)
263+ result = list_result(context)
264+ self.assertEqual([blueprint1, blueprint2, blueprint3], result[0:3])
265+
266+ def test_priority_sort(self):
267+ # Sorting by priority works and is the default.
268+ # When priority is supplied, status is ignored.
269+ context = getUtility(ISpecificationSet)
270+ blueprint1 = self.makeSpec(priority=SpecificationPriority.UNDEFINED,
271+ status=SpecificationDefinitionStatus.NEW)
272+ product = blueprint1.product
273+ blueprint2 = self.makeSpec(
274+ product, priority=SpecificationPriority.NOTFORUS,
275+ status=SpecificationDefinitionStatus.APPROVED)
276+ blueprint3 = self.makeSpec(
277+ product, priority=SpecificationPriority.LOW,
278+ status=SpecificationDefinitionStatus.NEW)
279+ result = list(context.specifications(
280+ None, sort=SpecificationSort.PRIORITY))
281+ self.assertTrue(
282+ result.index(blueprint3) <
283+ result.index(blueprint1) <
284+ result.index(blueprint2))
285+
286+ def test_priority_sort_fallback_is_priority(self):
287+ # Sorting by default falls back to Priority
288+ context = getUtility(ISpecificationSet)
289+ blueprint1 = self.makeSpec(name='b')
290+ product = blueprint1.product
291+ self.makeSpec(product, name='c')
292+ self.makeSpec(product, name='a')
293+ base_result = context.specifications(None)
294+ priority_result = context.specifications(
295+ None, sort=SpecificationSort.PRIORITY)
296+ self.assertEqual(list(base_result), list(priority_result))
297+
298+ def test_informational(self):
299+ # INFORMATIONAL causes only informational specs to be shown.
300+ context = getUtility(ISpecificationSet)
301+ enum = SpecificationImplementationStatus
302+ informational = self.factory.makeSpecification(
303+ implementation_status=enum.INFORMATIONAL)
304+ product = informational.product
305+ plain = self.factory.makeSpecification(product=product)
306+ result = context.specifications(None)
307+ self.assertIn(informational, result)
308+ self.assertIn(plain, result)
309+ result = context.specifications(
310+ None, filter=[SpecificationFilter.INFORMATIONAL])
311+ self.assertIn(informational, result)
312+ self.assertNotIn(plain, result)
313+
314+ def test_completeness(self):
315+ # If COMPLETE is specified, completed specs are listed. If INCOMPLETE
316+ # is specified or neither is specified, only incomplete specs are
317+ # listed.
318+ context = getUtility(ISpecificationSet)
319+ enum = SpecificationImplementationStatus
320+ implemented = self.factory.makeSpecification(
321+ implementation_status=enum.IMPLEMENTED)
322+ product = implemented.product
323+ non_implemented = self.factory.makeSpecification(product=product)
324+ result = context.specifications(
325+ None, filter=[SpecificationFilter.COMPLETE])
326+ self.assertIn(implemented, result)
327+ self.assertNotIn(non_implemented, result)
328+
329+ result = context.specifications(
330+ None, filter=[SpecificationFilter.INCOMPLETE])
331+ self.assertNotIn(implemented, result)
332+ self.assertIn(non_implemented, result)
333+ result = context.specifications(None)
334+ self.assertNotIn(implemented, result)
335+ self.assertIn(non_implemented, result)
336+
337+ def test_all(self):
338+ # ALL causes both complete and incomplete to be listed.
339+ context = getUtility(ISpecificationSet)
340+ enum = SpecificationImplementationStatus
341+ implemented = self.factory.makeSpecification(
342+ implementation_status=enum.IMPLEMENTED)
343+ product = implemented.product
344+ non_implemented = self.factory.makeSpecification(product=product)
345+ result = context.specifications(None, filter=[SpecificationFilter.ALL])
346+ self.assertIn(implemented, result)
347+ self.assertIn(non_implemented, result)
348+
349+ def test_valid(self):
350+ # VALID adjusts COMPLETE to exclude OBSOLETE and SUPERSEDED specs.
351+ # (INCOMPLETE already excludes OBSOLETE and SUPERSEDED.)
352+ context = getUtility(ISpecificationSet)
353+ i_enum = SpecificationImplementationStatus
354+ d_enum = SpecificationDefinitionStatus
355+ implemented = self.factory.makeSpecification(
356+ implementation_status=i_enum.IMPLEMENTED)
357+ product = implemented.product
358+ superseded = self.factory.makeSpecification(product=product,
359+ status=d_enum.SUPERSEDED)
360+ self.factory.makeSpecification(product=product, status=d_enum.OBSOLETE)
361+ filter = [SpecificationFilter.VALID, SpecificationFilter.COMPLETE]
362+ results = context.specifications(None, filter=filter)
363+ self.assertIn(implemented, results)
364+ self.assertNotIn(superseded, results)
365+
366+ def test_text_search(self):
367+ # Text searches work.
368+ context = getUtility(ISpecificationSet)
369+ blueprint1 = self.makeSpec(title='abc')
370+ product = blueprint1.product
371+ blueprint2 = self.makeSpec(product, title='def')
372+ result = list_result(context, ['abc'])
373+ self.assertEqual([blueprint1], result)
374+ result = list_result(product, ['def'])
375+ self.assertEqual([blueprint2], result)
376+
377+ def test_proprietary_not_listed(self):
378+ # Proprietary blueprints are not listed for random users
379+ context = getUtility(ISpecificationSet)
380+ private_spec = self.makeSpec(
381+ information_type=InformationType.PROPRIETARY)
382+ self.assertNotIn(private_spec, list(context.specifications(None)))
383+
384+ def test_proprietary_listed_for_artifact_grant(self):
385+ # Proprietary blueprints are listed for users with an artifact grant.
386+ context = getUtility(ISpecificationSet)
387+ blueprint1 = self.makeSpec(
388+ information_type=InformationType.PROPRIETARY)
389+ grant = self.factory.makeAccessArtifactGrant(
390+ concrete_artifact=blueprint1)
391+ self.assertIn(
392+ blueprint1,
393+ list(context.specifications(grant.grantee)))
394+
395+ def test_proprietary_listed_for_policy_grant(self):
396+ # Proprietary blueprints are listed for users with a policy grant.
397+ context = getUtility(ISpecificationSet)
398+ blueprint1 = self.makeSpec(
399+ information_type=InformationType.PROPRIETARY)
400+ policy_source = getUtility(IAccessPolicySource)
401+ (policy,) = policy_source.find(
402+ [(blueprint1.product, InformationType.PROPRIETARY)])
403+ grant = self.factory.makeAccessPolicyGrant(policy)
404+ self.assertIn(
405+ blueprint1,
406+ list(context.specifications(user=grant.grantee)))