Original autocloud_job.py and consumer file for hotfix

This commit is contained in:
Sayan Chowdhury 2016-06-28 22:14:01 +05:30 committed by Sayan Chowdhury
parent 8e1fdc82e4
commit f36696a1fd
3 changed files with 458 additions and 130 deletions

View file

@@ -0,0 +1,332 @@
# -*- coding: utf-8 -*-
import copy
import datetime
import json
import os
import subprocess
import sys
from collections import defaultdict
import fedfind.release
from retask.queue import Queue
from autocloud.constants import SUCCESS, FAILED, ABORTED, RUNNING
from autocloud.models import init_model, ComposeJobDetails, ComposeDetails
from autocloud.producer import publish_to_fedmsg
import logging
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)
def tree():
    """Return an autovivifying nested dict: any missing key yields a new tree.

    Defined as a ``def`` rather than a lambda assignment (PEP 8 E731) so the
    function has a proper name in tracebacks.
    """
    return defaultdict(tree)


# Module-level tally of per-compose results, filled in and cleared by
# check_status_of_compose_image().
results = tree()
def handle_err(session, data, out, err):
    """Record a command failure on the job row and commit it.

    Sets the job status to failed (``'f'``), stores the combined
    stdout/stderr text, stamps the update time and commits.

    :param session: SQLAlchemy session used to persist the change.
    :param data: ComposeJobDetails row to update.
    :param out: stdout text from the failed command.
    :param err: stderr text from the failed command.
    :return: None
    """
    message = "%s: %s" % (out, err)
    data.status = u'f'
    data.output = message
    data.last_updated = datetime.datetime.now()
    session.commit()
    log.debug("%s: %s", out, err)
def system(cmd):
    """Execute *cmd* through the shell and capture its output.

    :param cmd: shell command line to run.
    :return: ``(stdout, stderr, returncode)`` tuple.
    """
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        close_fds=True,
    )
    stdout, stderr = proc.communicate()
    return stdout, stderr, proc.returncode
def refresh_storage_pool():
    """Refresh every active libvirt storage pool.

    Works around stale storage-volume errors seen with libvirt + vagrant:
    http://kushaldas.in/posts/storage-volume-error-in-libvirt-with-vagrant.html
    """
    out, _, _ = system('virsh pool-list')
    # The first two lines of `virsh pool-list` output are table headers.
    for row in out.split('\n')[2:]:
        fields = row.split()
        if len(fields) == 3 and fields[1] == 'active':
            system('virsh pool-refresh {0}'.format(fields[0]))
def image_cleanup(image_path):
    """Delete the downloaded image file, ignoring a missing file.

    :param image_path: absolute path of the image to delete.
    """
    if not os.path.exists(image_path):
        return
    try:
        os.remove(image_path)
    except OSError as e:
        log.error('Error: %s - %s.', e.filename, e.strerror)
def create_dirs():
    """Ensure the runtime directories used by tunir and autocloud exist."""
    for path in ('/var/run/tunir', '/var/run/autocloud'):
        system('mkdir -p {0}'.format(path))
def create_result_text(out):
    """Fold the tunir result file, if any, into the command output.

    Scans *out* for a ``Result file at:`` line; when the referenced file
    exists, truncates *out* just after the ``Job status:`` line, appends the
    file's content, and removes the result file.

    :param out: output text from the tunir command.
    :return: the (possibly augmented) output text.
    """
    for line in out.splitlines():
        if not line.startswith('Result file at:'):
            continue
        # NOTE(review): split(' ')[1] takes the second space-separated token
        # of the line; this assumes tunir's exact output format — confirm.
        result_filename = line.split(' ')[1].strip()
        if not os.path.exists(result_filename):
            continue
        with open(result_filename) as fobj:
            new_content = fobj.read()
        job_status_index = out.find('Job status:')
        if job_status_index == -1:
            return out  # No job status in the output.
        new_line_index = out[job_status_index:].find('\n')
        out = out[:job_status_index + new_line_index] + '\n\n' + new_content
        system('rm -f {0}'.format(result_filename))
        return out
    return out
def auto_job(task_data):
    """
    Run the tests for one compose image and record the outcome.

    Downloads the image with wget, writes the tunir job configuration,
    runs tunir, and updates the ComposeJobDetails row and fedmsg along
    the way.

    :param task_data: dict from the job queue carrying 'absolute_path',
        'compose' (with 'id' and 'release'), 'job_id' and 'type'.
    :return: tuple ``(status, compose_done)`` where status is SUCCESS or
        FAILED and compose_done is the return value of
        check_status_of_compose_image().
    """
    # TODO:
    # We will have to update the job information on DB, rather
    # than creating it. But we will do it afterwards.

    compose_image_url = task_data['absolute_path']
    compose_id = task_data['compose']['id']
    release = task_data['compose']['release']
    job_id = task_data['job_id']
    image_type = task_data['type']
    job_type = 'vm'

    # Just to make sure that we have runtime dirs
    create_dirs()

    session = init_model()
    timestamp = datetime.datetime.now()
    data = None
    try:
        # Flip the job row to running ('r').
        data = session.query(ComposeJobDetails).get(str(job_id))
        data.status = u'r'
        data.last_updated = timestamp
    except Exception as err:
        # NOTE(review): if this lookup fails, data stays None and the
        # data.family / handle_err uses below will raise — confirm this
        # path is acceptable or guard it.
        log.error("%s" % err)
        log.error("%s: %s", compose_id, compose_image_url)
    session.commit()

    params = {
        'compose_url': compose_image_url,
        'compose_id': compose_id,
        'status': RUNNING,
        'job_id': job_id,
        'release': release,
        'family': data.family.value,
        'type': image_type,
        'image_name': data.image_name,
    }
    publish_to_fedmsg(topic='image.running', **params)

    # Now we have job queued, let us start the job.

    # Step 1: Download the image
    image_url = compose_image_url
    basename = os.path.basename(image_url)
    image_path = '/var/run/autocloud/%s' % basename
    log.debug("Going to download {0}".format(image_url))
    out, err, ret_code = system('wget %s -O %s' % (image_url, image_path))
    if ret_code:
        # Download failed: clean up, record the failure and notify fedmsg.
        image_cleanup(image_path)
        handle_err(session, data, out, err)
        log.debug("Return code: %d" % ret_code)
        params.update({'status': FAILED})
        publish_to_fedmsg(topic='image.failed', **params)
        return FAILED, check_status_of_compose_image(compose_id)

    # Step 2: Create the conf file with correct image path.
    if basename.find('vagrant') == -1:
        # Plain cloud/atomic image: boot it as a qemu VM.
        conf = {"image": "/var/run/autocloud/%s" % basename,
                "name": "fedora",
                "password": "passw0rd",
                "ram": 2048,
                "type": "vm",
                "user": "fedora"}
    else:  # We now have a Vagrant job.
        conf = {
            "name": "fedora",
            "type": "vagrant",
            "image": "/var/run/autocloud/%s" % basename,
            "ram": 2048,
            "user": "vagrant",
            "port": "22"
        }
        if basename.find('virtualbox') != -1:
            conf['provider'] = 'virtualbox'
        job_type = 'vagrant'

    # Now let us refresh the storage pool
    refresh_storage_pool()

    with open('/var/run/autocloud/fedora.json', 'w') as fobj:
        fobj.write(json.dumps(conf))

    # Ship the test definitions next to the generated job config.
    system('/usr/bin/cp -f /etc/autocloud/fedora.txt /var/run/autocloud/fedora.txt')

    cmd = 'tunir --job fedora --config-dir /var/run/autocloud/'

    # Now run tunir
    out, err, ret_code = system(cmd)
    if ret_code:
        image_cleanup(image_path)
        handle_err(session, data, create_result_text(out), err)
        log.debug("Return code: %d" % ret_code)
        params.update({'status': FAILED})
        publish_to_fedmsg(topic='image.failed', **params)
        return FAILED, check_status_of_compose_image(compose_id)
    else:
        image_cleanup(image_path)

    # Enabling direct stdout as output of the command
    out = create_result_text(out)

    if job_type == 'vm':
        # Keep only the qemu-kvm invocation onward for VM jobs.
        com_text = out[out.find('/usr/bin/qemu-kvm'):]
    else:
        com_text = out

    # Mark success ('s') and store the trimmed output.
    data.status = u's'
    timestamp = datetime.datetime.now()
    data.last_updated = timestamp
    data.output = com_text
    session.commit()
    params.update({'status': SUCCESS})
    publish_to_fedmsg(topic='image.success', **params)
    return SUCCESS, check_status_of_compose_image(compose_id)
def check_status_of_compose_image(compose_id):
    """
    Report whether all jobs of *compose_id* are done and finalize if so.

    Returns False while any job of the compose is still queued or running.
    Otherwise tallies per-job results into the module-level ``results``
    tree, marks the ComposeDetails row completed ('c'), publishes a
    'compose.complete' fedmsg, and returns True.

    :param compose_id: productmd compose id string.
    :return: True once the compose is finalized, else False.
    """
    session = init_model()

    compose_job_objs = session.query(ComposeJobDetails).filter_by(
        compose_id=compose_id).all()

    compose_obj = session.query(ComposeDetails).filter_by(
        compose_id=compose_id).first()

    # Still unfinished while any job is running ('r') or queued ('q').
    is_running = False
    for compose_job_obj in compose_job_objs:
        status = compose_job_obj.status.code
        if status in ('r', 'q'):
            is_running = True
            break

    if is_running:
        return False

    # Tally successes ('s') and failures/aborts ('f'/'a') per compose.
    for compose_job_obj in compose_job_objs:
        status = compose_job_obj.status.code
        if status in ('s',):
            results[compose_id][SUCCESS] = results[compose_id].get(SUCCESS, 0) + 1
        elif status in ('f', 'a'):
            results[compose_id][FAILED] = results[compose_id].get(FAILED, 0) + 1

    # ``results`` autovivifies: a counter that was never incremented comes
    # back as an empty defaultdict, so normalize those to 0 before use.
    if isinstance(results[compose_id][SUCCESS], defaultdict):
        results[compose_id][SUCCESS] = 0

    if isinstance(results[compose_id][FAILED], defaultdict):
        results[compose_id][FAILED] = 0

    compose_obj.passed = results[compose_id][SUCCESS]
    compose_obj.failed = results[compose_id][FAILED]
    compose_obj.status = u'c'
    session.commit()

    compose_id = compose_obj.compose_id
    rel = fedfind.release.get_release(cid=compose_id)
    release = rel.release

    params = {
        'compose_id': compose_obj.compose_id,
        'respin': compose_obj.respin,
        'type': compose_obj.type,
        'date': datetime.datetime.strftime(compose_obj.date, '%Y%m%d'),
        'results': results[compose_id],
        'release': release,
        'status': 'completed',
        'compose_job_id': compose_obj.id
    }
    publish_to_fedmsg(topic='compose.complete', **params)

    # Drop the per-compose tally now that it has been published.
    results.pop(compose_id, {})
    return True
def main():
    """Consume image-test jobs from the 'jobqueue' retask queue forever."""
    jobqueue = Queue('jobqueue')
    jobqueue.connect()

    while True:
        # Blocks until a task is available on the queue.
        task = jobqueue.wait()
        task_data = task.data

        # 'pos' is a (position, total) tuple set by the producer.
        pos, num_images = task_data['pos']
        compose_details = task_data['compose']
        if pos == 1:
            # First image of a compose: flip the compose row to running
            # ('r') and announce it on fedmsg.
            session = init_model()
            compose_id = compose_details['id']
            compose_obj = session.query(ComposeDetails).filter_by(
                compose_id=compose_id).first()
            compose_obj.status = u'r'
            session.commit()

            params = copy.deepcopy(compose_details)
            params.update({'status': 'running'})
            publish_to_fedmsg(topic='compose.running', **params)

        result, running_status = auto_job(task_data)


if __name__ == '__main__':
    main()

View file

@@ -1,11 +1,18 @@
# -*- coding: utf-8 -*-
from datetime import datetime
import requests
import fedmsg.consumers
import koji
import fedfind.release
from sqlalchemy import exc
from autocloud.utils import get_image_url, produce_jobs, get_image_name
import autocloud
from autocloud.models import init_model, ComposeDetails
from autocloud.producer import publish_to_fedmsg
from autocloud.utils import is_valid_image, produce_jobs
import logging
log = logging.getLogger("fedmsg")
@@ -13,151 +20,120 @@
DEBUG = autocloud.DEBUG
class AutoCloudConsumer(fedmsg.consumers.FedmsgConsumer):
    """
    Fedmsg consumer for Autocloud: listens for koji build/task state
    changes and pungi compose status changes and queues test jobs.
    """

    # NOTE(review): this span looks like a merged diff (pre- and
    # post-hotfix hunk lines interleaved); the structure below is a
    # best-effort reconstruction — verify against the deployed file.

    if DEBUG:
        topic = [
            'org.fedoraproject.dev.__main__.buildsys.build.state.change',
            'org.fedoraproject.dev.__main__.buildsys.task.state.change',
            'org.fedoraproject.dev.__main__.pungi.compose.status.change'
        ]
    else:
        topic = [
            'org.fedoraproject.prod.buildsys.build.state.change',
            'org.fedoraproject.prod.buildsys.task.state.change',
            'org.fedoraproject.prod.pungi.compose.status.change'
        ]

    config_key = 'autocloud.consumer.enabled'

    def __init__(self, *args, **kwargs):
        log.info("Autocloud Consumer is ready for action.")
        super(AutoCloudConsumer, self).__init__(*args, **kwargs)

    def _get_tasks(self, builds):
        """ Takes a list of koji createImage task IDs and returns dictionary of
        build ids and image url corresponding to that build ids"""
        if autocloud.VIRTUALBOX:
            _supported_images = ('Fedora-Cloud-Base-Vagrant',
                                 'Fedora-Cloud-Atomic-Vagrant',)
        else:
            _supported_images = ('Fedora-Cloud-Base-Vagrant',
                                 'Fedora-Cloud-Atomic-Vagrant',
                                 'Fedora-Cloud-Atomic', 'Fedora-Cloud-Base',)

        for build in builds:
            log.info('Got Koji build {0}'.format(build))

        # Create a Koji connection to the Fedora Koji instance
        koji_session = koji.ClientSession(autocloud.KOJI_SERVER_URL)

        image_files = []  # list of full URLs of files

        if len(builds) == 1:
            # Single task: one direct getTaskResult call.
            task_result = koji_session.getTaskResult(builds[0])
            name = task_result.get('name')
            # TODO: Change to get the release information from PDC instead
            # of koji once it is set up
            release = task_result.get('version')
            if name in _supported_images:
                task_relpath = koji.pathinfo.taskrelpath(int(builds[0]))
                url = get_image_url(task_result.get('files'), task_relpath)
                if url:
                    name = get_image_name(image_name=name)
                    data = {
                        'buildid': builds[0],
                        'image_url': url,
                        'name': name,
                        'release': release,
                    }
                    image_files.append(data)
        elif len(builds) >= 2:
            # Several tasks: batch the lookups through a koji multicall.
            koji_session.multicall = True
            for build in builds:
                koji_session.getTaskResult(build)
            results = koji_session.multiCall()
            for result in results:
                if not result:
                    continue
                name = result[0].get('name')
                if name not in _supported_images:
                    continue
                # TODO: Change to get the release information from PDC instead
                # of koji once it is set up
                release = result[0].get('version')
                task_relpath = koji.pathinfo.taskrelpath(
                    int(result[0].get('task_id')))
                url = get_image_url(result[0].get('files'), task_relpath)
                if url:
                    name = get_image_name(image_name=name)
                    data = {
                        'buildid': result[0]['task_id'],
                        'image_url': url,
                        'name': name,
                        'release': release,
                    }
                    image_files.append(data)

        return image_files

    def consume(self, msg):
        """ This is called when we receive a message matching the topic. """
        if msg['topic'].endswith('.buildsys.task.state.change'):
            # Do the thing you've always done... this will go away soon.
            # releng is transitioning away from it.
            self._consume_scratch_task(msg)
        elif msg['topic'].endswith('.buildsys.build.state.change'):
            # Do the new thing we need to do. handle a 'real build' from koji,
            # not just a scratch task.
            self._consume_real_build(msg)
        else:
            # NOTE(review): the pungi.compose.status.change topic is
            # subscribed above but has no branch here, so it would hit this
            # raise — confirm which method handles it in the real file.
            raise NotImplementedError("Should be impossible to get here...")

    def _consume_real_build(self, msg):
        """Queue createImage children of a successful releng image build."""
        builds = list()  # These will be the Koji task IDs to upload, if any.

        msg = msg['body']['msg']

        # Only builds owned by releng on the primary koji instance count.
        if msg['owner'] != 'releng':
            log.debug("Dropping message. Owned by %r" % msg['owner'])
            return

        if msg['instance'] != 'primary':
            log.info("Dropping message. From %r instance." % msg['instance'])
            return

        # Don't upload *any* images if one of them fails.
        if msg['new'] != 1:
            log.info("Dropping message. State is %r" % msg['new'])
            return

        koji_session = koji.ClientSession(autocloud.KOJI_SERVER_URL)
        children = koji_session.getTaskChildren(msg['task_id'])
        for child in children:
            if child["method"] == "createImage":
                builds.append(child["id"])

        if len(builds) > 0:
            produce_jobs(self._get_tasks(builds))

    def _consume_scratch_task(self, msg):
        """Handle a koji scratch image task and/or a finished pungi compose.

        NOTE(review): this body appears to interleave the old scratch-task
        handling with the new pungi compose handling from the hotfix diff;
        verify the split against the deployed file.
        """
        builds = list()  # These will be the Koji build IDs to upload, if any.

        msg_info = msg["body"]["msg"]["info"]

        log.info('Received %r %r' % (msg['topic'], msg['body']['msg_id']))

        # If the build method is "image", we check to see if the child
        # task's method is "createImage".
        if msg_info["method"] == "image":
            if isinstance(msg_info["children"], list):
                for child in msg_info["children"]:
                    if child["method"] == "createImage":
                        # We only care about the image if the build
                        # completed successfully (with state code 2).
                        if child["state"] == 2:
                            builds.append(child["id"])

        # Pungi compose states and variants we act on.
        STATUS_F = ('FINISHED_INCOMPLETE', 'FINISHED',)
        VARIANTS_F = ('CloudImages',)

        if len(builds) > 0:
            produce_jobs(self._get_tasks(builds))

        images = []
        compose_db_update = False
        msg_body = msg['body']

        if msg_body['msg']['status'] in STATUS_F:
            location = msg_body['msg']['location']
            json_metadata = '{}/metadata/images.json'.format(location)
            resp = requests.get(json_metadata)
            # NOTE(review): getattr(resp, 'json', False) only checks that the
            # .json method exists — it does not check HTTP success; confirm.
            compose_images_json = getattr(resp, 'json', False)

            if compose_images_json:
                compose_images_json = compose_images_json()
                compose_images = compose_images_json['payload']['images']
                compose_details = compose_images_json['payload']['compose']
                # Keep only the variants we test (CloudImages).
                compose_images = dict(
                    (variant, compose_images[variant])
                    for variant in VARIANTS_F
                    if variant in compose_images
                )
                compose_id = compose_details['id']
                rel = fedfind.release.get_release(cid=compose_id)
                release = rel.release
                compose_details.update({'release': release})

                for variant in VARIANTS_F:
                    if variant not in compose_images:
                        continue
                    for arch, payload in compose_images[variant].iteritems():
                        for item in payload:
                            relative_path = item['path']
                            # Skip images autocloud cannot test.
                            if not is_valid_image(relative_path):
                                continue
                            absolute_path = '{}/{}'.format(location,
                                                           relative_path)
                            item.update({
                                'compose': compose_details,
                                'absolute_path': absolute_path,
                            })
                            images.append(item)
                            compose_db_update = True

        if compose_db_update:
            session = init_model()
            compose_date = datetime.strptime(compose_details['date'],
                                             '%Y%m%d')
            try:
                # Record the compose as queued ('q'); an IntegrityError
                # means this compose id was already recorded.
                cd = ComposeDetails(
                    date=compose_date,
                    compose_id=compose_details['id'],
                    respin=compose_details['respin'],
                    type=compose_details['type'],
                    status=u'q',
                    location=location,
                )
                session.add(cd)
                session.commit()
                compose_details.update({
                    'status': 'queued',
                    'compose_job_id': cd.id,
                })
                publish_to_fedmsg(topic='compose.queued',
                                  **compose_details)
            except exc.IntegrityError:
                session.rollback()
                cd = session.query(ComposeDetails).filter_by(
                    compose_id=compose_details['id']).first()
                log.info('Compose already exists %s: %s' % (
                    compose_details['id'],
                    cd.id
                ))

            # Tag each image with (position, total) so the worker can tell
            # when the first/last job of a compose arrives.
            num_images = len(images)
            for pos, image in enumerate(images):
                image.update({'pos': (pos+1, num_images)})
            produce_jobs(images)

View file

@@ -120,3 +120,23 @@
tags:
- autocloud
- autocloud/backend
# Hotfix tasks: push patched autocloud sources directly onto the host
# until a proper package update lands; each copy restarts fedmsg-hub.
- name: hotfix - copy over comsumer.py for autocloud
  # NOTE(review): "comsumer" is a typo in the task name (runtime-visible
  # string; left untouched here).
  copy: src="{{ files }}/hotfix/autocloud/consumer.py" dest=/usr/lib/python2.7/site-packages/autocloud
        owner=root group=fedmsg mode=0644
  notify:
    - restart fedmsg-hub
  tags:
    - autocloud
    - autocloud/backend
    - hotfix

- name: hotfix - copy over autocloud_job.py for autocloud
  copy: src="{{ files }}/hotfix/autocloud/autocloud_job.py" dest=/usr/share/autocloud/autocloud_job.py
        owner=root group=fedmsg mode=0644
  notify:
    - restart fedmsg-hub
  tags:
    - autocloud
    - autocloud/backend
    - hotfix