From 821209cb26aeeb29e15a6ff32c2c43fb2a1ae9ce Mon Sep 17 00:00:00 2001 From: Stephen Coady Date: Mon, 22 Feb 2021 15:21:35 +0000 Subject: [PATCH] hotpatch fmn to work with fasjson Signed-off-by: Stephen Coady --- .../backend/files/fasjson-port/consumer.py | 210 ++++++++ .../files/fasjson-port/fasjson_client.py | 62 +++ .../backend/files/fasjson-port/fmn_fasshim.py | 198 ++++++++ .../backend/files/fasjson-port/tasks.py | 475 ++++++++++++++++++ .../notifs/backend/files/fasjson-port/util.py | 52 ++ roles/notifs/backend/tasks/main.yml | 15 + .../notifs/backend/templates/fmn.consumer.py | 4 + 7 files changed, 1016 insertions(+) create mode 100644 roles/notifs/backend/files/fasjson-port/consumer.py create mode 100644 roles/notifs/backend/files/fasjson-port/fasjson_client.py create mode 100644 roles/notifs/backend/files/fasjson-port/fmn_fasshim.py create mode 100644 roles/notifs/backend/files/fasjson-port/tasks.py create mode 100644 roles/notifs/backend/files/fasjson-port/util.py diff --git a/roles/notifs/backend/files/fasjson-port/consumer.py b/roles/notifs/backend/files/fasjson-port/consumer.py new file mode 100644 index 0000000000..ee9fedc43a --- /dev/null +++ b/roles/notifs/backend/files/fasjson-port/consumer.py @@ -0,0 +1,210 @@ +""" +This is a `fedmsg consumer`_ that subscribes to every topic on the message bus +it is connected to. It has two tasks. The first is to place all incoming +messages into a RabbitMQ message queue. The second is to manage the FMN caches. + +FMN makes heavy use of caches since it needs to know who owns what packages and +what user notification preferences are, both of which require expensive API +queries to `FAS`_, `pkgdb`_, or the database. + +.. _fedmsg consumer: http://www.fedmsg.com/en/latest/consuming/#the-hub-consumer-approach +.. _FAS: https://admin.fedoraproject.org/accounts/ +.. _pkgdb: https://admin.fedoraproject.org/pkgdb/ +""" + +import logging + +import fedmsg.consumers +import kombu + +import fmn.lib +import fmn.rules.utils +from fmn import config +from fmn.celery import RELOAD_CACHE_EXCHANGE_NAME +from .util import ( + new_packager, + new_badges_user, + get_fas_email, + get_fasjson_email +) +from fmn.tasks import find_recipients, REFRESH_CACHE_TOPIC, heat_fas_cache + + +log = logging.getLogger("fmn") +_log = logging.getLogger(__name__) + + +class FMNConsumer(fedmsg.consumers.FedmsgConsumer): + """ + A `fedmsg consumer`_ that subscribes to all topics and re-publishes all + messages to the ``workers`` exchange. + + Attributes: + topic (str): The topics this consumer is subscribed to. Set to ``*`` + (all topics). + config_key (str): The key to set to ``True`` in the fedmsg config to + enable this consumer. The key is ``fmn.consumer.enabled``. + """ + config_key = 'fmn.consumer.enabled' + + def __init__(self, hub, *args, **kwargs): + self.topic = config.app_conf['fmn.topics'] + + _log.info("FMNConsumer initializing") + super(FMNConsumer, self).__init__(hub, *args, **kwargs) + + self.uri = config.app_conf['fmn.sqlalchemy.uri'] + self.autocreate = config.app_conf['fmn.autocreate'] + self.junk_suffixes = config.app_conf['fmn.junk_suffixes'] + self.ignored_copr_owners = config.app_conf['ignored_copr_owners'] + + heat_fas_cache.apply_async() + + _log.info("Loading rules from fmn.rules") + self.valid_paths = fmn.lib.load_rules(root="fmn.rules") + + session = self.make_session() + session.close() + + _log.info("FMNConsumer initialized") + + def make_session(self): + """ + Initialize the database session and return it. + + Returns: + sqlalchemy.orm.scoping.scoped_session: An SQLAlchemy scoped session. + Calling it returns the current Session, creating it using the + scoped_session.session_factory if not present. + """ + return fmn.lib.models.init(self.uri) + + def consume(self, raw_msg): + """ + This method is called when a message arrives on the fedmsg bus. + + Args: + raw_msg (dict): The raw fedmsg deserialized to a Python dictionary. + """ + session = self.make_session() + try: + self.work(session, raw_msg) + session.commit() # transaction is committed here + except: + session.rollback() # rolls back the transaction + raise + + def work(self, session, raw_msg): + """ + This method is called when a message arrives on the fedmsg bus by the + :meth:`.consume` method. + + Args: + session (sqlalchemy.orm.session.Session): The SQLAlchemy session to use. + raw_msg (dict): The raw fedmsg deserialized to a Python dictionary. + """ + topic, msg = raw_msg['topic'], raw_msg['body'] + + for suffix in self.junk_suffixes: + if topic.endswith(suffix): + log.debug("Dropping %r", topic) + return + + # Ignore high-usage COPRs + if topic.startswith('org.fedoraproject.prod.copr.') and \ + msg['msg'].get('owner') in self.ignored_copr_owners: + log.debug('Dropping COPR %r by %r' % (topic, msg['msg']['owner'])) + return + + _log.info("FMNConsumer received %s %s", msg['msg_id'], msg['topic']) + + # First, do some cache management. This can be confusing because there + # are two different caches, with two different mechanisms, storing two + # different kinds of data. The first is a simple python dict that + # contains the 'preferences' from the fmn database. The second is a + # dogpile.cache (potentially stored in memcached, but configurable from + # /etc/fedmsg.d/). The dogpile.cache cache stores pkgdb2 + # package-ownership relations. Both caches are held for a very long + # time and update themselves dynamically here. + + if '.fmn.' in topic: + openid = msg['msg']['openid'] + _log.info('Broadcasting message to Celery workers to update cache for %s', openid) + find_recipients.apply_async( + ({'topic': 'fmn.internal.refresh_cache', 'body': openid},), + exchange=RELOAD_CACHE_EXCHANGE_NAME, + routing_key=config.app_conf['celery']['task_default_queue'], + ) + + # If a user has tweaked something in the pkgdb2 db, then invalidate our + # dogpile cache.. but only the parts that have something to do with any + # one of the users involved in the pkgdb2 interaction. Note that a + # 'username' here could be an actual username, or a group name like + # 'group::infra-sig'. + if '.pkgdb.' in topic: + usernames = fedmsg.meta.msg2usernames(msg, **config.app_conf) + for username in usernames: + log.info("Invalidating pkgdb2 dogpile cache for %r" % username) + target = fmn.rules.utils.get_packages_of_user + fmn.rules.utils.invalidate_cache_for( + config.app_conf, target, username) + + # Create a local account with all the default rules if a user is + # identified by one of our 'selectors'. Here we can add all kinds of + # new triggers that should create new FMN accounts. At this point in + # time we only create new accounts if 1) a new user is added to the + # packager group or 2) someone logs into badges.fp.o for the first + # time. + if self.autocreate: + selectors = [new_packager, new_badges_user] + candidates = [fn(topic, msg) for fn in selectors] + for username in candidates: + if not username: + continue + log.info("Autocreating account for %r" % username) + openid = '%s.id.fedoraproject.org' % username + openid_url = 'https://%s.id.fedoraproject.org' % username + fasjson = config.app_conf.get("fasjson", {}).get("active") + if fasjson: + email = get_fasjson_email(config.app_conf, username) + else: + email = get_fas_email(config.app_conf, username) + user = fmn.lib.models.User.get_or_create( + session, openid=openid, openid_url=openid_url, + create_defaults=True, detail_values=dict(email=email), + ) + session.add(user) + session.commit() + _log.info('Broadcasting message to Celery workers to update cache for %s', openid) + find_recipients.apply_async( + ({'topic': REFRESH_CACHE_TOPIC, 'body': openid},), + exchange=RELOAD_CACHE_EXCHANGE_NAME, + ) + + # Do the same dogpile.cache invalidation trick that we did above, but + # here do it for fas group membership changes. (This is important + # because someone could be in a group like the infra-sig which itself + # has package-ownership relations in pkgdb. If membership in that + # group changes we need to sync fas relationships to catch up and route + # messages to the new group members). + if '.fas.group.' in topic: + usernames = fedmsg.meta.msg2usernames(msg, **config.app_conf) + for username in usernames: + log.info("Invalidating fas cache for %r" % username) + target = fmn.rules.utils.get_groups_of_user + fmn.rules.utils.invalidate_cache_for(config.app_conf, target, username) + + # Finding recipients is computationally quite expensive so it's handled + # by Celery worker processes. The results are then dropped into an AMQP + # queue and processed by the backends. + try: + find_recipients.apply_async((raw_msg,)) + except kombu.exceptions.OperationalError: + _log.exception('Dispatching task to find recipients failed') + + def stop(self): + """ + Gracefully halt this fedmsg consumer. + """ + log.info("Cleaning up FMNConsumer.") + super(FMNConsumer, self).stop() diff --git a/roles/notifs/backend/files/fasjson-port/fasjson_client.py b/roles/notifs/backend/files/fasjson-port/fasjson_client.py new file mode 100644 index 0000000000..fc14f38ece --- /dev/null +++ b/roles/notifs/backend/files/fasjson-port/fasjson_client.py @@ -0,0 +1,62 @@ +import logging + +import requests +import requests.exceptions +from gssapi import Credentials, exceptions +from requests.compat import urlencode, urljoin +from requests_gssapi import HTTPSPNEGOAuth + + +log = logging.getLogger(__name__) + + +class Client(object): + """ + A fasjson client to make very specific requests to fasjson. + Necessary because the official fasjson-client library does not support + python2. + """ + def __init__(self, url, principal=None): + self.url = url + self.principal = principal + try: + creds = Credentials(usage="initiate") + except exceptions.GSSError as e: + log.error("GSError. Unable to create credentials store.", e) + gssapi_auth = HTTPSPNEGOAuth(opportunistic_auth=True, creds=creds) + self.session = requests.Session() + self.session.auth = gssapi_auth + + def search(self, email): + """ + A very limited search built to only serve fmn's requirement of + finding a user based on an email. + """ + # email must be an exact match in fasjson, so we will either have + # 1 result or empty result + search_string = "search/users" + "?" + urlencode({"email": email}) + endpoint = urljoin(self.url, search_string) + + return self.session.get(endpoint).json() + + def get_user(self, username): + """ + Get a specific user based on their username + """ + url_string = "users/" + username + "/" + endpoint = urljoin(self.url, url_string) + + return self.session.get(endpoint).json() + + def list_all_entities(self, ent_name): + """ + Return all entities of a certain type. In fmn's case it is users. + """ + endpoint = urljoin(self.url, ent_name + "/") + + next_page_url = endpoint + "?" + urlencode({"page_number": 1}) + while next_page_url: + res = self.session.get(next_page_url).json() + for item in res["result"]: + yield item + next_page_url = res.get("page", {}).get("next_page") diff --git a/roles/notifs/backend/files/fasjson-port/fmn_fasshim.py b/roles/notifs/backend/files/fasjson-port/fmn_fasshim.py new file mode 100644 index 0000000000..9ef9c1217a --- /dev/null +++ b/roles/notifs/backend/files/fasjson-port/fmn_fasshim.py @@ -0,0 +1,198 @@ +from __future__ import print_function + +import logging +import socket +import string +import requests + +import fedmsg +import fedmsg.meta +import fedora.client +import fedora.client.fas2 +from dogpile.cache import make_region + +from fmn import config +from .fasjson_client import Client + +fedmsg.meta.make_processors(**config.app_conf) + +_cache = make_region( + key_mangler=lambda key: "fmn.consumer:dogpile:" + key +).configure(**config.app_conf['fmn.rules.cache'].copy()) + +log = logging.getLogger("moksha.hub") + +default_url = 'https://admin.fedoraproject.org/accounts/' +creds = config.app_conf['fas_credentials'] + +fasjson = config.app_conf['fasjson'] +if fasjson.get('active'): + client = Client(url=fasjson.get('url', default_url)) +else: + client = fedora.client.fas2.AccountSystem( + base_url=creds.get('base_url', default_url), + username=creds['username'], + password=creds['password'], + ) + + +def make_fasjson_cache(**config): + log.warning("Building the FASJSON cache into redis.") + if _cache.get('fas_cache_built'): + log.warning("FASJSON cache already built into redis.") + return + global client + try: + _add_to_cache(list(client.list_all_entities("users"))) + except requests.exceptions.RequestException as e: + log.error("Something went wrong building cache with error: %s" % e) + return + + _cache.set('fas_cache_built', True) + + +def make_fas_cache(**config): + log.warning("Building the FAS cache into redis.") + if _cache.get('fas_cache_built'): + log.warning("FAS cache already built into redis.") + return + + global client + timeout = socket.getdefaulttimeout() + for key in string.ascii_lowercase: + socket.setdefaulttimeout(600) + try: + log.info("Downloading FAS cache for %s*" % key) + request = client.send_request( + '/user/list', + req_params={ + 'search': '%s*' % key, + 'status': 'active' + }, + auth=True) + except fedora.client.ServerError as e: + log.warning("Failed to download fas cache for %s %r" % (key, e)) + return {} + finally: + socket.setdefaulttimeout(timeout) + + log.info("Caching necessary user data") + for user in request['people']: + nick = user['ircnick'] + if nick: + _cache.set(str(nick), user['username']) + + email = user['email'] + if email: + _cache.set(str(email), user['username']) + + del request + + _cache.set('fas_cache_built', True) + + +def _add_to_cache(users): + for user in users: + nicks = user.get('ircnicks', []) + for nick in nicks: + _cache.set(nick, user['username']) + + emails = user.get('emails', []) + for email in emails: + _cache.set(email, user['username']) + + +def update_nick(username): + global client + if config.get('fasjson'): + try: + log.info("Downloading FASJSON cache for %s*" % username) + response = client.get_user(username=username) + _add_to_cache([response["result"]]) + except requests.exceptions.RequestException as e: + log.error("Something went wrong updating the cache with error: %s" % e) + else: + try: + log.info("Downloading FAS cache for %s*" % username) + request = client.send_request( + '/user/list', + req_params={'search': '%s' % username}, + auth=True) + except fedora.client.ServerError as e: + log.warning( + "Failed to download fas cache for %s: %r" % (username, e)) + return {} + + log.info("Caching necessary data for %s" % username) + for user in request['people']: + nick = user['ircnick'] + if nick: + _cache.set(nick, user['username']) + + email = user['email'] + if email: + _cache.set(email, user['username']) + else: + # If we couldn't find the nick in FAS, save it in the _cache as nick + # so that we avoid calling FAS for every single filter we have to + # run through + _cache.set(username, username) + + +def update_email(email): + global client + if config.get('fasjson'): + try: + log.info("Downloading FASJSON cache for %s*" % email) + response = client.search(email=email) + _add_to_cache(response['result']) + except requests.exceptions.RequestException as e: + log.error("Something went wrong updating the cache with error: %s" % e) + else: + try: + log.info("Downloading FAS cache for %s" % email) + request = client.send_request( + '/user/list', + req_params={ + 'search': '%s' % email, + 'by_email': 1, + }, + auth=True) + except fedora.client.ServerError as e: + log.warning( + "Failed to download fas cache for %s: %r" % (email, e)) + return {} + + log.info("Caching necessary data for %s" % email) + for user in request['people']: + nick = user['ircnick'] + if nick: + _cache.set(nick, user['username']) + + email = user['email'] + if email: + _cache.set(email, user['username']) + else: + # If we couldn't find the email in FAS, save it in the _cache as + # email so that we avoid calling FAS for every single filter we + # have to run through + _cache.set(email, email) + + +def nick2fas(nickname, **config): + result = _cache.get(nickname) + if not result: + update_nick(nickname) + result = _cache.get(nickname) + return result or nickname + + +def email2fas(email, **config): + if email.endswith('@fedoraproject.org'): + return email.rsplit('@', 1)[0] + + result = _cache.get(email) + if not result: + update_email(email) + result = _cache.get(email) + return result or email diff --git a/roles/notifs/backend/files/fasjson-port/tasks.py b/roles/notifs/backend/files/fasjson-port/tasks.py new file mode 100644 index 0000000000..3fb5a6144e --- /dev/null +++ b/roles/notifs/backend/files/fasjson-port/tasks.py @@ -0,0 +1,475 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the FMN project. +# Copyright (C) 2017 Red Hat, Inc. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +""" +This module contains the `Celery tasks`_ used by FMN. + +.. _Celery tasks: http://docs.celeryproject.org/en/latest/ +""" + +from __future__ import absolute_import + +import datetime + +from celery.utils.log import get_task_logger +from fedmsg_meta_fedora_infrastructure import fasshim +from kombu import Connection, Queue +from kombu.pools import connections +from celery import task +import fedmsg +import fedmsg.meta +import fedmsg_meta_fedora_infrastructure +import sqlalchemy + +from . import config, lib as fmn_lib, formatters, exceptions +from . import fmn_fasshim +from .lib import models +from .celery import app +from .constants import BACKEND_QUEUE_PREFIX + + +__all__ = ['find_recipients'] + + +_log = get_task_logger(__name__) + + +REFRESH_CACHE_TOPIC = 'fmn.internal.refresh_cache' + + +# Monkey patch fedmsg_meta modules +fasshim.nick2fas = fmn_fasshim.nick2fas +fasshim.email2fas = fmn_fasshim.email2fas +fedmsg_meta_fedora_infrastructure.supybot.nick2fas = fmn_fasshim.nick2fas +fedmsg_meta_fedora_infrastructure.anitya.email2fas = fmn_fasshim.email2fas +fedmsg_meta_fedora_infrastructure.bz.email2fas = fmn_fasshim.email2fas +fedmsg_meta_fedora_infrastructure.mailman3.email2fas = fmn_fasshim.email2fas +fedmsg_meta_fedora_infrastructure.pagure.email2fas = fmn_fasshim.email2fas + + +class _FindRecipients(task.Task): + """A Celery task sub-class that loads and caches user preferences.""" + + name = 'fmn.tasks.find_recipients' + # Retry tasks every hour for 60 days before giving up + default_retry_delay = 3600 + max_retries = 1440 + autoretry_for = (Exception,) + + def __init__(self): + """ + Initialize caches and other resources for the tasks that require user preferences. + + This is run once per process, not per task. + """ + _log.info('Initializing the "%s" task', self.name) + fedmsg.meta.make_processors(**config.app_conf) + self._valid_paths = None + self._user_preferences = None + _log.info('Initialization complete for the "%s" task', self.name) + + @property + def valid_paths(self): + """ + A property that lazy-loads the valid paths for FMN rules. + + This is done here rather in ``__init__`` so that users of this task + don't load all the valid paths when the task is registered with + Celery. + """ + if self._valid_paths is None: + _log.info('Loading valid FMN rule paths') + self._valid_paths = fmn_lib.load_rules(root="fmn.rules") + _log.info('All FMN rule paths successfully loaded') + return self._valid_paths + + @property + def user_preferences(self): + """ + A property that lazy-loads the user preferences. + + This is done here rather in ``__init__`` so that users of this task + don't load all the user preferences when the task is registered with + Celery. + """ + if self._user_preferences is None: + _log.info('Loading all user preferences from the database') + self._user_preferences = fmn_lib.load_preferences( + cull_disabled=True, cull_backends=['desktop']) + _log.info('All user preferences successfully loaded from the database') + return self._user_preferences + + def run(self, message): + """ + A Celery task that finds a list of recipients for a message. + + When the recipients have been found, it publishes an AMQP message for each + context (backend) in the format:: + + { + 'context': , + 'recipients': [ + { + "triggered_by_links": true, + "markup_messages": false, + "user": "jcline.id.fedoraproject.org", + "filter_name": "firehose", + "filter_oneshot": false, + "filter_id": 7, + "shorten_links": false, + "verbose": true, + }, + ] + 'raw_msg': the message that this task handled, + } + + + Args: + self (celery.Task): The instance of the Task object this function is bound to. + message (dict): A fedmsg to find recipients for. + """ + _log.debug('Determining recipients for message "%r"', message) + topic, message_body = message['topic'], message['body'] + + # We send a fake message with this topic as a broadcast to all workers in order for them + # to refresh their caches, so if this message is a cache refresh notification stop early. + if topic == REFRESH_CACHE_TOPIC: + _log.info('Refreshing the user preferences for %s', message_body) + fmn_lib.update_preferences(message_body, self.user_preferences) + return + + results = fmn_lib.recipients( + self.user_preferences, message_body, self.valid_paths, config.app_conf) + _log.info('Found %s recipients for message %s', sum(map(len, results.values())), + message_body.get('msg_id', topic)) + + self._queue_for_delivery(results, message) + + def _queue_for_delivery(self, results, message): + """ + Queue a processed message for delivery to its recipients. + + The message is either delivered to the default AMQP exchange with the 'backends' + routing key or placed in the database if the user has enabled batch delivery. If + it is placed in the database, the :func:`batch_messages` task will handle its + delivery. + + Message format:: + { + "context": "email", + "recipient": dict, + "fedmsg": dict, + "formatted_message": + } + + Args: + results (dict): A dictionary where the keys are context names and the values are + a list of recipients for that context. A recipient entry in the list is a + dictionary. See :func:`fmn.lib.recipients` for the dictionary format. + message (dict): The raw fedmsg to humanize and deliver to the given recipients. + """ + broker_url = config.app_conf['celery']['broker'] + + with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn: + producer = conn.Producer() + for context, recipients in results.items(): + _log.info('Dispatching messages for %d recipients for the %s backend', + len(recipients), context) + for recipient in recipients: + _maybe_mark_filter_fired(recipient) + + user = recipient['user'] + preference = self.user_preferences['{}_{}'.format(user, context)] + if _batch(preference, context, recipient, message): + continue + + formatted_message = _format(context, message, recipient) + + _log.info('Queuing message for delivery to %s on the %s backend', user, context) + backend_message = { + "context": context, + "recipient": recipient, + "fedmsg": message, + "formatted_message": formatted_message, + } + routing_key = BACKEND_QUEUE_PREFIX + context + producer.publish(backend_message, routing_key=routing_key, + declare=[Queue(routing_key, durable=True)]) + + +def _maybe_mark_filter_fired(recipient): + """ + If the filter was a one-shot filter, try to mark it as triggered. If that fails, + log the error and continue since there's not much else to be done. + + Args: + recipient (dict): The recipient dictionary. + """ + + if ('filter_oneshot' in recipient and recipient['filter_oneshot']): + _log.info('Marking one-time filter as fired') + session = models.Session() + idx = recipient['filter_id'] + try: + fltr = models.Filter.query.get(idx) + fltr.fired(session) + session.commit() + except (sqlalchemy.exc.SQLAlchemyError, AttributeError): + _log.exception('Unable to mark one-shot filter (id %s) as fired', idx) + session.rollback() + finally: + models.Session.remove() + + +def _batch(preference, context, recipient, message): + """ + Batch the message if the user wishes it. + + Args: + preference (dict): The user's preferences in dictionary form. + context (str): The context to batch it for. + recipient (dict): The recipient dictionary. + message (dict): The fedmsg to batch. + """ + if preference.get('batch_delta') or preference.get('batch_count'): + _log.info('User "%s" has batch delivery set; placing message in database', + recipient['user']) + session = models.Session() + try: + models.QueuedMessage.enqueue(session, recipient['user'], context, message) + session.commit() + return True + except sqlalchemy.exc.SQLAlchemyError: + _log.exception('Unable to queue message for batch delivery') + session.rollback() + finally: + models.Session.remove() + + return False + + +def _format(context, message, recipient): + """ + Format the message(s) using the context and recipient to determine settings. + + Args: + context (str): The name of the context; this is used to determine what formatter + function to use. + message (dict or list): A fedmsg or list of fedmsgs to format. + recipient (dict): A recipient dictionary passed on to the formatter function. + + Raises: + FmnError: If the message could not be formatted. + """ + formatted_message = None + + # If it's a dictionary, it's a single message that doesn't need batching + if isinstance(message, dict): + if context == 'email': + formatted_message = formatters.email(message['body'], recipient) + elif context == 'irc': + formatted_message = formatters.irc(message['body'], recipient) + elif context == 'sse': + try: + formatted_message = formatters.sse(message['body'], recipient) + except Exception: + _log.exception('An exception occurred formatting the message ' + 'for delivery: falling back to sending the raw fedmsg') + formatted_message = message + elif isinstance(message, list): + if context == 'email': + formatted_message = formatters.email_batch( + [m['body'] for m in message], recipient) + elif context == 'irc': + formatted_message = formatters.irc_batch( + [m['body'] for m in message], recipient) + + if formatted_message is None: + raise exceptions.FmnError( + 'The message was not formatted in any way, aborting!') + + return formatted_message + + +@app.task(name='fmn.tasks.batch_messages', ignore_results=True) +def batch_messages(): + """ + A task that collects all messages ready for batch delivery and queues them. + + Messages for users of the batch feature are placed in the database by the + :func:`find_recipients` task. Those messages are then picked up by this task, + turned into a summary using the :mod:`fmn.formatters` module, and placed in + the delivery service's AMQP queue. + + This is intended to be run as a periodic task using Celery's beat service. + """ + session = models.Session() + try: + broker_url = config.app_conf['celery']['broker'] + with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn: + producer = conn.Producer() + for pref in models.Preference.list_batching(session): + if not _batch_ready(pref): + continue + + queued_messages = models.QueuedMessage.list_for( + session, pref.user, pref.context) + _log.info('Batching %d queued messages for %s', + len(queued_messages), pref.user.openid) + + messages = [m.message for m in queued_messages] + recipients = [ + { + pref.context.detail_name: value.value, + 'user': pref.user.openid, + 'markup_messages': pref.markup_messages, + 'triggered_by_links': pref.triggered_by_links, + 'shorten_links': pref.shorten_links, + } + for value in pref.detail_values + ] + for recipient in recipients: + try: + formatted_message = _format(pref.context.name, messages, recipient) + except exceptions.FmnError: + _log.error('A batch message for %r was not formatted, skipping!', + recipient) + continue + + backend_message = { + "context": pref.context.name, + "recipient": recipient, + "fedmsg": messages, + "formatted_message": formatted_message, + } + routing_key = BACKEND_QUEUE_PREFIX + pref.context.name + producer.publish(backend_message, routing_key=routing_key, + declare=[Queue(routing_key, durable=True)]) + + for message in queued_messages: + message.dequeue(session) + session.commit() + except sqlalchemy.exc.SQLAlchemyError: + _log.exception('Failed to dispatch queued messages for delivery') + session.rollback() + finally: + models.Session.remove() + + +def _batch_ready(preference): + """ + Determine if a message batch is ready for a user. + + Args: + preference (models.Preference): The user preference entry which + contains the user's batch preferences. + Returns: + bool: True if there's a batch ready. + """ + session = models.Session() + try: + count = models.QueuedMessage.count_for(session, preference.user, preference.context) + if not count: + return False + + # Batch based on count + if preference.batch_count is not None and preference.batch_count <= count: + _log.info("Sending digest for %r per msg count", preference.user.openid) + return True + + # Batch based on time + earliest = models.QueuedMessage.earliest_for( + session, preference.user, preference.context) + now = datetime.datetime.utcnow() + delta = datetime.timedelta.total_seconds(now - earliest.created_on) + if preference.batch_delta is not None and preference.batch_delta <= delta: + _log.info("Sending digest for %r per time delta", preference.user.openid) + return True + except sqlalchemy.exc.SQLAlchemyError: + _log.exception('Failed to determine if the batch is ready for %s', preference.user) + session.rollback() + + return False + + +@app.task(name='fmn.tasks.heat_fas_cache', ignore_results=True) +def heat_fas_cache(): # pragma: no cover + """ + Fetch all users from FAS and populate the local Redis cache. + + This is helpful to do once on startup since we'll need everyone's email or + IRC nickname eventually. + """ + if config.app_conf['fasjson'].get('active'): + fmn_fasshim.make_fasjson_cache(**config.app_conf) + else: + fmn_fasshim.make_fas_cache(**config.app_conf) + + +@app.task(name='fmn.tasks.confirmations', ignore_results=True) +def confirmations(): + """ + Load all pending confirmations, create formatted messages, and dispatch them to the + delivery service. + + This is intended to be dispatched regularly via celery beat. + """ + session = models.Session() + try: + models.Confirmation.delete_expired(session) + pending = models.Confirmation.query.filter_by(status='pending').all() + broker_url = config.app_conf['celery']['broker'] + with connections[Connection(broker_url)].acquire(block=True, timeout=60) as conn: + producer = conn.Producer() + for confirmation in pending: + message = None + if confirmation.context.name == 'email': + message = formatters.email_confirmation(confirmation) + else: + # The way the irc backend is currently written, it has to format the + # confirmation itself. For now, just send an empty message, but in the + # future it may be worth refactoring the irc backend to let us format here. + message = '' + recipient = { + confirmation.context.detail_name: confirmation.detail_value, + 'user': confirmation.user.openid, + 'triggered_by_links': False, + 'confirmation': True, + } + backend_message = { + "context": confirmation.context.name, + "recipient": recipient, + "fedmsg": {}, + "formatted_message": message, + } + _log.info('Dispatching confirmation message for %r', confirmation) + confirmation.set_status(session, 'valid') + routing_key = BACKEND_QUEUE_PREFIX + confirmation.context.name + producer.publish(backend_message, routing_key=routing_key, + declare=[Queue(routing_key, durable=True)]) + session.commit() + except sqlalchemy.exc.SQLAlchemyError: + _log.exception('Unable to handle confirmations') + session.rollback() + finally: + models.Session.remove() + + +#: A Celery task that accepts a message as input and determines the recipients. +find_recipients = app.tasks[_FindRecipients.name] diff --git a/roles/notifs/backend/files/fasjson-port/util.py b/roles/notifs/backend/files/fasjson-port/util.py new file mode 100644 index 0000000000..bb9c60d062 --- /dev/null +++ b/roles/notifs/backend/files/fasjson-port/util.py @@ -0,0 +1,52 @@ +import fedora.client +import fasjson_client + +import logging +log = logging.getLogger("fmn") + + +def new_packager(topic, msg): + """ Returns a username if the message is about a new packager in FAS. """ + if '.fas.group.member.sponsor' in topic: + group = msg['msg']['group'] + if group == 'packager': + return msg['msg']['user'] + return None + + +def new_badges_user(topic, msg): + """ Returns a username if the message is about a new fedbadges user. """ + if '.fedbadges.person.login.first' in topic: + return msg['msg']['user']['username'] + return None + + +def get_fas_email(config, username): + """ Return FAS email associated with a username. + + We use this to try and get the right email for new autocreated users. + We used to just use $USERNAME@fp.o, but when first created most users don't + have that alias available yet. + """ + try: + fas = fedora.client.AccountSystem(**config['fas_credentials']) + person = fas.person_by_username(username) + if person.get('email'): + return person['email'] + raise ValueError("No email found: %r" % username) + except Exception: + log.exception("Failed to get FAS email for %r" % username) + return '%s@fedoraproject.org' % username + + +def get_fasjson_email(config, username): + """ Return FASJSON email associated with a username. """ + try: + fasjson = config["fasjson"] + client = fasjson_client.Client(url=fasjson.get('url')) + person = client.get_user(username=username).result + + return person.get('emails')[0] + except Exception: + log.exception("Failed to get FASJSON email for %r" % username) + return '%s@fedoraproject.org' % username diff --git a/roles/notifs/backend/tasks/main.yml b/roles/notifs/backend/tasks/main.yml index 9af37829f4..40da387f5d 100644 --- a/roles/notifs/backend/tasks/main.yml +++ b/roles/notifs/backend/tasks/main.yml @@ -7,6 +7,8 @@ - python-fmn - python-psycopg2 - libsemanage-python + - python-gssapi + - python-requests-gssapi # Needed to produce nice long emails about koji builds - koji tags: @@ -25,6 +27,19 @@ - notifs - notifs/backend +- name: Hotfix fmn for fasjson + copy: > + src=fasjson-port/{{ item }} dest=/usr/lib/python2.7/site-packages/fmn/{{ item }} + with_items: + - consumer.py + - fasjson_client.py + - fmn_fasshim.py + - tasks.py + - util.py + tags: + - notifs + - notifs/backend + - name: Install fmn SSE configuration template: > src={{ item }} dest=/etc/fedmsg.d/{{ item }} diff --git a/roles/notifs/backend/templates/fmn.consumer.py b/roles/notifs/backend/templates/fmn.consumer.py index 15be2ebdfa..09db8d05c3 100644 --- a/roles/notifs/backend/templates/fmn.consumer.py +++ b/roles/notifs/backend/templates/fmn.consumer.py @@ -97,6 +97,10 @@ config = { "base_url": "https://admin.stg.fedoraproject.org/accounts", }, {% else -%} + "fasjson": { + "active": True, + "url": "https://fasjson.fedoraproject.org/v1/" + } "fas_credentials": { "username": "{{fedoraDummyUser}}", "password": "{{fedoraDummyUserPassword}}",