[yum-commits] Branch 'multi-downloader' - 2 commits - urlgrabber/grabber.py urlgrabber/mirror.py

zpavlas at osuosl.org
Wed Dec 14 14:35:09 UTC 2011


 urlgrabber/grabber.py |   19 +++++++++++++------
 urlgrabber/mirror.py  |   22 +++++++++++++++++++---
 2 files changed, 32 insertions(+), 9 deletions(-)

New commits:
commit 7d7d741ad787d8d53bd53d7b98b0e277361a1a5c
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Wed Dec 14 15:29:24 2011 +0100

    max_connections: updated semantics
    
    - the per-mirror max_connections default changed from 3 to 1.
    
    - added and documented MirrorGroup's max_connections option.
    
    - updated the code that selects the initial mirrors for parallel
      downloads so the number of mirrors used honors the MirrorGroup's option.
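
For illustration only (not part of the commit): with hypothetical per-mirror
limits of 2, 1, 1 and 1 and a MirrorGroup max_connections of 3, the spread N
is found by walking the mirror list and summing the per-mirror limits until
the total reaches the group-level option, as the new _process_kwargs code
below does:

    limits = [2, 1, 1, 1]        # hypothetical per-mirror max_connections
    max_connections = 3          # MirrorGroup-level option
    total = count = 0
    for limit in limits:
        count += 1
        total += limit
        if total >= max_connections:
            break
    print count                  # -> 2: spread over the first two mirrors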

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 2c748b9..bf2338c 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2177,7 +2177,7 @@ def parallel_wait(meter = 'text'):
                     # use new mirrorchoice
                     key = mirrorchoice['mirror']
                     kwargs = mirrorchoice.get('kwargs', {})
-                    limit = kwargs.get('max_connections') or 3
+                    limit = kwargs.get('max_connections', 1)
                     opts.async = key, limit
                     opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
 
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 853790b..da601b7 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -175,6 +175,13 @@ class MirrorGroup:
                'fail': 0}              # set at instantiation, reset
                                        # from callback
 
+      max_connections
+
+        This option applies to parallel downloading only.  If specified,
+        downloads are distributed evenly across the first N mirrors, where
+        N is the smallest number of mirrors whose total connection limit
+        reaches max_connections.  Retries are not restricted in this way.
+
       failure_callback
 
         this is a callback that will be called when a mirror "fails",
@@ -263,6 +270,14 @@ class MirrorGroup:
     def _process_kwargs(self, kwargs):
         self.failure_callback = kwargs.get('failure_callback')
         self.default_action   = kwargs.get('default_action')
+        if 'max_connections' in kwargs:
+            total = count = 0
+            for mirror in self.mirrors:
+                count += 1
+                total += mirror.get('kwargs', {}).get('max_connections', 1)
+                if total >= kwargs['max_connections']:
+                    break
+            self._spread = count
        
     def _parse_mirrors(self, mirrors):
         parsed_mirrors = []
@@ -398,13 +413,14 @@ class MirrorGroup:
             if kw.get('async'):
                 # 'async' option to the 1st mirror
                 key = mirrorchoice['mirror']
-                limit = kwargs.get('max_connections') or 3
+                limit = kwargs.get('max_connections', 1)
                 kwargs['async'] = key, limit
                 # async code iterates mirrors and calls failfunc
                 kwargs['mirror_group'] = self, gr, mirrorchoice
                 kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
-                # increment master
-                self._next = (self._next + 1) % min(len(self.mirrors), 5)
+                if hasattr(self, '_spread'):
+                    # increment the master mirror index
+                    self._next = (self._next + 1) % self._spread
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
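
A minimal usage sketch (not from the commit; the mirror URLs, limits and
file names are hypothetical) showing how the new MirrorGroup-level option
interacts with the per-mirror limits when parallel downloads are enabled on
this branch:

    from urlgrabber.grabber import URLGrabber
    from urlgrabber.mirror import MirrorGroup

    # Per-mirror 'max_connections' caps connections to that host; the
    # MirrorGroup-level option below caps the initial parallel spread.
    mirrors = [
        {'mirror': 'http://mirror1.example.com/repo/',
         'kwargs': {'max_connections': 2}},
        {'mirror': 'http://mirror2.example.com/repo/'},  # defaults to 1
        {'mirror': 'http://mirror3.example.com/repo/'},
    ]
    mg = MirrorGroup(URLGrabber(), mirrors, max_connections=3)
    # _spread becomes 2 here: 2 (mirror1) + 1 (mirror2) >= 3, so initial
    # parallel requests rotate over the first two mirrors only; retries
    # may still use any mirror in the group.
    mg.urlgrab('packages/foo-1.0.rpm', filename='/tmp/foo-1.0.rpm')
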
commit 14f2ae5f4227dda30d45050d2975cb9b2e9209f5
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Wed Dec 14 14:24:08 2011 +0100

    Fix ctrl-c tracebacks from downloader processes
    
    - handle the broken-pipe case when writing progress to stdout
    
    - install a SIGINT handler early, so no KeyboardInterrupt is raised
      inside pycurl callbacks, where it cannot be caught.
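
A minimal sketch of the pattern this commit applies (names are illustrative,
not from the diff): the downloader child installs a SIGINT handler before
entering pycurl, and every write to the parent's pipe treats EPIPE as a
request to exit quietly:

    import errno, os, signal, sys

    # Exit cleanly on ctrl-c instead of raising KeyboardInterrupt, which
    # would otherwise surface inside pycurl callbacks where it can't be
    # caught.
    signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(1))

    def report(line):
        try:
            os.write(1, line)
        except OSError, e:
            if e.args[0] != errno.EPIPE:
                raise
            sys.exit(1)  # parent closed the pipe; stop without a traceback
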

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 1bd7c70..2c748b9 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1926,7 +1926,7 @@ class _ProxyProgress:
         t = time.time()
         if t < self.next_update: return
         self.next_update = t + 0.31
-        os.write(1, '%d %d\n' % (self._id, _amount_read))
+        _write('%d %d\n', self._id, _amount_read)
 
 def _readlines(fd):
     buf = os.read(fd, 4096)
@@ -1936,12 +1936,21 @@ def _readlines(fd):
         buf += os.read(fd, 4096)
     return buf[:-1].split('\n')
 
+def _write(fmt, *arg):
+    try: os.write(1, fmt % arg)
+    except OSError, e:
+        if e.args[0] != errno.EPIPE: raise
+        sys.exit(1)
+
 def download_process():
     ''' Download process
         - watch stdin for new requests, parse & issue em.
         - use ProxyProgress to send _amount_read during dl.
         - abort on EOF.
     '''
+    import signal
+    signal.signal(signal.SIGINT, lambda n, f: sys.exit(1))
+
     cnt = 0
     while True:
         lines = _readlines(0)
@@ -1963,7 +1972,7 @@ def download_process():
             except URLGrabError, e:
                 _amount_read = 0
                 ug_err = '%d %s' % e.args
-            os.write(1, '%d %d %s\n' % (opts._id, _amount_read, ug_err))
+            _write('%d %d %s\n', opts._id, _amount_read, ug_err)
     sys.exit(0)
 
 import subprocess
@@ -2305,9 +2314,7 @@ def _test_file_object_readlines(wrapper, fo_output):
 
 if __name__ == '__main__':
     if sys.argv[1:] == ['DOWNLOADER']:
-        try: download_process()
-        except KeyboardInterrupt:
-            raise SystemExit # no traceback
+        download_process()
 
     _main_test()
     _retry_test()

