[Yum-devel] [PATCH 6/6] Implement parallel downloads for regular RPMs

Zdeněk Pavlas zpavlas at redhat.com
Wed Jul 13 16:50:38 UTC 2011


---
 yum/__init__.py |  112 ++++++++++++++++++++++++++++++++++++++++++++-----------
 yum/config.py   |    1 +
 2 files changed, 91 insertions(+), 22 deletions(-)

diff --git a/yum/__init__.py b/yum/__init__.py
index 82cd6f1..119b55f 100644
--- a/yum/__init__.py
+++ b/yum/__init__.py
@@ -86,6 +86,7 @@ from yum.i18n import to_unicode, to_str
 
 import string
 import StringIO
+import select
 
 from weakref import proxy as weakref
 
@@ -1909,6 +1910,35 @@ class YumBase(depsolve.Depsolve):
         
         return 0
 
+    def downloadProcess(self, r, w, ctx):
+        """Serve package downloads in a forked worker process.
+
+        Lock-step protocol: write a reply, then read a request.  Requests
+        are a 1-based index into *ctx* (as a string) or 'E' to exit.  The
+        first reply is ' '; later ones are getPackage()'s path or '!' + error.
+
+        :param r: pipe to read requests from.
+        :param w: pipe to write replies to.
+        :param ctx: read-only context (the package list)
+        """
+        # TODO: drop privileges first, e.g. setcontext() when SELinux is on
+        reply = ' '
+        while 1:
+            os.write(w, reply)
+            req = os.read(r, 4096)
+            if not req or req == 'E': # EOF (parent gone) or exit request
+                break
+            po = ctx[int(req) - 1]
+            text = os.path.basename(po.relativepath)
+            if len(ctx) > 1:
+                text = '(%s/%s): %s' % (req, len(ctx), text)
+            try: # verify and cache like the serial loop this replaces
+                reply = po.repo.getPackage(po, text=text,
+                                checkfunc=(self.verifyPkg, (po, 1), {}),
+                                cache=po.repo.http_caching != 'none')
+            except Exception, e: # a worker must not die: report everything
+                reply = ('!' + str(e))[:4096] # parent reads <= 4096 bytes
+
     def downloadPkgs(self, pkglist, callback=None, callback_total=None):
         def mediasort(apo, bpo):
             # FIXME: we should probably also use the mediaid; else we
@@ -1973,6 +2003,52 @@ class YumBase(depsolve.Depsolve):
                 remote_size += po.size
 
         remote_pkgs.sort(mediasort)
+
+        # worker pool: each worker's reply fd (read) -> request fd (write)
+        pool = {}
+        n = max(self.conf.parallel, 1)
+        n = min(n, len(remote_pkgs))
+        if n > 1:
+            self.verbose_logger.log(logginglevels.INFO_2,
+                _('Using %d download processes') % n)
+        for i in range(n):
+            A = os.pipe() # replies: worker -> parent
+            B = os.pipe() # requests: parent -> worker
+            if os.fork() == 0:
+                try: # the child must never return into the caller's code
+                    # drop inherited ends of sibling pipes so EOF works
+                    for sibling_fd in pool.keys() + pool.values():
+                        os.close(sibling_fd)
+                    os.close(A[0])
+                    os.close(B[1])
+                    self.downloadProcess(B[0], A[1], remote_pkgs)
+                finally:
+                    os._exit(0)
+            # parent keeps the read end of A and the write end of B
+            os.close(A[1])
+            os.close(B[0])
+            pool[A[0]] = B[1]
+
+        curr = {} # maps a worker's reply fd to the po it is fetching
+
+        def dispatch(job):
+            """Send *job* to an idle worker and return its reply fd,
+            first recording that worker's previous result (if any)."""
+            fd = select.select(pool, [], [])[0][0]
+            msg = os.read(fd, 4096)
+            if not msg: # EOF: the worker exited without replying
+                raise Errors.YumBaseError(_('Download process died'))
+            po = curr.get(fd)
+            if po is not None:
+                if msg[0] == '!':
+                    adderror(po, msg[1:])
+                else:
+                    po.localpath = msg
+                    if po in errors: # success clears earlier errors
+                        del errors[po]
+            os.write(pool[fd], job)
+            return fd
+
         #  This is kind of a hack and does nothing in non-Fedora versions,
         # we'll fix it one way or anther soon.
         if (hasattr(urlgrabber.progress, 'text_meter_total_size') and
@@ -1983,7 +2059,6 @@ class YumBase(depsolve.Depsolve):
         local_size = 0
         for po in remote_pkgs:
             i += 1
-            checkfunc = (self.verifyPkg, (po, 1), {})
             dirstat = os.statvfs(po.repo.pkgdir)
             if (dirstat.f_bavail * dirstat.f_bsize) <= long(po.size):
                 adderror(po, _('Insufficient space in download directory %s\n'
@@ -1994,30 +2069,23 @@ class YumBase(depsolve.Depsolve):
                           format_number(po.size)))
                 continue
             
-            try:
-                if i == 1 and not local_size and remote_size == po.size:
-                    text = os.path.basename(po.relativepath)
-                else:
-                    text = '(%s/%s): %s' % (i, len(remote_pkgs),
-                                            os.path.basename(po.relativepath))
-                mylocal = po.repo.getPackage(po,
-                                   checkfunc=checkfunc,
-                                   text=text,
-                                   cache=po.repo.http_caching != 'none',
-                                   )
-                local_size += po.size
-                if hasattr(urlgrabber.progress, 'text_meter_total_size'):
-                    urlgrabber.progress.text_meter_total_size(remote_size,
-                                                              local_size)
-            except Errors.RepoError, e:
-                adderror(po, str(e))
-            else:
-                po.localpath = mylocal
-                if po in errors:
-                    del errors[po]
+            fd = dispatch(str(i))
+            curr[fd] = po # its result is collected by a later dispatch()
+            local_size += po.size # counted when queued, not when finished
+            if hasattr(urlgrabber.progress, 'text_meter_total_size'):
+                urlgrabber.progress.text_meter_total_size(remote_size,
+                                                          local_size)
 
         if hasattr(urlgrabber.progress, 'text_meter_total_size'):
             urlgrabber.progress.text_meter_total_size(0)
+
+        # tear down the pool; dispatch('E') also collects the last results
+        while pool:
+            fd = dispatch('E')
+            os.close(fd)
+            os.close(pool.pop(fd))
+            os.wait() # no PIDs are stored: wait for any one child to exit
+
         if callback_total is not None and not errors:
             callback_total(remote_pkgs, remote_size, beg_download)
 
diff --git a/yum/config.py b/yum/config.py
index cb7ed57..6e2498f 100644
--- a/yum/config.py
+++ b/yum/config.py
@@ -686,6 +686,7 @@ class YumConf(StartupConf):
 
     bandwidth = BytesOption(0)
     throttle = ThrottleOption(0)
+    parallel = IntOption(0) # max download processes; below 1 means 1
 
     http_caching = SelectionOption('all', ('none', 'packages', 'all'))
     metadata_expire = SecondsOption(60 * 60 * 6) # Time in seconds (6h).
-- 
1.7.4.4



More information about the Yum-devel mailing list