From 8e1fdc82e44cc0f5600e4d78809ea77de041d01b Mon Sep 17 00:00:00 2001 From: Sayan Chowdhury Date: Tue, 28 Jun 2016 22:04:14 +0530 Subject: [PATCH] Revert "Original autocloud_job.py and consumer file for hotfix" This reverts commit aa978492710ead1e9c28921b1a4402df4c867db6. --- files/hotfix/autocloud/autocloud_job.py | 332 ------------------------ files/hotfix/autocloud/consumer.py | 236 +++++++++-------- roles/autocloud/backend/tasks/main.yml | 20 -- 3 files changed, 130 insertions(+), 458 deletions(-) delete mode 100644 files/hotfix/autocloud/autocloud_job.py diff --git a/files/hotfix/autocloud/autocloud_job.py b/files/hotfix/autocloud/autocloud_job.py deleted file mode 100644 index 3acf0a619c..0000000000 --- a/files/hotfix/autocloud/autocloud_job.py +++ /dev/null @@ -1,332 +0,0 @@ -# -*- coding: utf-8 -*- -import copy -import datetime -import json -import os -import subprocess -import sys - -from collections import defaultdict - -import fedfind.release - -from retask.queue import Queue - -from autocloud.constants import SUCCESS, FAILED, ABORTED, RUNNING -from autocloud.models import init_model, ComposeJobDetails, ComposeDetails -from autocloud.producer import publish_to_fedmsg - -import logging - -logging.basicConfig(level=logging.DEBUG) -log = logging.getLogger(__name__) - -tree = lambda: defaultdict(tree) -results = tree() - -def handle_err(session, data, out, err): - """ - Prints the details and exits. - :param out: - :param err: - :return: None - """ - # Update DB first. - data.status = u'f' - data.output = "%s: %s" % (out, err) - timestamp = datetime.datetime.now() - data.last_updated = timestamp - session.commit() - log.debug("%s: %s", out, err) - - -def system(cmd): - """ - Runs a shell command, and returns the output, err, returncode - - :param cmd: The command to run. - :return: Tuple with (output, err, returncode). - """ - ret = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) - out, err = ret.communicate() - returncode = ret.returncode - return out, err, returncode - - -def refresh_storage_pool(): - '''Refreshes libvirt storage pool. - - http://kushaldas.in/posts/storage-volume-error-in-libvirt-with-vagrant.html - ''' - out, err, retcode = system('virsh pool-list') - lines = out.split('\n') - if len(lines) > 2: - for line in lines[2:]: - words = line.split() - if len(words) == 3: - if words[1] == 'active': - system('virsh pool-refresh {0}'.format(words[0])) - - -def image_cleanup(image_path): - """ - Delete the image if it is processed or if there is any exception occur - - :param basename: Absoulte path for image - """ - if os.path.exists(image_path): - try: - os.remove(image_path) - except OSError as e: - log.error('Error: %s - %s.', e.filename, e.strerror) - - -def create_dirs(): - """ - Creates the runtime dirs - """ - system('mkdir -p /var/run/tunir') - system('mkdir -p /var/run/autocloud') - - -def create_result_text(out): - """ - :param out: Output text from the command. - """ - lines = out.splitlines() - for line in lines: - if line.startswith('Result file at:'): - result_filename = line.split(' ')[1] - - result_filename = result_filename.strip() - if os.path.exists(result_filename): - new_content = '' - with open(result_filename) as fobj: - new_content = fobj.read() - job_status_index = out.find('Job status:') - if job_status_index == -1: - return out # No job status in the output. - new_line_index = out[job_status_index:].find('\n') - out = out[:job_status_index + new_line_index] - out = out + '\n\n' + new_content - system('rm -f {0}'.format(result_filename)) - return out - return out - - -def auto_job(task_data): - """ - This fuction queues the job, and then executes the tests, - updates the db as required. - - :param taskid: Koji taskid. - :param image_url: URL to download the fedora image. - :return: - """ - # TODO: - # We will have to update the job information on DB, rather - # than creating it. But we will do it afterwards. - - compose_image_url = task_data['absolute_path'] - compose_id = task_data['compose']['id'] - release = task_data['compose']['release'] - job_id = task_data['job_id'] - image_type = task_data['type'] - - job_type = 'vm' - - # Just to make sure that we have runtime dirs - create_dirs() - - session = init_model() - timestamp = datetime.datetime.now() - data = None - try: - data = session.query(ComposeJobDetails).get(str(job_id)) - data.status = u'r' - data.last_updated = timestamp - except Exception as err: - log.error("%s" % err) - log.error("%s: %s", compose_id, compose_image_url) - session.commit() - - params = { - 'compose_url': compose_image_url, - 'compose_id': compose_id, - 'status': RUNNING, - 'job_id': job_id, - 'release': release, - 'family': data.family.value, - 'type': image_type, - 'image_name': data.image_name, - } - publish_to_fedmsg(topic='image.running', **params) - - # Now we have job queued, let us start the job. - # Step 1: Download the image - image_url = compose_image_url - basename = os.path.basename(image_url) - image_path = '/var/run/autocloud/%s' % basename - log.debug("Going to download {0}".format(image_url)) - out, err, ret_code = system('wget %s -O %s' % (image_url, image_path)) - if ret_code: - image_cleanup(image_path) - handle_err(session, data, out, err) - log.debug("Return code: %d" % ret_code) - - params.update({'status': FAILED}) - publish_to_fedmsg(topic='image.failed', **params) - return FAILED, check_status_of_compose_image(compose_id) - - # Step 2: Create the conf file with correct image path. - if basename.find('vagrant') == -1: - conf = {"image": "/var/run/autocloud/%s" % basename, - "name": "fedora", - "password": "passw0rd", - "ram": 2048, - "type": "vm", - "user": "fedora"} - - else: # We now have a Vagrant job. - conf = { - "name": "fedora", - "type": "vagrant", - "image": "/var/run/autocloud/%s" % basename, - "ram": 2048, - "user": "vagrant", - "port": "22" - } - if basename.find('virtualbox') != -1: - conf['provider'] = 'virtualbox' - job_type = 'vagrant' - - #Now let us refresh the storage pool - refresh_storage_pool() - - with open('/var/run/autocloud/fedora.json', 'w') as fobj: - fobj.write(json.dumps(conf)) - - system('/usr/bin/cp -f /etc/autocloud/fedora.txt /var/run/autocloud/fedora.txt') - - cmd = 'tunir --job fedora --config-dir /var/run/autocloud/' - # Now run tunir - out, err, ret_code = system(cmd) - if ret_code: - image_cleanup(image_path) - handle_err(session, data, create_result_text(out), err) - log.debug("Return code: %d" % ret_code) - params.update({'status': FAILED}) - publish_to_fedmsg(topic='image.failed', **params) - return FAILED, check_status_of_compose_image(compose_id) - else: - image_cleanup(image_path) - - # Enabling direct stdout as output of the command - out = create_result_text(out) - if job_type == 'vm': - com_text = out[out.find('/usr/bin/qemu-kvm'):] - else: - com_text = out - - data.status = u's' - timestamp = datetime.datetime.now() - data.last_updated = timestamp - data.output = com_text - session.commit() - - params.update({'status': SUCCESS}) - publish_to_fedmsg(topic='image.success', **params) - return SUCCESS, check_status_of_compose_image(compose_id) - - -def check_status_of_compose_image(compose_id): - session = init_model() - compose_job_objs = session.query(ComposeJobDetails).filter_by( - compose_id=compose_id).all() - compose_obj = session.query(ComposeDetails).filter_by( - compose_id=compose_id).first() - - is_running = False - - for compose_job_obj in compose_job_objs: - status = compose_job_obj.status.code - if status in ('r', 'q'): - is_running = True - break - - if is_running: - return False - - for compose_job_obj in compose_job_objs: - status = compose_job_obj.status.code - - if status in ('s',): - results[compose_id][SUCCESS] = results[compose_id].get(SUCCESS, 0) + 1 - elif status in ('f', 'a'): - results[compose_id][FAILED] = results[compose_id].get(FAILED, 0) + 1 - - if isinstance(results[compose_id][SUCCESS], defaultdict): - results[compose_id][SUCCESS] = 0 - - if isinstance(results[compose_id][FAILED], defaultdict): - results[compose_id][FAILED] = 0 - - compose_obj.passed = results[compose_id][SUCCESS] - compose_obj.failed = results[compose_id][FAILED] - compose_obj.status = u'c' - - session.commit() - - compose_id = compose_obj.compose_id - rel = fedfind.release.get_release(cid=compose_id) - release = rel.release - - params = { - 'compose_id': compose_obj.compose_id, - 'respin': compose_obj.respin, - 'type': compose_obj.type, - 'date': datetime.datetime.strftime(compose_obj.date, '%Y%m%d'), - 'results': results[compose_id], - 'release': release, - 'status': 'completed', - 'compose_job_id': compose_obj.id - } - - publish_to_fedmsg(topic='compose.complete', **params) - results.pop(compose_id, {}) - - return True - - -def main(): - jobqueue = Queue('jobqueue') - jobqueue.connect() - - while True: - task = jobqueue.wait() - - task_data = task.data - pos, num_images = task_data['pos'] - - compose_details = task_data['compose'] - - if pos == 1: - session = init_model() - compose_id = compose_details['id'] - compose_obj = session.query(ComposeDetails).filter_by( - compose_id=compose_id).first() - compose_obj.status = u'r' - session.commit() - - - params = copy.deepcopy(compose_details) - params.update({'status': 'running'}) - publish_to_fedmsg(topic='compose.running', **params) - - result, running_status = auto_job(task_data) - - - -if __name__ == '__main__': - main() diff --git a/files/hotfix/autocloud/consumer.py b/files/hotfix/autocloud/consumer.py index 33db5205b2..c70cde9841 100644 --- a/files/hotfix/autocloud/consumer.py +++ b/files/hotfix/autocloud/consumer.py @@ -1,18 +1,11 @@ # -*- coding: utf-8 -*- -from datetime import datetime -import requests import fedmsg.consumers -import fedfind.release - -from sqlalchemy import exc +import koji +from autocloud.utils import get_image_url, produce_jobs, get_image_name import autocloud -from autocloud.models import init_model, ComposeDetails -from autocloud.producer import publish_to_fedmsg -from autocloud.utils import is_valid_image, produce_jobs - import logging log = logging.getLogger("fedmsg") @@ -20,120 +13,151 @@ DEBUG = autocloud.DEBUG class AutoCloudConsumer(fedmsg.consumers.FedmsgConsumer): - """ - Fedmsg consumer for Autocloud - """ if DEBUG: topic = [ - 'org.fedoraproject.dev.__main__.pungi.compose.status.change' + 'org.fedoraproject.dev.__main__.buildsys.build.state.change', + 'org.fedoraproject.dev.__main__.buildsys.task.state.change', ] else: topic = [ - 'org.fedoraproject.prod.pungi.compose.status.change' + 'org.fedoraproject.prod.buildsys.build.state.change', + 'org.fedoraproject.prod.buildsys.task.state.change', ] config_key = 'autocloud.consumer.enabled' def __init__(self, *args, **kwargs): - log.info("Autocloud Consumer is ready for action.") super(AutoCloudConsumer, self).__init__(*args, **kwargs) + def _get_tasks(self, builds): + """ Takes a list of koji createImage task IDs and returns dictionary of + build ids and image url corresponding to that build ids""" + + if autocloud.VIRTUALBOX: + _supported_images = ('Fedora-Cloud-Base-Vagrant', + 'Fedora-Cloud-Atomic-Vagrant',) + else: + _supported_images = ('Fedora-Cloud-Base-Vagrant', + 'Fedora-Cloud-Atomic-Vagrant', + 'Fedora-Cloud-Atomic', 'Fedora-Cloud-Base',) + + for build in builds: + log.info('Got Koji build {0}'.format(build)) + + # Create a Koji connection to the Fedora Koji instance + koji_session = koji.ClientSession(autocloud.KOJI_SERVER_URL) + + image_files = [] # list of full URLs of files + + if len(builds) == 1: + task_result = koji_session.getTaskResult(builds[0]) + name = task_result.get('name') + #TODO: Change to get the release information from PDC instead + # of koji once it is set up + release = task_result.get('version') + if name in _supported_images: + task_relpath = koji.pathinfo.taskrelpath(int(builds[0])) + url = get_image_url(task_result.get('files'), task_relpath) + if url: + name = get_image_name(image_name=name) + data = { + 'buildid': builds[0], + 'image_url': url, + 'name': name, + 'release': release, + } + image_files.append(data) + elif len(builds) >= 2: + koji_session.multicall = True + for build in builds: + koji_session.getTaskResult(build) + results = koji_session.multiCall() + for result in results: + + if not result: + continue + + name = result[0].get('name') + if name not in _supported_images: + continue + + #TODO: Change to get the release information from PDC instead + # of koji once it is set up + release = result[0].get('version') + task_relpath = koji.pathinfo.taskrelpath( + int(result[0].get('task_id'))) + url = get_image_url(result[0].get('files'), task_relpath) + if url: + name = get_image_name(image_name=name) + data = { + 'buildid': result[0]['task_id'], + 'image_url': url, + 'name': name, + 'release': release, + } + image_files.append(data) + + return image_files + def consume(self, msg): """ This is called when we receive a message matching the topic. """ + if msg['topic'].endswith('.buildsys.task.state.change'): + # Do the thing you've always done... this will go away soon. + # releng is transitioning away from it. + self._consume_scratch_task(msg) + elif msg['topic'].endswith('.buildsys.build.state.change'): + # Do the new thing we need to do. handle a 'real build' from koji, + # not just a scratch task. + self._consume_real_build(msg) + else: + raise NotImplementedError("Should be impossible to get here...") + + def _consume_real_build(self, msg): + builds = list() # These will be the Koji task IDs to upload, if any. + + msg = msg['body']['msg'] + if msg['owner'] != 'releng': + log.debug("Dropping message. Owned by %r" % msg['owner']) + return + + if msg['instance'] != 'primary': + log.info("Dropping message. From %r instance." % msg['instance']) + return + + # Don't upload *any* images if one of them fails. + if msg['new'] != 1: + log.info("Dropping message. State is %r" % msg['new']) + return + + koji_session = koji.ClientSession(autocloud.KOJI_SERVER_URL) + children = koji_session.getTaskChildren(msg['task_id']) + for child in children: + if child["method"] == "createImage": + builds.append(child["id"]) + + if len(builds) > 0: + produce_jobs(self._get_tasks(builds)) + + def _consume_scratch_task(self, msg): + builds = list() # These will be the Koji build IDs to upload, if any. + + msg_info = msg["body"]["msg"]["info"] + log.info('Received %r %r' % (msg['topic'], msg['body']['msg_id'])) - STATUS_F = ('FINISHED_INCOMPLETE', 'FINISHED',) - VARIANTS_F = ('CloudImages',) + # If the build method is "image", we check to see if the child + # task's method is "createImage". + if msg_info["method"] == "image": + if isinstance(msg_info["children"], list): + for child in msg_info["children"]: + if child["method"] == "createImage": + # We only care about the image if the build + # completed successfully (with state code 2). + if child["state"] == 2: + builds.append(child["id"]) - images = [] - compose_db_update = False - msg_body = msg['body'] - - if msg_body['msg']['status'] in STATUS_F: - location = msg_body['msg']['location'] - json_metadata = '{}/metadata/images.json'.format(location) - - resp = requests.get(json_metadata) - compose_images_json = getattr(resp, 'json', False) - - if compose_images_json: - compose_images_json = compose_images_json() - - compose_images = compose_images_json['payload']['images'] - compose_details = compose_images_json['payload']['compose'] - - compose_images = dict( - (variant, compose_images[variant]) - for variant in VARIANTS_F - if variant in compose_images - ) - - compose_id = compose_details['id'] - rel = fedfind.release.get_release(cid=compose_id) - release = rel.release - - compose_details.update({'release': release}) - - for variant in VARIANTS_F: - - if variant not in compose_images: - continue - - for arch, payload in compose_images[variant].iteritems(): - for item in payload: - relative_path = item['path'] - - if not is_valid_image(relative_path): - continue - - absolute_path = '{}/{}'.format(location, - relative_path) - - item.update({ - 'compose': compose_details, - 'absolute_path': absolute_path, - }) - images.append(item) - compose_db_update = True - - if compose_db_update: - session = init_model() - compose_date = datetime.strptime(compose_details['date'], - '%Y%m%d') - try: - cd = ComposeDetails( - date=compose_date, - compose_id=compose_details['id'], - respin=compose_details['respin'], - type=compose_details['type'], - status=u'q', - location=location, - ) - - session.add(cd) - session.commit() - - compose_details.update({ - 'status': 'queued', - 'compose_job_id': cd.id, - }) - publish_to_fedmsg(topic='compose.queued', - **compose_details) - - except exc.IntegrityError: - session.rollback() - cd = session.query(ComposeDetails).filter_by( - compose_id=compose_details['id']).first() - log.info('Compose already exists %s: %s' % ( - compose_details['id'], - cd.id - )) - - num_images = len(images) - for pos, image in enumerate(images): - image.update({'pos': (pos+1, num_images)}) - - produce_jobs(images) + if len(builds) > 0: + produce_jobs(self._get_tasks(builds)) diff --git a/roles/autocloud/backend/tasks/main.yml b/roles/autocloud/backend/tasks/main.yml index ba390685ae..5d4f15cf46 100644 --- a/roles/autocloud/backend/tasks/main.yml +++ b/roles/autocloud/backend/tasks/main.yml @@ -120,23 +120,3 @@ tags: - autocloud - autocloud/backend - -- name: hotfix - copy over comsumer.py for autocloud - copy: src="{{ files }}/hotfix/autocloud/consumers.py" dest=/usr/lib/python2.7/site-packages/autocloud - owner=root group=fedmsg mode=0644 - notify: - - restart fedmsg-hub - tags: - - autocloud - - autocloud/backend - - hotfix - -- name: hotfix - copy over autocloud_job.py for autocloud - copy: src="{{ files }}/hotfix/autocloud/autocloud_job.py" dest=/usr/share/autocloud/autocloud_job.py - owner=root group=fedmsg mode=0644 - notify: - - restart fedmsg-hub - tags: - - autocloud - - autocloud/backend - - hotfix