""" This module provides the functionality to download the latest primary.xml database from koji on the rawhide repo. Decompress that xml file (which are downloaded compressed). Read its content and build a dictionary with the package names as keys and their summaries as values. This code can then be used to create an in-memory cache of this information which can then later be re-used in other places. This prevents relying on remote services such as mdapi (of which a lot of code here is coming from) when needing to access the summary of a lot of packages. """ import contextlib import hashlib import logging import os import time import xml.etree.ElementTree as ET import xml.sax import defusedxml.sax import requests KOJI_REPO = 'https://kojipkgs.fedoraproject.org/repos/' repomd_xml_namespace = { 'repo': 'http://linux.duke.edu/metadata/repo', 'rpm': 'http://linux.duke.edu/metadata/rpm', } log = logging.getLogger(__name__) def download_db(name, repomd_url, archive): log.info('%s Downloading file: %s to %s' % ( name.ljust(12), repomd_url, archive)) response = requests.get(repomd_url, verify=True) with open(archive, 'wb') as stream: stream.write(response.content) def decompress_db(name, archive, location): ''' Decompress the given archive at the specified location. ''' log.info('%s Extracting %s to %s' % (name.ljust(12), archive, location)) if archive.endswith('.xz'): import lzma with contextlib.closing(lzma.LZMAFile(archive)) as stream_xz: data = stream_xz.read() with open(location, 'wb') as stream: stream.write(data) elif archive.endswith('.tar.gz'): import tarfile with tarfile.open(archive) as tar: tar.extractall(path=location) elif archive.endswith('.gz'): import gzip with open(location, 'wb') as out: with gzip.open(archive, 'rb') as inp: out.write(inp.read()) elif archive.endswith('.bz2'): import bz2 with open(location, 'wb') as out: bzar = bz2.BZ2File(archive) out.write(bzar.read()) bzar.close() else: raise NotImplementedError(archive) def needs_update(local_file, remote_sha, sha_type): ''' Compare hash of a local and remote file. Return True if our local file needs to be updated. ''' if not os.path.isfile(local_file): # If we have never downloaded this before, then obviously it has # "changed" return True # Old epel5 doesn't even know which sha it is using... if sha_type == 'sha': sha_type = 'sha1' hash = getattr(hashlib, sha_type)() with open(local_file, 'rb') as f: hash.update(f.read()) local_sha = hash.hexdigest() if local_sha != remote_sha: return True return False class PackageHandler(xml.sax.ContentHandler): def __init__(self): self.current_data = "" self.name = "" self.summary = "" self.output = {} self.pkg = {} # Call when an element starts def startElement(self, tag, attributes): self.current_data = tag if tag == "package": if self.pkg: self.output[self.pkg["name"]] = self.pkg["summary"] self.type = attributes["type"] self.pkg = {} # Call when a character is read def characters(self, content): if self.current_data == "summary": self.summary = content elif self.current_data == "name": self.name = content # Call when an elements ends def endElement(self, tag): if self.current_data == "summary": # print("Summary:", self.summary) self.pkg["summary"] = self.summary elif self.current_data == "name": # print("name:", self.name) self.pkg["name"] = self.name self.current_data = "" def get_primary_xml(destfolder, url, name): ''' Retrieve the repo metadata at the given url and store them using the provided name. 
    '''
    repomd_url = url + '/repomd.xml'
    response = requests.get(repomd_url, verify=True)
    if not bool(response):
        print('%s !! Failed to get %r %r' % (
            name.ljust(12), repomd_url, response))
        return

    # Parse the xml doc and get a list of locations and their shasum.
    files = ((
        node.find('repo:location', repomd_xml_namespace),
        node.find('repo:open-checksum', repomd_xml_namespace),
    ) for node in ET.fromstring(response.text))

    # Extract out the attributes that we're really interested in.
    files = (
        (f.attrib['href'].replace('repodata/', ''), s.text, s.attrib['type'])
        for f, s in files if f is not None and s is not None
    )

    # Filter down to only the primary.xml files
    files = list((f, s, t) for f, s, t in files if 'primary.xml' in f)

    if not files:
        log.debug('No primary.xml could be found in %s' % url)
        return
    elif len(files) > 1:
        log.debug('More than one primary.xml could be found in %s' % url)
        return

    filename, shasum, shatype = files[0]
    repomd_url = url + '/' + filename

    # First, determine if the file has changed by comparing hash
    db = "distgit-bugzilla-sync-primary.xml"

    # Have we downloaded this before? Did it change?
    destfile = os.path.join(destfolder, db)
    if not needs_update(destfile, shasum, shatype):
        log.debug('%s No change of %s' % (name.ljust(12), repomd_url))
    else:
        # If it has changed, then download it and move it into place.
        archive = os.path.join(destfolder, filename)
        download_db(name, repomd_url, archive)
        decompress_db(name, archive, destfile)

    return destfile


def get_package_summaries():
    start = time.time()
    primary_xml = get_primary_xml(
        "/var/tmp",
        KOJI_REPO + 'rawhide/latest/x86_64/repodata',
        "koji",
    )

    handler = PackageHandler()
    defusedxml.sax.parse(primary_xml, handler)
    delta = time.time() - start
    log.info(f"Parsed in {delta} seconds -- ie: {delta/60} minutes")

    return handler.output


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    db = get_package_summaries()
    print(f"guake: {db.get('guake')}")
    print(f"geany: {db.get('geany')}")
    print(f"kernel: {db.get('kernel')}")