Merge ~pappacena/launchpad:fix-signingkey-duplication-on-inject into launchpad:master

Proposed by Thiago F. Pappacena
Status: Merged
Approved by: Thiago F. Pappacena
Approved revision: 487286704f4453e21aa5e9232fcdf64b7ad7bb75
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~pappacena/launchpad:fix-signingkey-duplication-on-inject
Merge into: launchpad:master
Diff against target: 125 lines (+89/-6)
2 files modified
lib/lp/services/signing/model/signingkey.py (+14/-6)
lib/lp/services/signing/tests/test_signingkey.py (+75/-0)
Reviewer Review Type Date Requested Status
Colin Watson (community) Approve
Review via email: mp+384448@code.launchpad.net

Commit message

Fixing duplicate key error when injecting the same signing key for different archives.

To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson) :
review: Approve
4872867... by Thiago F. Pappacena

Fixing typo and removing unecessary store.invalidate

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/services/signing/model/signingkey.py b/lib/lp/services/signing/model/signingkey.py
2index 0479b61..5557e7e 100644
3--- a/lib/lp/services/signing/model/signingkey.py
4+++ b/lib/lp/services/signing/model/signingkey.py
5@@ -106,13 +106,21 @@ class SigningKey(StormBase):
6 signing_service = getUtility(ISigningServiceClient)
7 generated_key = signing_service.inject(
8 key_type, private_key, public_key, description, created_at)
9- signing_key = SigningKey(
10- key_type=key_type, fingerprint=generated_key['fingerprint'],
11- public_key=bytes(public_key),
12- description=description, date_created=created_at)
13+ fingerprint = generated_key['fingerprint']
14+
15 store = IMasterStore(SigningKey)
16- store.add(signing_key)
17- return signing_key
18+ # Check if the key is already saved in the database.
19+ db_key = store.find(
20+ SigningKey,
21+ SigningKey.key_type == key_type,
22+ SigningKey.fingerprint == fingerprint).one()
23+ if db_key is None:
24+ db_key = SigningKey(
25+ key_type=key_type, fingerprint=fingerprint,
26+ public_key=bytes(public_key),
27+ description=description, date_created=created_at)
28+ store.add(db_key)
29+ return db_key
30
31 def sign(self, message, message_name):
32 if self.key_type in (SigningKeyType.UEFI, SigningKeyType.FIT):
33diff --git a/lib/lp/services/signing/tests/test_signingkey.py b/lib/lp/services/signing/tests/test_signingkey.py
34index 8613dd7..81376b1 100644
35--- a/lib/lp/services/signing/tests/test_signingkey.py
36+++ b/lib/lp/services/signing/tests/test_signingkey.py
37@@ -91,6 +91,31 @@ class TestSigningKey(TestCaseWithFactory, TestWithFixtures):
38 self.assertEqual(created_at, db_key.date_created)
39
40 @responses.activate
41+ def test_inject_signing_key_with_existing_fingerprint(self):
42+ self.signing_service.addResponses(self)
43+
44+ priv_key = PrivateKey.generate()
45+ pub_key = priv_key.public_key
46+ created_at = datetime(2020, 4, 16, 16, 35).replace(tzinfo=utc)
47+
48+ key = SigningKey.inject(
49+ SigningKeyType.KMOD, bytes(priv_key), bytes(pub_key),
50+ u"This is a test key", created_at)
51+ self.assertIsInstance(key, SigningKey)
52+
53+ store = IMasterStore(SigningKey)
54+ store.flush()
55+
56+ # This should give back the same key
57+ new_key = SigningKey.inject(
58+ SigningKeyType.KMOD, bytes(priv_key), bytes(pub_key),
59+ u"This is a test key with another description", created_at)
60+ store.flush()
61+
62+ self.assertEqual(key.id, new_key.id)
63+ self.assertEqual(5, len(responses.calls))
64+
65+ @responses.activate
66 def test_sign_some_data(self):
67 self.signing_service.addResponses(self)
68
69@@ -175,6 +200,56 @@ class TestArchiveSigningKey(TestCaseWithFactory):
70 fingerprint=self.signing_service.generated_fingerprint,
71 public_key=bytes(pub_key)))
72
73+ @responses.activate
74+ def test_inject_same_key_for_different_archives(self):
75+ self.signing_service.addResponses(self)
76+
77+ archive = self.factory.makeArchive()
78+ distro_series = archive.distribution.series[0]
79+
80+ priv_key = PrivateKey.generate()
81+ pub_key = priv_key.public_key
82+
83+ now = datetime.now().replace(tzinfo=utc)
84+ arch_key = getUtility(IArchiveSigningKeySet).inject(
85+ SigningKeyType.UEFI, bytes(priv_key), bytes(pub_key),
86+ u"Some description", now, archive,
87+ earliest_distro_series=distro_series)
88+
89+ store = Store.of(arch_key)
90+
91+ rs = store.find(ArchiveSigningKey)
92+ self.assertEqual(1, rs.count())
93+
94+ db_arch_key = rs.one()
95+ self.assertThat(db_arch_key, MatchesStructure.byEquality(
96+ key_type=SigningKeyType.UEFI, archive=archive,
97+ earliest_distro_series=distro_series))
98+
99+ self.assertThat(db_arch_key.signing_key, MatchesStructure.byEquality(
100+ key_type=SigningKeyType.UEFI, description=u"Some description",
101+ fingerprint=self.signing_service.generated_fingerprint,
102+ public_key=bytes(pub_key)))
103+
104+ # Inject the same key for a new archive, and check that the
105+ # corresponding SigningKey object is the same.
106+ another_archive = self.factory.makeArchive()
107+
108+ another_arch_key = getUtility(IArchiveSigningKeySet).inject(
109+ SigningKeyType.UEFI, bytes(priv_key), bytes(pub_key),
110+ u"Another description", now, another_archive)
111+
112+ rs = store.find(ArchiveSigningKey)
113+ self.assertEqual(2, rs.count())
114+
115+ another_db_arch_key = rs.order_by('id').last()
116+ self.assertNotEqual(another_arch_key.id, arch_key.id)
117+ self.assertEqual(another_arch_key.signing_key, arch_key.signing_key)
118+
119+ self.assertThat(another_db_arch_key, MatchesStructure.byEquality(
120+ key_type=SigningKeyType.UEFI, archive=another_archive,
121+ earliest_distro_series=None))
122+
123 def test_create(self):
124 archive = self.factory.makeArchive()
125 distro_series = archive.distribution.series[0]