diff --git a/roles/koji_hub/files/kojira b/roles/koji_hub/files/kojira
new file mode 100755
index 0000000000..1a9448aa44
--- /dev/null
+++ b/roles/koji_hub/files/kojira
@@ -0,0 +1,1148 @@
+#!/usr/bin/python3
+
+# Koji Repository Administrator (kojira)
+# Copyright (c) 2005-2014 Red Hat, Inc.
+#
+# Koji is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation;
+# version 2.1 of the License.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this software; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors:
+#       Mike McLean
+
+from __future__ import absolute_import, division
+
+import errno
+import json
+import logging
+import logging.handlers
+import os
+import pprint
+import signal
+import stat
+import sys
+import threading
+import time
+import traceback
+from optparse import OptionParser
+from xml.etree import ElementTree
+
+import requests
+import six
+
+import koji
+from koji.util import deprecated, parseStatus, rmtree, to_list
+
+tag_cache = {}
+
+
+def getTag(session, tag, event=None):
+    """A caching version of the hub call"""
+    cache = tag_cache
+    now = time.time()
+    if (tag, event) in cache:
+        ts, info = cache[(tag, event)]
+        if now - ts < 600:
+            # use the cache
+            return info
+    info = session.getTag(tag, event=event)
+    if info:
+        cache[(info['id'], event)] = (now, info)
+        cache[(info['name'], event)] = (now, info)
+    return info
+
+
+class ManagedRepo(object):
+
+    def __init__(self, manager, data):
+        self.manager = manager
+        self.session = manager.session
+        self.options = manager.options
+        self.logger = logging.getLogger("koji.repo")
+        self.current = True
+        self.repo_id = data['id']
+        self.event_id = data['create_event']
+        self.event_ts = data['create_ts']
+        self.tag_id = data['tag_id']
+        self.state = data['state']
+        if 'dist' in data:
+            self._dist = data['dist']
+        self.tag_name = data['tag_name']
+        self.expire_ts = None
+        if koji.REPO_STATES[self.state] in ['EXPIRED', 'DELETED', 'PROBLEM']:
+            self.current = False
+            self.expire_ts = time.time()
+            # TODO use hub data to find the actual expiration time
+        self.first_seen = time.time()
+        if self.current:
+            order = self.session.getFullInheritance(self.tag_id, event=self.event_id)
+            # order may contain same tag more than once
+            tags = {self.tag_id: 1}
+            for x in order:
+                tags[x['parent_id']] = 1
+            self.taglist = to_list(tags.keys())
+
+    @property
+    def dist(self):
+        # TODO: remove this indirection once we can rely on the hub to return
+        # dist field in getActiveRepos
+        if hasattr(self, '_dist'):
+            return self._dist
+        rinfo = self.session.repoInfo(self.repo_id)
+        self._dist = rinfo['dist']
+        return self._dist
+
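+    # A note on repo.json (illustrative, not exhaustive): the file, when
+    # present, records metadata about the repo, roughly of the shape
+    #
+    #   {"volume": "DEFAULT", ...}
+    #
+    # tryDelete() below relies on the 'volume' key to resolve a symlinked
+    # dist repo to its real path before removing it.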
+    def get_info(self):
+        "Fetch data from repo.json"
+        path = self.get_path()
+        if not path:
+            # can this be an error yet?
+            return None
+        fn = '%s/repo.json' % path
+        if not os.path.exists(fn):
+            self.logger.warning('Repo info file missing: %s', fn)
+            return None
+        with open(fn, 'r') as fp:
+            return json.load(fp)
+
+    def get_path(self, volume=None):
+        """Return the path to the repo directory"""
+        tag_info = getTag(self.session, self.tag_id)
+        if not tag_info:
+            tag_info = getTag(self.session, self.tag_id, self.event_id)
+        if not tag_info:
+            self.logger.warning('Could not get info for tag %i, referenced by repo %i' %
+                                (self.tag_id, self.repo_id))
+            return None
+        tag_name = tag_info['name']
+        if self.dist:
+            path = pathinfo.distrepo(self.repo_id, tag_name, volume=volume)
+        else:
+            # currently only dist repos can be on another volume
+            path = pathinfo.repo(self.repo_id, tag_name)
+        return path
+
+    def expire(self):
+        """Mark the repo expired"""
+        if self.state == koji.REPO_EXPIRED:
+            return
+        elif self.state == koji.REPO_DELETED:
+            raise koji.GenericError("Repo already deleted")
+        self.logger.info("Expiring repo %s.." % self.repo_id)
+        self.session.repoExpire(self.repo_id)
+        self.state = koji.REPO_EXPIRED
+
+    def expired(self):
+        return self.state == koji.REPO_EXPIRED
+
+    def pending(self, timeout=180):
+        """Determine if repo generation appears to be in progress and not already obsolete"""
+        if self.state != koji.REPO_INIT:
+            return False
+        age = time.time() - self.event_ts
+        return self.current and age < timeout
+
+    def stale(self):
+        """Determine if repo seems stale
+
+        By stale, we mean:
+            - state=INIT
+            - timestamp really, really old
+        """
+        timeout = 36000
+        # XXX - config
+        if self.state != koji.REPO_INIT:
+            return False
+        age = time.time() - max(self.event_ts, self.first_seen)
+        # the first_seen timestamp is also factored in because a repo can be
+        # created from an older event and should not be expired based solely on
+        # that event's timestamp.
+        return age > timeout
+
+    def tryDelete(self):
+        """Remove the repo from disk, if possible"""
+        path = self.get_path()
+        if not path:
+            # get_path already warned
+            return False
+        if self.dist:
+            lifetime = self.options.dist_repo_lifetime
+        else:
+            lifetime = self.options.deleted_repo_lifetime
+            # (should really be called expired_repo_lifetime)
+        try:
+            # also check dir age. We do this because a repo can be created from an older event
+            # and should not be removed based solely on that event's timestamp.
+            mtime = os.stat(path).st_mtime
+        except OSError as e:
+            if e.errno == 2:
+                # No such file or directory, so the repo either never existed,
+                # or has already been deleted, so allow it to be marked deleted.
+                self.logger.info("Repo directory does not exist: %s" % path)
+                pass
+            else:
+                self.logger.error("Can't stat repo directory: %s, %s" % (path, e.strerror))
+                return False
+        else:
+            times = [self.event_ts, mtime, self.first_seen, self.expire_ts]
+            times = [ts for ts in times if ts is not None]
+            age = time.time() - max(times)
+            self.logger.debug("Repo %s (%s) age: %i sec", self.repo_id, path, age)
+            if age < lifetime:
+                return False
+        self.logger.debug("Attempting to delete repo %s.." % self.repo_id)
+        if self.state != koji.REPO_EXPIRED:
+            raise koji.GenericError("Repo not expired")
+        if self.session.repoDelete(self.repo_id) > 0:
+            # cannot delete, we are referenced by a buildroot
+            self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
+            return False
+        self.logger.info("Deleted repo %s" % self.repo_id)
+        self.state = koji.REPO_DELETED
+        if os.path.islink(path):
+            # expected for repos on other volumes
+            info = self.get_info()
+            if not os.path.exists(path):
+                self.logger.error('Repo volume link broken: %s', path)
+                return False
+            if not info or 'volume' not in info:
+                self.logger.error('Missing repo.json in %s', path)
+                return False
+            realpath = self.get_path(volume=info['volume'])
+            if not os.path.exists(realpath):
+                self.logger.error('Repo real path missing: %s', realpath)
+                return False
+            if not os.path.samefile(path, realpath):
+                self.logger.error('Incorrect volume link: %s', path)
+                return False
+            # ok, try to remove the symlink
+            try:
+                os.unlink(path)
+            except OSError:
+                self.logger.error('Unable to remove volume link: %s', path)
+            # and remove the real path
+            self.manager.rmtree(realpath)
+        else:
+            self.manager.rmtree(path)
+        return True
+
+    def ready(self):
+        return self.state == koji.REPO_READY
+
+    def deleted(self):
+        return self.state == koji.REPO_DELETED
+
+    def problem(self):
+        return self.state == koji.REPO_PROBLEM
+
+
+class RepoManager(object):
+
+    def __init__(self, options, session):
+        self.options = options
+        self._local = threading.local()
+        self._local.session = session
+        self.repos = {}
+        self.external_repos = {}
+        self.tasks = {}
+        self.recent_tasks = {}
+        self.other_tasks = {}
+        self.needed_tags = {}
+        self.tag_use_stats = {}
+        self.delete_pids = {}
+        self.delete_queue = []
+        self.logger = logging.getLogger("koji.repo.manager")
+
+    @property
+    def session(self):
+        # session is stored in our threadlocal instance
+        return self._local.session
+
+    @session.setter
+    def session(self, value):
+        self._local.session = value
+
+    def printState(self):
+        self.logger.debug('Tracking %i repos, %i child processes',
+                          len(self.repos), len(self.delete_pids))
+        for tag_id, task_id in six.iteritems(self.tasks):
+            self.logger.debug("Tracking task %s for tag %s", task_id, tag_id)
+        for pid, desc in six.iteritems(self.delete_pids):
+            self.logger.debug("Delete job %s: %r", pid, desc)
+
+    def rmtree(self, path):
+        """Spawn (or queue) an rmtree job"""
+        self.logger.info("Queuing rmtree job for %s", path)
+        self.delete_queue.append(path)
+        self.checkQueue()
+
+    def checkQueue(self):
+        finished = [pid for pid in self.delete_pids if self.waitPid(pid)]
+        for pid in finished:
+            path = self.delete_pids[pid]
+            self.logger.info("Completed rmtree job for %s", path)
+            del self.delete_pids[pid]
+        while self.delete_queue and len(self.delete_pids) < self.options.max_delete_processes:
+            path = self.delete_queue.pop(0)
+            pid = self._rmtree(path)
+            self.logger.info("Started rmtree (pid %i) for %s", pid, path)
+            self.delete_pids[pid] = path
+
+    def waitPid(self, pid):
+        # XXX - can we unify with TaskManager?
+        prefix = "pid %i (%s)" % (pid, self.delete_pids.get(pid))
+        try:
+            (childpid, status) = os.waitpid(pid, os.WNOHANG)
+        except OSError as e:
+            if e.errno != errno.ECHILD:
+                # should not happen
+                raise
+            # otherwise assume the process is gone
+            self.logger.info("%s: %s" % (prefix, e))
+            return True
+        if childpid != 0:
+            self.logger.info(parseStatus(status, prefix))
+            return True
+        return False
+
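+    # Deletion jobs run in forked child processes rather than threads: the
+    # child in _rmtree() removes the tree and then _exit()s, while the parent
+    # only tracks the pid (see checkQueue/waitPid above). session._forget()
+    # drops the inherited session state so the child cannot accidentally
+    # affect the session it shares with the parent.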
+    def _rmtree(self, path):
+        pid = os.fork()
+        if pid:
+            return pid
+        # no return
+        try:
+            status = 1
+            self.session._forget()
+            try:
+                rmtree(path)
+                status = 0
+            except BaseException:
+                logger.error(''.join(traceback.format_exception(*sys.exc_info())))
+                logging.shutdown()
+        finally:
+            os._exit(status)
+
+    def killChildren(self):
+        # XXX - unify with TaskManager?
+        sig = signal.SIGTERM
+        for pid in self.delete_pids:
+            try:
+                os.kill(pid, sig)
+            except OSError as e:
+                if e.errno != errno.ESRCH:
+                    logger.error("Unable to kill process %s", pid)
+
+    def readCurrentRepos(self):
+        self.logger.debug("Reading current repo data")
+        repodata = self.session.getActiveRepos()
+        self.logger.debug("Repo data: %r" % repodata)
+
+        for data in repodata:
+            repo_id = data['id']
+            repo = self.repos.get(repo_id)
+            if repo:
+                # we're already tracking it
+                if repo.state != data['state']:
+                    self.logger.info(
+                        'State changed for repo %s: %s -> %s',
+                        repo_id, koji.REPO_STATES[repo.state], koji.REPO_STATES[data['state']])
+                    repo.state = data['state']
+            else:
+                self.logger.info('Found repo %s, state=%s'
+                                 % (repo_id, koji.REPO_STATES[data['state']]))
+                repo = ManagedRepo(self, data)
+                self.repos[repo_id] = repo
+                if not getTag(self.session, repo.tag_id) and not repo.expired():
+                    self.logger.info('Tag %d for repo %d disappeared, expiring.',
+                                     repo.tag_id, repo_id)
+                    repo.expire()
+        if len(self.repos) > len(repodata):
+            # This shouldn't normally happen, but might if someone else calls
+            # repoDelete or similar
+            active = set([r['id'] for r in repodata])
+            for repo_id in to_list(self.repos.keys()):
+                if repo_id not in active:
+                    self.logger.info('Dropping entry for inactive repo: %s', repo_id)
+                    del self.repos[repo_id]
+
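+    # checkExternalRepo() decides freshness from the <timestamp> elements in
+    # each external repo's repodata/repomd.xml. A trimmed sketch of the
+    # relevant structure (values illustrative):
+    #
+    #   <repomd xmlns="http://linux.duke.edu/metadata/repo">
+    #     <data type="primary">
+    #       <timestamp>1594654862</timestamp>
+    #     </data>
+    #     ...
+    #   </repomd>
+    #
+    # The newest timestamp seen across all arches is kept per URL and
+    # compared against the repo's creation timestamp.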
+    def checkExternalRepo(self, ts, repodata, tag):
+        """Determine which external repos are current, return True if remote repo is newer"""
+        url = repodata['url']
+        if url not in self.external_repos:
+            self.external_repos[url] = 0
+            arches = []  # placeholder for repos without $arch bit
+            try:
+                arches = getTag(self.session, tag)['arches'].split()
+            except AttributeError:
+                pass
+            for arch in arches:
+                if '$arch' in url:
+                    arch_url = url.replace('$arch', arch)
+                else:
+                    arch_url = url
+                arch_url = os.path.join(arch_url, 'repodata/repomd.xml')
+                self.logger.debug('Checking external url: %s' % arch_url)
+                try:
+                    r = requests.get(arch_url, timeout=5)
+                    root = ElementTree.fromstring(r.text)
+                    for child in root.iter('{http://linux.duke.edu/metadata/repo}timestamp'):
+                        remote_ts = int(child.text)
+                        if remote_ts > self.external_repos[url]:
+                            self.external_repos[url] = remote_ts
+                except Exception:
+                    # inaccessible or without timestamps
+                    # treat repo as unchanged (ts = 0)
+                    pass
+        return ts < self.external_repos[url]
+
+    def reposToCheck(self):
+        to_check = []
+        repo_ids = to_list(self.repos.keys())
+        for repo_id in repo_ids:
+            repo = self.repos.get(repo_id)
+            if repo is None:
+                # removed by main thread
+                continue
+            if not repo.current:
+                # no point in checking again
+                continue
+            if repo.state not in (koji.REPO_READY, koji.REPO_INIT):
+                repo.current = False
+                if repo.expire_ts is None:
+                    repo.expire_ts = time.time()
+                # also no point in further checking
+                continue
+            to_check.append(repo)
+        if self.logger.isEnabledFor(logging.DEBUG):
+            skipped = set(repo_ids).difference([r.repo_id for r in to_check])
+            self.logger.debug("Skipped check for repos: %r", skipped)
+        return to_check
+
+    def checkExternalRepos(self):
+        """Determine which external repos changed"""
+        # clean external repo cache
+        self.external_repos = {}
+        for repo in self.reposToCheck():
+            changed = False
+            for tag in repo.taglist:
+                try:
+                    external_repos = self.session.getExternalRepoList(tag)
+                except koji.GenericError:
+                    # in case tag was deleted, checkCurrentRepos is
+                    # responsible for cleanup, ignore it here
+                    external_repos = []
+                for external_repo in external_repos:
+                    changed = self.checkExternalRepo(repo.event_ts, external_repo, tag)
+                    self.logger.debug("Check external repo %s [%s] for tag %s: %s" % (
+                        external_repo['external_repo_id'], external_repo['url'],
+                        tag, changed))
+                    if changed:
+                        break
+                if changed:
+                    break
+            if changed:
+                self.logger.info("Repo %i no longer current due to external repo change" %
+                                 repo.repo_id)
+                repo.current = False
+                repo.expire_ts = time.time()
+
+    def checkCurrentRepos(self):
+        """Determine which repos are current"""
+        for repo in self.reposToCheck():
+            if self.session.tagChangedSinceEvent(repo.event_id, repo.taglist):
+                self.logger.info("Repo %i no longer current", repo.repo_id)
+                repo.current = False
+                repo.expire_ts = time.time()
+
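+    # The loops below each run in their own thread with their own hub
+    # subsession (see the start_* helpers near the bottom of this file).
+    # RepoManager.session is backed by threading.local storage, so each
+    # thread talks to the hub over its own connection.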
+    def currencyChecker(self, session):
+        """Continually checks repos for currency. Runs as a separate thread"""
+        self.session = session
+        self.logger = logging.getLogger("koji.repo.currency")
+        self.logger.info('currencyChecker starting')
+        try:
+            while True:
+                self.checkCurrentRepos()
+                time.sleep(self.options.sleeptime)
+        except Exception:
+            self.logger.exception('Error in currency checker thread')
+            raise
+        finally:
+            session.logout()
+
+    def currencyExternalChecker(self, session):
+        """Continually checks repos for external repo currency. Runs as a separate thread"""
+        self.session = session
+        self.logger = logging.getLogger("koji.repo.currency_external")
+        self.logger.info('currencyExternalChecker starting')
+        try:
+            while True:
+                self.checkExternalRepos()
+                time.sleep(self.options.sleeptime)
+        except Exception:
+            self.logger.exception('Error in external currency checker thread')
+            raise
+        finally:
+            session.logout()
+
+    def regenLoop(self, session):
+        """Triggers regens as needed/possible. Runs in a separate thread"""
+        self.session = session
+        self.logger = logging.getLogger("koji.repo.regen")
+        self.logger.info('regenLoop starting')
+        try:
+            while True:
+                self.regenRepos()
+                time.sleep(self.options.sleeptime)
+        except Exception:
+            self.logger.exception('Error in regen thread')
+            raise
+        finally:
+            session.logout()
+
+    def pruneLocalRepos(self):
+        for volinfo in self.session.listVolumes():
+            volumedir = pathinfo.volumedir(volinfo['name'])
+            repodir = "%s/repos" % volumedir
+            self._pruneLocalRepos(repodir, self.options.deleted_repo_lifetime)
+            distrepodir = "%s/repos-dist" % volumedir
+            self._pruneLocalRepos(distrepodir, self.options.dist_repo_lifetime)
+
+    def _pruneLocalRepos(self, topdir, max_age):
+        """Scan filesystem for repos and remove any deleted ones
+
+        Also, warn about any oddities"""
+        if self.delete_pids:
+            # skip
+            return
+        if not os.path.exists(topdir):
+            self.logger.debug("%s doesn't exist, skipping", topdir)
+            return
+        if not os.path.isdir(topdir):
+            self.logger.warning("%s is not a directory, skipping", topdir)
+            return
+        self.logger.debug("Scanning %s for repos", topdir)
+        self.logger.debug('max age allowed: %s seconds', max_age)
+        for tag in os.listdir(topdir):
+            tagdir = "%s/%s" % (topdir, tag)
+            if not os.path.isdir(tagdir):
+                self.logger.debug("%s is not a directory, skipping", tagdir)
+                continue
+            for repo_id in os.listdir(tagdir):
+                if repo_id == 'latest':
+                    # ignore latest symlinks
+                    continue
+                try:
+                    repo_id = int(repo_id)
+                except ValueError:
+                    self.logger.debug("%s/%s not an int, skipping", tagdir, repo_id)
+                    continue
+                if repo_id in self.repos:
+                    # we're already managing it, no need to deal with it here
+                    continue
+                repodir = "%s/%s" % (tagdir, repo_id)
+                try:
+                    # lstat because it could be link to another volume
+                    dirstat = os.lstat(repodir)
+                except OSError:
+                    # just in case something deletes the repo out from under us
+                    self.logger.debug("%s deleted already?!", repodir)
+                    continue
+                symlink = False
+                if stat.S_ISLNK(dirstat.st_mode):
+                    symlink = True
+                elif not stat.S_ISDIR(dirstat.st_mode):
+                    self.logger.debug("%s not a directory, skipping", repodir)
+                    continue
+                dir_ts = dirstat.st_mtime
+                rinfo = self.session.repoInfo(repo_id)
+                if rinfo is None:
+                    if not self.options.ignore_stray_repos:
+                        age = time.time() - dir_ts
+                        self.logger.debug("did not expect %s; age: %s",
+                                          repodir, age)
+                        if age > max_age:
+                            self.logger.info(
+                                "Removing unexpected directory (no such repo): %s", repodir)
+                            if symlink:
+                                os.unlink(repodir)
+                            else:
+                                self.rmtree(repodir)
+                    continue
+                if rinfo['tag_name'] != tag:
+                    self.logger.warning(
+                        "Tag name mismatch (rename?): %s vs %s", tag, rinfo['tag_name'])
+                    continue
+                if rinfo['state'] in (koji.REPO_DELETED, koji.REPO_PROBLEM):
+                    age = time.time() - max(rinfo['create_ts'], dir_ts)
+                    self.logger.debug("potential removal candidate: %s; age: %s" % (repodir, age))
+                    if age > max_age:
+                        logger.info("Removing stray repo (state=%s): %s",
+                                    koji.REPO_STATES[rinfo['state']], repodir)
+                        if symlink:
+                            os.unlink(repodir)
+                        else:
+                            self.rmtree(repodir)
+
+    def tagUseStats(self, tag_id):
+        stats = self.tag_use_stats.get(tag_id)
+        now = time.time()
+        if stats and now - stats['ts'] < 3600:
+            # use the cache
+            return stats
+        data = self.session.listBuildroots(tagID=tag_id,
+                                           queryOpts={'order': '-create_event_id', 'limit': 100})
+        # XXX magic number (limit)
+        if data:
+            tag_name = data[0]['tag_name']
+        else:
+            tag_name = "#%i" % tag_id
+        stats = {'data': data, 'ts': now, 'tag_name': tag_name}
+        recent = [x for x in data if now - x['create_ts'] < 3600 * 24]
+        # XXX magic number
+        stats['n_recent'] = len(recent)
+        self.tag_use_stats[tag_id] = stats
+        self.logger.debug("tag %s recent use count: %i" % (tag_name, len(recent)))
+        return stats
+
+    def setTagScore(self, entry):
+        """Set score for needed_tag entry
+
+        We score the tags by two factors
+            - age of current repo
+            - last use in a buildroot
+
+        Having an older repo or a higher use count gives the tag a higher
+        priority for regen. The formula attempts to keep the last use factor
+        from overpowering, so that tags with very old repos still get priority
+        """
+
+        stats = self.tagUseStats(entry['taginfo']['id'])
+        # normalize use count
+        max_n = max([t.get('n_recent', 0) for t in self.tag_use_stats.values()] or [1])
+        if max_n == 0:
+            # no recent use or missing data
+            max_n = 1
+        adj = stats['n_recent'] * 9.0 / max_n + 1  # 1.0 to 10.0
+        ts = entry['expire_ts']
+        age = time.time() - ts
+        # XXX - need to make sure our times aren't far off, otherwise this
+        # scoring could have the opposite of the desired effect
+        if age < 0:
+            self.logger.warning("Needed tag has future expire_ts: %r", entry)
+            age = 0
+        entry['score'] = age * adj
+        self.logger.debug("Needed tag %s got score %.2f",
+                          entry['taginfo']['name'], entry['score'])
+        # so a day old unused repo gets about the same regen score as a
+        # 2.4-hour-old, very popular repo
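+        # Worked example of the formula above (numbers for illustration
+        # only): an unused tag (adj = 1) whose repo expired a day ago scores
+        # 86400 * 1 = 86400, while the busiest tag (n_recent == max_n, so
+        # adj = 10) expired 2.4 hours ago scores 8640 * 10 = 86400.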
+
+    def updateTagScores(self):
+        for entry in self.needed_tags.values():
+            self.setTagScore(entry)
+
+    def updateRepos(self):
+        self.checkTasks()
+        self.logger.debug("Current tasks: %r" % self.tasks)
+        if self.other_tasks:
+            self.logger.debug("Found %i untracked newRepo tasks",
+                              len(self.other_tasks))
+        self.logger.debug("Updating repos")
+
+        self.readCurrentRepos()
+
+        # check for stale repos
+        for repo in to_list(self.repos.values()):
+            if repo.stale():
+                repo.expire()
+
+        # find out which tags require repos
+        self.checkNeeded()
+
+        self.updateTagScores()
+
+        # trigger deletes
+        n_deletes = 0
+        for repo in to_list(self.repos.values()):
+            if n_deletes >= self.options.delete_batch_size:
+                break
+            if repo.expired():
+                # try to delete
+                if repo.tryDelete():
+                    n_deletes += 1
+                    del self.repos[repo.repo_id]
+
+    def checkTasks(self):
+        """Check on newRepo tasks
+
+        - update taskinfo
+        - remove finished tasks
+        - check for other newRepo tasks (not generated by us)
+        """
+
+        # prune recent tasks
+        now = time.time()
+        for task_id in list(self.recent_tasks):
+            if now - self.recent_tasks[task_id] > self.options.recent_tasks_lifetime:
+                del self.recent_tasks[task_id]
+
+        # check on current tasks
+        task_ids = list(self.tasks)
+        self.session.multicall = True
+        for task_id in task_ids:
+            self.session.getTaskInfo(task_id)
+        for task_id, [tinfo] in zip(task_ids, self.session.multiCall(strict=True)):
+            tstate = koji.TASK_STATES[tinfo['state']]
+            tag_id = self.tasks[task_id]['tag_id']
+            if tstate == 'CLOSED':
+                self.logger.info("Finished: newRepo task %s for tag %s", task_id, tag_id)
+                self.recent_tasks[task_id] = time.time()
+                del self.tasks[task_id]
+            elif tstate in ('CANCELED', 'FAILED'):
+                self.logger.info(
+                    "Problem: newRepo task %s for tag %s is %s", task_id, tag_id, tstate)
+                self.recent_tasks[task_id] = time.time()
+                del self.tasks[task_id]
+            else:
+                self.tasks[task_id]['taskinfo'] = tinfo
+                # TODO: implement a timeout
+
+        # also check other newRepo tasks
+        repo_tasks = self.session.listTasks(opts={'method': 'newRepo',
+                                                  'state': ([koji.TASK_STATES[s]
+                                                             for s in ('FREE', 'OPEN')])})
+        others = [t for t in repo_tasks if t['id'] not in self.tasks]
+        for tinfo in others:
+            if tinfo['id'] not in self.other_tasks:
+                self.logger.info("Untracked newRepo task: %(id)i", tinfo)
+                # note: possible race here, but only a log message
+        # TODO - determine tag and maven support
+        self.other_tasks = dict([(t['id'], t) for t in others])
+
+    def checkNeeded(self):
+        """Determine which tags currently need regeneration"""
+
+        n_need = len(self.needed_tags)
+        ignore = self.options.ignore_tags.split()
+        self.build_tags = set([
+            t['build_tag'] for t in self.session.getBuildTargets()
+            if not koji.util.multi_fnmatch(t['build_tag_name'], ignore)
+        ])
+        # index repos by tag
+        tag_repos = {}
+        for repo in to_list(self.repos.values()):
+            tag_repos.setdefault(repo.tag_id, []).append(repo)
+
+        for tag_id in self.build_tags:
+            covered = False
+            for repo in tag_repos.get(tag_id, []):
+                if repo.current:
+                    covered = True
+                    break
+                elif repo.pending():
+                    # one on the way
+                    covered = True
+                    break
+            if tag_id in self.needed_tags:
+                entry = self.needed_tags[tag_id]
+                if covered:
+                    # no longer needed
+                    self.logger.info("Tag %(name)s has a current or in "
+                                     "progress repo", entry['taginfo'])
+                    del self.needed_tags[tag_id]
+                # if not covered, we already know
+                continue
+            if covered:
+                continue
+
+            # we haven't noted this need yet
+            taginfo = self.session.getTag(tag_id)
+            # (not using the caching version since we only call upon discovery)
+            if not taginfo:
+                self.logger.warning('Tag disappeared: %i', tag_id)
+                continue
+            self.logger.info('Tag needs regen: %(name)s', taginfo)
+
+            # how expired are we?
+            ts = 0
+            for repo in tag_repos.get(tag_id, []):
+                if repo.expire_ts:
+                    if repo.expire_ts > ts:
+                        ts = repo.expire_ts
+                else:
+                    self.logger.warning("No expire timestamp for repo: %s", repo.repo_id)
+            if ts == 0:
+                ts = time.time()
+
+            entry = {
+                'taginfo': taginfo,
+                'expire_ts': ts,
+                'needed_since': time.time(),
+            }
+            self.setTagScore(entry)
+            self.needed_tags[tag_id] = entry
+
+        # some cleanup
+        for tag_id in list(self.needed_tags):
+            entry = self.needed_tags.get(tag_id)
+            if tag_id not in self.build_tags:
+                self.logger.info("Tag %(name)s is no longer a build tag",
+                                 entry['taginfo'])
+                del self.needed_tags[tag_id]
+        for tag_id, repolist in tag_repos.items():
+            if tag_id not in self.build_tags:
+                # repos for these tags are no longer required
+                for repo in repolist:
+                    if repo.ready():
+                        repo.expire()
+
+        if n_need != len(self.needed_tags):
+            self.logger.info('Needed tags count went from %i to %i', n_need,
+                             len(self.needed_tags))
+
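+    # Two separate throttles apply in regenRepos(): max_repo_tasks caps tasks
+    # still in their hub-heavy initial phase (not yet waiting on subtasks),
+    # while repo_tasks_limit caps all open newRepo tasks, including ones
+    # kojira did not create itself (other_tasks).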
+    def regenRepos(self):
+        """Trigger newRepo tasks for needed tags"""
+
+        # first note currently running tasks
+        running_tasks = 0
+        running_tasks_maven = 0
+        for task in self.tasks.values():
+            if task['taskinfo']['waiting']:
+                self.logger.debug("Task %(id)i is waiting", task)
+            else:
+                # The largest hub impact is from the first part of the newRepo
+                # task. Once it is waiting on subtasks, that part is over
+                running_tasks += 1
+                if task['maven']:
+                    running_tasks_maven += 1
+
+        debuginfo_pat = self.options.debuginfo_tags.split()
+        src_pat = self.options.source_tags.split()
+        separate_src_pat = self.options.separate_source_tags.split()
+        order = sorted(self.needed_tags.values(), key=lambda t: t['score'], reverse=True)
+        for tag in order:
+            if running_tasks >= self.options.max_repo_tasks:
+                self.logger.info("Maximum number of repo tasks reached")
+                return
+            elif len(self.tasks) + len(self.other_tasks) >= self.options.repo_tasks_limit:
+                self.logger.info("Repo task limit reached")
+                return
+            tagname = tag['taginfo']['name']
+            task_id = tag.get('task_id')
+            if task_id:
+                if task_id in self.tasks:
+                    # we already have a task
+                    continue
+                elif task_id in self.recent_tasks:
+                    # avoiding a race, see https://pagure.io/koji/issue/942
+                    continue
+                else:
+                    # should not happen
+                    logger.warning('Needed tag refers to unknown task. '
+                                   '%s -> %i', tagname, task_id)
+                    # we'll advance and create a new task
+            taskopts = {}
+            if koji.util.multi_fnmatch(tagname, debuginfo_pat):
+                taskopts['debuginfo'] = True
+            if koji.util.multi_fnmatch(tagname, src_pat):
+                taskopts['src'] = True
+            if koji.util.multi_fnmatch(tagname, separate_src_pat):
+                taskopts['separate_src'] = True
+            maven = tag['taginfo']['maven_support']
+            if maven:
+                if running_tasks_maven >= self.options.max_repo_tasks_maven:
+                    continue
+            task_id = self.session.newRepo(tagname, **taskopts)
+            running_tasks += 1
+            if maven:
+                running_tasks_maven += 1
+            expire_ts = tag['expire_ts']
+            if expire_ts == 0:  # can this still happen?
+                time_expired = '???'
+            else:
+                time_expired = "%.1f" % (time.time() - expire_ts)
+            self.logger.info("Created newRepo task %s for tag %s (%s), "
+                             "expired for %s sec", task_id, tag['taginfo']['id'],
+                             tag['taginfo']['name'], time_expired)
+            self.tasks[task_id] = {
+                'id': task_id,
+                'taskinfo': self.session.getTaskInfo(task_id),
+                'tag_id': tag['taginfo']['id'],
+                'maven': maven,
+            }
+            tag['task_id'] = task_id
+        if running_tasks_maven >= self.options.max_repo_tasks_maven:
+            self.logger.info("Maximum number of maven repo tasks reached")
+
+
+def start_currency_checker(session, repomgr):
+    subsession = session.subsession()
+    thread = threading.Thread(name='currencyChecker',
+                              target=repomgr.currencyChecker, args=(subsession,))
+    thread.setDaemon(True)
+    thread.start()
+    return thread
+
+
+def start_external_currency_checker(session, repomgr):
+    subsession = session.subsession()
+    thread = threading.Thread(name='currencyExternalChecker',
+                              target=repomgr.currencyExternalChecker, args=(subsession,))
+    thread.setDaemon(True)
+    thread.start()
+    return thread
+
+
+def start_regen_loop(session, repomgr):
+    subsession = session.subsession()
+    thread = threading.Thread(name='regenLoop',
+                              target=repomgr.regenLoop, args=(subsession,))
+    thread.setDaemon(True)
+    thread.start()
+    return thread
+
+
+def main(options, session):
+    repomgr = RepoManager(options, session)
+    repomgr.readCurrentRepos()
+
+    def shutdown(*args):
+        raise SystemExit
+    signal.signal(signal.SIGTERM, shutdown)
+    curr_chk_thread = start_currency_checker(session, repomgr)
+    if options.check_external_repos:
+        curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
+    regen_thread = start_regen_loop(session, repomgr)
+    # TODO also move rmtree jobs to threads
+    logger.info("Entering main loop")
+    while True:
+        try:
+            repomgr.updateRepos()
+            repomgr.checkQueue()
+            repomgr.printState()
+            repomgr.pruneLocalRepos()
+            if not curr_chk_thread.isAlive():
+                logger.error("Currency checker thread died. Restarting it.")
+                curr_chk_thread = start_currency_checker(session, repomgr)
+            if options.check_external_repos and not curr_ext_chk_thread.isAlive():
+                logger.error("External currency checker thread died. Restarting it.")
+                curr_ext_chk_thread = start_external_currency_checker(session, repomgr)
+            if not regen_thread.isAlive():
+                logger.error("Regeneration thread died. Restarting it.")
+                regen_thread = start_regen_loop(session, repomgr)
+        except KeyboardInterrupt:
+            logger.warning("User exit")
+            break
+        except koji.AuthExpired:
+            logger.warning("Session expired")
+            break
+        except SystemExit:
+            logger.warning("Shutting down")
+            break
+        except Exception:
+            # log the exception and continue
+            logger.error(''.join(traceback.format_exception(*sys.exc_info())))
+        try:
+            time.sleep(options.sleeptime)
+        except KeyboardInterrupt:
+            logger.warning("User exit")
+            break
+    try:
+        repomgr.checkQueue()
+        repomgr.killChildren()
+    finally:
+        session.logout()
+
+
+def get_options():
+    """process options from command line and config file"""
+    # parse command line args
+    parser = OptionParser("usage: %prog [opts]")
+    parser.add_option("-c", "--config", dest="configFile",
+                      help="use alternate configuration file", metavar="FILE",
+                      default="/etc/kojira/kojira.conf")
+    parser.add_option("--user", help="specify user")
+    parser.add_option("--password", help="specify password")
+    parser.add_option("--principal", help="Kerberos principal")
+    parser.add_option("--keytab", help="Kerberos keytab")
+    parser.add_option("-f", "--fg", dest="daemon",
+                      action="store_false", default=True,
+                      help="run in foreground")
+    parser.add_option("-d", "--debug", action="store_true",
+                      help="show debug output")
+    parser.add_option("-q", "--quiet", action="store_true",
+                      help="don't show warnings")
+    parser.add_option("-v", "--verbose", action="store_true",
+                      help="show verbose output")
+    parser.add_option("--force-lock", action="store_true", default=False,
+                      help="force lock for exclusive session")
+    parser.add_option("--debug-xmlrpc", action="store_true", default=False,
+                      help="show xmlrpc debug output")
+    parser.add_option("--skip-main", action="store_true", default=False,
+                      help="don't actually run main")
+    parser.add_option("--show-config", action="store_true", default=False,
+                      help="Show config and exit")
+    parser.add_option("--sleeptime", type='int', help="Specify the polling interval")
+    parser.add_option("-s", "--server", help="URL of XMLRPC server")
+    parser.add_option("--topdir", help="Specify topdir")
+    parser.add_option("--logfile", help="Specify logfile")
+    (options, args) = parser.parse_args()
+
+    config = koji.read_config_files(options.configFile)
+    section = 'kojira'
+    for x in config.sections():
+        if x != section:
+            quit('invalid section found in config file: %s' % x)
+    defaults = {'debuginfo_tags': '',
+                'source_tags': '',
+                'separate_source_tags': '',
+                'ignore_tags': '',
+                'verbose': False,
+                'debug': False,
+                'ignore_stray_repos': False,
+                'topdir': '/mnt/koji',
+                'server': None,
+                'logfile': '/var/log/kojira.log',
+                'principal': None,
+                'keytab': '/etc/kojira/kojira.keytab',
+                'ccache': '/var/tmp/kojira.ccache',
+                'krbservice': 'host',
+                'krb_rdns': True,
+                'krb_canon_host': False,
+                'krb_server_realm': None,
+                'retry_interval': 60,
+                'max_retries': 120,
+                'offline_retry': True,
+                'offline_retry_interval': 120,
+                'no_ssl_verify': False,
+                'max_delete_processes': 4,
+                'max_repo_tasks': 4,
+                'max_repo_tasks_maven': 2,
+                'repo_tasks_limit': 10,
+                'delete_batch_size': 3,
+                'deleted_repo_lifetime': 7 * 24 * 3600,
+                # XXX should really be called expired_repo_lifetime
+                'dist_repo_lifetime': 7 * 24 * 3600,
+                'check_external_repos': False,
+                'recent_tasks_lifetime': 600,
+                'sleeptime': 15,
+                'cert': None,
+                'ca': '',  # FIXME: unused, remove in next major release
+                'serverca': None,
+                }
+    if config.has_section(section):
+        int_opts = ('deleted_repo_lifetime', 'max_repo_tasks', 'repo_tasks_limit',
+                    'retry_interval', 'max_retries', 'offline_retry_interval',
+                    'max_delete_processes', 'max_repo_tasks_maven',
+                    'delete_batch_size', 'dist_repo_lifetime', 'sleeptime',
+                    'recent_tasks_lifetime')
+        str_opts = ('topdir', 'server', 'user', 'password', 'logfile', 'principal', 'keytab',
+                    'krbservice', 'cert', 'ca', 'serverca', 'debuginfo_tags',
+                    'source_tags', 'separate_source_tags', 'ignore_tags')  # FIXME: remove ca here
+        bool_opts = ('verbose', 'debug', 'ignore_stray_repos', 'offline_retry',
+                     'krb_rdns', 'krb_canon_host', 'no_ssl_verify', 'check_external_repos')
+        legacy_opts = ('with_src',)
+        for name in config.options(section):
+            if name in int_opts:
+                defaults[name] = config.getint(section, name)
+            elif name in str_opts:
+                defaults[name] = config.get(section, name)
+            elif name in bool_opts:
+                defaults[name] = config.getboolean(section, name)
+            elif name in legacy_opts:
+                deprecated('The %s configuration option is no longer used\n' % name)
+            else:
+                quit("unknown config option: %s" % name)
+    for name, value in defaults.items():
+        if getattr(options, name, None) is None:
+            setattr(options, name, value)
+    if options.logfile in ('', 'None', 'none'):
+        options.logfile = None
+    # special handling for cert defaults
+    cert_defaults = {
+        'cert': '/etc/kojira/client.crt',
+        'serverca': '/etc/kojira/serverca.crt',
+    }
+    for name in cert_defaults:
+        if getattr(options, name, None) is None:
+            fn = cert_defaults[name]
+            if os.path.exists(fn):
+                setattr(options, name, fn)
+    return options
+
+
+def quit(msg=None, code=1):
+    if msg:
+        logging.getLogger("koji.repo").error(msg)
+        sys.stderr.write('%s\n' % msg)
+        sys.stderr.flush()
+    sys.exit(code)
quit("No username/password/certificate supplied and Kerberos missing or not configured") + # get an exclusive session + try: + session.exclusiveSession(force=options.force_lock) + except koji.AuthLockError: + quit("Error: Unable to get lock. Trying using --force-lock") + if not session.logged_in: + quit("Error: Unknown login error") + if not session.logged_in: + print("Error: unable to log in") + sys.exit(1) + if options.skip_main: + sys.exit() + elif options.daemon: + koji.daemonize() + else: + koji.add_stderr_logger("koji") + main(options, session) diff --git a/roles/koji_hub/tasks/main.yml b/roles/koji_hub/tasks/main.yml index f244960237..c9148651f9 100644 --- a/roles/koji_hub/tasks/main.yml +++ b/roles/koji_hub/tasks/main.yml @@ -463,3 +463,16 @@ tags: - files - koji_hub + +# for now we have a patched version of kojira +# Based on the 1.21.1 version +# With: +# https://pagure.io/koji/pull-request/2140.patch +# https://pagure.io/koji/pull-request/2340.patch +# Hopefully all merged in 1.22. +# +- name: copy in patched kojira + copy: src=kojira dest=/usr/sbin/kojira + tags: + - files + - koji_hub diff --git a/roles/koji_hub/templates/kojira.conf.j2 b/roles/koji_hub/templates/kojira.conf.j2 index 9f12ba6ea3..d286236d00 100644 --- a/roles/koji_hub/templates/kojira.conf.j2 +++ b/roles/koji_hub/templates/kojira.conf.j2 @@ -36,6 +36,9 @@ repo_tasks_limit=15 krb_rdns=false serverca = /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem +;how soon (in seconds) to clean up expired repositories. 1 week default +deleted_repo_lifetime = 604800 + ;how soon (in seconds) to clean up dist repositories. ;we want this super long so we don't delete latest repos ;just set this to 6 months for now. @@ -53,4 +56,4 @@ check_external_repos = True ; every cycle. File would contain information about how long these ; tags are expired and what is the computed score for them. This can ; be used to debug and check in realtime the actual performance. -;queue_file = /var/tmp/kojira-queue +queue_file = /var/tmp/kojira-queue