[yum-commits] Branch 'external-multi-downloader' - 3 commits - urlgrabber/grabber.py

zpavlas at osuosl.org
Tue Dec 6 15:13:16 UTC 2011


 urlgrabber/grabber.py |  110 +++++++++++++++++++++++++++-----------------------
 1 file changed, 61 insertions(+), 49 deletions(-)

New commits:
commit f8ce663305605fd92162d823721f417e21d7716b
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Dec 6 16:09:44 2011 +0100

    urlgrabber downloader mode selection
    
    Removed the parallel_wait() argument 'external',
    and the compile-time constant AVOID_CURL_MULTI.
    
    The URLGRABBER_MODE environment variable selects the downloader mode:
    * 'compat': no parallel downloads
    * 'direct': use CurlMulti directly
    * 'extern': fork+exec a process that uses CurlMulti
    * 'pooled': multiple fork+exec'd processes
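
A minimal sketch of selecting a mode from the caller's side (the
environment variable is read once, during module initialization, so it
must be set before urlgrabber.grabber is first imported):

    import os
    os.environ['URLGRABBER_MODE'] = 'direct'  # any of the four modes above
    import urlgrabber.grabber                 # ug_mode is bound at import time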

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 1c0b6b4..cf4110c 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -563,6 +563,13 @@ _log_package_state()
 def _(st):
     return st
 
+# Downloader modes:
+# 'compat': no parallel downloads
+# 'direct': use CurlMulti directly
+# 'extern': fork+exec a process that uses CurlMulti
+# 'pooled': multiple fork+exec'd processes
+ug_mode = os.environ.get('URLGRABBER_MODE', 'pooled')
+
 ########################################################################
 #                 END MODULE INITIALIZATION
 ########################################################################
@@ -1029,7 +1036,7 @@ class URLGrabber(object):
                     _run_callback(opts.checkfunc, obj)
                 return path
         
-        if opts.async:
+        if opts.async and ug_mode != 'compat':
             opts.url = url
             opts.filename = filename
             opts.size = int(opts.size or 0)
@@ -2045,19 +2052,16 @@ def _readlines(fd):
         buf += os.read(fd, 4096)
     return buf[:-1].split('\n')
 
-# Set this flag to 'True' to avoid using pycurl.CurlMulti()
-AVOID_CURL_MULTI = True
-
 def download_process():
     ''' Download process
         - watch stdin for new requests, parse & issue them.
         - use ProxyProgress to send _amount_read during dl.
         - abort on EOF.
     '''
-    if AVOID_CURL_MULTI:
-        dl = _DirectDownloaderSingle()
-    else:
-        dl = _DirectDownloaderMulti()
+    if   ug_mode == 'extern': dl = _DirectDownloaderMulti()
+    elif ug_mode == 'pooled': dl = _DirectDownloaderSingle()
+    else: raise ValueError, 'Unknown ext mode %s' % ug_mode
+
     cnt = 0
     while True:
         if dl.select():
@@ -2209,10 +2213,12 @@ class _ExternalDownloaderPool:
 
 _async = {}
 
-def parallel_wait(meter = 'text', external = True):
+def parallel_wait(meter = 'text'):
     '''Process queued requests in parallel.
     '''
 
+    if ug_mode == 'compat':
+        return
     if meter:
         count = total = 0
         for limit, queue in _async.values():
@@ -2224,12 +2230,10 @@ def parallel_wait(meter = 'text', external = True):
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
-    if external:
-        if AVOID_CURL_MULTI:
-            dl = _ExternalDownloaderPool()
-        else:
-            dl = _ExternalDownloader()
-    else: dl = _DirectDownloaderMulti()
+    if   ug_mode == 'direct': dl = _DirectDownloaderMulti()
+    elif ug_mode == 'extern': dl = _ExternalDownloader()
+    elif ug_mode == 'pooled': dl = _ExternalDownloaderPool()
+    else: raise ValueError, 'Unknown mode %s' % ug_mode
 
     def start(opts, tries):
         opts.tries = tries
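
The caller-side flow is unchanged by the new modes; a hedged sketch,
assuming the async=(key, limit) option form implied by the _async
bookkeeping above (jobs is a hypothetical list of (url, filename) pairs):

    from urlgrabber.grabber import URLGrabber, parallel_wait

    g = URLGrabber()
    for url, fn in jobs:
        # async=(key, limit): queue under 'key', at most 'limit' in flight
        g.urlgrab(url, fn, async=('mirror', 5))
    parallel_wait(meter='text')  # returns at once when ug_mode == 'compat'
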
commit 9dfbbe40927c3afcd1f71a2932f39623fed6ae6a
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Dec 6 15:38:47 2011 +0100

    _DirectDownloader{Single,Multi}
    
    Add some more code to the _DirectDownloader class and rename it to
    '_DirectDownloaderMulti'.  Add a similar class with the same API
    that downloads a single file at a time: _DirectDownloaderSingle.
    The downloader then picks the right class at runtime.
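
Both classes expose the same four-method contract, which is what lets
download_process() pick either one at runtime; a sketch of that implied
interface (illustration only, names taken from the diff below):

    class _DownloaderAPI:
        ''' The duck-typed contract both downloader classes satisfy. '''
        def select(self):        # block; True when stdin has new requests
            raise NotImplementedError
        def start(self, opts):   # begin downloading the job in 'opts'
            raise NotImplementedError
        def perform(self):       # finished jobs: (opts, ug_err, _amount_read)
            raise NotImplementedError
        def abort(self):         # drop any downloads still in flight
            raise NotImplementedError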

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 7015d42..1c0b6b4 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1871,12 +1871,23 @@ class _AsyncCurlFile(PyCurlFileObject):
         self.curl_cache[self._host()] = self.curl_obj
         PyCurlFileObject._do_close_fo(self)
 
-class _DirectDownloader:
+class _DirectDownloaderMulti:
     def __init__(self):
         ''' A downloader context.
         '''
         self.running = {}
         self.multi = pycurl.CurlMulti()
+        self.tout = 5  # a zero timeout would make select() busy-poll until the first request
+
+    def select(self):
+        ''' Block for some time.  Return True if 'stdin' readable,
+            False when just internal processing needed.
+        '''
+        fdset = self.multi.fdset()
+        fdset[0].append(0) # stdin
+        fdset = select.select(*(fdset + (self.tout,)))
+        self.tout = min(self.tout * 1.1, 5)
+        return 0 in fdset[0]
 
     def start(self, opts):
         ''' Start download of job 'opts'
@@ -1885,6 +1896,11 @@ class _DirectDownloader:
         self.running[fo.curl_obj] = fo
         self.multi.add_handle(fo.curl_obj)
 
+        # XXX: likely a CurlMulti() bug
+        # fdset() is empty shortly after starting a new request.
+        # Do some polling to work around this.
+        self.tout = 10e-3
+
     def perform(self):
         ''' Run downloads, return finished ones.
         '''
@@ -1906,11 +1922,34 @@ class _DirectDownloader:
         return ret
 
     def abort(self):
+        ''' Abort currently active downloads.
+        '''
         while self.running:
             curl, fo = self.running.popitem()
             self.multi.remove_handle(curl)
             fo._do_close_fo()
 
+class _DirectDownloaderSingle:
+    ''' _DirectDownloader downloading one file at a time.
+    '''
+    def select(self):
+        return True
+
+    def start(self, opts):
+        try:
+            fo = PyCurlFileObject(opts.url, opts.filename, opts)
+            fo._do_grab(); _amount_read = fo._amount_read
+            fo.fo.close(); ug_err = None
+        except URLGrabError, ug_err:
+            _amount_read = 0
+        self.result = opts, ug_err, _amount_read
+
+    def perform(self):
+        return [self.result]
+
+    def abort(self):
+        pass
+
 #####################################################################
 #  Serializer + parser: A replacement of the rather bulky Json code.
 #
@@ -2016,36 +2055,12 @@ def download_process():
         - abort on EOF.
     '''
     if AVOID_CURL_MULTI:
-        cnt = 0
-        while True:
-            lines = _readlines(0)
-            if not lines: break
-            for line in lines:
-                cnt += 1
-                opts = URLGrabberOptions()
-                for k in line.split(' '):
-                    k, v = k.split('=', 1)
-                    setattr(opts, k, _loads(v))
-                if opts.progress_obj:
-                    opts.progress_obj = _ProxyProgress()
-                    opts.progress_obj._id = cnt
-                try:
-                    fo = PyCurlFileObject(opts.url, opts.filename, opts)
-                    fo._do_grab(); _amount_read = fo._amount_read
-                    fo.fo.close(); ug_err = 'OK'
-                except URLGrabError, e:
-                    _amount_read = 0
-                    ug_err = '%d %s' % e.args
-                os.write(1, '%d %d %s\n' % (cnt, _amount_read, ug_err))
-        sys.exit(0)
-
-    dl = _DirectDownloader()
-    cnt = tout = 0
+        dl = _DirectDownloaderSingle()
+    else:
+        dl = _DirectDownloaderMulti()
+    cnt = 0
     while True:
-        fdset = dl.multi.fdset()
-        fdset[0].append(0) # stdin
-        fdset = select.select(*(fdset + (tout,)))
-        if 0 in fdset[0]:
+        if dl.select():
             lines = _readlines(0)
             if not lines: break
             for line in lines:
@@ -2061,16 +2076,10 @@ def download_process():
                     opts.progress_obj._id = cnt
                 dl.start(opts)
 
-            # XXX: likely a CurlMulti() bug
-            # fdset() is empty shortly after starting new request.
-            # Do some polling to work this around.
-            tout = 10e-3
-
         # perform requests
         for opts, ug_err, _amount_read in dl.perform():
             ug_err = ug_err and '%d %s' % ug_err.args or 'OK'
             os.write(1, '%d %d %s\n' % (opts._id, _amount_read, ug_err))
-        tout = min(tout * 1.1, 5)
     dl.abort()
     sys.exit(0)
 
@@ -2220,7 +2229,7 @@ def parallel_wait(meter = 'text', external = True):
             dl = _ExternalDownloaderPool()
         else:
             dl = _ExternalDownloader()
-    else: dl = _DirectDownloader()
+    else: dl = _DirectDownloaderMulti()
 
     def start(opts, tries):
         opts.tries = tries
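
The hunks above also move the CurlMulti fdset() workaround into
select(): effectively an exponential backoff on the select timeout,
polling 10 ms after a request starts and stretching by 10% per idle
round up to a 5 s cap. A standalone sketch of that pattern, using the
constants from the diff (illustrative, detached from CurlMulti):

    import select, sys, time

    tout = 10e-3                       # a request just started: poll soon
    deadline = time.time() + 1         # bound the demo loop to ~1 second
    while time.time() < deadline:
        ready = select.select([sys.stdin], [], [], tout)[0]
        if ready:
            tout = 10e-3               # new input: back to fast polling
        else:
            tout = min(tout * 1.1, 5)  # idle: back off, capped at 5 s
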
commit 00eced8ba305f4b56e9722378dece87433bda263
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Dec 6 14:26:36 2011 +0100

    Revert last patch - it's not needed.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index c044e2a..7015d42 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2190,7 +2190,6 @@ class _ExternalDownloaderPool:
         for dl in self.running.values():
             self.epoll.unregister(dl.stdout)
             dl.abort()
-        self.epoll.close()
         for dl in self.cache.values():
             dl.abort()
 

