[yum-cvs] yum-utils repo-rss.py, NONE, 1.1 repoclosure.py, NONE, 1.1 repomanage.py, NONE, 1.1
Seth Vidal
skvidal at login.linux.duke.edu
Mon Mar 21 08:42:31 UTC 2005
Update of /home/groups/yum/cvs/yum-utils
In directory login:/tmp/cvs-serv16922
Added Files:
repo-rss.py repoclosure.py repomanage.py
Log Message:
initial import of repo* scripts
--- NEW FILE repo-rss.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# seth vidal 2005 (c) etc etc
import yum
import yum.Errors
import sys
import os
import libxml2
import time
from optparse import OptionParser
class YumQuiet(yum.YumBase):
    """yum.YumBase subclass with a no-op log() and a recent-package query."""

    def __init__(self):
        yum.YumBase.__init__(self)

    def log(self, value, msg):
        """Accept a log message and ignore it."""
        pass

    def getRecent(self, days=1):
        """Return packages whose 'filetime' lies within the last `days` days.

        Candidates are taken from self.pkgSack: every package when
        self.conf.showdupesfromrepos is set, otherwise only the newest
        package per name/arch.  The result is ordered newest file time
        first, so repeated runs over the same metadata give the same
        order instead of whatever order a dict happens to iterate in.
        """
        recentlimit = time.time() - (days * 86400)

        if self.conf.showdupesfromrepos:
            avail = self.pkgSack.returnPackages()
        else:
            avail = self.pkgSack.returnNewestByNameArch()

        # Group the qualifying packages by their (integer) file time.
        ftimehash = {}
        for po in avail:
            ftime = int(po.returnSimple('filetime'))
            if ftime > recentlimit:
                ftimehash.setdefault(ftime, []).append(po)

        # Walk the file times from newest to oldest.
        ftimes = list(ftimehash.keys())
        ftimes.sort()
        ftimes.reverse()

        recent = []
        for ftime in ftimes:
            recent.extend(ftimehash[ftime])
        return recent
class RepoRSS:
    """Write an RSS 2.0 feed of packages to a file.

    Usage, as seen in this script: construct the object, optionally
    override the title/link/description attributes, call startRSS(),
    call doPkg() once per package, then closeRSS().
    """

    def __init__(self, fn='repo-rss.xml'):
        self.description = 'Repository RSS'
        self.link = 'http://linux.duke.edu/projects/yum'
        self.title = 'Recent Packages'
        self.doFile(fn)
        self.doDoc()

    def doFile(self, fn):
        """Open the output file, resolving a relative name against the cwd.

        Sets self.fn (the absolute path) and self.fo (the open file).
        On IOError a message is written to stderr and the process exits
        with status 1.
        """
        # os.path.isabs also copes with an empty name, which indexing
        # fn[0] did not.
        if os.path.isabs(fn):
            self.fn = fn
        else:
            self.fn = os.path.join(os.getcwd(), fn)

        try:
            self.fo = open(self.fn, 'w')
        except IOError:
            e = sys.exc_info()[1]
            sys.stderr.write("Error opening file %s: %s\n" % (self.fn, e))
            sys.exit(1)

    def doDoc(self):
        """Create the libxml2 document used to build <item> nodes.

        Sets self.doc, self.xmlescape (the document's entity-escaping
        function) and self.rssnode (the <channel> element under which
        items are created).
        """
        self.doc = libxml2.newDoc('1.0')
        self.xmlescape = self.doc.encodeEntitiesReentrant
        rss = self.doc.newChild(None, 'rss', None)
        rss.setProp('version', '2.0')
        self.rssnode = rss.newChild(None, 'channel', None)

    def startRSS(self):
        """Write the XML declaration and the channel header to the file."""
        # Spell the time out instead of using the locale-dependent %X.
        rfc822_format = "%a, %d %b %Y %H:%M:%S GMT"
        now = time.strftime(rfc822_format, time.gmtime())
        # title/link/description are caller-supplied text; escape them so
        # characters such as '&' or '<' cannot break the document.
        escape = self.xmlescape
        rssheader = """<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0">
<channel>
<title>%s</title>
<link>%s</link>
<description>%s</description>
<pubDate>%s</pubDate>
<generator>Yum</generator>
""" % (escape(self.title), escape(self.link), escape(self.description), now)
        self.fo.write(rssheader)

    def doPkg(self, pkg, url):
        """Build the <item> for pkg, write it out, then free the node."""
        item = self.rsspkg(pkg, url)
        self.fo.write(item.serialize("utf-8", 1))
        # The item has been written; detach it from the document and
        # release it.
        item.unlinkNode()
        item.freeNode()
        del item

    def rsspkg(self, pkg, url):
        """Return a new <item> node for pkg.

        pkg -- package object (returnSimple(), changelog and name are used)
        url -- repository base url; the package's relative path is
               appended to form the item link
        """
        rfc822_format = "%a, %d %b %Y %H:%M:%S GMT"
        clog_format = "%a, %d %b %Y GMT"
        xhtml_ns = "http://www.w3.org/1999/xhtml"
        escape = self.xmlescape

        item = self.rssnode.newChild(None, 'item', None)
        title = escape(str(pkg))
        item.newChild(None, 'title', title)
        date = time.gmtime(float(pkg.returnSimple('buildtime')))
        item.newChild(None, 'pubDate', time.strftime(rfc822_format, date))
        item.newChild(None, 'guid', pkg.returnSimple('id'))
        link = url + '/' + pkg.returnSimple('relativepath')
        item.newChild(None, 'link', escape(link))

        # build up changelog: at most three entries, then an ellipsis
        changelog = ''
        cnt = 0
        for e in pkg.changelog:
            cnt += 1
            if cnt > 3:
                changelog += '...'
                break
            (date, author, desc) = e
            date = time.strftime(clog_format, time.gmtime(float(date)))
            changelog += '%s - %s\n%s\n\n' % (date, author, desc)

        body = item.newChild(None, "body", None)
        body.newNs(xhtml_ns, None)
        body.newChild(None, "p", escape(pkg.returnSimple('summary')))
        body.newChild(None, "pre", escape(pkg.returnSimple('description')))
        body.newChild(None, "p", 'Change Log:')
        body.newChild(None, "pre", escape(changelog))

        # NOTE(review): the first <pre> section is added with its markup
        # unescaped while the changelog section is escaped as a whole;
        # kept as-is, confirm this asymmetry is intended.
        description = '<pre>%s - %s\n\n' % (escape(pkg.name),
                                            escape(pkg.returnSimple('summary')))
        description += '%s\n\nChange Log:\n\n</pre>' % escape(pkg.returnSimple('description'))
        description += escape('<pre>%s</pre>' % escape(changelog))
        item.newChild(None, 'description', description)
        return item

    def closeRSS(self):
        """Write the closing tags, close the file and free the document."""
        end="\n </channel>\n</rss>\n"
        self.fo.write(end)
        self.fo.close()
        del self.fo
        self.doc.freeDoc()
        del self.doc
def main(options, args):
    """Generate the RSS feed of recent packages.

    options -- parsed command-line options; days, filename, title, link
               and description are read
    args    -- repository ids; when any are given exactly those repos are
               enabled and all others disabled

    Exits with status 1 when repository setup fails or a repository has
    no 'otherdata' metadata.
    """
    days = options.days
    repoids = args
    my = YumQuiet()
    my.doConfigSetup()
    if os.geteuid() != 0:
        my.conf.setConfigOption('cache', 1)
        print('Not running as root, might not be able to import all of cache')

    if len(repoids) > 0:
        for repo in my.repos.repos.values():
            if repo.id not in repoids:
                repo.disable()
            else:
                repo.enable()

    try:
        my.doRepoSetup()
    except yum.Errors.RepoError:
        e = sys.exc_info()[1]
        sys.stderr.write('%s\n' % e)
        print('Cannot continue')
        sys.exit(1)

    print('Reading in repository metadata - please wait....')
    my.doSackSetup()
    for repo in my.repos.listEnabled():
        try:
            # 'with' is a reserved word in newer Pythons, so the keyword
            # argument is passed through a dict instead of written inline.
            my.repos.populateSack(which=[repo.id], **{'with': 'otherdata'})
        except yum.Errors.RepoError:
            sys.stderr.write('otherdata not available for repo: %s\n' % repo)
            sys.stderr.write('run as root to get changelog data\n')
            sys.exit(1)

    recent = my.getRecent(days=days)
    rssobj = RepoRSS(fn=options.filename)
    rssobj.title = options.title
    rssobj.link = options.link
    rssobj.description = options.description
    rssobj.startRSS()
    # take recent updates only and dump to an rss compat output
    try:
        for pkg in recent:
            repo = my.repos.getRepo(pkg.repoid)
            url = repo.urls[0]
            rssobj.doPkg(pkg, url)
    finally:
        # Always terminate the feed and close the file, even if a package
        # could not be written.
        rssobj.closeRSS()
if __name__ == "__main__":
    # Script entry point: declare the options, parse the command line and
    # pass the result to main().
    usage = "usage: repo-rss.py [options] repoid1 repoid2"
    parser = OptionParser(usage=usage)

    # One row per option: (flag, value type, destination, default, help).
    option_rows = [
        ("-f", "string", "filename", 'repo-rss.xml',
         "filename to write rss to: %default"),
        ("-l", 'string', 'link', 'http://linux.duke.edu/projects/yum/',
         "url for rss feed link: %default"),
        ("-t", 'string', 'title', "RSS Repository - Recent Packages",
         'Title for Rss feed: %default'),
        ("-d", 'string', 'description', "Most recent packages in Repositories",
         'description of feed: %default'),
        ('-r', 'int', 'days', 3,
         'most recent (in days): %default'),
    ]
    for (opt_flag, opt_type, opt_dest, opt_default, opt_help) in option_rows:
        parser.add_option(opt_flag, action="store", type=opt_type,
                          dest=opt_dest, default=opt_default, help=opt_help)

    (options, args) = parser.parse_args()
    main(options, args)
--- NEW FILE repoclosure.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# seth vidal 2005 (c) etc etc
# Read in the metadata of a series of repositories and check all the
# dependencies in all packages for resolution. Print out the list of
# packages with unresolved dependencies.
import yum
import yum.Errors
import sys
import os
# Maps the comparison flag stored with a requirement to the operator that
# is printed for it in the report.
flagsdict = {
    'EQ': '=',
    'LT': '<',
    'LE': '<=',
    'GT': '>',
    'GE': '>=',
}
def evrTupletoVer(tuple):
    """Convert an (epoch, version, release) tuple to a version string.

    Returns None when the version is None.  Otherwise the result has the
    form ``[epoch:]version[-release]``; epoch and release are included
    only when they are not None.
    """
    # The parameter name shadows the builtin; it is kept so the signature
    # stays exactly as callers know it.
    e, v, r = tuple
    if v is None:
        return None

    # Start from the bare version so it is not lost when there is no epoch
    # (previously an epoch-less tuple produced '-release' or '').
    val = '%s' % (v,)
    if e is not None:
        val = '%s:%s' % (e, val)
    if r is not None:
        val = '%s-%s' % (val, r)
    return val
class YumQuiet(yum.YumBase):
    """yum.YumBase subclass whose log() method does nothing."""

    def __init__(self):
        yum.YumBase.__init__(self)

    def log(self, value, msg):
        """Accept a log message and ignore it."""
        return None
def main(repoids=None):
    """Report packages whose requirements nothing in the repos provides.

    repoids -- optional list of repository ids; when given, exactly those
               repositories are enabled and all others are disabled.

    Prints the repositories examined, the package count and, for each
    package with unresolved requirements, the list of those requirements.
    """
    my = YumQuiet()
    my.doConfigSetup()
    if hasattr(my.repos, 'sqlite'):
        my.repos.sqlite = False
        my.repos._selectSackType()

    if os.geteuid() != 0:
        my.conf.setConfigOption('cache', 1)
        print('Not running as root, might not be able to import all of cache')

    if repoids is not None:
        for repo in my.repos.repos.values():
            if repo.id not in repoids:
                repo.disable()
            else:
                repo.enable()

    my.doRepoSetup()
    print('Reading in repository metadata - please wait....')
    my.doSackSetup()
    for repo in my.repos.listEnabled():
        try:
            # 'with' is a reserved word in newer Pythons, so the keyword
            # argument is passed through a dict instead of written inline.
            my.repos.populateSack(which=[repo.id], **{'with': 'filelists'})
        except yum.Errors.RepoError:
            print('Filelists not available for repo: %s' % repo)
            print('Some dependencies may not be complete for this repository')
            print('Run as root to get all dependencies')

    unresolved = {}
    print('Checking Dependencies')
    for pkg in my.pkgSack:
        for (req, flags, (reqe, reqv, reqr)) in pkg.returnPrco('requires'):
            if req.startswith('rpmlib'):
                continue  # ignore rpmlib deps

            ver = evrTupletoVer((reqe, reqv, reqr))
            try:
                resolve_sack = my.whatProvides(req, flags, ver)
            except yum.Errors.RepoError:
                # The lookup failed, so nothing is known to provide this
                # requirement: report it rather than reusing the previous
                # iteration's result (or hitting an unbound name).
                resolve_sack = []

            if len(resolve_sack) < 1:
                unresolved.setdefault(pkg, []).append((req, flags, ver))

    num = len(my.pkgSack)
    repos = my.repos.listEnabled()
    print('Repos looked at: %s' % len(repos))
    for repo in repos:
        print(' %s' % repo)
    print('Num Packages in Repos: %s' % num)

    pkgs = list(unresolved.keys())
    pkgs.sort()
    for pkg in pkgs:
        print('package: %s from %s\n unresolved deps: ' % (pkg, pkg.repoid))
        for (n, f, v) in unresolved[pkg]:
            req = '%s' % n
            if f:
                # Fall back to the raw flag if it has no known operator.
                flag = flagsdict.get(f, f)
                req = '%s %s'% (req, flag)
            if v:
                req = '%s %s' % (req, v)
            print(' %s' % req)
if __name__ == "__main__":
    # Command-line arguments are repository ids; with none given, main()
    # receives None and uses the configured repositories.
    repos = sys.argv[1:] or None
    main(repos)
--- NEW FILE repomanage.py ---
#!/usr/bin/python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# (c) Copyright Seth Vidal 2004
# need hdropen, dir traversing, version comparison, and getopt (eventually)
# this should take a dir, traverse it - build a dict of foo[(name, arch)] = [/path/to/file/that/is/highest, /path/to/equalfile]
import os
import sys
import rpm
import fnmatch
import types
import string
import getopt
from exceptions import Exception
class Error(Exception):
    """Exception raised by this script's helpers; carries one message.

    str(err) gives back the message passed to the constructor.
    """

    def __init__(self, args=None):
        # Hand the message to Exception instead of assigning self.args
        # afterwards: where ``args`` is a managed attribute, assigning a
        # string to it is split into a tuple of single characters and the
        # message prints garbled.
        if args is None:
            Exception.__init__(self)
        else:
            Exception.__init__(self, args)
def errorprint(stuff):
    """Write stuff, followed by a newline, to standard error."""
    sys.stderr.write('%s\n' % (stuff,))
def rpmOutToStr(arg):
    """Return arg as a string.

    A str is returned unchanged; anything else, including None, is
    converted with str().
    """
    if not isinstance(arg, str):
        arg = str(arg)
    return arg
def compareEVR(evr1, evr2):
    """Compare two (epoch, version, release) tuples via rpm.labelCompare.

    Every field is first converted to a string with rpmOutToStr().  Per
    the original note on this function, the result is
         1: evr1 is newer than evr2
         0: both are the same version
        -1: evr2 is newer than evr1
    """
    # Unpack in the body: tuple parameters in a signature are not
    # accepted by newer Pythons.  Callers still pass two tuples.
    (e1, v1, r1) = evr1
    (e2, v2, r2) = evr2
    first = (rpmOutToStr(e1), rpmOutToStr(v1), rpmOutToStr(r1))
    second = (rpmOutToStr(e2), rpmOutToStr(v2), rpmOutToStr(r2))
    return rpm.labelCompare(first, second)
def returnHdr(ts, package):
    """Hand back the rpm header of package, or raise Error if it is unreadable.

    ts      -- rpm transaction set used to read the header
    package -- path of the rpm file

    Raises Error when the file cannot be opened, when rpm reports an
    error reading it, or when the result is not an rpm.hdr.
    """
    try:
        fdno = os.open(package, os.O_RDONLY)
    except OSError:
        raise Error("Error opening file %s" % package)

    ts.setVSFlags(~(rpm.RPMVSF_NOMD5|rpm.RPMVSF_NEEDPAYLOAD))
    try:
        try:
            hdr = ts.hdrFromFdno(fdno)
        except rpm.error:
            raise Error("Error opening package %s" % package)
        if type(hdr) != rpm.hdr:
            raise Error("Error opening package %s" % package)
    finally:
        # Reset the flags and close the descriptor on failure too, so a
        # run over many broken packages does not leak file descriptors.
        ts.setVSFlags(0)
        os.close(fdno)
    return hdr
def hdr2pkgTuple(hdr):
    """Return (name, arch, epoch, version, release) read from hdr.

    Version and release are passed through str().  An epoch of None
    becomes the string '0'; any other epoch is converted with str().
    """
    name, arch = hdr['name'], hdr['arch']
    # convert these to strings to be sure
    version, release = str(hdr['version']), str(hdr['release'])
    raw_epoch = hdr['epoch']
    if raw_epoch is not None:
        epoch = str(raw_epoch)
    else:
        epoch = '0'
    return (name, arch, epoch, version, release)
def getFileList(path, ext, filelist):
    """Collect the files under path whose names end in ext.

    The comparison lower-cases the end of each file name before comparing
    it with ext.  Sub-directories are searched recursively.  Matching
    files are appended to filelist as normalized paths, and that same
    list is returned.

    If a directory cannot be listed, the problem is reported through
    errorprint() and an Error is raised.
    """
    suffix_len = len(ext)
    try:
        entries = os.listdir(path)
    except OSError:
        err = sys.exc_info()[1]
        errorprint('Error accessing directory %s, %s' % (path, err))
        raise Error('Error accessing directory %s, %s' % (path, err))

    for entry in entries:
        candidate = path + '/' + entry
        if os.path.isdir(candidate):
            filelist = getFileList(candidate, ext, filelist)
            continue
        if entry[-suffix_len:].lower() == '%s' % (ext):
            filelist.append(os.path.normpath(candidate))
    return filelist
def trimRpms(rpms, excludeGlobs):
    """Remove from rpms every path matching any pattern in excludeGlobs.

    rpms         -- list of paths; modified in place
    excludeGlobs -- fnmatch-style patterns

    Returns the same list object that was passed in.  Every occurrence of
    an excluded path is removed, including duplicates.
    """
    kept = []
    for path in rpms:
        excluded = False
        for glob in excludeGlobs:
            if fnmatch.fnmatch(path, glob):
                excluded = True
                break
        if not excluded:
            kept.append(path)

    # Slice assignment keeps the caller's list object while replacing its
    # contents in one pass.
    rpms[:] = kept
    return rpms
def parseargs(args):
    """Parse the command-line arguments.

    Returns (options, directory).  options is a dict with the keys
        'output' -- 'new' (default) or 'old'
        'passed' -- which of 'old'/'new' were given explicitly
        'space'  -- 1 when -s/--space was given, else 0

    Prints a message plus the usage text and exits with status 1 on an
    unknown option, when both --old and --new are given, or unless
    exactly one directory is named.  -h/--help prints usage and exits 0.
    """
    options = {}
    options['output'] = 'new'
    options['passed'] = []
    options['space'] = 0

    try:
        gopts, argsleft = getopt.getopt(args, 'onhs', ['space', 'new', 'old', 'help'])
    except getopt.error:
        e = sys.exc_info()[1]
        # No gettext "_" function exists in this script, so the message
        # is formatted directly.
        errorprint('Options Error: %s.' % e)
        usage()
        sys.exit(1)

    for arg, a in gopts:
        if arg in ['-h', '--help']:
            usage()
            sys.exit(0)
        elif arg in ['-o', '--old', '-n', '--new']:
            if arg in ['-o', '--old']:
                chosen, other = 'old', 'new'
            else:
                chosen, other = 'new', 'old'
            options['output'] = chosen
            if other in options['passed']:
                errorprint('\nPass either --old or --new, not both!\n')
                usage()
                sys.exit(1)
            options['passed'].append(chosen)
        elif arg in ['-s', '--space']:
            options['space'] = 1

    if len(argsleft) > 1:
        errorprint('Error: Only one directory allowed per run.')
        usage()
        sys.exit(1)
    elif len(argsleft) == 0:
        errorprint('Error: Must specify a directory to index.')
        usage()
        sys.exit(1)

    directory = argsleft[0]
    return options, directory
def main(args):
    """Print the newest (default) or the older rpm files found under a directory.

    args -- command-line arguments without the program name; parsed by
            parseargs()

    Packages are grouped by (name, arch); the highest
    (epoch, version, release) in each group counts as newest.  Paths are
    printed sorted, one per line or space separated with --space.
    Exits with status 1 if the directory cannot be read or holds no rpms.
    """
    options, mydir = parseargs(args)
    try:
        rpmList = getFileList(mydir, '.rpm', [])
    except Error:
        # getFileList has already reported the problem on stderr; exit
        # cleanly instead of dying with a traceback.
        sys.exit(1)

    if len(rpmList) == 0:
        errorprint('No files to process')
        sys.exit(1)

    verfile = {}  # (name, arch, epoch, ver, rel) -> list of file paths
    naver = {}    # (name, arch) -> newest (epoch, ver, rel) seen so far
    ts = rpm.TransactionSet()
    for pkg in rpmList:
        try:
            hdr = returnHdr(ts, pkg)
        except Error:
            errorprint(sys.exc_info()[1])
            continue

        pkgtuple = hdr2pkgTuple(hdr)
        (n, a, e, v, r) = pkgtuple
        del hdr

        verfile.setdefault(pkgtuple, []).append(pkg)

        if (n, a) not in naver:
            naver[(n, a)] = (e, v, r)
            continue

        # Replace the current champion only when this package is newer.
        if compareEVR((e, v, r), naver[(n, a)]) > 0:
            naver[(n, a)] = (e, v, r)

    del ts

    # now we have our dicts - we can return whatever by iterating over them
    outputpackages = []
    if options['output'] == 'new':
        for (n, a) in naver.keys():
            (e, v, r) = naver[(n, a)]
            outputpackages.extend(verfile[(n, a, e, v, r)])

    if options['output'] == 'old':
        for (n, a, e, v, r) in verfile.keys():
            if (e, v, r) != naver[(n, a)]:
                outputpackages.extend(verfile[(n, a, e, v, r)])

    outputpackages.sort()
    if options['space']:
        # One space-separated line; nothing at all when the list is empty.
        if outputpackages:
            print(' '.join(outputpackages))
    else:
        for pkg in outputpackages:
            print(pkg)
def usage():
    """Print the command-line help text to standard output."""
    help_text = """
repomanage [--old] [--new] path
-o --old - print the older packages
-n --new - print the newest packages
-s --space - space separated output, not newline
-h --help - duh
By default it will output the full path to the newest packages in the path.
"""
    print(help_text)
if __name__ == "__main__":
    # sys.argv always contains the program name, so "no arguments given"
    # means fewer than two entries (the former "< 1" test never fired).
    if len(sys.argv) < 2:
        usage()
        sys.exit(1)
    else:
        main(sys.argv[1:])
More information about the Yum-cvs-commits
mailing list