#!/usr/bin/python -tt # A tool to help monitor & manage SELinux using func # # Copyright (C) 2009 Red Hat, Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Authors: Luke Macken import time from pprint import pprint from optparse import OptionParser from func import jobthing from func.overlord.client import Overlord status, stdout, stderr = range(3) class SELinuxOverlord(Overlord): selinux_status = {'Enforcing': [], 'Permissive': [], 'Disabled': []} selinux_minions = {} def __init__(self, minions): super(SELinuxOverlord, self).__init__(minions) self.minion_glob = minions def get_selinux_status(self): """ Get the SELinux status of all minions """ results = self.command.run('/usr/sbin/getenforce') for minion, result in results.iteritems(): if result[status]: print "[%s] Error: %s" % (minion, result) else: self.selinux_status[result[stdout].strip()].append(minion) for key in self.selinux_status: self.selinux_status[key].sort() return self.selinux_status def get_selinux_denials(self): """ Return all AVC denials from this week """ if len(self.selinux_minions): for minion, result in self.selinux_minions.iteritems(): yield minion, result else: results = self.command.run('ausearch -m AVC -ts this-week --input-logs') for minion, result in results.iteritems(): self.selinux_minions[minion] = result yield minion, result def dump_selinux_denials(self): """ Write out all SELinux denials for all minions """ for minion, result in self.get_selinux_denials(): if not result[status]: out = file(minion, 'w') out.write(result[stdout]) out.close() print "[%s] Successfully collected this weeks AVCs" % minion else: if '\n' in result: print "[%s] No AVCs Found" % minion out = file(minion, 'w') out.close() else: print "[%s] Problem running ausearch: %r" % (minion, result) def get_enforced_denials(self): """ Print all denials from SELinux-enforcing minions """ for minion, result in self.get_selinux_denials(): if minion not in self.selinux_status['Enforcing']: continue if not result[status]: overlord = Overlord(minion) audit2allow = overlord.command.run('audit2allow -la') for m, r in audit2allow.iteritems(): if r[stdout].strip(): print "[%s]\n%s\n" % (m, r[stdout]) def upgrade_policy(self): """ Update the SELinux policy across the given minions """ print "Cleaning yum metadata..." results = self.command.run('yum clean metadata') for minion, result in results.items(): if result[0]: print "[%s] Problem cleaning yum cache: %s" % (minion, result[1]) async_client = Overlord(self.minion_glob, nforks=10, async=True) print "Upgrading SELinux policy..." job_id = async_client.command.run('yum -y update selinux*') running = True while running: time.sleep(20) return_code, results = async_client.job_status(job_id) if return_code == jobthing.JOB_ID_RUNNING: continue elif return_code in (jobthing.JOB_ID_FINISHED, jobthing.JOB_ID_PARTIAL): for minion, result in results.items(): if result[0]: print '[%s] Problem upgrading policy: %s' % (minion, result[1]) if 'Updated: selinux-policy' in result[1]: ver = result[1].split('Updated: ')[-1].split()[1].split(':')[1] print "[%s] selinux-policy successfully upgraded to %s" % (minion, ver) else: print "selinux-policy *not* upgraded on %s: %s" % (minion, result[1]) if return_code == jobthing.JOB_ID_FINISHED: running = False elif return_code == jobthing.JOB_ID_LOST_IN_SPACE: print "Job %s lost in space: %s" % (job_id, results) else: print "Unknown return code %s: %s" % (return_code, results) print "SELinux policy upgrade complete!" if __name__ == '__main__': parser = OptionParser('usage: %prog [options] [minion(s)]') parser.add_option('-s', '--status', action='store_true', dest='status', help='Display the SELinux status of all minions') parser.add_option('-e', '--enforced-denials', action='store_true', dest='enforced_denials', help='Display enforced denials') parser.add_option('-d', '--dump-avcs', action='store_true', dest='dump_avcs', help='Dump AVCs to disk') parser.add_option('-u', '--upgrade-policy', action='store_true', dest='upgrade', help='Upgrade SELinux policy') opts, args = parser.parse_args() minions = len(args) > 0 and ';'.join(args) or '*' overlord = SELinuxOverlord(minions) if opts.status or opts.enforced_denials: print "Determining SELinux status on minions: %s" % minions pprint(overlord.get_selinux_status()) if opts.enforced_denials: print "Finding enforced SELinux denials..." overlord.get_enforced_denials() if opts.dump_avcs: print "Dumping SELinux denials to disk..." overlord.dump_selinux_denials() if opts.upgrade: overlord.upgrade_policy() # vim: ts=4 sw=4 expandtab ai