Merge ~jfguedez/charm-ubuntu-advantage:bug/1962335 into charm-ubuntu-advantage:master

Proposed by Jose Guedez
Status: Merged
Approved by: Tom Haddon
Approved revision: 08fa05cc05ecd33ea145f6c453ea19a3ace7ce51
Merged at revision: 05a3802c0fa7c349544c5d29fe11175eb694ac65
Proposed branch: ~jfguedez/charm-ubuntu-advantage:bug/1962335
Merge into: charm-ubuntu-advantage:master
Diff against target: 1548 lines (+1363/-45)
4 files modified
lib/charms/operator_libs_linux/v0/apt.py (+1329/-0)
run_tests (+4/-3)
src/charm.py (+3/-13)
tests/test_charm.py (+27/-29)
Reviewer Review Type Date Requested Status
Tom Haddon Approve
Canonical IS Reviewers Pending
Review via email: mp+416293@code.launchpad.net

Commit message

Use the operator apt library to install/remove packages

To post a comment you must log in.
Revision history for this message
🤖 Canonical IS Merge Bot (canonical-is-mergebot) wrote :

This merge proposal is being monitored by mergebot. Change the status to Approved to merge.

Revision history for this message
Jose Guedez (jfguedez) wrote :

Updated just to reflect that the operator apt library fixes [0] have been released (only the library patch version was updated)

[0] https://github.com/canonical/operator-libs-linux/pull/41

Revision history for this message
Tom Haddon (mthaddon) wrote :

LGTM, thx

review: Approve
Revision history for this message
🤖 Canonical IS Merge Bot (canonical-is-mergebot) wrote :

Change successfully merged at revision 05a3802c0fa7c349544c5d29fe11175eb694ac65

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/lib/charms/operator_libs_linux/v0/apt.py b/lib/charms/operator_libs_linux/v0/apt.py
0new file mode 1006440new file mode 100644
index 0000000..2b5c8f2
--- /dev/null
+++ b/lib/charms/operator_libs_linux/v0/apt.py
@@ -0,0 +1,1329 @@
1# Copyright 2021 Canonical Ltd.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""Abstractions for the system's Debian/Ubuntu package information and repositories.
16
17This module contains abstractions and wrappers around Debian/Ubuntu-style repositories and
18packages, in order to easily provide an idiomatic and Pythonic mechanism for adding packages and/or
19repositories to systems for use in machine charms.
20
21A sane default configuration is attainable through nothing more than instantiation of the
22appropriate classes. `DebianPackage` objects provide information about the architecture, version,
23name, and status of a package.
24
25`DebianPackage` will try to look up a package either from `dpkg -L` or from `apt-cache` when
26provided with a string indicating the package name. If it cannot be located, `PackageNotFoundError`
27will be returned, as `apt` and `dpkg` otherwise return `100` for all errors, and a meaningful error
28message if the package is not known is desirable.
29
30To install packages with convenience methods:
31
32```python
33try:
34 # Run `apt-get update`
35 apt.update()
36 apt.add_package("zsh")
37 apt.add_package(["vim", "htop", "wget"])
38except PackageNotFoundError:
39 logger.error("a specified package not found in package cache or on system")
40except PackageError as e:
41 logger.error("could not install package. Reason: %s", e.message)
42````
43
44To find details of a specific package:
45
46```python
47try:
48 vim = apt.DebianPackage.from_system("vim")
49
50 # To find from the apt cache only
51 # apt.DebianPackage.from_apt_cache("vim")
52
53 # To find from installed packages only
54 # apt.DebianPackage.from_installed_package("vim")
55
56 vim.ensure(PackageState.Latest)
57 logger.info("updated vim to version: %s", vim.fullversion)
58except PackageNotFoundError:
59 logger.error("a specified package not found in package cache or on system")
60except PackageError as e:
61 logger.error("could not install package. Reason: %s", e.message)
62```
63
64
65`RepositoryMapping` will return a dict-like object containing enabled system repositories
66and their properties (available groups, baseuri. gpg key). This class can add, disable, or
67manipulate repositories. Items can be retrieved as `DebianRepository` objects.
68
69In order add a new repository with explicit details for fields, a new `DebianRepository` can
70be added to `RepositoryMapping`
71
72`RepositoryMapping` provides an abstraction around the existing repositories on the system,
73and can be accessed and iterated over like any `Mapping` object, to retrieve values by key,
74iterate, or perform other operations.
75
76Keys are constructed as `{repo_type}-{}-{release}` in order to uniquely identify a repository.
77
78Repositories can be added with explicit values through a Python constructor.
79
80Example:
81
82```python
83repositories = apt.RepositoryMapping()
84
85if "deb-example.com-focal" not in repositories:
86 repositories.add(DebianRepository(enabled=True, repotype="deb",
87 uri="https://example.com", release="focal", groups=["universe"]))
88```
89
90Alternatively, any valid `sources.list` line may be used to construct a new
91`DebianRepository`.
92
93Example:
94
95```python
96repositories = apt.RepositoryMapping()
97
98if "deb-us.archive.ubuntu.com-xenial" not in repositories:
99 line = "deb http://us.archive.ubuntu.com/ubuntu xenial main restricted"
100 repo = DebianRepository.from_repo_line(line)
101 repositories.add(repo)
102```
103"""
104
105import fileinput
106import glob
107import logging
108import os
109import re
110import subprocess
111from collections.abc import Mapping
112from enum import Enum
113from subprocess import PIPE, CalledProcessError, check_call, check_output
114from typing import Iterable, List, Optional, Tuple, Union
115from urllib.parse import urlparse
116
117logger = logging.getLogger(__name__)
118
119# The unique Charmhub library identifier, never change it
120LIBID = "7c3dbc9c2ad44a47bd6fcb25caa270e5"
121
122# Increment this major API version when introducing breaking changes
123LIBAPI = 0
124
125# Increment this PATCH version before using `charmcraft publish-lib` or reset
126# to 0 if you are raising the major API version
127LIBPATCH = 7
128
129
130VALID_SOURCE_TYPES = ("deb", "deb-src")
131OPTIONS_MATCHER = re.compile(r"\[.*?\]")
132
133
134class Error(Exception):
135 """Base class of most errors raised by this library."""
136
137 def __repr__(self):
138 """String representation of Error."""
139 return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args)
140
141 @property
142 def name(self):
143 """Return a string representation of the model plus class."""
144 return "<{}.{}>".format(type(self).__module__, type(self).__name__)
145
146 @property
147 def message(self):
148 """Return the message passed as an argument."""
149 return self.args[0]
150
151
152class PackageError(Error):
153 """Raised when there's an error installing or removing a package."""
154
155
156class PackageNotFoundError(Error):
157 """Raised when a requested package is not known to the system."""
158
159
160class PackageState(Enum):
161 """A class to represent possible package states."""
162
163 Present = "present"
164 Absent = "absent"
165 Latest = "latest"
166 Available = "available"
167
168
169class DebianPackage:
170 """Represents a traditional Debian package and its utility functions.
171
172 `DebianPackage` wraps information and functionality around a known package, whether installed
173 or available. The version, epoch, name, and architecture can be easily queried and compared
174 against other `DebianPackage` objects to determine the latest version or to install a specific
175 version.
176
177 The representation of this object as a string mimics the output from `dpkg` for familiarity.
178
179 Installation and removal of packages is handled through the `state` property or `ensure`
180 method, with the following options:
181
182 apt.PackageState.Absent
183 apt.PackageState.Available
184 apt.PackageState.Present
185 apt.PackageState.Latest
186
187 When `DebianPackage` is initialized, the state of a given `DebianPackage` object will be set to
188 `Available`, `Present`, or `Latest`, with `Absent` implemented as a convenience for removal
189 (though it operates essentially the same as `Available`).
190 """
191
192 def __init__(
193 self, name: str, version: str, epoch: str, arch: str, state: PackageState
194 ) -> None:
195 self._name = name
196 self._arch = arch
197 self._state = state
198 self._version = Version(version, epoch)
199
200 def __eq__(self, other) -> bool:
201 """Equality for comparison.
202
203 Args:
204 other: a `DebianPackage` object for comparison
205
206 Returns:
207 A boolean reflecting equality
208 """
209 return isinstance(other, self.__class__) and (
210 self._name,
211 self._version.number,
212 ) == (other._name, other._version.number)
213
214 def __hash__(self):
215 """A basic hash so this class can be used in Mappings and dicts."""
216 return hash((self._name, self._version.number))
217
218 def __repr__(self):
219 """A representation of the package."""
220 return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__)
221
222 def __str__(self):
223 """A human-readable representation of the package."""
224 return "<{}: {}-{}.{} -- {}>".format(
225 self.__class__.__name__,
226 self._name,
227 self._version,
228 self._arch,
229 str(self._state),
230 )
231
232 @staticmethod
233 def _apt(
234 command: str,
235 package_names: Union[str, List],
236 optargs: Optional[List[str]] = None,
237 ) -> None:
238 """Wrap package management commands for Debian/Ubuntu systems.
239
240 Args:
241 command: the command given to `apt-get`
242 package_names: a package name or list of package names to operate on
243 optargs: an (Optional) list of additioanl arguments
244
245 Raises:
246 PackageError if an error is encountered
247 """
248 optargs = optargs if optargs is not None else []
249 if isinstance(package_names, str):
250 package_names = [package_names]
251 _cmd = ["apt-get", "-y", *optargs, command, *package_names]
252 try:
253 check_call(_cmd, stderr=PIPE, stdout=PIPE)
254 except CalledProcessError as e:
255 raise PackageError(
256 "Could not {} package(s) [{}]: {}".format(command, [*package_names], e.output)
257 ) from None
258
259 def _add(self) -> None:
260 """Add a package to the system."""
261 self._apt(
262 "install",
263 "{}={}".format(self.name, self.version),
264 optargs=["--option=Dpkg::Options::=--force-confold"],
265 )
266
267 def _remove(self) -> None:
268 """Removes a package from the system. Implementation-specific."""
269 return self._apt("remove", "{}={}".format(self.name, self.version))
270
271 @property
272 def name(self) -> str:
273 """Returns the name of the package."""
274 return self._name
275
276 def ensure(self, state: PackageState):
277 """Ensures that a package is in a given state.
278
279 Args:
280 state: a `PackageState` to reconcile the package to
281
282 Raises:
283 PackageError from the underlying call to apt
284 """
285 if self._state is not state:
286 if state not in (PackageState.Present, PackageState.Latest):
287 self._remove()
288 else:
289 self._add()
290 self._state = state
291
292 @property
293 def present(self) -> bool:
294 """Returns whether or not a package is present."""
295 return self._state in (PackageState.Present, PackageState.Latest)
296
297 @property
298 def latest(self) -> bool:
299 """Returns whether the package is the most recent version."""
300 return self._state is PackageState.Latest
301
302 @property
303 def state(self) -> PackageState:
304 """Returns the current package state."""
305 return self._state
306
307 @state.setter
308 def state(self, state: PackageState) -> None:
309 """Sets the package state to a given value.
310
311 Args:
312 state: a `PackageState` to reconcile the package to
313
314 Raises:
315 PackageError from the underlying call to apt
316 """
317 if state in (PackageState.Latest, PackageState.Present):
318 self._add()
319 else:
320 self._remove()
321 self._state = state
322
323 @property
324 def version(self) -> "Version":
325 """Returns the version for a package."""
326 return self._version
327
328 @property
329 def epoch(self) -> str:
330 """Returns the epoch for a package. May be unset."""
331 return self._version.epoch
332
333 @property
334 def arch(self) -> str:
335 """Returns the architecture for a package."""
336 return self._arch
337
338 @property
339 def fullversion(self) -> str:
340 """Returns the name+epoch for a package."""
341 return "{}.{}".format(self._version, self._arch)
342
343 @staticmethod
344 def _get_epoch_from_version(version: str) -> Tuple[str, str]:
345 """Pull the epoch, if any, out of a version string."""
346 epoch_matcher = re.compile(r"^((?P<epoch>\d+):)?(?P<version>.*)")
347 matches = epoch_matcher.search(version).groupdict()
348 return matches.get("epoch", ""), matches.get("version")
349
350 @classmethod
351 def from_system(
352 cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
353 ) -> "DebianPackage":
354 """Locates a package, either on the system or known to apt, and serializes the information.
355
356 Args:
357 package: a string representing the package
358 version: an optional string if a specific version isr equested
359 arch: an optional architecture, defaulting to `dpkg --print-architecture`. If an
360 architecture is not specified, this will be used for selection.
361
362 """
363 try:
364 return DebianPackage.from_installed_package(package, version, arch)
365 except PackageNotFoundError:
366 logger.debug(
367 "package '%s' is not currently installed or has the wrong architecture.", package
368 )
369
370 # Ok, try `apt-cache ...`
371 try:
372 return DebianPackage.from_apt_cache(package, version, arch)
373 except (PackageNotFoundError, PackageError):
374 # If we get here, it's not known to the systems.
375 # This seems unnecessary, but virtually all `apt` commands have a return code of `100`,
376 # and providing meaningful error messages without this is ugly.
377 raise PackageNotFoundError(
378 "Package '{}{}' could not be found on the system or in the apt cache!".format(
379 package, ".{}".format(arch) if arch else ""
380 )
381 ) from None
382
383 @classmethod
384 def from_installed_package(
385 cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
386 ) -> "DebianPackage":
387 """Check whether the package is already installed and return an instance.
388
389 Args:
390 package: a string representing the package
391 version: an optional string if a specific version isr equested
392 arch: an optional architecture, defaulting to `dpkg --print-architecture`.
393 If an architecture is not specified, this will be used for selection.
394 """
395 system_arch = check_output(
396 ["dpkg", "--print-architecture"], universal_newlines=True
397 ).strip()
398 arch = arch if arch else system_arch
399
400 # Regexps are a really terrible way to do this. Thanks dpkg
401 output = ""
402 try:
403 output = check_output(["dpkg", "-l", package], stderr=PIPE, universal_newlines=True)
404 except CalledProcessError:
405 raise PackageNotFoundError("Package is not installed: {}".format(package)) from None
406
407 # Pop off the output from `dpkg -l' because there's no flag to
408 # omit it`
409 lines = str(output).splitlines()[5:]
410
411 dpkg_matcher = re.compile(
412 r"""
413 ^(?P<package_status>\w+?)\s+
414 (?P<package_name>.*?)(?P<throwaway_arch>:\w+?)?\s+
415 (?P<version>.*?)\s+
416 (?P<arch>\w+?)\s+
417 (?P<description>.*)
418 """,
419 re.VERBOSE,
420 )
421
422 for line in lines:
423 try:
424 matches = dpkg_matcher.search(line).groupdict()
425 package_status = matches["package_status"]
426
427 if not package_status.endswith("i"):
428 logger.debug(
429 "package '%s' in dpkg output but not installed, status: '%s'",
430 package,
431 package_status,
432 )
433 break
434
435 epoch, split_version = DebianPackage._get_epoch_from_version(matches["version"])
436 pkg = DebianPackage(
437 matches["package_name"],
438 split_version,
439 epoch,
440 matches["arch"],
441 PackageState.Present,
442 )
443 if (pkg.arch == "all" or pkg.arch == arch) and (
444 version == "" or str(pkg.version) == version
445 ):
446 return pkg
447 except AttributeError:
448 logger.warning("dpkg matcher could not parse line: %s", line)
449
450 # If we didn't find it, fail through
451 raise PackageNotFoundError("Package {}.{} is not installed!".format(package, arch))
452
453 @classmethod
454 def from_apt_cache(
455 cls, package: str, version: Optional[str] = "", arch: Optional[str] = ""
456 ) -> "DebianPackage":
457 """Check whether the package is already installed and return an instance.
458
459 Args:
460 package: a string representing the package
461 version: an optional string if a specific version isr equested
462 arch: an optional architecture, defaulting to `dpkg --print-architecture`.
463 If an architecture is not specified, this will be used for selection.
464 """
465 system_arch = check_output(
466 ["dpkg", "--print-architecture"], universal_newlines=True
467 ).strip()
468 arch = arch if arch else system_arch
469
470 # Regexps are a really terrible way to do this. Thanks dpkg
471 keys = ("Package", "Architecture", "Version")
472
473 try:
474 output = check_output(
475 ["apt-cache", "show", package], stderr=PIPE, universal_newlines=True
476 )
477 except CalledProcessError as e:
478 raise PackageError(
479 "Could not list packages in apt-cache: {}".format(e.output)
480 ) from None
481
482 pkg_groups = output.strip().split("\n\n")
483 keys = ("Package", "Architecture", "Version")
484
485 for pkg_raw in pkg_groups:
486 lines = str(pkg_raw).splitlines()
487 vals = {}
488 for line in lines:
489 if line.startswith(keys):
490 items = line.split(":", 1)
491 vals[items[0]] = items[1].strip()
492 else:
493 continue
494
495 epoch, split_version = DebianPackage._get_epoch_from_version(vals["Version"])
496 pkg = DebianPackage(
497 vals["Package"],
498 split_version,
499 epoch,
500 vals["Architecture"],
501 PackageState.Available,
502 )
503
504 if (pkg.arch == "all" or pkg.arch == arch) and (
505 version == "" or str(pkg.version) == version
506 ):
507 return pkg
508
509 # If we didn't find it, fail through
510 raise PackageNotFoundError("Package {}.{} is not in the apt cache!".format(package, arch))
511
512
513class Version:
514 """An abstraction around package versions.
515
516 This seems like it should be strictly unnecessary, except that `apt_pkg` is not usable inside a
517 venv, and wedging version comparisions into `DebianPackage` would overcomplicate it.
518
519 This class implements the algorithm found here:
520 https://www.debian.org/doc/debian-policy/ch-controlfields.html#version
521 """
522
523 def __init__(self, version: str, epoch: str):
524 self._version = version
525 self._epoch = epoch or ""
526
527 def __repr__(self):
528 """A representation of the package."""
529 return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__)
530
531 def __str__(self):
532 """A human-readable representation of the package."""
533 return "{}{}".format("{}:".format(self._epoch) if self._epoch else "", self._version)
534
535 @property
536 def epoch(self):
537 """Returns the epoch for a package. May be empty."""
538 return self._epoch
539
540 @property
541 def number(self) -> str:
542 """Returns the version number for a package."""
543 return self._version
544
545 def _get_parts(self, version: str) -> Tuple[str, str]:
546 """Separate the version into component upstream and Debian pieces."""
547 try:
548 version.rindex("-")
549 except ValueError:
550 # No hyphens means no Debian version
551 return version, "0"
552
553 upstream, debian = version.rsplit("-", 1)
554 return upstream, debian
555
556 def _listify(self, revision: str) -> List[str]:
557 """Split a revision string into a listself.
558
559 This list is comprised of alternating between strings and numbers,
560 padded on either end to always be "str, int, str, int..." and
561 always be of even length. This allows us to trivially implement the
562 comparison algorithm described.
563 """
564 result = []
565 while revision:
566 rev_1, remains = self._get_alphas(revision)
567 rev_2, remains = self._get_digits(remains)
568 result.extend([rev_1, rev_2])
569 revision = remains
570 return result
571
572 def _get_alphas(self, revision: str) -> Tuple[str, str]:
573 """Return a tuple of the first non-digit characters of a revision."""
574 # get the index of the first digit
575 for i, char in enumerate(revision):
576 if char.isdigit():
577 if i == 0:
578 return "", revision
579 return revision[0:i], revision[i:]
580 # string is entirely alphas
581 return revision, ""
582
583 def _get_digits(self, revision: str) -> Tuple[int, str]:
584 """Return a tuple of the first integer characters of a revision."""
585 # If the string is empty, return (0,'')
586 if not revision:
587 return 0, ""
588 # get the index of the first non-digit
589 for i, char in enumerate(revision):
590 if not char.isdigit():
591 if i == 0:
592 return 0, revision
593 return int(revision[0:i]), revision[i:]
594 # string is entirely digits
595 return int(revision), ""
596
597 def _dstringcmp(self, a, b): # noqa: C901
598 """Debian package version string section lexical sort algorithm.
599
600 The lexical comparison is a comparison of ASCII values modified so
601 that all the letters sort earlier than all the non-letters and so that
602 a tilde sorts before anything, even the end of a part.
603 """
604 if a == b:
605 return 0
606 try:
607 for i, char in enumerate(a):
608 if char == b[i]:
609 continue
610 # "a tilde sorts before anything, even the end of a part"
611 # (emptyness)
612 if char == "~":
613 return -1
614 if b[i] == "~":
615 return 1
616 # "all the letters sort earlier than all the non-letters"
617 if char.isalpha() and not b[i].isalpha():
618 return -1
619 if not char.isalpha() and b[i].isalpha():
620 return 1
621 # otherwise lexical sort
622 if ord(char) > ord(b[i]):
623 return 1
624 if ord(char) < ord(b[i]):
625 return -1
626 except IndexError:
627 # a is longer than b but otherwise equal, greater unless there are tildes
628 if char == "~":
629 return -1
630 return 1
631 # if we get here, a is shorter than b but otherwise equal, so check for tildes...
632 if b[len(a)] == "~":
633 return 1
634 return -1
635
636 def _compare_revision_strings(self, first: str, second: str): # noqa: C901
637 """Compare two debian revision strings."""
638 if first == second:
639 return 0
640
641 # listify pads results so that we will always be comparing ints to ints
642 # and strings to strings (at least until we fall off the end of a list)
643 first_list = self._listify(first)
644 second_list = self._listify(second)
645 if first_list == second_list:
646 return 0
647 try:
648 for i, item in enumerate(first_list):
649 # explicitly raise IndexError if we've fallen off the edge of list2
650 if i >= len(second_list):
651 raise IndexError
652 # if the items are equal, next
653 if item == second_list[i]:
654 continue
655 # numeric comparison
656 if isinstance(item, int):
657 if item > second_list[i]:
658 return 1
659 if item < second_list[i]:
660 return -1
661 else:
662 # string comparison
663 return self._dstringcmp(item, second_list[i])
664 except IndexError:
665 # rev1 is longer than rev2 but otherwise equal, hence greater
666 # ...except for goddamn tildes
667 if first_list[len(second_list)][0][0] == "~":
668 return 1
669 return 1
670 # rev1 is shorter than rev2 but otherwise equal, hence lesser
671 # ...except for goddamn tildes
672 if second_list[len(first_list)][0][0] == "~":
673 return -1
674 return -1
675
676 def _compare_version(self, other) -> int:
677 if (self.number, self.epoch) == (other.number, other.epoch):
678 return 0
679
680 if self.epoch < other.epoch:
681 return -1
682 if self.epoch > other.epoch:
683 return 1
684
685 # If none of these are true, follow the algorithm
686 upstream_version, debian_version = self._get_parts(self.number)
687 other_upstream_version, other_debian_version = self._get_parts(other.number)
688
689 upstream_cmp = self._compare_revision_strings(upstream_version, other_upstream_version)
690 if upstream_cmp != 0:
691 return upstream_cmp
692
693 debian_cmp = self._compare_revision_strings(debian_version, other_debian_version)
694 if debian_cmp != 0:
695 return debian_cmp
696
697 return 0
698
699 def __lt__(self, other) -> bool:
700 """Less than magic method impl."""
701 return self._compare_version(other) < 0
702
703 def __eq__(self, other) -> bool:
704 """Equality magic method impl."""
705 return self._compare_version(other) == 0
706
707 def __gt__(self, other) -> bool:
708 """Greater than magic method impl."""
709 return self._compare_version(other) > 0
710
711 def __le__(self, other) -> bool:
712 """Less than or equal to magic method impl."""
713 return self.__eq__(other) or self.__lt__(other)
714
715 def __ge__(self, other) -> bool:
716 """Greater than or equal to magic method impl."""
717 return self.__gt__(other) or self.__eq__(other)
718
719 def __ne__(self, other) -> bool:
720 """Not equal to magic method impl."""
721 return not self.__eq__(other)
722
723
724def add_package(
725 package_names: Union[str, List[str]],
726 version: Optional[str] = "",
727 arch: Optional[str] = "",
728 update_cache: Optional[bool] = False,
729) -> Union[DebianPackage, List[DebianPackage]]:
730 """Add a package or list of packages to the system.
731
732 Args:
733 name: the name(s) of the package(s)
734 version: an (Optional) version as a string. Defaults to the latest known
735 arch: an optional architecture for the package
736 update_cache: whether or not to run `apt-get update` prior to operating
737
738 Raises:
739 PackageNotFoundError if the package is not in the cache.
740 """
741 cache_refreshed = False
742 if update_cache:
743 update()
744 cache_refreshed = True
745
746 packages = {"success": [], "retry": [], "failed": []}
747
748 package_names = [package_names] if type(package_names) is str else package_names
749 if not package_names:
750 raise TypeError("Expected at least one package name to add, received zero!")
751
752 if len(package_names) != 1 and version:
753 raise TypeError(
754 "Explicit version should not be set if more than one package is being added!"
755 )
756
757 for p in package_names:
758 pkg, success = _add(p, version, arch)
759 if success:
760 packages["success"].append(pkg)
761 else:
762 logger.warning("failed to locate and install/update '%s'", pkg)
763 packages["retry"].append(p)
764
765 if packages["retry"] and not cache_refreshed:
766 logger.info("updating the apt-cache and retrying installation of failed packages.")
767 update()
768
769 for p in packages["retry"]:
770 pkg, success = _add(p, version, arch)
771 if success:
772 packages["success"].append(pkg)
773 else:
774 packages["failed"].append(p)
775
776 if packages["failed"]:
777 raise PackageError("Failed to install packages: {}".format(", ".join(packages["failed"])))
778
779 return packages["success"] if len(packages["success"]) > 1 else packages["success"][0]
780
781
782def _add(
783 name: str,
784 version: Optional[str] = "",
785 arch: Optional[str] = "",
786) -> Tuple[Union[DebianPackage, str], bool]:
787 """Adds a package.
788
789 Args:
790 name: the name(s) of the package(s)
791 version: an (Optional) version as a string. Defaults to the latest known
792 arch: an optional architecture for the package
793
794 Returns: a tuple of `DebianPackage` if found, or a :str: if it is not, and
795 a boolean indicating success
796 """
797 try:
798 pkg = DebianPackage.from_system(name, version, arch)
799 pkg.ensure(state=PackageState.Present)
800 return pkg, True
801 except PackageNotFoundError:
802 return name, False
803
804
805def remove_package(
806 package_names: Union[str, List[str]]
807) -> Union[DebianPackage, List[DebianPackage]]:
808 """Removes a package from the system.
809
810 Args:
811 package_names: the name of a package
812
813 Raises:
814 PackageNotFoundError if the package is not found.
815 """
816 packages = []
817
818 package_names = [package_names] if type(package_names) is str else package_names
819 if not package_names:
820 raise TypeError("Expected at least one package name to add, received zero!")
821
822 for p in package_names:
823 try:
824 pkg = DebianPackage.from_installed_package(p)
825 pkg.ensure(state=PackageState.Absent)
826 packages.append(pkg)
827 except PackageNotFoundError:
828 logger.info("package '%s' was requested for removal, but it was not installed.", p)
829
830 # the list of packages will be empty when no package is removed
831 logger.debug("packages: '%s'", packages)
832 return packages[0] if len(packages) == 1 else packages
833
834
835def update() -> None:
836 """Updates the apt cache via `apt-get update`."""
837 check_call(["apt-get", "update"], stderr=PIPE, stdout=PIPE)
838
839
840class InvalidSourceError(Error):
841 """Exceptions for invalid source entries."""
842
843
844class GPGKeyError(Error):
845 """Exceptions for GPG keys."""
846
847
848class DebianRepository:
849 """An abstraction to represent a repository."""
850
851 def __init__(
852 self,
853 enabled: bool,
854 repotype: str,
855 uri: str,
856 release: str,
857 groups: List[str],
858 filename: Optional[str] = "",
859 gpg_key_filename: Optional[str] = "",
860 options: Optional[dict] = None,
861 ):
862 self._enabled = enabled
863 self._repotype = repotype
864 self._uri = uri
865 self._release = release
866 self._groups = groups
867 self._filename = filename
868 self._gpg_key_filename = gpg_key_filename
869 self._options = options
870
871 @property
872 def enabled(self):
873 """Return whether or not the repository is enabled."""
874 return self._enabled
875
876 @property
877 def repotype(self):
878 """Return whether it is binary or source."""
879 return self._repotype
880
881 @property
882 def uri(self):
883 """Return the URI."""
884 return self._uri
885
886 @property
887 def release(self):
888 """Return which Debian/Ubuntu releases it is valid for."""
889 return self._release
890
891 @property
892 def groups(self):
893 """Return the enabled package groups."""
894 return self._groups
895
896 @property
897 def filename(self):
898 """Returns the filename for a repository."""
899 return self._filename
900
901 @filename.setter
902 def filename(self, fname: str) -> None:
903 """Sets the filename used when a repo is written back to diskself.
904
905 Args:
906 fname: a filename to write the repository information to.
907 """
908 if not fname.endswith(".list"):
909 raise InvalidSourceError("apt source filenames should end in .list!")
910
911 self._filename = fname
912
913 @property
914 def gpg_key(self):
915 """Returns the path to the GPG key for this repository."""
916 return self._gpg_key_filename
917
918 @property
919 def options(self):
920 """Returns any additional repo options which are set."""
921 return self._options
922
923 def make_options_string(self) -> str:
924 """Generate the complete options string for a a repository.
925
926 Combining `gpg_key`, if set, and the rest of the options to find
927 a complex repo string.
928 """
929 options = self._options if self._options else {}
930 if self._gpg_key_filename:
931 options["signed-by"] = self._gpg_key_filename
932
933 return (
934 "[{}] ".format(" ".join(["{}={}".format(k, v) for k, v in options.items()]))
935 if options
936 else ""
937 )
938
939 @staticmethod
940 def prefix_from_uri(uri: str) -> str:
941 """Get a repo list prefix from the uri, depending on whether a path is set."""
942 uridetails = urlparse(uri)
943 path = (
944 uridetails.path.lstrip("/").replace("/", "-") if uridetails.path else uridetails.netloc
945 )
946 return "/etc/apt/sources.list.d/{}".format(path)
947
948 @staticmethod
949 def from_repo_line(repo_line: str, write_file: Optional[bool] = True) -> "DebianRepository":
950 """Instantiate a new `DebianRepository` a `sources.list` entry line.
951
952 Args:
953 repo_line: a string representing a repository entry
954 write_file: boolean to enable writing the new repo to disk
955 """
956 repo = RepositoryMapping._parse(repo_line, "UserInput")
957 fname = "{}-{}.list".format(
958 DebianRepository.prefix_from_uri(repo.uri), repo.release.replace("/", "-")
959 )
960 repo.filename = fname
961
962 options = repo.options if repo.options else {}
963 if repo.gpg_key:
964 options["signed-by"] = repo.gpg_key
965
966 # For Python 3.5 it's required to use sorted in the options dict in order to not have
967 # different results in the order of the options between executions.
968 options_str = (
969 "[{}] ".format(" ".join(["{}={}".format(k, v) for k, v in sorted(options.items())]))
970 if options
971 else ""
972 )
973
974 if write_file:
975 with open(fname, "wb") as f:
976 f.write(
977 (
978 "{}".format("#" if not repo.enabled else "")
979 + "{} {}{} ".format(repo.repotype, options_str, repo.uri)
980 + "{} {}\n".format(repo.release, " ".join(repo.groups))
981 ).encode("utf-8")
982 )
983
984 return repo
985
986 def disable(self) -> None:
987 """Remove this repository from consideration.
988
989 Disable it instead of removing from the repository file.
990 """
991 searcher = "{} {}{} {}".format(
992 self.repotype, self.make_options_string(), self.uri, self.release
993 )
994 for line in fileinput.input(self._filename, inplace=True):
995 if re.match(r"^{}\s".format(re.escape(searcher)), line):
996 print("# {}".format(line), end="")
997 else:
998 print(line, end="")
999
1000 def import_key(self, key: str) -> None:
1001 """Import an ASCII Armor key.
1002
1003 A Radix64 format keyid is also supported for backwards
1004 compatibility. In this case Ubuntu keyserver will be
1005 queried for a key via HTTPS by its keyid. This method
1006 is less preferrable because https proxy servers may
1007 require traffic decryption which is equivalent to a
1008 man-in-the-middle attack (a proxy server impersonates
1009 keyserver TLS certificates and has to be explicitly
1010 trusted by the system).
1011
1012 Args:
1013 key: A GPG key in ASCII armor format,
1014 including BEGIN and END markers or a keyid.
1015
1016 Raises:
1017 GPGKeyError if the key could not be imported
1018 """
1019 key = key.strip()
1020 if "-" in key or "\n" in key:
1021 # Send everything not obviously a keyid to GPG to import, as
1022 # we trust its validation better than our own. eg. handling
1023 # comments before the key.
1024 logger.debug("PGP key found (looks like ASCII Armor format)")
1025 if (
1026 "-----BEGIN PGP PUBLIC KEY BLOCK-----" in key
1027 and "-----END PGP PUBLIC KEY BLOCK-----" in key
1028 ):
1029 logger.debug("Writing provided PGP key in the binary format")
1030 key_bytes = key.encode("utf-8")
1031 key_name = self._get_keyid_by_gpg_key(key_bytes)
1032 key_gpg = self._dearmor_gpg_key(key_bytes)
1033 self._gpg_key_filename = "/etc/apt/trusted.gpg.d/{}.gpg".format(key_name)
1034 self._write_apt_gpg_keyfile(key_name=self._gpg_key_filename, key_material=key_gpg)
1035 else:
1036 raise GPGKeyError("ASCII armor markers missing from GPG key")
1037 else:
1038 logger.warning(
1039 "PGP key found (looks like Radix64 format). "
1040 "SECURELY importing PGP key from keyserver; "
1041 "full key not provided."
1042 )
1043 # as of bionic add-apt-repository uses curl with an HTTPS keyserver URL
1044 # to retrieve GPG keys. `apt-key adv` command is deprecated as is
1045 # apt-key in general as noted in its manpage. See lp:1433761 for more
1046 # history. Instead, /etc/apt/trusted.gpg.d is used directly to drop
1047 # gpg
1048 key_asc = self._get_key_by_keyid(key)
1049 # write the key in GPG format so that apt-key list shows it
1050 key_gpg = self._dearmor_gpg_key(key_asc.encode("utf-8"))
1051 self._gpg_key_filename = "/etc/apt/trusted.gpg.d/{}.gpg".format(key)
1052 self._write_apt_gpg_keyfile(key_name=key, key_material=key_gpg)
1053
1054 @staticmethod
1055 def _get_keyid_by_gpg_key(key_material: bytes) -> str:
1056 """Get a GPG key fingerprint by GPG key material.
1057
1058 Gets a GPG key fingerprint (40-digit, 160-bit) by the ASCII armor-encoded
1059 or binary GPG key material. Can be used, for example, to generate file
1060 names for keys passed via charm options.
1061 """
1062 # Use the same gpg command for both Xenial and Bionic
1063 cmd = ["gpg", "--with-colons", "--with-fingerprint"]
1064 ps = subprocess.run(
1065 cmd,
1066 stdout=PIPE,
1067 stderr=PIPE,
1068 input=key_material,
1069 )
1070 out, err = ps.stdout.decode(), ps.stderr.decode()
1071 if "gpg: no valid OpenPGP data found." in err:
1072 raise GPGKeyError("Invalid GPG key material provided")
1073 # from gnupg2 docs: fpr :: Fingerprint (fingerprint is in field 10)
1074 return re.search(r"^fpr:{9}([0-9A-F]{40}):$", out, re.MULTILINE).group(1)
1075
1076 @staticmethod
1077 def _get_key_by_keyid(keyid: str) -> str:
1078 """Get a key via HTTPS from the Ubuntu keyserver.
1079
1080 Different key ID formats are supported by SKS keyservers (the longer ones
1081 are more secure, see "dead beef attack" and https://evil32.com/). Since
1082 HTTPS is used, if SSLBump-like HTTPS proxies are in place, they will
1083 impersonate keyserver.ubuntu.com and generate a certificate with
1084 keyserver.ubuntu.com in the CN field or in SubjAltName fields of a
1085 certificate. If such proxy behavior is expected it is necessary to add the
1086 CA certificate chain containing the intermediate CA of the SSLBump proxy to
1087 every machine that this code runs on via ca-certs cloud-init directive (via
1088 cloudinit-userdata model-config) or via other means (such as through a
1089 custom charm option). Also note that DNS resolution for the hostname in a
1090 URL is done at a proxy server - not at the client side.
1091 8-digit (32 bit) key ID
1092 https://keyserver.ubuntu.com/pks/lookup?search=0x4652B4E6
1093 16-digit (64 bit) key ID
1094 https://keyserver.ubuntu.com/pks/lookup?search=0x6E85A86E4652B4E6
1095 40-digit key ID:
1096 https://keyserver.ubuntu.com/pks/lookup?search=0x35F77D63B5CEC106C577ED856E85A86E4652B4E6
1097
1098 Args:
1099 keyid: An 8, 16 or 40 hex digit keyid to find a key for
1100
1101 Returns:
1102 A string contining key material for the specified GPG key id
1103
1104
1105 Raises:
1106 subprocess.CalledProcessError
1107 """
1108 # options=mr - machine-readable output (disables html wrappers)
1109 keyserver_url = (
1110 "https://keyserver.ubuntu.com" "/pks/lookup?op=get&options=mr&exact=on&search=0x{}"
1111 )
1112 curl_cmd = ["curl", keyserver_url.format(keyid)]
1113 # use proxy server settings in order to retrieve the key
1114 return check_output(curl_cmd).decode()
1115
1116 @staticmethod
1117 def _dearmor_gpg_key(key_asc: bytes) -> bytes:
1118 """Converts a GPG key in the ASCII armor format to the binary format.
1119
1120 Args:
1121 key_asc: A GPG key in ASCII armor format.
1122
1123 Returns:
1124 A GPG key in binary format as a string
1125
1126 Raises:
1127 GPGKeyError
1128 """
1129 ps = subprocess.run(["gpg", "--dearmor"], stdout=PIPE, stderr=PIPE, input=key_asc)
1130 out, err = ps.stdout, ps.stderr.decode()
1131 if "gpg: no valid OpenPGP data found." in err:
1132 raise GPGKeyError(
1133 "Invalid GPG key material. Check your network setup"
1134 " (MTU, routing, DNS) and/or proxy server settings"
1135 " as well as destination keyserver status."
1136 )
1137 else:
1138 return out
1139
1140 @staticmethod
1141 def _write_apt_gpg_keyfile(key_name: str, key_material: bytes) -> None:
1142 """Writes GPG key material into a file at a provided path.
1143
1144 Args:
1145 key_name: A key name to use for a key file (could be a fingerprint)
1146 key_material: A GPG key material (binary)
1147 """
1148 with open(key_name, "wb") as keyf:
1149 keyf.write(key_material)
1150
1151
1152class RepositoryMapping(Mapping):
1153 """An representation of known repositories.
1154
1155 Instantiation of `RepositoryMapping` will iterate through the
1156 filesystem, parse out repository files in `/etc/apt/...`, and create
1157 `DebianRepository` objects in this list.
1158
1159 Typical usage:
1160
1161 repositories = apt.RepositoryMapping()
1162 repositories.add(DebianRepository(
1163 enabled=True, repotype="deb", uri="https://example.com", release="focal",
1164 groups=["universe"]
1165 ))
1166 """
1167
1168 def __init__(self):
1169 self._repository_map = {}
1170 # Repositories that we're adding -- used to implement mode param
1171 self.default_file = "/etc/apt/sources.list"
1172
1173 # read sources.list if it exists
1174 if os.path.isfile(self.default_file):
1175 self.load(self.default_file)
1176
1177 # read sources.list.d
1178 for file in glob.iglob("/etc/apt/sources.list.d/*.list"):
1179 self.load(file)
1180
1181 def __contains__(self, key: str) -> bool:
1182 """Magic method for checking presence of repo in mapping."""
1183 return key in self._repository_map
1184
1185 def __len__(self) -> int:
1186 """Return number of repositories in map."""
1187 return len(self._repository_map)
1188
1189 def __iter__(self) -> Iterable[DebianRepository]:
1190 """Iterator magic method for RepositoryMapping."""
1191 return iter(self._repository_map.values())
1192
1193 def __getitem__(self, repository_uri: str) -> DebianRepository:
1194 """Return a given `DebianRepository`."""
1195 return self._repository_map[repository_uri]
1196
1197 def __setitem__(self, repository_uri: str, repository: DebianRepository) -> None:
1198 """Add a `DebianRepository` to the cache."""
1199 self._repository_map[repository_uri] = repository
1200
1201 def load(self, filename: str):
1202 """Load a repository source file into the cache.
1203
1204 Args:
1205 filename: the path to the repository file
1206 """
1207 parsed = []
1208 skipped = []
1209 with open(filename, "r") as f:
1210 for n, line in enumerate(f):
1211 try:
1212 repo = self._parse(line, filename)
1213 except InvalidSourceError:
1214 skipped.append(n)
1215 else:
1216 repo_identifier = "{}-{}-{}".format(repo.repotype, repo.uri, repo.release)
1217 self._repository_map[repo_identifier] = repo
1218 parsed.append(n)
1219 logger.debug("parsed repo: '%s'", repo_identifier)
1220
1221 if skipped:
1222 skip_list = ", ".join(str(s) for s in skipped)
1223 logger.debug("skipped the following lines in file '%s': %s", filename, skip_list)
1224
1225 if parsed:
1226 logger.info("parsed %d apt package repositories", len(parsed))
1227 else:
1228 raise InvalidSourceError("all repository lines in '{}' were invalid!".format(filename))
1229
1230 @staticmethod
1231 def _parse(line: str, filename: str) -> DebianRepository:
1232 """Parse a line in a sources.list file.
1233
1234 Args:
1235 line: a single line from `load` to parse
1236 filename: the filename being read
1237
1238 Raises:
1239 InvalidSourceError if the source type is unknown
1240 """
1241 enabled = True
1242 repotype = uri = release = gpg_key = ""
1243 options = {}
1244 groups = []
1245
1246 line = line.strip()
1247 if line.startswith("#"):
1248 enabled = False
1249 line = line[1:]
1250
1251 # Check for "#" in the line and treat a part after it as a comment then strip it off.
1252 i = line.find("#")
1253 if i > 0:
1254 line = line[:i]
1255
1256 # Split a source into substrings to initialize a new repo.
1257 source = line.strip()
1258 if source:
1259 # Match any repo options, and get a dict representation.
1260 for v in re.findall(OPTIONS_MATCHER, source):
1261 opts = dict(o.split("=") for o in v.strip("[]").split())
1262 # Extract the 'signed-by' option for the gpg_key
1263 gpg_key = opts.pop("signed-by", "")
1264 options = opts
1265
1266 # Remove any options from the source string and split the string into chunks
1267 source = re.sub(OPTIONS_MATCHER, "", source)
1268 chunks = source.split()
1269
1270 # Check we've got a valid list of chunks
1271 if len(chunks) < 3 or chunks[0] not in VALID_SOURCE_TYPES:
1272 raise InvalidSourceError("An invalid sources line was found in %s!", filename)
1273
1274 repotype = chunks[0]
1275 uri = chunks[1]
1276 release = chunks[2]
1277 groups = chunks[3:]
1278
1279 return DebianRepository(
1280 enabled, repotype, uri, release, groups, filename, gpg_key, options
1281 )
1282 else:
1283 raise InvalidSourceError("An invalid sources line was found in %s!", filename)
1284
1285 def add(self, repo: DebianRepository, default_filename: Optional[bool] = False) -> None:
1286 """Add a new repository to the system.
1287
1288 Args:
1289 repo: a `DebianRepository` object
1290 default_filename: an (Optional) filename if the default is not desirable
1291 """
1292 new_filename = "{}-{}.list".format(
1293 DebianRepository.prefix_from_uri(repo.uri), repo.release.replace("/", "-")
1294 )
1295
1296 fname = repo.filename or new_filename
1297
1298 options = repo.options if repo.options else {}
1299 if repo.gpg_key:
1300 options["signed-by"] = repo.gpg_key
1301
1302 with open(fname, "wb") as f:
1303 f.write(
1304 (
1305 "{}".format("#" if not repo.enabled else "")
1306 + "{} {}{} ".format(repo.repotype, repo.make_options_string(), repo.uri)
1307 + "{} {}\n".format(repo.release, " ".join(repo.groups))
1308 ).encode("utf-8")
1309 )
1310
1311 self._repository_map["{}-{}-{}".format(repo.repotype, repo.uri, repo.release)] = repo
1312
1313 def disable(self, repo: DebianRepository) -> None:
1314 """Remove a repository. Disable by default.
1315
1316 Args:
1317 repo: a `DebianRepository` to disable
1318 """
1319 searcher = "{} {}{} {}".format(
1320 repo.repotype, repo.make_options_string(), repo.uri, repo.release
1321 )
1322
1323 for line in fileinput.input(repo.filename, inplace=True):
1324 if re.match(r"^{}\s".format(re.escape(searcher)), line):
1325 print("# {}".format(line), end="")
1326 else:
1327 print(line, end="")
1328
1329 self._repository_map["{}-{}-{}".format(repo.repotype, repo.uri, repo.release)] = repo
diff --git a/run_tests b/run_tests
index 362a995..ef9f058 100755
--- a/run_tests
+++ b/run_tests
@@ -1,17 +1,18 @@
1#!/bin/sh -e1#!/bin/sh -e
2# Copyright 2021 Canonical Ltd.2# Copyright 2021 Canonical Ltd.
3# See LICENSE file for licensing details.3# See LICENSE file for licensing details.
4PYTHONPATH_INCLUDES="src:lib"
45
5if [ -z "$VIRTUAL_ENV" -a -d venv/ ]; then6if [ -z "$VIRTUAL_ENV" -a -d venv/ ]; then
6 . venv/bin/activate7 . venv/bin/activate
7fi8fi
89
9if [ -z "$PYTHONPATH" ]; then10if [ -z "$PYTHONPATH" ]; then
10 export PYTHONPATH=src11 export PYTHONPATH="$PYTHONPATH_INCLUDES"
11else12else
12 export PYTHONPATH="src:$PYTHONPATH"13 export PYTHONPATH="$PYTHONPATH_INCLUDES:$PYTHONPATH"
13fi14fi
1415
15flake816flake8 --extend-exclude operator_libs_linux
16coverage run --source=src -m unittest -v "$@"17coverage run --source=src -m unittest -v "$@"
17coverage report -m18coverage report -m
diff --git a/src/charm.py b/src/charm.py
index 34878c0..84362d3 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -10,6 +10,7 @@ import os
10import subprocess10import subprocess
11import yaml11import yaml
1212
13from charms.operator_libs_linux.v0 import apt
13from ops.charm import CharmBase14from ops.charm import CharmBase
14from ops.framework import StoredState15from ops.framework import StoredState
15from ops.main import main16from ops.main import main
@@ -29,17 +30,6 @@ def remove_ppa(ppa, env):
29 subprocess.check_call(["add-apt-repository", "--remove", "--yes", ppa], env=env)30 subprocess.check_call(["add-apt-repository", "--remove", "--yes", ppa], env=env)
3031
3132
32def install_package(package):
33 """Install specified apt package (after performing an apt update)"""
34 subprocess.check_call(["apt", "update"])
35 subprocess.check_call(["apt", "install", "--yes", "--quiet", package])
36
37
38def remove_package(package):
39 """Remove specified apt package"""
40 subprocess.check_call(["apt", "remove", "--yes", "--quiet", package])
41
42
43def update_configuration(contract_url):33def update_configuration(contract_url):
44 """Write the contract_url to the uaclient configuration file"""34 """Write the contract_url to the uaclient configuration file"""
45 with open("/etc/ubuntu-advantage/uaclient.conf", "r+") as f:35 with open("/etc/ubuntu-advantage/uaclient.conf", "r+") as f:
@@ -137,9 +127,9 @@ class UbuntuAdvantageCharm(CharmBase):
137 """Install apt package if necessary"""127 """Install apt package if necessary"""
138 if self._state.package_needs_installing:128 if self._state.package_needs_installing:
139 logger.info("Removing package ubuntu-advantage-tools")129 logger.info("Removing package ubuntu-advantage-tools")
140 remove_package("ubuntu-advantage-tools")130 apt.remove_package("ubuntu-advantage-tools")
141 logger.info("Installing package ubuntu-advantage-tools")131 logger.info("Installing package ubuntu-advantage-tools")
142 install_package("ubuntu-advantage-tools")132 apt.add_package("ubuntu-advantage-tools", update_cache=True)
143 self._state.package_needs_installing = False133 self._state.package_needs_installing = False
144134
145 def _handle_subscription_state(self):135 def _handle_subscription_state(self):
diff --git a/tests/test_charm.py b/tests/test_charm.py
index f55d844..37da2a7 100644
--- a/tests/test_charm.py
+++ b/tests/test_charm.py
@@ -80,7 +80,8 @@ class TestCharm(TestCase):
80 "check_call": patch("subprocess.check_call").start(),80 "check_call": patch("subprocess.check_call").start(),
81 "check_output": patch("subprocess.check_output").start(),81 "check_output": patch("subprocess.check_output").start(),
82 "open": patch("builtins.open").start(),82 "open": patch("builtins.open").start(),
83 "environ": patch.dict("os.environ", clear=True).start()83 "environ": patch.dict("os.environ", clear=True).start(),
84 "apt": patch("charm.apt").start()
84 }85 }
85 self.mocks["check_output"].side_effect = [86 self.mocks["check_output"].side_effect = [
86 STATUS_DETACHED87 STATUS_DETACHED
@@ -100,25 +101,21 @@ class TestCharm(TestCase):
100101
101 def test_config_changed_ppa_new(self):102 def test_config_changed_ppa_new(self):
102 self.harness.update_config({"ppa": "ppa:ua-client/stable"})103 self.harness.update_config({"ppa": "ppa:ua-client/stable"})
103 self.assertEqual(self.mocks["check_call"].call_count, 6)104 self.assertEqual(self.mocks["check_call"].call_count, 3)
104 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([105 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([
105 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),106 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),
106 call(["apt", "remove", "--yes", "--quiet", "ubuntu-advantage-tools"]),
107 call(["apt", "update"]),
108 call(["apt", "install", "--yes", "--quiet", "ubuntu-advantage-tools"])
109 ]))107 ]))
108 self._assert_apt_calls()
110 self.assertEqual(self.harness.charm._state.ppa, "ppa:ua-client/stable")109 self.assertEqual(self.harness.charm._state.ppa, "ppa:ua-client/stable")
111 self.assertFalse(self.harness.charm._state.package_needs_installing)110 self.assertFalse(self.harness.charm._state.package_needs_installing)
112111
113 def test_config_changed_ppa_updated(self):112 def test_config_changed_ppa_updated(self):
114 self.harness.update_config({"ppa": "ppa:ua-client/stable"})113 self.harness.update_config({"ppa": "ppa:ua-client/stable"})
115 self.assertEqual(self.mocks["check_call"].call_count, 6)114 self.assertEqual(self.mocks["check_call"].call_count, 3)
116 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([115 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([
117 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),116 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),
118 call(["apt", "remove", "--yes", "--quiet", "ubuntu-advantage-tools"]),
119 call(["apt", "update"]),
120 call(["apt", "install", "--yes", "--quiet", "ubuntu-advantage-tools"])
121 ]))117 ]))
118 self._assert_apt_calls()
122 self.assertEqual(self.harness.charm._state.ppa, "ppa:ua-client/stable")119 self.assertEqual(self.harness.charm._state.ppa, "ppa:ua-client/stable")
123 self.assertFalse(self.harness.charm._state.package_needs_installing)120 self.assertFalse(self.harness.charm._state.package_needs_installing)
124121
@@ -126,29 +123,26 @@ class TestCharm(TestCase):
126 STATUS_DETACHED123 STATUS_DETACHED
127 ]124 ]
128 self.mocks["check_call"].reset_mock()125 self.mocks["check_call"].reset_mock()
126 self.mocks["apt"].reset_mock()
129 self.harness.update_config({"ppa": "ppa:different-client/unstable"})127 self.harness.update_config({"ppa": "ppa:different-client/unstable"})
130 self.assertEqual(self.mocks["check_call"].call_count, 7)128 self.assertEqual(self.mocks["check_call"].call_count, 4)
131 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([129 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([
132 call(["add-apt-repository", "--remove", "--yes", "ppa:ua-client/stable"],130 call(["add-apt-repository", "--remove", "--yes", "ppa:ua-client/stable"],
133 env=self.env),131 env=self.env),
134 call(["add-apt-repository", "--yes", "ppa:different-client/unstable"],132 call(["add-apt-repository", "--yes", "ppa:different-client/unstable"],
135 env=self.env),133 env=self.env),
136 call(["apt", "remove", "--yes", "--quiet", "ubuntu-advantage-tools"]),
137 call(["apt", "update"]),
138 call(["apt", "install", "--yes", "--quiet", "ubuntu-advantage-tools"])
139 ]))134 ]))
135 self._assert_apt_calls()
140 self.assertEqual(self.harness.charm._state.ppa, "ppa:different-client/unstable")136 self.assertEqual(self.harness.charm._state.ppa, "ppa:different-client/unstable")
141 self.assertFalse(self.harness.charm._state.package_needs_installing)137 self.assertFalse(self.harness.charm._state.package_needs_installing)
142138
143 def test_config_changed_ppa_unmodified(self):139 def test_config_changed_ppa_unmodified(self):
144 self.harness.update_config({"ppa": "ppa:ua-client/stable"})140 self.harness.update_config({"ppa": "ppa:ua-client/stable"})
145 self.assertEqual(self.mocks["check_call"].call_count, 6)141 self.assertEqual(self.mocks["check_call"].call_count, 3)
146 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([142 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([
147 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),143 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),
148 call(["apt", "remove", "--yes", "--quiet", "ubuntu-advantage-tools"]),
149 call(["apt", "update"]),
150 call(["apt", "install", "--yes", "--quiet", "ubuntu-advantage-tools"])
151 ]))144 ]))
145 self._assert_apt_calls()
152 self.assertEqual(self.harness.charm._state.ppa, "ppa:ua-client/stable")146 self.assertEqual(self.harness.charm._state.ppa, "ppa:ua-client/stable")
153 self.assertFalse(self.harness.charm._state.package_needs_installing)147 self.assertFalse(self.harness.charm._state.package_needs_installing)
154148
@@ -165,30 +159,27 @@ class TestCharm(TestCase):
165159
166 def test_config_changed_ppa_unset(self):160 def test_config_changed_ppa_unset(self):
167 self.harness.update_config({"ppa": "ppa:ua-client/stable"})161 self.harness.update_config({"ppa": "ppa:ua-client/stable"})
168 self.assertEqual(self.mocks["check_call"].call_count, 6)162 self.assertEqual(self.mocks["check_call"].call_count, 3)
169 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([163 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([
170 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),164 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),
171 call(["apt", "remove", "--yes", "--quiet", "ubuntu-advantage-tools"]),
172 call(["apt", "update"]),
173 call(["apt", "install", "--yes", "--quiet", "ubuntu-advantage-tools"])
174 ]))165 ]))
166 self._assert_apt_calls()
175 self.assertEqual(self.harness.charm._state.ppa, "ppa:ua-client/stable")167 self.assertEqual(self.harness.charm._state.ppa, "ppa:ua-client/stable")
176 self.assertFalse(self.harness.charm._state.package_needs_installing)168 self.assertFalse(self.harness.charm._state.package_needs_installing)
177169
178 self.mocks["check_call"].reset_mock()170 self.mocks["check_call"].reset_mock()
179 self.mocks["check_output"].reset_mock()171 self.mocks["check_output"].reset_mock()
172 self.mocks["apt"].reset_mock()
180 self.mocks["check_output"].side_effect = [173 self.mocks["check_output"].side_effect = [
181 STATUS_DETACHED174 STATUS_DETACHED
182 ]175 ]
183 self.harness.update_config({"ppa": ""})176 self.harness.update_config({"ppa": ""})
184 self.assertEqual(self.mocks["check_call"].call_count, 6)177 self.assertEqual(self.mocks["check_call"].call_count, 3)
185 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([178 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([
186 call(["add-apt-repository", "--remove", "--yes", "ppa:ua-client/stable"],179 call(["add-apt-repository", "--remove", "--yes", "ppa:ua-client/stable"],
187 env=self.env),180 env=self.env),
188 call(["apt", "remove", "--yes", "--quiet", "ubuntu-advantage-tools"]),
189 call(["apt", "update"]),
190 call(["apt", "install", "--yes", "--quiet", "ubuntu-advantage-tools"])
191 ]))181 ]))
182 self._assert_apt_calls()
192 self.assertIsNone(self.harness.charm._state.ppa)183 self.assertIsNone(self.harness.charm._state.ppa)
193 self.assertFalse(self.harness.charm._state.package_needs_installing)184 self.assertFalse(self.harness.charm._state.package_needs_installing)
194185
@@ -232,14 +223,12 @@ class TestCharm(TestCase):
232 STATUS_ATTACHED223 STATUS_ATTACHED
233 ]224 ]
234 self.harness.update_config({"token": "test-token"})225 self.harness.update_config({"token": "test-token"})
235 self.assertEqual(self.mocks["check_call"].call_count, 6)226 self.assertEqual(self.mocks["check_call"].call_count, 3)
236 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([227 self.mocks["check_call"].assert_has_calls(self._add_ua_proxy_setup_calls([
237 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),228 call(["add-apt-repository", "--yes", "ppa:ua-client/stable"], env=self.env),
238 call(["apt", "remove", "--yes", "--quiet", "ubuntu-advantage-tools"]),
239 call(["apt", "update"]),
240 call(["apt", "install", "--yes", "--quiet", "ubuntu-advantage-tools"])
241 ]))229 ]))
242 self.mocks["open"].assert_called_with("/etc/ubuntu-advantage/uaclient.conf", "r+")230 self.mocks["open"].assert_called_with("/etc/ubuntu-advantage/uaclient.conf", "r+")
231 self._assert_apt_calls()
243 handle = self.mocks["open"]()232 handle = self.mocks["open"]()
244 expected = dedent("""\233 expected = dedent("""\
245 contract_url: https://contracts.canonical.com234 contract_url: https://contracts.canonical.com
@@ -505,3 +494,12 @@ class TestCharm(TestCase):
505 ),494 ),
506 ]495 ]
507 return call_list + proxy_calls if append else proxy_calls + call_list496 return call_list + proxy_calls if append else proxy_calls + call_list
497
498 def _assert_apt_calls(self):
499 """Helper to run the assertions for apt install/remove"""
500 self.mocks["apt"].remove_package.assert_called_once_with(
501 "ubuntu-advantage-tools"
502 )
503 self.mocks["apt"].add_package.assert_called_once_with(
504 "ubuntu-advantage-tools", update_cache=True
505 )

Subscribers

People subscribed via source and target branches