From 055645e9d3903790bc5ed7b00f2330be82a23d44 Mon Sep 17 00:00:00 2001 From: Ralph Bean Date: Sat, 9 May 2015 16:49:21 +0000 Subject: [PATCH] Pull in more fedmsg-map changes. ... from https://github.com/fedora-infra/fedmsg/pull/338 --- .../fedmsg-activation/files/fedmsg-map.py | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/roles/collectd/fedmsg-activation/files/fedmsg-map.py b/roles/collectd/fedmsg-activation/files/fedmsg-map.py index fffdabe513..42c97b2a51 100644 --- a/roles/collectd/fedmsg-activation/files/fedmsg-map.py +++ b/roles/collectd/fedmsg-activation/files/fedmsg-map.py @@ -22,7 +22,7 @@ for_collectd = 'verbose' not in sys.argv active = collections.defaultdict(list) inactive = collections.defaultdict(list) -pool = multiprocessing.pool.ThreadPool(10) +pool = multiprocessing.pool.ThreadPool(25) def info(content="\n"): if not for_collectd: @@ -30,32 +30,33 @@ def info(content="\n"): sys.stdout.flush() def scan_one(item): - name, endpoints = item - for endpoint in endpoints: - if not endpoint.startswith('tcp://'): - raise ValueError("Don't know how to deal with %r" % endpoint) - endpoint = endpoint[len('tcp://'):].split(':') - connection = None - try: - connection = socket.create_connection(endpoint, timeout) - actual = base64.b64encode(connection.recv(10)) - if actual != expected: - inactive[name].append(( - endpoint, "%r is not %r" % (actual, expected))) - info("F") - else: - active[name].append((endpoint, "all active")) - info(".") - except socket.error as e: - inactive[name].append((endpoint, str(e))) + name, endpoint = item + if not endpoint.startswith('tcp://'): + raise ValueError("Don't know how to deal with %r" % endpoint) + endpoint = endpoint[len('tcp://'):].split(':') + connection = None + try: + connection = socket.create_connection(endpoint, timeout) + actual = base64.b64encode(connection.recv(10)) + if actual != expected: + inactive[name].append(( + endpoint, "%r is not %r" % (actual, expected))) info("F") - if connection: - connection.close() + else: + active[name].append((endpoint, "all active")) + info(".") + except socket.error as e: + inactive[name].append((endpoint, str(e))) + info("F") + if connection: + connection.close() def scan_all(): - - pool.map(scan_one, config['endpoints'].items()) + items = [(name, addr) + for name, endpoints in config['endpoints'].items() + for addr in endpoints] + pool.map(scan_one, items) info() @@ -128,3 +129,5 @@ else: print(output) if interval - delta > 0: time.sleep(interval - delta) + +pool.close()