Add check_commit_rights plugin

Signed-off-by: Mattia Verga <mattia.verga@proton.me>
This commit is contained in:
Mattia Verga 2023-09-27 14:33:33 +02:00
parent d789a51852
commit 621c31ed95
3 changed files with 535 additions and 0 deletions

View file

@ -0,0 +1,324 @@
from unittest.mock import Mock, patch
import fedora_messaging.api
import pytest
import toddlers.plugins.check_commit_rights
class TestCheckCommitRightsToddler:
toddler_cls = toddlers.plugins.check_commit_rights.CheckCommitRights
def test_accepts_topic_invalid(self, toddler):
assert toddler.accepts_topic("foo.bar") is False
@pytest.mark.parametrize(
"topic",
[
"org.fedoraproject.*.toddlers.trigger.check_commit_rights",
"org.fedoraproject.prod.toddlers.trigger.check_commit_rights",
"org.fedoraproject.stg.toddlers.trigger.check_commit_rights",
],
)
def test_accepts_topic_valid(self, topic, toddler):
assert toddler.accepts_topic(topic)
def test_process_config_missing_exclude_users(self, toddler):
msg = fedora_messaging.api.Message()
msg.id = 123
msg.topic = "org.fedoraproject.prod.toddlers.trigger.check_commit_rights"
msg.body = {"foo": "bar"}
with pytest.raises(
Exception,
match=r"Invalid toddler configuration, no `exclude_users` defined",
):
assert toddler.process(config={}, message=msg) is None
def test_process_config_missing_notify_emails(self, toddler):
msg = fedora_messaging.api.Message()
msg.id = 123
msg.topic = "org.fedoraproject.prod.toddlers.trigger.check_commit_rights"
msg.body = {"foo": "bar"}
with pytest.raises(
Exception,
match=r"Invalid toddler configuration, no `notify_emails` defined",
):
assert toddler.process(config={"exclude_users": []}, message=msg) is None
@patch("toddlers.utils.fedora_account.set_fasjson", new=Mock(return_value=True))
@patch("toddlers.utils.fedora_account.get_group_member")
def test_process_no_fas(self, get_group_member, toddler):
get_group_member.return_value = []
msg = fedora_messaging.api.Message()
msg.id = 123
msg.topic = "org.fedoraproject.prod.toddlers.trigger.check_commit_rights"
msg.body = {"foo": "bar"}
config = {
"exclude_users": "[]",
"notify_emails": "[]",
}
with pytest.raises(
Exception,
match=r"Something wrong occurred, I found no packagers. Exiting.",
):
toddler.process(config=config, message=msg, send_email=False)
@patch("toddlers.utils.fedora_account.set_fasjson", new=Mock(return_value=True))
@patch("toddlers.plugins.check_commit_rights._log.info")
@patch("toddlers.utils.notify.send_email")
@patch("toddlers.utils.fedora_account.get_group_member")
def test_process_all_right(self, get_group_member, send_email, info, toddler):
req_json = {
"pagination": {
"page": 1,
"pages": 1,
"per_page": 1,
},
"projects": [
{
"access_users": {
"admin": ["admin_user", "packager_user"],
"collaborator": ["collaborator_user"],
"commit": ["commit_user"],
"owner": ["owner_user"],
},
"fullname": "rpms/project_one",
}
],
}
req = Mock()
req.ok = True
req.json.return_value = req_json
toddler.requests_session.get.return_value = req
get_group_member.return_value = ["packager_user"]
msg = fedora_messaging.api.Message()
msg.id = 123
msg.topic = "org.fedoraproject.prod.toddlers.trigger.check_commit_rights"
msg.body = {"foo": "bar"}
config = {
"exclude_users": "[]",
"notify_emails": "[]",
"admin_email": "foo@bar",
"mail_server": "127.0.0.1",
}
assert toddler.process(config=config, message=msg) is None
info.assert_any_call(
"### Found 4 users with commit privileges but not in packager group ###"
)
info.assert_any_call("admin_user")
info.assert_any_call("collaborator_user")
info.assert_any_call("commit_user")
info.assert_any_call("owner_user")
with pytest.raises(AssertionError):
info.assert_any_call("packager_user")
send_email.assert_called()
@patch("toddlers.utils.fedora_account.set_fasjson", new=Mock(return_value=True))
@patch("toddlers.plugins.check_commit_rights._log.info")
@patch("toddlers.utils.notify.send_email")
@patch("toddlers.utils.fedora_account.get_group_member")
def test_process_all_right_multiple_pages(
self, get_group_member, send_email, info, toddler
):
req_json = [
{
"pagination": {
"page": 1,
"pages": 2,
"per_page": 1,
},
"projects": [
{
"access_users": {
"admin": ["admin_user", "packager_user"],
"collaborator": [],
"commit": [],
"owner": [],
},
"fullname": "rpms/project_one",
}
],
},
{
"pagination": {
"page": 2,
"pages": 2,
"per_page": 1,
},
"projects": [
{
"access_users": {
"admin": [],
"collaborator": [],
"commit": [],
"owner": ["owner_user"],
},
"fullname": "rpms/project_two",
}
],
},
]
req = Mock()
req.ok = True
req.json.side_effect = req_json
toddler.requests_session.get.return_value = req
get_group_member.return_value = ["packager_user"]
msg = fedora_messaging.api.Message()
msg.id = 123
msg.topic = "org.fedoraproject.prod.toddlers.trigger.check_commit_rights"
msg.body = {"foo": "bar"}
config = {
"exclude_users": "[]",
"notify_emails": "[]",
"admin_email": "foo@bar",
"mail_server": "127.0.0.1",
}
assert toddler.process(config=config, message=msg) is None
info.assert_any_call(
"### Found 2 users with commit privileges but not in packager group ###"
)
info.assert_any_call("admin_user")
info.assert_any_call("owner_user")
with pytest.raises(AssertionError):
info.assert_any_call("packager_user")
send_email.assert_called()
@patch("toddlers.utils.fedora_account.set_fasjson", new=Mock(return_value=True))
@patch("toddlers.plugins.check_commit_rights._log.info")
@patch("toddlers.utils.notify.send_email")
@patch("toddlers.utils.fedora_account.get_group_member")
def test_process_exclude_user(self, get_group_member, send_email, info, toddler):
req_json = {
"pagination": {
"page": 1,
"pages": 1,
"per_page": 1,
},
"projects": [
{
"access_users": {
"admin": ["admin_user", "packager_user"],
"collaborator": ["collaborator_user"],
"commit": ["commit_user"],
"owner": ["owner_user"],
},
"fullname": "rpms/project_one",
}
],
}
req = Mock()
req.ok = True
req.json.return_value = req_json
toddler.requests_session.get.return_value = req
get_group_member.return_value = ["packager_user"]
msg = fedora_messaging.api.Message()
msg.id = 123
msg.topic = "org.fedoraproject.prod.toddlers.trigger.check_commit_rights"
msg.body = {"foo": "bar"}
config = {
"exclude_users": "['admin_user']",
"notify_emails": "[]",
"admin_email": "foo@bar",
"mail_server": "127.0.0.1",
}
assert toddler.process(config=config, message=msg) is None
info.assert_any_call(
"### Found 3 users with commit privileges but not in packager group ###"
)
info.assert_any_call("collaborator_user")
info.assert_any_call("commit_user")
info.assert_any_call("owner_user")
with pytest.raises(AssertionError):
info.assert_any_call("admin_user")
send_email.assert_called()
@patch("toddlers.utils.fedora_account.set_fasjson", new=Mock(return_value=True))
@patch("toddlers.plugins.check_commit_rights._log.info")
@patch("toddlers.utils.notify.send_email")
@patch("toddlers.utils.fedora_account.get_group_member")
def test_process_no_result(self, get_group_member, send_email, info, toddler):
req_json = {
"pagination": {
"page": 1,
"pages": 1,
"per_page": 1,
},
"projects": [
{
"access_users": {
"admin": ["packager_user"],
"collaborator": [],
"commit": [],
"owner": ["packager_user"],
},
"fullname": "rpms/project_one",
}
],
}
req = Mock()
req.ok = True
req.json.return_value = req_json
toddler.requests_session.get.return_value = req
get_group_member.return_value = ["packager_user"]
msg = fedora_messaging.api.Message()
msg.id = 123
msg.topic = "org.fedoraproject.prod.toddlers.trigger.check_commit_rights"
msg.body = {"foo": "bar"}
config = {
"exclude_users": "[]",
"notify_emails": "[]",
}
assert toddler.process(config=config, message=msg) is None
info.assert_any_call(
"### Found 0 users with commit privileges but not in packager group ###"
)
send_email.assert_not_called()
def test_main_no_args(self, capsys):
with pytest.raises(SystemExit):
toddlers.plugins.check_commit_rights.main([])
out, err = capsys.readouterr()
assert out == ""
# Expecting something along these lines, but don't make the test too tight:
#
# usage: pytest [-h] [--dry-run] [-q | --debug] conf [username]
# pytest: error: the following arguments are required: conf
assert err.startswith("usage:")
assert "error: the following arguments are required:" in err
@patch("toml.load", new=Mock(return_value={}))
def test_main_no_exclude_users(self, capsys):
with pytest.raises(
Exception,
match=r"Invalid toddler configuration, no `exclude_users` defined",
):
toddlers.plugins.check_commit_rights.main(["test.cfg"])
out, err = capsys.readouterr()
assert out == ""
assert err == ""

View file

@ -335,3 +335,10 @@ handlers = ["console"]
[log_config.root]
level = "ERROR"
handlers = ["console"]
# Configuration section for check_commit_rights
[consumer_config.check_commit_rights]
exclude_users = []
notify_emails = [
"root@localhost.localdomain",
]

View file

@ -0,0 +1,204 @@
"""
This script takes as input the fedmsg messages published under the topic
``toddlers.trigger.check_commit_rights`` and searches for any user not
member of the packager group which still have commit rights.
Authors: Mattia Verga <mattia.verga@proton.me>
"""
import argparse
import logging
import sys
import toml
from toddlers.base import ToddlerBase
from toddlers.utils import fedora_account
from toddlers.utils import notify
from toddlers.utils.misc import merge_dicts
from toddlers.utils.requests import make_session
_log = logging.getLogger(__name__)
class CheckCommitRights(ToddlerBase):
"""Listens to messages sent by playtime (which lives in toddlers) and searches
for any user not member of the packager group which still have commit rights.
"""
name = "check_commit_rights"
amqp_topics = [
"org.fedoraproject.*.toddlers.trigger.check_commit_rights",
]
def __init__(self):
self.requests_session = make_session()
def accepts_topic(self, topic):
"""Returns a boolean whether this toddler is interested in messages
from this specific topic.
"""
return topic.startswith("org.fedoraproject.") and topic.endswith(
"toddlers.trigger.check_commit_rights"
)
def process(self, config, message, send_email=True):
"""Looks for users with commit rights not in packager group."""
exclude_users = config.get("exclude_users", None)
if exclude_users is None:
raise Exception("Invalid toddler configuration, no `exclude_users` defined")
notify_emails = config.get("notify_emails")
if not notify_emails and send_email:
raise Exception("Invalid toddler configuration, no `notify_emails` defined")
_log.info("### Setting up connection to FAS ###")
fedora_account.set_fasjson(config)
_log.info("### Getting packagers list ###")
packagers_list = fedora_account.get_group_member("packager")
_log.info(f"Found {len(packagers_list)} packagers")
if not packagers_list:
raise Exception("Something wrong occurred, I found no packagers. Exiting.")
_log.info("### Looking for users with commit rights ###")
page = 1
proj_num = 0
not_packagers = []
while True:
req = self.requests_session.get(
"https://src.fedoraproject.org/api/0/projects?fork=false&namespace=rpms"
f"&per_page=100&page={page}"
)
data = req.json()
projects = data.get("projects", [])
total_pages = data.get("pagination", dict()).get("pages", 1)
for proj in projects:
proj_num += 1
users = proj.get("access_users", dict())
for user in (
users.get("owner")
+ users.get("admin")
+ users.get("collaborator")
+ users.get("commit")
):
if user in exclude_users:
continue
if user not in packagers_list:
not_packagers.append(user)
_log.info(f"Done {page} pages out of {total_pages}.")
if page >= total_pages:
break
page += 1
not_packagers_set = sorted(set(not_packagers))
_log.info(
f"### Found {len(not_packagers_set)} users with "
"commit privileges but not in packager group ###"
)
for u in not_packagers_set:
_log.info(u)
if len(not_packagers_set) and send_email:
_log.info("### Sending email to admins ###")
users_list = "\n".join([f"- {username}" for username in not_packagers_set])
message = f"""Dear Admin,
The periodic check on user commit rights on src.fp.o has identified {len(not_packagers_set)}
users which aren't member of the packager group. The list of them is here:
{users_list}
Please check is these rights are correct, they can be a leftover from when the users
were removed, or removed themselves, from the packager group.
Have a wonderful day and see you (maybe?) at the next run!
"""
notify.send_email(
to_addresses=config["notify_emails"],
from_address=config["admin_email"],
subject="Ex-packagers still having commit rights",
content=message,
mail_server=config["mail_server"],
)
# The following code allows to run this script stand-alone if needed.
def setup_logging(log_level: int):
handlers = []
_log.setLevel(log_level)
# We want all messages logged at level INFO or lower to be printed to stdout
info_handler = logging.StreamHandler(stream=sys.stdout)
handlers.append(info_handler)
if log_level == logging.INFO:
# In normal operation, don't decorate messages
for handler in handlers:
handler.setFormatter(logging.Formatter("%(message)s"))
logging.basicConfig(level=log_level, handlers=handlers)
def get_arguments(args):
"""Load and parse the CLI arguments."""
parser = argparse.ArgumentParser(
description="Looks for users with commit rights not in packager group"
)
parser.add_argument(
"conf",
help="Configuration file",
)
log_level_group = parser.add_mutually_exclusive_group()
log_level_group.add_argument(
"-q",
"--quiet",
action="store_const",
dest="log_level",
const=logging.WARNING,
default=logging.INFO,
help="Be less talkative",
)
log_level_group.add_argument(
"--debug",
action="store_const",
dest="log_level",
const=logging.DEBUG,
help="Enable debugging output",
)
return parser.parse_args(args)
def main(args):
"""Schedule the first test and run the scheduler."""
args = get_arguments(args)
setup_logging(log_level=args.log_level)
base_config = toml.load(args.conf)
default_config = base_config.get("consumer_config", {}).get("default", {})
private_config = base_config.get("consumer_config", {}).get(
"check_commit_rights", {}
)
config = merge_dicts(default_config, private_config)
CheckCommitRights().process(
config=config,
message={},
send_email=False,
)
if __name__ == "__main__": # pragma: no cover
try:
main(sys.argv[1:])
except KeyboardInterrupt:
pass