[yum-commits] plugins/security

James Antill james at osuosl.org
Tue Aug 3 04:06:20 UTC 2010


 plugins/security/security.py |  324 ++++++++++++++++++++++++++-----------------
 1 file changed, 200 insertions(+), 124 deletions(-)

New commits:
commit 7dc568d8758729c68f9c0a86bc855b656809bd24
Author: James Antill <james at and.org>
Date:   Mon Aug 2 11:59:21 2010 -0400

     Big changes to security plugin command:
    
    . Move list, info, summary under a single "updateinfo" command.
    
    . Allow each command to see "available", "installed", "updates" or
      "all". So you can easily see what has been applied as well as what
      is to be applied.
    
    . Should be fully backward compatible.
    
    . Cleanup some output.
    
    ...plugin now approaches ridiculously badly named status.

diff --git a/plugins/security/security.py b/plugins/security/security.py
index 4247560..f32ccce 100755
--- a/plugins/security/security.py
+++ b/plugins/security/security.py
@@ -146,16 +146,6 @@ def ysp_has_info_md(rname, md):
             continue
         return md
 
-def ysp_should_show_pkgtup(opts, pkgtup, md_info, used_map, rname=None):
-    """ Do we want to show this package in list-updateinfo. """
-    
-    name = pkgtup[0]
-    for (pkgtup, notice) in reversed(md_info.get_applicable_notices(pkgtup)):
-        if rname and not ysp_has_info_md(rname, notice):
-            continue
-        if ysp_should_filter_pkg(opts, name, notice, used_map):
-            yield (pkgtup, notice)
-
 def ysp_gen_used_map(opts):
     used_map = {'bugzilla' : {}, 'cve' : {}, 'id' : {}, 'cmd' : {}}
     for i in opts.sec_cmds:
@@ -171,7 +161,7 @@ def ysp_gen_used_map(opts):
 def ysp_chk_used_map(used_map, msg):
     for i in used_map['cmd']:
         if not used_map['cmd'][i]:
-            msg('Argument \"%s\" not found applicable for this system' % i)
+            msg('No update information found for \"%s\"' % i)
     for i in used_map['id']:
         if not used_map['id'][i]:
             msg('Advisory \"%s\" not found applicable for this system' % i)
@@ -182,34 +172,98 @@ def ysp_chk_used_map(used_map, msg):
         if not used_map['cve'][i]:
             msg('CVE \"%s\" not found applicable for this system' % i)
 
-class SecurityListCommand:
+class UpdateinfoCommand:
+    # Old command names...
+    direct_cmds = {'list-updateinfo'    : 'list',
+                   'list-security'      : 'list',
+                   'list-sec'           : 'list',
+                   'info-updateinfo'    : 'info',
+                   'info-security'      : 'info',
+                   'info-sec'           : 'info',
+                   'summary-updateinfo' : 'summary'}
+
     def getNames(self):
-        return ['list-updateinfo', 'list-security', 'list-sec']
+        return ['updateinfo'] + sorted(self.direct_cmds.keys())
 
     def getUsage(self):
         return "[security|bugzilla|cve|new-packages] [PACKAGE-wildcard]"
 
     def getSummary(self):
-        return "Returns security data for the packages listed, that affects your system"
+        return "Acts on repository update information"
 
     def doCheck(self, base, basecmd, extcmds):
         pass
 
-    def show_pkg(self, base, msg, pkg, notice, disp=None):
-        # Make the list view much smaller
-        # ysp_show_pkg_md_info(pkg, md, msg)
-        if disp and ysp_has_info_md(disp, notice):
-            for ref in ysp__safe_refs(notice['references']):
-                if ref['type'] != disp:
-                    continue
-                msg(" %s %-8s %s" % (str(ref['id']), notice['type'], pkg))
-        elif notice['type'] == 'newpackage':
-            print base.fmtKeyValFill("%s: " % pkg.name, base._enc(pkg.summary))
-        else:
-            msg("%s %-8s %s" % (notice['update_id'], notice['type'], pkg))
+    def list_show_pkgs(self, base, md_info, list_type, show_type, data, msg):
+        n_maxsize = 0
+        r_maxsize = 0
+        t_maxsize = 0
+        for (notice, pkgtup, pkg) in data:
+            n_maxsize = max(len(notice['update_id']), n_maxsize)
+            t_maxsize = max(len(notice['type']),      t_maxsize)
+            if show_type:
+                for ref in ysp__safe_refs(notice['references']):
+                    if ref['type'] != show_type:
+                        continue
+                    r_maxsize = max(len(str(ref['id'])), r_maxsize)
+
+        for (notice, pkgtup, pkg) in data:
+            if show_type and ysp_has_info_md(show_type, notice):
+                for ref in ysp__safe_refs(notice['references']):
+                    if ref['type'] != show_type:
+                        continue
+                    msg(" %-*s %-*s %s" % (r_maxsize, str(ref['id']),
+                                           t_maxsize, notice['type'], pkg))
+            elif hasattr(pkg, 'name'):
+                print base.fmtKeyValFill("%s: " % pkg.name,
+                                         base._enc(pkg.summary))
+            else:
+                msg("%-*s %-*s %s" % (n_maxsize, notice['update_id'],
+                                      t_maxsize, notice['type'], pkg))
 
-    def show_pkg_exit(self, base, md_info):
-        pass
+    def info_show_pkgs(self, base, md_info, list_type, show_type, data, msg):
+        show_pkg_info_done = {}
+        for (notice, pkgtup, pkg) in data:
+            if notice['update_id'] in show_pkg_info_done:
+                continue
+            show_pkg_info_done[notice['update_id']] = notice
+            # Python-2.4.* doesn't understand str(x) returning unicode *sigh*
+            obj = notice.__str__()
+            msg(obj)
+
+    def summary_show_pkgs(self, base, md_info, list_type, show_type, data, msg):
+        def _msg(x):
+            print x
+        counts = {}
+        show_pkg_info_done = {}
+        for (notice, pkgtup, pkg) in data:
+            if notice['update_id'] in show_pkg_info_done:
+                continue
+            show_pkg_info_done[notice['update_id']] = notice
+            counts[notice['type']] = counts.get(notice['type'], 0) + 1
+
+        maxsize = 0
+        for T in ('newpackage', 'security', 'bugfix', 'enhancement'):
+            if T not in counts:
+                continue
+            size = len(str(counts[T]))
+            if maxsize < size:
+                maxsize = size
+        if not maxsize:
+            _check_running_kernel(base, md_info, _msg)
+            return
+
+        outT = {'newpackage' : 'New Package',
+                'security' : 'Security',
+                'bugfix' : 'Bugfix',
+                'enhancement' : 'Enhancement'}
+        print "Updates Information Summary:", list_type
+        for T in ('newpackage', 'security', 'bugfix', 'enhancement'):
+            if T not in counts:
+                continue
+            print "    %*u %s notice(s)" % (maxsize, counts[T], outT[T])
+        _check_running_kernel(base, md_info, _msg)
+        self.show_pkg_info_done = {}
 
     def _get_new_pkgs(self, md_info):
         for notice in md_info.notices:
@@ -222,13 +276,56 @@ class SecurityListCommand:
                     yield (notice, pkgtup)
 
     def doCommand(self, base, basecmd, extcmds):
-        self.repos = base.repos
-        md_info = ysp_gen_metadata(self.repos.listEnabled())
-        logger = logging.getLogger("yum.verbose.main")
-        def msg(x):
-            logger.log(logginglevels.INFO_2, x)
+        if basecmd in self.direct_cmds:
+            subcommand = self.direct_cmds[basecmd]
+        elif extcmds and extcmds[0] in ('list', 'info', 'summary'):
+            subcommand = extcmds[0]
+            extcmds = extcmds[1:]
+        else:
+            subcommand = 'summary'
+
+        if subcommand == 'list':
+            return self.doCommand_li(base, 'updateinfo list', extcmds,
+                                     self.list_show_pkgs)
+        if subcommand == 'info':
+            return self.doCommand_li(base, 'updateinfo info', extcmds,
+                                     self.info_show_pkgs)
+
+        if subcommand == 'summary':
+            return self.doCommand_li(base, 'updateinfo summary', extcmds,
+                                     self.summary_show_pkgs)
+
+    def doCommand_li_new(self, base, list_type, extcmds, md_info, msg,
+                         show_pkgs):
+        done_pkgs = set()
+        data = []
+        for (notice, pkgtup) in sorted(self._get_new_pkgs(md_info),
+                                       key=lambda x: x[1][0]):
+            if extcmds and not _match_sec_cmd(extcmds, pkgtup[0], notice):
+                continue
+            n = pkgtup[0]
+            if n in done_pkgs:
+                continue
+            ipkgs = list(reversed(sorted(base.rpmdb.searchNames([n]))))
+            if list_type in ('installed', 'updates') and not ipkgs:
+                done_pkgs.add(n)
+                continue
+            if list_type == 'available' and ipkgs:
+                done_pkgs.add(n)
+                continue
 
-        opts, cmdline = base.plugins.cmdline
+            pkgs = base.pkgSack.searchPkgTuple(pkgtup)
+            if not pkgs:
+                continue
+            if list_type == "updates" and pkgs[0].verLE(ipkgs[0]):
+                done_pkgs.add(n)
+                continue
+            done_pkgs.add(n)
+            data.append((notice, pkgtup, pkgs[0]))
+        show_pkgs(base, md_info, list_type, None, data, msg)
+
+    @staticmethod
+    def _parse_extcmds(extcmds):
         filt_type = None
         show_type = None
         if len(extcmds) >= 1:
@@ -267,103 +364,74 @@ class SecurityListCommand:
             show_type = filt_type
             if filt_type and filt_type in __update_info_types__:
                 show_type = None
+        return extcmds, show_type, filt_type
+
+    def doCommand_li(self, base, basecmd, extcmds, show_pkgs):
+        self.repos = base.repos
+        md_info = ysp_gen_metadata(self.repos.listEnabled())
+        logger = logging.getLogger("yum.verbose.main")
+        def msg(x):
+            logger.log(logginglevels.INFO_2, x)
+
+        opts, cmdline = base.plugins.cmdline
+        extcmds, show_type, filt_type = self._parse_extcmds(extcmds)
+
+        list_type = "available"
+        if extcmds and extcmds[0] in ("updates","available","installed", "all"):
+            list_type = extcmds.pop(0)
 
         if filt_type == "newpackage":
             # No filtering here, as we want what isn't installed...
-            done_pkgs = set()
-            for (notice, pkgtup) in sorted(self._get_new_pkgs(md_info),
-                                           key=lambda x: x[1][0]):
-                if extcmds and not _match_sec_cmd(extcmds, pkgtup[0], notice):
-                    continue
-                if pkgtup[0] in done_pkgs:
-                    continue
-                pkgs = base.pkgSack.searchPkgTuple(pkgtup)
-                if not pkgs:
-                    continue
-                done_pkgs.add(pkgs[0].name)
-                self.show_pkg(base, msg, pkgs[0], notice, None)
-            self.show_pkg_exit(base, md_info)
+            self.doCommand_li_new(base, list_type, extcmds, md_info, msg,
+                                  show_pkgs)
             return 0, [basecmd + ' new done']
 
         opts.sec_cmds = extcmds
         used_map = ysp_gen_used_map(opts)
-        name2tup = _get_name2oldpkgtup(base)
+        if False: pass
+        elif list_type == 'all':
+            name2tup = _get_name2allpkgtup(base)
+        elif list_type == 'updates':
+            name2tup = _get_name2oldpkgtup(base)
+        elif list_type == 'available':
+            name2tup = _get_name2instpkgtup(base)
+        elif list_type == 'installed':
+            name2tup  = _get_name2allpkgtup(base)
+            iname2tup = _get_name2instpkgtup(base)
+
+        def _show_pkgtup(pkgtup):
+            name = pkgtup[0]
+            notices = reversed(md_info.get_applicable_notices(pkgtup))
+            for (pkgtup, notice) in notices:
+                if filt_type and not ysp_has_info_md(filt_type, notice):
+                    continue
+
+                if list_type == 'installed':
+                    # Remove any that are newer than what we have installed
+                    if _rpm_tup_vercmp(iname2tup[name], pkgtup) < 0:
+                        continue
+
+                if ysp_should_filter_pkg(opts, name, notice, used_map):
+                    yield (pkgtup, notice)
+
+        data = []
         for pkgname in sorted(name2tup):
-            for (pkgtup, notice) in ysp_should_show_pkgtup(opts,
-                                                           name2tup[pkgname],
-                                                           md_info,
-                                                           used_map, filt_type):
+            for (pkgtup, notice) in _show_pkgtup(name2tup[pkgname]):
                 d = {}
                 (d['n'], d['a'], d['e'], d['v'], d['r']) = pkgtup
                 if d['e'] == '0':
                     d['epoch'] = ''
                 else:
                     d['epoch'] = "%s:" % d['e']
-                self.show_pkg(base, msg, "%(n)s-%(epoch)s%(v)s-%(r)s.%(a)s" % d,
-                              notice, show_type)
+                data.append((notice, pkgtup,
+                            "%(n)s-%(epoch)s%(v)s-%(r)s.%(a)s" % d))
+        show_pkgs(base, md_info, list_type, show_type, data, msg)
+
         ysp_chk_used_map(used_map, msg)
 
-        self.show_pkg_exit(base, md_info)
         return 0, [basecmd + ' done']
             
 
-class SecurityInfoCommand(SecurityListCommand):
-    show_pkg_info_done = {}
-    def getNames(self):
-        return ['info-updateinfo', 'info-security', 'info-sec']
-
-    def show_pkg(self, base, msg, pkg, notice, disp=None):
-        if notice['update_id'] in self.show_pkg_info_done:
-            return
-        self.show_pkg_info_done[notice['update_id']] = True
-        # Python-2.4.* doesn't understand str(x) returning unicode *sigh*
-        obj = notice.__str__()
-        msg(obj)
-
-    def show_pkg_exit(self, base, md_info):
-        self.show_pkg_info_done = {}
-
-
-class SecuritySummaryCommand(SecurityListCommand):
-    show_pkg_info_done = {}
-    def getNames(self):
-        return ['summary-updateinfo']
-
-    def show_pkg(self, base, msg, pkg, notice, disp=None):
-        if notice['update_id'] in self.show_pkg_info_done:
-            return
-        self.show_pkg_info_done[notice['update_id']] = notice
-
-    def show_pkg_exit(self, base, md_info):
-        def _msg(x):
-            print x
-        counts = {}
-        for notice in self.show_pkg_info_done.values():
-            counts[notice['type']] = counts.get(notice['type'], 0) + 1
-        maxsize = 0
-        for T in ('newpackage', 'security', 'bugfix', 'enhancement'):
-            if T not in counts:
-                continue
-            size = len(str(counts[T]))
-            if maxsize < size:
-                maxsize = size
-        if not maxsize:
-            _check_running_kernel(base, md_info, _msg)
-            return
-
-        outT = {'newpackage' : 'New Package',
-                'security' : 'Security',
-                'bugfix' : 'Bugfix',
-                'enhancement' : 'Enhancement'}
-        print "Updates Info Summary:"
-        for T in ('newpackage', 'security', 'bugfix', 'enhancement'):
-            if T not in counts:
-                continue
-            print "    %*u %s notice(s)" % (maxsize, counts[T], outT[T])
-        _check_running_kernel(base, md_info, _msg)
-        self.show_pkg_info_done = {}
-
-
 # "Borrowed" from yumcommands.py
 def yumcommands_checkRootUID(base):
     """
@@ -395,17 +463,29 @@ For more information contact your distribution or package provider.
                 base.logger.critical(msg)
                 raise CliError
 
-#  We need the list of installed pkgs, that are going to be updated
-# (by default). Then we match their names to the above.
-def _get_name2oldpkgtup(base):
-    oupdates = map(lambda x: x[1], base.up.getUpdatesTuples())
+def _get_name2pkgtup(base, pkgtups):
     name2tup = {}
-    for pkgtup in oupdates: # Get the latest "old" pkgtups
+    for pkgtup in pkgtups:
+        # Get the latest "old" pkgtups
         if (pkgtup[0] in name2tup and
             _rpm_tup_vercmp(name2tup[pkgtup[0]], pkgtup) > 0):
             continue
         name2tup[pkgtup[0]] = pkgtup
     return name2tup
+def _get_name2oldpkgtup(base):
+    """ Get the pkgtups for all installed pkgs. which have an update. """
+    oupdates = map(lambda x: x[1], base.up.getUpdatesTuples())
+    return _get_name2pkgtup(base, oupdates)
+def _get_name2instpkgtup(base):
+    """ Get the pkgtups for all installed pkgs. """
+    return _get_name2pkgtup(base, base.rpmdb.simplePkgList())
+def _get_name2allpkgtup(base):
+    """ Get the pkgtups for all installed pkgs. and munge that to be the
+        first possible pkgtup. """
+    ofirst = [(pt[0], pt[1], '0','0','0') for pt in base.rpmdb.simplePkgList()]
+    return _get_name2pkgtup(base, ofirst)
+
+
 
 class SecurityUpdateCommand:
     def getNames(self):
@@ -479,9 +559,7 @@ def config_hook(conduit):
     if hasattr(parser, 'plugin_option_group'):
         parser = parser.plugin_option_group
 
-    conduit.registerCommand(SecurityListCommand())
-    conduit.registerCommand(SecurityInfoCommand())
-    conduit.registerCommand(SecuritySummaryCommand())
+    conduit.registerCommand(UpdateinfoCommand())
     conduit.registerCommand(SecurityUpdateCommand())
     def osec(opt, key, val, parser):
          # CVE is a subset of --security on RHEL, but not on Fedora
@@ -553,14 +631,12 @@ def ysp_check_func_enter(conduit):
             ret = {"skip": ndata, "list_cmd": True}
         if (args[0] in ["update", "upgrade"]):
             ret = {"skip": ndata, "list_cmd": False}
-        if (args[0] in ("list-sec", "list-security", 'list-updateinfo')):
+        if args[0] == 'updateinfo':
             return (opts, {"skip": True, "list_cmd": True})
-        if (args[0] in ("info-sec", "info-security", 'info-updateinfo')):
+        if (args[0] in UpdateinfoCommand.direct_cmds):
             return (opts, {"skip": True, "list_cmd": True})
 
     if ret:
-        if ndata:
-            conduit.info(2, 'Skipping security plugin, no data')
         return (opts, ret)
     
     if not ndata:


More information about the Yum-commits mailing list