[Yum-devel] [PATCH 3/3] Implement getPackageAsync() and getPackageDone()

Zdeněk Pavlas zpavlas at redhat.com
Wed Jul 27 12:23:52 UTC 2011


A simple interface to external downloading processes, with
limited support for transparent retries and mirrorlist cycling.
---
 yum/yumRepo.py |   79 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 79 insertions(+), 0 deletions(-)

diff --git a/yum/yumRepo.py b/yum/yumRepo.py
index e5e9ece..e1ec8e8 100644
--- a/yum/yumRepo.py
+++ b/yum/yumRepo.py
@@ -48,6 +48,7 @@ import shutil
 import stat
 import errno
 import tempfile
+import select, fcntl
 
 #  If you want yum to _always_ check the MD .sqlite files then set this to
 # False (this doesn't affect .xml files or .sqilte files derived from them).
@@ -285,6 +286,7 @@ class YumRepository(Repository, config.RepoConf):
 
         self._grabfunc = None
         self._grab = None
+        self._down_jobs = {}
 
     def __cmp__(self, other):
         """ Sort yum repos. by cost, and then by alphanumeric on their id. """
@@ -866,6 +868,83 @@ class YumRepository(Repository, config.RepoConf):
                         size=package.size,
                         )
 
+    # class-level set (shared by all instances): repos with pending downloads
+    _down_repos = set()
+
+    def getIdleProcess(self):
+        # find an idle download process for this repository
+        while 1:
+            rfd = select.select(self._down_jobs, [], [])[0][0]
+            wfd, po, checkfunc, i = self._down_jobs[rfd]
+
+            # check error level
+            error = int(os.read(rfd, 4096))
+            if error == 0 or i == self.retries:
+                break
+
+            # move to other mirror
+            i += 1
+            os.write(wfd, self.urls[i % len(self.urls)] + po.relativepath + '\n')
+            self._down_jobs[rfd] = wfd, po, checkfunc, i
+
+        # call 'checkfunc' 
+        checkfunc(po, error)
+        return rfd, wfd
+
+    def getPackageAsync(self, po, checkfunc, text=None, parallel=2):
+        ''' Start downloading package 'po'.
+
+        Sends the download request to an idle download process,
+        or create a new such process.  Callback 'checkfunc' is
+        called when download finished.
+        '''
+
+        # use repo-specific value if provided
+        parallel = max(1, self.parallel or parallel)
+        if len(self._down_jobs) < parallel:
+            # new downloading process
+            A = os.pipe()
+            B = os.pipe()
+            if os.fork() == 0:
+                # child: stdin from B, stdout to A
+                os.close(B[1]); os.dup2(B[0], 0)
+                os.close(A[0]); os.dup2(A[1], 1)
+                os.chdir(self.pkgdir)
+                # make a config option for this?
+                os.execl('/usr/local/bin/yum-down.sh', '')
+
+            # parent: read A, write B
+            os.close(A[1]); rfd = A[0]
+            os.close(B[0]); wfd = B[1]
+
+            # we fork/exec later, and keeping 'wfd' in childs
+            # would avoid seeing an EOF when we close it.
+            f = fcntl.fcntl(wfd, fcntl.F_GETFD)
+            fcntl.fcntl(wfd, fcntl.F_SETFD, f | fcntl.FD_CLOEXEC) 
+        else:
+            rfd, wfd = self.getIdleProcess()
+
+        # Send the request to selected process
+        os.write(wfd, self.urls[0] + po.relativepath + '\n')
+        self._down_jobs[rfd] = wfd, po, checkfunc, 0
+        self._down_repos.add(self)
+
+    @classmethod
+    def getPackageDone(self):
+        ''' Wait for all repos to finish downloading.
+        '''
+
+        for self in self._down_repos:
+            while self._down_jobs:
+                # remove one process from dict
+                rfd, wfd = self.getIdleProcess()
+                del self._down_jobs[rfd]
+                # make it exit and wait
+                os.close(rfd)
+                os.close(wfd)
+                os.wait()
+        self._down_repos.clear()
+
     def getHeader(self, package, checkfunc = None, reget = 'simple',
             cache = True):
 
-- 
1.7.4.4



More information about the Yum-devel mailing list