""" This module provides the functionality to download the latest primary.xml database from koji on the rawhide repo. Decompress that xml file (which are downloaded compressed). Read its content and build a dictionary with the package names as keys and their summaries as values. This code can then be used to create an in-memory cache of this information which can then later be re-used in other places. This prevents relying on remote services such as mdapi (of which a lot of code here is coming from) when needing to access the summary of a lot of packages. """ import contextlib import hashlib import logging import os import time from xml.etree.ElementTree import ParseError from defusedxml import cElementTree as etree import requests KOJI_REPO = "https://kojipkgs.fedoraproject.org/repos/" repomd_xml_namespace = { "repo": "http://linux.duke.edu/metadata/repo", "rpm": "http://linux.duke.edu/metadata/rpm", } log = logging.getLogger(__name__) def download_db(name, repomd_url, archive): log.info("%-12s Downloading file: %s to %s", name, repomd_url, archive) response = requests.get(repomd_url, verify=True) with open(archive, "wb") as stream: stream.write(response.content) def decompress_db(name, archive, location): """ Decompress the given archive at the specified location. """ log.info("%-12s Extracting %s to %s", name, archive, location) if archive.endswith(".xz"): import lzma with contextlib.closing(lzma.LZMAFile(archive)) as stream_xz: data = stream_xz.read() with open(location, "wb") as stream: stream.write(data) elif archive.endswith(".tar.gz"): import tarfile with tarfile.open(archive) as tar: tar.extractall(path=location) elif archive.endswith(".gz"): import gzip with open(location, "wb") as out: with gzip.open(archive, "rb") as inp: out.write(inp.read()) elif archive.endswith(".bz2"): import bz2 with open(location, "wb") as out: bzar = bz2.BZ2File(archive) out.write(bzar.read()) bzar.close() else: raise NotImplementedError(archive) def needs_update(local_file, remote_sha, sha_type): """ Compare hash of a local and remote file. Return True if our local file needs to be updated. """ if not os.path.isfile(local_file): # If we have never downloaded this before, then obviously it has # "changed" return True # Old epel5 doesn't even know which sha it is using... if sha_type == "sha": sha_type = "sha1" hashobj = getattr(hashlib, sha_type)() with open(local_file, "rb") as f: hashobj.update(f.read()) local_sha = hashobj.hexdigest() if local_sha != remote_sha: return True return False def get_primary_xml(destfolder, url, name): """ Retrieve the repo metadata at the given url and store them using the provided name. """ repomd_url = url + "/repomd.xml" response = requests.get(repomd_url, verify=True) if not bool(response): log.warning("%-12s !! Failed to get %s %s", name, repomd_url, response) return try: root = etree.fromstring(response.text) except ParseError: log.warning("%-12s !! 
Failed to parse %s %s", name, repomd_url, response) return data_nodes = list(root.findall('repo:data[@type="primary"]', repomd_xml_namespace)) if not data_nodes: log.debug("No primary.xml could be found in %s", url) return elif len(data_nodes) > 1: log.debug("More than one primary.xml could be found in %s", url) return primary_node = data_nodes[0] location_node = primary_node.find("repo:location", repomd_xml_namespace) if location_node is None or "href" not in location_node.attrib: log.debug("No valid location found for primary.xml in %s", url) return cksuminfo_node = primary_node.find("repo:open-checksum", repomd_xml_namespace) if cksuminfo_node is None or "type" not in cksuminfo_node.attrib: log.debug("No valid checksum information found for primary.xml in %s", url) return filename = location_node.attrib["href"].replace("repodata/", "") hash_digest = cksuminfo_node.text hash_type = cksuminfo_node.attrib["type"] repomd_url = url + "/" + filename # First, determine if the file has changed by comparing hash db = "distgit-bugzilla-sync-primary.xml" # Have we downloaded this before? Did it change? destfile = os.path.join(destfolder, db) if not needs_update(destfile, hash_digest, hash_type): log.debug("%s No change of %s", name.ljust(12), repomd_url) else: # If it has changed, then download it and move it into place. archive = os.path.join(destfolder, filename) download_db(name, repomd_url, archive) decompress_db(name, archive, destfile) return destfile def get_package_summaries(): summaries = {} start = time.time() primary_xml = get_primary_xml( "/var/tmp", KOJI_REPO + "rawhide/latest/x86_64/repodata", "koji", ) context = etree.iterparse(primary_xml, events=("start", "end")) root = None # iterate over the rest of the primary.xml tree for event, elem in context: if not root: root = elem continue if ( event == "end" and elem.tag == "package" and elem.get("type", "rpm") == "rpm" ): name = elem.findtext("name") summary = elem.findtext("summary") if name is not None and summary is not None: summaries[name] = summary # remove package child from root element to keep memory consumption low root.clear() delta = time.time() - start log.info(f"Parsed in {delta} seconds -- ie: {delta/60} minutes") return summaries if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) db = get_package_summaries() print(f"guake: {db.get('guake')}") print(f"geany: {db.get('geany')}") print(f"kernel: {db.get('kernel')}")
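

# ---------------------------------------------------------------------------
# Usage sketch, illustration only: the module docstring describes building an
# in-memory cache from these summaries. Below is one minimal way a consumer
# could do that; the name SummaryCache and its max_age parameter are
# assumptions made for this example, not part of the original code.
class SummaryCache:
    """Build the summaries dict once, and rebuild it lazily after max_age."""

    def __init__(self, max_age=6 * 3600):
        self.max_age = max_age  # seconds before the cache is considered stale
        self._summaries = None
        self._built_at = 0.0

    def get(self, package, default=None):
        """Return the summary of `package`, refreshing the cache if stale."""
        if self._summaries is None or time.time() - self._built_at > self.max_age:
            # Expensive: downloads and parses primary.xml, so only do it
            # when the cache is empty or stale, never per lookup.
            self._summaries = get_package_summaries()
            self._built_at = time.time()
        return self._summaries.get(package, default)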