[Yum-devel] [PATCH 4/6] Add the history command, so the user can see/use the history API

James Antill james at and.org
Wed Aug 26 00:15:19 UTC 2009


---
 yumcommands.py |  270 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 268 insertions(+), 2 deletions(-)

diff --git a/yumcommands.py b/yumcommands.py
index bd73d0c..260c820 100644
--- a/yumcommands.py
+++ b/yumcommands.py
@@ -29,6 +29,7 @@ import locale
 import fnmatch
 import time
 from yum.i18n import utf8_width, utf8_width_fill, to_unicode
+import pwd
 
 def checkRootUID(base):
     """
@@ -1061,7 +1062,7 @@ class DowngradeCommand(YumCommand):
 
     def needTs(self, base, basecmd, extcmds):
         return False
-        
+
 
 class VersionCommand(YumCommand):
     def getNames(self):
@@ -1126,10 +1127,275 @@ class VersionCommand(YumCommand):
         for line in cols:
             print base.fmtColumns(zip(line, columns))
 
-        return 0, []
+        return 0, ['version']
 
     def needTs(self, base, basecmd, extcmds):
         vcmd = 'installed'
         if extcmds:
             vcmd = extcmds[0]
         return vcmd in ('available', 'all')
+
+
+class HistoryCommand(YumCommand):
+    def getNames(self):
+        return ['history']
+
+    def getUsage(self):
+        return "[info|list|summary|repeat|undo|new]"
+
+    def getSummary(self):
+        return _("Display, or use, the transaction history")
+
+    def _hcmd_list(self, base, extcmds):
+        tid = None
+        try:
+            if len(extcmds) > 1:
+                tid = int(extcmds[1])
+        except ValueError:
+            pass
+
+        fmt = "%-8s | %-16s | %-38s | %-8s"
+        print fmt % ("ID", "Login user", "Start time (seconds taken)","Altered")
+        print "-" * 79
+        fmt = "%8u | %-16.16s | %-38.38s | %8u"
+        for old in base.history.old(tid):
+            name = old.loginuid
+            try:
+                usertup = pwd.getpwuid(old.loginuid)
+                name = usertup[0]
+            except KeyError:
+                pass
+            tm = time.ctime(old.beg_timestamp)
+            tm += (" (%u)" % (old.end_timestamp - old.beg_timestamp))
+            print fmt % (old.tid, name, tm, len(old.trans_data))
+
+    def _get_transaction(self, base, extcmds):
+        if len(extcmds) < 2:
+            base.logger.critical(_('No transaction ID given'))
+            return None
+
+        try:
+            tid = int(extcmds[1])
+        except ValueError:
+            base.logger.critical(_('Bad transaction ID given'))
+            return None
+
+        old = base.history.old(tid)
+        if not old:
+            base.logger.critical(_('Not found given transaction ID'))
+            return None
+        if len(old) > 1:
+            base.logger.critical(_('Found more than one transaction ID!'))
+        return old[0]
+
+    def _hcmd_info(self, base, extcmds):
+        old = self._get_transaction(base, extcmds)
+        if old is None:
+            return 1, ['Failed history info']
+
+        name = old.loginuid
+        try:
+            usertup = pwd.getpwuid(old.loginuid)
+            name = usertup[0]
+        except KeyError:
+            pass
+
+        print "Transaction ID:", old.tid
+        print "Begin time:", time.ctime(old.beg_timestamp)
+        print "Begin rpmdb:", old.beg_rpmdbversion
+        print "End time:", time.ctime(old.end_timestamp)
+        print "End rpmdb:", old.end_rpmdbversion
+        print "User:", name
+        print "Return-Code:", old.return_code
+        print "Packages Used:"
+        for hpkg in old.trans_with:
+            prefix = " " * 4
+            if not base.rpmdb.contains(po=hpkg):
+                prefix = " ** "
+            if hpkg.epoch == '0':
+                print "%s%s" % (prefix, hpkg)
+
+        print "Packages Altered:"
+        for hpkg in old.trans_data:
+            prefix = " " * 4
+            if not hpkg.done:
+                prefix = " ** "
+
+            if False: pass
+            elif hpkg.state == 'Update':
+                ln = len(hpkg.name) + 1
+                cn = (" " * ln) + str(hpkg)[ln:]
+                print "%s%-12s %s" % (prefix, hpkg.state, cn)
+            elif hpkg.state == 'Downgraded':
+                ln = len(hpkg.name) + 1
+                cn = (" " * ln) + str(hpkg)[ln:]
+                print "%s%-12s %s" % (prefix, hpkg.state, cn)
+            else:
+                print "%s%-12s %s" % (prefix, hpkg.state, hpkg)
+
+    def _hcmd_summary(self, base, extcmds):
+        fmt = "%-16s | %-38s | %-8s"
+        print fmt % ("Login user", "Time (seconds taken)", "Altered")
+        print "-" * 79
+        fmt = "%-16.16s | %-38.38s | %8u"
+        data = {'day' : {}, 'week' : {},
+                'fortnight' : {}, 'quarter' : {}, 'half' : {}, 
+                'year' : {}, 'all' : {}}
+        for old in base.history.old():
+            name = old.loginuid
+            try:
+                usertup = pwd.getpwuid(old.loginuid)
+                name = usertup[0]
+            except KeyError:
+                pass
+            period = 'all'
+            now = time.time()
+            if False: pass
+            elif old.beg_timestamp > (now - (24 * 60 * 60)):
+                period = 'day'
+            elif old.beg_timestamp > (now - (24 * 60 * 60 *  7)):
+                period = 'week'
+            elif old.beg_timestamp > (now - (24 * 60 * 60 * 14)):
+                period = 'fortnight'
+            elif old.beg_timestamp > (now - (24 * 60 * 60 *  7 * 13)):
+                period = 'quarter'
+            elif old.beg_timestamp > (now - (24 * 60 * 60 *  7 * 26)):
+                period = 'half'
+            elif old.beg_timestamp > (now - (24 * 60 * 60 * 365)):
+                period = 'year'
+            data[period].setdefault(name, []).append(old)
+        _period2user = {'day' : _("Last day"),
+                        'week' : _("Last week"),
+                        'fortnight' : _("Last 2 weeks"), # US default :p
+                        'quarter' : _("Last 3 months"),
+                        'half' : _("Last 6 months"),
+                        'year' : _("Last year"),
+                       'all' : _("Over a year ago")}
+        for period in ('day', 'week', 'fortnight', 'quarter', 'half', 'year',
+                       'all'):
+            if not data[period]:
+                continue
+            for name in sorted(data[period]):
+                tm   = sum(map(lambda x: x.end_timestamp - x.beg_timestamp,
+                               data[period][name]))
+                pkgs = sum(map(lambda x: len(x.trans_data), data[period][name]))
+                uperiod = _period2user[period]
+                uperiod += (" (%u)" % tm)
+                print fmt % (name, uperiod, pkgs)
+
+    def _hcmd_repeat(self, base, extcmds):
+        old = self._get_transaction(base, extcmds)
+        if old is None:
+            return 1, ['Failed history repeat']
+        tm = time.ctime(old.beg_timestamp)
+        print "Repeating transaction %u, from %s" % (old.tid, tm)
+        # This is basic atm.
+        done = False
+        for pkg in old.trans_data:
+            if pkg.state == 'Reinstall':
+                if base.reinstall(pkgtup=pkg.pkgtup):
+                    done = True
+        for pkg in old.trans_data:
+            if pkg.state == 'Downgrade':
+                try:
+                    if base.downgrade(pkgtup=pkg.pkgtup):
+                        done = True
+                except yum.Errors.DowngradeError:
+                    base.logger.critical(_('Failed to downgrade: %s'), pkg)
+        for pkg in old.trans_data:
+            if pkg.state == 'Update':
+                if base.update(pkgtup=pkg.pkgtup):
+                    done = True
+        for pkg in old.trans_data:
+            if pkg.state in ('Install', 'True-Install'):
+                if base.install(pkgtup=pkg.pkgtup):
+                    done = True
+        for pkg in old.trans_data:
+            if pkg.state == 'Erase':
+                if base.remove(pkgtup=pkg.pkgtup):
+                    done = True
+        if done:
+            return 2, ["Repeating transaction %u" % (old.tid,)]
+
+    def _hcmd_undo(self, base, extcmds):
+        old = self._get_transaction(base, extcmds)
+        if old is None:
+            return 1, ['Failed history undo']
+        tm = time.ctime(old.beg_timestamp)
+        print "Undoing transaction %u, from %s" % (old.tid, tm)
+        # FIXME: This is __horribley__ basic atm.
+        done = False
+        for pkg in old.trans_data:
+            if pkg.state == 'Reinstall':
+                if base.reinstall(pkgtup=pkg.pkgtup):
+                    done = True
+        for pkg in old.trans_data:
+            if pkg.state == 'Updated':
+                try:
+                    if base.downgrade(pkgtup=pkg.pkgtup):
+                        done = True
+                except yum.Errors.DowngradeError:
+                    base.logger.critical(_('Failed to downgrade: %s'), pkg)
+        for pkg in old.trans_data:
+            if pkg.state == 'Downgraded':
+                if base.update(pkgtup=pkg.pkgtup):
+                    done = True
+        for pkg in old.trans_data:
+            if pkg.state in ('Install', 'True-Install'):
+                if base.remove(pkgtup=pkg.pkgtup):
+                    done = True
+        for pkg in old.trans_data:
+            if pkg.state == 'Erase':
+                if base.install(pkgtup=pkg.pkgtup):
+                    done = True
+        if done:
+            return 2, ["Undoing transaction %u" % (old.tid,)]
+
+    def _hcmd_new(self, base, extcmds):
+        base.history._create_db_file()
+
+    def _hcmd_dump(self, base, extcmds):
+        rpmdb_ver = base.rpmdb.simpleVersion()[0]
+        base.history.beg(rpmdb_ver, base.rpmdb.returnPackages(), [])
+        base.history.end(rpmdb_ver, 0)
+
+    def doCheck(self, base, basecmd, extcmds):
+        cmds = ('list', 'info', 'summary', 'repeat', 'undo', 'dump', 'new')
+        if extcmds and extcmds[0] not in cmds:
+            base.logger.critical(_('Invalid history sub-command, use: %s.'),
+                                 ", ".join(cmds))
+            raise cli.CliError
+        if extcmds and extcmds[0] in ('repeat', 'undo', 'dump', 'new'):
+            checkRootUID(base)
+            checkGPGKey(base)
+
+    def doCommand(self, base, basecmd, extcmds):
+        vcmd = 'list'
+        if extcmds:
+            vcmd = extcmds[0]
+
+        if False: pass
+        elif vcmd == 'list':
+            ret = self._hcmd_list(base, extcmds)
+        elif vcmd == 'info':
+            ret = self._hcmd_info(base, extcmds)
+        elif vcmd == 'summary':
+            ret = self._hcmd_summary(base, extcmds)
+        elif vcmd == 'undo':
+            ret = self._hcmd_undo(base, extcmds)
+        elif vcmd == 'repeat':
+            ret = self._hcmd_repeat(base, extcmds)
+        elif vcmd == 'dump':
+            ret = self._hcmd_dump(base, extcmds)
+        elif vcmd == 'new':
+            ret = self._hcmd_new(base, extcmds)
+
+        if ret is None:
+            return 0, ['history %s' % (vcmd,)]
+        return ret
+
+    def needTs(self, base, basecmd, extcmds):
+        vcmd = 'list'
+        if extcmds:
+            vcmd = extcmds[0]
+        return vcmd in ('repeat', 'undo')
-- 
1.6.2.5



More information about the Yum-devel mailing list