diff --git a/roles/koschei/files/resolver.py b/roles/koschei/files/resolver.py deleted file mode 100644 index f6a4222556..0000000000 --- a/roles/koschei/files/resolver.py +++ /dev/null @@ -1,429 +0,0 @@ -# Copyright (C) 2014 Red Hat, Inc. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# -# Author: Michael Simacek - -import os -import time -import hawkey -import itertools -import librepo -import dnf.subject -import dnf.sack - -from sqlalchemy.orm import joinedload - -from koschei.models import (Package, Dependency, DependencyChange, Repo, - ResolutionProblem, RepoGenerationRequest, Build, - CompactDependencyChange, BuildrootProblem) -from koschei import util -from koschei.service import KojiService -from koschei.srpm_cache import SRPMCache -from koschei.repo_cache import RepoCache -from koschei.backend import check_package_state, Backend - - -class AbstractResolverTask(object): - def __init__(self, log, db, koji_session, - srpm_cache, repo_cache, backend): - self.log = log - self.db = db - self.koji_session = koji_session - self.srpm_cache = srpm_cache - self.repo_cache = repo_cache - self.backend = backend - self.problems = [] - self.sack = None - self.group = None - self.resolved_packages = {} - - def get_srpm_pkg(self, name, evr=None): - if evr: - # pylint: disable=W0633 - epoch, version, release = evr - hawk_pkg = hawkey.Query(self.sack)\ - .filter(name=name, epoch=epoch or 0, - arch='src', version=version, - release=release) - else: - hawk_pkg = hawkey.Query(self.sack)\ - .filter(name=name, arch='src', - latest_per_arch=True) - if hawk_pkg: - return hawk_pkg[0] - - def prepare_goal(self, srpm=None): - goal = hawkey.Goal(self.sack) - problems = [] - for name in self.group: - sltr = hawkey.Selector(self.sack).set(name=name) - goal.install(select=sltr) - if srpm: - for reldep in srpm.requires: - subj = dnf.subject.Subject(str(reldep)) - sltr = subj.get_best_selector(self.sack) - # pylint: disable=E1103 - if sltr is None or not sltr.matches(): - problems.append("No package found for: {}".format(reldep)) - else: - goal.install(select=sltr) - return goal, problems - - def store_deps(self, repo_id, package_id, installs): - new_deps = [] - for install in installs or []: - if install.arch != 'src': - dep = Dependency(repo_id=repo_id, package_id=package_id, - name=install.name, epoch=install.epoch, - version=install.version, - release=install.release, - arch=install.arch) - new_deps.append(dep) - - if new_deps: - # pylint: disable=E1101 - table = Dependency.__table__ - dicts = [{c.name: getattr(dep, c.name) for c in table.c - if not c.primary_key} - for dep in new_deps] - self.db.connection().execute(table.insert(), dicts) - self.db.expire_all() - - def compute_dependency_distances(self, srpm, deps): - dep_map = {dep.name: dep for dep in deps} - visited = set() - level = 1 - reldeps = srpm.requires - while level < 5 and reldeps: - pkgs_on_level = set(hawkey.Query(self.sack).filter(provides=reldeps)) - reldeps = {req for pkg in pkgs_on_level if pkg not in visited - for req in pkg.requires} - visited.update(pkgs_on_level) - for pkg in pkgs_on_level: - dep = dep_map.get(pkg.name) - if dep and dep.distance is None: - dep.distance = level - level += 1 - - def resolve_dependencies(self, package, srpm): - goal, problems = self.prepare_goal(srpm) - if goal is not None: - resolved = False - if not problems: - resolved = goal.run() - problems = goal.problems - self.resolved_packages[package.id] = resolved - if resolved: - # pylint: disable=E1101 - deps = [Dependency(name=pkg.name, epoch=pkg.epoch, - version=pkg.version, release=pkg.release, - arch=pkg.arch) - for pkg in goal.list_installs() if pkg.arch != 'src'] - self.compute_dependency_distances(srpm, deps) - return deps - else: - for problem in sorted(set(problems)): - entry = dict(package_id=package.id, - problem=problem) - self.problems.append(entry) - - def get_deps_from_db(self, package_id, repo_id): - deps = self.db.query(Dependency)\ - .filter_by(repo_id=repo_id, - package_id=package_id) - return deps.all() - - def create_dependency_changes(self, deps1, deps2, package_id, - apply_id=None): - if not deps1 or not deps2: - # TODO packages with no deps - return [] - - def key(dep): - return (dep.name, dep.epoch, dep.version, dep.release) - - old = util.set_difference(deps1, deps2, key) - new = util.set_difference(deps2, deps1, key) - - def create_change(name): - return CompactDependencyChange( - package_id=package_id, applied_in_id=apply_id, dep_name=name, - prev_epoch=None, prev_version=None, prev_release=None, - curr_epoch=None, curr_version=None, curr_release=None) - - changes = {} - for dep in old: - change = create_change(dep.name) - change.update(prev_version=dep.version, prev_epoch=dep.epoch, - prev_release=dep.release, distance=dep.distance) - changes[dep.name] = change - for dep in new: - change = changes.get(dep.name) or create_change(dep.name) - change.update(curr_version=dep.version, curr_epoch=dep.epoch, - curr_release=dep.release, distance=dep.distance) - changes[dep.name] = change - return changes.values() if changes else [] - - def update_dependency_changes(self, changes, apply_id=None): - # pylint: disable=E1101 - self.db.query(DependencyChange)\ - .filter_by(applied_in_id=apply_id)\ - .delete(synchronize_session=False) - if changes: - self.db.execute(DependencyChange.__table__.insert(), - changes) - self.db.expire_all() - - def get_prev_build_for_comparison(self, build): - return self.db.query(Build)\ - .filter_by(package_id=build.package_id)\ - .filter(Build.id < build.id)\ - .filter(Build.deps_resolved == True)\ - .order_by(Build.id.desc()).first() - - def prepare_sack(self, repo_id): - for_arch = util.config['dependency']['for_arch'] - sack = dnf.sack.Sack(arch=for_arch) - repos = self.repo_cache.get_repos(repo_id) - if repos: - util.add_repos_to_sack(repo_id, repos, sack) - self.sack = sack - - -class GenerateRepoTask(AbstractResolverTask): - - def get_packages(self, expunge=True): - packages = self.db.query(Package)\ - .filter(Package.ignored == False)\ - .options(joinedload(Package.last_build))\ - .options(joinedload(Package.last_complete_build))\ - .all() - # detaches objects from ORM, prevents spurious queries that hinder - # performance - if expunge: - self.db.expunge_all() - return packages - - def update_repo_index(self, repo_id): - index_path = os.path.join(util.config['directories']['repodata'], 'index') - with open(index_path, 'w') as index: - index.write('{}\n'.format(repo_id)) - - def synchronize_resolution_state(self): - packages = self.get_packages(expunge=False) - for pkg in packages: - curr_state = self.resolved_packages.get(pkg.id) - if curr_state is not None: - prev_state = pkg.msg_state_string - pkg.resolved = curr_state - check_package_state(pkg, prev_state) - - for state in True, False: - ids = [id for id, resolved in self.resolved_packages.iteritems() - if resolved is state] - if ids: - self.db.query(Package).filter(Package.id.in_(ids))\ - .update({'resolved': state}, synchronize_session=False) - - def get_build_for_comparison(self, package): - """ - Returns newest build which should be used for dependency - comparisons or None if it shouldn't be compared at all - """ - last_build = package.last_build - if last_build and last_build.state in Build.FINISHED_STATES: - if last_build.deps_resolved: - return last_build - if last_build.deps_processed: - # unresolved build, skip it - return self.get_prev_build_for_comparison(last_build) - - def generate_dependency_changes(self, packages, repo_id): - changes = [] - for package in packages: - srpm = self.get_srpm_pkg(package.name) - if not srpm: - continue - curr_deps = self.resolve_dependencies(package, srpm) - if curr_deps is not None: - last_build = self.get_build_for_comparison(package) - if last_build: - prev_deps = self.get_deps_from_db(last_build.package_id, - last_build.repo_id) - if prev_deps is not None: - changes += self.create_dependency_changes(prev_deps, - curr_deps, - package.id) - return changes - - def try_install_buildroot(self): - goal, _ = self.prepare_goal() - return goal.run(), goal.problems - - def run(self, repo_id): - start = time.time() - self.log.info("Generating new repo") - repo = Repo(repo_id=repo_id) - self.db.add(repo) - self.db.flush() - self.backend.refresh_latest_builds() - packages = self.get_packages() - srpm_repo = self.srpm_cache.get_repodata() - self.prepare_sack(repo_id) - if not self.sack: - self.log.error('Cannot generate repo: {}'.format(repo_id)) - return - self.update_repo_index(repo_id) - util.add_repo_to_sack('src', srpm_repo, self.sack) - # TODO repo_id - self.group = util.get_build_group(self.koji_session) - base_installable, base_problems = self.try_install_buildroot() - if not base_installable: - self.log.info("Build group not resolvable") - repo.base_resolved = False - self.db.execute(BuildrootProblem.__table__.insert(), - [{'repo_id': repo.repo_id, 'problem': problem} - for problem in base_problems]) - self.db.commit() - return - self.log.info("Resolving dependencies") - resolution_start = time.time() - changes = self.generate_dependency_changes(packages, repo_id) - resolution_end = time.time() - self.db.query(ResolutionProblem).delete(synchronize_session=False) - # pylint: disable=E1101 - if self.problems: - self.db.execute(ResolutionProblem.__table__.insert(), self.problems) - self.synchronize_resolution_state() - self.update_dependency_changes(changes) - repo.base_resolved = True - self.db.commit() - end = time.time() - - self.log.info(("New repo done. Resolution time: {} minutes\n" - "Overall time: {} minutes.") - .format((resolution_end - resolution_start) / 60, - (end - start) / 60)) - -class ProcessBuildsTask(AbstractResolverTask): - - def process_build(self, build): - if build.repo_id: - self.log.info("Processing build {}".format(build.id)) - prev = self.get_prev_build_for_comparison(build) - srpm = self.get_srpm_pkg(build.package.name, (build.epoch, - build.version, - build.release)) - if not srpm: - return - curr_deps = self.resolve_dependencies(package=build.package, - srpm=srpm) - self.store_deps(build.repo_id, build.package_id, curr_deps) - if curr_deps is not None: - build.deps_resolved = True - if prev and prev.repo_id: - prev_deps = self.get_deps_from_db(prev.package_id, - prev.repo_id) - if prev_deps and curr_deps: - changes = self.create_dependency_changes( - prev_deps, curr_deps, package_id=build.package_id, - apply_id=build.id) - self.update_dependency_changes(changes, apply_id=build.id) - keep_builds = util.config['dependency']['keep_build_deps_for'] - boundary_build = self.db.query(Build)\ - .filter_by(package_id=build.package_id)\ - .order_by(Build.id.desc())\ - .offset(keep_builds).first() - if boundary_build and boundary_build.repo_id: - self.db.query(Dependency)\ - .filter_by(package_id=build.package_id)\ - .filter(Dependency.repo_id < - boundary_build.repo_id)\ - .delete(synchronize_session=False) - - def run(self): - # pylint: disable=E1101 - unprocessed = self.db.query(Build)\ - .filter_by(deps_processed=False)\ - .filter(Build.repo_id != None)\ - .options(joinedload(Build.package))\ - .order_by(Build.repo_id).all() - # TODO repo_id - self.group = util.get_build_group(self.koji_session) - - # do this before processing to avoid multiple runs of createrepo - for build in unprocessed: - self.srpm_cache.get_srpm(build.package.name, - build.version, build.release) - srpm_repo = self.srpm_cache.get_repodata() - - for repo_id, builds in itertools.groupby(unprocessed, - lambda build: build.repo_id): - builds = list(builds) - if repo_id is not None: - self.prepare_sack(repo_id) - if self.sack: - util.add_repos_to_sack('srpm', {'src': srpm_repo}, self.sack) - for build in builds: - self.process_build(build) - self.db.query(Build).filter(Build.id.in_([b.id for b in builds]))\ - .update({'deps_processed': True}, - synchronize_session=False) - self.db.commit() - -class Resolver(KojiService): - - def __init__(self, log=None, db=None, koji_session=None, - srpm_cache=None, repo_cache=None, backend=None): - super(Resolver, self).__init__(log=log, db=db, - koji_session=koji_session) - self.srpm_cache = (srpm_cache - or SRPMCache(koji_session=self.koji_session)) - self.repo_cache = repo_cache or RepoCache() - self.backend = backend or Backend(db=self.db, - koji_session=self.koji_session, - log=self.log, - srpm_cache=self.srpm_cache) - - def get_handled_exceptions(self): - return ([librepo.LibrepoException] + - super(Resolver, self).get_handled_exceptions()) - - def create_task(self, cls): - return cls(log=self.log, db=self.db, koji_session=self.koji_session, - srpm_cache=self.srpm_cache, repo_cache=self.repo_cache, - backend=self.backend) - - def process_repo_generation_requests(self): - latest_request = self.db.query(RepoGenerationRequest)\ - .order_by(RepoGenerationRequest.repo_id - .desc())\ - .first() - if latest_request: - repo_id = latest_request.repo_id - [last_repo] = (self.db.query(Repo.repo_id) - .order_by(Repo.repo_id.desc()) - .first() or [0]) - if repo_id > last_repo: - self.create_task(GenerateRepoTask).run(repo_id) - self.db.query(RepoGenerationRequest)\ - .filter(RepoGenerationRequest.repo_id <= repo_id)\ - .delete() - self.db.commit() - - def main(self): - self.create_task(ProcessBuildsTask).run() - self.process_repo_generation_requests() diff --git a/roles/koschei/tasks/main.yml b/roles/koschei/tasks/main.yml index 22f64543d3..196dc15310 100644 --- a/roles/koschei/tasks/main.yml +++ b/roles/koschei/tasks/main.yml @@ -108,77 +108,9 @@ - koschei - config -# See https://github.com/msimacek/koschei/issues/16 -- name: HOTFIX koschei resolver.py - copy: > - src=resolver.py - dest=/usr/lib/python2.7/site-packages/koschei/resolver.py - owner=root group=root mode=0644 - when: env == "production" - notify: - - restart koschei-resolver - tags: - - koschei - - hotfix - - name: access to postgres for scripts template: src=pgpass.j2 dest=/var/cache/koschei/.pgpass owner=koschei group=koschei mode=0600 tags: - koschei - config - -- name: Create libexec/koschei dir - file: path=/usr/libexec/koschei state=directory - when: env == "production" - tags: - - koschei - - hotfix - -- name: Create symlinks to python to get specific executable names - file: path="/usr/libexec/koschei/koschei-{{ item }}" - src=/usr/bin/python - state=link - when: env == "production" - with_items: - - polling - - resolver - - scheduler - - watcher - tags: - - koschei - - hotfix - -- name: Run using specific symlinks - lineinfile: dest="/usr/lib/systemd/system/koschei-{{ item }}.service" - regexp="^ExecStart" - line="ExecStart=/usr/libexec/koschei/koschei-{{ item }} -m koschei.main {{ item }}" - when: env == "production" - with_items: - - polling - - resolver - - scheduler - - watcher - tags: - - koschei - - hotfix - notify: - - reload systemd - - restart koschei-polling - - restart koschei-resolver - - restart koschei-scheduler - - restart koschei-watcher - -- name: HOTFIX datetime error on unfinished tasks with finished parent - lineinfile: dest="/usr/lib/python2.7/site-packages/koschei/backend.py" - regexp="finished=datetime\.fromtimestamp\(task\['completion_ts'\]\)" - line="finished=datetime.fromtimestamp(task['completion_ts']) if task['completion_ts'] else None," - when: env == "production" - tags: - - koschei - - hotfix - notify: - - restart koschei-polling - - restart koschei-resolver - - restart koschei-watcher - - restart koschei-scheduler