[Rpm-metadata] createrepo/__init__.py createrepo/utils.py createrepo/yumbased.py genpkgmetadata.py Makefile worker.py

skvidal at osuosl.org
Thu Sep 9 21:09:40 UTC 2010


 Makefile               |    3 
 createrepo/__init__.py |  232 +++++++++++++++++++++++++++++--------------------
 createrepo/utils.py    |    9 +
 createrepo/yumbased.py |    9 +
 genpkgmetadata.py      |    5 +
 worker.py              |   76 ++++++++++++++++
 6 files changed, 239 insertions(+), 95 deletions(-)

New commits:
commit b0b85d5c6cdb9b7b97147ce31bda8b4711d3a67c
Author: Seth Vidal <skvidal at fedoraproject.org>
Date:   Thu Sep 9 17:07:06 2010 -0400

    create a worker script for createrepo so createrepo can
    fork off N processes to handle the metadata gathering from the pkgs.
    This should speed up results on systems which have been CPU-bound
    on the createrepo process.
    
    If you're I/O-bound it won't help you at all, and it MAY make things worse.
    
    Many misc issues remain to iron out here - not least the callback
    output and gathering stdout/stderr from the workers.
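
The overall shape of the change, as a minimal standalone sketch (illustrative
only: the helper below mirrors the chunk/spawn/wait flow this commit adds to
MetaDataGenerator, and the names in it are made up):

    import subprocess

    def run_in_workers(items, num_workers, worker_script):
        # split the items into num_workers roughly-equal chunks, the same
        # way createrepo.utils.split_list_into_equal_chunks does below
        avg = len(items) / float(num_workers)
        chunks, last = [], 0.0
        while last < len(items):
            chunks.append(items[int(last):int(last + avg)])
            last += avg
        # fork one worker process per chunk, then wait for all of them
        jobs = [subprocess.Popen([worker_script] + chunk)
                for chunk in chunks]
        for job in jobs:
            job.wait()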

diff --git a/Makefile b/Makefile
index 6b907d8..60bb9db 100644
--- a/Makefile
+++ b/Makefile
@@ -47,7 +47,8 @@ RM              = rm -f
 
 MODULES = $(srcdir)/genpkgmetadata.py \
 	$(srcdir)/modifyrepo.py \
-	$(srcdir)/mergerepo.py	
+	$(srcdir)/mergerepo.py	\
+	$(srcdir)/worker.py
 
 .SUFFIXES: .py .pyc
 .py.pyc: 
diff --git a/createrepo/__init__.py b/createrepo/__init__.py
index e06da99..b2937c9 100644
--- a/createrepo/__init__.py
+++ b/createrepo/__init__.py
@@ -26,7 +26,7 @@ from urlgrabber import grabber
 import tempfile
 import stat
 import fcntl
-
+import subprocess
 
 from yum import misc, Errors, to_unicode
 from yum.sqlutils import executeSQL
@@ -47,7 +47,7 @@ except ImportError:
     pass
 
 from utils import _gzipOpen, bzipFile, checkAndMakeDir, GzipFile, \
-                  checksum_and_rename
+                  checksum_and_rename, split_list_into_equal_chunks
 import deltarpms
 
 __version__ = '0.9.8'
@@ -106,7 +106,10 @@ class MetaDataConfig(object):
         self.read_pkgs_list = None # filepath/name to write out list of pkgs
                                    # read in this run of createrepo
         self.collapse_glibc_requires = True
-
+        self.workers = 1 # number of workers to fork off to grab metadata from the pkgs
+        #self.worker_cmd = '/usr/share/createrepo_worker.py'
+        self.worker_cmd = './worker.py' # fixme - need a nice way to make this be the right place :(
+        
 class SimpleMDCallBack(object):
     def errorlog(self, thing):
         print >> sys.stderr, thing
@@ -477,17 +480,20 @@ class MetaDataGenerator:
         else:
             rpmfile = '%s/%s' % (pkgpath, rpmfile)
 
+        external_data = { '_cachedir': self.conf.cachedir,
+                          '_baseurl': baseurl,
+                          '_reldir': reldir,
+                          '_packagenumber': self.current_pkg,
+                          '_collapse_libc_requires': self.conf.collapse_glibc_requires,
+                          }
+                        
         try:
             po = yumbased.CreateRepoPackage(self.ts, rpmfile,
-                                            sumtype=self.conf.sumtype)
+                                            sumtype=self.conf.sumtype,
+                                            external_data=external_data)
         except Errors.MiscError, e:
             raise MDError, "Unable to open package: %s" % e
-        # external info we need
-        po._cachedir = self.conf.cachedir
-        po._baseurl = baseurl
-        po._reldir = reldir
-        po._packagenumber = self.current_pkg
-        po._collapse_libc_requires = self.conf.collapse_glibc_requires
+
         for r in po.requires_print:
             if r.startswith('rpmlib('):
                 self.rpmlib_reqs[r] = 1
@@ -508,101 +514,143 @@ class MetaDataGenerator:
         else:
             directory = pkgpath
 
-        for pkg in pkglist:
-            self.current_pkg += 1
-            recycled = False
+        # for worker/forked model
+        # iterate the pkglist - see which ones are handled by --update and let them
+        # go on their merry way
+        
+        newpkgs = []
+        if self.conf.update:
+            # if we're in --update mode then only act on the new/changed pkgs
+            for pkg in pkglist:
+                self.current_pkg += 1
 
-            # look to see if we can get the data from the old repodata
-            # if so write this one out that way
-            if self.conf.update:
                 #see if we can pull the nodes from the old repo
                 #print self.oldData.basenodes.keys()
                 old_pkg = pkg
                 if pkg.find("://") != -1:
                     old_pkg = os.path.basename(pkg)
                 nodes = self.oldData.getNodes(old_pkg)
-                if nodes is not None:
-                    recycled = True
-
-                # FIXME also open up the delta file
-
-            # otherwise do it individually
-            if not recycled:
-                #scan rpm files
-                if not pkgpath:
-                    reldir = os.path.join(self.conf.basedir, directory)
-                else:
-                    reldir = pkgpath
-
-                if not isinstance(pkg, YumAvailablePackage):
-
-                    try:
-                        po = self.read_in_package(pkg, pkgpath=pkgpath,
-                                                  reldir=reldir)
-                    except MDError, e:
-                        # need to say something here
-                        self.callback.errorlog("\nError %s: %s\n" % (pkg, e))
-                        continue
-                    # we can use deltas:
-                    if self.conf.deltas:
-                        self._do_delta_rpm_package(po)
-                    self.read_pkgs.append(pkg)
+                if nodes is not None: # we have a match in the old metadata
+                    if self.conf.verbose:
+                        self.callback.log(_("Using data from old metadata for %s")
+                                            % pkg)
+                    (primarynode, filenode, othernode) = nodes
+
+                    for node, outfile in ((primarynode, self.primaryfile),
+                                          (filenode, self.flfile),
+                                          (othernode, self.otherfile)):
+                        if node is None:
+                            break
+
+                        if self.conf.baseurl:
+                            anode = node.children
+                            while anode is not None:
+                                if anode.type != "element":
+                                    anode = anode.next
+                                    continue
+                                if anode.name == "location":
+                                    anode.setProp('xml:base', self.conf.baseurl)
+                                anode = anode.next
 
+                        output = node.serialize('UTF-8', self.conf.pretty)
+                        if output:
+                            outfile.write(output)
+                        else:
+                            if self.conf.verbose:
+                                self.callback.log(_("empty serialize on write to " \
+                                                    "%s in %s") % (outfile, pkg))
+                        outfile.write('\n')
+
+                    self.oldData.freeNodes(pkg)
+                    #FIXME - if we're in update and we have deltas enabled
+                    # check the presto data for this pkg and write its info back out
+                    # to our deltafile
+                    continue
                 else:
-                    po = pkg
-                    if isinstance(pkg, YumLocalPackage):
-                        self.read_pkgs.append(po.localpath)
+                    newpkgs.append(pkg)
+        else:
+            newpkgs = pkglist
 
-                if self.conf.database_only:
-                    pass # disabled right now for sanity reasons (mine)
-                    #po.do_sqlite_dump(self.md_sqlite)
-                else:
-                    self.primaryfile.write(po.xml_dump_primary_metadata())
-                    self.flfile.write(po.xml_dump_filelists_metadata())
-                    self.otherfile.write(po.xml_dump_other_metadata(
-                              clog_limit=self.conf.changelog_limit))
+        # setup our reldir
+        if not pkgpath:
+            reldir = os.path.join(self.conf.basedir, directory)
+        else:
+            reldir = pkgpath
+
+        # filter out those pkgs which are not files - but are pkgobjects
+        pkgfiles = []
+        for pkg in newpkgs:
+            if isinstance(pkg, YumAvailablePackage):
+                po = pkg
+                self.read_pkgs.append(po.localpath)
+                self.primaryfile.write(po.xml_dump_primary_metadata())
+                self.flfile.write(po.xml_dump_filelists_metadata())
+                self.otherfile.write(po.xml_dump_other_metadata(
+                                     clog_limit=self.conf.changelog_limit))
+                continue
+
+            pkgfiles.append(pkg)
+
+        # divide that list by the number of workers and fork off that many
+        # workers to tmpdirs
+        # wait for the workers to finish and as each one comes in
+        # open the files they created and write them out to our metadata
+        # add up the total pkg counts and return that value
+        worker_tmp_path = tempfile.mkdtemp()
+        worker_chunks = split_list_into_equal_chunks(pkgfiles, self.conf.workers)
+        worker_cmd_dict = {}
+        worker_jobs = []
+        base_worker_cmdline = [self.conf.worker_cmd, 
+                '--pkgoptions=_reldir=%s' % reldir,
+                '--pkgoptions=_collapse_libc_requires=%s' % self.conf.collapse_glibc_requires, 
+                '--pkgoptions=_cachedir=%s' % self.conf.cachedir,
+                '--pkgoptions=_baseurl=%s' % self.conf.baseurl,]
+
+        for worker_num in range(self.conf.workers):
+            # make the worker directory
+            workercmdline = []
+            workercmdline.extend(base_worker_cmdline)
+            thisdir = worker_tmp_path + '/' + str(worker_num)
+            if checkAndMakeDir(thisdir):
+                workercmdline.append('--tmpmdpath=%s' % thisdir)
             else:
-                if self.conf.verbose:
-                    self.callback.log(_("Using data from old metadata for %s")
-                                     % pkg)
-                (primarynode, filenode, othernode) = nodes
-
-                for node, outfile in ((primarynode, self.primaryfile),
-                                      (filenode, self.flfile),
-                                      (othernode, self.otherfile)):
-                    if node is None:
-                        break
-
-                    if self.conf.baseurl:
-                        anode = node.children
-                        while anode is not None:
-                            if anode.type != "element":
-                                anode = anode.next
-                                continue
-                            if anode.name == "location":
-                                anode.setProp('xml:base', self.conf.baseurl)
-                            anode = anode.next
-
-                    output = node.serialize('UTF-8', self.conf.pretty)
-                    if output:
-                        outfile.write(output)
-                    else:
-                        if self.conf.verbose:
-                            self.callback.log(_("empty serialize on write to" \
-                                                "%s in %s") % (outfile, pkg))
-                    outfile.write('\n')
+                raise MDError, "Unable to create worker path: %s" % thisdir
+            workercmdline.extend(worker_chunks[worker_num])
+            worker_cmd_dict[worker_num] = workercmdline
+
 
-                self.oldData.freeNodes(pkg)
-                #FIXME - if we're in update and we have deltas enabled
-                # check the presto data for this pkg and write its info back out
-                # to our deltafile
+        for (num, cmdline) in worker_cmd_dict.items():
+            self.callback.log("Spawning worker %s with %s pkgs" % (num, len(worker_chunks[num])))
+            job = subprocess.Popen(cmdline) # fixme - add stdout/stderr PIPES here,
+            worker_jobs.append(job)
+        
+        for job in worker_jobs:
+            # fixme - need 'communicate' to see about getting info back
+            os.waitpid(job.pid, 0)
+        
+        self.callback.log("Workers Finished")
+        # finished with workers
+        # go to their dirs and add the contents
+        self.callback.log("Gathering worker results")
+        for num in range(self.conf.workers):
+            for (fn, fo) in (('primary.xml', self.primaryfile), 
+                       ('filelists.xml', self.flfile),
+                       ('other.xml', self.otherfile)):
+                fnpath = worker_tmp_path + '/' + str(num) + '/' + fn
+                if os.path.exists(fnpath):
+                    fo.write(open(fnpath, 'r').read())
 
-            if not self.conf.quiet:
-                if self.conf.verbose:
-                    self.callback.log('%d/%d - %s' % (self.current_pkg,
-                                                      self.pkgcount, pkg))
-                else:
-                    self.callback.progress(pkg, self.current_pkg, self.pkgcount)
+                
+        for pkgfile in pkgfiles:
+            if self.conf.deltas:
+                po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+                self._do_delta_rpm_package(po)
+            self.read_pkgs.append(pkgfile)
 
         return self.current_pkg
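
For a concrete picture of the parent/worker contract built above: with
workers=2, the cmdline list handed to subprocess.Popen for worker 0 comes
out roughly like this (the paths and package names are made up for
illustration):

    ['./worker.py',
     '--pkgoptions=_reldir=/srv/repo/Packages',
     '--pkgoptions=_collapse_libc_requires=True',
     '--pkgoptions=_cachedir=None',
     '--pkgoptions=_baseurl=None',
     '--tmpmdpath=/tmp/tmpAbC123/0',
     'foo-1.0-1.noarch.rpm',
     'bar-2.0-1.noarch.rpm']

Each worker writes primary.xml, filelists.xml and other.xml under its
--tmpmdpath, and the gather loop then concatenates those fragments back into
the main metadata files in worker order.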
 
diff --git a/createrepo/utils.py b/createrepo/utils.py
index 3fa077f..995c3b9 100644
--- a/createrepo/utils.py
+++ b/createrepo/utils.py
@@ -123,6 +123,15 @@ def encodefiletypelist(filetypelist):
         result += ftl[x]
     return result
 
+def split_list_into_equal_chunks(seq, num_chunks):
+    avg = len(seq) / float(num_chunks)
+    out = []
+    last = 0.0
+    while last < len(seq):
+        out.append(seq[int(last):int(last + avg)])
+        last += avg
+
+    return out
 
 
 class MDError(Exception):
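
A quick interactive check of the new chunking helper shows the float stepping
at work. Note that it hands back num_chunks chunks even when there are fewer
items than workers (some come back empty), and that an empty seq yields no
chunks at all, so callers indexing by worker number need a non-empty list:

    >>> from createrepo.utils import split_list_into_equal_chunks
    >>> split_list_into_equal_chunks(range(7), 3)
    [[0, 1], [2, 3], [4, 5, 6]]
    >>> split_list_into_equal_chunks(['a', 'b'], 4)
    [[], ['a'], [], ['b']]
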
diff --git a/createrepo/yumbased.py b/createrepo/yumbased.py
index acb5851..ac06196 100644
--- a/createrepo/yumbased.py
+++ b/createrepo/yumbased.py
@@ -26,10 +26,15 @@ import utils
 import tempfile
 
 class CreateRepoPackage(YumLocalPackage):
-    def __init__(self, ts, package, sumtype=None):
+    def __init__(self, ts, package, sumtype=None, external_data={}): # note: default dict is only read, never mutated
         YumLocalPackage.__init__(self, ts, package)
         if sumtype:
             self.checksum_type = sumtype
+        
+        if external_data:
+            for (key, val) in external_data.items():
+                setattr(self, key, val)
+                
 
     def _do_checksum(self):
         """return a checksum for a package:
@@ -44,7 +49,7 @@ class CreateRepoPackage(YumLocalPackage):
             return self._checksum
 
         # not using the cachedir
-        if not self._cachedir:
+        if not hasattr(self, '_cachedir') or not self._cachedir:
             self._checksum = misc.checksum(self.checksum_type, self.localpath)
             self._checksums = [(self.checksum_type, self._checksum, 1)]
             return self._checksum
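
The external_data dict is just a generic way to hang parent-supplied state off
the package object as attributes, so the same keys work whether the object is
built in-process or inside a worker. A small sketch of the idea (the rpm path
here is hypothetical):

    import rpmUtils.transaction
    from createrepo.yumbased import CreateRepoPackage

    ts = rpmUtils.transaction.initReadOnlyTransaction()
    po = CreateRepoPackage(ts, '/srv/repo/Packages/foo-1.0-1.noarch.rpm',
                           external_data={'_cachedir': None,
                                          '_baseurl': 'http://example.com/repo'})
    # every key/value pair becomes a plain attribute on the instance
    assert po._baseurl == 'http://example.com/repo'

The hasattr() guard added to _do_checksum is what keeps older callers that
never set _cachedir working.
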
diff --git a/genpkgmetadata.py b/genpkgmetadata.py
index 108c68f..8c98191 100755
--- a/genpkgmetadata.py
+++ b/genpkgmetadata.py
@@ -119,6 +119,11 @@ def parse_args(args, conf):
     parser.add_option("--max-delta-rpm-size", default=100000000,
         dest='max_delta_rpm_size', type='int',
         help="max size of an rpm that to run deltarpm against (in bytes)")
+
+    parser.add_option("--workers", default=1,
+        dest='workers', type='int',
+        help="number of workers to spawn to read rpms")
+    
     (opts, argsleft) = parser.parse_args(args)
     if len(argsleft) > 1 and not opts.split:
         errorprint(_('Error: Only one directory allowed per run.'))
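
Assuming parse_args carries the option value through to
MetaDataConfig.workers, a CPU-bound run can now be asked to read rpms in
parallel with an invocation like:

    createrepo --workers 4 /srv/my-repo
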
diff --git a/worker.py b/worker.py
new file mode 100755
index 0000000..8c7e4c5
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,76 @@
+#!/usr/bin/python -tt
+
+import sys
+import yum
+import createrepo
+import os
+import rpmUtils
+from optparse import OptionParser
+
+
+# pass in dir to make tempdirs in
+# make tempdir for this worker
+# create 3 files in that tempdir
+# return how many pkgs
+# return on stderr where things went to hell
+
+#TODO - take most of read_in_package from createrepo and duplicate it here
+# so we can do downloads, etc.
+# then replace callers of read_in_package with forked callers of this
+# and reassemble at the end
+
+def main(args):
+    parser = OptionParser()
+    parser.add_option('--tmpmdpath', default=None, 
+                help="path where the outputs should be dumped for this worker")
+    parser.add_option("--pkgoptions", default=[], action='append',
+                help="pkgoptions in the format of key=value")
+
+    
+    opts, pkgs = parser.parse_args(args)
+    external_data = {'_packagenumber': 0}
+    if not opts.tmpmdpath:
+        print >> sys.stderr, "tmpmdpath required for destination files"
+        sys.exit(1)
+    
+    for strs in opts.pkgoptions:
+        k, v = strs.split('=', 1)
+        if v in ['True', 'true', 'yes', '1', 1]:
+            v = True
+        elif v in ['False', 'false', 'no', '0', 0]:
+            v = False
+        elif v in ['None', 'none', '']:
+            v = None
+        external_data[k] = v
+
+    
+    reldir = external_data['_reldir']
+    ts = rpmUtils.transaction.initReadOnlyTransaction()
+    pri = open(opts.tmpmdpath + '/primary.xml', 'w')
+    fl = open(opts.tmpmdpath + '/filelists.xml', 'w')
+    other = open(opts.tmpmdpath + '/other.xml', 'w')
+    
+    for pkgfile in pkgs:
+        pkgpath = reldir + '/' + pkgfile
+        if not os.path.exists(pkgpath):
+            continue
+
+        try:
+            pkg = createrepo.yumbased.CreateRepoPackage(ts, package=pkgpath, 
+                                                        external_data=external_data)
+            pri.write(pkg.xml_dump_primary_metadata())
+            fl.write(pkg.xml_dump_filelists_metadata())
+            other.write(pkg.xml_dump_other_metadata())
+        except yum.Errors.YumBaseError, e:
+            print >> sys.stderr, "Error: %s" % e
+            continue
+        else:
+            external_data['_packagenumber'] += 1
+        
+    pri.close()
+    fl.close()
+    other.close()
+    print external_data['_packagenumber']
+    
+if __name__ == "__main__":
+    main(sys.argv[1:])
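
Run standalone, the worker's contract is: package files as positional args
resolved relative to _reldir, settings via repeated --pkgoptions key=value
flags (the parsing loop coerces 'True'/'False'/'None' strings back to Python
values), partial XML written under --tmpmdpath, and the count of processed
packages printed on stdout. An illustrative session, mirroring the flags the
parent passes (paths made up):

    $ mkdir -p /tmp/md/0
    $ ./worker.py --tmpmdpath=/tmp/md/0 \
          --pkgoptions=_reldir=/srv/repo/Packages \
          --pkgoptions=_collapse_libc_requires=True \
          --pkgoptions=_cachedir=None \
          --pkgoptions=_baseurl=None \
          foo-1.0-1.noarch.rpm bar-2.0-1.noarch.rpm
    2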

