From f36696a1fd79ec6ee9dfaf596db5947cf10bb150 Mon Sep 17 00:00:00 2001 From: Sayan Chowdhury Date: Tue, 28 Jun 2016 22:14:01 +0530 Subject: [PATCH] Original autocloud_job.py and consumer file for hotfix --- files/hotfix/autocloud/autocloud_job.py | 332 ++++++++++++++++++++++++ files/hotfix/autocloud/consumer.py | 236 ++++++++--------- roles/autocloud/backend/tasks/main.yml | 20 ++ 3 files changed, 458 insertions(+), 130 deletions(-) create mode 100644 files/hotfix/autocloud/autocloud_job.py diff --git a/files/hotfix/autocloud/autocloud_job.py b/files/hotfix/autocloud/autocloud_job.py new file mode 100644 index 0000000000..3acf0a619c --- /dev/null +++ b/files/hotfix/autocloud/autocloud_job.py @@ -0,0 +1,332 @@ +# -*- coding: utf-8 -*- +import copy +import datetime +import json +import os +import subprocess +import sys + +from collections import defaultdict + +import fedfind.release + +from retask.queue import Queue + +from autocloud.constants import SUCCESS, FAILED, ABORTED, RUNNING +from autocloud.models import init_model, ComposeJobDetails, ComposeDetails +from autocloud.producer import publish_to_fedmsg + +import logging + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger(__name__) + +tree = lambda: defaultdict(tree) +results = tree() + +def handle_err(session, data, out, err): + """ + Prints the details and exits. + :param out: + :param err: + :return: None + """ + # Update DB first. + data.status = u'f' + data.output = "%s: %s" % (out, err) + timestamp = datetime.datetime.now() + data.last_updated = timestamp + session.commit() + log.debug("%s: %s", out, err) + + +def system(cmd): + """ + Runs a shell command, and returns the output, err, returncode + + :param cmd: The command to run. + :return: Tuple with (output, err, returncode). + """ + ret = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) + out, err = ret.communicate() + returncode = ret.returncode + return out, err, returncode + + +def refresh_storage_pool(): + '''Refreshes libvirt storage pool. + + http://kushaldas.in/posts/storage-volume-error-in-libvirt-with-vagrant.html + ''' + out, err, retcode = system('virsh pool-list') + lines = out.split('\n') + if len(lines) > 2: + for line in lines[2:]: + words = line.split() + if len(words) == 3: + if words[1] == 'active': + system('virsh pool-refresh {0}'.format(words[0])) + + +def image_cleanup(image_path): + """ + Delete the image if it is processed or if there is any exception occur + + :param basename: Absoulte path for image + """ + if os.path.exists(image_path): + try: + os.remove(image_path) + except OSError as e: + log.error('Error: %s - %s.', e.filename, e.strerror) + + +def create_dirs(): + """ + Creates the runtime dirs + """ + system('mkdir -p /var/run/tunir') + system('mkdir -p /var/run/autocloud') + + +def create_result_text(out): + """ + :param out: Output text from the command. + """ + lines = out.splitlines() + for line in lines: + if line.startswith('Result file at:'): + result_filename = line.split(' ')[1] + + result_filename = result_filename.strip() + if os.path.exists(result_filename): + new_content = '' + with open(result_filename) as fobj: + new_content = fobj.read() + job_status_index = out.find('Job status:') + if job_status_index == -1: + return out # No job status in the output. + new_line_index = out[job_status_index:].find('\n') + out = out[:job_status_index + new_line_index] + out = out + '\n\n' + new_content + system('rm -f {0}'.format(result_filename)) + return out + return out + + +def auto_job(task_data): + """ + This fuction queues the job, and then executes the tests, + updates the db as required. + + :param taskid: Koji taskid. + :param image_url: URL to download the fedora image. + :return: + """ + # TODO: + # We will have to update the job information on DB, rather + # than creating it. But we will do it afterwards. + + compose_image_url = task_data['absolute_path'] + compose_id = task_data['compose']['id'] + release = task_data['compose']['release'] + job_id = task_data['job_id'] + image_type = task_data['type'] + + job_type = 'vm' + + # Just to make sure that we have runtime dirs + create_dirs() + + session = init_model() + timestamp = datetime.datetime.now() + data = None + try: + data = session.query(ComposeJobDetails).get(str(job_id)) + data.status = u'r' + data.last_updated = timestamp + except Exception as err: + log.error("%s" % err) + log.error("%s: %s", compose_id, compose_image_url) + session.commit() + + params = { + 'compose_url': compose_image_url, + 'compose_id': compose_id, + 'status': RUNNING, + 'job_id': job_id, + 'release': release, + 'family': data.family.value, + 'type': image_type, + 'image_name': data.image_name, + } + publish_to_fedmsg(topic='image.running', **params) + + # Now we have job queued, let us start the job. + # Step 1: Download the image + image_url = compose_image_url + basename = os.path.basename(image_url) + image_path = '/var/run/autocloud/%s' % basename + log.debug("Going to download {0}".format(image_url)) + out, err, ret_code = system('wget %s -O %s' % (image_url, image_path)) + if ret_code: + image_cleanup(image_path) + handle_err(session, data, out, err) + log.debug("Return code: %d" % ret_code) + + params.update({'status': FAILED}) + publish_to_fedmsg(topic='image.failed', **params) + return FAILED, check_status_of_compose_image(compose_id) + + # Step 2: Create the conf file with correct image path. + if basename.find('vagrant') == -1: + conf = {"image": "/var/run/autocloud/%s" % basename, + "name": "fedora", + "password": "passw0rd", + "ram": 2048, + "type": "vm", + "user": "fedora"} + + else: # We now have a Vagrant job. + conf = { + "name": "fedora", + "type": "vagrant", + "image": "/var/run/autocloud/%s" % basename, + "ram": 2048, + "user": "vagrant", + "port": "22" + } + if basename.find('virtualbox') != -1: + conf['provider'] = 'virtualbox' + job_type = 'vagrant' + + #Now let us refresh the storage pool + refresh_storage_pool() + + with open('/var/run/autocloud/fedora.json', 'w') as fobj: + fobj.write(json.dumps(conf)) + + system('/usr/bin/cp -f /etc/autocloud/fedora.txt /var/run/autocloud/fedora.txt') + + cmd = 'tunir --job fedora --config-dir /var/run/autocloud/' + # Now run tunir + out, err, ret_code = system(cmd) + if ret_code: + image_cleanup(image_path) + handle_err(session, data, create_result_text(out), err) + log.debug("Return code: %d" % ret_code) + params.update({'status': FAILED}) + publish_to_fedmsg(topic='image.failed', **params) + return FAILED, check_status_of_compose_image(compose_id) + else: + image_cleanup(image_path) + + # Enabling direct stdout as output of the command + out = create_result_text(out) + if job_type == 'vm': + com_text = out[out.find('/usr/bin/qemu-kvm'):] + else: + com_text = out + + data.status = u's' + timestamp = datetime.datetime.now() + data.last_updated = timestamp + data.output = com_text + session.commit() + + params.update({'status': SUCCESS}) + publish_to_fedmsg(topic='image.success', **params) + return SUCCESS, check_status_of_compose_image(compose_id) + + +def check_status_of_compose_image(compose_id): + session = init_model() + compose_job_objs = session.query(ComposeJobDetails).filter_by( + compose_id=compose_id).all() + compose_obj = session.query(ComposeDetails).filter_by( + compose_id=compose_id).first() + + is_running = False + + for compose_job_obj in compose_job_objs: + status = compose_job_obj.status.code + if status in ('r', 'q'): + is_running = True + break + + if is_running: + return False + + for compose_job_obj in compose_job_objs: + status = compose_job_obj.status.code + + if status in ('s',): + results[compose_id][SUCCESS] = results[compose_id].get(SUCCESS, 0) + 1 + elif status in ('f', 'a'): + results[compose_id][FAILED] = results[compose_id].get(FAILED, 0) + 1 + + if isinstance(results[compose_id][SUCCESS], defaultdict): + results[compose_id][SUCCESS] = 0 + + if isinstance(results[compose_id][FAILED], defaultdict): + results[compose_id][FAILED] = 0 + + compose_obj.passed = results[compose_id][SUCCESS] + compose_obj.failed = results[compose_id][FAILED] + compose_obj.status = u'c' + + session.commit() + + compose_id = compose_obj.compose_id + rel = fedfind.release.get_release(cid=compose_id) + release = rel.release + + params = { + 'compose_id': compose_obj.compose_id, + 'respin': compose_obj.respin, + 'type': compose_obj.type, + 'date': datetime.datetime.strftime(compose_obj.date, '%Y%m%d'), + 'results': results[compose_id], + 'release': release, + 'status': 'completed', + 'compose_job_id': compose_obj.id + } + + publish_to_fedmsg(topic='compose.complete', **params) + results.pop(compose_id, {}) + + return True + + +def main(): + jobqueue = Queue('jobqueue') + jobqueue.connect() + + while True: + task = jobqueue.wait() + + task_data = task.data + pos, num_images = task_data['pos'] + + compose_details = task_data['compose'] + + if pos == 1: + session = init_model() + compose_id = compose_details['id'] + compose_obj = session.query(ComposeDetails).filter_by( + compose_id=compose_id).first() + compose_obj.status = u'r' + session.commit() + + + params = copy.deepcopy(compose_details) + params.update({'status': 'running'}) + publish_to_fedmsg(topic='compose.running', **params) + + result, running_status = auto_job(task_data) + + + +if __name__ == '__main__': + main() diff --git a/files/hotfix/autocloud/consumer.py b/files/hotfix/autocloud/consumer.py index c70cde9841..33db5205b2 100644 --- a/files/hotfix/autocloud/consumer.py +++ b/files/hotfix/autocloud/consumer.py @@ -1,11 +1,18 @@ # -*- coding: utf-8 -*- +from datetime import datetime +import requests import fedmsg.consumers -import koji +import fedfind.release + +from sqlalchemy import exc -from autocloud.utils import get_image_url, produce_jobs, get_image_name import autocloud +from autocloud.models import init_model, ComposeDetails +from autocloud.producer import publish_to_fedmsg +from autocloud.utils import is_valid_image, produce_jobs + import logging log = logging.getLogger("fedmsg") @@ -13,151 +20,120 @@ DEBUG = autocloud.DEBUG class AutoCloudConsumer(fedmsg.consumers.FedmsgConsumer): + """ + Fedmsg consumer for Autocloud + """ if DEBUG: topic = [ - 'org.fedoraproject.dev.__main__.buildsys.build.state.change', - 'org.fedoraproject.dev.__main__.buildsys.task.state.change', + 'org.fedoraproject.dev.__main__.pungi.compose.status.change' ] else: topic = [ - 'org.fedoraproject.prod.buildsys.build.state.change', - 'org.fedoraproject.prod.buildsys.task.state.change', + 'org.fedoraproject.prod.pungi.compose.status.change' ] config_key = 'autocloud.consumer.enabled' def __init__(self, *args, **kwargs): + log.info("Autocloud Consumer is ready for action.") super(AutoCloudConsumer, self).__init__(*args, **kwargs) - def _get_tasks(self, builds): - """ Takes a list of koji createImage task IDs and returns dictionary of - build ids and image url corresponding to that build ids""" - - if autocloud.VIRTUALBOX: - _supported_images = ('Fedora-Cloud-Base-Vagrant', - 'Fedora-Cloud-Atomic-Vagrant',) - else: - _supported_images = ('Fedora-Cloud-Base-Vagrant', - 'Fedora-Cloud-Atomic-Vagrant', - 'Fedora-Cloud-Atomic', 'Fedora-Cloud-Base',) - - for build in builds: - log.info('Got Koji build {0}'.format(build)) - - # Create a Koji connection to the Fedora Koji instance - koji_session = koji.ClientSession(autocloud.KOJI_SERVER_URL) - - image_files = [] # list of full URLs of files - - if len(builds) == 1: - task_result = koji_session.getTaskResult(builds[0]) - name = task_result.get('name') - #TODO: Change to get the release information from PDC instead - # of koji once it is set up - release = task_result.get('version') - if name in _supported_images: - task_relpath = koji.pathinfo.taskrelpath(int(builds[0])) - url = get_image_url(task_result.get('files'), task_relpath) - if url: - name = get_image_name(image_name=name) - data = { - 'buildid': builds[0], - 'image_url': url, - 'name': name, - 'release': release, - } - image_files.append(data) - elif len(builds) >= 2: - koji_session.multicall = True - for build in builds: - koji_session.getTaskResult(build) - results = koji_session.multiCall() - for result in results: - - if not result: - continue - - name = result[0].get('name') - if name not in _supported_images: - continue - - #TODO: Change to get the release information from PDC instead - # of koji once it is set up - release = result[0].get('version') - task_relpath = koji.pathinfo.taskrelpath( - int(result[0].get('task_id'))) - url = get_image_url(result[0].get('files'), task_relpath) - if url: - name = get_image_name(image_name=name) - data = { - 'buildid': result[0]['task_id'], - 'image_url': url, - 'name': name, - 'release': release, - } - image_files.append(data) - - return image_files - def consume(self, msg): """ This is called when we receive a message matching the topic. """ - if msg['topic'].endswith('.buildsys.task.state.change'): - # Do the thing you've always done... this will go away soon. - # releng is transitioning away from it. - self._consume_scratch_task(msg) - elif msg['topic'].endswith('.buildsys.build.state.change'): - # Do the new thing we need to do. handle a 'real build' from koji, - # not just a scratch task. - self._consume_real_build(msg) - else: - raise NotImplementedError("Should be impossible to get here...") - - def _consume_real_build(self, msg): - builds = list() # These will be the Koji task IDs to upload, if any. - - msg = msg['body']['msg'] - if msg['owner'] != 'releng': - log.debug("Dropping message. Owned by %r" % msg['owner']) - return - - if msg['instance'] != 'primary': - log.info("Dropping message. From %r instance." % msg['instance']) - return - - # Don't upload *any* images if one of them fails. - if msg['new'] != 1: - log.info("Dropping message. State is %r" % msg['new']) - return - - koji_session = koji.ClientSession(autocloud.KOJI_SERVER_URL) - children = koji_session.getTaskChildren(msg['task_id']) - for child in children: - if child["method"] == "createImage": - builds.append(child["id"]) - - if len(builds) > 0: - produce_jobs(self._get_tasks(builds)) - - def _consume_scratch_task(self, msg): - builds = list() # These will be the Koji build IDs to upload, if any. - - msg_info = msg["body"]["msg"]["info"] - log.info('Received %r %r' % (msg['topic'], msg['body']['msg_id'])) - # If the build method is "image", we check to see if the child - # task's method is "createImage". - if msg_info["method"] == "image": - if isinstance(msg_info["children"], list): - for child in msg_info["children"]: - if child["method"] == "createImage": - # We only care about the image if the build - # completed successfully (with state code 2). - if child["state"] == 2: - builds.append(child["id"]) + STATUS_F = ('FINISHED_INCOMPLETE', 'FINISHED',) + VARIANTS_F = ('CloudImages',) - if len(builds) > 0: - produce_jobs(self._get_tasks(builds)) + images = [] + compose_db_update = False + msg_body = msg['body'] + + if msg_body['msg']['status'] in STATUS_F: + location = msg_body['msg']['location'] + json_metadata = '{}/metadata/images.json'.format(location) + + resp = requests.get(json_metadata) + compose_images_json = getattr(resp, 'json', False) + + if compose_images_json: + compose_images_json = compose_images_json() + + compose_images = compose_images_json['payload']['images'] + compose_details = compose_images_json['payload']['compose'] + + compose_images = dict( + (variant, compose_images[variant]) + for variant in VARIANTS_F + if variant in compose_images + ) + + compose_id = compose_details['id'] + rel = fedfind.release.get_release(cid=compose_id) + release = rel.release + + compose_details.update({'release': release}) + + for variant in VARIANTS_F: + + if variant not in compose_images: + continue + + for arch, payload in compose_images[variant].iteritems(): + for item in payload: + relative_path = item['path'] + + if not is_valid_image(relative_path): + continue + + absolute_path = '{}/{}'.format(location, + relative_path) + + item.update({ + 'compose': compose_details, + 'absolute_path': absolute_path, + }) + images.append(item) + compose_db_update = True + + if compose_db_update: + session = init_model() + compose_date = datetime.strptime(compose_details['date'], + '%Y%m%d') + try: + cd = ComposeDetails( + date=compose_date, + compose_id=compose_details['id'], + respin=compose_details['respin'], + type=compose_details['type'], + status=u'q', + location=location, + ) + + session.add(cd) + session.commit() + + compose_details.update({ + 'status': 'queued', + 'compose_job_id': cd.id, + }) + publish_to_fedmsg(topic='compose.queued', + **compose_details) + + except exc.IntegrityError: + session.rollback() + cd = session.query(ComposeDetails).filter_by( + compose_id=compose_details['id']).first() + log.info('Compose already exists %s: %s' % ( + compose_details['id'], + cd.id + )) + + num_images = len(images) + for pos, image in enumerate(images): + image.update({'pos': (pos+1, num_images)}) + + produce_jobs(images) diff --git a/roles/autocloud/backend/tasks/main.yml b/roles/autocloud/backend/tasks/main.yml index 5d4f15cf46..b25c2853e7 100644 --- a/roles/autocloud/backend/tasks/main.yml +++ b/roles/autocloud/backend/tasks/main.yml @@ -120,3 +120,23 @@ tags: - autocloud - autocloud/backend + +- name: hotfix - copy over comsumer.py for autocloud + copy: src="{{ files }}/hotfix/autocloud/consumer.py" dest=/usr/lib/python2.7/site-packages/autocloud + owner=root group=fedmsg mode=0644 + notify: + - restart fedmsg-hub + tags: + - autocloud + - autocloud/backend + - hotfix + +- name: hotfix - copy over autocloud_job.py for autocloud + copy: src="{{ files }}/hotfix/autocloud/autocloud_job.py" dest=/usr/share/autocloud/autocloud_job.py + owner=root group=fedmsg mode=0644 + notify: + - restart fedmsg-hub + tags: + - autocloud + - autocloud/backend + - hotfix