Add the fedora-messaging consumer to mirror a pagure project

This commit comes with both the consumer as well as an example
configuration file

Signed-off-by: Pierre-Yves Chibon <pingou@pingoured.fr>
This commit is contained in:
Pierre-Yves Chibon 2020-04-29 17:58:15 +02:00
parent d390ef18f4
commit 942c0e76f3
2 changed files with 217 additions and 0 deletions

99
local_config.toml Normal file
View file

@ -0,0 +1,99 @@
# A basic configuration for Fedora's message broker, using the example callback
# which simply prints messages to standard output.
#
# This file is in the TOML format.
amqp_url = "amqps://fedora:@rabbitmq.fedoraproject.org/%2Fpublic_pubsub"
callback = "mirror_from_pagure_bus:MirrorFromPagure"
[tls]
ca_cert = "/etc/fedora-messaging/cacert.pem"
keyfile = "/etc/fedora-messaging/fedora-key.pem"
certfile = "/etc/fedora-messaging/fedora-cert.pem"
[client_properties]
app = "mirror from pagure"
# Some suggested extra fields:
# URL of the project that provides this consumer
app_url = "https://pagure.io/Fedora-Infra/mirror_from_pagure"
# Contact emails for the maintainer(s) of the consumer - in case the
# broker admin needs to contact them, for e.g.
app_contacts_email = ["pingou@fedoraproject.org"]
[exchanges."amq.topic"]
type = "topic"
durable = true
auto_delete = false
arguments = {}
# Queue names *must* be in the normal UUID format: run "uuidgen" and use the
# output as your queue name. If your queue is not exclusive, anyone can connect
# and consume from it, causing you to miss messages, so do not share your queue
# name. Any queues that are not auto-deleted on disconnect are garbage-collected
# after approximately one hour.
#
# If you require a stronger guarantee about delivery, please talk to Fedora's
# Infrastructure team.
[queues.00000000-0000-0000-0000-000000000000]
durable = false
auto_delete = true
exclusive = true
arguments = {}
[[bindings]]
queue = "00000000-0000-0000-0000-000000000000"
exchange = "amq.topic"
# Set this to the specific topics you are interested in.
routing_keys = [
"org.fedoraproject.prod.infragit.receive",
"io.pagure.prod.pagure.git.receive",
]
[consumer_config]
mirror_folder = "mirrors"
trigger_names = ["Fedora-Infra/ansible"]
urls = [
"https://pagure.io/Fedora-Infra/ansible.git",
]
[qos]
prefetch_size = 0
prefetch_count = 25
[log_config]
version = 1
disable_existing_loggers = true
[log_config.formatters.simple]
format = "[%(levelname)s %(name)s] %(message)s"
[log_config.handlers.console]
class = "logging.StreamHandler"
formatter = "simple"
stream = "ext://sys.stdout"
[log_config.loggers.fedora_messaging]
level = "INFO"
propagate = false
handlers = ["console"]
[log_config.loggers.twisted]
level = "INFO"
propagate = false
handlers = ["console"]
[log_config.loggers.pika]
level = "WARNING"
propagate = false
handlers = ["console"]
# If your consumer sets up a logger, you must add a configuration for it
# here in order for the messages to show up. e.g. if it set up a logger
# called 'example_printer', you could do:
[log_config.loggers.mirror_from_pagure_bus]
level = "DEBUG"
propagate = false
handlers = ["console"]
[log_config.root]
level = "ERROR"
handlers = ["console"]

118
mirror_from_pagure_bus.py Normal file
View file

@ -0,0 +1,118 @@
"""
This script runs in a loop and clone or update the clone of the ansible repo
hosted in pagure.io
"""
from __future__ import print_function
import datetime
import logging
import os
import sched
import subprocess
import sys
import time
from fedora_messaging import api, config
_log = logging.getLogger(__name__)
def run_command(command, cwd=None):
""" Run the specified command in a specific working directory if one
is specified.
:arg command: the command to run
:type command: list
:kwarg cwd: the working directory in which to run this command
:type cwd: str or None
"""
output = None
try:
output = subprocess.check_output(command, cwd=cwd, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
_log.error("Command `%s` return code: `%s`", " ".join(command), e.returncode)
_log.error("stdout:\n-------\n%s", e.stdout)
_log.error("stderr:\n-------\n%s", e.stderr)
raise
return output
class MirrorFromPagure(object):
"""
A fedora-messaging consumer update a local mirror of a repo hosted on
pagure.io.
Three configuration key is used from fedora-messaging's
"consumer_config" key:
- "mirror_folder", which indicates where mirrors should be store
- "urls", which is a list of mirrors to keep up to date
- "triggers_name", the fullname of the project (ie: name or namespace/name)
that we want to trigger a refresh of our clone on
::
[consumer_config]
mirror_folder = "mirrors"
trigger_names = ["Fedora-Infra/ansible"]
urls = ["https://pagure.io/Fedora-Infra/ansible.git"]
"""
def __init__(self):
"""Perform some one-time initialization for the consumer."""
self.path = config.conf["consumer_config"]["mirror_folder"]
self.urls = config.conf["consumer_config"]["urls"]
self.trigger_names = config.conf["consumer_config"]["trigger_names"]
if not os.path.exists(self.path):
raise OSError("No folder %s found on disk" % self.path)
def __call__(self, message, cnt=0):
"""
Invoked when a message is received by the consumer.
Args:
message (fedora_messaging.api.Message): The message from AMQP.
"""
_log.info("Received topic: %s", message.topic)
if message.topic == "io.pagure.prod.pagure.git.receive":
repo_name = message.body.get("repo", {}).get("fullname")
if repo_name not in self.trigger_names:
_log.info("Not the pagure repo of interest, bailing")
return
elif message.topic == "org.fedoraproject.prod.infragit.receive":
pass
else:
_log.info("Unexpected topic received: %s", message.topic)
return
try:
for url in self.urls:
_log.info("Syncing %s", url)
name = url.rsplit("/", 1)[-1]
dest_folder = os.path.join(self.path, name)
if not os.path.exists(dest_folder):
_log.info(" Cloning as new %s", url)
cmd = ["git", "clone", "--mirror", url]
run_command(cmd, cwd=self.path)
_log.info(
" Running git fetch with transfer.fsckObjects=1 in %s",
dest_folder
)
cmd = ["git", "-c", "transfer.fsckObjects=1", "fetch"]
run_command(cmd, cwd=dest_folder)
except Exception:
_log.exception("Something happened while calling git")
if cnt >= 3:
raise
_log.info(" Re-running in 10 seconds")
time.sleep(10)
self.__call__(message, cnt=cnt+1)
api.consume(MirrorFromPagure)