Use a threadpool to make this faster; increase resolution of the collectd graph.
This commit is contained in:
parent 20e22a5005
commit 06d2517c67

1 changed file with 34 additions and 29 deletions
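For context on the first half of that summary: the serial per-endpoint loop becomes a multiprocessing.pool.ThreadPool whose map() fans the socket checks out across ten worker threads, so a full scan is bounded by the slowest endpoint rather than the sum of all of them. A minimal sketch of that pattern (the check() helper and endpoint list below are illustrative stand-ins, not code from this repository):

import multiprocessing.pool
import time

def check(endpoint):
    # Stand-in for the real socket probe: an I/O-bound call that mostly waits.
    time.sleep(0.5)
    return endpoint, "ok"

endpoints = ["tcp://host%d.example.com:3999" % i for i in range(10)]

pool = multiprocessing.pool.ThreadPool(10)
# map() blocks until every worker returns; ten half-second checks finish in
# roughly 0.5s of wall time instead of the ~5s a serial loop would take.
results = pool.map(check, endpoints)
pool.close()
pool.join()
print(results)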
@@ -6,6 +6,7 @@ Reports what percentage of fedmsg endpoints are bound and ready.
 
 import base64
 import collections
+import multiprocessing.pool
 import socket
 import sys
 import time
@@ -18,39 +19,43 @@ expected = '/wAAAAAAAAABfw=='
 for_collectd = 'verbose' not in sys.argv
 
+active = collections.defaultdict(list)
+inactive = collections.defaultdict(list)
+
+pool = multiprocessing.pool.ThreadPool(10)
+
 
 def info(content="\n"):
     if not for_collectd:
         sys.stdout.write(content)
         sys.stdout.flush()
 
 
-def do_scan():
-    active = collections.defaultdict(list)
-    inactive = collections.defaultdict(list)
-
-    for i, item in enumerate(config['endpoints'].items()):
-        name, endpoints = item
-        for endpoint in endpoints:
-            if not endpoint.startswith('tcp://'):
-                raise ValueError("Don't know how to deal with %r" % endpoint)
-            endpoint = endpoint[len('tcp://'):].split(':')
-            connection = None
-            try:
-                connection = socket.create_connection(endpoint, timeout)
-                actual = base64.b64encode(connection.recv(10))
-                if actual != expected:
-                    inactive[name].append((
-                        endpoint, "%r is not %r" % (actual, expected)))
-                    info("F")
-                else:
-                    active[name].append((endpoint, "all active"))
-                    info(".")
-            except socket.error as e:
-                inactive[name].append((endpoint, str(e)))
-                info("F")
-            if connection:
-                connection.close()
+def scan_one(item):
+    name, endpoints = item
+    for endpoint in endpoints:
+        if not endpoint.startswith('tcp://'):
+            raise ValueError("Don't know how to deal with %r" % endpoint)
+        endpoint = endpoint[len('tcp://'):].split(':')
+        connection = None
+        try:
+            connection = socket.create_connection(endpoint, timeout)
+            actual = base64.b64encode(connection.recv(10))
+            if actual != expected:
+                inactive[name].append((
+                    endpoint, "%r is not %r" % (actual, expected)))
+                info("F")
+            else:
+                active[name].append((endpoint, "all active"))
+                info(".")
+        except socket.error as e:
+            inactive[name].append((endpoint, str(e)))
+            info("F")
+        if connection:
+            connection.close()
+
+
+def scan_all():
+    pool.map(scan_one, config['endpoints'].items())
 
     info()
 
@@ -101,13 +106,13 @@ def do_scan():
     return value
 
 if not for_collectd:
-    do_scan()
+    scan_all()
 else:
-    interval = 10
+    interval = 5
     host = socket.getfqdn()
     while True:
         start = time.time()
-        value = do_scan()
+        value = scan_all()
         stop = timestamp = time.time()
         delta = stop - start
         output = (
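The second half of the summary is the interval change from 10 to 5 seconds: the script appears to run under collectd's exec plugin, reporting one value per pass of the while loop, so halving the interval doubles the number of data points on the resulting graph. The exact PUTVAL line this script builds is not shown in the hunk above; the identifier in the sketch below is a hypothetical example of the format the exec plugin expects:

import socket
import time

host = socket.getfqdn()
interval = 5        # seconds between reported values; was 10 before this commit
timestamp = time.time()
value = 98.7        # hypothetical "percent of endpoints active" reading

# collectd's exec plugin reads PUTVAL lines from the child's stdout.  The
# "host/plugin/type-instance" identifier here is illustrative only.
print('PUTVAL "%s/fedmsg/percent-active" interval=%i %i:%.2f'
      % (host, interval, timestamp, value))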