diff --git a/roles/collectd/fedmsg-activation/files/fedmsg-map.py b/roles/collectd/fedmsg-activation/files/fedmsg-map.py index 190d14923b..fffdabe513 100644 --- a/roles/collectd/fedmsg-activation/files/fedmsg-map.py +++ b/roles/collectd/fedmsg-activation/files/fedmsg-map.py @@ -6,6 +6,7 @@ Reports what percentage of fedmsg endpoints are bound and ready. import base64 import collections +import multiprocessing.pool import socket import sys import time @@ -18,39 +19,43 @@ expected = '/wAAAAAAAAABfw==' for_collectd = 'verbose' not in sys.argv +active = collections.defaultdict(list) +inactive = collections.defaultdict(list) + +pool = multiprocessing.pool.ThreadPool(10) def info(content="\n"): if not for_collectd: sys.stdout.write(content) sys.stdout.flush() - -def do_scan(): - active = collections.defaultdict(list) - inactive = collections.defaultdict(list) - - for i, item in enumerate(config['endpoints'].items()): - name, endpoints = item - for endpoint in endpoints: - if not endpoint.startswith('tcp://'): - raise ValueError("Don't know how to deal with %r" % endpoint) - endpoint = endpoint[len('tcp://'):].split(':') - connection = None - try: - connection = socket.create_connection(endpoint, timeout) - actual = base64.b64encode(connection.recv(10)) - if actual != expected: - inactive[name].append(( - endpoint, "%r is not %r" % (actual, expected))) - info("F") - else: - active[name].append((endpoint, "all active")) - info(".") - except socket.error as e: - inactive[name].append((endpoint, str(e))) +def scan_one(item): + name, endpoints = item + for endpoint in endpoints: + if not endpoint.startswith('tcp://'): + raise ValueError("Don't know how to deal with %r" % endpoint) + endpoint = endpoint[len('tcp://'):].split(':') + connection = None + try: + connection = socket.create_connection(endpoint, timeout) + actual = base64.b64encode(connection.recv(10)) + if actual != expected: + inactive[name].append(( + endpoint, "%r is not %r" % (actual, expected))) info("F") - if connection: - connection.close() + else: + active[name].append((endpoint, "all active")) + info(".") + except socket.error as e: + inactive[name].append((endpoint, str(e))) + info("F") + if connection: + connection.close() + + +def scan_all(): + + pool.map(scan_one, config['endpoints'].items()) info() @@ -101,13 +106,13 @@ def do_scan(): return value if not for_collectd: - do_scan() + scan_all() else: - interval = 10 + interval = 5 host = socket.getfqdn() while True: start = time.time() - value = do_scan() + value = scan_all() stop = timestamp = time.time() delta = stop - start output = (