[Yum-devel] [PATCH] download DRPMs in parallel, if possible

Zdenek Pavlas zpavlas at redhat.com
Tue Jun 5 14:03:58 UTC 2012


This is very similar to what we do in YumBase.downloadPkgs().

If urlgrabber supports parallel downloading and we have compatible
Yum and repo._getFile(), just add async and failfunc arguments.

As _getFile(async=True) returns *before* the file is downloaded,
most of the post-processing was being moved to checkfunc.
Error handling is duplicated in failfunc, because we can delay
a callback, but not raising an exception.

To make this work, recent builds if yum and urlgrabber are needed
http://lists.fedoraproject.org/pipermail/devel/2012-May/167122.html
but we are still compatible with the old Yum, too.

---
 yum-presto/presto.py |   71 ++++++++++++++++++++++---------------------------
 1 files changed, 32 insertions(+), 39 deletions(-)

diff --git a/yum-presto/presto.py b/yum-presto/presto.py
index 0461c8e..cac5cdd 100644
--- a/yum-presto/presto.py
+++ b/yum-presto/presto.py
@@ -358,64 +358,57 @@ def downloadPkgs(conduit, presto, download_pkgs=None):
 
     # now we need to do downloads
     i = 0
-    local_size = 0
+    _size = [0, 0] # local_size, rebuild_size
+    async = hasattr(urlgrabber.grabber, 'parallel_wait')
     for (po, delta) in remote_pkgs:
         i += 1
-        # FIXME: verifyChecksum should handle the urlgrabber objects...
-        checkfunc = (lambda fo, csumtype, csum:
-                     conduit._base.verifyChecksum(fo.filename, csumtype, csum),
-                     (delta['checksum_type'],
-                      delta['checksum']), {})
-
-        deltadir = os.path.join(po.repo.cachedir, 'deltas')
-        deltapath = os.path.join(deltadir,
-                                 os.path.basename(delta['filename']))
-
-        # FIXME: this should be moved into _getFile
-        dirstat = os.statvfs(deltadir)
-        delta_size = delta['size']
-        if (dirstat.f_bavail * dirstat.f_bsize) <= (long(po.size) + delta_size):
-            adderror(po, _('Insufficient space in download directory %s '
-                    'to download') % (deltadir,))
-            continue
+        text = os.path.basename(delta['filename'])
+        deltapath = os.path.join(po.repo.cachedir, 'deltas', text)
+
+        def checkfunc(obj, d=delta, po=po, dp=deltapath):
+            # FIXME: verifyChecksum should handle the urlgrabber objects...
+            conduit._base.verifyChecksum(obj.filename, d['checksum_type'], d['checksum'])
+            _size[0] += d['size']
+            _size[1] += po.size
+            if hasattr(urlgrabber.progress, 'text_meter_total_size'):
+                urlgrabber.progress.text_meter_total_size(remote_size, _size[0])
+
+            #  Magic: What happens here is that the checksum data is loaded
+            # from the SQL db in this thread, so that the other thread can call
+            # .verifyLocalPkg() without having to hit sqlite.
+            po.returnChecksums()
+            queue.put((conduit, po, dp, d['size']))
+            if po in errors:
+                del errors[po]
 
-        if hasattr(urlgrabber.progress, 'text_meter_total_size'):
-            urlgrabber.progress.text_meter_total_size(remote_size, local_size)
+        kwargs = {}
+        if async and getattr(po.repo, '_async', None):
+            kwargs['failfunc'] = lambda obj, po=po: adderror(po, str(obj.exception))
+            kwargs['async'] = True
+        elif not (i == 1 and not _size[0] and remote_size == delta['size']):
+            text = '(%s/%s): %s' % (i, len(remote_pkgs), text)
         try:
-            if i == 1 and not local_size and remote_size == delta_size:
-                text = os.path.basename(delta['filename'])
-            else:
-                text = "(%s/%s): %s" % (i, len(remote_pkgs),
-                                        os.path.basename(delta['filename']))
             deltafile = po.repo._getFile(url=po.basepath,
                                     relative=delta['filename'],
                                     local=deltapath,
                                     checkfunc=checkfunc,
                                     text=text,
-                                    cache=po.repo.cache)
-            local_size += delta_size
-            if hasattr(urlgrabber.progress, 'text_meter_total_size'):
-                urlgrabber.progress.text_meter_total_size(remote_size,
-                                                          local_size)
+                                    size=delta['size'],
+                                    cache=po.repo.cache,
+                                    **kwargs)
         except yum.Errors.RepoError, e:
             adderror(po, str(e))
         else:
             # HACK: Use the download progress, at least we get something
             cb = po.repo.callback
-            #  Magic: What happens here is that the checksum data is loaded
-            # from the SQL db in this thread, so that the other thread can call
-            # .verifyLocalPkg() without having to hit sqlite.
-            po.returnChecksums()
-            queue.put((conduit, po, deltafile, delta['size']))
-            rebuild_size += po.size
-
-            if errors.has_key(po):
-                del errors[po]
 
         # Check for waiting messages from building threads
         while not messages.empty():
             conduit.info(2, messages.get())
 
+    if async:
+        urlgrabber.grabber.parallel_wait()
+    rebuild_size += _size[1]
     if hasattr(urlgrabber.progress, 'text_meter_total_size'):
         urlgrabber.progress.text_meter_total_size(0)
     ptsz, pfsz, pfnsz = _processing_data()
-- 
1.7.4.4



More information about the Yum-devel mailing list