Merge ~pappacena/launchpad:oci-multi-arch-upload into launchpad:master

Proposed by Thiago F. Pappacena
Status: Merged
Approved by: Thiago F. Pappacena
Approved revision: caa19b04584a4e979701275cbd70ead4ed4334d8
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~pappacena/launchpad:oci-multi-arch-upload
Merge into: launchpad:master
Prerequisite: ~pappacena/launchpad:oci-unify-request-builds
Diff against target: 1057 lines (+607/-56)
10 files modified
database/schema/security.cfg (+1/-0)
lib/lp/oci/interfaces/ocirecipe.py (+10/-0)
lib/lp/oci/interfaces/ocirecipejob.py (+12/-0)
lib/lp/oci/interfaces/ociregistryclient.py (+3/-0)
lib/lp/oci/model/ocirecipe.py (+16/-1)
lib/lp/oci/model/ocirecipebuildjob.py (+117/-3)
lib/lp/oci/model/ocirecipejob.py (+12/-0)
lib/lp/oci/model/ociregistryclient.py (+92/-23)
lib/lp/oci/tests/test_ocirecipebuildjob.py (+249/-5)
lib/lp/oci/tests/test_ociregistryclient.py (+95/-24)
Reviewer Review Type Date Requested Status
Colin Watson (community) Approve
Review via email: mp+391783@code.launchpad.net

Commit message

Upload multi-arch OCI images

To post a comment you must log in.
Revision history for this message
Thiago F. Pappacena (pappacena) :
6c75d70... by Thiago F. Pappacena

Merge branch 'master' into oci-multi-arch-upload

Revision history for this message
Colin Watson (cjwatson) wrote :

I can't effectively review the actual manifest handling; it's probably best to ask Tom to check that over.

review: Needs Fixing
2870200... by Thiago F. Pappacena

Code style, typo and better code documentation

Revision history for this message
Thiago F. Pappacena (pappacena) wrote :

Replied to the comments about the `allBuildsUploaded` method. I would like your opinion on that while I work on the job retry details.

Revision history for this message
Thiago F. Pappacena (pappacena) :
08c1c68... by Thiago F. Pappacena

Refactoring and adding tests for concurrent oci img upload jobs check

Revision history for this message
Thiago F. Pappacena (pappacena) wrote :

Added a retry strategy and some tests to ensure that we use database locks as expected to queue up the manifest list upload check.

4bad06d... by Thiago F. Pappacena

Merge branch 'master' into oci-multi-arch-upload

8b6e704... by Thiago F. Pappacena

Fixing exception catching in test

15622d5... by Thiago F. Pappacena

Fixing _upload_layer mock return value in tests

Revision history for this message
Colin Watson (cjwatson) :
Revision history for this message
Colin Watson (cjwatson) :
review: Approve
bd79d6e... by Thiago F. Pappacena

Checking also final result on concurrent OCIRecipeUploadJob

caa19b0... by Thiago F. Pappacena

Refactoring

Revision history for this message
Thiago F. Pappacena (pappacena) wrote :

Addressed all comments. As suggested by twom, I'll test it with microk8s before merging it.

Revision history for this message
Thiago F. Pappacena (pappacena) wrote :

Tested with microk8s, and it seems to be OK.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/database/schema/security.cfg b/database/schema/security.cfg
2index 8a9a31b..d5a23f1 100644
3--- a/database/schema/security.cfg
4+++ b/database/schema/security.cfg
5@@ -2710,6 +2710,7 @@ public.account = SELECT
6 public.archive = SELECT
7 public.branch = SELECT
8 public.builder = SELECT
9+public.builderprocessor = SELECT
10 public.buildfarmjob = SELECT, INSERT
11 public.buildqueue = SELECT, INSERT, UPDATE
12 public.distribution = SELECT
13diff --git a/lib/lp/oci/interfaces/ocirecipe.py b/lib/lp/oci/interfaces/ocirecipe.py
14index 0bd3cd2..131c4d3 100644
15--- a/lib/lp/oci/interfaces/ocirecipe.py
16+++ b/lib/lp/oci/interfaces/ocirecipe.py
17@@ -178,6 +178,16 @@ class IOCIRecipeBuildRequest(Interface):
18 title=_("If set, limit builds to these architecture tags."),
19 value_type=TextLine(), required=False, readonly=True)
20
21+ uploaded_manifests = Dict(
22+ title=_("A dict of manifest information per build."),
23+ key_type=Int(), value_type=Dict(),
24+ required=False, readonly=True)
25+
26+ def addUploadedManifest(build_id, manifest_info):
27+ """Add the manifest information for one of the builds in this
28+ BuildRequest.
29+ """
30+
31
32 class IOCIRecipeView(Interface):
33 """`IOCIRecipe` attributes that require launchpad.View permission."""
34diff --git a/lib/lp/oci/interfaces/ocirecipejob.py b/lib/lp/oci/interfaces/ocirecipejob.py
35index 104a2ff..2b59faf 100644
36--- a/lib/lp/oci/interfaces/ocirecipejob.py
37+++ b/lib/lp/oci/interfaces/ocirecipejob.py
38@@ -19,6 +19,8 @@ from zope.interface import (
39 )
40 from zope.schema import (
41 Datetime,
42+ Dict,
43+ Int,
44 List,
45 TextLine,
46 )
47@@ -78,6 +80,16 @@ class IOCIRecipeRequestBuildsJob(IRunnableJob):
48 error_message = TextLine(
49 title=_("Error message"), required=False, readonly=True)
50
51+ uploaded_manifests = Dict(
52+ title=_("A dict of manifest information per build."),
53+ key_type=Int(), value_type=Dict(),
54+ required=False, readonly=True)
55+
56+ def addUploadedManifest(build_id, manifest_info):
57+ """Add the manifest information for one of the builds in this
58+ BuildRequest.
59+ """
60+
61
62 class IOCIRecipeRequestBuildsJobSource(IJobSource):
63
64diff --git a/lib/lp/oci/interfaces/ociregistryclient.py b/lib/lp/oci/interfaces/ociregistryclient.py
65index 184a93f..71af23a 100644
66--- a/lib/lp/oci/interfaces/ociregistryclient.py
67+++ b/lib/lp/oci/interfaces/ociregistryclient.py
68@@ -54,3 +54,6 @@ class IOCIRegistryClient(Interface):
69
70 :param build: The `IOCIRecipeBuild` to upload.
71 """
72+
73+ def uploadManifestList(build_request):
74+ """Upload the "fat manifest" which aggregates all platforms built."""
75diff --git a/lib/lp/oci/model/ocirecipe.py b/lib/lp/oci/model/ocirecipe.py
76index fd21dde..ab7f655 100644
77--- a/lib/lp/oci/model/ocirecipe.py
78+++ b/lib/lp/oci/model/ocirecipe.py
79@@ -38,7 +38,10 @@ from zope.component import (
80 from zope.event import notify
81 from zope.interface import implementer
82 from zope.security.interfaces import Unauthorized
83-from zope.security.proxy import removeSecurityProxy
84+from zope.security.proxy import (
85+ isinstance as zope_isinstance,
86+ removeSecurityProxy,
87+ )
88
89 from lp.app.interfaces.launchpad import ILaunchpadCelebrities
90 from lp.app.interfaces.security import IAuthorization
91@@ -697,6 +700,13 @@ class OCIRecipeBuildRequest:
92 return self.job.date_finished
93
94 @property
95+ def uploaded_manifests(self):
96+ return self.job.uploaded_manifests
97+
98+ def addUploadedManifest(self, build_id, manifest_info):
99+ self.job.addUploadedManifest(build_id, manifest_info)
100+
101+ @property
102 def status(self):
103 status_map = {
104 JobStatus.WAITING: OCIRecipeBuildRequestStatus.PENDING,
105@@ -714,3 +724,8 @@ class OCIRecipeBuildRequest:
106 @property
107 def builds(self):
108 return self.job.builds
109+
110+ def __eq__(self, other):
111+ if not zope_isinstance(other, self.__class__):
112+ return False
113+ return self.id == other.id
114diff --git a/lib/lp/oci/model/ocirecipebuildjob.py b/lib/lp/oci/model/ocirecipebuildjob.py
115index 73fa4d6..991b9a4 100644
116--- a/lib/lp/oci/model/ocirecipebuildjob.py
117+++ b/lib/lp/oci/model/ocirecipebuildjob.py
118@@ -11,6 +11,8 @@ __all__ = [
119 'OCIRecipeBuildJobType',
120 ]
121
122+from datetime import timedelta
123+import random
124
125 from lazr.delegates import delegate_to
126 from lazr.enum import (
127@@ -41,8 +43,12 @@ from lp.oci.interfaces.ociregistryclient import (
128 OCIRegistryError,
129 )
130 from lp.services.database.enumcol import DBEnum
131-from lp.services.database.interfaces import IStore
132+from lp.services.database.interfaces import (
133+ IMasterStore,
134+ IStore,
135+ )
136 from lp.services.database.stormbase import StormBase
137+from lp.services.job.interfaces.job import JobStatus
138 from lp.services.job.model.job import (
139 EnumeratedSubclass,
140 Job,
141@@ -156,16 +162,34 @@ class OCIRecipeBuildJobDerived(
142 @implementer(IOCIRegistryUploadJob)
143 @provider(IOCIRegistryUploadJobSource)
144 class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
145+ """Manages the OCI image upload to registries.
146+
147+ This job coordinates with other OCIRegistryUploadJob in a way that the
148+ last job uploading its layers and manifest will also upload the
149+ manifest list with all the previously built OCI images uploaded from
150+ other architectures in the same build request. To avoid race conditions,
151+ we synchronize that using a SELECT ... FOR UPDATE at database level to
152+ make sure that the status is consistent across all the upload jobs.
153+ """
154
155 class_job_type = OCIRecipeBuildJobType.REGISTRY_UPLOAD
156
157+ class ManifestListUploadError(Exception):
158+ pass
159+
160+ retry_error_types = (ManifestListUploadError, )
161+ max_retries = 5
162+
163 @classmethod
164 def create(cls, build):
165 """See `IOCIRegistryUploadJobSource`"""
166 edited_fields = set()
167 with notify_modified(build, edited_fields) as before_modification:
168+ json_data = {
169+ "build_uploaded": False,
170+ }
171 oci_build_job = OCIRecipeBuildJob(
172- build, cls.class_job_type, {})
173+ build, cls.class_job_type, json_data)
174 job = cls(oci_build_job)
175 job.celeryRunOnCommit()
176 del get_property_cache(build).last_registry_upload_job
177@@ -174,6 +198,14 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
178 edited_fields.add("registry_upload_status")
179 return job
180
181+ @property
182+ def retry_delay(self):
183+ dithering_secs = int(random.random() * 60)
184+ # Adds some random seconds between 0 and 60 to minimize the
185+ # likelihood of synchronized retries holding locks on the database
186+ # at the same time.
187+ return timedelta(minutes=10, seconds=dithering_secs)
188+
189 # Ideally we'd just override Job._set_status or similar, but
190 # lazr.delegates makes that difficult, so we use this to override all
191 # the individual Job lifecycle methods instead.
192@@ -227,6 +259,74 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
193 """See `IOCIRegistryUploadJob`."""
194 self.json_data["errors"] = errors
195
196+ @property
197+ def build_uploaded(self):
198+ return self.json_data.get("build_uploaded", False)
199+
200+ @build_uploaded.setter
201+ def build_uploaded(self, value):
202+ self.json_data["build_uploaded"] = bool(value)
203+
204+ def allBuildsUploaded(self, build_request):
205+ """Returns True if all builds of the given build_request already
206+ finished uploading. False otherwise.
207+
208+ Note that this method locks all upload jobs at database level,
209+ preventing them from updating their status until the end of the
210+ current transaction. Use it with caution.
211+ """
212+ builds = list(build_request.builds)
213+ uploads_per_build = {i: list(i.registry_upload_jobs) for i in builds}
214+ upload_jobs = sum(uploads_per_build.values(), [])
215+
216+ # Lock the Job rows, so no other job updates its status until the
217+ # end of this job's transaction. This is done to avoid race conditions,
218+ # where 2 upload jobs could be running simultaneously and none of them
219+ # realises that is the last upload.
220+ # Note also that new upload jobs might be created between the
221+ # transaction begin and this lock takes place, but in this case the
222+ # new upload is either a retry from a failed upload, or the first
223+ # upload for one of the existing builds. Either way, we would see that
224+ # build as "not uploaded yet", which is ok for this method, and the
225+ # new job will block until these locks are released, so we should be
226+ # safe.
227+ store = IMasterStore(builds[0])
228+ placeholders = ', '.join('?' for _ in upload_jobs)
229+ sql = (
230+ "SELECT id, status FROM job WHERE id IN (%s) FOR UPDATE"
231+ % placeholders)
232+ job_status = {
233+ job_id: JobStatus.items[status] for job_id, status in
234+ store.execute(sql, [i.job_id for i in upload_jobs])}
235+
236+ for build, upload_jobs in uploads_per_build.items():
237+ has_finished_upload = any(
238+ job_status[i.job_id] == JobStatus.COMPLETED
239+ or i.job_id == self.job_id
240+ for i in upload_jobs)
241+ if not has_finished_upload:
242+ return False
243+ return True
244+
245+ def uploadManifestList(self, client):
246+ """Uploads the aggregated manifest list for all builds in the
247+ current build request.
248+ """
249+ # The "allBuildsUploaded" call will lock, on the database,
250+ # all upload jobs for update until this transaction finishes.
251+ # So, make sure this is the last thing being done by this job.
252+ build_request = self.build.build_request
253+ if not build_request:
254+ return
255+ try:
256+ if self.allBuildsUploaded(build_request):
257+ client.uploadManifestList(build_request)
258+ except OCIRegistryError:
259+ # Do not retry automatically on known OCI registry errors.
260+ raise
261+ except Exception as e:
262+ raise self.ManifestListUploadError(str(e))
263+
264 def run(self):
265 """See `IRunnableJob`."""
266 client = getUtility(IOCIRegistryClient)
267@@ -234,7 +334,21 @@ class OCIRegistryUploadJob(OCIRecipeBuildJobDerived):
268 # it will need to gain retry support.
269 try:
270 try:
271- client.upload(self.build)
272+ if not self.build_uploaded:
273+ client.upload(self.build)
274+ self.build_uploaded = True
275+
276+ self.uploadManifestList(client)
277+ # Force this job status to COMPLETED in the same transaction we
278+ # called `allBuildsUpdated` (in the uploadManifestList call
279+ # above) to release the lock already including the new status.
280+ # This way, any other transaction that was blocked waiting to
281+ # get the info about the upload jobs will immediately have the
282+ # new status of this job, avoiding race conditions. Keep the
283+ # `manage_transaction=False` to prevent the method from
284+ # commiting at the wrong moment.
285+ self.complete(manage_transaction=False)
286+
287 except OCIRegistryError as e:
288 self.error_summary = str(e)
289 self.errors = e.errors
290diff --git a/lib/lp/oci/model/ocirecipejob.py b/lib/lp/oci/model/ocirecipejob.py
291index 15eac4f..3320cff 100644
292--- a/lib/lp/oci/model/ocirecipejob.py
293+++ b/lib/lp/oci/model/ocirecipejob.py
294@@ -169,6 +169,8 @@ class OCIRecipeRequestBuildsJob(OCIRecipeJobDerived):
295 "requester": requester.id,
296 "architectures": (
297 list(architectures) if architectures is not None else None),
298+ # A dict of build_id: manifest location
299+ "uploaded_manifests": {}
300 }
301 recipe_job = OCIRecipeJob(recipe, cls.class_job_type, metadata)
302 job = cls(recipe_job)
303@@ -258,6 +260,16 @@ class OCIRecipeRequestBuildsJob(OCIRecipeJobDerived):
304 architectures = self.metadata["architectures"]
305 return set(architectures) if architectures is not None else None
306
307+ @property
308+ def uploaded_manifests(self):
309+ return {
310+ # Converts keys to integer since saving json to database
311+ # converts them to strings.
312+ int(k): v for k, v in self.metadata["uploaded_manifests"].items()}
313+
314+ def addUploadedManifest(self, build_id, manifest_info):
315+ self.metadata["uploaded_manifests"][int(build_id)] = manifest_info
316+
317 def run(self):
318 """See `IRunnableJob`."""
319 requester = self.requester
320diff --git a/lib/lp/oci/model/ociregistryclient.py b/lib/lp/oci/model/ociregistryclient.py
321index 7561267..7c67b57 100644
322--- a/lib/lp/oci/model/ociregistryclient.py
323+++ b/lib/lp/oci/model/ociregistryclient.py
324@@ -229,17 +229,59 @@ class OCIRegistryClient:
325 """
326 # XXX twom 2020-04-17 This needs to include OCIProjectSeries and
327 # base image name
328-
329 return "{}".format("edge")
330
331 @classmethod
332+ def _uploadRegistryManifest(cls, http_client, registry_manifest,
333+ push_rule, build=None):
334+ """Uploads the build manifest, returning its content information.
335+
336+ The returned information can be used to create a Manifest list
337+ including the uploaded manifest, for example, in order to create
338+ multi-architecture images.
339+
340+ :return: A dict with {"digest": "sha256:xxx", "size": total_bytes}
341+ """
342+ digest = None
343+ data = json.dumps(registry_manifest)
344+ size = len(data)
345+ content_type = registry_manifest.get(
346+ "mediaType",
347+ "application/vnd.docker.distribution.manifest.v2+json")
348+ if build is None:
349+ # When uploading a manifest list, use the tag.
350+ tag = cls._calculateTag(build, push_rule)
351+ else:
352+ # When uploading individual build manifests, use their digest.
353+ tag = "sha256:%s" % hashlib.sha256(data).hexdigest()
354+ try:
355+ manifest_response = http_client.requestPath(
356+ "/manifests/{}".format(tag),
357+ data=data,
358+ headers={"Content-Type": content_type},
359+ method="PUT")
360+ digest = manifest_response.headers.get("Docker-Content-Digest")
361+ except HTTPError as http_error:
362+ manifest_response = http_error.response
363+ if manifest_response.status_code != 201:
364+ if build:
365+ msg = "Failed to upload manifest for {} ({}) in {}".format(
366+ build.recipe.name, push_rule.registry_url, build.id)
367+ else:
368+ msg = ("Failed to upload manifest of manifests for"
369+ " {} ({})").format(
370+ push_rule.recipe.name, push_rule.registry_url)
371+ raise cls._makeRegistryError(
372+ ManifestUploadFailed, msg, manifest_response)
373+ return {"digest": digest, "size": size}
374+
375+ @classmethod
376 def _upload_to_push_rule(
377 cls, push_rule, build, manifest, digests, preloaded_data):
378 http_client = RegistryHTTPClient.getInstance(push_rule)
379
380 for section in manifest:
381- # Work out names and tags
382- tag = cls._calculateTag(build, push_rule)
383+ # Work out names
384 file_data = preloaded_data[section["Config"]]
385 config = file_data["config_file"]
386 # Upload the layers involved
387@@ -269,24 +311,13 @@ class OCIRegistryClient:
388 layer_sizes)
389
390 # Upload the registry manifest
391- try:
392- manifest_response = http_client.requestPath(
393- "/manifests/{}".format(tag),
394- json=registry_manifest,
395- headers={
396- "Content-Type":
397- "application/"
398- "vnd.docker.distribution.manifest.v2+json"
399- },
400- method="PUT")
401- except HTTPError as http_error:
402- manifest_response = http_error.response
403- if manifest_response.status_code != 201:
404- raise cls._makeRegistryError(
405- ManifestUploadFailed,
406- "Failed to upload manifest for {} ({}) in {}".format(
407- build.recipe.name, push_rule.registry_url, build.id),
408- manifest_response)
409+ manifest = cls._uploadRegistryManifest(
410+ http_client, registry_manifest, push_rule, build)
411+
412+ # Save the uploaded manifest location, so we can use it in case
413+ # this is a multi-arch image upload.
414+ if build.build_request:
415+ build.build_request.addUploadedManifest(build.id, manifest)
416
417 @classmethod
418 def upload(cls, build):
419@@ -318,6 +349,43 @@ class OCIRegistryClient:
420 elif len(exceptions) > 1:
421 raise MultipleOCIRegistryError(exceptions)
422
423+ @classmethod
424+ def makeMultiArchManifest(cls, build_request):
425+ """Returns the multi-arch manifest content including all builds of
426+ the given build_request.
427+ """
428+ manifests = []
429+ for build in build_request.builds:
430+ build_manifest = build_request.uploaded_manifests.get(build.id)
431+ if not build_manifest:
432+ continue
433+ digest = build_manifest["digest"]
434+ size = build_manifest["size"]
435+ arch = build.processor.name
436+ manifests.append({
437+ "mediaType": ("application/"
438+ "vnd.docker.distribution.manifest.v2+json"),
439+ "size": size,
440+ "digest": digest,
441+ "platform": {"architecture": arch, "os": "linux"}
442+ })
443+ return {
444+ "schemaVersion": 2,
445+ "mediaType": ("application/"
446+ "vnd.docker.distribution.manifest.list.v2+json"),
447+ "manifests": manifests}
448+
449+ @classmethod
450+ def uploadManifestList(cls, build_request):
451+ """Uploads to all build_request.recipe.push_rules the manifest list
452+ for the builds in the given build_request.
453+ """
454+ multi_manifest_content = cls.makeMultiArchManifest(build_request)
455+ for push_rule in build_request.recipe.push_rules:
456+ http_client = RegistryHTTPClient.getInstance(push_rule)
457+ cls._uploadRegistryManifest(
458+ http_client, multi_manifest_content, push_rule, build=None)
459+
460
461 class OCIRegistryAuthenticationError(Exception):
462 def __init__(self, msg, http_error=None):
463@@ -436,8 +504,8 @@ class BearerTokenRegistryClient(RegistryHTTPClient):
464
465 :param auth_retry: Should we authenticate and retry the request if
466 it fails with HTTP 401 code?"""
467+ headers = request_kwargs.pop("headers", {})
468 try:
469- headers = request_kwargs.pop("headers", {})
470 if self.auth_token is not None:
471 headers["Authorization"] = "Bearer %s" % self.auth_token
472 return proxy_urlfetch(url, headers=headers, **request_kwargs)
473@@ -445,5 +513,6 @@ class BearerTokenRegistryClient(RegistryHTTPClient):
474 if auth_retry and e.response.status_code == 401:
475 self.authenticate(e.response)
476 return self.request(
477- url, auth_retry=False, *args, **request_kwargs)
478+ url, auth_retry=False, headers=headers,
479+ *args, **request_kwargs)
480 raise
481diff --git a/lib/lp/oci/tests/test_ocirecipebuildjob.py b/lib/lp/oci/tests/test_ocirecipebuildjob.py
482index fd3e0ff..7b66c1d 100644
483--- a/lib/lp/oci/tests/test_ocirecipebuildjob.py
484+++ b/lib/lp/oci/tests/test_ocirecipebuildjob.py
485@@ -7,6 +7,13 @@ from __future__ import absolute_import, print_function, unicode_literals
486
487 __metaclass__ = type
488
489+from datetime import (
490+ datetime,
491+ timedelta,
492+ )
493+import threading
494+import time
495+
496 from fixtures import FakeLogger
497 from testtools.matchers import (
498 Equals,
499@@ -14,11 +21,15 @@ from testtools.matchers import (
500 MatchesDict,
501 MatchesListwise,
502 MatchesStructure,
503+ Not,
504 )
505 import transaction
506+from zope.component import getUtility
507 from zope.interface import implementer
508+from zope.security.proxy import removeSecurityProxy
509
510 from lp.buildmaster.enums import BuildStatus
511+from lp.buildmaster.interfaces.processor import IProcessorSet
512 from lp.oci.interfaces.ocirecipe import (
513 OCI_RECIPE_ALLOW_CREATE,
514 OCI_RECIPE_WEBHOOKS_FEATURE_FLAG,
515@@ -27,6 +38,7 @@ from lp.oci.interfaces.ocirecipebuildjob import (
516 IOCIRecipeBuildJob,
517 IOCIRegistryUploadJob,
518 )
519+from lp.oci.interfaces.ocirecipejob import IOCIRecipeRequestBuildsJobSource
520 from lp.oci.interfaces.ociregistryclient import (
521 BlobUploadFailed,
522 IOCIRegistryClient,
523@@ -37,12 +49,17 @@ from lp.oci.model.ocirecipebuildjob import (
524 OCIRecipeBuildJobType,
525 OCIRegistryUploadJob,
526 )
527+from lp.services.compat import mock
528 from lp.services.config import config
529 from lp.services.features.testing import FeatureFixture
530+from lp.services.job.interfaces.job import JobStatus
531 from lp.services.job.runner import JobRunner
532 from lp.services.webapp import canonical_url
533 from lp.services.webhooks.testing import LogsScheduledWebhooks
534-from lp.testing import TestCaseWithFactory
535+from lp.testing import (
536+ admin_logged_in,
537+ TestCaseWithFactory,
538+ )
539 from lp.testing.dbuser import dbuser
540 from lp.testing.fakemethod import FakeMethod
541 from lp.testing.fixture import ZopeUtilityFixture
542@@ -69,6 +86,7 @@ class FakeRegistryClient:
543
544 def __init__(self):
545 self.upload = FakeMethod()
546+ self.uploadManifestList = FakeMethod()
547
548
549 class FakeOCIBuildJob(OCIRecipeBuildJobDerived):
550@@ -122,22 +140,28 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory):
551 ocibuild = self.factory.makeOCIRecipeBuild(
552 builder=self.factory.makeBuilder(), **kwargs)
553 ocibuild.updateStatus(BuildStatus.FULLYBUILT)
554- self.factory.makeWebhook(
555- target=ocibuild.recipe, event_types=["oci-recipe:build:0.1"])
556+ self.makeWebhook(ocibuild.recipe)
557 return ocibuild
558
559+ def makeWebhook(self, recipe):
560+ self.factory.makeWebhook(
561+ target=recipe, event_types=["oci-recipe:build:0.1"])
562+
563 def assertWebhookDeliveries(self, ocibuild,
564 expected_registry_upload_statuses, logger):
565 hook = ocibuild.recipe.webhooks.one()
566 deliveries = list(hook.deliveries)
567 deliveries.reverse()
568+ build_req_url = (
569+ None if ocibuild.build_request is None
570+ else canonical_url(ocibuild.build_request, force_local_path=True))
571 expected_payloads = [{
572 "recipe_build": Equals(
573 canonical_url(ocibuild, force_local_path=True)),
574 "action": Equals("status-changed"),
575 "recipe": Equals(
576 canonical_url(ocibuild.recipe, force_local_path=True)),
577- "build_request": Is(None),
578+ "build_request": Equals(build_req_url),
579 "status": Equals("Successfully built"),
580 "registry_upload_status": Equals(expected),
581 } for expected in expected_registry_upload_statuses]
582@@ -165,22 +189,242 @@ class TestOCIRegistryUploadJob(TestCaseWithFactory):
583 job = OCIRegistryUploadJob.create(ocibuild)
584 self.assertProvides(job, IOCIRegistryUploadJob)
585
586+ def makeRecipe(self, include_i386=True, include_amd64=True):
587+ i386 = getUtility(IProcessorSet).getByName("386")
588+ amd64 = getUtility(IProcessorSet).getByName("amd64")
589+ recipe = self.factory.makeOCIRecipe()
590+ distroseries = self.factory.makeDistroSeries(
591+ distribution=recipe.oci_project.distribution)
592+ distro_i386 = self.factory.makeDistroArchSeries(
593+ distroseries=distroseries, architecturetag="i386",
594+ processor=i386)
595+ distro_i386.addOrUpdateChroot(self.factory.makeLibraryFileAlias())
596+ distro_amd64 = self.factory.makeDistroArchSeries(
597+ distroseries=distroseries, architecturetag="amd64",
598+ processor=amd64)
599+ distro_amd64.addOrUpdateChroot(self.factory.makeLibraryFileAlias())
600+
601+ archs = []
602+ if include_i386:
603+ archs.append(i386)
604+ if include_amd64:
605+ archs.append(amd64)
606+ recipe.setProcessors(archs)
607+ return recipe
608+
609+ def makeBuildRequest(self, include_i386=True, include_amd64=True):
610+ recipe = self.makeRecipe(include_i386, include_amd64)
611+ # Creates a build request with a build in it.
612+ build_request = recipe.requestBuilds(recipe.owner)
613+ with admin_logged_in():
614+ jobs = getUtility(IOCIRecipeRequestBuildsJobSource).iterReady()
615+ with dbuser(config.IOCIRecipeRequestBuildsJobSource.dbuser):
616+ JobRunner(jobs).runAll()
617+ return build_request
618+
619 def test_run(self):
620 logger = self.useFixture(FakeLogger())
621- ocibuild = self.makeOCIRecipeBuild()
622+ build_request = self.makeBuildRequest(include_i386=False)
623+ recipe = build_request.recipe
624+
625+ self.assertEqual(1, build_request.builds.count())
626+ ocibuild = build_request.builds[0]
627+ ocibuild.updateStatus(BuildStatus.FULLYBUILT)
628+ self.makeWebhook(recipe)
629+
630 self.assertContentEqual([], ocibuild.registry_upload_jobs)
631 job = OCIRegistryUploadJob.create(ocibuild)
632 client = FakeRegistryClient()
633 self.useFixture(ZopeUtilityFixture(client, IOCIRegistryClient))
634 with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
635 run_isolated_jobs([job])
636+
637 self.assertEqual([((ocibuild,), {})], client.upload.calls)
638+ self.assertEqual([((build_request,), {})],
639+ client.uploadManifestList.calls)
640 self.assertContentEqual([job], ocibuild.registry_upload_jobs)
641 self.assertIsNone(job.error_summary)
642 self.assertIsNone(job.errors)
643 self.assertEqual([], pop_notifications())
644 self.assertWebhookDeliveries(ocibuild, ["Pending", "Uploaded"], logger)
645
646+ def test_run_multiple_architectures(self):
647+ build_request = self.makeBuildRequest()
648+ builds = build_request.builds
649+ self.assertEqual(2, builds.count())
650+ self.assertEqual(builds[0].build_request, builds[1].build_request)
651+
652+ upload_jobs = []
653+ for build in builds:
654+ self.assertContentEqual([], build.registry_upload_jobs)
655+ upload_jobs.append(OCIRegistryUploadJob.create(build))
656+
657+ client = FakeRegistryClient()
658+ self.useFixture(ZopeUtilityFixture(client, IOCIRegistryClient))
659+
660+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
661+ JobRunner([upload_jobs[0]]).runAll()
662+ self.assertEqual([((builds[0],), {})], client.upload.calls)
663+ self.assertEqual([], client.uploadManifestList.calls)
664+
665+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
666+ JobRunner([upload_jobs[1]]).runAll()
667+ self.assertEqual(
668+ [((builds[0],), {}), ((builds[1],), {})], client.upload.calls)
669+ self.assertEqual([((builds[1].build_request, ), {})],
670+ client.uploadManifestList.calls)
671+
672+ def test_failing_upload_does_not_retries_automatically(self):
673+ build_request = self.makeBuildRequest(include_i386=False)
674+ builds = build_request.builds
675+ self.assertEqual(1, builds.count())
676+
677+ build = builds.one()
678+ self.assertContentEqual([], build.registry_upload_jobs)
679+ upload_job = OCIRegistryUploadJob.create(build)
680+
681+ client = mock.Mock()
682+ client.upload.side_effect = Exception("Nope! Error.")
683+ self.useFixture(ZopeUtilityFixture(client, IOCIRegistryClient))
684+
685+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
686+ JobRunner([upload_job]).runAll()
687+ self.assertEqual(1, client.upload.call_count)
688+ self.assertEqual(0, client.uploadManifestList.call_count)
689+ self.assertEqual(JobStatus.FAILED, upload_job.status)
690+ self.assertFalse(upload_job.build_uploaded)
691+
692+ def test_failing_upload_manifest_list_retries(self):
693+ build_request = self.makeBuildRequest(include_i386=False)
694+ builds = build_request.builds
695+ self.assertEqual(1, builds.count())
696+
697+ build = builds.one()
698+ self.assertContentEqual([], build.registry_upload_jobs)
699+ upload_job = OCIRegistryUploadJob.create(build)
700+
701+ client = mock.Mock()
702+ client.uploadManifestList.side_effect = (
703+ OCIRegistryUploadJob.ManifestListUploadError("Nope! Error."))
704+ self.useFixture(ZopeUtilityFixture(client, IOCIRegistryClient))
705+
706+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
707+ JobRunner([upload_job]).runAll()
708+ self.assertEqual(1, client.upload.call_count)
709+ self.assertEqual(1, client.uploadManifestList.call_count)
710+ self.assertEqual(JobStatus.WAITING, upload_job.status)
711+ self.assertTrue(upload_job.is_pending)
712+ self.assertTrue(upload_job.build_uploaded)
713+
714+ # Retry should skip client.upload and only run
715+ # client.uploadManifestList:
716+ client.uploadManifestList.side_effect = None
717+ with dbuser(config.IOCIRegistryUploadJobSource.dbuser):
718+ JobRunner([upload_job]).runAll()
719+ self.assertEqual(1, client.upload.call_count)
720+ self.assertEqual(2, client.uploadManifestList.call_count)
721+ self.assertEqual(JobStatus.COMPLETED, upload_job.status)
722+ self.assertTrue(upload_job.build_uploaded)
723+
724+ def test_allBuildsUploaded_lock_between_two_jobs(self):
725+ """Simple test to ensure that the allBuildsUploaded method locks
726+ rows in the database and makes concurrent calls wait for that lock.
727+
728+ This is not a 100% reliable way to check that concurrent calls to
729+ allBuildsUploaded will queue up since it relies on the
730+ execution time, but it's a "good enough" approach: this test might
731+ pass spuriously if the machine running it is *really, really* slow,
732+ but a failure here indicates that something is definitely wrong.
733+ """
734+
735+ class AllBuildsUploadedChecker(threading.Thread):
736+ """Thread to run upload_job.allBuildsUploaded tracking the time."""
737+ def __init__(self, build_request):
738+ super(AllBuildsUploadedChecker, self).__init__()
739+ self.build_request = build_request
740+ self.upload_job = None
741+ # Held until bootstrap() finishes, so that time measurement
742+ # only starts after the setup code has run. The parent thread
743+ # should call waitBootstrap() after self.start().
744+ self.bootstrap_lock = threading.Lock()
745+ self.bootstrap_lock.acquire()
746+ self.result = None
747+ self.error = None
748+ self.start_date = None
749+ self.end_date = None
750+
751+ @property
752+ def lock_duration(self):
753+ return self.end_date - self.start_date
754+
755+ def waitBootstrap(self):
756+ """Wait until self.bootstrap finishes running."""
757+ self.bootstrap_lock.acquire()
758+ # We don't actually need to hold the lock; we only wanted to
759+ # wait for it, so release it straight away.
760+ self.bootstrap_lock.release()
761+
762+ def bootstrap(self):
763+ try:
764+ build = self.build_request.builds[1]
765+ self.upload_job = OCIRegistryUploadJob.create(build)
766+ finally:
767+ self.bootstrap_lock.release()
768+
769+ def run(self):
770+ with admin_logged_in():
771+ self.bootstrap()
772+ self.start_date = datetime.now()
773+ try:
774+ self.result = self.upload_job.allBuildsUploaded(
775+ self.build_request)
776+ except Exception as e:
777+ self.error = e
778+ self.end_date = datetime.now()
779+
780+ # Create a build request with 2 builds.
781+ build_request = self.makeBuildRequest()
782+ builds = build_request.builds
783+ self.assertEqual(2, builds.count())
784+
785+ # Create the upload job for the first build.
786+ upload_job1 = OCIRegistryUploadJob.create(builds[0])
787+ upload_job1 = removeSecurityProxy(upload_job1)
788+
789+ # How long the lock will be held by the first job, in seconds.
790+ # Adjust to minimize false positives: a number too small here might
791+ # make the test pass even if the lock is not correctly implemented.
792+ # A number too big will slow down the test execution...
793+ waiting_time = 2
794+ # Start a clean transaction and lock the rows at database level.
795+ transaction.commit()
796+ self.assertFalse(upload_job1.allBuildsUploaded(build_request))
797+
798+ # Start, in parallel, another upload job to run `allBuildsUploaded`.
799+ concurrent_checker = AllBuildsUploadedChecker(build_request)
800+ concurrent_checker.start()
801+ # Wait until concurrent_checker is ready to measure the time waiting
802+ # for the database lock.
803+ concurrent_checker.waitBootstrap()
804+
805+ # Wait a bit and release the database lock by committing the current
806+ # transaction.
807+ time.sleep(waiting_time)
808+ # Let's force the first job to be finished, just to make sure the
809+ # second job will realise it's the last one running.
810+ upload_job1.start()
811+ upload_job1.complete()
812+ transaction.commit()
813+
814+ # Now, the concurrent checker should have already finished running,
815+ # without any error and it should have taken at least the
816+ # waiting_time to finish running (since it was waiting).
817+ concurrent_checker.join()
818+ self.assertIsNone(concurrent_checker.error)
819+ self.assertTrue(concurrent_checker.result)
820+ self.assertGreaterEqual(
821+ concurrent_checker.lock_duration, timedelta(seconds=waiting_time))
823 def test_run_failed_registry_error(self):
824 # A run that fails with a registry error sets the registry upload
825 # status to FAILED, and stores the detailed errors.
826diff --git a/lib/lp/oci/tests/test_ociregistryclient.py b/lib/lp/oci/tests/test_ociregistryclient.py
827index 3c5aa8c..8de88b4 100644
828--- a/lib/lp/oci/tests/test_ociregistryclient.py
829+++ b/lib/lp/oci/tests/test_ociregistryclient.py
830@@ -11,6 +11,7 @@ from functools import partial
831 import io
832 import json
833 import os
834+import re
835 import tarfile
836 import uuid
837
838@@ -33,13 +34,17 @@ from testtools.matchers import (
839 Raises,
840 )
841 import transaction
842+from zope.component import getUtility
843 from zope.security.proxy import removeSecurityProxy
844
845+from lp.buildmaster.interfaces.processor import IProcessorSet
846+from lp.oci.interfaces.ocirecipejob import IOCIRecipeRequestBuildsJobSource
847 from lp.oci.interfaces.ociregistryclient import (
848 BlobUploadFailed,
849 ManifestUploadFailed,
850 MultipleOCIRegistryError,
851 )
852+from lp.oci.model.ocirecipe import OCIRecipeBuildRequest
853 from lp.oci.model.ociregistryclient import (
854 BearerTokenRegistryClient,
855 OCIRegistryAuthenticationError,
856@@ -50,6 +55,7 @@ from lp.oci.model.ociregistryclient import (
857 from lp.oci.tests.helpers import OCIConfigHelperMixin
858 from lp.services.compat import mock
859 from lp.testing import TestCaseWithFactory
860+from lp.testing.fixture import ZopeUtilityFixture
861 from lp.testing.layers import (
862 DatabaseFunctionalLayer,
863 LaunchpadZopelessLayer,
864@@ -142,6 +148,23 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
865
866 transaction.commit()
867
868+ def addManifestResponses(self, push_rule, status_code=201, json=None):
869+ """Add responses for manifest upload URLs."""
870+ # PUT to "anonymous" architecture-specific manifest.
871+ manifests_url = "{}/v2/{}/manifests/sha256:.*".format(
872+ push_rule.registry_credentials.url,
873+ push_rule.image_name
874+ )
875+ responses.add(
876+ "PUT", re.compile(manifests_url), status=status_code, json=json)
877+
878+ # PUT to tagged multi-arch manifest.
879+ manifests_url = "{}/v2/{}/manifests/edge".format(
880+ push_rule.registry_credentials.url,
881+ push_rule.image_name
882+ )
883+ responses.add("PUT", manifests_url, status=status_code, json=json)
884+
885 @responses.activate
886 def test_upload(self):
887 self._makeFiles()
888@@ -154,11 +177,7 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
889 push_rule = self.build.recipe.push_rules[0]
890 responses.add("GET", "%s/v2/" % push_rule.registry_url, status=200)
891
892- manifests_url = "{}/v2/{}/manifests/edge".format(
893- push_rule.registry_credentials.url,
894- push_rule.image_name
895- )
896- responses.add("PUT", manifests_url, status=201)
897+ self.addManifestResponses(push_rule)
898
899 self.client.upload(self.build)
900
901@@ -196,7 +215,8 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
902 _upload_fixture = self.useFixture(MockPatch(
903 "lp.oci.model.ociregistryclient.OCIRegistryClient._upload"))
904 self.useFixture(MockPatch(
905- "lp.oci.model.ociregistryclient.OCIRegistryClient._upload_layer"))
906+ "lp.oci.model.ociregistryclient.OCIRegistryClient._upload_layer",
907+ return_value=999))
908
909 self.push_rule.registry_credentials.setCredentials({
910 "username": "test-username",
911@@ -206,9 +226,7 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
912 push_rule = self.build.recipe.push_rules[0]
913 responses.add("GET", "%s/v2/" % push_rule.registry_url, status=200)
914
915- manifests_url = "{}/v2/{}/manifests/edge".format(
916- push_rule.registry_credentials.url, push_rule.image_name)
917- responses.add("PUT", manifests_url, status=201)
918+ self.addManifestResponses(push_rule)
919
920 self.client.upload(self.build)
921
922@@ -222,7 +240,8 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
923 upload_fixture = self.useFixture(MockPatch(
924 "lp.oci.model.ociregistryclient.OCIRegistryClient._upload"))
925 self.useFixture(MockPatch(
926- "lp.oci.model.ociregistryclient.OCIRegistryClient._upload_layer"))
927+ "lp.oci.model.ociregistryclient.OCIRegistryClient._upload_layer",
928+ return_value=999))
929
930 push_rules = [
931 self.push_rule,
932@@ -237,10 +256,8 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
933 responses.add(
934 "GET", "%s/v2/" % push_rule.registry_url, status=200)
935
936- manifests_url = "{}/v2/{}/manifests/edge".format(
937- push_rule.registry_credentials.url, push_rule.image_name)
938 status = 400 if i < 2 else 201
939- responses.add("PUT", manifests_url, status=status)
940+ self.addManifestResponses(push_rule, status_code=status)
941
942 error = self.assertRaises(
943 MultipleOCIRegistryError, self.client.upload, self.build)
944@@ -466,15 +483,13 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
945 self.useFixture(MockPatch(
946 "lp.oci.model.ociregistryclient.OCIRegistryClient._upload"))
947 self.useFixture(MockPatch(
948- "lp.oci.model.ociregistryclient.OCIRegistryClient._upload_layer"))
949+ "lp.oci.model.ociregistryclient.OCIRegistryClient._upload_layer",
950+ return_value=999))
951
952 push_rule = self.build.recipe.push_rules[0]
953 responses.add(
954 "GET", "{}/v2/".format(push_rule.registry_url), status=200)
955
956- manifests_url = "{}/v2/{}/manifests/edge".format(
957- push_rule.registry_credentials.url,
958- push_rule.image_name)
959 put_errors = [
960 {
961 "code": "MANIFEST_INVALID",
962@@ -482,8 +497,8 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
963 "detail": [],
964 },
965 ]
966- responses.add(
967- "PUT", manifests_url, status=400, json={"errors": put_errors})
968+ self.addManifestResponses(
969+ push_rule, status_code=400, json={"errors": put_errors})
970
971 expected_msg = "Failed to upload manifest for {} ({}) in {}".format(
972 self.build.recipe.name, self.push_rule.registry_url, self.build.id)
973@@ -503,16 +518,14 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
974 self.useFixture(MockPatch(
975 "lp.oci.model.ociregistryclient.OCIRegistryClient._upload"))
976 self.useFixture(MockPatch(
977- "lp.oci.model.ociregistryclient.OCIRegistryClient._upload_layer"))
978+ "lp.oci.model.ociregistryclient.OCIRegistryClient._upload_layer",
979+ return_value=999))
980
981 push_rule = self.build.recipe.push_rules[0]
982 responses.add(
983 "GET", "{}/v2/".format(push_rule.registry_url), status=200)
984
985- manifests_url = "{}/v2/{}/manifests/edge".format(
986- push_rule.registry_credentials.url,
987- push_rule.image_name)
988- responses.add("PUT", manifests_url, status=200)
989+ self.addManifestResponses(push_rule, status_code=200)
990
991 expected_msg = "Failed to upload manifest for {} ({}) in {}".format(
992 self.build.recipe.name, self.push_rule.registry_url, self.build.id)
993@@ -526,6 +539,64 @@ class TestOCIRegistryClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
994 Equals(expected_msg)),
995 MatchesStructure(errors=Is(None))))))
996
997+ @responses.activate
998+ def test_multi_arch_manifest_upload(self):
999+ """Ensure that multi-arch manifest upload works and correctly tags
1000+ the uploaded image."""
1001+ # Create 2 builds of the recipe: build1 on amd64, build2 on 386.
1002+ recipe = self.build.recipe
1003+ build1 = self.build
1004+ build2 = self.factory.makeOCIRecipeBuild(
1005+ recipe=recipe)
1006+ naked_build1 = removeSecurityProxy(build1)
1007+ naked_build2 = removeSecurityProxy(build2)
1008+ naked_build1.processor = getUtility(IProcessorSet).getByName('amd64')
1009+ naked_build2.processor = getUtility(IProcessorSet).getByName('386')
1010+
1011+ # Create a mock IOCIRecipeRequestBuildsJobSource, as if the celery
1012+ # job had already run and triggered the 2 registry uploads.
1013+ job = mock.Mock()
1014+ job.builds = [build1, build2]
1015+ job.uploaded_manifests = {
1016+ build1.id: {"digest": "build1digest", "size": 123},
1017+ build2.id: {"digest": "build2digest", "size": 321},
1018+ }
1019+ job_source = mock.Mock()
1020+ job_source.getByOCIRecipeAndID.return_value = job
1021+ self.useFixture(
1022+ ZopeUtilityFixture(job_source, IOCIRecipeRequestBuildsJobSource))
1023+ build_request = OCIRecipeBuildRequest(recipe, -1)
1024+
1025+ push_rule = self.build.recipe.push_rules[0]
1026+ responses.add(
1027+ "GET", "{}/v2/".format(push_rule.registry_url), status=200)
1028+ self.addManifestResponses(push_rule, status_code=201)
1029+
1030+ self.client.uploadManifestList(build_request)
1031+ self.assertEqual(2, len(responses.calls))
1032+ auth_call, manifest_call = responses.calls
1033+ self.assertEndsWith(
1034+ manifest_call.request.url,
1035+ "/v2/%s/manifests/edge" % push_rule.image_name)
1036+ self.assertEqual({
1037+ "schemaVersion": 2,
1038+ "mediaType": "application/"
1039+ "vnd.docker.distribution.manifest.list.v2+json",
1040+ "manifests": [{
1041+ "platform": {"os": "linux", "architecture": "amd64"},
1042+ "mediaType": "application/"
1043+ "vnd.docker.distribution.manifest.v2+json",
1044+ "digest": "build1digest",
1045+ "size": 123
1046+ }, {
1047+ "platform": {"os": "linux", "architecture": "386"},
1048+ "mediaType": "application/"
1049+ "vnd.docker.distribution.manifest.v2+json",
1050+ "digest": "build2digest",
1051+ "size": 321
1052+ }]
1053+ }, json.loads(manifest_call.request.body))
1054+
1055
1056 class TestRegistryHTTPClient(OCIConfigHelperMixin, SpyProxyCallsMixin,
1057 TestCaseWithFactory):