[yum-cvs] yum/yum rpmsack.py,NONE,1.1
Seth Vidal
skvidal at linux.duke.edu
Mon Jun 26 01:31:36 UTC 2006
Update of /home/groups/yum/cvs/yum/yum
In directory login1.linux.duke.edu:/tmp/cvs-serv1258
Added Files:
rpmsack.py
Log Message:
check in tambet's rpmsack.py
modify mi2list to return YumInstalledPackage objects instead of tuples
rename main class to RPMDBPackageSack to hopefully decrease confusion
--- NEW FILE rpmsack.py ---
#!/usr/bin/python -tt
import rpm
from rpmUtils import miscutils
import misc
from packages import YumInstalledPackage
class RPMDBPackageSack:
    """Package sack view over an RPM database.

    The database is opened through ``rpm.TransactionSet(rootdir)`` and every
    query method runs a ``dbMatch()`` against that transaction set.

    Attributes:
        excludes: dict keyed by pkgId for packages marked as deleted through
            delPackage()/delPackageById().  NOTE(review): no query method in
            this class filters its results on this dict yet.
        ts: the rpm transaction set used for all lookups.
        dep_table: maps 'requires'/'provides'/'conflicts'/'obsoletes' to the
            (name tag, version tag, flags tag) triple for that dependency
            type.
    """

    def __init__(self, rootdir='/'):
        """Open the rpm database rooted at *rootdir*."""
        self.excludes = {}
        self.ts = rpm.TransactionSet(rootdir)
        # Pass the NOSIGNATURES and NODIGESTS verify flags to the
        # transaction set (presumably to skip signature/digest checks when
        # reading headers -- confirm against the rpm bindings).
        self.ts.setVSFlags((rpm._RPMVSF_NOSIGNATURES | rpm._RPMVSF_NODIGESTS))
        self.dep_table = {
            'requires': (rpm.RPMTAG_REQUIRENAME,
                         rpm.RPMTAG_REQUIREVERSION,
                         rpm.RPMTAG_REQUIREFLAGS),
            'provides': (rpm.RPMTAG_PROVIDENAME,
                         rpm.RPMTAG_PROVIDEVERSION,
                         rpm.RPMTAG_PROVIDEFLAGS),
            'conflicts': (rpm.RPMTAG_CONFLICTNAME,
                          rpm.RPMTAG_CONFLICTVERSION,
                          rpm.RPMTAG_CONFLICTFLAGS),
            'obsoletes': (rpm.RPMTAG_OBSOLETENAME,
                          rpm.RPMTAG_OBSOLETEVERSION,
                          rpm.RPMTAG_OBSOLETEFLAGS),
        }

    def buildIndexes(self):
        """No-op: this sack keeps no indexes of its own."""
        # We don't need these
        return

    def _checkIndexes(self, failure='error'):
        """No-op counterpart of buildIndexes(); *failure* is ignored."""
        return

    def delPackage(self, obj):
        """Mark the package object *obj* as excluded, keyed by obj.pkgId."""
        self.excludes[obj.pkgId] = 1

    def delPackageById(self, pkgId):
        """Mark the package with the given *pkgId* as excluded."""
        self.excludes[pkgId] = 1

    def getChangelog(self, pkgId):
        """Return the changelog of *pkgId* as (time, name, text) tuples."""
        hdr = self._getHeader(pkgId)
        times = hdr[rpm.RPMTAG_CHANGELOGTIME]
        names = hdr[rpm.RPMTAG_CHANGELOGNAME]
        texts = hdr[rpm.RPMTAG_CHANGELOGTEXT]
        return [(when, who, text)
                for (when, who, text) in zip(times, names, texts)]

    def getPrco(self, pkgId, prcotype=None):
        """Return the dependencies of *pkgId* grouped by dependency type.

        prcotype: one key of self.dep_table to restrict the lookup to; when
            empty or None every type in dep_table is returned.

        Returns a dict mapping each requested type to the list built by
        _getDependencies().
        """
        hdr = self._getHeader(pkgId)
        if prcotype:
            types = (prcotype,)
        else:
            types = self.dep_table.keys()
        result = {}
        for t in types:
            result[t] = self._getDependencies(hdr, self.dep_table[t])
        # Fixed: this used to return the undefined name 'lst'.
        return result

    def getFiles(self, pkgId):
        """Return the full paths of the files owned by *pkgId*.

        The header stores each file as a basename plus an index into the
        list of directory names, so the path of file i is
        dirnames[dirindexes[i]] + basenames[i].

        Each path is also printed to stdout, which is all this method did
        before it returned a value.
        """
        hdr = self._getHeader(pkgId)
        dirs = hdr[rpm.RPMTAG_DIRNAMES]
        basenames = hdr[rpm.RPMTAG_BASENAMES]
        dirindexes = hdr[rpm.RPMTAG_DIRINDEXES]
        files = []
        for (diridx, base) in zip(dirindexes, basenames):
            path = "%s%s" % (dirs[diridx], base)
            print(path)
            files.append(path)
        return files

    def searchAll(self, name, query_type='like'):
        """Return packages with a provides name matching the glob *name*.

        *query_type* is accepted but not used.  Results are de-duplicated by
        pkgId.
        """
        result = {}
        # check provides
        table = self.dep_table['provides']
        mi = self.ts.dbMatch()
        mi.pattern(table[0], rpm.RPMMIRE_GLOB, name)
        for hdr in mi:
            # Fixed: hdr2class is a method and must be called through self.
            pkg = self.hdr2class(hdr)
            if pkg.pkgId not in result:
                result[pkg.pkgId] = pkg
        # FIXME
        # check filelists/dirlists
        return list(result.values())

    def returnObsoletes(self):
        """Return the obsoletes of every package that declares any.

        Returns a dict keyed by (name, arch, epoch, version, release) whose
        values are the lists built by _getDependencies().
        """
        obsoletes = {}
        tags = self.dep_table['obsoletes']
        mi = self.ts.dbMatch()
        for hdr in mi:
            # Use the same name tag as dep_table rather than a separately
            # spelled constant, and treat a missing tag as "no obsoletes".
            if not hdr[tags[0]]:
                continue
            key = (hdr[rpm.RPMTAG_NAME],
                   hdr[rpm.RPMTAG_ARCH],
                   hdr[rpm.RPMTAG_EPOCH],
                   hdr[rpm.RPMTAG_VERSION],
                   hdr[rpm.RPMTAG_RELEASE])
            obsoletes[key] = self._getDependencies(hdr, tags)
        return obsoletes

    def getPackageDetails(self, pkgId):
        """Return the hdr2class() object for the package *pkgId*."""
        return self.hdr2class(self._getHeader(pkgId))

    def searchPrco(self, name, prcotype):
        """Return packages whose *prcotype* dependencies include *name*.

        prcotype: a key of self.dep_table.

        One nevra-only hdr2class() object is returned per matching
        dependency entry; its ``prco`` attribute holds that single entry as
        {prcotype: [{'name', 'flags', 'epoch', 'ver', 'rel'}]}.
        """
        result = []
        table = self.dep_table[prcotype]
        mi = self.ts.dbMatch()
        mi.pattern(table[0], rpm.RPMMIRE_STRCMP, name)
        for hdr in mi:
            names = hdr[table[0]]
            vers = hdr[table[1]]
            flags = hdr[table[2]]
            for (n, ver, flag) in zip(names, vers, flags):
                if n != name:
                    continue
                (e, v, r) = miscutils.stringToVersion(ver)
                # Build a fresh object per match so that several matching
                # entries in one header do not overwrite each other's prco.
                pkg = self.hdr2class(hdr, True)
                pkg.prco = {prcotype: [{'name': name,
                                        'flags': self._parseFlags(flag),
                                        'epoch': e,
                                        'ver': v,
                                        'rel': r}]}
                result.append(pkg)
        # If it's not a provides or filename, we are done
        # (startswith also copes with an empty name, unlike name[0]).
        if prcotype != 'provides' or not name.startswith('/'):
            return result
        # FIXME: Search package files
        return result

    def searchProvides(self, name):
        """Shortcut for searchPrco(name, 'provides')."""
        return self.searchPrco(name, 'provides')

    def searchRequires(self, name):
        """Shortcut for searchPrco(name, 'requires')."""
        return self.searchPrco(name, 'requires')

    def searchObsoletes(self, name):
        """Shortcut for searchPrco(name, 'obsoletes')."""
        return self.searchPrco(name, 'obsoletes')

    # The method was originally published under this misspelled name; keep
    # it so existing callers continue to work.
    seatchObsoletes = searchObsoletes

    def searchConflicts(self, name):
        """Shortcut for searchPrco(name, 'conflicts')."""
        return self.searchPrco(name, 'conflicts')

    def simplePkgList(self, repoid=None):
        """Return returnPackages(); *repoid* is ignored."""
        return self.returnPackages()

    def returnNewestByNameArch(self, naTup=None):
        """Return misc.newestInList() of the packages matching (name, arch).

        naTup: (name, arch) tuple; an empty or missing value returns None.
        """
        if not naTup:
            return
        (name, arch) = (naTup[0], naTup[1])
        allpkg = []
        mi = self.ts.dbMatch(rpm.RPMTAG_NAME, name)
        for hdr in mi:
            if hdr[rpm.RPMTAG_ARCH] == arch:
                # NOTE(review): this collects hdr2tuple() tuples while
                # returnNewestByName() collects mi2list() objects -- confirm
                # which one misc.newestInList() expects.
                allpkg.append(self.hdr2tuple(hdr))
        if not allpkg:
            # FIXME: raise ...
            # Fixed: the message used an undefined variable 'name'.
            print('No Package Matching %s' % name)
        return misc.newestInList(allpkg)

    def returnNewestByName(self, name=None):
        """Return misc.newestInList() of the packages called *name*.

        An empty or missing *name* returns None.
        """
        if not name:
            return
        allpkg = self.mi2list(self.ts.dbMatch(rpm.RPMTAG_NAME, name))
        if not allpkg:
            # FIXME: raise ...
            print('No Package Matching %s' % name)
        return misc.newestInList(allpkg)

    def returnPackages(self, repoid=None):
        """Return every package in the database; *repoid* is ignored."""
        return self.mi2list(self.ts.dbMatch())

    def searchNevra(self, name=None, epoch=None, ver=None, rel=None,
                    arch=None):
        """Return packages matching every given name/epoch/ver/rel/arch.

        Arguments that are None (or otherwise false) are not used as
        constraints; the rest are compared with RPMMIRE_STRCMP.
        """
        mi = self.ts.dbMatch()
        criteria = ((rpm.RPMTAG_NAME, name),
                    (rpm.RPMTAG_EPOCH, epoch),
                    (rpm.RPMTAG_VERSION, ver),
                    (rpm.RPMTAG_RELEASE, rel),
                    (rpm.RPMTAG_ARCH, arch))
        for (tag, value) in criteria:
            if value:
                mi.pattern(tag, rpm.RPMMIRE_STRCMP, value)
        return self.mi2list(mi)

    def excludeArchs(self, archlist):
        """Mark every package whose arch is in *archlist* as excluded."""
        for arch in archlist:
            mi = self.ts.dbMatch()
            mi.pattern(rpm.RPMTAG_ARCH, rpm.RPMMIRE_STRCMP, arch)
            for hdr in mi:
                self.delPackageById(hdr[rpm.RPMTAG_SHA1HEADER])

    # Helper functions

    def _getHeader(self, pkgId):
        """Return the first header whose RPMTAG_SHA1HEADER equals *pkgId*.

        Whatever the match iterator's next() raises when nothing matches is
        propagated to the caller.
        """
        mi = self.ts.dbMatch(rpm.RPMTAG_SHA1HEADER, pkgId)
        return mi.next()

    def _parseFlags(self, flags):
        """Translate RPMSENSE_* bits into a string such as '<', '>=', '='."""
        flagstr = ''
        if flags & rpm.RPMSENSE_LESS:
            flagstr += '<'
        if flags & rpm.RPMSENSE_GREATER:
            flagstr += '>'
        if flags & rpm.RPMSENSE_EQUAL:
            flagstr += '='
        return flagstr

    def _getDependencies(self, hdr, tags):
        """Return the dependencies stored in *hdr* under *tags*.

        tags is a tuple containing 3 rpm tags: the first one to get dep
        names, the 2nd to get dep versions, and the 3rd to get dep flags.

        Returns a list of (name, flag string, stringToVersion(version))
        tuples.
        """
        names = hdr[tags[0]]
        vers = hdr[tags[1]]
        flags = hdr[tags[2]]
        deps = []
        for (depname, ver, flag) in zip(names, vers, flags):
            # Fixed: list.append() takes a single argument, so the three
            # values have to be wrapped in one tuple.
            deps.append((depname,
                         self._parseFlags(flag),
                         miscutils.stringToVersion(ver)))
        return deps

    def hdr2class(self, hdr, nevra_only=False):
        """Wrap *hdr* in a plain attribute object.

        The object always carries ``nevra`` (name, epoch, version, release,
        arch), ``sack`` (this instance) and ``pkgId`` (RPMTAG_SHA1HEADER).
        Unless *nevra_only* is true it also carries the hdrange, location,
        checksum, time, size and info dicts.
        """
        class tmpObject:
            pass
        y = tmpObject()
        y.nevra = (hdr[rpm.RPMTAG_NAME],
                   hdr[rpm.RPMTAG_EPOCH],
                   hdr[rpm.RPMTAG_VERSION],
                   hdr[rpm.RPMTAG_RELEASE],
                   hdr[rpm.RPMTAG_ARCH])
        y.sack = self
        y.pkgId = hdr[rpm.RPMTAG_SHA1HEADER]
        if nevra_only:
            return y
        # The entries set to None below were written against a placeholder
        # tag (rpm.RPMTAG_), which raised AttributeError on every call.
        # TODO: map them to real header tags once the intended ones are
        # decided.
        y.hdrange = {'start': None,
                     'end': None}
        y.location = {'href': None,
                      'value': '',
                      'base': None}
        y.checksum = {'pkgid': 'YES',
                      'type': None,
                      'value': None}
        y.time = {'build': hdr[rpm.RPMTAG_BUILDTIME],
                  'file': None}
        y.size = {'package': hdr[rpm.RPMTAG_SIZE],
                  'archive': hdr[rpm.RPMTAG_ARCHIVESIZE],
                  'installed': None}
        y.info = {'summary': hdr[rpm.RPMTAG_SUMMARY],
                  'description': hdr[rpm.RPMTAG_DESCRIPTION],
                  'packager': hdr[rpm.RPMTAG_PACKAGER],
                  'group': hdr[rpm.RPMTAG_GROUP],
                  'buildhost': hdr[rpm.RPMTAG_BUILDHOST],
                  'sourcerpm': hdr[rpm.RPMTAG_SOURCERPM],
                  'url': hdr[rpm.RPMTAG_URL],
                  'vendor': hdr[rpm.RPMTAG_VENDOR],
                  'license': hdr[rpm.RPMTAG_LICENSE]}
        return y

    def mi2list(self, mi):
        """Return a YumInstalledPackage for every header yielded by *mi*."""
        return [YumInstalledPackage(hdr) for hdr in mi]

    def hdr2tuple(self, hdr):
        """Return (pkgId, name, epoch, version, release, arch) for *hdr*."""
        return (hdr[rpm.RPMTAG_SHA1HEADER],
                hdr[rpm.RPMTAG_NAME],
                hdr[rpm.RPMTAG_EPOCH],
                hdr[rpm.RPMTAG_VERSION],
                hdr[rpm.RPMTAG_RELEASE],
                hdr[rpm.RPMTAG_ARCH])
def main():
    """Ad-hoc manual check: run one query against the rpmdb and print it.

    Other queries that can be swapped in while experimenting:
        sack.returnPackages()
        sack.returnNewestByNameArch(("kernel-default", "i586"))
        sack.returnNewestByName(("yum"))
        sack.searchProvides("zmd") / sack.searchRequires("zmd")
    """
    sack = RPMDBPackageSack()
    matches = sack.searchNevra(name="kernel-default", rel='6')
    for pkg in matches:
        print(pkg)
        # sack.getFiles(pkg[0])
        # print sack.getChangelog(pkg[0])


if __name__ == '__main__':
    main()
More information about the Yum-cvs-commits
mailing list