Move RPM, Koji code into misc module

Signed-off-by: Nils Philippsen <nils@redhat.com>
This commit is contained in:
Nils Philippsen 2020-02-24 11:41:57 +01:00
parent 30f90f2c99
commit 9f10baebcd
3 changed files with 82 additions and 65 deletions

60
rpmautospec/misc.py Normal file
View file

@ -0,0 +1,60 @@
from functools import cmp_to_key
import re
from typing import List, Optional, Tuple
import koji
import rpm
release_re = re.compile(r"^(?P<pkgrel>\d+)(?:(?P<middle>.*?)(?:\.(?P<minorbump>\d+))?)?$")
disttag_re = re.compile(r"^\.?(?P<distcode>[^\d\.]+)(?P<distver>\d+)")
evr_re = re.compile(r"^(?:(?P<epoch>\d+):)?(?P<version>[^-:]+)(?:-(?P<release>[^-:]+))?$")
rpmvercmp_key = cmp_to_key(
lambda b1, b2: rpm.labelCompare(
(str(b1["epoch"]), b1["version"], b1["release"]),
(str(b2["epoch"]), b2["version"], b2["release"]),
)
)
_kojiclient = None
def parse_evr(evr_str: str) -> Tuple[int, str, str]:
match = evr_re.match(evr_str)
if not match:
raise ValueError(str)
epoch = match.group("epoch") or 0
epoch = int(epoch)
return epoch, match.group("version"), match.group("release")
def parse_release_tag(tag: str) -> Tuple[Optional[int], Optional[str], Optional[str]]:
pkgrel = middle = minorbump = None
match = release_re.match(tag)
if match:
pkgrel = int(match.group("pkgrel"))
middle = match.group("middle")
try:
minorbump = int(match.group("minorbump"))
except TypeError:
pass
return pkgrel, middle, minorbump
def koji_init(koji_url: str):
global _kojiclient
_kojiclient = koji.ClientSession(koji_url)
def get_package_builds(pkgname: str) -> List[dict]:
assert _kojiclient
pkgid = _kojiclient.getPackageID(pkgname)
if not pkgid:
raise ValueError(f"Package {pkgname!r} not found!")
return _kojiclient.listBuilds(pkgid, type="rpm", queryOpts={"order": "-nvr"})

View file

@ -2,23 +2,18 @@
import argparse import argparse
from collections import defaultdict from collections import defaultdict
from functools import cmp_to_key
from itertools import chain from itertools import chain
import logging import logging
import re import re
import sys import sys
import typing import typing
import koji from .misc import disttag_re, get_package_builds, koji_init, parse_evr, parse_release_tag
import rpm from .misc import rpmvercmp_key
_log = logging.getLogger(__name__) _log = logging.getLogger(__name__)
_release_re = re.compile(r"^(?P<pkgrel>\d+)(?:(?P<middle>.*?)(?:\.(?P<minorbump>\d+))?)?$")
_disttag_re = re.compile(r"^\.?(?P<distcode>[^\d\.]+)(?P<distver>\d+)")
_evr_re = re.compile(r"^(?:(?P<epoch>\d+):)?(?P<version>[^-:]+)(?:-(?P<release>[^-:]+))?$")
def register_subcommand(subparsers): def register_subcommand(subparsers):
subcmd_name = "calculate-release" subcmd_name = "calculate-release"
@ -43,35 +38,10 @@ def register_subcommand(subparsers):
return subcmd_name return subcmd_name
def parse_evr(evr_str): def main_sequential_builds_algo(args):
match = _evr_re.match(evr_str)
if not match:
raise ValueError(str)
epoch = match.group("epoch") or 0
epoch = int(epoch)
return epoch, match.group("version"), match.group("release")
def parse_release_tag(tag):
pkgrel = middle = minorbump = None
match = _release_re.match(tag)
if match:
pkgrel = int(match.group("pkgrel"))
middle = match.group("middle")
try:
minorbump = int(match.group("minorbump"))
except TypeError:
pass
return pkgrel, middle, minorbump
def main_sequential_builds_algo(args, client, pkgid):
n_builds = 1 n_builds = 1
last_build = last_version = None last_build = last_version = None
for build in client.listBuilds(pkgid, type="rpm", queryOpts={"order": "-nvr"}): for build in get_package_builds(args.package):
if args.dist in build["release"]: if args.dist in build["release"]:
if n_builds == 1: if n_builds == 1:
last_build = build last_build = build
@ -92,14 +62,6 @@ def main_sequential_builds_algo(args, client, pkgid):
print(f"Next build: {last_build['name']}-{last_build['version']}-{n_builds}.{args.dist}") print(f"Next build: {last_build['name']}-{last_build['version']}-{n_builds}.{args.dist}")
_rpmvercmp_key = cmp_to_key(
lambda b1, b2: rpm.labelCompare(
(str(b1["epoch"]), b1["version"], b1["release"]),
(str(b2["epoch"]), b2["version"], b2["release"]),
)
)
def holistic_heuristic_calculate_release( def holistic_heuristic_calculate_release(
args: argparse.Namespace, lower_bound: dict, higher_bound: typing.Optional[dict], args: argparse.Namespace, lower_bound: dict, higher_bound: typing.Optional[dict],
): ):
@ -116,7 +78,7 @@ def holistic_heuristic_calculate_release(
) )
new_evr = {"epoch": epoch, "version": version, "release": release} new_evr = {"epoch": epoch, "version": version, "release": release}
if _rpmvercmp_key(new_evr) > _rpmvercmp_key(lower_bound): if rpmvercmp_key(new_evr) > rpmvercmp_key(lower_bound):
lower_bound = new_evr lower_bound = new_evr
if not release: if not release:
lower_bound["release"] = f"1.{dist}" lower_bound["release"] = f"1.{dist}"
@ -134,7 +96,7 @@ def holistic_heuristic_calculate_release(
# exists. # exists.
new_evr["release"] = f"{lpkgrel + 1}.{dist}" new_evr["release"] = f"{lpkgrel + 1}.{dist}"
if not higher_bound or _rpmvercmp_key(new_evr) < _rpmvercmp_key(higher_bound): if not higher_bound or rpmvercmp_key(new_evr) < rpmvercmp_key(higher_bound):
# No (satisfiable) higher bound exists or it has a higher epoch-version-release. # No (satisfiable) higher bound exists or it has a higher epoch-version-release.
return new_evr return new_evr
@ -145,13 +107,13 @@ def holistic_heuristic_calculate_release(
new_evr["release"] = rel_bak = f"{lpkgrel}.{dist}.{nminorbump}" new_evr["release"] = rel_bak = f"{lpkgrel}.{dist}.{nminorbump}"
if _rpmvercmp_key(new_evr) < _rpmvercmp_key(higher_bound): if rpmvercmp_key(new_evr) < rpmvercmp_key(higher_bound):
return new_evr return new_evr
# Oops. Attempt appending '.1' to the minor bump, ... # Oops. Attempt appending '.1' to the minor bump, ...
new_evr["release"] += ".1" new_evr["release"] += ".1"
if _rpmvercmp_key(new_evr) < _rpmvercmp_key(higher_bound): if rpmvercmp_key(new_evr) < rpmvercmp_key(higher_bound):
return new_evr return new_evr
# ... otherwise don't bother. # ... otherwise don't bother.
@ -159,8 +121,8 @@ def holistic_heuristic_calculate_release(
return new_evr return new_evr
def main_holistic_heuristic_algo(args, client, pkgid): def main_holistic_heuristic_algo(args):
match = _disttag_re.match(args.dist) match = disttag_re.match(args.dist)
if not match: if not match:
print( print(
f"Dist tag {args.dist!r} has wrong format (should be e.g. 'fc31', 'epel7')", f"Dist tag {args.dist!r} has wrong format (should be e.g. 'fc31', 'epel7')",
@ -174,13 +136,13 @@ def main_holistic_heuristic_algo(args, client, pkgid):
dtag_re = re.compile(fr"\.{distcode}(?P<distver>\d+)") dtag_re = re.compile(fr"\.{distcode}(?P<distver>\d+)")
builds = [ builds = [
build for build in client.listBuilds(pkgid, type="rpm") if dtag_re.search(build["release"]) build for build in get_package_builds(args.package) if dtag_re.search(build["release"])
] ]
# builds by distro release # builds by distro release
builds_per_distver = defaultdict(list) builds_per_distver = defaultdict(list)
for build in client.listBuilds(pkgid, type="rpm"): for build in builds:
match = dtag_re.search(build["release"]) match = dtag_re.search(build["release"])
if not match: if not match:
@ -195,14 +157,14 @@ def main_holistic_heuristic_algo(args, client, pkgid):
return return
for builds in builds_per_distver.values(): for builds in builds_per_distver.values():
builds.sort(key=_rpmvercmp_key, reverse=True) builds.sort(key=rpmvercmp_key, reverse=True)
# All builds that should be 'lower' than what we are targetting, sorted by 'highest first'. # All builds that should be 'lower' than what we are targetting, sorted by 'highest first'.
# We get by throwing all lower/current distro versions into one list because the new release # We get by throwing all lower/current distro versions into one list because the new release
# absolutely has to be higher than the highest in this list. # absolutely has to be higher than the highest in this list.
lower_bound_builds = sorted( lower_bound_builds = sorted(
chain(*(builds for dver, builds in builds_per_distver.items() if dver <= pkgdistver)), chain(*(builds for dver, builds in builds_per_distver.items() if dver <= pkgdistver)),
key=_rpmvercmp_key, key=rpmvercmp_key,
reverse=True, reverse=True,
) )
@ -217,20 +179,20 @@ def main_holistic_heuristic_algo(args, client, pkgid):
# can't be done, accommodate at least some. # can't be done, accommodate at least some.
higher_bound_builds = sorted( higher_bound_builds = sorted(
(builds[0] for dver, builds in builds_per_distver.items() if dver > pkgdistver), (builds[0] for dver, builds in builds_per_distver.items() if dver > pkgdistver),
key=_rpmvercmp_key, key=rpmvercmp_key,
) )
higher_bound_builds_nvr = [b["nvr"] for b in higher_bound_builds] higher_bound_builds_nvr = [b["nvr"] for b in higher_bound_builds]
print(f"Highest build of lower or current distro versions: {lower_bound_nvr}") print(f"Highest build of lower or current distro versions: {lower_bound_nvr}")
print(f"Highest builds of higher distro versions: {', '.join(higher_bound_builds_nvr)}") print(f"Highest builds of higher distro versions: {', '.join(higher_bound_builds_nvr)}")
lower_bound_rpmvercmp_key = _rpmvercmp_key(lower_bound) lower_bound_rpmvercmp_key = rpmvercmp_key(lower_bound)
# Disregard builds of higher distro versions that we can't go below. Sort so the first element # Disregard builds of higher distro versions that we can't go below. Sort so the first element
# is the lowest build we can (and should) go "under". # is the lowest build we can (and should) go "under".
satisfiable_higher_bound_builds = sorted( satisfiable_higher_bound_builds = sorted(
(b for b in higher_bound_builds if lower_bound_rpmvercmp_key < _rpmvercmp_key(b)), (b for b in higher_bound_builds if lower_bound_rpmvercmp_key < rpmvercmp_key(b)),
key=_rpmvercmp_key, key=rpmvercmp_key,
) )
if satisfiable_higher_bound_builds: if satisfiable_higher_bound_builds:
@ -253,14 +215,9 @@ def main_holistic_heuristic_algo(args, client, pkgid):
def main(args): def main(args):
""" Main method. """ """ Main method. """
client = koji.ClientSession(args.koji_url) koji_init(args.koji_url)
pkgid = client.getPackageID(args.package)
if not pkgid:
print(f"Package {args.package!r} not found!", file=sys.stderr)
return 1
if args.algorithm == "sequential_builds": if args.algorithm == "sequential_builds":
main_sequential_builds_algo(args, client, pkgid) main_sequential_builds_algo(args)
elif args.algorithm == "holistic_heuristic": elif args.algorithm == "holistic_heuristic":
main_holistic_heuristic_algo(args, client, pkgid) main_holistic_heuristic_algo(args)

View file

@ -42,7 +42,7 @@ class TestNextBuild:
) as f: ) as f:
koji_list_builds_output = json.load(f) koji_list_builds_output = json.load(f)
with mock.patch("rpmautospec.release.koji") as mock_koji: with mock.patch("rpmautospec.misc.koji") as mock_koji:
mock_client = mock.MagicMock() mock_client = mock.MagicMock()
mock_koji.ClientSession.return_value = mock_client mock_koji.ClientSession.return_value = mock_client
mock_client.getPackageID.return_value = 1234 mock_client.getPackageID.return_value = 1234