[Yum-devel] [PATCH] download DRPMs in parallel, if possible
Zdenek Pavlas
zpavlas at redhat.com
Tue Jun 5 14:03:58 UTC 2012
This is very similar to what we do in YumBase.downloadPkgs().
If urlgrabber supports parallel downloading and we have a compatible
Yum and repo._getFile(), we just add the async and failfunc arguments.
As _getFile(async=True) returns *before* the file is downloaded,
most of the post-processing has been moved to checkfunc.
Error handling is duplicated in failfunc, because we can delay
a callback, but we cannot delay raising an exception.
To make this work, recent builds of yum and urlgrabber are needed
(see http://lists.fedoraproject.org/pipermail/devel/2012-May/167122.html),
but we remain compatible with older Yum versions, too.
---
yum-presto/presto.py | 71 ++++++++++++++++++++++---------------------------
1 files changed, 32 insertions(+), 39 deletions(-)
diff --git a/yum-presto/presto.py b/yum-presto/presto.py
index 0461c8e..cac5cdd 100644
--- a/yum-presto/presto.py
+++ b/yum-presto/presto.py
@@ -358,64 +358,57 @@ def downloadPkgs(conduit, presto, download_pkgs=None):
# now we need to do downloads
i = 0
- local_size = 0
+ _size = [0, 0] # local_size, rebuild_size
+ async = hasattr(urlgrabber.grabber, 'parallel_wait')
for (po, delta) in remote_pkgs:
i += 1
- # FIXME: verifyChecksum should handle the urlgrabber objects...
- checkfunc = (lambda fo, csumtype, csum:
- conduit._base.verifyChecksum(fo.filename, csumtype, csum),
- (delta['checksum_type'],
- delta['checksum']), {})
-
- deltadir = os.path.join(po.repo.cachedir, 'deltas')
- deltapath = os.path.join(deltadir,
- os.path.basename(delta['filename']))
-
- # FIXME: this should be moved into _getFile
- dirstat = os.statvfs(deltadir)
- delta_size = delta['size']
- if (dirstat.f_bavail * dirstat.f_bsize) <= (long(po.size) + delta_size):
- adderror(po, _('Insufficient space in download directory %s '
- 'to download') % (deltadir,))
- continue
+ text = os.path.basename(delta['filename'])
+ deltapath = os.path.join(po.repo.cachedir, 'deltas', text)
+
+ def checkfunc(obj, d=delta, po=po, dp=deltapath):
+ # FIXME: verifyChecksum should handle the urlgrabber objects...
+ conduit._base.verifyChecksum(obj.filename, d['checksum_type'], d['checksum'])
+ _size[0] += d['size']
+ _size[1] += po.size
+ if hasattr(urlgrabber.progress, 'text_meter_total_size'):
+ urlgrabber.progress.text_meter_total_size(remote_size, _size[0])
+
+ # Magic: What happens here is that the checksum data is loaded
+ # from the SQL db in this thread, so that the other thread can call
+ # .verifyLocalPkg() without having to hit sqlite.
+ po.returnChecksums()
+ queue.put((conduit, po, dp, d['size']))
+ if po in errors:
+ del errors[po]
- if hasattr(urlgrabber.progress, 'text_meter_total_size'):
- urlgrabber.progress.text_meter_total_size(remote_size, local_size)
+ kwargs = {}
+ if async and getattr(po.repo, '_async', None):
+ kwargs['failfunc'] = lambda obj, po=po: adderror(po, str(obj.exception))
+ kwargs['async'] = True
+ elif not (i == 1 and not _size[0] and remote_size == delta['size']):
+ text = '(%s/%s): %s' % (i, len(remote_pkgs), text)
try:
- if i == 1 and not local_size and remote_size == delta_size:
- text = os.path.basename(delta['filename'])
- else:
- text = "(%s/%s): %s" % (i, len(remote_pkgs),
- os.path.basename(delta['filename']))
deltafile = po.repo._getFile(url=po.basepath,
relative=delta['filename'],
local=deltapath,
checkfunc=checkfunc,
text=text,
- cache=po.repo.cache)
- local_size += delta_size
- if hasattr(urlgrabber.progress, 'text_meter_total_size'):
- urlgrabber.progress.text_meter_total_size(remote_size,
- local_size)
+ size=delta['size'],
+ cache=po.repo.cache,
+ **kwargs)
except yum.Errors.RepoError, e:
adderror(po, str(e))
else:
# HACK: Use the download progress, at least we get something
cb = po.repo.callback
- # Magic: What happens here is that the checksum data is loaded
- # from the SQL db in this thread, so that the other thread can call
- # .verifyLocalPkg() without having to hit sqlite.
- po.returnChecksums()
- queue.put((conduit, po, deltafile, delta['size']))
- rebuild_size += po.size
-
- if errors.has_key(po):
- del errors[po]
# Check for waiting messages from building threads
while not messages.empty():
conduit.info(2, messages.get())
+ if async:
+ urlgrabber.grabber.parallel_wait()
+ rebuild_size += _size[1]
if hasattr(urlgrabber.progress, 'text_meter_total_size'):
urlgrabber.progress.text_meter_total_size(0)
ptsz, pfsz, pfnsz = _processing_data()
--
1.7.4.4
More information about the Yum-devel
mailing list