toddlers/toddlers/utils/bugzilla_system.py
Michal Konečný d9c03e08ce Fix formatting for for black > 23
Signed-off-by: Michal Konečný <mkonecny@redhat.com>
2023-02-03 14:33:39 +01:00

657 lines
21 KiB
Python

import logging
import time
from typing import (
Any,
Callable,
cast,
Collection,
Dict,
Iterable,
Mapping,
Optional,
TypeVar,
)
import xmlrpc.client
from bugzilla import Bugzilla
from bugzilla.bug import Bug
# Module-level logger used by the helpers in this file.
_log = logging.getLogger(__name__)
# Global bugzilla connection. It stays None until set_bz() stores a
# Bugzilla object here; get_bz() refuses to hand it out while it is None.
_BUGZILLA: Optional[Bugzilla] = None
def set_bz(conf: Mapping[str, str]) -> Bugzilla:
    """Create the bugzilla connection and store it as the module global.

    :arg conf: Configuration mapping. The entries ``bugzilla_url`` and
        ``bugzilla_api_key`` are read from it and must both be non-empty.
    :return: The newly created connection (also stored in ``_BUGZILLA``).
    :raises ValueError: If one of the two required entries is missing or empty.
    """
    global _BUGZILLA

    base_url = conf.get("bugzilla_url")
    if not base_url:
        raise ValueError("No bugzilla_url found in the configuration file")
    # The XML-RPC endpoint is appended to the configured server URL.
    xmlrpc_endpoint = base_url + "/xmlrpc.cgi"

    api_key = conf.get("bugzilla_api_key")
    if not api_key:
        raise ValueError("No bugzilla_api_key found in the configuration file")

    connection = Bugzilla(
        url=xmlrpc_endpoint, api_key=api_key, cookiefile=None, tokenfile=None
    )
    _BUGZILLA = connection
    return connection
def get_bz() -> Bugzilla:
    """Return the module-wide bugzilla connection.

    :return: The object currently stored in ``_BUGZILLA``.
    :raises ValueError: If bugzilla object wasn't initialized
    """
    if _BUGZILLA is not None:
        return _BUGZILLA
    raise ValueError("No bugzilla connection set, call set_bz first")
def get_group_member(group_name: str) -> list:
    """Return the e-mails of the members of the given bugzilla group.

    :arg group_name: The group name used in bugzilla.
    :return: New list built from the group's ``member_emails``.
    :raises XMLRPC Fault: Code 51 if the name does not exist
    :raises XMLRPC Fault: Code 805 if the user does not have enough
        permissions to view groups
    """
    # membership=True asks bugzilla to include the member list in the answer.
    bz_group = get_bz().getgroup(group_name, membership=True)
    return [*bz_group.member_emails]
def remove_user_from_group(user_email: str, bz_group: str):
    """Remove the specified user from the specified group.

    :arg user_email: Bugzilla e-mail of the user to remove.
    :arg bz_group: Name of the bugzilla group to remove the user from.
    :raises xmlrpc.client.Fault: Any fault other than code 51 is re-raised.
    """
    bz = get_bz()
    try:
        bz.updateperms(user_email, "rem", bz_group)
    except xmlrpc.client.Fault as fault:
        # Fault 51 is deliberately ignored: not having this user is
        # equivalent to the user not being in the group.
        if fault.faultCode != 51:
            raise
def get_user(user_email: str):
    """Look up a bugzilla user by e-mail.

    :arg user_email: Bugzilla e-mail of the user to look up.
    :return: The user object returned by bugzilla, or None when bugzilla
        answers with fault code 51.
    :raises xmlrpc.client.Fault: Any fault other than code 51 is re-raised.
    """
    bz = get_bz()
    try:
        return bz.getuser(user_email)
    except xmlrpc.client.Fault as fault:
        if fault.faultCode == 51:
            # Treated as "no such user" rather than as an error.
            return None
        raise
def add_user_to_group(
    user_email: str, bz_group: str, no_bz_account: list, dry_run: bool = False
):
    """Add the specified user to the specified group.

    :arg user_email: Bugzilla e-mail of the user to add.
    :arg bz_group: Name of the bugzilla group.
    :arg no_bz_account: List collecting e-mails without a bugzilla account.
        It is appended to in place when the user is not found.
    :arg dry_run: If True the group change is only logged, not performed.
        Default to False.
    :return: The same ``no_bz_account`` list that was passed in.
    """
    bz = get_bz()

    account = get_user(user_email)
    if not account:
        # No bugzilla account yet: remember the e-mail so the caller
        # can notify the user.
        no_bz_account.append(user_email)
    elif not account.can_login:
        _log.info(" Skipping %s, user is blocked in bugzilla", user_email)
    elif dry_run:
        _log.info(" Would add %s to the group %s", user_email, bz_group)
    else:
        print(bz.updateperms(user_email, "add", bz_group))

    return no_bz_account
def get_product_info_packages(collection: str) -> Mapping[str, Mapping[str, Any]]:
    """Get product info for specific collection (Bugzilla product).

    :arg collection: Collection in which to look for packages (example: "Fedora")
    :return: Dictionary keyed by package name. Every value holds the keys
        ``initialowner``, ``description``, ``initialqacontact``,
        ``initialcclist`` and ``is_active``.
    :raises ValueError: If the bugzilla call returns an empty/falsy result.
    """
    bz = get_bz()

    # https://bugzilla.redhat.com/docs/en/html/api/core/v1/product.html#get-product
    _log.debug("Querying product `%s`", collection)
    wanted_fields = [
        "components.name",
        "components.default_assigned_to",
        "components.description",
        "components.default_qa_contact",
        "components.default_cc",
        "components.is_active",
    ]
    response = execute_bugzilla_call(
        bz.product_get,
        kwargs={"names": [collection], "include_fields": wanted_fields},
    )

    if not response:
        _log.error("The bugzilla response doesn't have the expected format")
        raise ValueError("Received data doesn't have the expected format")

    # Only one product was requested, so only the first entry is read:
    # [{"components": [...]}]
    packages = {}
    for entry in response[0]["components"]:
        # Rename the attributes so they match the names used by the
        # other component helpers.
        packages[entry["name"]] = {
            "initialowner": entry["default_assigned_to"],
            "description": entry["description"],
            "initialqacontact": entry["default_qa_contact"],
            "initialcclist": entry["default_cc"],
            "is_active": entry["is_active"],
        }
    return packages
def reassign_tickets_to_assignee(
    new_poc: str,
    old_poc: str,
    product: str,
    package: str,
    versions: Iterable[str],
    fas_users_info: Mapping[str, str] = {},
    dry_run: bool = False,
    print_fas_names: bool = False,
) -> None:
    """Move the open tickets of a package from the old assignee to the new one.

    Only bugs whose current assignee equals `old_poc` (and differs from
    `new_poc`) are touched.

    :arg new_poc: E-mail of the new point of contact
    :arg old_poc: E-mail of the previous point of contact
    :arg product: The product of the package to change in bugzilla.
        For example: "Fedora"
    :arg package: Name of the package to change the owner for
    :arg versions: List of versions of product to update.
    :arg fas_users_info: Dictionary containing bugzilla e-mails mapped to FAS
        usernames. Only used if `print_fas_names` is set to True.
        Default to empty dictionary.
    :arg dry_run: If True no change in bugzilla will be made.
        Default to False.
    :arg print_fas_names: If True the log message shows the FAS names of
        `new_poc` and `old_poc` (or the e-mail cut after the "@" when no FAS
        name is known) instead of the full e-mails.
        Default to False.
    :raises xmlrpc.client.Fault: Re-raises the exception from bugzilla
        with additional info.
    :raises xmlrpc.client.ProtocolError: Re-raises the exception from bugzilla
        with additional info.
    """
    bz = get_bz()

    open_states = [
        "NEW",
        "ASSIGNED",
        "ON_DEV",
        "ON_QA",
        "MODIFIED",
        "POST",
        "FAILS_QA",
        "PASSES_QA",
        "RELEASE_PENDING",
    ]
    bz_query = {
        "product": product,
        "component": package,
        "bug_status": open_states,
        "version": versions,
    }
    matching_bugs = execute_bugzilla_call(bz.query, kwargs={"query": bz_query})

    for bug in matching_bugs or []:
        if bug.assigned_to != old_poc or bug.assigned_to == new_poc:
            # Assigned to somebody else, or already with the new owner.
            continue

        shown_old = bug.assigned_to
        shown_new = new_poc
        if print_fas_names:
            # Prefer the FAS name; otherwise hide the e-mail domain.
            shown_old = (
                fas_users_info[old_poc]
                if shown_old in fas_users_info
                else old_poc.split("@", 1)[0] + "@..."
            )
            shown_new = (
                fas_users_info[new_poc]
                if shown_new in fas_users_info
                else new_poc.split("@", 1)[0] + "@..."
            )

        _log.info(
            "%s/%s reassigning bug #%s from %s to %s",
            product,
            package,
            bug.bug_id,
            shown_old,
            shown_new,
        )

        if dry_run:
            continue

        try:
            execute_bugzilla_call(
                bug.setassignee,
                kwargs={
                    "assigned_to": new_poc,
                    "comment": "This package has changed maintainer in Fedora. "
                    "Reassigning to the new maintainer of this component.",
                },
            )
        except xmlrpc.client.Fault as fault:
            # Output something useful in args
            fault.args = (new_poc, fault.faultCode, fault.faultString)
            raise
        except xmlrpc.client.ProtocolError as error:
            error.args = ("ProtocolError", error.errcode, error.errmsg)
            raise
def add_component(
    product: str,
    owner: str,
    package: str,
    qa_contact: Optional[str],
    cc_list: Iterable[str],
    fas_users_info: Mapping[str, str] = {},
    description: Optional[str] = None,
    retired: bool = False,
    print_fas_names: bool = False,
    dry_run: bool = False,
) -> None:
    """Add new component to a product in bugzilla.

    Nothing is added for a retired package; only a ``[NOADD]`` message is
    logged. Otherwise every attribute of the new component is logged at INFO
    level and, unless `dry_run` is set, the component is created.

    :arg product: Product for which the component should be added
        For example: "Fedora"
    :arg owner: Bugzilla e-mail of the new owner
    :arg package: Name of the package that should be added
    :arg qa_contact: E-mail of QA contact for the component.
    :arg cc_list: List of the e-mails that should be in CC for the component.
    :arg fas_users_info: Dictionary containing bugzilla e-mails mapped to FAS
        usernames. Only used if `print_fas_names` is set to True.
        Default to empty dictionary.
    :arg description: Description of the new component. "NA" is sent when
        it is empty. Default to None.
    :arg retired: Retirement state of the package.
        Default to False.
    :arg print_fas_names: If True the owner, QA contact and CC list are
        logged as FAS names (falling back to the e-mail when no FAS name is
        known) instead of e-mails.
        Default to False.
    :arg dry_run: If True no change in bugzilla will be made.
        Default to False.
    :raises xmlrpc.client.Fault: Re-raises the exception from bugzilla
        with additional info.
    """
    server = get_bz()

    if retired:
        _log.info("[NOADD] %s/%s is retired", product, package)
        return

    # Materialize once: the CC list is read for the request and again for
    # logging, which would exhaust a one-shot iterable.
    cc_list = list(cc_list or ())

    data = {
        "product": product,
        "component": package,
        "description": description or "NA",
        "initialowner": owner,
        "is_active": not retired,
    }
    if cc_list:
        data["initialcclist"] = cc_list
    if qa_contact:
        data["initialqacontact"] = qa_contact

    user_keys = ("initialowner", "initialqacontact", "initialcclist")
    for key in (
        "initialowner",
        "description",
        "initialqacontact",
        "initialcclist",
        "is_active",
    ):
        if print_fas_names and key in user_keys:
            # Print bugzilla e-mail if FAS name is not found
            # This shouldn't happen, but to be safe
            if key == "initialowner":
                value = fas_users_info.get(owner, owner)
            elif key == "initialqacontact":
                # Without a QA contact there is nothing to translate; log
                # None instead of reusing the owner's value from the
                # previous iteration.
                value = (
                    fas_users_info.get(qa_contact, qa_contact)
                    if qa_contact
                    else None
                )
            else:
                value = [fas_users_info.get(cc_user, cc_user) for cc_user in cc_list]
            _log.info(
                "[ADDCOMP] %s/%s %s set to FAS name(s) `%s`",
                product,
                package,
                key,
                value,
            )
        else:
            _log.info(
                "[ADDCOMP] %s/%s %s set to `%s`", product, package, key, data.get(key)
            )

    if not dry_run:
        try:
            execute_bugzilla_call(server.addcomponent, kwargs={"data": data})
        except xmlrpc.client.Fault as e:
            # Output something useful in args
            e.args = (data, e.faultCode, e.faultString)
            raise
def edit_component(
    owner: str,
    product: str,
    package: str,
    component: dict,
    cc_list: Collection[str],
    versions: Iterable[str],
    description: Optional[str] = None,
    qa_contact: Optional[str] = None,
    fas_users_info: Mapping[str, Any] = cast(Mapping[str, Any], {}),
    retired: bool = False,
    print_fas_names: bool = False,
    print_no_change: bool = False,
    dry_run: bool = False,
) -> None:
    """Edit existing bugzilla component.

    The requested values are compared with `component`; only attributes that
    differ are logged (INFO level) and sent to bugzilla. When the owner
    changes, the open bugs of the package are reassigned as well.

    :arg owner: Bugzilla e-mail of the new owner
    :arg product: Product for which the component should be edited
        For example: "Fedora"
    :arg package: Name of the package that should be edited
    :arg component: Dictionary containing the existing component.
        It's used to compare the new values to previous one.
        Could be obtained by calling `get_product_info_packages`.
    :arg cc_list: List of the e-mails that should be in CC for the component.
    :arg versions: Versions of component where opened bugs should be updated
        if owner is changed.
    :arg description: Description of the component. An empty value leaves
        the current description untouched. Default to None.
    :arg qa_contact: E-mail of QA contact for the component. When empty and
        the component currently has a QA contact, the contact is removed.
        Default to None.
    :arg fas_users_info: Dictionary containing bugzilla e-mails mapped to FAS
        usernames. Only used if `print_fas_names` is set to True.
        Default to empty dictionary.
    :arg retired: Retirement state of the package.
        Default to False.
    :arg print_fas_names: If True the owner, QA contact and CC list are
        logged as FAS names (falling back to the e-mail when no FAS name is
        known) instead of e-mails.
        Default to False.
    :arg print_no_change: If True a ``[NOCHANGE]`` message is logged when
        nothing has to be changed.
        Default to False.
    :arg dry_run: If True no change in bugzilla will be made.
        Default to False.
    :raises xmlrpc.client.Fault: Re-raises the exception from bugzilla
        with additional info.
    :raises xmlrpc.client.ProtocolError: Re-raises the exception from bugzilla
        with additional info.
    """
    server = get_bz()
    data: Dict[str, Any] = {}

    # Check for changes to the owner, qa_contact, or description
    if component["initialowner"].lower() != owner.lower():
        data["initialowner"] = owner

    if description and component["description"] != description:
        data["description"] = description

    current_qa_contact = component["initialqacontact"]
    if qa_contact:
        # A change is needed when there is no QA contact yet or when it
        # differs (case-insensitively) from the requested one.
        if (
            not current_qa_contact
            or current_qa_contact.lower() != qa_contact.lower()
        ):
            data["initialqacontact"] = qa_contact
    elif current_qa_contact:
        # Be explicit about removing the qacontact from Fedora components
        data["initialqacontact"] = ""

    if len(component["initialcclist"]) != len(cc_list):
        data["initialcclist"] = cc_list
    else:
        wanted_cc = {cc_member.lower() for cc_member in cc_list}
        if any(
            cc_member.lower() not in wanted_cc
            for cc_member in component["initialcclist"]
        ):
            data["initialcclist"] = cc_list

    if component["is_active"] != (not retired):
        data["is_active"] = not retired

    if not data:
        if print_no_change:
            _log.info("[NOCHANGE] %s/%s", product, package)
        return

    # Changes occurred. Submit a request to change via xmlrpc
    data["product"] = product
    data["component"] = package

    user_keys = ("initialowner", "initialqacontact", "initialcclist")
    for key in (
        "initialowner",
        "description",
        "initialqacontact",
        "initialcclist",
        "is_active",
    ):
        if data.get(key) is None:
            # Attribute is not being changed, nothing to report.
            continue

        if print_fas_names and key in user_keys:
            # Print bugzilla e-mail if FAS name is not found
            # This shouldn't happen, but to be safe
            if key == "initialowner":
                old_value = fas_users_info.get(component[key], component[key])
                new_value = fas_users_info.get(owner, owner)
            elif key == "initialqacontact":
                old_value = fas_users_info.get(component[key], component[key])
                new_value = (
                    fas_users_info.get(qa_contact, qa_contact)
                    if qa_contact
                    else None
                )
            else:
                old_value = [
                    fas_users_info.get(cc_user, cc_user)
                    for cc_user in component[key]
                ]
                new_value = [
                    fas_users_info.get(cc_user, cc_user) for cc_user in cc_list
                ]
            _log.info(
                "[EDITCOMP] %s/%s %s changed from `%s` to FAS name(s) `%s`",
                product,
                package,
                key,
                old_value,
                new_value,
            )
        else:
            old_value = component[key]
            if isinstance(old_value, list):
                old_value = sorted(old_value)
            new_value = data[key]
            if isinstance(new_value, list):
                new_value = sorted(new_value)
            _log.info(
                "[EDITCOMP] %s/%s %s changed from `%s` to `%s`",
                product,
                package,
                key,
                old_value,
                new_value,
            )

    # Remember whether the owner really changed before it is forced below.
    owner_changed = "initialowner" in data

    # FIXME: initialowner has been made mandatory for some
    # reason. Asking dkl why.
    data["initialowner"] = owner

    if not dry_run:
        try:
            execute_bugzilla_call(server.editcomponent, kwargs={"data": data})
        except xmlrpc.client.Fault as e:
            # Output something useful in args
            e.args = (data, e.faultCode, e.faultString)
            raise
        except xmlrpc.client.ProtocolError as e:
            e.args = ("ProtocolError", e.errcode, e.errmsg)
            raise

    if owner_changed:
        # dry_run is passed through, so this only logs in a dry run.
        reassign_tickets_to_assignee(
            new_poc=owner,
            old_poc=component["initialowner"],
            product=product,
            package=package,
            versions=versions,
            fas_users_info=fas_users_info,
            dry_run=dry_run,
            print_fas_names=print_fas_names,
        )
def get_bug(bug_id: str) -> Optional[Bug]:
    """
    Fetch a single bug from bugzilla.

    Params:
      bug_id: Id of the bug to retrieve

    Returns:
      The value returned by the bugzilla `getbug` call.

    Raises:
      `xmlrpc.client.Fault`: Re-raised with the bug id, fault code and
        fault string as args.
      `xmlrpc.client.ProtocolError`: Re-raised with the error code and
        message as args.
    """
    server = get_bz()
    try:
        return execute_bugzilla_call(server.getbug, args=(bug_id,))
    except xmlrpc.client.Fault as fault:
        # Output something useful in args
        fault.args = (bug_id, fault.faultCode, fault.faultString)
        raise
    except xmlrpc.client.ProtocolError as error:
        error.args = ("ProtocolError", error.errcode, error.errmsg)
        raise
def comment_on_bug(bug_id: str, comment: str) -> None:
    """
    Post a comment on a Bugzilla bug.

    When the bug can't be retrieved (falsy result from `get_bug`) an error
    is logged and nothing is posted.

    Params:
      bug_id: Identifier of the bug
      comment: Comment to post

    Raises:
      `xmlrpc.client.Fault`: When the operation fails.
      `xmlrpc.client.ProtocolError`: When communication with bugzilla fails.
    """
    _log.info("Adding comment `%s` to `%s`", comment, bug_id)

    target = get_bug(bug_id)
    if not target:
        _log.error("Bug '{}' not found!".format(bug_id))
        return

    try:
        execute_bugzilla_call(target.addcomment, args=(comment,))
    except xmlrpc.client.Fault as fault:
        # Output something useful in args
        fault.args = (comment, fault.faultCode, fault.faultString)
        raise
    except xmlrpc.client.ProtocolError as error:
        error.args = ("ProtocolError", error.errcode, error.errmsg)
        raise
# Return type of the wrapped API call.
R = TypeVar("R")


def execute_bugzilla_call(
    call: Callable[..., R],
    args: tuple = (),
    kwargs: Mapping = {},
    num_of_attempts: int = 5,
    retry_delay: float = 10,
) -> Optional[R]:
    """Wrapper function for Bugzilla calls. It repeats the API call for `num_of_attempts.`

    Any exception raised by `call` triggers a retry after `retry_delay`
    seconds; the exception of the last attempt is propagated unchanged.

    :arg call: API method to call.
    :arg args: Arguments for the API call, it will be unpacked when given to call.
    :arg kwargs: Named arguments for the API call. The mapping is only read,
        never modified.
    :arg num_of_attempts: Number of attempts to make before raising the exception.
        Default to 5.
    :arg retry_delay: Seconds to wait between two attempts.
        Default to 10.

    :return: Received data, or None when `num_of_attempts` is lower than 1
        (the call is never made in that case).
    """
    raw_data = None
    for attempt in range(1, num_of_attempts + 1):
        try:
            # Unpacking an empty tuple is a no-op, so no special case is
            # needed for calls without positional arguments.
            raw_data = call(*args, **kwargs)
            break
        except Exception as e:
            if attempt >= num_of_attempts:
                # Out of attempts: let the caller see the original error.
                raise
            _log.debug("ERROR %s", e)
            _log.debug(
                "Query failed, going to try again in %s seconds", retry_delay
            )
            time.sleep(retry_delay)
    return raw_data