From 0ab09960c4497d19f72174a26b25d1f25c2b3f46 Mon Sep 17 00:00:00 2001 From: Kevin Fenzi Date: Wed, 28 Sep 2022 17:09:31 -0700 Subject: [PATCH] notifs / staging: hotfix python3-moksha-hub This blows up with python3 utf8 Signed-off-by: Kevin Fenzi --- roles/notifs/backend/files/consumer.py | 252 +++++++++++++++++++++++++ roles/notifs/backend/tasks/main.yml | 12 ++ 2 files changed, 264 insertions(+) create mode 100644 roles/notifs/backend/files/consumer.py diff --git a/roles/notifs/backend/files/consumer.py b/roles/notifs/backend/files/consumer.py new file mode 100644 index 0000000000..e35211417b --- /dev/null +++ b/roles/notifs/backend/files/consumer.py @@ -0,0 +1,252 @@ +# This file is part of Moksha. +# Copyright (C) 2008-2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +:mod:`moksha.hub.api.consumer` - The Moksha Consumer API +======================================================== +Moksha provides a simple API for creating "consumers" of message topics. + +This means that your consumer is instantiated when the MokshaHub is initially +loaded, and receives each message for the specified topic through the +:meth:`Consumer.consume` method. + +.. moduleauthor:: Luke Macken +.. moduleauthor:: Ralph Bean +""" + +import json +import threading +import time +import logging +log = logging.getLogger('moksha.hub') + +import six.moves.queue as queue +from collections import deque + +from kitchen.iterutils import iterate +from moksha.common.lib.helpers import create_app_engine +from moksha.common.lib.converters import asbool +import moksha.hub.reactor + + +class Consumer(object): + """ A message consumer """ + topic = '' + + # Automatically decode JSON data + jsonify = True + + # Internal use only + _initialized = False + _exception_count = 0 + + def __init__(self, hub): + self.hub = hub + self.log = log + + # Set up a queue to communicate between the main twisted thread + # receiving raw messages, and a worker thread that pulls items off + # the queue to do "consume" work. + self.incoming = queue.Queue() + self.headcount_in = self.headcount_out = 0 + self._times = deque(maxlen=1024) + + callback = self._consume + if self.jsonify: + callback = self._consume_json + + for topic in iterate(self.topic): + log.debug('Subscribing to consumer topic %s' % topic) + self.hub.subscribe(topic, callback) + + # If the consumer specifies an 'app', then setup `self.engine` to + # be a SQLAlchemy engine, along with a configured DBSession + app = getattr(self, 'app', None) + self.engine = self.DBSession = None + if app: + log.debug("Setting up individual engine for consumer") + from sqlalchemy.orm import sessionmaker + self.engine = create_app_engine(app, hub.config) + self.DBSession = sessionmaker(bind=self.engine)() + + self.blocking_mode = asbool(self.hub.config.get('moksha.blocking_mode', False)) + if self.blocking_mode: + log.info("Blocking mode true for %r. " + "Messages handled as they arrive." % self) + else: + self.N = int(self.hub.config.get('moksha.workers_per_consumer', 1)) + log.info("Blocking mode false for %r. " + "Messages to be queued and distributed to %r threads." % ( + self, self.N)) + for i in range(self.N): + moksha.hub.reactor.reactor.callInThread(self._work_loop) + + self._initialized = True + + def __json__(self): + if self._initialized: + backlog = self.incoming.qsize() + headcount_out = self.headcount_out + headcount_in = self.headcount_in + times = list(self._times) + else: + backlog = None + headcount_out = headcount_in = 0 + times = [] + + results = { + "name": type(self).__name__, + "module": type(self).__module__, + "topic": self.topic, + "initialized": self._initialized, + "exceptions": self._exception_count, + "jsonify": self.jsonify, + "backlog": backlog, + "headcount_out": headcount_out, + "headcount_in": headcount_in, + "times": times, + } + # Reset these counters before returning. + self.headcount_out = self.headcount_in = 0 + self._exception_count = 0 + self._times.clear() + return results + + def debug(self, message): + idx = threading.current_thread().ident + log.debug("%r thread %r | %s" % (type(self).__name__, idx, message)) + + def _consume_json(self, message): + """ Convert our AMQP messages into a consistent dictionary format. + + This method exists because our STOMP & AMQP message brokers consume + messages in different formats. This causes our messaging abstraction + to leak into the consumers themselves. + + :Note: We do not pass the message headers to the consumer (in this AMQP consumer) + because the current AMQP.js bindings do not allow the client to change them. + Thus, we need to throw any topic/queue details into the JSON body itself. + """ + try: + body = json.loads(message.body) + except: + log.debug("Unable to decode message body to JSON: %r" % message.body) + body = message.body + topic = None + + # Try some stuff for AMQP: + try: + topic = message.headers[0].routing_key + except TypeError: + # We didn't get a JSON dictionary + pass + except AttributeError: + # We didn't get headers or a routing key? + pass + + # If that didn't work, it might be zeromq + if not topic: + try: + topic = message.topic + except AttributeError: + # Weird. I have no idea... + pass + + message_as_dict = {'body': body, 'topic': topic} + return self._consume(message_as_dict) + + def _consume(self, message): + self.headcount_in += 1 + if self.blocking_mode: + # Do the work right now + return self._do_work(message) + else: + # Otherwise, put the message in a queue for other threads to handle + self.incoming.put(message) + + def _work_loop(self): + while True: + # This is a blocking call. It waits until a message is available. + message = self.incoming.get() + # Then we are being asked to quit + if message is StopIteration: + break + self._do_work(message) + self.debug("Worker thread exiting.") + + def _do_work(self, message): + self.headcount_out += 1 + start = time.time() + handled = True + + self.debug("Worker thread picking a message.") + try: + self.validate(message) + except Exception as e: + log.warning("Received invalid message %r" % e) + return False # Not handled + + try: + self.pre_consume(message) + except Exception as e: + self.log.exception(message) + + try: + self.consume(message) + except Exception as e: + handled = False # Not handled. Return this later. + self.log.exception(message) + # Keep track of how many exceptions we've hit in a row + self._exception_count += 1 + + try: + self.post_consume(message) + except Exception as e: + self.log.exception(message) + + # Record how long it took to process this message (for stats) + self._times.append(time.time() - start) + + self.debug("Going back to waiting on the incoming queue. Message handled: %r" % handled) + return handled + + def validate(self, message): + """ Override to implement your own validation scheme. """ + pass + + def pre_consume(self, message): + pass + + def consume(self, message): + raise NotImplementedError + + def post_consume(self, message): + pass + + def send_message(self, topic, message): + try: + self.hub.send_message(topic, message) + except Exception as e: + log.error('Cannot send message: %s' % e) + + def stop(self): + for i in range(getattr(self, 'N', 0)): + self.incoming.put(StopIteration) + + if hasattr(self, 'hub'): + self.hub.close() + + if getattr(self, 'DBSession', None): + self.DBSession.close() diff --git a/roles/notifs/backend/tasks/main.yml b/roles/notifs/backend/tasks/main.yml index 6d14fb0c60..c7871a25b5 100644 --- a/roles/notifs/backend/tasks/main.yml +++ b/roles/notifs/backend/tasks/main.yml @@ -51,6 +51,18 @@ when: inventory_hostname.startswith('notifs-backend02') or env == 'staging' +- name: hotfix python3-moksha-hub + copy: > + src=consumer.py dest=/usr/lib/python3.10/site-packages/moksha/hub/api/consumer.py + owner=root group=root mode=0640 + notify: + - restart fedmsg-hub + tags: + - notifs + - notifs/backend + when: + inventory_hostname.startswith('notifs-backend02') or env == 'staging' + - name: copy database configuration template: > src={{ item }} dest=/etc/fedmsg.d/{{ item }}