From 7142c188a01d2663ef3a02cd68211061f7bef1bf Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Fri, 8 Jan 2021 09:56:39 +0100 Subject: [PATCH] pagure: hotfix the stream server with the py3-only version Signed-off-by: Pierre-Yves Chibon --- roles/pagure/files/pagure_stream_server.py | 308 +++++++++++++++++++++ roles/pagure/tasks/main.yml | 12 + 2 files changed, 320 insertions(+) create mode 100644 roles/pagure/files/pagure_stream_server.py diff --git a/roles/pagure/files/pagure_stream_server.py b/roles/pagure/files/pagure_stream_server.py new file mode 100644 index 0000000000..64462140f2 --- /dev/null +++ b/roles/pagure/files/pagure_stream_server.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python + +""" + (c) 2015-2017 - Copyright Red Hat Inc + + Authors: + Pierre-Yves Chibon + + +Streaming server for pagure's eventsource feature +This server takes messages sent to redis and publish them at the specified +endpoint + +To test, run this script and in another terminal +nc localhost 8080 + HELLO + + GET /test/issue/26?foo=bar HTTP/1.1 + +""" + +from __future__ import unicode_literals, absolute_import + +import logging +import os + + +import redis +import asyncio + +from six.moves.urllib.parse import urlparse + +log = logging.getLogger(__name__) + + +if "PAGURE_CONFIG" not in os.environ and os.path.exists( + "/etc/pagure/pagure.cfg" +): + print("Using configuration file `/etc/pagure/pagure.cfg`") + os.environ["PAGURE_CONFIG"] = "/etc/pagure/pagure.cfg" + + +import pagure # noqa: E402 +import pagure.lib.model_base # noqa: E402 +import pagure.lib.query # noqa: E402 +from pagure.exceptions import PagureException, PagureEvException # noqa: E402 + +SERVER = None +SESSION = None +POOL = redis.ConnectionPool( + host=pagure.config.config["REDIS_HOST"], + port=pagure.config.config["REDIS_PORT"], + db=pagure.config.config["REDIS_DB"], +) + + +def _get_session(): + global SESSION + if SESSION is None: + print(pagure.config.config["DB_URL"]) + SESSION = pagure.lib.model_base.create_session( + pagure.config.config["DB_URL"] + ) + + return SESSION + + +def _get_issue(repo, objid): + """Get a Ticket (issue) instance for a given repo (Project) and + objid (issue number). + """ + issue = None + if not repo.settings.get("issue_tracker", True): + raise PagureEvException("No issue tracker found for this project") + + session = _get_session() + issue = pagure.lib.query.search_issues(session, repo, issueid=objid) + + if issue is None or issue.project != repo: + raise PagureEvException("Issue '%s' not found" % objid) + + if issue.private: + # TODO: find a way to do auth + raise PagureEvException( + "This issue is private and you are not allowed to view it" + ) + + return issue + + +def _get_pull_request(repo, objid): + """Get a PullRequest instance for a given repo (Project) and objid + (request number). + """ + if not repo.settings.get("pull_requests", True): + raise PagureEvException( + "No pull-request tracker found for this project" + ) + + session = _get_session() + request = pagure.lib.query.search_pull_requests( + session, project_id=repo.id, requestid=objid + ) + + if request is None or request.project != repo: + raise PagureEvException("Pull-Request '%s' not found" % objid) + + return request + + +# Dict representing known object types that we handle requests for, +# and the bound functions for getting an object instance from the +# parsed path data. Has to come after the functions it binds +OBJECTS = {"issue": _get_issue, "pull-request": _get_pull_request} + + +def get_obj_from_path(path): + """ Return the Ticket or Request object based on the path provided. + """ + (username, namespace, reponame, objtype, objid) = pagure.utils.parse_path( + path + ) + session = _get_session() + repo = pagure.lib.query.get_authorized_project( + session, reponame, user=username, namespace=namespace + ) + + if repo is None: + raise PagureEvException("Project '%s' not found" % reponame) + + # find the appropriate object getter function from OBJECTS + try: + getfunc = OBJECTS[objtype] + except KeyError: + raise PagureEvException("Invalid object provided: '%s'" % objtype) + + return getfunc(repo, objid) + + +@asyncio.coroutine +def handle_client(client_reader, client_writer): + data = None + while True: + # give client a chance to respond, timeout after 10 seconds + line = yield from asyncio.wait_for(client_reader.readline(), timeout=10.0) + + if not line or not line.decode().strip(): + break + line = line.decode().rstrip() + if data is None: + data = line + + if data is None: + log.warning("Expected ticket uid, received None") + return + + data = data.rstrip().split() + log.info("Received %s", data) + if not data: + log.warning("No URL provided: %s" % data) + return + + if "/" not in data[1]: + log.warning("Invalid URL provided: %s" % data[1]) + return + + url = urlparse(data[1]) + + try: + obj = get_obj_from_path(url.path) + except PagureException as err: + log.warning(err.message) + return + + origin = pagure.config.config.get("APP_URL") + if origin.endswith("/"): + origin = origin[:-1] + + client_writer.write( + ( + "HTTP/1.0 200 OK\n" + "Content-Type: text/event-stream\n" + "Cache: nocache\n" + "Connection: keep-alive\n" + "Access-Control-Allow-Origin: %s\n\n" % origin + ).encode() + ) + + conn = redis.Redis(connection_pool=POOL) + subscriber = conn.pubsub(ignore_subscribe_messages=True) + + try: + subscriber.subscribe("pagure.%s" % obj.uid) + + # Inside a while loop, wait for incoming events. + oncall = 0 + while True: + msg = subscriber.get_message() + if msg is None: + # Send a ping to see if the client is still alive + if oncall >= 5: + # Only send a ping once every 5 seconds + client_writer.write(("event: ping\n\n").encode()) + oncall = 0 + oncall += 1 + yield from client_writer.drain() + yield from asyncio.sleep(1) + else: + log.info("Sending %s", msg["data"].decode()) + client_writer.write(("data: %s\n\n" % msg["data"].decode()).encode()) + yield from client_writer.drain() + + except OSError: + log.info("Client closed connection") + except ConnectionResetError as err: + log.exception("ERROR: ConnectionResetError in handle_client") + except Exception as err: + log.exception("ERROR: Exception in handle_client") + log.info(type(err)) + finally: + # Wathever happens, close the connection. + log.info("Client left. Goodbye!") + subscriber.close() + client_writer.close() + + +@asyncio.coroutine +def stats(client_reader, client_writer): + + try: + log.info("Clients: %s", SERVER._active_count) + client_writer.write( + ("HTTP/1.0 200 OK\n" "Cache: nocache\n\n").encode() + ) + client_writer.write(("data: %s\n\n" % SERVER._active_count).encode()) + yield from client_writer.drain() + + except ConnectionResetError as err: + log.info(err) + finally: + client_writer.close() + return + + +def main(): + global SERVER + _get_session() + + try: + loop = asyncio.get_event_loop() + coro = asyncio.start_server( + handle_client, + host=None, + port=pagure.config.config["EVENTSOURCE_PORT"], + loop=loop, + ) + SERVER = loop.run_until_complete(coro) + log.info( + "Serving server at {}".format(SERVER.sockets[0].getsockname()) + ) + if pagure.config.config.get("EV_STATS_PORT"): + stats_coro = asyncio.start_server( + stats, + host=None, + port=pagure.config.config.get("EV_STATS_PORT"), + loop=loop, + ) + stats_server = loop.run_until_complete(stats_coro) + log.info( + "Serving stats at {}".format( + stats_server.sockets[0].getsockname() + ) + ) + loop.run_forever() + except KeyboardInterrupt: + pass + except ConnectionResetError as err: + log.exception("ERROR: ConnectionResetError in main") + except Exception: + log.exception("ERROR: Exception in main") + finally: + # Close the server + SERVER.close() + if pagure.config.config.get("EV_STATS_PORT"): + stats_server.close() + log.info("End Connection") + loop.run_until_complete(SERVER.wait_closed()) + loop.close() + log.info("End") + + +if __name__ == "__main__": + log = logging.getLogger("") + formatter = logging.Formatter( + "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s" + ) + + # setup console logging + log.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + + aslog = logging.getLogger("asyncio") + aslog.setLevel(logging.DEBUG) + + ch.setFormatter(formatter) + log.addHandler(ch) + main() diff --git a/roles/pagure/tasks/main.yml b/roles/pagure/tasks/main.yml index 275a57d693..a4798e3177 100644 --- a/roles/pagure/tasks/main.yml +++ b/roles/pagure/tasks/main.yml @@ -412,6 +412,18 @@ tags: - selinux +# Hotfix(es) + +- name: Install the python3-only version of the stream_server so it works + copy: src=pagure_stream_server.py + dest=/usr/libexec/pagure-ev/pagure_stream_server.py + owner=root mode=0755 + tags: + - pagure + - hotfix + notify: + - reload httpd + # Ensure all the services are up and running - name: Start and enable httpd, postfix, pagure_milter