[Rpm-metadata] 3 commits - createrepo/__init__.py genpkgmetadata.py

James Antill james at osuosl.org
Mon Mar 17 19:44:05 UTC 2014


 createrepo/__init__.py |  297 +++++++++++++++++++++++++++++++++++++++++++++----
 genpkgmetadata.py      |   12 +
 2 files changed, 287 insertions(+), 22 deletions(-)

New commits:
commit 5f0e23403ffc6f7e000fea14e7f43eb7c07b29cb
Merge: 2028b94 bd55772
Author: James Antill <james at and.org>
Date:   Mon Mar 17 15:41:56 2014 -0400

    Merge commit 'imcleod/parallel_deltas_full'
    
    * commit 'bd557727992838efbf5f2b8052cbdeb64dda6704': (2 commits)
      tweak work queue action logging
      add options for parallel deltarpm creation

commit bd557727992838efbf5f2b8052cbdeb64dda6704
Author: Ian McLeod <imcleod at redhat.com>
Date:   Thu Feb 20 08:48:32 2014 -0600

    tweak work queue action logging

diff --git a/createrepo/__init__.py b/createrepo/__init__.py
index c6ab31a..517ea04 100644
--- a/createrepo/__init__.py
+++ b/createrepo/__init__.py
@@ -866,9 +866,11 @@ class MetaDataGenerator:
                         with active_work_size.get_lock():
                             # As long as we have the lock we may as well refresh our view of the actual size
                             active_work_size.value += package[1]
-                            #Uncomment if you want to convince yourself that this really does keep the size sane
-                            callback_wrap.log("Inbound package size %d" % (package[1]))
-                            callback_wrap.log("Current in-flight work size: %d" % (active_work_size.value))
+                            #Turn on profiling if you want to convince yourself that this really does keep the size sane
+                            if self.conf.profile:
+                                callback_wrap.log("Adding package (%s) of size %d to deltarpm work queue" % (package[0], package[1]))
+                                callback_wrap.log("Current TOTAL in-flight work size: %d" % (active_work_size.value))
+                                callback_wrap.log("Packages remaining to process: %d" % (len(packages)-len(consumed)-1))
                             work_size_snap = active_work_size.value
                             # Note that we block here if the queue is full
                             pending_packages = work_queue.qsize() + 1
commit fa0520ffbdcfe517ad775101c2b31ceb5b87e19e
Author: Ian McLeod <imcleod at redhat.com>
Date:   Wed Feb 19 08:50:40 2014 -0600

    add options for parallel deltarpm creation
    
    In some Fedora Project infrastructure use cases, the createrepo run time is dominated
    by delta generation.  This change allows delta processing to be done in parallel while
    capping the total payload size of the packages being processed at any given time.
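
The core of the change is a producer/worker arrangement: packages are sorted largest-first, and the producer only hands one to a delta worker while the combined payload size of everything in flight stays under a configurable cap; each worker gives its share of the budget back as it finishes.  A stripped-down sketch of that pattern follows (all names here are illustrative, and it polls with a short sleep where the actual patch below blocks on a completion Event and a lock-guarded check):

# Illustrative only: bounded-size parallel work queue, not the code in the diff below.
import time
import multiprocessing

def _delta_worker(work_queue, active_size):
    # Pull (name, size) items, do the expensive work, then release our share
    # of the payload "budget".
    while True:
        item = work_queue.get()
        if item is None:                      # sentinel: producer is finished
            return
        name, size = item
        try:
            pass                              # expensive per-package delta work goes here
        finally:
            with active_size.get_lock():
                active_size.value -= size

def run_bounded(packages, cap, num_workers=2):
    # packages: list of (name, payload_size) tuples.  cap must be at least as
    # large as the biggest payload or nothing ever fits; genpkgmetadata.py
    # enforces the equivalent constraint for the real options.
    work_queue = multiprocessing.Queue(1)
    active_size = multiprocessing.Value('L', 0)   # unsigned long with its own lock
    workers = [multiprocessing.Process(target=_delta_worker,
                                       args=(work_queue, active_size))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    pending = sorted(packages, key=lambda p: p[1], reverse=True)
    while pending:
        queued = []
        for i, (name, size) in enumerate(pending):
            if active_size.value + size < cap:
                with active_size.get_lock():
                    active_size.value += size
                work_queue.put((name, size))  # blocks while the queue is full
                queued.append(i)
        for i in reversed(queued):            # delete back-to-front so indexes stay valid
            del pending[i]
        if pending:
            time.sleep(0.5)                   # crude backoff; the patch waits on an Event instead
    for _ in workers:
        work_queue.put(None)                  # one sentinel per worker
    for w in workers:
        w.join()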

diff --git a/createrepo/__init__.py b/createrepo/__init__.py
index 85f2a3d..c6ab31a 100644
--- a/createrepo/__init__.py
+++ b/createrepo/__init__.py
@@ -28,6 +28,10 @@ import fcntl
 import subprocess
 from select import select
 
+# To support parallel deltarpms
+import multiprocessing
+import multiprocessing.managers
+
 from yum import misc, Errors
 from yum.repoMDObject import RepoMD, RepoData
 from yum.sqlutils import executeSQL
@@ -113,7 +117,10 @@ class MetaDataConfig(object):
         #self.worker_cmd = './worker.py' # helpful when testing
         self.retain_old_md = 0
         self.compress_type = 'compat'
-
+        # Parallel deltas additions
+        self.delta_workers = 1 # number of workers to fork when doing deltarpms
+        # Keep the combined payload size of all in-progress deltarpm creation below this number
+        self.max_concurrent_delta_rpm_size = self.max_delta_rpm_size
         
 class SimpleMDCallBack(object):
     def errorlog(self, thing):
@@ -718,19 +725,214 @@ class MetaDataGenerator:
             if err:
                 raise MDError, "Failed to process %d package(s)." % err
 
-            for pkgfile in pkgfiles:
-                if self.conf.deltas:
-                    try:
-                        po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
-                        self._do_delta_rpm_package(po)
-                    except MDError, e:
-                        errorprint(e)
-                        continue
-                self.read_pkgs.append(pkgfile)
+            if self.conf.delta_workers == 1:
+                for pkgfile in pkgfiles:
+                    if self.conf.deltas:
+                        try:
+                            po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+                            self._do_delta_rpm_package(po)
+                        except MDError, e:
+                            errorprint(e)
+                            continue
+                    self.read_pkgs.append(pkgfile)
+            else:
+                self._parallel_deltas(pkgfiles, pkgpath, reldir)
 
         save_keptpkgs(None) # append anything left
         return self.current_pkg
 
+    def _parallel_deltas(self, pkgfiles, pkgpath, reldir):
+        class WrappedMDCallBack(object):
+            def __init__(self, log_queue):
+                self.log_queue = log_queue
+            def errorlog(self, thing):
+                self.log_queue.put([ "errorlog", os.getpid(), thing ])
+
+            def log(self, thing):
+                self.log_queue.put([ "log", os.getpid(), thing ])
+
+            def progress(self, item, current, total):
+                # progress messages in a multiprocess context are likely to just be a confusing mess
+                pass
+
+        # Init a few things that we'd rather do in the main process and then
+        # inherit in the children
+        if not hasattr(self, 'tempdir'):
+            self.tempdir = tempfile.mkdtemp()
+        self._get_old_package_dict()
+
+        # queue containing packages that are candidates for processing
+        # now within the memory constraints
+        work_queue = multiprocessing.Queue(1)
+
+        # queue containing callback messages from the workers
+        log_queue = multiprocessing.Queue()
+
+        # Event used to allow the manager, when needed, to block for a completed task in a worker
+        completion_event = multiprocessing.Event()
+
+        # wrapped callback to pass in to workers
+        callback_wrap = WrappedMDCallBack(log_queue)
+
+        # list containing the completed packages
+        # accessed in children via a Manager and proxy as each child proc
+        # will be appending as it finishes
+        manager = multiprocessing.Manager()
+        read_pkgs_proxy = manager.list()
+
+        # lists used by the package size reading workers
+        pkgfiles_proxy = manager.list(pkgfiles)
+        pkgfiles_withsize_proxy = manager.list()
+
+        # process-safe value - total size of RPM payloads being deltaed
+        # The lock for entry into this also functions as our critical section
+        # elsewhere, as a change in the "in-flight" size of deltas is the key
+        # decision point in our work queue
+        # 'L' is unsigned long
+        active_work_size = multiprocessing.Value('L',0)
+
+        # Our candidate list is the packages sorted from largest to smallest
+        # Do this with workers as well because parallel is good
+        # Seriously though, this is also CPU-bound
+        self.callback.log("Reading package sizes in preparation for deltarpm creation")
+
+        def size_reader_entry(pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, repo_obj):
+            while True:
+                try:
+                    pkgfile = pkgfiles_proxy.pop()
+                    po = repo_obj.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+                    pkgfiles_withsize_proxy.append([ pkgfile, po.size ])
+                except IndexError:
+                    break
+
+        sort_workers = [ ]
+        for i in range(0,self.conf.delta_workers):
+            sw = multiprocessing.Process(target = size_reader_entry, args = (pkgfiles_proxy, pkgpath, reldir, pkgfiles_withsize_proxy, self))
+            sort_workers.append(sw)
+            sw.start()
+
+        for worker in sort_workers:
+            worker.join()
+
+        self.callback.log("Sorting package files by size")
+        sorted_packages = sorted(pkgfiles_withsize_proxy, key=lambda package: package[1], reverse=True)
+
+        def worker_entry(work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir):
+            # We are now a new process - replace the callback with the wrapper that pushes log messages
+            # to a queue for processing in the main process
+            repo_obj.callback = callback_wrap
+            while True:
+                try:
+                    pkg = None
+                    pkg = work_queue.get()
+                    if not pkg:
+                        # The manager feeds each worker a None to indicate we are finished
+                        # this allows us to use a blocking get without fear - I think
+                        break
+                    po = repo_obj.read_in_package(pkg[0], pkgpath=pkgpath, reldir=reldir)
+                    repo_obj._do_delta_rpm_package(po)
+                except Exception, e:
+                    callback_wrap.errorlog(e)
+                    continue
+                finally:
+                    if pkg:
+                        with active_work_size.get_lock():
+                            active_work_size.value -= pkg[1]
+                            completion_event.set()
+                        read_pkgs_proxy.append(pkg)
+
+        def manager_entry(packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, repo_obj, pkgpath, reldir):
+            max_work_size = repo_obj.conf.max_concurrent_delta_rpm_size
+            num_workers = repo_obj.conf.delta_workers
+            workers = [ ]
+            callback_wrap.log("Starting %d workers to process deltarpms - max total work size (%d) bytes" % (num_workers, max_work_size))
+            for i in range(0,repo_obj.conf.delta_workers):
+                wp = multiprocessing.Process(target = worker_entry, args = (work_queue, log_queue, read_pkgs_proxy, repo_obj, callback_wrap, active_work_size, completion_event, pkgpath, reldir))
+                workers.append(wp)
+                wp.start()
+
+            pending_packages = 0
+            while len(packages) > 0:
+                # Look through the package list and add things that fit under the max size limit
+                # until we reach the end of the list
+
+                # Don't read shared state for every package - it is an expensive operation
+                work_size_snap = active_work_size.value
+                #log_queue.put("Entered main loop with package list of length %d and size snap %d" % (len(packages), work_size_snap))
+                consumed = [ ]
+                for i in range(0,len(packages)):
+                    package = packages[i]
+                    if package[1] + work_size_snap < max_work_size:
+                        with active_work_size.get_lock():
+                            # As long as we have the lock we may as well refresh our view of the actual size
+                            active_work_size.value += package[1]
+                            #Uncomment if you want to convince yourself that this really does keep the size sane
+                            callback_wrap.log("Inbound package size %d" % (package[1]))
+                            callback_wrap.log("Current in-flight work size: %d" % (active_work_size.value))
+                            work_size_snap = active_work_size.value
+                            # Note that we block here if the queue is full
+                            pending_packages = work_queue.qsize() + 1
+                            consumed.append(i)
+                        # This can block - do it without the lock
+                        work_queue.put(package)
+                # Now prune the added items from the list, going backwards to ensure that we don't
+                # shift the index and delete the wrong thing
+                for i in reversed(consumed):
+                    del packages[i]
+                if len(packages) == 0:
+                    break
+
+                with active_work_size.get_lock():
+                    work_queue_size = work_queue.qsize()
+                    if pending_packages > work_queue_size:
+                        # Some work was started since we last touched the queue - try to add more
+                        # Note that this guarantees there is at least one free slot in the work_queue
+                        # This should also prevent us from constantly spinning in the package loop when
+                        # we have space in the queue but not enough active_work_size to allow us to add any
+                        # available package
+                        pending_packages = work_queue_size
+                        continue
+                    else:
+                        completion_event.clear()
+
+                # We either have too many items on the work_queue or too much total work size
+                # Wait for a worker to finish and then try again
+                completion_event.wait()
+
+            # We are done - tell the workers to stop
+            for worker in workers:
+                work_queue.put(None)
+
+            for worker in workers:
+                worker.join()
+
+            # Now signal to the main thread that we are done adding work
+            log_queue.put(None)
+
+        manager = multiprocessing.Process(target = manager_entry, args = (sorted_packages, active_work_size, log_queue, completion_event, work_queue, callback_wrap, read_pkgs_proxy, self, pkgpath, reldir))
+        manager.start()
+
+        def log_digest(callback, log_message):
+            if log_message[0] == "errorlog":
+                callback.errorlog("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
+            elif log_message[0] == "log":
+                callback.log("Worker PID(%d) - %s" % (log_message[1], log_message[2]))
+            else:
+                callback.errorlog("Malformed error in queue (%s)" % (str(log_message)))
+
+        # Process log messages until we get the finished signal "None"
+        while True:
+            log_message = log_queue.get()
+            if log_message is None:
+                break
+            log_digest(self.callback, log_message)
+
+        # now empty our proxy list
+        for pkg in read_pkgs_proxy:
+            self.read_pkgs.append(pkg)
+
+        # TODO: we may be able to explicitly stop the Manager at this point
+
 
     def closeMetadataDocs(self):
         # save them up to the tmp locations:
@@ -849,19 +1051,22 @@ class MetaDataGenerator:
         # appending the output. for each of the keys in the dict, return
         # the tag for the target + each of the drpm infos + closure for the target
         # tag
-        targets = {}
         results = []
-        for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
-            drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
-                                           '/' + drpm_fn) # this is annoying
-            drpm_po = yumbased.CreateRepoPackage(self.ts,
-                 self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
-
-            drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
-                                             drpm_rel_fn)
-            if not targets.has_key(drpm_po.pkgtup):
-                targets[drpm_po.pkgtup] = []
-            targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
+        if self.conf.delta_workers == 1:
+            targets = {}
+            for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
+                drpm_rel_fn = os.path.normpath(self.conf.delta_relative +
+                                               '/' + drpm_fn) # this is annoying
+                drpm_po = yumbased.CreateRepoPackage(self.ts,
+                     self.conf.deltadir + '/' + drpm_fn, sumtype=self.conf.sumtype)
+
+                drpm = deltarpms.DeltaRPMPackage(drpm_po, self.conf.outputdir,
+                                                 drpm_rel_fn)
+                if not targets.has_key(drpm_po.pkgtup):
+                    targets[drpm_po.pkgtup] = []
+                targets[drpm_po.pkgtup].append(drpm.xml_dump_metadata())
+        else:
+            targets = self._parallel_generate_delta_xml()
 
         for (n, a, e, v, r) in targets.keys():
             results.append("""  <newpackage name="%s" epoch="%s" version="%s" release="%s" arch="%s">\n""" % (
@@ -874,6 +1079,52 @@ class MetaDataGenerator:
 
         return ' '.join(results)
 
+    def _parallel_generate_delta_xml(self):
+        drpm_fns = [ ]
+        for drpm_fn in self.getFileList(self.conf.deltadir, '.drpm'):
+            drpm_fns.append(drpm_fn)
+
+        manager = multiprocessing.Manager()
+        drpm_fns_proxy = manager.list(drpm_fns)
+        targets_proxy = manager.dict()
+        targets_lock = manager.RLock()
+
+        def drpm_xml_entry(drpm_fns_proxy, targets_proxy, targets_lock, repo_obj):
+            while True:
+                try:
+                    drpm_fn = drpm_fns_proxy.pop()
+                    drpm_rel_fn = os.path.normpath(repo_obj.conf.delta_relative +
+                                                   '/' + drpm_fn) # this is annoying
+                    drpm_po = yumbased.CreateRepoPackage(repo_obj.ts,
+                         repo_obj.conf.deltadir + '/' + drpm_fn, sumtype=repo_obj.conf.sumtype)
+
+                    drpm = deltarpms.DeltaRPMPackage(drpm_po, repo_obj.conf.outputdir,
+                                                     drpm_rel_fn)
+
+                    with targets_lock:
+                        d_element = targets_proxy.get(drpm_po.pkgtup, [ ])
+                        d_element.append(drpm.xml_dump_metadata())
+                        # managed dict requires that we re-assign modified list rather than modify in place
+                        targets_proxy[drpm_po.pkgtup] = d_element
+                except IndexError:
+                    break
+
+        xml_workers = [ ]
+        for i in range(0,self.conf.delta_workers):
+            xw = multiprocessing.Process(target = drpm_xml_entry, args = (drpm_fns_proxy, targets_proxy, targets_lock, self))
+            xml_workers.append(xw)
+            xw.start()
+
+        for worker in xml_workers:
+            worker.join()
+
+        # I'm doing a copy in this way as I believe that prevents references to the manager from lingering
+        # TODO: Verify?
+        targets_copy = { }
+        for key in targets_proxy.keys():
+            targets_copy[key] = targets_proxy[key]
+        return targets_copy
+
     def _createRepoDataObject(self, mdfile, mdtype, compress=True, 
                               compress_type=None, attribs={}):
         """return random metadata as RepoData object to be  added to RepoMD
diff --git a/genpkgmetadata.py b/genpkgmetadata.py
index 35e7fc9..a684038 100755
--- a/genpkgmetadata.py
+++ b/genpkgmetadata.py
@@ -128,9 +128,15 @@ def parse_args(args, conf):
     parser.add_option("--max-delta-rpm-size", default=100000000,
         dest='max_delta_rpm_size', type='int',
         help="max size of an rpm that to run deltarpm against (in bytes)")
+    parser.add_option("--max-concurrent-delta-rpm-size", default=100000000,
+        dest='max_concurrent_delta_rpm_size', type='int',
+        help="max total payload size of concurrent deltarpm runs (in bytes)")
     parser.add_option("--workers", default=def_workers,
         dest='workers', type='int',
         help="number of workers to spawn to read rpms")
+    parser.add_option("--delta-workers", default=1,
+        dest='delta_workers', type='int',
+        help="number of workers to spawn to create delta rpms")
     parser.add_option("--xz", default=False,
         action="store_true",
         help=SUPPRESS_HELP)
@@ -155,6 +161,12 @@ def parse_args(args, conf):
     if opts.workers >= 128:
         errorprint(_('Warning: More than 128 workers is a lot. Limiting.'))
         opts.workers = 128
+    if opts.delta_workers > opts.workers:
+        errorprint(_('Warning: Requested more delta workers than workers. This is insane. Limiting.'))
+        opts.delta_workers = opts.workers
+    if opts.max_concurrent_delta_rpm_size < opts.max_delta_rpm_size:
+        errorprint(_('Warning: max_concurrent_delta_rpm_size < max_delta_rpm_size - this will deadlock. Setting them to the same value.'))
+        opts.max_concurrent_delta_rpm_size = opts.max_delta_rpm_size
     if opts.sumtype == 'sha1':
         errorprint(_('Warning: It is more compatible to use sha instead of sha1'))
 


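With these options in place, a parallel delta run would be requested along these lines (the worker counts, size cap and repository path are only example values):

    createrepo --deltas --workers 8 --delta-workers 4 \
        --max-concurrent-delta-rpm-size 400000000 /srv/repo

The warnings added above keep --delta-workers from exceeding --workers and keep the concurrent size cap at or above --max-delta-rpm-size, since a cap smaller than the largest eligible package could never be satisfied.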