diff --git a/files/hotfix/mirrormanager2/mm2_crawler b/files/hotfix/mirrormanager2/mm2_crawler deleted file mode 100755 index e05f7e4fae..0000000000 --- a/files/hotfix/mirrormanager2/mm2_crawler +++ /dev/null @@ -1,1194 +0,0 @@ -#!/usr/bin/python - -import argparse -import datetime -import ftplib -from ftplib import FTP -import hashlib -import httplib -import logging -import multiprocessing.pool -import os -import smtplib -import socket -import sys -import threading -import time -import urllib2 -import urlparse -import gc - -sys.path.append('..') -import mirrormanager2.lib -from mirrormanager2.lib.model import HostCategoryDir -from mirrormanager2.lib.sync import run_rsync - - -logger = logging.getLogger("crawler") -formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") -master_formatter = "%(levelname)s:%(name)s:%(hosts)s:%(threads)s:%(hostid)s:%(hostname)s:%(message)s" -current_host = 0 -all_hosts = 0 -threads = 0 -threads_active = 0 - - -# This is a "thread local" object that allows us to store the start time of -# each worker thread (so they can measure and check if they should time out or -# not...) -threadlocal = threading.local() - -# To insert information about the number of hosts and threads in each master -# log message this filter is necessary -class MasterFilter(logging.Filter): - def filter(self, record): - record.hosts = "Hosts(%d/%d)" % (current_host, all_hosts) - record.threads = "Threads(%d/%d)" % (threads_active, threads) - try: - record.hostid = threadlocal.hostid - record.hostname = threadlocal.hostname - except: - record.hostid = 0 - record.hostname = 'master' - return True - -# This filter is necessary to enable logging per thread into a separate file -# Based on http://plumberjack.blogspot.de/2010/09/configuring-logging-for-web.html -class InjectingFilter(logging.Filter): - def __init__(self, thread_id): - self.thread_id = thread_id - - def filter(self, record): - return threadlocal.thread_id == self.thread_id - -def thread_id(): - """ Silly util that returns a git-style short-hash id of the thread. """ - return hashlib.md5(str(threading.current_thread().ident)).hexdigest()[:7] - - -def notify(options, topic, msg): - if not options.fedmsg: - return - - import fedmsg - kwargs = dict( - modname='mirrormanager', - topic='crawler.' + topic, - msg=msg, - - # These direct us to talk to a fedmsg-relay living somewhere. - active=True, - name="relay_inbound", - ) - fedmsg.publish(**kwargs) - - -def doit(options, config): - global all_hosts - global threads - - session = mirrormanager2.lib.create_session(config['DB_URL']) - - # Get *all* of the mirrors - hosts = mirrormanager2.lib.get_mirrors( - session, private=False, order_by_crawl_duration=True, - admin_active=True, user_active=True, site_private=False, - site_user_active=True, site_admin_active=True) - - # Limit our host list down to only the ones we really want to crawl - hosts = [ - host.id for host in hosts if ( - not host.id < options.startid and - not host.id >= options.stopid) - ] - - session.close() - all_hosts = len(hosts) - threads = options.threads - - # And then, for debugging, only do one host - #hosts = [hosts.next()] - - notify(options, 'start', dict(hosts=hosts)) - - # Before we do work, chdir to /var/tmp/. mirrormanager1 did this and I'm - # not totally sure why... 
- os.chdir('/var/tmp') - - # Then create a threadpool to handle as many at a time as we like - threadpool = multiprocessing.pool.ThreadPool(processes=options.threads) - fn = lambda host_id: worker(options, config, host_id) - - # Here's the big operation - return_codes = threadpool.map(fn, hosts) - - # Put a bow on the results for fedmsg - results = [dict(rc=rc, host=host) for rc, host in zip(return_codes, hosts)] - notify(options, 'complete', dict(results=results)) - - return results - - -def setup_logging(debug): - logging.basicConfig(format=master_formatter) - f = MasterFilter() - logger.addFilter(f) - if debug: - logger.setLevel(logging.DEBUG) - else: - logger.setLevel(logging.INFO) - return logger - - -def main(): - starttime = time.time() - parser = argparse.ArgumentParser(usage=sys.argv[0] + " [options]") - parser.add_argument( - "-c", "--config", - dest="config", default='/etc/mirrormanager/mirrormanager2.cfg', - help="Configuration file to use") - - parser.add_argument( - "--include-private", - action="store_true", dest="include_private", default=False, - help="Include hosts marked 'private' in the crawl") - - parser.add_argument( - "-t", "--threads", type=int, dest="threads", default=10, - help="max threads to start in parallel") - - parser.add_argument( - "--timeout-minutes", type=int, dest="timeout_minutes", default=120, - help="per-host timeout, in minutes") - - parser.add_argument( - "--startid", type=int, metavar="ID", dest="startid", default=0, - help="Start crawling at host ID (default=0)") - - parser.add_argument( - "--stopid", type=int, metavar="ID", - dest="stopid", default=sys.maxint, - help="Stop crawling before host ID (default=maxint)") - - parser.add_argument( - "--category", dest="categories", action="append", default=[], - help="Category to scan (default=all), can be repeated") - - parser.add_argument( - "--disable-fedmsg", dest="fedmsg", action="store_false", default=True, - help="Disable fedmsg notifications at the beginning and end of crawl") - - parser.add_argument( - "--canary", dest="canary", action="store_true", default=False, - help="fast crawl by only scanning for canary files") - - parser.add_argument( - "--debug", "-d", dest="debug", action="store_true", default=False, - help="enable printing of debug-level messages") - - options = parser.parse_args() - - if options.canary: - raise NotImplementedError("Canary mode is not yet implemented.") - - setup_logging(options.debug) - - config = dict() - with open(options.config) as config_file: - exec(compile(config_file.read(), options.config, 'exec'), config) - - doit(options, config) - logger.info("Crawler finished after %d seconds" % (time.time() - starttime)) - return 0 - - -################################################ -# overrides for httplib because we're -# handling keepalives ourself -################################################ -class myHTTPResponse(httplib.HTTPResponse): - def begin(self): - httplib.HTTPResponse.begin(self) - self.will_close = False - - def isclosed(self): - """This is a hack, because otherwise httplib will fail getresponse()""" - return True - - def keepalive_ok(self): - # HTTP/1.1 connections stay open until closed - if self.version == 11: - ka = self.msg.getheader('connection') - if ka and "close" in ka.lower(): - return False - else: - return True - - # other HTTP connections may have a connection: keep-alive header - ka = self.msg.getheader('connection') - if ka and "keep-alive" in ka.lower(): - return True - - try: - ka = self.msg.getheader('keep-alive') - if ka is not 
None: - maxidx = ka.index('max=') - maxval = ka[maxidx+4:] - if maxval == '1': - return False - return True - else: - ka = self.msg.getheader('connection') - if ka and "keep-alive" in ka.lower(): - return True - return False - except: - return False - return False - - -class myHTTPConnection(httplib.HTTPConnection): - response_class = myHTTPResponse - - def end_request(self): - self.__response = None - - -################################################ -# the magic begins - - -def timeout_check(timeout): - delta = time.time() - threadlocal.starttime - if delta > (timeout * 60): - raise TimeoutException("Timed out after %rs" % delta) - - -class hostState: - def __init__(self, http_debuglevel=0, ftp_debuglevel=0, timeout_minutes=120): - self.httpconn = {} - self.ftpconn = {} - self.http_debuglevel = http_debuglevel - self.ftp_debuglevel = ftp_debuglevel - self.ftp_dir_results = None - self.keepalives_available = False - # ftplib and httplib take the timeout in seconds - self.timeout = timeout_minutes*60 - - def get_connection(self, url): - scheme, netloc, path, query, fragment = urlparse.urlsplit(url) - if scheme == 'ftp': - if self.ftpconn.has_key(netloc): - return self.ftpconn[netloc] - elif scheme == 'http': - if self.httpconn.has_key(netloc): - return self.httpconn[netloc] - return None - - def open_http(self, url): - scheme, netloc, path, query, fragment = urlparse.urlsplit(url) - if not self.httpconn.has_key(netloc): - self.httpconn[netloc] = myHTTPConnection(netloc, timeout=self.timeout) - self.httpconn[netloc].set_debuglevel(self.http_debuglevel) - return self.httpconn[netloc] - - def _open_ftp(self, netloc): - if not self.ftpconn.has_key(netloc): - self.ftpconn[netloc] = FTP(netloc, timeout=self.timeout) - self.ftpconn[netloc].set_debuglevel(self.ftp_debuglevel) - self.ftpconn[netloc].login() - - def check_ftp_dir_callback(self, line): - if self.ftp_debuglevel > 0: - logger.info(line) - self.ftp_dir_results.append(line) - - def ftp_dir(self, url): - scheme, netloc, path, query, fragment = urlparse.urlsplit(url) - self._open_ftp(netloc) - c = self.ftpconn[netloc] - self.ftp_dir_results = [] - c.dir(path, self.check_ftp_dir_callback) - - - def close_http(self, url): - scheme, netloc, path, query, fragment = urlparse.urlsplit(url) - if self.httpconn.has_key(netloc): - self.httpconn[netloc].close() - del self.httpconn[netloc] - - def close_ftp(self, url): - scheme, netloc, path, query, fragment = urlparse.urlsplit(url) - if self.ftpconn.has_key(netloc): - try: - self.ftpconn[netloc].quit() - except: - pass - del self.ftpconn[netloc] - - def close(self): - for c in self.httpconn.keys(): - self.close_http(c) - - for c in self.ftpconn.keys(): - self.close_ftp(c) - - -class TryLater(Exception): pass -class ForbiddenExpected(Exception): pass -class TimeoutException(Exception): pass -class HTTPUnknown(Exception): pass -class HTTP500(Exception): pass - - -def get_ftp_dir(hoststate, url, readable, i=0): - if i > 1: - raise TryLater() - - try: - hoststate.ftp_dir(url) - except ftplib.error_perm, e: - # Returned by Princeton University when directory does not exist - if str(e).startswith('550'): - return [] - # Returned by Princeton University when directory isn't readable - # (pre-bitflip) - if str(e).startswith('553'): - if readable: - return [] - else: - raise ForbiddenExpected() - # Returned by ftp2.surplux.net when cannot log in due to connection - # restrictions - if str(e).startswith('530'): - hoststate.close_ftp(url) - return get_ftp_dir(hoststate, url, readable, i+1) - if 
str(e).startswith('500'): # Oops - raise TryLater() - else: - logger.error("unknown permanent error %s on %s" % (e, url)) - raise - except ftplib.error_temp, e: - # Returned by Boston University when directory does not exist - if str(e).startswith('450'): - return [] - # Returned by Princeton University when cannot log in due to - # connection restrictions - if str(e).startswith('421'): - logger.info("Connections Exceeded %s" % url) - raise TryLater() - if str(e).startswith('425'): - logger.info("Failed to establish connection on %s" % url) - raise TryLater() - else: - logger.error("unknown error %s on %s" % (e, url)) - raise - except (EOFError, socket.error): - hoststate.close_ftp(url) - return get_ftp_dir(hoststate, url, readable, i+1) - - return hoststate.ftp_dir_results - - -def check_ftp_file(hoststate, url, filedata, readable): - if url.endswith('/'): - url = url[:-1] - try: - results = get_ftp_dir(hoststate, url, readable) - except TryLater: - raise - except ForbiddenExpected: - return None - if results is None: - return None - if len(results) == 1: - line = results[0].split() - # For the basic check in check_for_base_dir() it is only - # relevant if the directory exists or not. Therefore - # passing None as filedata[]. This needs to be handled here. - if filedata is None: - # The file/directory seems to exist - return True - if line[4] == filedata['size']: - return True - return False - - -def check_url(hoststate, url, filedata, recursion, readable): - if url.startswith('http:'): - return check_head(hoststate, url, filedata, recursion, readable) - elif url.startswith('ftp:'): - return check_ftp_file(hoststate, url, filedata, readable) - - -def handle_redirect(hoststate, url, location, filedata, recursion, readable): - if recursion > 10: - raise HTTPUnknown() - if location.startswith('/'): - scheme, netloc, path, query, fragment = urlparse.urlsplit(url) - location = '%s:%s%s' % (scheme, netloc, location) - return check_url(hoststate, location, filedata, recursion+1, readable) - - -def check_head(hoststate, url, filedata, recursion, readable, retry=0): - """ Returns tuple: - True - URL exists - False - URL doesn't exist - None - we don't know - """ - - try: - conn = hoststate.open_http(url) - except: - return None - - scheme, netloc, path, query, fragment = urlparse.urlsplit(url) - reqpath = path - if len(query) > 0: - reqpath += "?%s" % query - if len(fragment) > 0: - reqpath += "#%s" % fragment - conn.request('HEAD', reqpath, - headers={'Connection':'Keep-Alive', - 'Pragma':'no-cache', - 'User-Agent':'mirrormanager-crawler/0.1 (+http://fedorahosted.org/mirrormanager)'}) - - r = None - try: - r = conn.getresponse() - status = r.status - except: - if retry == 0: - # retry once - hoststate.close_http(url) - return check_head(hoststate, url, filedata, recursion, readable, retry=1) - else: - raise HTTPUnknown() - - conn.end_request() - keepalive_ok = r.keepalive_ok() - if keepalive_ok: - hoststate.keepalives_available = True - if not keepalive_ok: - hoststate.close_http(url) - - content_length = r.getheader('Content-Length') - #last_modified = r.getheader('Last-Modified') - - if status >= 200 and status < 300: - # lighttpd returns a Content-Length for directories - # apache and nginx do not - # For the basic check in check_for_base_dir() it is only - # relevant if the directory exists or not. Therefore - # passing None as filedata[]. This needs to be handled here. 
- if filedata is None: - # The file/directory seems to exist - return True - # fixme should check last_modified too - if filedata['size'] == content_length or content_length is None: # handle no content-length header, streaming/chunked return or zero-length file - return True - else: - return False - if status >= 300 and status < 400: - return handle_redirect(hoststate, url, r.getheader('Location'), filedata, recursion, readable) - elif status >= 400 and status < 500: - if status == 403: # forbidden - # may be a hidden dir still - if readable: - return False - else: - raise ForbiddenExpected() - elif status == 404 or status == 410: # not found / gone - return False - # we don't know - return None - elif status >= 500: - raise HTTP500() - - logger.info("status = %s" % status) - raise HTTPUnknown() - - -def report_stats(stats): - msg = "Crawl duration: %d seconds" % stats['duration'] - logger.info(msg) - msg = "Total directories: %d" % stats['numkeys'] - logger.info(msg) - msg = "Changed to up2date: %d" % stats['up2date'] - logger.info(msg) - msg = "Changed to not up2date: %d" % stats['not_up2date'] - logger.info(msg) - msg = "Unchanged: %d" % stats['unchanged'] - logger.info(msg) - msg = "Unknown disposition: %d" % stats['unknown'] - logger.info(msg) - msg = "New HostCategoryDirs created: %d" % stats['newdir'] - logger.info(msg) - msg = "HostCategoryDirs now deleted on the master, marked not "\ - "up2date: %d" % stats['deleted_on_master'] - logger.info(msg) - - -def sync_hcds(session, host, host_category_dirs): - stats = dict(up2date = 0, not_up2date = 0, unchanged = 0, - unknown = 0, newdir = 0, deleted_on_master = 0, duration = 0) - current_hcds = {} - stats['duration'] = time.time() - threadlocal.starttime - keys = host_category_dirs.keys() - keys = sorted(keys, key = lambda t: t[1].name) - stats['numkeys'] = len(keys) - for (hc, d) in keys: - up2date = host_category_dirs[(hc, d)] - if up2date is None: - stats['unknown'] += 1 - continue - - topname = hc.category.topdir.name - path = d.name[len(topname)+1:] - - hcd = mirrormanager2.lib.get_hostcategorydir_by_hostcategoryid_and_path( - session, host_category_id=hc.id, path=path) - if len(hcd) > 0: - hcd = hcd[0] - else: - # don't create HCDs for directories which aren't up2date on the - # mirror chances are the mirror is excluding that directory - if not up2date: continue - hcd = HostCategoryDir( - host_category_id=hc.id, path=path, directory_id=d.id) - stats['newdir'] += 1 - - if hcd.directory is None: - hcd.directory = d - if hcd.up2date != up2date: - hcd.up2date = up2date - session.add(hcd) - if up2date == False: - logger.info( - "Directory %s is not up-to-date on this host." 
% d.name) - stats['not_up2date'] += 1 - else: - logger.info(d.name) - stats['up2date'] += 1 - else: - stats['unchanged'] += 1 - - current_hcds[hcd] = True - - # now-historical HostCategoryDirs are not up2date - # we wait for a cascading Directory delete to delete this - for hc in list(host.categories): - for hcd in list(hc.directories): - if hcd.directory is not None and not hcd.directory.readable: - stats['unreadable'] += 1 - continue - if hcd not in current_hcds: - if hcd.up2date != False: - hcd.up2date = False - session.add(hcd) - stats['deleted_on_master'] += 1 - session.commit() - report_stats(stats) - - -def method_pref(urls, prev=""): - """ return which of the hosts connection method should be used - rsync > http > ftp """ - pref = None - for u in urls: - if prev.startswith('rsync:'): - break - if u.startswith('rsync:'): - return u - for u in urls: - if u.startswith('http:'): - pref = u - break - if pref is None: - for u in urls: - if u.startswith('ftp:'): - pref = u - break - logger.info("Crawling with URL %s" % pref) - return pref - - -def parent(session, directory): - parentDir = None - splitpath = directory.name.split(u'/') - if len(splitpath[:-1]) > 0: - parentPath = u'/'.join(splitpath[:-1]) - parentDir = mirrormanager2.lib.get_directory_by_name( - session, parentPath) - return parentDir - - -def add_parents(session, host_category_dirs, hc, d): - parentDir = parent(session, d) - if parentDir is not None: - if (hc, parentDir) not in host_category_dirs: - host_category_dirs[(hc, parentDir)] = None - if parentDir != hc.category.topdir: # stop at top of the category - return add_parents(session, host_category_dirs, hc, parentDir) - - return host_category_dirs - - -def compare_sha256(d, filename, graburl): - """ looks for a FileDetails object that matches the given URL """ - found = False - s = urllib2.urlopen(graburl) - sha256 = hashlib.sha256(s.read()).hexdigest() - for fd in list(d.fileDetails): - if fd.filename == filename and fd.sha256 is not None: - if fd.sha256 == sha256: - found = True - break - return found - - -def try_per_file(d, hoststate, url, timeout): - if d.files is None: - return None - exists = None - for filename in d.files.keys(): - # Check if maximum crawl time for this host has been reached - timeout_check(timeout) - exists = None - graburl = "%s/%s" % (url, filename) - try: - exists = check_url( - hoststate, graburl, d.files[filename], 0, d.readable) - if exists == False: - return False - except TryLater: - raise - except ForbiddenExpected: - return None - except ftplib.all_errors: - hoststate.close_ftp(url) - return None - except: - return None - - if filename == 'repomd.xml': - try: - exists = compare_sha256(d, filename, graburl) - except: - pass - if exists == False: - return False - - if exists is None: - return None - - return True - - -def try_per_category( - session, trydirs, url, host_category_dirs, hc, host, - categoryPrefixLen, timeout, config): - """ In addition to the crawls using http and ftp, this rsync crawl - scans the complete category with one connection instead perdir (ftp) - or perfile(http). 
""" - - if not url.startswith('rsync'): - return None - - # rsync URL available, let's use it; it requires only one network - # connection instead of multiples like with http and ftp - rsync = {} - if not url.endswith('/'): - url += '/' - - rsync_start_time = datetime.datetime.utcnow() - params = config.get('CRAWLER_RSYNC_PARAMETERS', '--no-motd') - try: - result, listing = run_rsync(url, params, logger) - except: - logger.warning('Failed to run rsync.', exc_info = True) - return False - rsync_stop_time = datetime.datetime.utcnow() - msg = "rsync time: %s" % str(rsync_stop_time - rsync_start_time) - logger.info(msg) - if result == 10: - # no rsync content, fail! - logger.warning( - 'Connection to host %s Refused. Please check that the URL is ' - 'correct and that the host has an rsync module still available.' - % host.name) - return False - if result > 0: - logger.info('rsync returned exit code %d' % result) - - # put the rsync listing in a dict for easy access - while True: - line = listing.readline() - if not line: break - fields = line.split() - try: - rsync[fields[4]] = { - 'mode': fields[0], 'size': fields[1], - 'date': fields[2], 'time': fields[3]} - except IndexError: - logger.debug("invalid rsync line: %s\n" % line) - - # run_rsync() returns a temporary file which needs to be closed - listing.close() - - logger.debug("rsync listing has %d lines" % len(rsync)) - if len(rsync) == 0: - # no rsync content, fail! - return False - # for all directories in this category - for d in trydirs: - # Check if maximum crawl time for this host has been reached - timeout_check(timeout) - - # ignore unreadable directories - we can't really know about them - if not d.readable: - host_category_dirs[(hc, d)] = None - continue - - all_files = True - # the rsync listing is missing the category part of the url - # remove if from the ones we are comparing it with - name = d.name[categoryPrefixLen:] - for filename in sorted(d.files.keys()): - if len(name) == 0: - key = filename - else: - key = os.path.join(name, filename) - try: - logger.debug('trying with key %s' % key) - if rsync[key]['size'] != d.files[filename]['size'] \ - and not rsync[key]['mode'].startswith('l'): - # ignore symlink size differences - logger.debug( - 'rsync: file size mismatch %s %s != %s\n' - % (filename, d.files[filename]['size'], - rsync[key]['size'])) - all_files = False - break - except KeyError: # file is not in the rsync listing - msg = 'Missing remote file %s\n' % key - logger.debug(msg) - all_files = False - break - except: # something else went wrong - exception_msg = "Exception caught in try_per_category()\n" - logger.exception(exception_msg) - all_files = False - break - - if all_files is False: - host_category_dirs[(hc, d)] = False - else: - host_category_dirs[(hc, d)] = True - host_category_dirs = add_parents( - session, host_category_dirs, hc, d) - - if len(host_category_dirs) > 0: - return True - - mark_not_up2date( - session, config, - None, host, "No host category directories found. 
" - "Check that your Host Category URLs are correct.") - return False - - -def try_per_dir(d, hoststate, url, timeout): - if d.files is None: - return None - if not url.startswith('ftp'): - return None - results = {} - if not url.endswith('/'): - url += '/' - listing = get_ftp_dir(hoststate, url, d.readable) - if listing is None: - return None - - if len(listing) == 0: - return False - - # Check if maximum crawl time for this host has been reached - timeout_check(timeout) - - for line in listing: - if line.startswith('total'): # some servers first include a line starting with the word 'total' that we can ignore - continue - fields = line.split() - try: - results[fields[8]] = {'size': fields[4]} - except IndexError: # line doesn't have 8 fields, it's not a dir line - pass - - for filename in d.files.keys(): - try: - if results[filename]['size'] != d.files[filename]['size']: - return False - except: - return False - return True - - -def send_email(config, host, report_str, exc): - if not config.get('CRAWLER_SEND_EMAIL', False): - return - - SMTP_DATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z" - msg = """From: %s -To: %s -Subject: %s MirrorManager crawler report -Date: %s - -""" % (config.get('EMAIL_FROM'), - config.get('ADMIN_EMAIL'), - host.name, - time.strftime(SMTP_DATE_FORMAT)) - - msg += report_str + '\n' - msg += 'Log can be found at %s/%s.log\n' % (config.get('crawler.logdir'), str(host.id)) - if exc is not None: - msg += "Exception info: type %s; value %s\n" % (exc[0], exc[1]) - msg += str(exc[2]) - try: - smtp = smtplib.SMTP(config.get('SMTP_SERVER')) - - username = config.get('SMTP_USERNAME') - password = config.get('SMTP_PASSWORD') - - if username and password: - smtp.login(username, password) - - smtp.sendmail(config.get('SMTP_SERVER'), - config.get('ADMIN_EMAIL'), - msg) - except: - logger.exception("Error sending email") - logger.debug("Email message follows:") - logger.debug(msg) - - try: - smtp.quit() - except: - pass - - -def mark_not_up2date(session, config, exc, host, reason="Unknown"): - """This function marks a complete host as not being up to date. - It usually is called if the scan of a single category has failed. - This is something the crawler does at multiple places: Failure - in the scan of a single category disables the complete host.""" - # Watch out: set_not_up2date(session) is commiting all changes - # in this thread to the database - host.set_not_up2date(session) - msg = "Host %s marked not up2date: %s" % (host.id, reason) - logger.warning(msg) - if exc is not None: - logger.debug("%s %s %s" % (exc[0], exc[1], exc[2])) - send_email(config, host, msg, exc) - - -def select_host_categories_to_scan(session, options, host): - result = [] - if options.categories: - for category in options.categories: - hc = mirrormanager2.lib.get_host_category_by_hostid_category( - session, host_id=host.id, category=category) - for entry in hc: - result.append(entry) - else: - result = list(host.categories) - return result - -def check_for_base_dir(hoststate, urls): - """Check if at least one of the given URL exists on the remote host. - This is used to detect mirrors which have completely dropped our content. - This is only looking at http and ftp URLs as those URLs are actually - relevant for normal access. If both tests fail the mirror will be marked - as failed during crawl. 
- """ - exists = False - for u in urls: - if not u.endswith('/'): - u += '/' - if u.startswith('http:'): - try: - exists = check_head(hoststate, u, None, False, True) - except: - exists = False - if not exists: - logger.warning("Base URL %s does not exist." % u) - continue - # The base http URL seems to work. Good! - return True - if u.startswith('ftp:'): - try: - exists = get_ftp_dir(hoststate, u, True) - except TryLater: - # Some temporary difficulties on the mirror - exists = False - # exists is an empty list if that directory does not exist - if not exists: - logger.warning("Base URL %s does not exist." % u) - continue - # The base ftp URL seems to work. Good! - return True - - # Reaching this point means that no functional http/ftp has been - # found. This means that the mirror will not work for normal http - # and ftp users. - return False - -def per_host(session, host, options, config): - """Canary mode looks for 2 things: - directory.path ends in 'iso' or directory.path ends in 'repodata'. In - this case it checks for availability of each of the files in those - directories. - """ - rc = 0 - successful_categories = 0 - host = mirrormanager2.lib.get_host(session, host) - host_category_dirs = {} - if host.private and not options.include_private: - return 1 - http_debuglevel = 0 - ftp_debuglevel = 0 - if options.debug: - http_debuglevel = 2 - ftp_debuglevel = 2 - - hoststate = hostState( - http_debuglevel=http_debuglevel, - ftp_debuglevel=ftp_debuglevel, - timeout_minutes=options.timeout_minutes) - - categoryUrl = '' - host_categories_to_scan = select_host_categories_to_scan( - session, options, host) - if len(host_categories_to_scan) == 0: - mark_not_up2date( - session, config, - None, host, "No host category directories found. " - "Check that your Host Category URLs are correct.") - return 1 - - for hc in host_categories_to_scan: - if hc.always_up2date: - continue - category = hc.category - - logger.info("scanning Category %s" % category.name) - - host_category_urls = [hcurl.url for hcurl in hc.urls] - categoryUrl = method_pref(host_category_urls) - if categoryUrl is None: - continue - categoryPrefixLen = len(category.topdir.name)+1 - - # Check if either the http or ftp URL of the host point - # to an existing and readable URL - exists = check_for_base_dir(hoststate, host_category_urls) - - if not exists: - # Base categoryURL for the current host was not found. - # Skipping this category. - continue - - # Record that this host has at least one (or more) categories - # which is accessible via http or ftp - successful_categories += 1 - - trydirs = list(hc.category.directories) - # check the complete category in one go with rsync - try: - has_all_files = try_per_category( - session, trydirs, categoryUrl, host_category_dirs, hc, - host, categoryPrefixLen, options.timeout_minutes, config) - except TimeoutException: - # If the crawl of only one category fails, the host - # is completely marked as not being up to date. 
- raise - - if type(has_all_files) == type(True): - # all files in this category are up to date, or not - # no further checks necessary - # do the next category - continue - - # has_all_files is None, we don't know what failed, but something did - # change preferred protocol if necessary to http or ftp - categoryUrl = method_pref(host_category_urls, categoryUrl) - - try_later_delay = 1 - for d in trydirs: - timeout_check(options.timeout_minutes) - - if not d.readable: - continue - - if options.canary: - if not (d.name.endswith('/repodata') \ - or d.name.endswith('/iso')): - continue - - dirname = d.name[categoryPrefixLen:] - url = '%s/%s' % (categoryUrl, dirname) - - try: - has_all_files = try_per_dir(d, hoststate, url, options.timeout_minutes) - if has_all_files is None: - has_all_files = try_per_file(d, hoststate, url, options.timeout_minutes) - - if has_all_files == False: - logger.warning("Not up2date: %s" % (d.name)) - host_category_dirs[(hc, d)] = False - elif has_all_files == True: - host_category_dirs[(hc, d)] = True - logger.info(url) - # make sure our parent dirs appear on the list too - host_category_dirs = add_parents( - session, host_category_dirs, hc, d) - else: - # could be a dir with no files, or an unreadable dir. - # defer decision on this dir, let a child decide. - pass - - # We succeeded, let's reduce the try_later_delay - if try_later_delay > 1: - try_later_delay = try_later_delay >> 1 - except TryLater: - msg = "Server load exceeded on %r - try later (%s seconds)" % ( - host, try_later_delay) - logger.warning(msg) - if categoryUrl.startswith('http') \ - and not hoststate.keepalives_available: - logger.warning( - "Host %s (id=%d) does not have HTTP Keep-Alives " - "enabled." % (host.name, host.id)) - - time.sleep(try_later_delay) - if try_later_delay < 60: - try_later_delay = try_later_delay << 1 - except TimeoutException: - # If the crawl of only one category fails, the host - # is completely marked as not being up to date. - raise - except: - logger.exception("Unhandled exception raised.") - mark_not_up2date( - session, config, - sys.exc_info(), host, "Unhandled exception raised. " - "This is a bug in the MM crawler.") - rc = 1 - break - if categoryUrl.startswith('http') and not hoststate.keepalives_available: - logger.warning( - "Host %s (id=%d) does not have HTTP Keep-Alives enabled." 
- % (host.name, host.id)) - hoststate.close() - - if successful_categories == 0: - # Let's say that '5' is the signal for the calling function - # that all categories have failed due to broken base URLs - # and that this host should me marked as failed during crawl - return 5 - - if rc == 0: - if len(host_category_dirs) > 0: - sync_hcds(session, host, host_category_dirs) - return rc - -def count_crawl_failures(host, config): - try: - host.crawl_failures += 1 - except TypeError: - host.crawl_failures = 1 - - auto_disable = config.get('CRAWLER_AUTO_DISABLE', 4) - if host.crawl_failures >= auto_disable: - host.disable_reason = ("Host has been disabled (user_active) after %d" - " consecutive crawl failures" % auto_disable ) - host.user_active = False - -def worker(options, config, host_id): - global current_host - global threads_active - - session = mirrormanager2.lib.create_session(config['DB_URL']) - host = mirrormanager2.lib.get_host(session, host_id) - - threads_active = threads_active + 1 - current_host = current_host + 1 - threadlocal.starttime = time.time() - threadlocal.hostid = host.id - threadlocal.hostname = host.name - - # setup per thread file logger - log_dir = config.get('MM_LOG_DIR', None) - # check if the directory exists - if log_dir is not None: - log_dir += "/crawler" - if not os.path.isdir(log_dir): - # MM_LOG_DIR/crawler seems to be configured but does not exist - # not logging - logger.warning("Directory " + log_dir + " does not exists." - " Not logging per host") - log_dir = None - - if log_dir is not None: - fh = logging.FileHandler(log_dir + "/" + str(host.id) + '.log') - threadlocal.thread_id = thread_id() - f = InjectingFilter(thread_id()) - fh.addFilter(f) - - if options.debug: - fh.setLevel(logging.DEBUG) - else: - fh.setLevel(logging.INFO) - fh.setFormatter(formatter) - logger.addHandler(fh) - - logger.info("Worker %r starting on host %r" % (thread_id(), host)) - - - try: - rc = per_host(session, host.id, options, config) - host.last_crawled = datetime.datetime.utcnow() - host.last_crawl_duration = time.time() - threadlocal.starttime - if rc == 5: - # rc == 5 has been define as a problem with all categories - count_crawl_failures(host, config) - else: - # Resetting as this only counts consecutive crawl failures - host.crawl_failures = 0 - session.commit() - except TimeoutException: - rc = 2 - mark_not_up2date( - session, config, - None, host, - "Crawler timed out before completing. " - "Host is likely overloaded.") - host.last_crawled = datetime.datetime.utcnow() - host.last_crawl_duration = time.time() - threadlocal.starttime - count_crawl_failures(host, config) - session.commit() - except Exception: - logger.exception("Failure in thread %r, host %r" % (thread_id(), host)) - rc = 3 - - logger.info("Ending crawl of %r with status %r" % (host, rc)) - logger.removeHandler(fh) - fh.close() - session.close() - threads_active = threads_active - 1 - gc.collect() - return rc - - -if __name__ == "__main__": - sys.exit(main())
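For reference, separate from the patch itself: the core availability test in the removed crawler is check_head(), which probes each known file with an HTTP HEAD request and compares the server-reported Content-Length against the size MirrorManager has on record, returning True (exists), False (missing) or None (inconclusive). Below is a minimal Python 3 sketch of that check using only the standard library; the function name check_file_via_head and its parameters are illustrative, not taken from the original file.

# Illustrative sketch only, not part of the patch: a Python 3 rendering of the
# HEAD-based file check performed by the removed check_head().
import http.client
from urllib.parse import urlsplit

def check_file_via_head(url, expected_size, timeout=30):
    """Return True if the file exists with the expected size, False if it is
    clearly missing, or None if the result is inconclusive."""
    parts = urlsplit(url)
    conn = http.client.HTTPConnection(parts.netloc, timeout=timeout)
    path = parts.path or "/"
    if parts.query:
        path += "?" + parts.query
    try:
        conn.request("HEAD", path, headers={"Connection": "Keep-Alive",
                                            "Pragma": "no-cache"})
        resp = conn.getresponse()
        status = resp.status
        content_length = resp.getheader("Content-Length")
    except (OSError, http.client.HTTPException):
        return None          # network trouble: we simply don't know
    finally:
        conn.close()

    if 200 <= status < 300:
        # Directories and chunked responses may omit Content-Length; like the
        # removed code, treat a missing header as a match.
        return content_length is None or content_length == str(expected_size)
    if status in (404, 410):
        return False         # definitely gone
    return None              # redirects, 403, 5xx: inconclusive at this level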
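The crawler also ranked a host's category URLs by protocol before crawling, preferring rsync over http over ftp and falling back to http/ftp when an rsync pass had already been attempted, as implemented in method_pref(). A compact sketch of that ordering, again with an illustrative name (pick_crawl_url) rather than the original one:

# Illustrative sketch only, not part of the patch: the protocol preference the
# removed method_pref() applied when choosing which host-category URL to crawl.
def pick_crawl_url(urls, previous=""):
    """Prefer rsync unless it was already tried (previous), then http, then ftp."""
    if not previous.startswith("rsync:"):
        for u in urls:
            if u.startswith("rsync:"):
                return u
    for scheme in ("http:", "ftp:"):
        for u in urls:
            if u.startswith(scheme):
                return u
    return None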