Server : Apache/2.4.41 (Ubuntu) System : Linux journalup 5.4.0-198-generic #218-Ubuntu SMP Fri Sep 27 20:18:53 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.33 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, Directory : /usr/bin/ |
#!/usr/bin/python3 # Copyright (c) 2005-2018 Canonical Ltd # # AUTHOR: # Michael Vogt <[email protected]> # Balint Reczey <[email protected]> # This file is part of unattended-upgrades # # unattended-upgrades is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as published # by the Free Software Foundation; either version 2 of the License, or (at # your option) any later version. # # unattended-upgrades is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with unattended-upgrades; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # import atexit import copy import datetime import errno import email.charset import fcntl import fnmatch import gettext try: from gi.repository.Gio import NetworkMonitor except ImportError: pass import grp import io import locale import logging import logging.handlers import re import os import select import signal import socket import string import subprocess import sys import syslog try: from typing import AbstractSet, cast, DefaultDict, Dict, Iterable, List AbstractSet # pyflakes DefaultDict # pyflakes Dict # pyflakes Iterable # pyflakes List # pyflakes from typing import Set, Tuple, Union Set # pyflakes Tuple # pyflakes Union # pyflakes except ImportError: pass from collections import defaultdict, namedtuple from datetime import date from email.message import Message from gettext import gettext as _ from io import StringIO from optparse import ( OptionParser, SUPPRESS_HELP, ) from subprocess import ( Popen, PIPE, ) from textwrap import wrap import apt import apt_inst import apt_pkg import distro_info # the reboot required flag file used by packages REBOOT_REQUIRED_FILE = "/var/run/reboot-required" KEPT_PACKAGES_FILE = "var/lib/unattended-upgrades/kept-back" MAIL_BINARY = "/usr/bin/mail" SENDMAIL_BINARY = "/usr/sbin/sendmail" USERS = "/usr/bin/users" # no py3 lsb_release in debian :/ DISTRO_CODENAME = subprocess.check_output( ["lsb_release", "-c", "-s"], universal_newlines=True).strip() # type: str DISTRO_DESC = subprocess.check_output( ["lsb_release", "-d", "-s"], universal_newlines=True).strip() # type: str DISTRO_ID = subprocess.check_output( ["lsb_release", "-i", "-s"], universal_newlines=True).strip() # type: str # Number of days before release of devel where we enable unattended # upgrades. DEVEL_UNTIL_RELEASE = datetime.timedelta(days=21) # progress information is written here PROGRESS_LOG = "/var/run/unattended-upgrades.progress" PID_FILE = "/var/run/unattended-upgrades.pid" LOCK_FILE = "/var/run/unattended-upgrades.lock" # set from the sigint signal handler SIGNAL_STOP_REQUEST = False # messages to be logged only once logged_msgs = set() # type: AbstractSet[str] NEVER_PIN = -32768 class LoggingDateTime: """The date/time representation for the dpkg log file timestamps""" LOG_DATE_TIME_FMT = "%Y-%m-%d %H:%M:%S" @classmethod def as_string(cls): # type: () -> str """Return the current date and time as LOG_DATE_TIME_FMT string""" return datetime.datetime.now().strftime(cls.LOG_DATE_TIME_FMT) @classmethod def from_string(cls, logstr): # type: (str) -> datetime.datetime """Take a LOG_DATE_TIME_FMT string and return datetime object""" return datetime.datetime.strptime(logstr, cls.LOG_DATE_TIME_FMT) class UnknownMatcherError(ValueError): pass class NoAllowedOriginError(ValueError): pass PkgPin = namedtuple('PkgPin', ['pkg', 'priority']) PkgFilePin = namedtuple('PkgFilePin', ['id', 'priority']) class UnattendedUpgradesCache(apt.Cache): def __init__(self, rootdir): # type: (str) -> None self._cached_candidate_pkgnames = set() # type: Set[str] self.allowed_origins = get_allowed_origins() logging.info(_("Allowed origins are: %s"), ", ".join(self.allowed_origins)) self.blacklist = apt_pkg.config.value_list( "Unattended-Upgrade::Package-Blacklist") logging.info(_("Initial blacklist: %s"), " ".join(self.blacklist)) self.whitelist = apt_pkg.config.value_list( "Unattended-Upgrade::Package-Whitelist") self.strict_whitelist = apt_pkg.config.find_b( "Unattended-Upgrade::Package-Whitelist-Strict", False) logging.info(_("Initial whitelist (%s): %s"), "strict" if self.strict_whitelist else "not strict", " ".join(self.whitelist)) apt.Cache.__init__(self, rootdir=rootdir) # pre-heat lazy-loaded modules to avoid crash on python upgrade datetime.datetime.strptime("", "") # generate versioned_kernel_pkgs_regexp for later use self.versioned_kernel_pkgs_regexp = versioned_kernel_pkgs_regexp() self.running_kernel_pkgs_regexp = running_kernel_pkgs_regexp() if self.versioned_kernel_pkgs_regexp: logging.debug("Using %s regexp to find kernel packages", self.versioned_kernel_pkgs_regexp.pattern) else: logging.debug("APT::VersionedKernelPackages is not set") if self.running_kernel_pkgs_regexp: logging.debug("Using %s regexp to find running kernel packages", self.running_kernel_pkgs_regexp.pattern) def find_better_version(self, pkg): # type (apt.Package) -> apt.package.Version if pkg.is_installed and pkg.versions[0] > pkg.installed: logging.debug( "Package %s has a higher version available, checking if it is " "from an allowed origin and is not pinned down.", pkg.name) for v in pkg.versions: if pkg.installed < v \ and pkg.installed.policy_priority <= v.policy_priority \ and is_in_allowed_origin(v, self.allowed_origins): return v return None def find_kept_packages(self, dry_run): # type: (bool) -> KeptPkgs """ Find kept packages not collected already """ kept_packages = KeptPkgs(set) if dry_run: logging.info(_("The list of kept packages can't be calculated in " "dry-run mode.")) return kept_packages for pkg in self: better_version = self.find_better_version(pkg) if better_version: logging.info(self.kept_package_excuse(pkg._pkg, self.blacklist, self.whitelist, self.strict_whitelist, better_version)) kept_packages.add(pkg, better_version, self) return kept_packages def kept_package_excuse(self, pkg, # apt.Package blacklist, # type: List[str] whitelist, # type: List[str] strict_whitelist, # type: bool better_version # type: apt.package.Version ): # type: (...) -> str """ Log the excuse the package is kept back for """ if pkg.selected_state == apt_pkg.SELSTATE_HOLD: return _("Package %s is marked to be held back.") % pkg.name elif is_pkgname_in_blacklist(pkg.name, blacklist): return _("Package %s is blacklisted.") % pkg.name elif whitelist: if strict_whitelist: if not is_pkgname_in_whitelist(pkg.name, whitelist): return (_( "Package %s is not on the strict whitelist.") % pkg.name) else: if not is_pkgname_in_whitelist(pkg.name, whitelist): return (_( "Package %s is not whitelisted and it is not a" " dependency of a whitelisted package.") % pkg.name) elif not any([o.trusted for o in better_version.origins]): return _("Package %s's origin is not trusted.") % pkg.name return (_("Package %s is kept back because a related package" " is kept back or due to local apt_preferences(5).") % pkg.name) def pinning_from_regex_list(self, regexps, priority): # type: (List[str], int) -> List[PkgPin] """ Represent blacklist as Python regexps as list of pkg pinnings""" pins = [] # type: List[PkgPin] for regex in regexps: if python_regex_is_posix(regex): pins.append(PkgPin('/^' + regex + '/', priority)) else: # Python regex is not also an equivalent POSIX regexp. # This is expected to be rare. Go through all the package names # and pin all the matching ones. for pkg in self._cache.packages: if re.match(regex, pkg.name): pins.append(PkgPin(pkg.name, priority)) return pins def pinning_from_config(self): # type: () -> List[Union[PkgPin, PkgFilePin]] """ Represent configuration as list of pinnings Assumes self.allowed_origins to be already set. """ pins = [] # type: List[Union[PkgPin, PkgFilePin]] # mark not allowed origins with 'never' pin for pkg_file in self._cache.file_list: # type: ignore if not is_allowed_origin(pkg_file, self.allowed_origins): # Set the magic 'never' pin on not allowed origins logging.debug("Marking not allowed %s with %s pin", pkg_file, NEVER_PIN) pins.append(PkgFilePin(pkg_file.id, NEVER_PIN)) # TODO(rbalint) pin not trusted origins with NEVER_PIN elif self.strict_whitelist: # set even allowed origins to -1 and set individual package # priorities up later pins.append(PkgFilePin(pkg_file.id, -1)) # mark blacklisted packages with 'never' pin pins.extend(self.pinning_from_regex_list( # type: ignore self.blacklist, NEVER_PIN)) # set priority of whitelisted packages to high pins.extend(self.pinning_from_regex_list( # type: ignore self.whitelist, 900)) if self.strict_whitelist: policy = self._depcache.policy # pin down already pinned packages which are not on the whitelist # to not install locally pinned up packages accidentally for pkg in self._cache.packages: if pkg.has_versions: pkg_ver = policy.get_candidate_ver(pkg) # type: ignore if pkg_ver is not None \ and policy.get_priority(pkg_ver) > -1: # the pin is higher than set for allowed origins, thus # there is extra pinning configuration if not is_pkgname_in_whitelist(pkg.name, self.whitelist): pins.append(PkgPin(pkg.name, NEVER_PIN)) return pins def apply_pinning(self, pins): # type: (List[Union[PkgPin, PkgFilePin]]) -> None """ Apply the list of pins """ policy = self._depcache.policy pkg_files = {f.id: f for f in self._cache.file_list} # type: ignore for pin in pins: logging.debug("Applying pinning: %s" % str(pin)) if isinstance(pin, PkgPin): policy.create_pin('Version', pin.pkg, '*', # type: ignore pin.priority) elif isinstance(pin, PkgFilePin): logging.debug("Applying pin %s to package_file: %s" % (pin.priority, str(pkg_files[pin.id]))) policy.set_priority(pkg_files[pin.id], # type: ignore pin.priority) def open(self, progress=None): apt.Cache.open(self, progress) # apply pinning generated from unattended-upgrades configuration self.apply_pinning(self.pinning_from_config()) def adjust_candidate(self, pkg): # type: (apt.Package) -> bool """ Adjust origin and return True if adjustment took place This is needed when e.g. a package is available in the security pocket but there is also a package in the updates pocket with a higher version number """ try: new_cand = ver_in_allowed_origin(pkg, self.allowed_origins) # Only adjust to lower versions to avoid flipping back and forth # and to avoid picking a newer version, not selected by apt. # This helps avoiding upgrades to experimental's packages. if pkg.candidate is not None and new_cand < pkg.candidate: logging.debug("adjusting candidate version: %s" % new_cand) pkg.candidate = new_cand return True else: return False except NoAllowedOriginError: return False def call_checked(self, function, pkg, **kwargs): """ Call function and check if package is in the wanted state """ try: function(pkg, **kwargs) except SystemError as e: logging.warning( _("package %s upgradable but fails to " "be marked for upgrade (%s)"), pkg.name, e) self.clear() return False return ((function == apt.package.Package.mark_upgrade or function == apt.package.Package.mark_install) and (pkg.marked_upgrade or pkg.marked_install)) def call_adjusted(self, function, pkg, **kwargs): """Call function, but with adjusting packages in changes to come from allowed origins Note that as a side effect more package's candidate can be adjusted than only the one's in the final changes set. """ new_pkgs_to_adjust = [] # List[str] if not is_pkg_change_allowed(pkg, self.blacklist, self.whitelist, self.strict_whitelist): return if function == apt.package.Package.mark_upgrade \ and not pkg.is_upgradable: if not apt_pkg.config.find_b("Unattended-Upgrade::Allow-downgrade", False): return else: function = apt.package.Package.mark_install marking_succeeded = self.call_checked(function, pkg, **kwargs) if not marking_succeeded or \ not check_changes_for_sanity(self, desired_pkg=pkg): logging.debug("falling back to adjusting %s's dependencies" % pkg.name) self.clear() # adjust candidates in advance if needed for pkg_name in self._cached_candidate_pkgnames: self.adjust_candidate(self[pkg_name]) self.adjust_candidate(pkg) for dep in transitive_dependencies(pkg, self, level=1): try: self.adjust_candidate(self[dep]) except KeyError: pass self.call_checked(function, pkg, **kwargs) for marked_pkg in self.get_changes(): if marked_pkg.name in self._cached_candidate_pkgnames: continue if not is_in_allowed_origin(marked_pkg.candidate, self.allowed_origins): try: ver_in_allowed_origin(marked_pkg, self.allowed_origins) # important! this avoids downgrades below if pkg.is_installed and not pkg.is_upgradable and \ apt_pkg.config.find_b("Unattended-Upgrade::Allow-" "downgrade", False): continue new_pkgs_to_adjust.append(marked_pkg) except NoAllowedOriginError: pass if new_pkgs_to_adjust: new_pkg_adjusted = False for pkg_to_adjust in new_pkgs_to_adjust: if self.adjust_candidate(pkg_to_adjust): self._cached_candidate_pkgnames.add(pkg_to_adjust.name) new_pkg_adjusted = True if new_pkg_adjusted: self.call_adjusted(function, pkg, **kwargs) def mark_upgrade_adjusted(self, pkg, **kwargs): self.call_adjusted(apt.package.Package.mark_upgrade, pkg, **kwargs) def mark_install_adjusted(self, pkg, **kwargs): self.call_adjusted(apt.package.Package.mark_install, pkg, **kwargs) class LogInstallProgress(apt.progress.base.InstallProgress): """ Install progress that writes to self.progress_log (/var/run/unattended-upgrades.progress by default) """ def __init__(self, logfile_dpkg, verbose=False, progress_log="var/run/unattended-upgrades.progress"): # type: (str, bool, str) -> None apt.progress.base.InstallProgress.__init__(self) self.logfile_dpkg = logfile_dpkg self.progress_log = os.path.join(apt_pkg.config.find_dir("Dir"), progress_log) self.verbose = verbose self.output_logfd = None # type: int def status_change(self, pkg, percent, status): # type: (str, float, str) -> None with open(self.progress_log, "w") as f: f.write(_("Progress: %s %% (%s)") % (percent, pkg)) def _fixup_fds(self): # () -> None required_fds = [0, 1, 2, # stdin, stdout, stderr self.writefd, self.write_stream.fileno(), self.statusfd, self.status_stream.fileno() ] # ensure that our required fds close on exec for fd in required_fds[3:]: old_flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, old_flags | fcntl.FD_CLOEXEC) # close all fds proc_fd = "/proc/self/fd" if os.path.exists(proc_fd): error_count = 0 for fdname in os.listdir(proc_fd): try: fd = int(fdname) except Exception: print("ERROR: can not get fd for %s" % fdname) if fd in required_fds: continue try: os.close(fd) # print("closed: ", fd) except OSError as e: # there will be one fd that can not be closed # as its the fd from pythons internal diropen() # so its ok to ignore one close error error_count += 1 if error_count > 1: print("ERROR: os.close(%s): %s" % (fd, e)) def _redirect_stdin(self): # type: () -> None REDIRECT_INPUT = os.devnull fd = os.open(REDIRECT_INPUT, os.O_RDWR) os.dup2(fd, 0) def _redirect_output(self): # type: () -> None # do not create log in dry-run mode, just output to stdout/stderr if not apt_pkg.config.find_b("Debug::pkgDPkgPM", False): logfd = self._get_logfile_dpkg_fd() os.dup2(logfd, 1) os.dup2(logfd, 2) def _get_logfile_dpkg_fd(self): # type: () -> int logfd = os.open( self.logfile_dpkg, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0o640) try: adm_gid = grp.getgrnam("adm").gr_gid os.fchown(logfd, 0, adm_gid) except (KeyError, OSError): pass return logfd def update_interface(self): # type: () -> None # call super class first apt.progress.base.InstallProgress.update_interface(self) self._do_verbose_output_if_needed() def _do_verbose_output_if_needed(self): # type: () -> None # if we are in debug mode, nothing to be more verbose about if apt_pkg.config.find_b("Debug::pkgDPkgPM", False): return # handle verbose if self.verbose: if self.output_logfd is None: self.output_logfd = os.open(self.logfile_dpkg, os.O_RDONLY) os.lseek(self.output_logfd, 0, os.SEEK_END) try: select.select([self.output_logfd], [], [], 0) # FIXME: this should be OSError, but in py2.7 it is still # select.error except select.error as e: if e.errno != errno.EINTR: # type: ignore logging.exception("select failed") # output to stdout in verbose mode only os.write(1, os.read(self.output_logfd, 1024)) def _log_in_dpkg_log(self, msg): # type: (str) -> None logfd = self._get_logfile_dpkg_fd() os.write(logfd, msg.encode("utf-8")) os.close(logfd) def finish_update(self): # type: () -> None self._log_in_dpkg_log("Log ended: %s\n\n" % LoggingDateTime.as_string()) def fork(self): # type: () -> int self._log_in_dpkg_log("Log started: %s\n" % LoggingDateTime.as_string()) pid = os.fork() if pid == 0: self._fixup_fds() self._redirect_stdin() self._redirect_output() return pid class Unlocked: """ Context manager for unlocking the apt lock while cache.commit() is run """ def __enter__(self): # type: () -> None try: apt_pkg.pkgsystem_unlock_inner() except Exception: # earlier python-apt used to leak lock logging.warning("apt_pkg.pkgsystem_unlock() failed due to not " "holding the lock but trying to continue") pass def __exit__(self, exc_type, exc_value, exc_tb): # type: (object, object, object) -> None apt_pkg.pkgsystem_lock_inner() class KeptPkgs(defaultdict): """ Packages to keep by highest allowed pretty-printed origin """ def add(self, pkg, # type: apt.Package version, # type: apt.package.Version cache # type: UnattendedUpgradesCache ): # type: (...) -> None for origin in version.origins: if is_allowed_origin(origin, cache.allowed_origins): self[origin.origin + " " + origin.archive].add(pkg.name) return class UnattendedUpgradesResult: """ Represent the (potentially partial) results of an unattended-upgrades run """ def __init__(self, success, # type: bool result_str="", # type: str pkgs=[], # type: List[str] pkgs_kept_back=KeptPkgs(set), # type: KeptPkgs pkgs_removed=[], # type: List[str] pkgs_kept_installed=[], # type: List[str] update_stamp=False # type: bool ): # type: (...) -> None self.success = success self.result_str = result_str self.pkgs = pkgs self.pkgs_kept_back = pkgs_kept_back self.pkgs_removed = pkgs_removed self.pkgs_kept_installed = pkgs_kept_installed self.update_stamp = update_stamp def is_dpkg_journal_dirty(): # type: () -> bool """ Return True if the dpkg journal is dirty (similar to debSystem::CheckUpdates) """ d = os.path.join( os.path.dirname(apt_pkg.config.find_file("Dir::State::status")), "updates") for f in os.listdir(d): if re.match("[0-9]+", f): return True return False def signal_handler(signal, frame): # type: (int, object) -> None logging.warning("SIGTERM received, will stop") global SIGNAL_STOP_REQUEST SIGNAL_STOP_REQUEST = True def log_once(msg): # type: (str) -> None global logged_msgs if msg not in logged_msgs: logging.info(msg) logged_msgs.add(msg) # type: ignore def should_stop(): # type: () -> bool """ Return True if u-u needs to stop due to signal received or due to the system started to run on battery. """ if SIGNAL_STOP_REQUEST: logging.warning("SIGNAL received, stopping") return True try: if apt_pkg.config.find_b("Unattended-Upgrade::OnlyOnACPower", True) \ and subprocess.call("on_ac_power") == 1: logging.warning("System is on battery power, stopping") return True except FileNotFoundError: log_once( _("Checking if system is running on battery is skipped. Please " "install powermgmt-base package to check power status and skip " "installing updates when the system is running on battery.")) if apt_pkg.config.find_b( "Unattended-Upgrade::Skip-Updates-On-Metered-Connections", True): try: if NetworkMonitor.get_network_metered( NetworkMonitor.get_default()): logging.warning(_("System is on metered connection, stopping")) return True except NameError: log_once(_("Checking if connection is metered is skipped. Please " "install python3-gi package to detect metered " "connections and skip downloading updates.")) return False def substitute(line): # type: (str) -> str """ substitude known mappings and return a new string Currently supported ${distro-release} """ mapping = {"distro_codename": get_distro_codename(), "distro_id": get_distro_id()} return string.Template(line).substitute(mapping) def get_distro_codename(): # type: () -> str return DISTRO_CODENAME def get_distro_id(): # type: () -> str return DISTRO_ID def versioned_kernel_pkgs_regexp(): apt_versioned_kernel_pkgs = apt_pkg.config.value_list( "APT::VersionedKernelPackages") if apt_versioned_kernel_pkgs: return re.compile("(" + "|".join( ["^" + p + "-[1-9][0-9]*\\.[0-9]+\\.[0-9]+-[0-9]+(-.+)?$" for p in apt_versioned_kernel_pkgs]) + ")") else: return None def running_kernel_pkgs_regexp(): apt_versioned_kernel_pkgs = apt_pkg.config.value_list( "APT::VersionedKernelPackages") if apt_versioned_kernel_pkgs: running_kernel_version = subprocess.check_output( ["uname", "-r"], universal_newlines=True).rstrip() kernel_escaped = re.escape(running_kernel_version) try: kernel_noflavor_escaped = re.escape( re.match("[1-9][0-9]*\\.[0-9]+\\.[0-9]+-[0-9]+", running_kernel_version)[0]) return re.compile("(" + "|".join( [("^" + p + "-" + kernel_escaped + "$|^" + p + "-" + kernel_noflavor_escaped + "$") for p in apt_versioned_kernel_pkgs]) + ")") except TypeError: # flavor could not be cut from version return re.compile("(" + "|".join( [("^" + p + "-" + kernel_escaped + "$") for p in apt_versioned_kernel_pkgs]) + ")") else: return None def get_allowed_origins_legacy(): # type: () -> List[str] """ legacy support for old Allowed-Origins var """ allowed_origins = [] # type: List[str] key = "Unattended-Upgrade::Allowed-Origins" try: for s in apt_pkg.config.value_list(key): # if there is a ":" use that as seperator, else use spaces if re.findall(r'(?<!\\):', s): (distro_id, distro_codename) = re.split(r'(?<!\\):', s) else: (distro_id, distro_codename) = s.split() # unescape "\:" back to ":" distro_id = re.sub(r'\\:', ':', distro_id) # escape "," (see LP: #824856) - can this be simpler? distro_id = re.sub(r'([^\\]),', r'\1\\,', distro_id) distro_codename = re.sub(r'([^\\]),', r'\1\\,', distro_codename) # convert to new format allowed_origins.append("o=%s,a=%s" % (substitute(distro_id), substitute(distro_codename))) except ValueError: logging.error(_("Unable to parse %s." % key)) raise return allowed_origins def get_allowed_origins(): # type: () -> List[str] """ return a list of allowed origins from apt.conf This will take substitutions (like distro_id) into account. """ allowed_origins = get_allowed_origins_legacy() key = "Unattended-Upgrade::Origins-Pattern" try: for s in apt_pkg.config.value_list(key): allowed_origins.append(substitute(s)) except ValueError: logging.error(_("Unable to parse %s." % key)) raise return allowed_origins def match_whitelist_string(whitelist, origin): # type: (str, Union[apt.package.Origin, apt_pkg.PackageFile]) -> bool """ take a whitelist string in the form "origin=Debian,label=Debian-Security" and match against the given python-apt origin. A empty whitelist string never matches anything. """ whitelist = whitelist.strip() if whitelist == "": logging.warning("empty match string matches nothing") return False res = True # make "\," the html quote equivalent whitelist = whitelist.replace("\\,", "%2C") for token in whitelist.split(","): # strip and unquote the "," back (what, value) = [s.strip().replace("%2C", ",") for s in token.split("=")] # logging.debug("matching %s=%s against %s" % ( # what, value, origin)) # support substitution here as well value = substitute(value) # first char is apt-cache policy output, send is the name # in the Release file if what in ("o", "origin"): match = fnmatch.fnmatch(origin.origin, value) elif what in ("l", "label"): match = fnmatch.fnmatch(origin.label, value) elif what in ("a", "suite", "archive"): match = fnmatch.fnmatch(origin.archive, value) elif what in ("c", "component"): match = fnmatch.fnmatch(origin.component, value) elif what in ("site",): match = fnmatch.fnmatch(origin.site, value) elif what in ("n", "codename",): match = fnmatch.fnmatch(origin.codename, value) else: raise UnknownMatcherError( "Unknown whitelist entry for matcher %s (token %s)" % ( what, token)) # update res res = res and match # logging.debug("matching %s=%s against %s" % ( # what, value, origin)) return res def python_regex_is_posix(expression): # type: (str) -> bool """ Returns if the Python regex is also an equivalent POSIX regex """ return re.match("^[a-zA-Z0-9\\^\\$\\+\\.]*$", expression) is not None def cache_commit(cache, # type: apt.Cache logfile_dpkg, # type: str verbose, # type: bool iprogress=None, # type: apt.progress.base.InstallProgress ): # type: (...) -> Tuple[bool, Exception] """Commit the changes from the given cache to the system""" error = None res = False if iprogress is None: iprogress = LogInstallProgress(logfile_dpkg, verbose) try: res = cache.commit(install_progress=iprogress) cache.open() except SystemError as e: error = e if verbose: logging.exception("Exception happened during upgrade.") cache.clear() return res, error def upgrade_normal(cache, logfile_dpkg, verbose): # type: (apt.Cache, str, bool) -> bool res, error = cache_commit(cache, logfile_dpkg, verbose) if res: logging.info(_("All upgrades installed")) else: logging.error(_("Installing the upgrades failed!")) logging.error(_("error message: %s"), error) logging.error(_("dpkg returned a error! See %s for details"), logfile_dpkg) return res def upgrade_in_minimal_steps(cache, # type: UnattendedUpgradesCache pkgs_to_upgrade, # type: List[str] logfile_dpkg="", # type: str verbose=False, # type: bool ): # type: (...) -> bool install_log = LogInstallProgress(logfile_dpkg, verbose) res = True # to upgrade contains the package names to_upgrade = set(pkgs_to_upgrade) for pkgname in upgrade_order(to_upgrade, cache): # upgrade packages and dependencies in increasing expected size of # package sets to upgrade/install together if pkgname not in to_upgrade: # pkg is upgraded in a previous set continue if should_stop(): return False pkg = cache[pkgname] try: if pkg.is_upgradable \ or candidate_version_changed(pkg): cache.mark_upgrade_adjusted( pkg, from_user=not pkg.is_auto_installed) elif not pkg.is_installed: cache.mark_install_adjusted(pkg, from_user=False) else: continue except Exception as e: logging.warning( _("package %s upgradable but fails to " "be marked for upgrade (%s)"), pkgname, e) cache.clear() res = False continue # double check that we are not running into side effects like # what could have been caused LP: #1020680 if not check_changes_for_sanity(cache): logging.info("While building minimal partition: " "cache has not allowed changes") cache.clear() continue changes = [p.name for p in cache.get_changes()] if not changes: continue # write progress log information if len(pkgs_to_upgrade) > 0: all_count = len(pkgs_to_upgrade) remaining_count = all_count - len(to_upgrade) percent = remaining_count / float(all_count * 100.0) else: percent = 100.0 install_log.status_change(pkg=",".join(changes), percent=percent, status="") # apply changes logging.debug("applying set %s" % changes) res, error = cache_commit(cache, logfile_dpkg, verbose, install_log) if error: if verbose: logging.exception("Exception happened during upgrade.") logging.error(_("Installing the upgrades failed!")) logging.error(_("error message: %s"), error) logging.error(_("dpkg returned a error! See %s for details"), logfile_dpkg) return False to_upgrade = to_upgrade - set(changes) logging.debug("left to upgrade %s" % to_upgrade) if len(to_upgrade) == 0: logging.info(_("All upgrades installed")) break return res def is_allowed_origin(origin, allowed_origins): # type: (Union[apt.package.Origin, apt_pkg.PackageFile], List[str]) -> bool # local origin is allowed by default if origin.component == 'now' and origin.archive == 'now' and \ not origin.label and not origin.site: return True for allowed in allowed_origins: if match_whitelist_string(allowed, origin): return True return False def is_in_allowed_origin(ver, allowed_origins): # type: (apt.package.Version, List[str]) -> bool if not ver: return False for origin in ver.origins: if is_allowed_origin(origin, allowed_origins): return True return False def ver_in_allowed_origin(pkg, allowed_origins): # type: (apt.Package, List[str]) -> apt.package.Version for ver in pkg.versions: if is_in_allowed_origin(ver, allowed_origins): # leave as soon as we have the highest new candidate return ver raise NoAllowedOriginError() def is_pkgname_in_blacklist(pkgname, blacklist): # type: (str, List[str]) -> bool for blacklist_regexp in blacklist: if re.match(blacklist_regexp, pkgname): logging.debug("skipping blacklisted package %s" % pkgname) return True return False def is_pkgname_in_whitelist(pkgname, whitelist): # type: (str, List[str]) -> bool # a empty whitelist means the user does not want to use this feature if not whitelist: return True for whitelist_regexp in whitelist: if re.match(whitelist_regexp, pkgname): logging.debug("only upgrading the following package %s" % pkgname) return True return False def is_pkg_change_allowed(pkg, blacklist, whitelist, strict_whitelist): # type: (apt.Package, List[str], List[str], bool) -> bool if is_pkgname_in_blacklist(pkg.name, blacklist): logging.debug("pkg %s package has been blacklisted" % pkg.name) return False # a strict whitelist will not allow any changes not in the # whitelist, most people will want the relaxed whitelist # that whitelists a package but pulls in the package # dependencies if strict_whitelist and \ not is_pkgname_in_whitelist(pkg.name, whitelist): logging.debug("pkg %s package is not whitelisted" % pkg.name) return False if pkg._pkg.selected_state == apt_pkg.SELSTATE_HOLD: logging.debug("pkg %s is on hold" % pkg.name) return False return True def transitive_dependencies(pkg, # type: apt.Package cache, # type: apt.Cache acc=set(), # type AbstractSet[str] valid_types=None, # type: AbstractSet[str] level=None # type: int ): # type (...) -> AbstractSet[str] """ All (transitive) dependencies of the package Note that alternative (|) dependencies are collected, too """ if not pkg.candidate or level is not None and level < 1: return acc for dep in pkg.candidate.dependencies: for base_dep in dep: if base_dep.name not in acc: if not valid_types or base_dep.rawtype in valid_types: acc.add(base_dep.name) try: transitive_dependencies( cache[base_dep.name], cache, acc, valid_types, level=(level - 1 if level is not None else None)) except KeyError: pass return acc def upgrade_order(to_upgrade, cache): # type: (AbstractSet[str], apt.Cache) -> List[str] """ Sort pkg names by the expected number of other packages to be upgraded with it. The calculation is not 100% accurate, it is an approximation. """ upgrade_set_sizes = {} # calculate upgrade sets follow_deps = {'Depends', 'PreDepends', 'Recommends'} for pkgname in to_upgrade: pkg = cache[pkgname] upgrade_set_sizes[pkgname] = len(transitive_dependencies( pkg, cache, valid_types=follow_deps).intersection(to_upgrade)) return sorted(upgrade_set_sizes, key=upgrade_set_sizes.get) def check_changes_for_sanity(cache, desired_pkg=None): # type: (UnattendedUpgradesCache, apt.Package) -> bool sanity_check_result = sanity_problem(cache, desired_pkg) if sanity_check_result is None: return True else: logging.debug("sanity check failed for: %s : %s" % (str({str(p.candidate) for p in cache.get_changes()}), sanity_check_result)) return False def sanity_problem(cache, desired_pkg): # type: (UnattendedUpgradesCache, apt.Package) -> str if cache._depcache.broken_count != 0: return ("there are broken packages in the cache") # If there are no packages to be installed they were kept back if cache.install_count == 0: return ("no package is selected to be upgraded or installed") changes = cache.get_changes() for pkg in changes: if pkg.marked_delete: return ("pkg %s is marked to be deleted" % pkg.name) if pkg.marked_install or pkg.marked_upgrade: # apt will never fallback from a trusted to a untrusted # origin so its good enough if we have a single trusted one if not any([o.trusted for o in pkg.candidate.origins]): return ("pkg %s is not from a trusted origin" % pkg.name) if not is_in_allowed_origin(pkg.candidate, cache.allowed_origins): return ("pkg %s is not in an allowed origin" % pkg.name) if not is_pkg_change_allowed(pkg, cache.blacklist, cache.whitelist, cache.strict_whitelist): return ("pkg %s is blacklisted or is not whitelisted" % pkg.name) # check if the package is unsafe to upgrade unattended ignore_require_restart = apt_pkg.config.find_b( "Unattended-Upgrade::IgnoreAppsRequireRestart", False) upgrade_requires = pkg.candidate.record.get("Upgrade-Requires") if pkg.marked_upgrade and ignore_require_restart is False \ and upgrade_requires == "app-restart": return ("pkg %s requires app-restart, it is not safe to " "upgrade it unattended") # check that the package we want to upgrade is in the change set if desired_pkg and desired_pkg not in changes: return ("pkg %s to be marked for upgrade/install is not marked " "accordingly" % desired_pkg.name) return None def is_deb(file): # type: (str) -> bool if file.endswith(".deb"): return True else: return False def pkgname_from_deb(debfile): # type: (str) -> str # FIXME: add error checking here try: control = apt_inst.DebFile(debfile).control.extractdata("control") sections = apt_pkg.TagSection(control) return sections["Package"] except (IOError, SystemError) as e: logging.error("failed to read deb file %s (%s)" % (debfile, e)) # dumb fallback return debfile.split("_")[0] def get_md5sum_for_file_in_deb(deb_file, conf_file): # type: (str, str) -> str dpkg_cmd = ["dpkg-deb", "--fsys-tarfile", deb_file] tar_cmd = ["tar", "-x", "-O", "-f", "-", "." + conf_file] md5_cmd = ["md5sum"] dpkg_p = Popen(dpkg_cmd, stdout=PIPE) tar_p = Popen(tar_cmd, stdin=dpkg_p.stdout, stdout=PIPE, universal_newlines=True) md5_p = Popen(md5_cmd, stdin=tar_p.stdout, stdout=PIPE, universal_newlines=True) pkg_md5sum = md5_p.communicate()[0].split()[0] for __p in [dpkg_p, tar_p, md5_p]: p = cast(Popen, __p) p.stdout.close() p.wait() return pkg_md5sum def get_md5sum_for_file_installed(conf_file, prefix): # type: (str, str) -> str try: with open(prefix + conf_file, 'rb') as fb: for hash_string in apt_pkg.Hashes(fb).hashes: # type: ignore if hash_string.hashtype == 'MD5Sum': return hash_string.hashvalue return None except IsADirectoryError: # the package replaces a directory wih a configuration file # # if the package changed this way it is safe to assume that # the transition happens without showing a prompt but if the admin # created the directory the admin will need to resolve it after # being notified about the unexpected prompt logging.debug("found conffile %s is a directory on the system " % conf_file) return "dir" except FileNotFoundError: # if the local file got deleted by the admin thats ok but it may still # trigger a conffile promp (see debian #788049) logging.debug("conffile %s in missing on the system" % conf_file) return "" def map_conf_file(conf_file, conffiles): # type: (str, Union[AbstractSet[str], Dict[str, str]]) -> str """Find respective conffile in a set of conffiles with some heuristics """ if conf_file in conffiles: return conf_file elif os.path.join(conf_file, os.path.basename(conf_file)) in conffiles: # new /etc/foo may be old /etc/foo/foo, like in LP: #1822745 return os.path.join(conf_file, os.path.basename(conf_file)) elif os.path.dirname(conf_file) in conffiles: # new /etc/foo/foo may be old /etc/foo, probably by accident return os.path.dirname(conf_file) # TODO: peek into package's dpkg-maintscript-helper mv_conffile usage else: return None # prefix is *only* needed for the build-in tests def conffile_prompt(destFile, prefix=""): # type: (str, str) -> bool logging.debug("check_conffile_prompt(%s)" % destFile) pkgname = pkgname_from_deb(destFile) # get the conffiles for the /var/lib/dpkg/status file status_file = apt_pkg.config.find("Dir::State::status") with open(status_file, "r") as f: tagfile = apt_pkg.TagFile(f) conffiles = "" for section in tagfile: if section.get("Package") == pkgname: logging.debug("found pkg: %s" % pkgname) if "Conffiles" in section: conffiles = section.get("Conffiles") break # get conffile value from pkg, its ok if the new version # does not have conffiles anymore pkg_conffiles = set() # type: AbstractSet[str] try: deb = apt_inst.DebFile(destFile) pkg_conffiles = set(deb.control.extractdata( "conffiles").strip().decode("utf-8").split("\n")) except SystemError as e: print(_("Apt returned an error, exiting")) print(_("error message: %s") % e) logging.error(_("Apt returned an error, exiting")) logging.error(_("error message: %s"), e) raise except LookupError as e: logging.debug("No conffiles in deb %s (%s)" % (destFile, e)) if not pkg_conffiles: return False # Conffiles: # /etc/bash_completion.d/m-a c7780fab6b14d75ca54e11e992a6c11c dpkg_status_conffiles = {} for line in conffiles.splitlines(): # ignore empty lines line = line.strip() if not line: continue # show what we do logging.debug("conffile line: %s", line) li = line.split() conf_file = li[0] md5 = li[1] if len(li) > 2: obs = li[2] else: obs = None # ignore if conffile is obsolete if obs == "obsolete": continue # ignore state "newconffile" until its clearer if there # might be a dpkg prompt (LP: #936870) if md5 == "newconffile": continue new_conf_file = map_conf_file(conf_file, pkg_conffiles) if not new_conf_file: logging.debug("%s not in package conffiles %s" % ( conf_file, pkg_conffiles)) continue # record for later dpkg_status_conffiles[conf_file] = md5 # test against the installed file, if the local file got deleted # by the admin thats ok but it may still trigger a conffile prompt # (see debian #788049) current_md5 = get_md5sum_for_file_installed(conf_file, prefix) logging.debug("current md5: %s" % current_md5) # hashes are the same, no conffile prompt if current_md5 == md5: continue # calculate md5sum from the deb (may take a bit) pkg_md5sum = get_md5sum_for_file_in_deb(destFile, new_conf_file) logging.debug("pkg_md5sum: %s" % pkg_md5sum) # the md5sum in the deb is unchanged, this will not # trigger a conffile prompt if pkg_md5sum == md5: continue # if we made it to this point: # current_md5 != pkg_md5sum != md5 # and that will trigger a conffile prompt, we can # stop processing at this point and just return True return True # now check if there are conffiles in the pkg that where not there # in the previous version in the dpkg status file if pkg_conffiles: for conf_file in pkg_conffiles: old_conf_file = map_conf_file(conf_file, dpkg_status_conffiles) if not old_conf_file: pkg_md5sum = get_md5sum_for_file_in_deb(destFile, conf_file) current_md5 = get_md5sum_for_file_installed(conf_file, prefix) if current_md5 != "" and pkg_md5sum != current_md5: return True return False def dpkg_conffile_prompt(): # type: () -> bool if "DPkg::Options" not in apt_pkg.config: return True options = apt_pkg.config.value_list("DPkg::Options") for option in options: option = option.strip() if option in ["--force-confold", "--force-confnew"]: return False return True def rewind_cache(cache, pkgs_to_upgrade): # type: (UnattendedUpgradesCache, List[apt.Package]) -> None """ set the cache back to the state with packages_to_upgrade """ cache.clear() for pkg2 in pkgs_to_upgrade: cache.mark_install_adjusted(pkg2, from_user=not pkg2.is_auto_installed) if cache.broken_count > 0: raise AssertionError("rewind_cache created a broken cache") def host(): # type: () -> str return socket.getfqdn() def wrap_indent(t, subsequent_indent=" "): # type: (str, str) -> str return "\n".join(wrap(t, break_on_hyphens=False, subsequent_indent=subsequent_indent)) def setup_apt_listchanges(conf="/etc/apt/listchanges.conf"): # type: (str) -> None """ deal with apt-listchanges """ # apt-listchanges will always send a mail if there is a mail address # set in the config regardless of the frontend used, so set it to # mail if we have a sendmail and to none if not (as it appears to # not check if sendmail is there or not), debian bug #579733 if os.path.exists(SENDMAIL_BINARY): os.environ["APT_LISTCHANGES_FRONTEND"] = "mail" else: os.environ["APT_LISTCHANGES_FRONTEND"] = "none" def _send_mail_using_mailx(from_address, to_address, subject, body): # type: (str, str, str, str) -> int # ensure that the body is a byte stream and that we do not # break on encoding errors (the default error mode is "strict") encoded_body = body.encode( locale.getpreferredencoding(False), errors="replace") # we use a binary pipe to stdin to ensure we do not break on # unicode encoding errors (e.g. because the user is running a # ascii only system like the buildds) mail = subprocess.Popen( [MAIL_BINARY, "-r", from_address, "-s", subject, to_address], stdin=subprocess.PIPE, universal_newlines=False) mail.stdin.write(encoded_body) mail.stdin.close() ret = mail.wait() return ret def _send_mail_using_sendmail(from_address, to_address, subject, body): # type: (str, str, str, str) -> int # format as a proper mail msg = Message() msg['Subject'] = subject msg['From'] = from_address msg['To'] = to_address msg['Auto-Submitted'] = "auto-generated" # order is important here, Message() first, then Charset() # then msg.set_charset() charset = email.charset.Charset("utf-8") charset.body_encoding = email.charset.QP # type: ignore msg.set_payload(body, charset) # and send it away sendmail = subprocess.Popen( [SENDMAIL_BINARY, "-oi", "-t"], stdin=subprocess.PIPE, universal_newlines=True) sendmail.stdin.write(msg.as_string()) sendmail.stdin.close() ret = sendmail.wait() return ret def send_summary_mail(pkgs, # type: List[str] res, # type: bool result_str, # type: str pkgs_kept_back, # type: KeptPkgs pkgs_removed, # type: List[str] pkgs_kept_installed, # type: List[str] mem_log, # type: StringIO dpkg_log_content, # type: str ): # type: (...) -> None """ send mail (if configured in Unattended-Upgrade::Mail) """ to_email = apt_pkg.config.find("Unattended-Upgrade::Mail", "") if not to_email: return if not os.path.exists(MAIL_BINARY) and not os.path.exists(SENDMAIL_BINARY): logging.error(_("No /usr/bin/mail or /usr/sbin/sendmail, " "can not send mail. " "You probably want to install the mailx package.")) return # The admin may well wish to get a mail report regardless of what was done. # This is now set by Unattended-Upgrade::MailReport values of: # "always", "only-on-error" or "on-change" # (you can achieve "never" by not setting Unattended-Upgrade::Mail). # If this is not set, then set it using any legacy MailOnlyOnError # setting (default True) # mail_opt = apt_pkg.config.find("Unattended-Upgrade::MailReport") if (mail_opt == ""): # None set - map from legacy value if apt_pkg.config.find_b("Unattended-Upgrade::MailOnlyOnError", False): mail_opt = "only-on-error" else: mail_opt = "on-change" # if the operation was successful and the user has requested to get # mails only on errors, just exit here if (res and (mail_opt == "only-on-error")): return # if the run was successful but nothing had to be done skip sending email # unless the admin wants it anyway if (((mail_opt != "always") and res and not pkgs and not pkgs_kept_back and not pkgs_removed)): return # Check if reboot-required flag is present reboot_flag_str = _( "[reboot required]") if os.path.isfile(REBOOT_REQUIRED_FILE) else "" # Check if packages are kept on hold hold_flag_str = (_("[package on hold]") if pkgs_kept_back or pkgs_kept_installed else "") logging.debug("Sending mail to %s" % to_email) subject = _( "{hold_flag}{reboot_flag} unattended-upgrades result for " "{machine}: {result}").format( hold_flag=hold_flag_str, reboot_flag=reboot_flag_str, machine=host(), result="SUCCESS" if res else "FAILURE").strip() body = wrap_indent(_("Unattended upgrade result: %s") % result_str) body += "\n\n" if os.path.isfile(REBOOT_REQUIRED_FILE): body += _( "Warning: A reboot is required to complete this upgrade, " "or a previous one.\n\n") if pkgs: if res: body += _("Packages that were upgraded:\n") else: body += _("Packages that attempted to upgrade:\n") body += " " + wrap_indent(" ".join(pkgs)) body += "\n\n" if pkgs_kept_back: body += _("Packages with upgradable origin but kept back:\n") for origin, origin_pkgs in pkgs_kept_back.items(): body += " " + origin + ":\n" body += " " + wrap_indent(" ".join(origin_pkgs), subsequent_indent=" ") + "\n" body += "\n" if pkgs_removed: body += _("Packages that were auto-removed:\n") body += " " + wrap_indent(" ".join(pkgs_removed)) body += "\n\n" if pkgs_kept_installed: body += _("Packages that were kept from being auto-removed:\n") body += " " + wrap_indent(" ".join(pkgs_kept_installed)) body += "\n\n" if dpkg_log_content: body += _("Package installation log:") + "\n" body += dpkg_log_content body += "\n\n" body += _("Unattended-upgrades log:\n") body += mem_log.getvalue() from_email = apt_pkg.config.find("Unattended-Upgrade::Sender", "root") if os.path.exists(SENDMAIL_BINARY): ret = _send_mail_using_sendmail(from_email, to_email, subject, body) elif os.path.exists(MAIL_BINARY): ret = _send_mail_using_mailx(from_email, to_email, subject, body) else: raise AssertionError( "This should never be reached as we previously validated that we " "either have sendmail or mailx. Maybe they've been removed in " "this right moment?") logging.debug("mail returned: %s", ret) def do_install(cache, # type: UnattendedUpgradesCache pkgs_to_upgrade, # type: List[str] options, # type: Options logfile_dpkg, # type: str ): # type: (...) -> bool setup_apt_listchanges() logging.info(_("Writing dpkg log to %s"), logfile_dpkg) if cache.get_changes(): cache.clear() pkg_install_success = False try: if options.minimal_upgrade_steps: # try upgrade all "pkgs" in minimal steps pkg_install_success = upgrade_in_minimal_steps( cache, pkgs_to_upgrade, logfile_dpkg, options.verbose or options.debug) else: mark_pkgs_to_upgrade(cache, pkgs_to_upgrade) pkg_install_success = upgrade_normal( cache, logfile_dpkg, options.verbose or options.debug) except Exception as e: # print unhandled exceptions here this way, while stderr is redirected os.write(2, ("Exception: %s\n" % e).encode('utf-8')) pkg_install_success = False return pkg_install_success def _setup_alternative_rootdir(rootdir): # type: (str) -> None # clear system unattended-upgrade stuff apt_pkg.config.clear("Unattended-Upgrade") # read rootdir (taken from apt.Cache, but we need to run it # here before the cache gets initialized if os.path.exists(rootdir + "/etc/apt/apt.conf"): apt_pkg.read_config_file(apt_pkg.config, rootdir + "/etc/apt/apt.conf") if os.path.isdir(rootdir + "/etc/apt/apt.conf.d"): apt_pkg.read_config_dir(apt_pkg.config, rootdir + "/etc/apt/apt.conf.d") logdir = os.path.join(rootdir, "var", "log", "unattended-upgrades") if not os.path.exists(logdir): os.makedirs(logdir) apt.apt_pkg.config.set("Unattended-Upgrade::LogDir", logdir) def _get_logdir(): # type: () -> str logdir = apt_pkg.config.find_dir( "Unattended-Upgrade::LogDir", # COMPAT only apt_pkg.config.find_dir("APT::UnattendedUpgrades::LogDir", "/var/log/unattended-upgrades/")) return logdir def _setup_logging(options): # type: (Options) -> StringIO # ensure this is run only once if len(logging.root.handlers) > 0: return None # init the logging logdir = _get_logdir() logfile = os.path.join( logdir, apt_pkg.config.find( "Unattended-Upgrade::LogFile", # COMPAT only apt_pkg.config.find("APT::UnattendedUpgrades::LogFile", "unattended-upgrades.log"))) if not options.dry_run and not os.path.exists(logdir): os.makedirs(logdir) logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', filename=logfile) # additional logging logger = logging.getLogger() mem_log = StringIO() if options.apt_debug: apt_pkg.config.set("Debug::pkgProblemResolver", "1") apt_pkg.config.set("Debug::pkgDepCache::AutoInstall", "1") if options.debug: logger.setLevel(logging.DEBUG) stdout_handler = logging.StreamHandler(sys.stdout) logger.addHandler(stdout_handler) elif options.verbose: logger.setLevel(logging.INFO) stdout_handler = logging.StreamHandler(sys.stdout) logger.addHandler(stdout_handler) if apt_pkg.config.find("Unattended-Upgrade::Mail", ""): mem_log_handler = logging.StreamHandler(mem_log) logger.addHandler(mem_log_handler) # Configure syslog if necessary syslogEnable = apt_pkg.config.find_b("Unattended-Upgrade::SyslogEnable", False) if syslogEnable: syslogFacility = apt_pkg.config.find( "Unattended-Upgrade::SyslogFacility", "daemon") syslogHandler = logging.handlers.SysLogHandler( address='/dev/log', facility=syslogFacility) # type: ignore syslogHandler.setFormatter( logging.Formatter("unattended-upgrade: %(message)s")) known = syslogHandler.facility_names.keys() # type: ignore if syslogFacility.lower() in known: logger.addHandler(syslogHandler) logging.info("Enabled logging to syslog via %s facility " % syslogFacility) else: logging.warning("Syslog facility %s was not found" % syslogFacility) return mem_log def logged_in_users(): # type: () -> AbstractSet[str] """Return a list of logged in users""" # the "users" command always returns a single line with: # "user1, user1, user2" users = subprocess.check_output( USERS, universal_newlines=True).rstrip('\n') return set(users.split()) def reboot_if_requested_and_needed(): # type: () -> None """auto-reboot (if required and the config for this is set)""" if not os.path.exists(REBOOT_REQUIRED_FILE): return if not apt_pkg.config.find_b( "Unattended-Upgrade::Automatic-Reboot", False): return # see if we need to check for logged in users if not apt_pkg.config.find_b( "Unattended-Upgrade::Automatic-Reboot-WithUsers", True): users = logged_in_users() if users: msg = gettext.ngettext( "Found %s, but not rebooting because %s is logged in." % ( REBOOT_REQUIRED_FILE, users), "Found %s, but not rebooting because %s are logged in." % ( REBOOT_REQUIRED_FILE, users), len(users)) logging.warning(msg) return # reboot at the specified time when = apt_pkg.config.find( "Unattended-Upgrade::Automatic-Reboot-Time", "now") logging.warning("Found %s, rebooting" % REBOOT_REQUIRED_FILE) cmd = ["/sbin/shutdown", "-r", when] try: shutdown_msg = subprocess.check_output(cmd, stderr=subprocess.STDOUT) if shutdown_msg.strip(): logging.warning("Shutdown msg: %s", shutdown_msg.strip()) except Exception as e: logging.error("Failed to issue shutdown: %s", e) def write_stamp_file(): # type: () -> None statedir = os.path.join(apt_pkg.config.find_dir("Dir::State"), "periodic") if not os.path.exists(statedir): os.makedirs(statedir) with open(os.path.join(statedir, "unattended-upgrades-stamp"), "w"): pass def try_to_upgrade(pkg, # type: apt.Package pkgs_to_upgrade, # type: List[apt.Package] cache, # type: UnattendedUpgradesCache ): # type: (...) -> None try: try: # try to adjust pkg itself first, if that throws an exception it # can't be upgraded on its own cache.adjust_candidate(pkg) if not pkg.is_upgradable and not apt_pkg.config.find_b( "Unattended-Upgrade::Allow-downgrade", False): return except NoAllowedOriginError: return cache._cached_candidate_pkgnames.add(pkg.name) cache.mark_upgrade_adjusted(pkg, from_user=not pkg.is_auto_installed) if check_changes_for_sanity(cache, pkg): # add to packages to upgrade pkgs_to_upgrade.append(pkg) else: rewind_cache(cache, pkgs_to_upgrade) except (SystemError, NoAllowedOriginError) as e: # can't upgrade logging.warning( _("package %s upgradable but fails to " "be marked for upgrade (%s)"), pkg.name, e) rewind_cache(cache, pkgs_to_upgrade) def candidate_version_changed(pkg, # type: apt.Package ): return (pkg.is_installed and pkg.candidate and pkg.candidate.version != pkg.installed.version and apt_pkg.config.find_b( 'Unattended-Upgrade::Allow-downgrade', False)) def calculate_upgradable_pkgs(cache, # type: UnattendedUpgradesCache options, # type: Options ): # type: (...) -> List[apt.Package] pkgs_to_upgrade = [] # type: List[apt.Package] # now do the actual upgrade for pkg in cache: if options.debug and pkg.is_upgradable \ or candidate_version_changed(pkg): logging.debug("Checking: %s (%s)" % ( pkg.name, getattr(pkg.candidate, "origins", []))) if (pkg.is_upgradable or candidate_version_changed(pkg) and is_pkgname_in_whitelist(pkg.name, cache.whitelist)): try: ver_in_allowed_origin(pkg, cache.allowed_origins) except NoAllowedOriginError: continue try_to_upgrade(pkg, pkgs_to_upgrade, cache) if cache.get_changes(): cache.clear() return pkgs_to_upgrade def get_dpkg_log_content(logfile_dpkg, install_start_time): # type: (str, datetime.datetime) -> str logging.debug("Extracting content from %s since %s" % ( logfile_dpkg, install_start_time)) content = [] found_start = False try: with io.open(logfile_dpkg, encoding='utf-8', errors='replace') as fp: # read until we find the last "Log started: " for line in fp.readlines(): # scan for the first entry we need (minimal-step mode # creates a new stanza for each individual install) if not found_start and line.startswith("Log started: "): stanza_start = LoggingDateTime.from_string( line[len("Log started: "):-1]) if stanza_start >= install_start_time: found_start = True if found_start: # skip progress indicator until #860931 is fixed in apt # and dpkg if re.match( "^\\(Reading database \\.\\.\\. ()|([0-9]+%)$", line): continue content.append(line) return "".join(content) except FileNotFoundError: return "" def get_auto_removable(cache): # type: (apt.Cache) -> AbstractSet[str] return {pkg.name for pkg in cache if pkg.is_auto_removable} def is_autoremove_valid(cache, # type: UnattendedUpgradesCache pkgname, # type: str auto_removable, # type: AbstractSet[str] ): # type: (...) -> bool changes = cache.get_changes() if not changes: # package is already removed return True pkgnames = {pkg.name for pkg in changes} for pkg in changes: if not is_pkg_change_allowed(pkg, cache.blacklist, cache.whitelist, cache.strict_whitelist): logging.warning( _("Keeping the following auto-removable package(s) because " "they include %s which is set to be kept unmodified: %s"), pkg.name, " ".join(sorted(pkgnames))) return False if not pkgnames.issubset(auto_removable): if pkgname != "": logging.warning( _("Keeping auto-removable %s package(s) because it would" " also remove the following packages which should " "be kept in this step: %s"), pkgname, " ".join(sorted(pkgnames - auto_removable))) else: logging.warning( _("Keeping %s auto-removable package(s) because it would" " also remove the following packages which should " "be kept in this step: %s"), len(auto_removable), " ".join(sorted(pkgnames - auto_removable))) return False for packagename in pkgnames: if cache.running_kernel_pkgs_regexp and \ cache.running_kernel_pkgs_regexp.match(packagename): logging.warning( _("Keeping the following auto-removable package(s) because " "they include %s which package is related to the running " "kernel: %s"), packagename, " ".join(sorted(pkgnames))) return False if cache.install_count > 0: logging.error( "The following packages are marked for installation or upgrade " "which is not allowed when performing autoremovals: %s", " ".join([pkg.name for pkg in changes if not pkg.marked_delete])) return False return True def do_auto_remove(cache, # type: UnattendedUpgradesCache auto_removable, # type: AbstractSet[str] logfile_dpkg, # type: str minimal_steps, # type: bool verbose=False, # type: bool dry_run=False # type: bool ): # type: (...) -> Tuple[bool, List[str], List[str]] res = True if not auto_removable: return (res, [], []) pkgs_removed = [] # type: List[str] pkgs_kept_installed = [] # type: List[str] if minimal_steps: for pkgname in auto_removable: if should_stop(): pkgs_kept_installed = list(auto_removable - set(pkgs_removed)) return (False, pkgs_removed, pkgs_kept_installed) logging.debug("marking %s for removal" % pkgname) if pkgname in pkgs_removed: continue cache[pkgname].mark_delete() if not is_autoremove_valid(cache, pkgname, auto_removable): # this situation can occur when removing newly unused packages # would also remove old unused packages which are not set # for removal, thus getting there is not handled as an error pkgs_kept_installed.append(pkgname) cache.clear() continue if not dry_run: changes = cache.get_changes() pkgnames = {pkg.name for pkg in changes} res, error = cache_commit(cache, logfile_dpkg, verbose) if not res: break pkgs_removed.extend(pkgnames) else: cache.clear() else: for pkgname in auto_removable: cache[pkgname].mark_delete() if is_autoremove_valid(cache, "", auto_removable): # do it in one step if not dry_run: res, error = cache_commit(cache, logfile_dpkg, verbose) else: cache.clear() else: cache.clear() if res: logging.info(_("Packages that were successfully auto-removed: %s"), " ".join(sorted(pkgs_removed))) logging.info(_("Packages that are kept back: %s"), " ".join(sorted(pkgs_kept_installed))) if not res: cache.clear() logging.error(_("Auto-removing the packages failed!")) logging.error(_("Error message: %s"), error) logging.error(_("dpkg returned an error! See %s for details"), logfile_dpkg) return (res, pkgs_removed, pkgs_kept_installed) def clean_downloaded_packages(fetcher): # type: (apt_pkg.Acquire) -> None archivedir = os.path.dirname( apt_pkg.config.find_dir("Dir::Cache::archives")) for item in fetcher.items: if os.path.dirname(os.path.abspath(item.destfile)) == archivedir: try: os.unlink(item.destfile) except OSError: pass def is_update_day(): # type: () -> bool # check if patch days are configured patch_days = apt_pkg.config.value_list("Unattended-Upgrade::Update-Days") if not patch_days: return True # validate patch days today = date.today() # abbreviated localized dayname if today.strftime("%a") in patch_days: return True # full localized dayname if today.strftime("%A") in patch_days: return True # by number (Sun: 0, Mon: 1, ...) if today.strftime("%w") in patch_days: return True # today is not a patch day logging.info( "Skipping update check: today is %s,%s,%s but patch days are %s", today.strftime("%w"), today.strftime("%a"), today.strftime("%A"), ", ".join(patch_days)) return False def update_kept_pkgs_file(kept_pkgs, kept_file): # type: (DefaultDict[str, List[str]], str) -> None if kept_pkgs: pkgs_all_origins = set() for origin_pkgs in kept_pkgs.values(): pkgs_all_origins.update(origin_pkgs) try: with open(kept_file, "w") as kf: kf.write(" ".join(sorted(pkgs_all_origins))) except FileNotFoundError: logging.error(_("Could not open %s for saving list of packages " "kept back." % kept_file)) else: if os.path.exists(kept_file): os.remove(kept_file) def main(options, rootdir="/"): # type: (Options, str) -> int # useful for testing if not rootdir == "/": _setup_alternative_rootdir(rootdir) # see debian #776752 install_start_time = datetime.datetime.now().replace(microsecond=0) # setup logging mem_log = _setup_logging(options) # get log logfile_dpkg = os.path.join(_get_logdir(), 'unattended-upgrades-dpkg.log') if not os.path.exists(logfile_dpkg): with open(logfile_dpkg, 'w'): pass # lock for the shutdown check shutdown_lock = apt_pkg.get_lock(LOCK_FILE) if shutdown_lock < 0: logging.error("Lock file is already taken, exiting") return 1 try: res = run(options, rootdir, mem_log, logfile_dpkg, install_start_time) if res.success and res.result_str: # complete, successful run update_kept_pkgs_file(res.pkgs_kept_back, os.path.join(rootdir, KEPT_PACKAGES_FILE)) if res.result_str and not options.dry_run: # there is some meaningful result which is worth an email log_content = get_dpkg_log_content(logfile_dpkg, install_start_time) send_summary_mail(res.pkgs, res.success, res.result_str, res.pkgs_kept_back, res.pkgs_removed, res.pkgs_kept_installed, mem_log, log_content) if res.update_stamp: # write timestamp file write_stamp_file() if not options.dry_run: # check if the user wants a reboot reboot_if_requested_and_needed() os.close(shutdown_lock) if res.success: return 0 else: return 1 except Exception as e: logger = logging.getLogger() logger.exception(_("An error occurred: %s"), e) log_content = get_dpkg_log_content(logfile_dpkg, install_start_time) if not options.dry_run: send_summary_mail(["<unknown>"], False, _("An error occurred"), None, [], [], mem_log, log_content) # Re-raise exceptions for apport raise def mark_pkgs_to_upgrade(cache, pkgs_to_upgrade): # type (apt.Cache, List[str]) -> None for pkg_name in pkgs_to_upgrade: pkg = cache[pkg_name] if pkg.is_upgradable \ or (pkg.is_installed and pkg.candidate.version != pkg.installed.version) \ and apt_pkg.config.find_b("Unattended-Upgrade::Allow-downgrade", False): cache.mark_upgrade_adjusted(pkg, from_user=not pkg.is_auto_installed) elif not pkg.is_installed: cache.mark_install_adjusted(pkg, from_user=False) def run(options, # type: Options rootdir, # type: str mem_log, # type: StringIO logfile_dpkg, # type: str install_start_time, # type: datetime.datetime ): # type: (...) -> UnattendedUpgradesResult # check if today is a patch day if not is_update_day(): return UnattendedUpgradesResult(True) # check if u-u should be stopped already if should_stop(): return UnattendedUpgradesResult(False) # check to see if want to auto-upgrade the devel release if apt_pkg.config.find("Unattended-Upgrade::DevRelease") == "auto": try: if DISTRO_ID.lower() == 'ubuntu': devel = (distro_info.UbuntuDistroInfo() . devel(result="object")) elif DISTRO_ID.lower() == 'debian': devel = (distro_info.DebianDistroInfo() . devel(result="object")) else: devel = (distro_info.DistroInfo(DISTRO_ID) . devel(result="object")) except Exception as e: logging.warning("Could not figure out development release: %s" % e) else: if ((devel.series == DISTRO_CODENAME and devel.release is not None and devel.release - date.today() > DEVEL_UNTIL_RELEASE)): syslog.syslog((_("Not running on this development " "release before %s") % (devel.release - DEVEL_UNTIL_RELEASE - datetime.timedelta(days=1)))) logging.warning(_("Not running on this development " "release before %s") % (devel.release - DEVEL_UNTIL_RELEASE - datetime.timedelta(days=1))) return UnattendedUpgradesResult(True) logging.debug("Running on the development release") elif "(development branch)" in DISTRO_DESC and not\ apt_pkg.config.find_b("Unattended-Upgrade::DevRelease", True): syslog.syslog(_("Not running on the development release.")) logging.info(_("Not running on the development release.")) return UnattendedUpgradesResult(True) logging.info(_("Starting unattended upgrades script")) # check and get lock try: apt_pkg.pkgsystem_lock() except SystemError: logging.error(_("Lock could not be acquired (another package " "manager running?)")) print(_("Cache lock can not be acquired, exiting")) return UnattendedUpgradesResult( False, _("Lock could not be acquired")) # check if the journal is dirty and if so, take emergceny action # the alternative is to leave the system potentially unsecure until # the user comes in and fixes if is_dpkg_journal_dirty() and \ apt_pkg.config.find_b("Unattended-Upgrade::AutoFixInterruptedDpkg", True): logging.warning( _("Unclean dpkg state detected, trying to correct")) print(_("Unclean dpkg state detected, trying to correct")) env = copy.copy(os.environ) env["DPKG_FRONTEND_LOCKED"] = "1" try: with Unlocked(): output = subprocess.check_output( ["dpkg", "--force-confold", "--configure", "-a"], env=env, universal_newlines=True) except subprocess.CalledProcessError as e: output = e.output logging.warning(_("dpkg --configure -a output:\n%s"), output) # get a cache try: cache = UnattendedUpgradesCache(rootdir=rootdir) except SystemError as error: print(_("Apt returned an error, exiting")) print(_("error message: %s") % error) logging.error(_("Apt returned an error, exiting")) logging.error(_("error message: %s"), error) return UnattendedUpgradesResult( False, _("Apt returned an error, exiting")) if cache._depcache.broken_count > 0: print(_("Cache has broken packages, exiting")) logging.error(_("Cache has broken packages, exiting")) return UnattendedUpgradesResult( False, _("Cache has broken packages, exiting")) # FIXME: make this into a ContextManager # be nice when calculating the upgrade as its pretty CPU intensive old_priority = os.nice(0) try: # Check that we will be able to restore the priority os.nice(-1) os.nice(20) except OSError as e: if e.errno in (errno.EPERM, errno.EACCES): pass else: raise auto_removable = get_auto_removable(cache) # find out about the packages that are upgradable (in an allowed_origin) pkgs_to_upgrade = calculate_upgradable_pkgs(cache, options) pkgs_to_upgrade.sort(key=lambda p: p.name) pkgs = [pkg.name for pkg in pkgs_to_upgrade] logging.debug("pkgs that look like they should be upgraded: %s" % "\n".join(pkgs)) # FIXME: make this into a ContextManager # stop being nice os.nice(old_priority - os.nice(0)) # download what looks good mark_pkgs_to_upgrade(cache, pkgs) if options.debug: fetcher = apt_pkg.Acquire(apt.progress.text.AcquireProgress()) else: fetcher = apt_pkg.Acquire() list = apt_pkg.SourceList() list.read_main_list() recs = cache._records pm = apt_pkg.PackageManager(cache._depcache) # don't start downloading during shutdown # TODO: download files one by one and check for stop request after each of # them if should_stop(): return UnattendedUpgradesResult(False, _("Upgrade was interrupted")) try: pm.get_archives(fetcher, list, recs) except SystemError as e: logging.error(_("GetArchives() failed: %s"), e) try: res = fetcher.run() logging.debug("fetch.run() result: %s", res) except SystemError as e: logging.error("fetch.run() result: %s", e) if options.download_only: return UnattendedUpgradesResult(True) if cache.get_changes(): cache.clear() pkg_conffile_prompt = False if dpkg_conffile_prompt(): # now check the downloaded debs for conffile conflicts and build # a blacklist conffile_blacklist = [] # type: List[str] for item in fetcher.items: logging.debug("%s" % item) if item.status == item.STAT_ERROR: print(_("An error occurred: %s") % item.error_text) logging.error(_("An error occurred: %s"), item.error_text) if not item.complete: print(_("The URI %s failed to download, aborting") % item.desc_uri) logging.error(_("The URI %s failed to download, aborting"), item.desc_uri) return UnattendedUpgradesResult( False, (_("The URI %s failed to download, aborting") % item.desc_uri)) if not os.path.exists(item.destfile): print(_("Download finished, but file %s not there?!?") % item.destfile) logging.error("Download finished, but file %s not " "there?!?", item.destfile) return UnattendedUpgradesResult( False, (_("Download finished, but file %s not there?!?") % item.destfile)) if not item.is_trusted and not apt_pkg.config.find_b( "APT::Get::AllowUnauthenticated", False): logging.debug("%s is blacklisted because it is not trusted") pkg_name = pkgname_from_deb(item.destfile) if not is_pkgname_in_blacklist(pkg_name, cache.blacklist): conffile_blacklist.append("%s$" % re.escape(pkg_name)) if not is_deb(item.destfile): logging.debug("%s is not a .deb file" % item) continue if conffile_prompt(item.destfile): # skip package (means to re-run the whole marking again # and making sure that the package will not be pulled in by # some other package again!) # # print to stdout to ensure that this message is part of # the cron mail (only if no summary mail is requested) email = apt_pkg.config.find("Unattended-Upgrade::Mail", "") if not email: print(_("Package %s has conffile prompt and needs " "to be upgraded manually") % pkgname_from_deb(item.destfile)) # log to the logfile logging.warning(_("Package %s has conffile prompt and " "needs to be upgraded manually"), pkgname_from_deb(item.destfile)) pkg_name = pkgname_from_deb(item.destfile) if not is_pkgname_in_blacklist(pkg_name, cache.blacklist): conffile_blacklist.append("%s$" % re.escape(pkg_name)) pkg_conffile_prompt = True # redo the selection about the packages to upgrade based on the new # blacklist logging.debug("Packages blacklist due to conffile prompts: %s" % conffile_blacklist) # find out about the packages that are upgradable (in a allowed_origin) if len(conffile_blacklist) > 0: for regex in conffile_blacklist: cache.blacklist.append(regex) cache.apply_pinning(cache.pinning_from_regex_list( conffile_blacklist, NEVER_PIN)) # type: ignore old_pkgs_to_upgrade = pkgs_to_upgrade[:] pkgs_to_upgrade = [] for pkg in old_pkgs_to_upgrade: logging.debug("Checking the black and whitelist: %s" % (pkg.name)) cache.mark_upgrade_adjusted( pkg, from_user=not pkg.is_auto_installed) if check_changes_for_sanity(cache): pkgs_to_upgrade.append(pkg) else: logging.info(_("package %s not upgraded"), pkg.name) cache.clear() for pkg2 in pkgs_to_upgrade: cache.call_adjusted( apt.package.Package.mark_upgrade, pkg2, from_user=not pkg2.is_auto_installed) if cache.get_changes(): cache.clear() else: logging.debug("dpkg is configured not to cause conffile prompts") # auto-removals kernel_pkgs_remove_success = True # type: bool kernel_pkgs_removed = [] # type: List[str] kernel_pkgs_kept_installed = [] # type: List[str] if (auto_removable and apt_pkg.config.find_b( "Unattended-Upgrade::Remove-Unused-Kernel-Packages", True)): # remove unused kernels before installing new ones because the newly # installed ones may fill up /boot and break the system right before # removing old ones could take place # # this step may also remove _auto-removable_ reverse dependencies # of kernel packages auto_removable_kernel_pkgs = { p for p in auto_removable if (cache.versioned_kernel_pkgs_regexp and cache.versioned_kernel_pkgs_regexp.match(p) and not cache.running_kernel_pkgs_regexp.match(p))} if auto_removable_kernel_pkgs: logging.info(_("Removing unused kernel packages: %s"), " ".join(auto_removable_kernel_pkgs)) (kernel_pkgs_remove_success, kernel_pkgs_removed, kernel_pkgs_kept_installed) = do_auto_remove( cache, auto_removable_kernel_pkgs, logfile_dpkg, options.minimal_upgrade_steps, options.verbose or options.debug, options.dry_run) auto_removable = get_auto_removable(cache) previous_autoremovals = auto_removable if apt_pkg.config.find_b( "Unattended-Upgrade::Remove-Unused-Dependencies", False): pending_autoremovals = previous_autoremovals else: pending_autoremovals = set() # exit if there is nothing to do and nothing to report if (len(pending_autoremovals) == 0 and len(pkgs_to_upgrade) == 0): logging.info(_("No packages found that can be upgraded unattended " "and no pending auto-removals")) pkgs_kept_back = cache.find_kept_packages(options.dry_run) return UnattendedUpgradesResult( kernel_pkgs_remove_success, _("No packages found that can be upgraded unattended and no " "pending auto-removals"), pkgs_removed=kernel_pkgs_removed, pkgs_kept_back=pkgs_kept_back, pkgs_kept_installed=kernel_pkgs_kept_installed, update_stamp=True) # check if its configured for install on shutdown, if so, the # environment UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN will # be set by the unatteded-upgrades-shutdown script if ("UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN" not in os.environ and apt_pkg.config.find_b( "Unattended-Upgrade::InstallOnShutdown", False)): logger = logging.getLogger() logger.debug("Configured to install on shutdown, so exiting now") return UnattendedUpgradesResult(True) # check if we are in dry-run mode if options.dry_run: logging.info("Option --dry-run given, *not* performing real actions") apt_pkg.config.set("Debug::pkgDPkgPM", "1") # do the install based on the new list of pkgs pkgs = [pkg.name for pkg in pkgs_to_upgrade] logging.info(_("Packages that will be upgraded: %s"), " ".join(pkgs)) # only perform install step if we actually have packages to install pkg_install_success = True if len(pkgs_to_upgrade) > 0: # do install pkg_install_success = do_install(cache, pkgs, options, logfile_dpkg) # Was the overall run succesful: only if everything installed # fine and nothing was held back because of a conffile prompt. successful_run = (kernel_pkgs_remove_success and pkg_install_success and not pkg_conffile_prompt) # now check if any auto-removing needs to be done if cache._depcache.broken_count > 0: print(_("Cache has broken packages, exiting")) logging.error(_("Cache has broken packages, exiting")) return UnattendedUpgradesResult( False, _("Cache has broken packages, exiting"), pkgs=pkgs) # make sure we start autoremovals with a clear cache if cache.get_changes(): cache.clear() # the user wants *all* auto-removals to be removed # (unless u-u got signalled to stop gracefully quickly) pkgs_removed = [] # type: List[str] pkgs_kept_installed = [] # type: List[str] if ((apt_pkg.config.find_b( "Unattended-Upgrade::Remove-Unused-Dependencies", False) and not SIGNAL_STOP_REQUEST)): auto_removals = get_auto_removable(cache) (pkg_remove_success, pkgs_removed, pkgs_kept_installed) = do_auto_remove( cache, auto_removals, logfile_dpkg, options.minimal_upgrade_steps, options.verbose or options.debug, options.dry_run) successful_run = successful_run and pkg_remove_success # the user wants *only new* auto-removals to be removed elif apt_pkg.config.find_b( "Unattended-Upgrade::Remove-New-Unused-Dependencies", True): # calculate the new auto-removals new_pending_autoremovals = get_auto_removable(cache) auto_removals = new_pending_autoremovals - previous_autoremovals (pkg_remove_success, pkgs_removed, pkgs_kept_installed) = do_auto_remove( cache, auto_removals, logfile_dpkg, options.minimal_upgrade_steps, options.verbose or options.debug, options.dry_run) successful_run = successful_run and pkg_remove_success logging.debug("InstCount=%i DelCount=%i BrokenCount=%i" % (cache._depcache.inst_count, cache._depcache.del_count, cache._depcache.broken_count)) # clean after success install (if needed) keep_key = "Unattended-Upgrade::Keep-Debs-After-Install" if (not apt_pkg.config.find_b(keep_key, False) and not options.dry_run and pkg_install_success): clean_downloaded_packages(fetcher) pkgs_kept_back = cache.find_kept_packages(options.dry_run) return UnattendedUpgradesResult( successful_run, _("All upgrades installed"), pkgs, pkgs_kept_back, kernel_pkgs_removed + pkgs_removed, kernel_pkgs_kept_installed + pkgs_kept_installed, update_stamp=True) class Options: def __init__(self): self.download_only = False self.dry_run = False self.debug = False self.apt_debug = False self.verbose = False self.minimal_upgrade_steps = False if __name__ == "__main__": localesApp = "unattended-upgrades" localesDir = "/usr/share/locale" gettext.bindtextdomain(localesApp, localesDir) gettext.textdomain(localesApp) # set debconf to NON_INTERACTIVE os.environ["DEBIAN_FRONTEND"] = "noninteractive" # this ensures the commandline is logged in /var/log/apt/history.log apt_pkg.config.set("Commandline::AsString", " ".join(sys.argv)) # COMPAT with the mispelling minimal_steps_default = ( apt_pkg.config.find_b("Unattended-Upgrades::MinimalSteps", True) and apt_pkg.config.find_b("Unattended-Upgrade::MinimalSteps", True)) # init the options parser = OptionParser() parser.add_option("-d", "--debug", action="store_true", default=apt_pkg.config.find_b( "Unattended-Upgrade::Debug", False), help=_("print debug messages")) parser.add_option("", "--apt-debug", action="store_true", default=False, help=_("make apt/libapt print verbose debug messages")) parser.add_option("-v", "--verbose", action="store_true", default=apt_pkg.config.find_b( "Unattended-Upgrade::Verbose", False), help=_("print info messages")) parser.add_option("", "--dry-run", action="store_true", default=False, help=_("Simulation, download but do not install")) parser.add_option("", "--download-only", action="store_true", default=False, help=_("Only download, do not even try to install.")) parser.add_option("", "--minimal-upgrade-steps", action="store_true", default=minimal_steps_default, help=_("Upgrade in minimal steps (and allow " "interrupting with SIGTERM) (default)")) parser.add_option("", "--no-minimal-upgrade-steps", action="store_false", default=minimal_steps_default, dest="minimal_upgrade_steps", help=_("Upgrade all packages together instead of in " "smaller sets")) parser.add_option("", "--minimal_upgrade_steps", action="store_true", help=SUPPRESS_HELP, default=minimal_steps_default) options = cast(Options, (parser.parse_args())[0]) if os.getuid() != 0: print(_("You need to be root to run this application")) sys.exit(1) # ensure that we are not killed when the terminal goes away e.g. on # shutdown signal.signal(signal.SIGHUP, signal.SIG_IGN) # setup signal handler for graceful stopping signal.signal(signal.SIGTERM, signal_handler) # write pid to let other processes find this one pidf = os.path.join(apt_pkg.config.find_dir("Dir"), "var", "run", "unattended-upgrades.pid") # clean up pid file on exit with open(pidf, "w") as fp: fp.write("%s" % os.getpid()) atexit.register(os.remove, pidf) # run the main code sys.exit(main(options))