[yum-commits] Branch 'multi-downloader' - 5 commits - urlgrabber/grabber.py urlgrabber/mirror.py

zpavlas at osuosl.org zpavlas at osuosl.org
Fri Dec 9 13:16:33 UTC 2011


 urlgrabber/grabber.py |  248 ++++++++++++--------------------------------------
 urlgrabber/mirror.py  |    2 
 2 files changed, 63 insertions(+), 187 deletions(-)

New commits:
commit d735e596821f292fcb30426dfe5a2b884566b903
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:14:47 2011 +0100

    Use the first 5 mirrors instead of just the 1st one: advance the master mirror round-robin over the first min(len(mirrors), 5) mirrors.

diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 41c99f5..853790b 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -403,6 +403,8 @@ class MirrorGroup:
                 # async code iterates mirrors and calls failfunc
                 kwargs['mirror_group'] = self, gr, mirrorchoice
                 kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
+                # increment master
+                self._next = (self._next + 1) % min(len(self.mirrors), 5)
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
commit e7917b12e871e700eed09ea89852ad69bc4a6b76
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:13:44 2011 +0100

    Revert "move pycurl.error handling to _do_perform_exc()"
    
    This reverts commit 54ee979ada490cdae12fff038a838602fa404075.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 8c45ff2..88bada6 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1340,14 +1340,8 @@ class PyCurlFileObject(object):
             return
         
         try:
-            e = None
             self.curl_obj.perform()
-        except pycurl.error, e: pass
-        self._do_perform_exc(e)
-
-    def _do_perform_exc(self, e):
-        # handle pycurl exception 'e'
-        if e:
+        except pycurl.error, e:
             # XXX - break some of these out a bit more clearly
             # to other URLGrabErrors from 
             # http://curl.haxx.se/libcurl/c/libcurl-errors.html
commit 6292510b70b16513449d602d9cea4ffc3625f5df
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:13:42 2011 +0100

    Revert "move opening of target file to _do_open_fo()."
    
    This reverts commit eb365596a44bd8ca15fe290d50290f0f84a268dd.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 05a5c51..8c45ff2 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1601,7 +1601,22 @@ class PyCurlFileObject(object):
         _was_filename = False
         if type(self.filename) in types.StringTypes and self.filename:
             _was_filename = True
-            self._do_open_fo()
+            self._prog_reportname = str(self.filename)
+            self._prog_basename = os.path.basename(self.filename)
+            
+            if self.append: mode = 'ab'
+            else: mode = 'wb'
+
+            if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
+                                 (self.filename, mode))
+            try:
+                self.fo = open(self.filename, mode)
+            except IOError, e:
+                err = URLGrabError(16, _(\
+                  'error opening local file from %s, IOError: %s') % (self.url, e))
+                err.url = self.url
+                raise err
+
         else:
             self._prog_reportname = 'MEMORY'
             self._prog_basename = 'MEMORY'
@@ -1659,22 +1674,6 @@ class PyCurlFileObject(object):
 
         self._complete = True
     
-    def _do_open_fo(self):
-        self._prog_reportname = str(self.filename)
-        self._prog_basename = os.path.basename(self.filename)
-        if self.append: mode = 'ab'
-        else: mode = 'wb'
-
-        if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
-                             (self.filename, mode))
-        try:
-            self.fo = open(self.filename, mode)
-        except IOError, e:
-            err = URLGrabError(16, _(\
-              'error opening local file from %s, IOError: %s') % (self.url, e))
-            err.url = self.url
-            raise err
-
     def _fill_buffer(self, amt=None):
         """fill the buffer to contain at least 'amt' bytes by reading
         from the underlying file object.  If amt is None, then it will
commit cceba103aa81e18b4dbe454092bdb1cb10aad8c8
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:13:33 2011 +0100

    Revert "move closing of target file to _do_close_fo()."
    
    This reverts commit 5443754de676ed4b173502522143460ae8a50013.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 649d235..05a5c51 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1621,7 +1621,29 @@ class PyCurlFileObject(object):
             raise e
     
         if _was_filename:
-            self._do_close_fo()
+            # close it up
+            self.fo.flush()
+            self.fo.close()
+
+            # Set the URL where we got it from:
+            if xattr is not None:
+                # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
+                try:
+                    xattr.set(self.filename, 'user.xdg.origin.url', self.url)
+                except:
+                    pass # URL too long. = IOError ... ignore everything.
+
+            # set the time
+            mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
+            if mod_time != -1:
+                try:
+                    os.utime(self.filename, (mod_time, mod_time))
+                except OSError, e:
+                    err = URLGrabError(16, _(\
+                      'error setting timestamp on file %s from %s, OSError: %s') 
+                              % (self.filename, self.url, e))
+                    err.url = self.url
+                    raise err
             # re open it
             try:
                 self.fo = open(self.filename, 'r')
@@ -1653,31 +1675,6 @@ class PyCurlFileObject(object):
             err.url = self.url
             raise err
 
-    def _do_close_fo(self):
-        # close it up
-        self.fo.flush()
-        self.fo.close()
-
-        # Set the URL where we got it from:
-        if xattr is not None:
-            # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
-            try:
-                xattr.set(self.filename, 'user.xdg.origin.url', self.url)
-            except:
-                pass # URL too long. = IOError ... ignore everything.
-
-        # set the time
-        mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
-        if mod_time != -1:
-            try:
-                os.utime(self.filename, (mod_time, mod_time))
-            except OSError, e:
-                err = URLGrabError(16, _(\
-                  'error setting timestamp on file %s from %s, OSError: %s') 
-                          % (self.filename, self.url, e))
-                err.url = self.url
-                raise err
-
     def _fill_buffer(self, amt=None):
         """fill the buffer to contain at least 'amt' bytes by reading
         from the underlying file object.  If amt is None, then it will
commit 64de088cdeda737541d40f07f0bb5228cdbddc39
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:11:36 2011 +0100

    Assume ug_mode == 'pooled': drop the URLGRABBER_MODE switch and always use the pooled external downloader.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 0400e8f..649d235 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -563,13 +563,6 @@ _log_package_state()
 def _(st):
     return st
 
-# Downloader modes:
-# 'compat': no parallel downloads
-# 'direct': use CurlMulti directly
-# 'extern': fork+exec a process that uses CurlMulti
-# 'pooled': multiple fork+exec'd processes
-ug_mode = os.environ.get('URLGRABBER_MODE', 'pooled')
-
 ########################################################################
 #                 END MODULE INITIALIZATION
 ########################################################################
@@ -1036,7 +1029,7 @@ class URLGrabber(object):
                     _run_callback(opts.checkfunc, obj)
                 return path
         
-        if opts.async and ug_mode != 'compat':
+        if opts.async:
             opts.url = url
             opts.filename = filename
             opts.size = int(opts.size or 0)
@@ -1859,105 +1852,6 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
         
 #####################################################################
-#  Downloader
-#####################################################################
-
-class _AsyncCurlFile(PyCurlFileObject):
-    curl_cache = {}
-    def _host(self):
-        return urlparse.urlsplit(self.opts.url).netloc
-
-    def _do_open(self):
-        # try to reuse curl objects
-        curl = self.curl_cache.pop(self._host(), None)
-        self.curl_obj = curl or pycurl.Curl()
-        self._set_opts()
-        self._do_open_fo() # open the file but don't grab
-
-    def _do_close_fo(self):
-        self.curl_cache[self._host()] = self.curl_obj
-        PyCurlFileObject._do_close_fo(self)
-
-class _DirectDownloaderMulti:
-    def __init__(self):
-        ''' A downloader context.
-        '''
-        self.running = {}
-        self.multi = pycurl.CurlMulti()
-        self.tout = 0
-
-    def select(self):
-        ''' Block for some time.  Return True if 'stdin' readable,
-            False when just internal processing needed.
-        '''
-        fdset = self.multi.fdset()
-        fdset[0].append(0) # stdin
-        fdset = select.select(*(fdset + (self.tout,)))
-        self.tout = min(self.tout * 1.1, 5)
-        return 0 in fdset[0]
-
-    def start(self, opts):
-        ''' Start download of job 'opts'
-        '''
-        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
-        self.running[fo.curl_obj] = fo
-        self.multi.add_handle(fo.curl_obj)
-
-        # XXX: likely a CurlMulti() bug
-        # fdset() is empty shortly after starting new request.
-        # Do some polling to work this around.
-        self.tout = 10e-3
-
-    def perform(self):
-        ''' Run downloads, return finished ones.
-        '''
-        while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
-            pass
-        ret = []
-        _, finished, failed = self.multi.info_read()
-        for curl in finished + failed:
-            curl_err = None
-            if type(curl) == tuple:
-                curl, code, msg = curl
-                curl_err = pycurl.error(code, msg)
-            self.multi.remove_handle(curl)
-            fo = self.running.pop(curl)
-            try: ug_err = None; fo._do_perform_exc(curl_err)
-            except URLGrabError, ug_err: pass
-            fo._do_close_fo()
-            ret.append((fo.opts, ug_err, fo._amount_read))
-        return ret
-
-    def abort(self):
-        ''' Abort currently active downloads.
-        '''
-        while self.running:
-            curl, fo = self.running.popitem()
-            self.multi.remove_handle(curl)
-            fo._do_close_fo()
-
-class _DirectDownloaderSingle:
-    ''' _DirectDownloader downloading one file at a time.
-    '''
-    def select(self):
-        return True
-
-    def start(self, opts):
-        try:
-            fo = PyCurlFileObject(opts.url, opts.filename, opts)
-            fo._do_grab(); _amount_read = fo._amount_read
-            fo.fo.close(); ug_err = None
-        except URLGrabError, ug_err:
-            _amount_read = 0
-        self.result = opts, ug_err, _amount_read
-
-    def perform(self):
-        return [self.result]
-
-    def abort(self):
-        pass
-
-#####################################################################
 #  Serializer + parser: A replacement of the rather bulky Json code.
 #
 # - handles basic python literals, lists and tuples.
@@ -2058,33 +1952,28 @@ def download_process():
         - use ProxyProgress to send _amount_read during dl.
         - abort on EOF.
     '''
-    if   ug_mode == 'extern': dl = _DirectDownloaderMulti()
-    elif ug_mode == 'pooled': dl = _DirectDownloaderSingle()
-    else: raise ValueError, 'Unknown ext mode %s' % ug_mode
-
     cnt = 0
     while True:
-        if dl.select():
-            lines = _readlines(0)
-            if not lines: break
-            for line in lines:
-                # start new download
-                cnt += 1
-                opts = URLGrabberOptions()
-                opts._id = cnt
-                for k in line.split(' '):
-                    k, v = k.split('=', 1)
-                    setattr(opts, k, _loads(v))
-                if opts.progress_obj:
-                    opts.progress_obj = _ProxyProgress()
-                    opts.progress_obj._id = cnt
-                dl.start(opts)
-
-        # perform requests
-        for opts, ug_err, _amount_read in dl.perform():
-            ug_err = ug_err and '%d %s' % ug_err.args or 'OK'
+        lines = _readlines(0)
+        if not lines: break
+        for line in lines:
+            cnt += 1
+            opts = URLGrabberOptions()
+            opts._id = cnt
+            for k in line.split(' '):
+                k, v = k.split('=', 1)
+                setattr(opts, k, _loads(v))
+            if opts.progress_obj:
+                opts.progress_obj = _ProxyProgress()
+                opts.progress_obj._id = cnt
+            try:
+                fo = PyCurlFileObject(opts.url, opts.filename, opts)
+                fo._do_grab(); _amount_read = fo._amount_read
+                fo.fo.close(); ug_err = 'OK'
+            except URLGrabError, e:
+                _amount_read = 0
+                ug_err = '%d %s' % e.args
             os.write(1, '%d %d %s\n' % (opts._id, _amount_read, ug_err))
-    dl.abort()
     sys.exit(0)
 
 import subprocess
@@ -2217,8 +2106,6 @@ def parallel_wait(meter = 'text'):
     '''Process queued requests in parallel.
     '''
 
-    if ug_mode == 'compat':
-        return
     if meter:
         count = total = 0
         for limit, queue in _async.values():
@@ -2230,10 +2117,7 @@ def parallel_wait(meter = 'text'):
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
-    if   ug_mode == 'direct': dl = _DirectDownloaderMulti()
-    elif ug_mode == 'extern': dl = _ExternalDownloader()
-    elif ug_mode == 'pooled': dl = _ExternalDownloaderPool()
-    else: raise ValueError, 'Unknown mode %s' % ug_mode
+    dl = _ExternalDownloaderPool()
 
     def start(opts, tries):
         opts.tries = tries


More information about the Yum-commits mailing list