[yum-commits] 40 commits - scripts/urlgrabber-ext-down setup.py urlgrabber/grabber.py urlgrabber/mirror.py urlgrabber/progress.py

zpavlas at osuosl.org
Thu Jun 7 15:48:41 UTC 2012


 scripts/urlgrabber-ext-down |   55 +++
 setup.py                    |    6 
 urlgrabber/grabber.py       |  619 ++++++++++++++++++++++++++++++++++++++------
 urlgrabber/mirror.py        |   23 +
 urlgrabber/progress.py      |   10 
 5 files changed, 627 insertions(+), 86 deletions(-)

New commits:
commit eafe1f0ea15818c125672f8dd3fe662996764ad9
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue May 22 09:20:09 2012 +0200

    lower the default host limit from 3 to 2 simultaneous connections

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 73e14aa..c5470b1 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2229,7 +2229,7 @@ def parallel_wait(meter = 'text'):
 
                 # update the current mirror and limit
                 key = best['mirror']
-                limit = best.get('kwargs', {}).get('max_connections', 3)
+                limit = best.get('kwargs', {}).get('max_connections', 2)
                 opts.async = key, limit
 
                 # update URL and proxy
commit dc03a090ed83637a949b3c91c22e4ea6d2a7485c
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Mon May 21 09:06:13 2012 +0200

    timedhosts: sanity check on dl_time
    
    - handle the dl_time <= 0 case
    
    - relative validity of calculated speed now depends
      on dl_time instead of dl_size.  (that's where the
      random error is)
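
To see what the new weighting does, here is a standalone sketch of the smoothing step from the hunk below; the function name and the sample numbers are made up for illustration:

    def update_speed(speed, ts, now, dl_size, dl_time, half_life=30*24*60*60):
        # k1: the older the previous estimate, the less weight it keeps
        k1 = 2 ** ((ts - now) / float(half_life))
        # k2: readings shorter than 500 ms get proportionally less weight,
        # capped at 1.0, then scaled for 10:1 smoothing; k2 <= 0 (from
        # dl_time <= 0) skips the update entirely, avoiding division by zero
        k2 = min(dl_time / .500, 1.0) / 10
        if k2 > 0:
            speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
        return speed

    # a 2 MB/s reading over a full second pulls an old 1 MB/s estimate up...
    print(update_speed(1e6, ts=0, now=0, dl_size=2e6, dl_time=1.0))   # ~1.09e6
    # ...while the same 2 MB/s measured over only 10 ms barely moves it
    print(update_speed(1e6, ts=0, now=0, dl_size=2e4, dl_time=0.01))  # ~1.002e6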

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index be85f92..73e14aa 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2301,11 +2301,12 @@ class _TH:
 
         if ug_err is None:
             # k1: the older, the less useful
-            # k2: if it was <1MiB, don't trust it much
+            # k2: <500ms readings are less reliable
             # speeds vary, use 10:1 smoothing
             k1 = 2**((ts - now) / default_grabber.opts.half_life)
-            k2 = min(dl_size / 1e6, 1.0) / 10
-            speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
+            k2 = min(dl_time / .500, 1.0) / 10
+            if k2 > 0:
+                speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
             fail = 0
         elif getattr(ug_err, 'code', None) == 404:
             fail = 0 # alive, at least
commit 6356ea626711773bccbc11038c8d3e9aada13f4b
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri May 18 15:38:44 2012 +0200

    timedhosts: fix file:// profiling.  BZ 822632.
    
    - Do not profile absolute file:// URLs.
    - Give a hint to _TH.update() which baseurl was used
      so we may profile file:// mirrors, too.
    - Strip username and password from stored hostnames.
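
The host-key rule added below can be tried on its own; the helper name and the sample URLs are made up for the example:

    # strip any user:pass@ prefix from the netloc, and for file:// URLs
    # (empty netloc) fall back to the baseurl supplied by the mirror group
    try:
        from urlparse import urlsplit          # Python 2, as used in grabber.py
    except ImportError:
        from urllib.parse import urlsplit      # Python 3

    def host_key(url, baseurl=None):
        return urlsplit(url).netloc.split('@')[-1] or baseurl

    print(host_key('http://user:secret@mirror.example.com/repo/'))
    # -> mirror.example.com
    print(host_key('file:///srv/repo/pkg.rpm', baseurl='file:///srv/repo/'))
    # -> file:///srv/repo/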

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 3b0e238..be85f92 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2060,7 +2060,7 @@ class _ExternalDownloader:
             else:
                 ug_err = URLGrabError(int(line[4]), line[5])
                 if DEBUG: DEBUG.info('failure: %s', err)
-            _TH.update(opts.url, int(line[2]), float(line[3]), ug_err)
+            _TH.update(opts.url, int(line[2]), float(line[3]), ug_err, opts.async[0])
             ret.append((opts, size, ug_err))
         return ret
 
@@ -2288,12 +2288,14 @@ class _TH:
             _TH.dirty = False
 
     @staticmethod
-    def update(url, dl_size, dl_time, ug_err):
+    def update(url, dl_size, dl_time, ug_err, baseurl=None):
         _TH.load()
-        host = urlparse.urlsplit(url).netloc
-        if not host or ' ' in host:
-            if DEBUG: DEBUG.warn('malformed url: %s', repr(url))
-            return
+
+        # Use hostname from URL.  If it's a file:// URL, use baseurl.
+        # If no baseurl, do not update timedhosts.
+        host = urlparse.urlsplit(url).netloc.split('@')[-1] or baseurl
+        if not host: return
+
         speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0)
         now = time.time()
 
@@ -2314,9 +2316,12 @@ class _TH:
         _TH.dirty = True
 
     @staticmethod
-    def estimate(url):
+    def estimate(baseurl):
         _TH.load()
-        host = urlparse.urlsplit(url).netloc
+
+        # Use just the hostname, unless it's a file:// baseurl.
+        host = urlparse.urlsplit(baseurl).netloc.split('@')[-1] or baseurl
+
         default_speed = default_grabber.opts.default_speed
         try: speed, fail, ts = _TH.hosts[host]
         except KeyError: return default_speed
commit 5c4b8e51c25dae7f9f4b7159fefc56e68923c061
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri May 18 10:37:04 2012 +0200

    URL sanity checking.  BZ 822632
    
    Ignore bogus hostnames, and make parsing of the timedhosts file
    a bit more robust.
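
A minimal standalone version of the hostname check added below (the function name and sample values are illustrative only):

    # hostnames that are empty or contain a space (typically the result of
    # a malformed URL) are skipped, so they never end up in timedhosts
    def is_sane_host(host):
        return bool(host) and ' ' not in host

    print(is_sane_host('mirror.example.com'))        # True
    print(is_sane_host('mirror.example.com bogus'))  # False -> not recorded
    print(is_sane_host(''))                          # False -> not recorded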

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 094be77..3b0e238 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2268,7 +2268,7 @@ class _TH:
         if filename and _TH.dirty is None:
             try:
                 for line in open(filename):
-                    host, speed, fail, ts = line.split()
+                    host, speed, fail, ts = line.split(' ', 3)
                     _TH.hosts[host] = int(speed), int(fail), int(ts)
             except IOError: pass
             _TH.dirty = False
@@ -2291,6 +2291,9 @@ class _TH:
     def update(url, dl_size, dl_time, ug_err):
         _TH.load()
         host = urlparse.urlsplit(url).netloc
+        if not host or ' ' in host:
+            if DEBUG: DEBUG.warn('malformed url: %s', repr(url))
+            return
         speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0)
         now = time.time()
 
commit 4143ea11ab03b9efd646e63f2c943a66dd953688
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Mar 23 16:24:29 2012 +0100

    yum compatibility: raise KeyboardInterrupt instead of IOError(EINTR)

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 25c322c..094be77 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2243,6 +2243,9 @@ def parallel_wait(meter = 'text'):
             while host_con.get(key, 0) >= limit:
                 perform()
             start(opts, 1)
+    except IOError, e:
+        if e.errno != 4: raise
+        raise KeyboardInterrupt
 
     finally:
         dl.abort()
commit 40d65866afe1384aebf5d761debffe02a01a769d
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Thu Mar 22 14:20:17 2012 +0100

    Parallel downloader: parse URL and update proxy

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 2e37c08..25c322c 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -923,6 +923,7 @@ class URLGrabberOptions:
         self.keepalive = 1
         self.proxies = None
         self.libproxy = False
+        self.proxy = None
         self.reget = None
         self.failure_callback = None
         self.interrupt_callback = None
@@ -2226,11 +2227,16 @@ def parallel_wait(meter = 'text'):
                     _run_callback(opts.failfunc, opts)
                     continue
 
-                # update the request
+                # update the current mirror and limit
                 key = best['mirror']
                 limit = best.get('kwargs', {}).get('max_connections', 3)
                 opts.async = key, limit
-                opts.url = mg._join_url(key, opts.relative_url)
+
+                # update URL and proxy
+                url = mg._join_url(key, opts.relative_url)
+                url, parts = opts.urlparser.parse(url, opts)
+                opts.find_proxy(url, parts[0])
+                opts.url = url
 
             # check host limit, then start
             key, limit = opts.async
commit dcf4877de879439902ac1f104324b45c7fb0fd6e
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Mar 16 15:19:36 2012 +0100

    Remove "proxies" and "quote" options in downloader
    (already handled in parent process). Add a new option 'proxy'.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 0277399..2e37c08 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2009,8 +2009,7 @@ class _ExternalDownloader:
         'timeout', 'close_connection', 'keepalive',
         'throttle', 'bandwidth', 'range', 'reget',
         'user_agent', 'http_headers', 'ftp_headers',
-        'proxies', 'prefix', 'quote',
-        'username', 'password',
+        'proxy', 'prefix', 'username', 'password',
         'ssl_ca_cert',
         'ssl_cert', 'ssl_cert_type',
         'ssl_key', 'ssl_key_type',
commit d2aefa5c79475ccdd921b1db4db9d370e315cd44
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Mar 16 15:16:00 2012 +0100

    Revert "Add 'dict' support to serializer."
    
    This reverts commit a4abdabf71451e03ea4a5f7b7ba3661cca129892.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 6c0f6cc..0277399 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1912,7 +1912,7 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 #####################################################################
 
 _quoter_map = {}
-for c in '%{[(,:)]} \n':
+for c in '%[(,)] \n':
     _quoter_map[c] = '%%%02x' % ord(c)
 del c
 
@@ -1931,10 +1931,6 @@ def _dumps(v):
         return "(%s)" % ','.join(map(_dumps, v))
     if type(v) == list:
         return "[%s]" % ','.join(map(_dumps, v))
-    if type(v) == dict:
-        def keyval(k):
-            return '%s:%s' % (_dumps(k), _dumps(v[k]))
-        return "{%s}" % ','.join(map(keyval, sorted(v)))
     raise TypeError, 'Can\'t serialize %s' % v
 
 def _loads(s):
@@ -1960,19 +1956,17 @@ def _loads(s):
     l = []
     i = j = 0
     while True:
-        if j == len(s) or s[j] in ',:)]}':
+        if j == len(s) or s[j] in ',)]':
             if j > i:
                 l.append(decode(s[i:j]))
             if j == len(s): break
-            if s[j] in ')]}':
+            if s[j] in ')]':
                 if s[j] == ')':
                     l = tuple(l)
-                elif s[j] == '}':
-                    l = dict(zip(l[::2], l[1::2]))
                 stk[0].append(l)
                 l, stk = stk
             i = j = j + 1
-        elif s[j] in '{[(':
+        elif s[j] in '[(':
             stk = l, stk
             l = []
             i = j = j + 1
commit b66f850e883c1817bc98d647286105e6c5a2896f
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Wed Feb 29 17:54:03 2012 +0100

    Add 'dict' support to serializer.
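
The shape of the added encoding, as a standalone sketch: a dict is written as {key:value,...} with keys sorted, and read back by pairing alternating items (dict(zip(l[::2], l[1::2]))). This is not the real _dumps/_loads, which also percent-quote special characters; it only illustrates the format for plain string values:

    def dump_dict(d):
        return '{%s}' % ','.join('%s:%s' % (k, d[k]) for k in sorted(d))

    def load_dict(s):
        flat = []
        for item in s.strip('{}').split(','):
            flat.extend(item.split(':', 1))
        return dict(zip(flat[::2], flat[1::2]))

    s = dump_dict({'retry': '3', 'timeout': '30'})
    print(s)                 # {retry:3,timeout:30}
    print(load_dict(s))      # {'retry': '3', 'timeout': '30'}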

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 0277399..6c0f6cc 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1912,7 +1912,7 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 #####################################################################
 
 _quoter_map = {}
-for c in '%[(,)] \n':
+for c in '%{[(,:)]} \n':
     _quoter_map[c] = '%%%02x' % ord(c)
 del c
 
@@ -1931,6 +1931,10 @@ def _dumps(v):
         return "(%s)" % ','.join(map(_dumps, v))
     if type(v) == list:
         return "[%s]" % ','.join(map(_dumps, v))
+    if type(v) == dict:
+        def keyval(k):
+            return '%s:%s' % (_dumps(k), _dumps(v[k]))
+        return "{%s}" % ','.join(map(keyval, sorted(v)))
     raise TypeError, 'Can\'t serialize %s' % v
 
 def _loads(s):
@@ -1956,17 +1960,19 @@ def _loads(s):
     l = []
     i = j = 0
     while True:
-        if j == len(s) or s[j] in ',)]':
+        if j == len(s) or s[j] in ',:)]}':
             if j > i:
                 l.append(decode(s[i:j]))
             if j == len(s): break
-            if s[j] in ')]':
+            if s[j] in ')]}':
                 if s[j] == ')':
                     l = tuple(l)
+                elif s[j] == '}':
+                    l = dict(zip(l[::2], l[1::2]))
                 stk[0].append(l)
                 l, stk = stk
             i = j = j + 1
-        elif s[j] in '[(':
+        elif s[j] in '{[(':
             stk = l, stk
             l = []
             i = j = j + 1
commit cbf74916f68aa9bc355220288cc8507fc01bd086
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Wed Feb 15 16:17:05 2012 +0100

    HTTP error code is optional

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 4d42c98..0277399 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2294,7 +2294,7 @@ class _TH:
             k2 = min(dl_size / 1e6, 1.0) / 10
             speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
             fail = 0
-        elif ug_err.code == 404:
+        elif getattr(ug_err, 'code', None) == 404:
             fail = 0 # alive, at least
         else:
             fail += 1 # seems dead
commit 98263856289767c612865f3b862eb11ab0acd18c
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Wed Feb 1 16:33:20 2012 +0100

    prevent a race on timedhosts file
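
The fix uses the usual write-to-temp-then-rename pattern; a minimal sketch with a made-up file path and data:

    import os

    # write the new contents to a per-process temporary file, then
    # os.rename() it over the real file.  rename() is atomic on POSIX, so a
    # concurrent reader sees either the old file or the complete new one,
    # never a half-written file.
    def save_atomically(filename, hosts):
        tmp = '%s.%d' % (filename, os.getpid())
        f = open(tmp, 'w')
        for host, (speed, fail, ts) in hosts.items():
            f.write('%s %d %d %d\n' % (host, speed, fail, ts))
        f.close()
        os.rename(tmp, filename)

    save_atomically('/tmp/timedhosts.example',
                    {'mirror.example.com': (1048576, 0, 1338000000)})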

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index bc22b94..4d42c98 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2269,10 +2269,13 @@ class _TH:
     def save():
         filename = default_grabber.opts.timedhosts
         if filename and _TH.dirty is True:
+            tmp = '%s.%d' % (filename, os.getpid())
             try:
-                write = open(filename, 'w').write
+                f = open(tmp, 'w')
                 for host in _TH.hosts:
-                    write(host + ' %d %d %d\n' % _TH.hosts[host])
+                    f.write(host + ' %d %d %d\n' % _TH.hosts[host])
+                f.close()
+                os.rename(tmp, filename)
             except IOError: pass
             _TH.dirty = False
 
commit f4a50455ffffcdac2582ee3f179653791ec0f3bc
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Jan 31 12:20:07 2012 +0100

    code reformat

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index aba1c7a..bc22b94 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2167,7 +2167,8 @@ def parallel_wait(meter = 'text'):
             if opts.failure_callback:
                 opts.exception = ug_err
                 try: _run_callback(opts.failure_callback, opts)
-                except URLGrabError, ug_err: retry = 0 # no retries
+                except URLGrabError, ug_err:
+                    retry = 0 # no retries
             if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
                 start(opts, opts.tries + 1) # simple retry
                 continue
@@ -2191,17 +2192,25 @@ def parallel_wait(meter = 'text'):
         idx = 0
         while True:
             if idx >= len(_async_queue):
+                # the queue is empty
                 if not dl.running: break
-                perform(); continue
+                # pending dl may extend it
+                perform()
+                continue
+
+            # handle next request
+            opts = _async_queue[idx]
+            idx += 1
 
             # check global limit
-            opts = _async_queue[idx]; idx += 1
-            limit = opts.max_connections
-            while len(dl.running) >= limit: perform()
+            while len(dl.running) >= opts.max_connections:
+                perform()
 
             if opts.mirror_group:
-                best = None
                 mg, failed = opts.mirror_group
+
+                # find the best mirror
+                best = None
                 for mirror in mg.mirrors:
                     key = mirror['mirror']
                     if key in failed: continue
@@ -2210,7 +2219,8 @@ def parallel_wait(meter = 'text'):
                     speed = _TH.estimate(key)
                     speed /= 1 + host_con.get(key, 0)
                     if best is None or speed > best_speed:
-                        best, best_speed = mirror, speed
+                        best = mirror
+                        best_speed = speed
 
                 if best is None:
                     opts.exception = URLGrabError(256, _('No more mirrors to try.'))
@@ -2225,7 +2235,8 @@ def parallel_wait(meter = 'text'):
 
             # check host limit, then start
             key, limit = opts.async
-            while host_con.get(key, 0) >= limit: perform()
+            while host_con.get(key, 0) >= limit:
+                perform()
             start(opts, 1)
 
     finally:
commit ab35f3ddc6139496c44d00d79bb0a09d35c4ea47
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Jan 31 11:57:15 2012 +0100

    'timedhosts' option updates
    
    Global symbols now in a private _TH class.
    HTTP error 404 is not really a host failure.
    Added a new method estimate(url).
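
A standalone sketch of the estimate() math introduced below: every failure in a row halves the recorded speed, and the estimate decays exponentially toward default_speed once the record is older than half_life (the sample numbers are illustrative; the defaults match the option defaults in grabber.py):

    def estimate(speed, fail, ts, now, default_speed=1e6, half_life=30*24*60*60):
        speed *= 2 ** -fail
        k = 2 ** ((ts - now) / float(half_life))
        return k * speed + (1 - k) * default_speed

    # a 4 MB/s host, measured exactly one half_life ago, with one failure:
    # 4e6 -> 2e6 (failure) -> halfway back to the 1e6 default = 1.5e6
    print(estimate(4e6, fail=1, ts=0, now=30*24*60*60))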

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 3e1febd..aba1c7a 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1112,7 +1112,7 @@ class URLGrabber(object):
             fo = PyCurlFileObject(url, filename, opts)
             try:
                 fo._do_grab()
-                timedhosts(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
+                _TH.update(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
                 if not opts.checkfunc is None:
                     obj = CallbackObject(filename=filename, url=url)
                     _run_callback(opts.checkfunc, obj)
@@ -1123,7 +1123,7 @@ class URLGrabber(object):
         try:
             return self._retry(opts, retryfunc, url, filename)
         except URLGrabError, e:
-            timedhosts(url, 0, 0, e)
+            _TH.update(url, 0, 0, e)
             opts.exception = e
             return _run_callback(opts.failfunc, opts)
     
@@ -2060,7 +2060,7 @@ class _ExternalDownloader:
             else:
                 ug_err = URLGrabError(int(line[4]), line[5])
                 if DEBUG: DEBUG.info('failure: %s', err)
-            timedhosts(opts.url, int(line[2]), float(line[3]), ug_err)
+            _TH.update(opts.url, int(line[2]), float(line[3]), ug_err)
             ret.append((opts, size, ug_err))
         return ret
 
@@ -2189,7 +2189,6 @@ def parallel_wait(meter = 'text'):
 
     try:
         idx = 0
-        th_load()
         while True:
             if idx >= len(_async_queue):
                 if not dl.running: break
@@ -2208,14 +2207,7 @@ def parallel_wait(meter = 'text'):
                     if key in failed: continue
 
                     # estimate mirror speed
-                    host = urlparse.urlsplit(key).netloc
-                    if host in th_dict:
-                        speed, fail, ts = th_dict[host]
-                        speed *= 2**-fail
-                        k = 2**((ts - time.time()) / opts.half_life)
-                    else:
-                        speed = k = 0
-                    speed = k * speed + (1 - k) * opts.default_speed
+                    speed = _TH.estimate(key)
                     speed /= 1 + host_con.get(key, 0)
                     if best is None or speed > best_speed:
                         best, best_speed = mirror, speed
@@ -2240,58 +2232,74 @@ def parallel_wait(meter = 'text'):
         dl.abort()
         if meter: meter.end()
         del _async_queue[:]
-        th_save()
+        _TH.save()
 
 
 #####################################################################
 #  Host bandwidth estimation
 #####################################################################
 
-th_dict = {}
+class _TH:
+    hosts = {}
+    dirty = None
 
-def th_load():
-    filename = default_grabber.opts.timedhosts
-    if filename and '_dirty' not in th_dict:
-        try:
-            for line in open(filename):
-                host, speed, fail, ts = line.split()
-                th_dict[host] = float(speed), int(fail), int(ts)
-        except IOError: pass
-        th_dict['_dirty'] = False
-
-def th_save():
-    filename = default_grabber.opts.timedhosts
-    if filename and th_dict['_dirty'] is True:
-        del th_dict['_dirty']
-        try:
-            write = open(default_grabber.opts.timedhosts, 'w').write
-            for host in th_dict:
-                write(host + ' %d %d %d\n' % th_dict[host])
-        except IOError: pass
-        th_dict['_dirty'] = False
-
-def timedhosts(url, dl_size, dl_time, ug_err):
-    ''' Called when download finishes '''
-
-    th_load()
-    host = urlparse.urlsplit(url).netloc
-    speed, fail, ts = th_dict.get(host) or \
-        (default_grabber.opts.default_speed, 1, 0)
-    now = time.time()
-
-    if ug_err is None:
-        # k1: the older, the less useful
-        # k2: if it was <1MB, don't trust it much
-        # speeds vary a lot, use 10:1 smoothing
-        k1 = 2**((ts - now) / default_grabber.opts.half_life)
-        k2 = min(dl_size / 1e6, 1) / 10.0
-        speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
-        fail = 0
-    else:
-        fail += 1
-
-    th_dict[host] = speed, fail, int(now)
-    th_dict['_dirty'] = True
+    @staticmethod
+    def load():
+        filename = default_grabber.opts.timedhosts
+        if filename and _TH.dirty is None:
+            try:
+                for line in open(filename):
+                    host, speed, fail, ts = line.split()
+                    _TH.hosts[host] = int(speed), int(fail), int(ts)
+            except IOError: pass
+            _TH.dirty = False
+
+    @staticmethod
+    def save():
+        filename = default_grabber.opts.timedhosts
+        if filename and _TH.dirty is True:
+            try:
+                write = open(filename, 'w').write
+                for host in _TH.hosts:
+                    write(host + ' %d %d %d\n' % _TH.hosts[host])
+            except IOError: pass
+            _TH.dirty = False
+
+    @staticmethod
+    def update(url, dl_size, dl_time, ug_err):
+        _TH.load()
+        host = urlparse.urlsplit(url).netloc
+        speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0)
+        now = time.time()
+
+        if ug_err is None:
+            # k1: the older, the less useful
+            # k2: if it was <1MiB, don't trust it much
+            # speeds vary, use 10:1 smoothing
+            k1 = 2**((ts - now) / default_grabber.opts.half_life)
+            k2 = min(dl_size / 1e6, 1.0) / 10
+            speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
+            fail = 0
+        elif ug_err.code == 404:
+            fail = 0 # alive, at least
+        else:
+            fail += 1 # seems dead
+
+        _TH.hosts[host] = speed, fail, now
+        _TH.dirty = True
+
+    @staticmethod
+    def estimate(url):
+        _TH.load()
+        host = urlparse.urlsplit(url).netloc
+        default_speed = default_grabber.opts.default_speed
+        try: speed, fail, ts = _TH.hosts[host]
+        except KeyError: return default_speed
+
+        speed *= 2**-fail
+        k = 2**((ts - time.time()) / default_grabber.opts.half_life)
+        speed = k * speed + (1 - k) * default_speed
+        return speed
 
 #####################################################################
 #  TESTING
commit 9e52317722906703e8eb0f9578cb45cfeb035616
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Thu Jan 26 15:08:01 2012 +0100

    timedhosts option, smarter mirror selection
    
    Added the 'timedhosts' option.
    
    A (speed, failures, timestamp) tuple is assigned to each host
    we've downloaded from.  Successful downloads update 'speed'
    and reset the failure count to zero.  Failures increment
    the failure counter.  Speed update code:
    
    - assigns lower weight to old information
    - <1MB downloads are assumed not to be 100% accurate
    - simple 10:1 filtering
    
    Updated the mirror selection code to try to find the best mirror,
    using the above-mentioned information.
    
    - every failure in a row halves the speed estimate
    - unknown or too old mirrors regress to default speed
    - pending downloads are assumed to eat bandwidth
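
A minimal sketch of the selection loop described above, with illustrative names only: each mirror's speed estimate is divided by one plus the number of connections already open to that host, and the highest score wins (failed mirrors would simply be skipped):

    def pick_mirror(estimates, host_con):
        best, best_speed = None, 0
        for key, speed in estimates.items():
            # pending downloads are assumed to eat bandwidth
            speed /= 1 + host_con.get(key, 0)
            if best is None or speed > best_speed:
                best, best_speed = key, speed
        return best

    estimates = {'a.example.com': 4e6, 'b.example.com': 3e6}
    print(pick_mirror(estimates, {'a.example.com': 2}))   # b.example.com wins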

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index fb167a6..3e1febd 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -273,6 +273,23 @@ GENERAL ARGUMENTS (kwargs)
 
     The global connection limit.
 
+  timedhosts
+
+    The filename of the host download statistics.  If defined, urlgrabber
+    will update the stats at the end of every download.  At the end of
+    parallel_wait(), the updated stats are saved.  If synchronous grabs
+    are used, you should call th_save().
+
+  default_speed, half_life
+
+    These options only affect the async mirror selection code.
+    The default_speed option sets the speed estimate for mirrors
+    we have never downloaded from, and defaults to 1 MBps.
+
+    The speed estimate also drifts exponentially from the speed
+    actually measured to the default speed, with default
+    period of 30 days.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -936,6 +953,9 @@ class URLGrabberOptions:
         self.async = None # blocking by default
         self.mirror_group = None
         self.max_connections = 5
+        self.timedhosts = None
+        self.half_life = 30*24*60*60 # 30 days
+        self.default_speed = 1e6 # 1 MBit
         
     def __repr__(self):
         return self.format()
@@ -2169,6 +2189,7 @@ def parallel_wait(meter = 'text'):
 
     try:
         idx = 0
+        th_load()
         while True:
             if idx >= len(_async_queue):
                 if not dl.running: break
@@ -2180,25 +2201,33 @@ def parallel_wait(meter = 'text'):
             while len(dl.running) >= limit: perform()
 
             if opts.mirror_group:
-                first = None
+                best = None
                 mg, failed = opts.mirror_group
                 for mirror in mg.mirrors:
                     key = mirror['mirror']
                     if key in failed: continue
-                    if not first: first = mirror
-                    limit = mirror.get('kwargs', {}).get('max_connections', 3)
-                    if host_con.get(key, 0) < limit: break
-                else:
-                    # no mirror with free slots.
-                    if not first:
-                        opts.exception = URLGrabError(256, _('No more mirrors to try.'))
-                        _run_callback(opts.failfunc, opts)
-                        continue
-                    mirror = first # fallback
-                    key = mirror['mirror']
-                    limit = mirror.get('kwargs', {}).get('max_connections', 3)
+
+                    # estimate mirror speed
+                    host = urlparse.urlsplit(key).netloc
+                    if host in th_dict:
+                        speed, fail, ts = th_dict[host]
+                        speed *= 2**-fail
+                        k = 2**((ts - time.time()) / opts.half_life)
+                    else:
+                        speed = k = 0
+                    speed = k * speed + (1 - k) * opts.default_speed
+                    speed /= 1 + host_con.get(key, 0)
+                    if best is None or speed > best_speed:
+                        best, best_speed = mirror, speed
+
+                if best is None:
+                    opts.exception = URLGrabError(256, _('No more mirrors to try.'))
+                    _run_callback(opts.failfunc, opts)
+                    continue
 
                 # update the request
+                key = best['mirror']
+                limit = best.get('kwargs', {}).get('max_connections', 3)
                 opts.async = key, limit
                 opts.url = mg._join_url(key, opts.relative_url)
 
@@ -2211,15 +2240,59 @@ def parallel_wait(meter = 'text'):
         dl.abort()
         if meter: meter.end()
         del _async_queue[:]
+        th_save()
 
 
 #####################################################################
 #  Host bandwidth estimation
 #####################################################################
 
-def timedhosts(url, dl_size, dl_time, eg_err):
+th_dict = {}
+
+def th_load():
+    filename = default_grabber.opts.timedhosts
+    if filename and '_dirty' not in th_dict:
+        try:
+            for line in open(filename):
+                host, speed, fail, ts = line.split()
+                th_dict[host] = float(speed), int(fail), int(ts)
+        except IOError: pass
+        th_dict['_dirty'] = False
+
+def th_save():
+    filename = default_grabber.opts.timedhosts
+    if filename and th_dict['_dirty'] is True:
+        del th_dict['_dirty']
+        try:
+            write = open(default_grabber.opts.timedhosts, 'w').write
+            for host in th_dict:
+                write(host + ' %d %d %d\n' % th_dict[host])
+        except IOError: pass
+        th_dict['_dirty'] = False
+
+def timedhosts(url, dl_size, dl_time, ug_err):
     ''' Called when download finishes '''
 
+    th_load()
+    host = urlparse.urlsplit(url).netloc
+    speed, fail, ts = th_dict.get(host) or \
+        (default_grabber.opts.default_speed, 1, 0)
+    now = time.time()
+
+    if ug_err is None:
+        # k1: the older, the less useful
+        # k2: if it was <1MB, don't trust it much
+        # speeds vary a lot, use 10:1 smoothing
+        k1 = 2**((ts - now) / default_grabber.opts.half_life)
+        k2 = min(dl_size / 1e6, 1) / 10.0
+        speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
+        fail = 0
+    else:
+        fail += 1
+
+    th_dict[host] = speed, fail, int(now)
+    th_dict['_dirty'] = True
+
 #####################################################################
 #  TESTING
 def _main_test():
commit 5cd314928d5ecc8f58b4bbd00efaacb333367e74
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Wed Jan 25 16:06:32 2012 +0100

    Move the downloader code to /usr/libexec/urlgrabber-ext-down.
    
    This is much cleaner.  Also exec() the script instead of python,
    to make the API language-neutral and more SELinux-friendly.
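
For reference, the script answers each request with a single line of the form "<id> <size> <downloaded> <seconds> <status>", where status is either "OK" or "<errcode> <errmsg>".  A standalone parser sketch; the sample ids, sizes and error text are made up:

    def parse_result(line):
        parts = line.split(' ', 5)
        _id, size, dlsz = int(parts[0]), int(parts[1]), int(parts[2])
        dl_time = float(parts[3])
        if parts[4] == 'OK':
            err = None
        else:
            err = (int(parts[4]), parts[5])   # URLGrabError code + message
        return _id, size, dlsz, dl_time, err

    print(parse_result('3 1048576 1048576 0.842 OK'))
    print(parse_result('4 0 0 0.120 14 HTTP Error 404: Not Found'))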

diff --git a/scripts/urlgrabber-ext-down b/scripts/urlgrabber-ext-down
new file mode 100755
index 0000000..c37e6a8
--- /dev/null
+++ b/scripts/urlgrabber-ext-down
@@ -0,0 +1,55 @@
+#! /usr/bin/python
+#  A very simple external downloader
+
+import time, os, errno, sys
+from urlgrabber.grabber import \
+    _readlines, URLGrabberOptions, _loads, \
+    PyCurlFileObject, URLGrabError
+
+def write(fmt, *arg):
+    try: os.write(1, fmt % arg)
+    except OSError, e:
+        if e.arg[0] != errno.EPIPE: raise
+        sys.exit(1)
+
+class ProxyProgress:
+    def start(self, *d1, **d2):
+        self.next_update = 0
+    def update(self, _amount_read):
+        t = time.time()
+        if t < self.next_update: return
+        self.next_update = t + 0.31
+        write('%d %d\n', self._id, _amount_read)
+
+def main():
+    import signal
+    signal.signal(signal.SIGINT, lambda n, f: sys.exit(1))
+    cnt = 0
+    while True:
+        lines = _readlines(0)
+        if not lines: break
+        for line in lines:
+            cnt += 1
+            opts = URLGrabberOptions()
+            opts._id = cnt
+            for k in line.split(' '):
+                k, v = k.split('=', 1)
+                setattr(opts, k, _loads(v))
+            if opts.progress_obj:
+                opts.progress_obj = ProxyProgress()
+                opts.progress_obj._id = cnt
+            tm = time.time()
+            try:
+                fo = PyCurlFileObject(opts.url, opts.filename, opts)
+                fo._do_grab()
+                fo.fo.close()
+                size = fo._amount_read
+                dlsz = size - fo._reget_length
+                ug_err = 'OK'
+            except URLGrabError, e:
+                size = dlsz = 0
+                ug_err = '%d %s' % e.args
+            write('%d %d %d %.3f %s\n', opts._id, size, dlsz, time.time() - tm, ug_err)
+
+if __name__ == '__main__':
+    main()
diff --git a/setup.py b/setup.py
index d0b87b8..bfa4a18 100644
--- a/setup.py
+++ b/setup.py
@@ -15,8 +15,10 @@ url = _urlgrabber.__url__
 packages = ['urlgrabber']
 package_dir = {'urlgrabber':'urlgrabber'}
 scripts = ['scripts/urlgrabber']
-data_files = [('share/doc/' + name + '-' + version,
-               ['README','LICENSE', 'TODO', 'ChangeLog'])]
+data_files = [
+    ('share/doc/' + name + '-' + version, ['README','LICENSE', 'TODO', 'ChangeLog']),
+    ('libexec', ['scripts/urlgrabber-ext-down']),
+]
 options = { 'clean' : { 'all' : 1 } }
 classifiers = [
         'Development Status :: 4 - Beta',
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 607cbd5..fb167a6 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -463,7 +463,7 @@ import pycurl
 from ftplib import parse150
 from StringIO import StringIO
 from httplib import HTTPException
-import socket, select, errno, fcntl
+import socket, select, fcntl
 from byterange import range_tuple_normalize, range_tuple_to_header, RangeError
 
 try:
@@ -1961,15 +1961,6 @@ def _loads(s):
 #  External downloader process
 #####################################################################
 
-class _ProxyProgress:
-    def start(self, *d1, **d2):
-        self.next_update = 0
-    def update(self, _amount_read):
-        t = time.time()
-        if t < self.next_update: return
-        self.next_update = t + 0.31
-        _write('%d %d\n', self._id, _amount_read)
-
 def _readlines(fd):
     buf = os.read(fd, 4096)
     if not buf: return None
@@ -1978,55 +1969,12 @@ def _readlines(fd):
         buf += os.read(fd, 4096)
     return buf[:-1].split('\n')
 
-def _write(fmt, *arg):
-    try: os.write(1, fmt % arg)
-    except OSError, e:
-        if e.arg[0] != errno.EPIPE: raise
-        sys.exit(1)
-
-def download_process():
-    ''' Download process
-        - watch stdin for new requests, parse & issue em.
-        - use ProxyProgress to send _amount_read during dl.
-        - abort on EOF.
-    '''
-    import signal
-    signal.signal(signal.SIGINT, lambda n, f: sys.exit(1))
-
-    cnt = 0
-    while True:
-        lines = _readlines(0)
-        if not lines: break
-        for line in lines:
-            cnt += 1
-            opts = URLGrabberOptions()
-            opts._id = cnt
-            for k in line.split(' '):
-                k, v = k.split('=', 1)
-                setattr(opts, k, _loads(v))
-            if opts.progress_obj:
-                opts.progress_obj = _ProxyProgress()
-                opts.progress_obj._id = cnt
-            tm = time.time()
-            try:
-                fo = PyCurlFileObject(opts.url, opts.filename, opts)
-                fo._do_grab()
-                fo.fo.close()
-                size = fo._amount_read
-                dlsz = size - fo._reget_length
-                ug_err = 'OK'
-            except URLGrabError, e:
-                size = dlsz = 0
-                ug_err = '%d %s' % e.args
-            _write('%d %d %d %.3f %s\n', opts._id, size, dlsz, time.time() - tm, ug_err)
-    sys.exit(0)
-
 import subprocess
 
 class _ExternalDownloader:
     def __init__(self):
         self.popen = subprocess.Popen(
-            ['/usr/bin/python', __file__, 'DOWNLOADER'],
+            '/usr/libexec/urlgrabber-ext-down',
             stdin = subprocess.PIPE,
             stdout = subprocess.PIPE,
         )
@@ -2377,9 +2325,6 @@ def _test_file_object_readlines(wrapper, fo_output):
     fo_output.write(string.join(li, ''))
 
 if __name__ == '__main__':
-    if sys.argv[1:] == ['DOWNLOADER']:
-        download_process()
-
     _main_test()
     _retry_test()
     _file_object_test('test')
commit b840f9ad78b28995a1e3d077d77ef3e1485db91f
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Jan 24 15:44:35 2012 +0100

    Add internal 'timedhosts' callback.
    
    Called when a download (both sync and async) finishes.
    May be used to estimate mirror speeds and update their priority.
    
    - true download size is reported
    - checkfunc callback is not timed

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 5c75088..607cbd5 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1088,9 +1088,11 @@ class URLGrabber(object):
             return filename
 
         def retryfunc(opts, url, filename):
+            tm = time.time()
             fo = PyCurlFileObject(url, filename, opts)
             try:
                 fo._do_grab()
+                timedhosts(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
                 if not opts.checkfunc is None:
                     obj = CallbackObject(filename=filename, url=url)
                     _run_callback(opts.checkfunc, obj)
@@ -1101,6 +1103,7 @@ class URLGrabber(object):
         try:
             return self._retry(opts, retryfunc, url, filename)
         except URLGrabError, e:
+            timedhosts(url, 0, 0, e)
             opts.exception = e
             return _run_callback(opts.failfunc, opts)
     
@@ -2004,16 +2007,18 @@ def download_process():
             if opts.progress_obj:
                 opts.progress_obj = _ProxyProgress()
                 opts.progress_obj._id = cnt
+            tm = time.time()
             try:
                 fo = PyCurlFileObject(opts.url, opts.filename, opts)
                 fo._do_grab()
                 fo.fo.close()
                 size = fo._amount_read
+                dlsz = size - fo._reget_length
                 ug_err = 'OK'
             except URLGrabError, e:
-                size = 0
+                size = dlsz = 0
                 ug_err = '%d %s' % e.args
-            _write('%d %d %s\n', opts._id, size, ug_err)
+            _write('%d %d %d %.3f %s\n', opts._id, size, dlsz, time.time() - tm, ug_err)
     sys.exit(0)
 
 import subprocess
@@ -2069,7 +2074,7 @@ class _ExternalDownloader:
             raise KeyboardInterrupt
         for line in lines:
             # parse downloader output
-            line = line.split(' ', 3)
+            line = line.split(' ', 5)
             _id, size = map(int, line[:2])
             if len(line) == 2:
                 opts = self.running[_id]
@@ -2081,12 +2086,13 @@ class _ExternalDownloader:
                 continue
             # job done
             opts = self.running.pop(_id)
-            if line[2] == 'OK':
+            if line[4] == 'OK':
                 ug_err = None
                 if DEBUG: DEBUG.info('success')
             else:
-                ug_err = URLGrabError(int(line[2]), line[3])
+                ug_err = URLGrabError(int(line[4]), line[5])
                 if DEBUG: DEBUG.info('failure: %s', err)
+            timedhosts(opts.url, int(line[2]), float(line[3]), ug_err)
             ret.append((opts, size, ug_err))
         return ret
 
@@ -2260,6 +2266,13 @@ def parallel_wait(meter = 'text'):
 
 
 #####################################################################
+#  Host bandwidth estimation
+#####################################################################
+
+def timedhosts(url, dl_size, dl_time, eg_err):
+    ''' Called when download finishes '''
+
+#####################################################################
 #  TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
commit 950b48e6fa09857d7165fd01dbb035bd6a2fdb6c
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Jan 20 14:15:06 2012 +0100

    Dynamic mirror selection
    
    - Dropped the MG's 'max_connections' option, added a global one.
    - Single download queue, mirror selected as late as possible.
    - Merged request dispatch & flushing loops.
    
    Also, dropped the 'GrabRequest' instance, which originally contained
    a copy of all mirrors.  Replaced that with a set of failed mirrors.
    
    - No need to clone the mirror list to each request.
    - Now the master list could be shuffled during downloads,
      and further mirror selections could act on that.
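
An illustrative sketch of the new bookkeeping, not the real parallel_wait loop: each queued request carries only the set of mirrors that already failed for it, and selection skips those, so the master mirror list can stay shared:

    def next_mirror(mirrors, failed):
        for key in mirrors:
            if key not in failed:
                return key
        return None   # no more mirrors to try

    mirrors = ['a.example.com', 'b.example.com', 'c.example.com']
    failed = set()
    print(next_mirror(mirrors, failed))   # a.example.com
    failed.add('a.example.com')           # a failed; requeue and pick again
    print(next_mirror(mirrors, failed))   # b.example.com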

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index e0bec1b..5c75088 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -269,6 +269,10 @@ GENERAL ARGUMENTS (kwargs)
     but queued.  parallel_wait() then processes grabs in parallel, limiting
     the numer of connections in each 'key' group to at most 'limit'.
 
+  max_connections
+
+    The global connection limit.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -930,6 +934,8 @@ class URLGrabberOptions:
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
         self.async = None # blocking by default
+        self.mirror_group = None
+        self.max_connections = 5
         
     def __repr__(self):
         return self.format()
@@ -953,6 +959,8 @@ def _do_raise(obj):
     raise obj.exception
 
 def _run_callback(cb, obj):
+    if not cb:
+        return
     if callable(cb):
         return cb(obj)
     cb, arg, karg = cb
@@ -1076,9 +1084,7 @@ class URLGrabber(object):
             opts.url = url
             opts.filename = filename
             opts.size = int(opts.size or 0)
-            key, limit = opts.async
-            limit, queue = _async.setdefault(key, [limit, []])
-            queue.append(opts)
+            _async_queue.append(opts)
             return filename
 
         def retryfunc(opts, url, filename):
@@ -2134,7 +2140,7 @@ class _ExternalDownloaderPool:
 #  High level async API
 #####################################################################
 
-_async = {}
+_async_queue = []
 
 def parallel_wait(meter = 'text'):
     '''Process queued requests in parallel.
@@ -2142,31 +2148,29 @@ def parallel_wait(meter = 'text'):
 
     if meter:
         count = total = 0
-        for limit, queue in _async.values():
-            for opts in queue:
-                count += 1
-                total += opts.size
+        for opts in _async_queue:
+            count += 1
+            total += opts.size
         if meter == 'text':
             from progress import TextMultiFileMeter
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
     dl = _ExternalDownloaderPool()
+    host_con = {} # current host connection counts
 
     def start(opts, tries):
+        key, limit = opts.async
+        host_con[key] = host_con.get(key, 0) + 1
         opts.tries = tries
         opts.progress_obj = meter and meter.newMeter()
         if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
         dl.start(opts)
 
-    def start_next(opts):
-        key, limit = opts.async
-        pos, queue = _async[key]; _async[key][0] += 1
-        if pos < len(queue):
-            start(queue[pos], 1)
-
     def perform():
         for opts, size, ug_err in dl.perform():
+            key, limit = opts.async
+            host_con[key] -= 1
             if meter:
                 m = opts.progress_obj
                 m.basename = os.path.basename(opts.filename)
@@ -2181,12 +2185,8 @@ def parallel_wait(meter = 'text'):
             if ug_err is None:
                 if opts.checkfunc:
                     try: _run_callback(opts.checkfunc, opts)
-                    except URLGrabError, ug_err:
-                        if meter:
-                            meter.numfiles += 1
-                            meter.re.total += opts.size
+                    except URLGrabError, ug_err: pass
                 if ug_err is None:
-                    start_next(opts)
                     continue
 
             retry = opts.retry or 0
@@ -2197,31 +2197,16 @@ def parallel_wait(meter = 'text'):
             if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
                 start(opts, opts.tries + 1) # simple retry
                 continue
-            start_next(opts)
 
-            if hasattr(opts, 'mirror_group'):
-                mg, gr, mirrorchoice = opts.mirror_group
+            if opts.mirror_group:
+                mg, failed = opts.mirror_group
+                opts.mirror = key
                 opts.exception = ug_err
-                opts.mirror = mirrorchoice['mirror']
-                opts.relative_url = gr.url
-                try:
-                    mg._failure(gr, opts)
-                    mirrorchoice = mg._get_mirror(gr)
-                    opts.mirror_group = mg, gr, mirrorchoice
-                except URLGrabError, ug_err: pass
-                else:
-                    # use new mirrorchoice
-                    key = mirrorchoice['mirror']
-                    kwargs = mirrorchoice.get('kwargs', {})
-                    limit = kwargs.get('max_connections', 1)
-                    opts.async = key, limit
-                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
-
-                    # add request to the new queue
-                    pos, queue = _async.setdefault(key, [limit, []])
-                    queue[pos:pos] = [opts] # inserting at head
-                    if len(queue) <= pos:
-                        start(opts, 1)
+                action = _run_callback(mg.failure_callback, opts)
+                if not (action and action.get('fail')):
+                    # mask this mirror and retry
+                    failed.add(key)
+                    _async_queue.append(opts)
                     continue
 
             # urlgrab failed
@@ -2229,18 +2214,49 @@ def parallel_wait(meter = 'text'):
             _run_callback(opts.failfunc, opts)
 
     try:
-        for limit, queue in _async.values():
-            for opts in queue[:limit]:
-                start(opts, 1)
-        # now 'limit' is used as 'pos', index
-        # of the first request not started yet.
-        while dl.running:
-            perform()
+        idx = 0
+        while True:
+            if idx >= len(_async_queue):
+                if not dl.running: break
+                perform(); continue
+
+            # check global limit
+            opts = _async_queue[idx]; idx += 1
+            limit = opts.max_connections
+            while len(dl.running) >= limit: perform()
+
+            if opts.mirror_group:
+                first = None
+                mg, failed = opts.mirror_group
+                for mirror in mg.mirrors:
+                    key = mirror['mirror']
+                    if key in failed: continue
+                    if not first: first = mirror
+                    limit = mirror.get('kwargs', {}).get('max_connections', 3)
+                    if host_con.get(key, 0) < limit: break
+                else:
+                    # no mirror with free slots.
+                    if not first:
+                        opts.exception = URLGrabError(256, _('No more mirrors to try.'))
+                        _run_callback(opts.failfunc, opts)
+                        continue
+                    mirror = first # fallback
+                    key = mirror['mirror']
+                    limit = mirror.get('kwargs', {}).get('max_connections', 3)
+
+                # update the request
+                opts.async = key, limit
+                opts.url = mg._join_url(key, opts.relative_url)
+
+            # check host limit, then start
+            key, limit = opts.async
+            while host_con.get(key, 0) >= limit: perform()
+            start(opts, 1)
 
     finally:
         dl.abort()
         if meter: meter.end()
-        _async.clear()
+        del _async_queue[:]
 
 
 #####################################################################
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index da601b7..d699b61 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -77,7 +77,7 @@ CUSTOMIZATION
        kwargs are omitted, then (duh) they will not be used.
 
        kwarg 'max_connections' is used to store the max connection
-       limit of this mirror, and to update the value of 'async' option.
+       limit of this mirror.
 
     3) Pass keyword arguments when instantiating the mirror group.
        See, for example, the failure_callback argument.
@@ -94,7 +94,7 @@ import random
 import thread  # needed for locking to make this threadsafe
 
 from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8
-from grabber import _run_callback, _do_raise
+from grabber import _run_callback, _do_raise, _async_queue
 
 def _(st): 
     return st
@@ -175,13 +175,6 @@ class MirrorGroup:
                'fail': 0}              # set at instantiation, reset
                                        # from callback
 
-      max_connections
-
-        This option applies to parallel downloading only.  If specified,
-        downloads are evenly distributed to first N mirrors.  N is selected as
-        the smallest number of mirrors such that their total connection limit
-        is at least max_connections.  Retries are not such restricted.
-
       failure_callback
 
         this is a callback that will be called when a mirror "fails",
@@ -270,14 +263,6 @@ class MirrorGroup:
     def _process_kwargs(self, kwargs):
         self.failure_callback = kwargs.get('failure_callback')
         self.default_action   = kwargs.get('default_action')
-        if 'max_connections' in kwargs:
-            total = count = 0
-            for mirror in self.mirrors:
-                count += 1
-                total += mirror.get('kwargs', {}).get('max_connections', 1)
-                if total >= kwargs['max_connections']:
-                    break
-            self._spread = count
        
     def _parse_mirrors(self, mirrors):
         parsed_mirrors = []
@@ -410,17 +395,6 @@ class MirrorGroup:
             grabber = mirrorchoice.get('grabber') or self.grabber
             func_ref = getattr(grabber, func)
             if DEBUG: DEBUG.info('MIRROR: trying %s -> %s', url, fullurl)
-            if kw.get('async'):
-                # 'async' option to the 1st mirror
-                key = mirrorchoice['mirror']
-                limit = kwargs.get('max_connections', 1)
-                kwargs['async'] = key, limit
-                # async code iterates mirrors and calls failfunc
-                kwargs['mirror_group'] = self, gr, mirrorchoice
-                kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
-                if hasattr(self, '_spread'):
-                    # increment the master mirror index
-                    self._next = (self._next + 1) % self._spread
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
@@ -433,6 +407,17 @@ class MirrorGroup:
                 self._failure(gr, obj)
 
     def urlgrab(self, url, filename=None, **kwargs):
+        if kwargs.get('async'):
+            opts = self.grabber.opts.derive(**kwargs)
+            opts.mirror_group = self, set()
+            opts.relative_url = _to_utf8(url)
+
+            opts.url = 'http://tbd'
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_queue.append(opts)
+            return filename
+
         kw = dict(kwargs)
         kw['filename'] = filename
         func = 'urlgrab'
commit b628bf26e59fce60e0cb28b0841ca78f4bf90324
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Mon Jan 23 16:54:01 2012 +0100

    Small fixes

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index d604e26..e0bec1b 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2000,12 +2000,14 @@ def download_process():
                 opts.progress_obj._id = cnt
             try:
                 fo = PyCurlFileObject(opts.url, opts.filename, opts)
-                fo._do_grab(); _amount_read = fo._amount_read
-                fo.fo.close(); ug_err = 'OK'
+                fo._do_grab()
+                fo.fo.close()
+                size = fo._amount_read
+                ug_err = 'OK'
             except URLGrabError, e:
-                _amount_read = 0
+                size = 0
                 ug_err = '%d %s' % e.args
-            _write('%d %d %s\n', opts._id, _amount_read, ug_err)
+            _write('%d %d %s\n', opts._id, size, ug_err)
     sys.exit(0)
 
 import subprocess
@@ -2047,7 +2049,7 @@ class _ExternalDownloader:
         if opts.progress_obj:
             arg.append('progress_obj=True')
         arg = ' '.join(arg)
-        if DEBUG: DEBUG.info('external: %s', arg)
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
 
         self.cnt += 1
         self.running[self.cnt] = opts
@@ -2062,21 +2064,24 @@ class _ExternalDownloader:
         for line in lines:
             # parse downloader output
             line = line.split(' ', 3)
-            cnt, _amount_read = map(int, line[:2])
+            _id, size = map(int, line[:2])
             if len(line) == 2:
-                opts = self.running[cnt]
+                opts = self.running[_id]
                 m = opts.progress_obj
                 if m:
                     if not m.last_update_time:
                         m.start(text = opts.text)
-                    m.update(_amount_read)
+                    m.update(size)
                 continue
             # job done
-            opts = self.running.pop(cnt)
-            err = None
-            if line[2] != 'OK':
-                err = URLGrabError(int(line[2]), line[3])
-            ret.append((opts, err, _amount_read))
+            opts = self.running.pop(_id)
+            if line[2] == 'OK':
+                ug_err = None
+                if DEBUG: DEBUG.info('success')
+            else:
+                ug_err = URLGrabError(int(line[2]), line[3])
+                if DEBUG: DEBUG.info('failure: %s', err)
+            ret.append((opts, size, ug_err))
         return ret
 
     def abort(self):
@@ -2161,7 +2166,7 @@ def parallel_wait(meter = 'text'):
             start(queue[pos], 1)
 
     def perform():
-        for opts, ug_err, _amount_read in dl.perform():
+        for opts, size, ug_err in dl.perform():
             if meter:
                 m = opts.progress_obj
                 m.basename = os.path.basename(opts.filename)
@@ -2169,12 +2174,11 @@ def parallel_wait(meter = 'text'):
                     m.failure(ug_err.args[1])
                 else:
                     # file size might have changed
-                    meter.re.total += _amount_read - opts.size
-                    m.end(_amount_read)
+                    meter.re.total += size - opts.size
+                    m.end(size)
                 meter.removeMeter(m)
 
             if ug_err is None:
-                if DEBUG: DEBUG.info('success')
                 if opts.checkfunc:
                     try: _run_callback(opts.checkfunc, opts)
                     except URLGrabError, ug_err:
@@ -2185,7 +2189,6 @@ def parallel_wait(meter = 'text'):
                     start_next(opts)
                     continue
 
-            if DEBUG: DEBUG.info('failure: %s', ug_err)
             retry = opts.retry or 0
             if opts.failure_callback:
                 opts.exception = ug_err
@@ -2236,9 +2239,8 @@ def parallel_wait(meter = 'text'):
 
     finally:
         dl.abort()
+        if meter: meter.end()
         _async.clear()
-        if meter:
-            meter.end()
 
 
 #####################################################################
commit a668662dadf5b7c5684733ed7ff6a2a4c5d4c0c6
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Wed Dec 14 15:29:24 2011 +0100

    max_connections: updated semantics
    
    - mirror's max_connections default changed from 3 to 1.
    
    - added and documented MirrorGroup's max_connections option.
    
    - updated the code that selects initial mirrors for parallel downloads
      to adjust the number of mirrors used, honoring the MG's option.
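
A standalone sketch of how the MirrorGroup-level limit translates into the number of mirrors used (_spread in the mirror.py hunk below); the mirror entries are made up, and per-mirror limits default to 1 as in the diff:

    def spread(mirrors, max_connections):
        total = count = 0
        for mirror in mirrors:
            count += 1
            total += mirror.get('kwargs', {}).get('max_connections', 1)
            if total >= max_connections:
                break
        return count

    mirrors = [{'mirror': 'a', 'kwargs': {'max_connections': 2}},
               {'mirror': 'b'},
               {'mirror': 'c', 'kwargs': {'max_connections': 3}}]
    print(spread(mirrors, 4))   # 3 mirrors needed to reach a combined limit >= 4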

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 6d147c4..d604e26 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2210,7 +2210,7 @@ def parallel_wait(meter = 'text'):
                     # use new mirrorchoice
                     key = mirrorchoice['mirror']
                     kwargs = mirrorchoice.get('kwargs', {})
-                    limit = kwargs.get('max_connections') or 3
+                    limit = kwargs.get('max_connections', 1)
                     opts.async = key, limit
                     opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
 
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 853790b..da601b7 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -175,6 +175,13 @@ class MirrorGroup:
                'fail': 0}              # set at instantiation, reset
                                        # from callback
 
+      max_connections
+
+        This option applies to parallel downloading only.  If specified,
+        downloads are evenly distributed to first N mirrors.  N is selected as
+        the smallest number of mirrors such that their total connection limit
+        is at least max_connections.  Retries are not such restricted.
+
       failure_callback
 
         this is a callback that will be called when a mirror "fails",
@@ -263,6 +270,14 @@ class MirrorGroup:
     def _process_kwargs(self, kwargs):
         self.failure_callback = kwargs.get('failure_callback')
         self.default_action   = kwargs.get('default_action')
+        if 'max_connections' in kwargs:
+            total = count = 0
+            for mirror in self.mirrors:
+                count += 1
+                total += mirror.get('kwargs', {}).get('max_connections', 1)
+                if total >= kwargs['max_connections']:
+                    break
+            self._spread = count
        
     def _parse_mirrors(self, mirrors):
         parsed_mirrors = []
@@ -398,13 +413,14 @@ class MirrorGroup:
             if kw.get('async'):
                 # 'async' option to the 1st mirror
                 key = mirrorchoice['mirror']
-                limit = kwargs.get('max_connections') or 3
+                limit = kwargs.get('max_connections', 1)
                 kwargs['async'] = key, limit
                 # async code iterates mirrors and calls failfunc
                 kwargs['mirror_group'] = self, gr, mirrorchoice
                 kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
-                # increment master
-                self._next = (self._next + 1) % min(len(self.mirrors), 5)
+                if hasattr(self, '_spread'):
+                    # increment the master mirror index
+                    self._next = (self._next + 1) % self._spread
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
commit e001be2bce1bc0b72af87e667232dac9c269d681
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Wed Dec 14 14:24:08 2011 +0100

    Fix ctrl-c tracebacks from downloader processes
    
    - handle broken pipe case
    
    - handle sigint early, so no KeyboardInterrupt
      inside pycurl callbacks that can't be caught.
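
    A minimal sketch of the two patterns this commit applies in the child
    (the write_status() helper name is illustrative): exit quietly on SIGINT
    before pycurl can swallow the KeyboardInterrupt, and treat EPIPE on
    stdout as "parent is gone, stop" rather than a traceback.

        import errno
        import os
        import signal
        import sys

        # Turn ctrl-c into a quiet exit instead of a KeyboardInterrupt that
        # may surface inside a C callback where it cannot be caught.
        signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(1))

        def write_status(fmt, *args):
            # Report to the parent on stdout; a broken pipe means the parent
            # went away, so just stop instead of printing a traceback.
            try:
                os.write(1, (fmt % args).encode())
            except OSError as e:
                if e.errno != errno.EPIPE:
                    raise
                sys.exit(1)

        write_status('%d %d\n', 1, 4096)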

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index aeb0cb1..6d147c4 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1959,7 +1959,7 @@ class _ProxyProgress:
         t = time.time()
         if t < self.next_update: return
         self.next_update = t + 0.31
-        os.write(1, '%d %d\n' % (self._id, _amount_read))
+        _write('%d %d\n', self._id, _amount_read)
 
 def _readlines(fd):
     buf = os.read(fd, 4096)
@@ -1969,12 +1969,21 @@ def _readlines(fd):
         buf += os.read(fd, 4096)
     return buf[:-1].split('\n')
 
+def _write(fmt, *arg):
+    try: os.write(1, fmt % arg)
+    except OSError, e:
+        if e.arg[0] != errno.EPIPE: raise
+        sys.exit(1)
+
 def download_process():
     ''' Download process
         - watch stdin for new requests, parse & issue em.
         - use ProxyProgress to send _amount_read during dl.
         - abort on EOF.
     '''
+    import signal
+    signal.signal(signal.SIGINT, lambda n, f: sys.exit(1))
+
     cnt = 0
     while True:
         lines = _readlines(0)
@@ -1996,7 +2005,7 @@ def download_process():
             except URLGrabError, e:
                 _amount_read = 0
                 ug_err = '%d %s' % e.args
-            os.write(1, '%d %d %s\n' % (opts._id, _amount_read, ug_err))
+            _write('%d %d %s\n', opts._id, _amount_read, ug_err)
     sys.exit(0)
 
 import subprocess
@@ -2338,9 +2347,7 @@ def _test_file_object_readlines(wrapper, fo_output):
 
 if __name__ == '__main__':
     if sys.argv[1:] == ['DOWNLOADER']:
-        try: download_process()
-        except KeyboardInterrupt:
-            raise SystemExit # no traceback
+        download_process()
 
     _main_test()
     _retry_test()
commit 9dc61b84d6a213666f87836955bdd3b8f6042b79
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 15:13:51 2011 +0100

    Remove unnecessary try/except
    
    In pooled mode, _readlines won't block

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 6890396..aeb0cb1 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2046,10 +2046,7 @@ class _ExternalDownloader:
 
     def perform(self):
         ret = []
-        try: lines = _readlines(self.stdout)
-        except OSError, e:
-            if e.args[0] != errno.EINTR: raise
-            raise KeyboardInterrupt
+        lines = _readlines(self.stdout)
         if not lines:
             if DEBUG: DEBUG.info('downloader died')
             raise KeyboardInterrupt
commit 2f4c818214510e9176852977860104ad3ff0d818
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:14:47 2011 +0100

    Use first 5 mirrors instead of just the 1st one.

diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 41c99f5..853790b 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -403,6 +403,8 @@ class MirrorGroup:
                 # async code iterates mirrors and calls failfunc
                 kwargs['mirror_group'] = self, gr, mirrorchoice
                 kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
+                # increment master
+                self._next = (self._next + 1) % min(len(self.mirrors), 5)
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
commit c364bee533f18985acf45b3bafdb76ceda707912
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:13:44 2011 +0100

    Revert "move pycurl.error handling to _do_perform_exc()"
    
    This reverts commit 54ee979ada490cdae12fff038a838602fa404075.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index d9e5111..6890396 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1373,14 +1373,8 @@ class PyCurlFileObject(object):
             return
         
         try:
-            e = None
             self.curl_obj.perform()
-        except pycurl.error, e: pass
-        self._do_perform_exc(e)
-
-    def _do_perform_exc(self, e):
-        # handle pycurl exception 'e'
-        if e:
+        except pycurl.error, e:
             # XXX - break some of these out a bit more clearly
             # to other URLGrabErrors from 
             # http://curl.haxx.se/libcurl/c/libcurl-errors.html
commit 19b2083d2b8be17959aec9c06cec845d0bcbce5e
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:13:42 2011 +0100

    Revert "move opening of target file to _do_open_fo()."
    
    This reverts commit eb365596a44bd8ca15fe290d50290f0f84a268dd.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 28754d9..d9e5111 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1634,7 +1634,22 @@ class PyCurlFileObject(object):
         _was_filename = False
         if type(self.filename) in types.StringTypes and self.filename:
             _was_filename = True
-            self._do_open_fo()
+            self._prog_reportname = str(self.filename)
+            self._prog_basename = os.path.basename(self.filename)
+            
+            if self.append: mode = 'ab'
+            else: mode = 'wb'
+
+            if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
+                                 (self.filename, mode))
+            try:
+                self.fo = open(self.filename, mode)
+            except IOError, e:
+                err = URLGrabError(16, _(\
+                  'error opening local file from %s, IOError: %s') % (self.url, e))
+                err.url = self.url
+                raise err
+
         else:
             self._prog_reportname = 'MEMORY'
             self._prog_basename = 'MEMORY'
@@ -1692,22 +1707,6 @@ class PyCurlFileObject(object):
 
         self._complete = True
     
-    def _do_open_fo(self):
-        self._prog_reportname = str(self.filename)
-        self._prog_basename = os.path.basename(self.filename)
-        if self.append: mode = 'ab'
-        else: mode = 'wb'
-
-        if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
-                             (self.filename, mode))
-        try:
-            self.fo = open(self.filename, mode)
-        except IOError, e:
-            err = URLGrabError(16, _(\
-              'error opening local file from %s, IOError: %s') % (self.url, e))
-            err.url = self.url
-            raise err
-
     def _fill_buffer(self, amt=None):
         """fill the buffer to contain at least 'amt' bytes by reading
         from the underlying file object.  If amt is None, then it will
commit ecd131488ba6f0f120bfadbf542cb69c6b0fc2b2
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:13:33 2011 +0100

    Revert "move closing of target file to _do_close_fo()."
    
    This reverts commit 5443754de676ed4b173502522143460ae8a50013.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 8aa70d9..28754d9 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1654,7 +1654,29 @@ class PyCurlFileObject(object):
             raise e
     
         if _was_filename:
-            self._do_close_fo()
+            # close it up
+            self.fo.flush()
+            self.fo.close()
+
+            # Set the URL where we got it from:
+            if xattr is not None:
+                # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
+                try:
+                    xattr.set(self.filename, 'user.xdg.origin.url', self.url)
+                except:
+                    pass # URL too long. = IOError ... ignore everything.
+
+            # set the time
+            mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
+            if mod_time != -1:
+                try:
+                    os.utime(self.filename, (mod_time, mod_time))
+                except OSError, e:
+                    err = URLGrabError(16, _(\
+                      'error setting timestamp on file %s from %s, OSError: %s') 
+                              % (self.filename, self.url, e))
+                    err.url = self.url
+                    raise err
             # re open it
             try:
                 self.fo = open(self.filename, 'r')
@@ -1686,31 +1708,6 @@ class PyCurlFileObject(object):
             err.url = self.url
             raise err
 
-    def _do_close_fo(self):
-        # close it up
-        self.fo.flush()
-        self.fo.close()
-
-        # Set the URL where we got it from:
-        if xattr is not None:
-            # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
-            try:
-                xattr.set(self.filename, 'user.xdg.origin.url', self.url)
-            except:
-                pass # URL too long. = IOError ... ignore everything.
-
-        # set the time
-        mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
-        if mod_time != -1:
-            try:
-                os.utime(self.filename, (mod_time, mod_time))
-            except OSError, e:
-                err = URLGrabError(16, _(\
-                  'error setting timestamp on file %s from %s, OSError: %s') 
-                          % (self.filename, self.url, e))
-                err.url = self.url
-                raise err
-
     def _fill_buffer(self, amt=None):
         """fill the buffer to contain at least 'amt' bytes by reading
         from the underlying file object.  If amt is None, then it will
commit e3d8593f32bdea82ee4ef41c3f5bc232ea91291a
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Dec 9 14:11:36 2011 +0100

    assume ug_mode == 'pooled'

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 655fa0f..8aa70d9 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -568,13 +568,6 @@ _log_package_state()
 def _(st):
     return st
 
-# Downloader modes:
-# 'compat': no parallel downloads
-# 'direct': use CurlMulti directly
-# 'extern': fork+exec a process that uses CurlMulti
-# 'pooled': multiple fork+exec'd processes
-ug_mode = os.environ.get('URLGRABBER_MODE', 'pooled')
-
 ########################################################################
 #                 END MODULE INITIALIZATION
 ########################################################################
@@ -1079,7 +1072,7 @@ class URLGrabber(object):
                     _run_callback(opts.checkfunc, obj)
                 return path
         
-        if opts.async and ug_mode != 'compat':
+        if opts.async:
             opts.url = url
             opts.filename = filename
             opts.size = int(opts.size or 0)
@@ -1892,105 +1885,6 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
         
 #####################################################################
-#  Downloader
-#####################################################################
-
-class _AsyncCurlFile(PyCurlFileObject):
-    curl_cache = {}
-    def _host(self):
-        return urlparse.urlsplit(self.opts.url).netloc
-
-    def _do_open(self):
-        # try to reuse curl objects
-        curl = self.curl_cache.pop(self._host(), None)
-        self.curl_obj = curl or pycurl.Curl()
-        self._set_opts()
-        self._do_open_fo() # open the file but don't grab
-
-    def _do_close_fo(self):
-        self.curl_cache[self._host()] = self.curl_obj
-        PyCurlFileObject._do_close_fo(self)
-
-class _DirectDownloaderMulti:
-    def __init__(self):
-        ''' A downloader context.
-        '''
-        self.running = {}
-        self.multi = pycurl.CurlMulti()
-        self.tout = 0
-
-    def select(self):
-        ''' Block for some time.  Return True if 'stdin' readable,
-            False when just internal processing needed.
-        '''
-        fdset = self.multi.fdset()
-        fdset[0].append(0) # stdin
-        fdset = select.select(*(fdset + (self.tout,)))
-        self.tout = min(self.tout * 1.1, 5)
-        return 0 in fdset[0]
-
-    def start(self, opts):
-        ''' Start download of job 'opts'
-        '''
-        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
-        self.running[fo.curl_obj] = fo
-        self.multi.add_handle(fo.curl_obj)
-
-        # XXX: likely a CurlMulti() bug
-        # fdset() is empty shortly after starting new request.
-        # Do some polling to work this around.
-        self.tout = 10e-3
-
-    def perform(self):
-        ''' Run downloads, return finished ones.
-        '''
-        while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
-            pass
-        ret = []
-        _, finished, failed = self.multi.info_read()
-        for curl in finished + failed:
-            curl_err = None
-            if type(curl) == tuple:
-                curl, code, msg = curl
-                curl_err = pycurl.error(code, msg)
-            self.multi.remove_handle(curl)
-            fo = self.running.pop(curl)
-            try: ug_err = None; fo._do_perform_exc(curl_err)
-            except URLGrabError, ug_err: pass
-            fo._do_close_fo()
-            ret.append((fo.opts, ug_err, fo._amount_read))
-        return ret
-
-    def abort(self):
-        ''' Abort currently active downloads.
-        '''
-        while self.running:
-            curl, fo = self.running.popitem()
-            self.multi.remove_handle(curl)
-            fo._do_close_fo()
-
-class _DirectDownloaderSingle:
-    ''' _DirectDownloader downloading one file at a time.
-    '''
-    def select(self):
-        return True
-
-    def start(self, opts):
-        try:
-            fo = PyCurlFileObject(opts.url, opts.filename, opts)
-            fo._do_grab(); _amount_read = fo._amount_read
-            fo.fo.close(); ug_err = None
-        except URLGrabError, ug_err:
-            _amount_read = 0
-        self.result = opts, ug_err, _amount_read
-
-    def perform(self):
-        return [self.result]
-
-    def abort(self):
-        pass
-
-#####################################################################
 #  Serializer + parser: A replacement of the rather bulky Json code.
 #
 # - handles basic python literals, lists and tuples.
@@ -2091,33 +1985,28 @@ def download_process():
         - use ProxyProgress to send _amount_read during dl.
         - abort on EOF.
     '''
-    if   ug_mode == 'extern': dl = _DirectDownloaderMulti()
-    elif ug_mode == 'pooled': dl = _DirectDownloaderSingle()
-    else: raise ValueError, 'Unknown ext mode %s' % ug_mode
-
     cnt = 0
     while True:
-        if dl.select():
-            lines = _readlines(0)
-            if not lines: break
-            for line in lines:
-                # start new download
-                cnt += 1
-                opts = URLGrabberOptions()
-                opts._id = cnt
-                for k in line.split(' '):
-                    k, v = k.split('=', 1)
-                    setattr(opts, k, _loads(v))
-                if opts.progress_obj:
-                    opts.progress_obj = _ProxyProgress()
-                    opts.progress_obj._id = cnt
-                dl.start(opts)
-
-        # perform requests
-        for opts, ug_err, _amount_read in dl.perform():
-            ug_err = ug_err and '%d %s' % ug_err.args or 'OK'
+        lines = _readlines(0)
+        if not lines: break
+        for line in lines:
+            cnt += 1
+            opts = URLGrabberOptions()
+            opts._id = cnt
+            for k in line.split(' '):
+                k, v = k.split('=', 1)
+                setattr(opts, k, _loads(v))
+            if opts.progress_obj:
+                opts.progress_obj = _ProxyProgress()
+                opts.progress_obj._id = cnt
+            try:
+                fo = PyCurlFileObject(opts.url, opts.filename, opts)
+                fo._do_grab(); _amount_read = fo._amount_read
+                fo.fo.close(); ug_err = 'OK'
+            except URLGrabError, e:
+                _amount_read = 0
+                ug_err = '%d %s' % e.args
             os.write(1, '%d %d %s\n' % (opts._id, _amount_read, ug_err))
-    dl.abort()
     sys.exit(0)
 
 import subprocess
@@ -2250,8 +2139,6 @@ def parallel_wait(meter = 'text'):
     '''Process queued requests in parallel.
     '''
 
-    if ug_mode == 'compat':
-        return
     if meter:
         count = total = 0
         for limit, queue in _async.values():
@@ -2263,10 +2150,7 @@ def parallel_wait(meter = 'text'):
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
-    if   ug_mode == 'direct': dl = _DirectDownloaderMulti()
-    elif ug_mode == 'extern': dl = _ExternalDownloader()
-    elif ug_mode == 'pooled': dl = _ExternalDownloaderPool()
-    else: raise ValueError, 'Unknown mode %s' % ug_mode
+    dl = _ExternalDownloaderPool()
 
     def start(opts, tries):
         opts.tries = tries
commit 772b9d1f0ca3e561ba2cb40c9f1cc9efc6f7692f
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Dec 6 16:09:44 2011 +0100

    urlgrabber downloader mode selection
    
    Removed the parallel_wait() argument 'external',
    and the compile-time constant AVOID_CURL_MULTI.
    
    URLGRABBER_MODE selects the downloader mode:
    * 'compat': no parallel downloads
    * 'direct': use CurlMulti directly
    * 'extern': fork+exec a process that uses CurlMulti
    * 'pooled': multiple fork+exec'd processes
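
    Usage note: the mode is read from the environment when grabber.py is
    imported, so a caller picks it before the import; a small sketch
    (assuming the package layout of this tree).

        import os

        # Select the downloader mode before urlgrabber.grabber is imported;
        # the module reads URLGRABBER_MODE once, at import time.
        os.environ['URLGRABBER_MODE'] = 'compat'   # e.g. disable parallel downloads

        from urlgrabber import grabber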

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 585209a..655fa0f 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -568,6 +568,13 @@ _log_package_state()
 def _(st):
     return st
 
+# Downloader modes:
+# 'compat': no parallel downloads
+# 'direct': use CurlMulti directly
+# 'extern': fork+exec a process that uses CurlMulti
+# 'pooled': multiple fork+exec'd processes
+ug_mode = os.environ.get('URLGRABBER_MODE', 'pooled')
+
 ########################################################################
 #                 END MODULE INITIALIZATION
 ########################################################################
@@ -1072,7 +1079,7 @@ class URLGrabber(object):
                     _run_callback(opts.checkfunc, obj)
                 return path
         
-        if opts.async:
+        if opts.async and ug_mode != 'compat':
             opts.url = url
             opts.filename = filename
             opts.size = int(opts.size or 0)
@@ -2078,19 +2085,16 @@ def _readlines(fd):
         buf += os.read(fd, 4096)
     return buf[:-1].split('\n')
 
-# Set this flag to 'True' to avoid using pycurl.CurlMulti()
-AVOID_CURL_MULTI = True
-
 def download_process():
     ''' Download process
         - watch stdin for new requests, parse & issue em.
         - use ProxyProgress to send _amount_read during dl.
         - abort on EOF.
     '''
-    if AVOID_CURL_MULTI:
-        dl = _DirectDownloaderSingle()
-    else:
-        dl = _DirectDownloaderMulti()
+    if   ug_mode == 'extern': dl = _DirectDownloaderMulti()
+    elif ug_mode == 'pooled': dl = _DirectDownloaderSingle()
+    else: raise ValueError, 'Unknown ext mode %s' % ug_mode
+
     cnt = 0
     while True:
         if dl.select():
@@ -2242,10 +2246,12 @@ class _ExternalDownloaderPool:
 
 _async = {}
 
-def parallel_wait(meter = 'text', external = True):
+def parallel_wait(meter = 'text'):
     '''Process queued requests in parallel.
     '''
 
+    if ug_mode == 'compat':
+        return
     if meter:
         count = total = 0
         for limit, queue in _async.values():
@@ -2257,12 +2263,10 @@ def parallel_wait(meter = 'text', external = True):
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
-    if external:
-        if AVOID_CURL_MULTI:
-            dl = _ExternalDownloaderPool()
-        else:
-            dl = _ExternalDownloader()
-    else: dl = _DirectDownloaderMulti()
+    if   ug_mode == 'direct': dl = _DirectDownloaderMulti()
+    elif ug_mode == 'extern': dl = _ExternalDownloader()
+    elif ug_mode == 'pooled': dl = _ExternalDownloaderPool()
+    else: raise ValueError, 'Unknown mode %s' % ug_mode
 
     def start(opts, tries):
         opts.tries = tries
commit 7951911478a35c3d6559b7ef48b8483e0f63abf9
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Dec 6 15:38:47 2011 +0100

    _DirectDownloader{Single,Multi}
    
    Add some more code to _DirectDownloader class and rename it to
    '_DirectDownloaderMulti'.  Add a similar class with the same API
    that downloads a single file at a time: _DirectDownloaderSingle.
    Downloader then picks the right class at runtime.
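
    Both classes expose the same four-method API, so the request loop never
    needs to know which one it got; sketched here with stub bodies (the
    class name is only a placeholder for that shared shape).

        class _DownloaderAPI(object):
            # Shared shape of _DirectDownloaderSingle / _DirectDownloaderMulti.

            def select(self):
                '''Return True when stdin has new requests to read.'''
                raise NotImplementedError

            def start(self, opts):
                '''Begin downloading the job described by opts.'''
                raise NotImplementedError

            def perform(self):
                '''Advance transfers; return finished (opts, err, amount) tuples.'''
                raise NotImplementedError

            def abort(self):
                '''Drop any downloads still in flight.'''
                raise NotImplementedError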

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index af60dce..585209a 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1904,12 +1904,23 @@ class _AsyncCurlFile(PyCurlFileObject):
         self.curl_cache[self._host()] = self.curl_obj
         PyCurlFileObject._do_close_fo(self)
 
-class _DirectDownloader:
+class _DirectDownloaderMulti:
     def __init__(self):
         ''' A downloader context.
         '''
         self.running = {}
         self.multi = pycurl.CurlMulti()
+        self.tout = 0
+
+    def select(self):
+        ''' Block for some time.  Return True if 'stdin' readable,
+            False when just internal processing needed.
+        '''
+        fdset = self.multi.fdset()
+        fdset[0].append(0) # stdin
+        fdset = select.select(*(fdset + (self.tout,)))
+        self.tout = min(self.tout * 1.1, 5)
+        return 0 in fdset[0]
 
     def start(self, opts):
         ''' Start download of job 'opts'
@@ -1918,6 +1929,11 @@ class _DirectDownloader:
         self.running[fo.curl_obj] = fo
         self.multi.add_handle(fo.curl_obj)
 
+        # XXX: likely a CurlMulti() bug
+        # fdset() is empty shortly after starting new request.
+        # Do some polling to work this around.
+        self.tout = 10e-3
+
     def perform(self):
         ''' Run downloads, return finished ones.
         '''
@@ -1939,11 +1955,34 @@ class _DirectDownloader:
         return ret
 
     def abort(self):
+        ''' Abort currently active downloads.
+        '''
         while self.running:
             curl, fo = self.running.popitem()
             self.multi.remove_handle(curl)
             fo._do_close_fo()
 
+class _DirectDownloaderSingle:
+    ''' _DirectDownloader downloading one file at a time.
+    '''
+    def select(self):
+        return True
+
+    def start(self, opts):
+        try:
+            fo = PyCurlFileObject(opts.url, opts.filename, opts)
+            fo._do_grab(); _amount_read = fo._amount_read
+            fo.fo.close(); ug_err = None
+        except URLGrabError, ug_err:
+            _amount_read = 0
+        self.result = opts, ug_err, _amount_read
+
+    def perform(self):
+        return [self.result]
+
+    def abort(self):
+        pass
+
 #####################################################################
 #  Serializer + parser: A replacement of the rather bulky Json code.
 #
@@ -2049,36 +2088,12 @@ def download_process():
         - abort on EOF.
     '''
     if AVOID_CURL_MULTI:
-        cnt = 0
-        while True:
-            lines = _readlines(0)
-            if not lines: break
-            for line in lines:
-                cnt += 1
-                opts = URLGrabberOptions()
-                for k in line.split(' '):
-                    k, v = k.split('=', 1)
-                    setattr(opts, k, _loads(v))
-                if opts.progress_obj:
-                    opts.progress_obj = _ProxyProgress()
-                    opts.progress_obj._id = cnt
-                try:
-                    fo = PyCurlFileObject(opts.url, opts.filename, opts)
-                    fo._do_grab(); _amount_read = fo._amount_read
-                    fo.fo.close(); ug_err = 'OK'
-                except URLGrabError, e:
-                    _amount_read = 0
-                    ug_err = '%d %s' % e.args
-                os.write(1, '%d %d %s\n' % (cnt, _amount_read, ug_err))
-        sys.exit(0)
-
-    dl = _DirectDownloader()
-    cnt = tout = 0
+        dl = _DirectDownloaderSingle()
+    else:
+        dl = _DirectDownloaderMulti()
+    cnt = 0
     while True:
-        fdset = dl.multi.fdset()
-        fdset[0].append(0) # stdin
-        fdset = select.select(*(fdset + (tout,)))
-        if 0 in fdset[0]:
+        if dl.select():
             lines = _readlines(0)
             if not lines: break
             for line in lines:
@@ -2094,16 +2109,10 @@ def download_process():
                     opts.progress_obj._id = cnt
                 dl.start(opts)
 
-            # XXX: likely a CurlMulti() bug
-            # fdset() is empty shortly after starting new request.
-            # Do some polling to work this around.
-            tout = 10e-3
-
         # perform requests
         for opts, ug_err, _amount_read in dl.perform():
             ug_err = ug_err and '%d %s' % ug_err.args or 'OK'
             os.write(1, '%d %d %s\n' % (opts._id, _amount_read, ug_err))
-        tout = min(tout * 1.1, 5)
     dl.abort()
     sys.exit(0)
 
@@ -2253,7 +2262,7 @@ def parallel_wait(meter = 'text', external = True):
             dl = _ExternalDownloaderPool()
         else:
             dl = _ExternalDownloader()
-    else: dl = _DirectDownloader()
+    else: dl = _DirectDownloaderMulti()
 
     def start(opts, tries):
         opts.tries = tries
commit 31e91398f0dcf282984a09159c153f19ac37107d
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Thu Nov 24 15:30:06 2011 +0100

    Initial epoll() support in ExternalDownloaderPool.
    
    Handy if there are 1000+ downloaders to handle :)
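
    The epoll pattern used here, reduced to a standalone sketch (Linux only;
    a pipe stands in for a downloader's stdout).

        import os
        import select

        r, w = os.pipe()                    # stands in for a worker's stdout
        ep = select.epoll()
        ep.register(r, select.EPOLLIN)      # register once per running worker

        os.write(w, b'1 4096 OK\n')         # the "worker" reports a result

        for fd, event in ep.poll():         # cost scales with ready fds only
            assert event & select.EPOLLIN
            print(os.read(fd, 4096))

        ep.unregister(r)                    # retire the fd when the worker is cached
        ep.close()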

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 5ee34fc..af60dce 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2188,6 +2188,7 @@ class _ExternalDownloader:
 
 class _ExternalDownloaderPool:
     def __init__(self):
+        self.epoll = select.epoll()
         self.running = {}
         self.cache = {}
 
@@ -2198,12 +2199,14 @@ class _ExternalDownloaderPool:
             dl = _ExternalDownloader()
             fl = fcntl.fcntl(dl.stdin, fcntl.F_GETFD)
             fcntl.fcntl(dl.stdin, fcntl.F_SETFD, fl | fcntl.FD_CLOEXEC)
+        self.epoll.register(dl.stdout, select.EPOLLIN)
         self.running[dl.stdout] = dl
         dl.start(opts)
 
     def perform(self):
         ret = []
-        for fd in select.select(self.running, [], [])[0]:
+        for fd, event in self.epoll.poll():
+            assert event & select.EPOLLIN
             done = self.running[fd].perform()
             if not done: continue
             assert len(done) == 1
@@ -2212,11 +2215,13 @@ class _ExternalDownloaderPool:
             # dl finished, move it to the cache
             host = urlparse.urlsplit(done[0][0].url).netloc
             if host in self.cache: self.cache[host].abort()
+            self.epoll.unregister(fd)
             self.cache[host] = self.running.pop(fd)
         return ret
 
     def abort(self):
         for dl in self.running.values():
+            self.epoll.unregister(dl.stdout)
             dl.abort()
         for dl in self.cache.values():
             dl.abort()
commit 9cfced39c466a6416266c4e961f76fb63b0f7d2a
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Nov 22 14:37:43 2011 +0100

    AVOID_CURL_MULTI flag
    
    When using the 'external' download process it's not strictly necessary
    to use CurlMulti(), as we can use ordinary blocking code and just throw
    more processes at it.
    
    AVOID_CURL_MULTI = True: Each download runs in a separate process.
    Processes are reused when downloading files from the same host.
    
    AVOID_CURL_MULTI = False: Fork a single process that handles all the
    downloading.  Should be somewhat more efficient.
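
    The reuse idea in the AVOID_CURL_MULTI = True case, as a standalone
    sketch: idle workers are keyed by the URL's netloc, so the next request
    for the same host reuses the cached worker instead of forking a new one
    (the Worker class below is only a stand-in for the real child process).

        try:
            from urllib.parse import urlsplit   # Python 3
        except ImportError:
            from urlparse import urlsplit       # Python 2

        class Worker(object):
            # Stand-in for a forked downloader process.
            def __init__(self, host):
                self.host = host
            def download(self, url):
                return 'fetched %s via worker for %s' % (url, self.host)

        idle = {}                                # host -> cached Worker

        def grab(url):
            host = urlsplit(url).netloc
            worker = idle.pop(host, None) or Worker(host)
            try:
                return worker.download(url)
            finally:
                idle[host] = worker              # keep it for this host

        print(grab('http://mirror.example.com/a.rpm'))
        print(grab('http://mirror.example.com/b.rpm'))   # same worker again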

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index d1a8222..5ee34fc 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -459,7 +459,7 @@ import pycurl
 from ftplib import parse150
 from StringIO import StringIO
 from httplib import HTTPException
-import socket, select, errno
+import socket, select, errno, fcntl
 from byterange import range_tuple_normalize, range_tuple_to_header, RangeError
 
 try:
@@ -2039,12 +2039,39 @@ def _readlines(fd):
         buf += os.read(fd, 4096)
     return buf[:-1].split('\n')
 
+# Set this flag to 'True' to avoid using pycurl.CurlMulti()
+AVOID_CURL_MULTI = True
+
 def download_process():
     ''' Download process
         - watch stdin for new requests, parse & issue em.
         - use ProxyProgress to send _amount_read during dl.
         - abort on EOF.
     '''
+    if AVOID_CURL_MULTI:
+        cnt = 0
+        while True:
+            lines = _readlines(0)
+            if not lines: break
+            for line in lines:
+                cnt += 1
+                opts = URLGrabberOptions()
+                for k in line.split(' '):
+                    k, v = k.split('=', 1)
+                    setattr(opts, k, _loads(v))
+                if opts.progress_obj:
+                    opts.progress_obj = _ProxyProgress()
+                    opts.progress_obj._id = cnt
+                try:
+                    fo = PyCurlFileObject(opts.url, opts.filename, opts)
+                    fo._do_grab(); _amount_read = fo._amount_read
+                    fo.fo.close(); ug_err = 'OK'
+                except URLGrabError, e:
+                    _amount_read = 0
+                    ug_err = '%d %s' % e.args
+                os.write(1, '%d %d %s\n' % (cnt, _amount_read, ug_err))
+        sys.exit(0)
+
     dl = _DirectDownloader()
     cnt = tout = 0
     while True:
@@ -2159,6 +2186,41 @@ class _ExternalDownloader:
         self.popen.stdout.close()
         self.popen.wait()
 
+class _ExternalDownloaderPool:
+    def __init__(self):
+        self.running = {}
+        self.cache = {}
+
+    def start(self, opts):
+        host = urlparse.urlsplit(opts.url).netloc
+        dl = self.cache.pop(host, None)
+        if not dl:
+            dl = _ExternalDownloader()
+            fl = fcntl.fcntl(dl.stdin, fcntl.F_GETFD)
+            fcntl.fcntl(dl.stdin, fcntl.F_SETFD, fl | fcntl.FD_CLOEXEC)
+        self.running[dl.stdout] = dl
+        dl.start(opts)
+
+    def perform(self):
+        ret = []
+        for fd in select.select(self.running, [], [])[0]:
+            done = self.running[fd].perform()
+            if not done: continue
+            assert len(done) == 1
+            ret.extend(done)
+
+            # dl finished, move it to the cache
+            host = urlparse.urlsplit(done[0][0].url).netloc
+            if host in self.cache: self.cache[host].abort()
+            self.cache[host] = self.running.pop(fd)
+        return ret
+
+    def abort(self):
+        for dl in self.running.values():
+            dl.abort()
+        for dl in self.cache.values():
+            dl.abort()
+
 
 #####################################################################
 #  High level async API
@@ -2181,7 +2243,11 @@ def parallel_wait(meter = 'text', external = True):
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
-    if external: dl = _ExternalDownloader()
+    if external:
+        if AVOID_CURL_MULTI:
+            dl = _ExternalDownloaderPool()
+        else:
+            dl = _ExternalDownloader()
     else: dl = _DirectDownloader()
 
     def start(opts, tries):
commit 994239add7d848305f77c72acbb0fbbf31b5297b
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Mon Nov 7 11:02:33 2011 +0100

    Optional/throttled progress reporting in download_process
    
    CurlFileObject updates the progress meter on every write.  _ProxyProgress
    pipes this to the parent, but it's often ignored there.
    
    - make updates conditional
    - throttle update rate at 0.31s
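
    The throttle is only a timestamp check; a self-contained sketch, with the
    0.31 s interval taken from the commit and the class name made up.

        import time

        class ThrottledProgress(object):
            '''Forward at most one progress update per 0.31 s.'''
            interval = 0.31

            def start(self):
                self.next_update = 0

            def update(self, amount_read):
                now = time.time()
                if now < self.next_update:
                    return                       # drop this update
                self.next_update = now + self.interval
                print('read %d bytes' % amount_read)

        p = ThrottledProgress()
        p.start()
        for chunk in range(5):
            p.update(chunk * 4096)               # only the first one gets through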

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 00117dc..d1a8222 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2023,8 +2023,12 @@ def _loads(s):
 #####################################################################
 
 class _ProxyProgress:
-    def start(*d1, **d2): pass
+    def start(self, *d1, **d2):
+        self.next_update = 0
     def update(self, _amount_read):
+        t = time.time()
+        if t < self.next_update: return
+        self.next_update = t + 0.31
         os.write(1, '%d %d\n' % (self._id, _amount_read))
 
 def _readlines(fd):
@@ -2055,11 +2059,12 @@ def download_process():
                 cnt += 1
                 opts = URLGrabberOptions()
                 opts._id = cnt
-                opts.progress_obj = _ProxyProgress()
-                opts.progress_obj._id = cnt
                 for k in line.split(' '):
                     k, v = k.split('=', 1)
                     setattr(opts, k, _loads(v))
+                if opts.progress_obj:
+                    opts.progress_obj = _ProxyProgress()
+                    opts.progress_obj._id = cnt
                 dl.start(opts)
 
             # XXX: likely a CurlMulti() bug
@@ -2111,6 +2116,8 @@ class _ExternalDownloader:
             v = getattr(opts, k)
             if v is None: continue
             arg.append('%s=%s' % (k, _dumps(v)))
+        if opts.progress_obj:
+            arg.append('progress_obj=True')
         arg = ' '.join(arg)
         if DEBUG: DEBUG.info('external: %s', arg)
 
commit d3b2e10afc4e99bc13c161ebf110383ee6f93b42
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Nov 4 09:25:58 2011 +0100

    Improve ctrl-c handling
    
    We don't detach the downloader process from the TTY, so it receives
    SIGINT as well, and may even exit sooner than Python raises
    KeyboardInterrupt in the parent process.
    
    - downloader: don't print ctrl-c traceback
    - parent: handle EINTR and EOF as ctrl-c
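
    On the parent side the rule is: a read interrupted by the signal (EINTR)
    and a closed pipe (EOF) both mean "ctrl-c was hit or the child is gone",
    so either case becomes a KeyboardInterrupt and the normal cleanup path
    runs. A sketch; read_reports() stands in for the module's _readlines()
    caller.

        import errno
        import os

        def read_reports(fd):
            # EINTR and EOF are both mapped to KeyboardInterrupt so the
            # caller's finally-block can abort the remaining downloads.
            try:
                buf = os.read(fd, 4096)
            except OSError as e:
                if e.errno != errno.EINTR:
                    raise
                raise KeyboardInterrupt
            if not buf:
                raise KeyboardInterrupt          # child closed its end
            return buf.split(b'\n')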

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index ee41d97..00117dc 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -459,7 +459,7 @@ import pycurl
 from ftplib import parse150
 from StringIO import StringIO
 from httplib import HTTPException
-import socket, select
+import socket, select, errno
 from byterange import range_tuple_normalize, range_tuple_to_header, RangeError
 
 try:
@@ -2120,7 +2120,13 @@ class _ExternalDownloader:
 
     def perform(self):
         ret = []
-        lines = _readlines(self.stdout)
+        try: lines = _readlines(self.stdout)
+        except OSError, e:
+            if e.args[0] != errno.EINTR: raise
+            raise KeyboardInterrupt
+        if not lines:
+            if DEBUG: DEBUG.info('downloader died')
+            raise KeyboardInterrupt
         for line in lines:
             # parse downloader output
             line = line.split(' ', 3)
@@ -2370,7 +2376,9 @@ def _test_file_object_readlines(wrapper, fo_output):
 
 if __name__ == '__main__':
     if sys.argv[1:] == ['DOWNLOADER']:
-        download_process()
+        try: download_process()
+        except KeyboardInterrupt:
+            raise SystemExit # no traceback
 
     _main_test()
     _retry_test()
commit 1ab655bf8ac8c19c6b0a6eee85cacbf4c384ccf6
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Oct 21 10:42:59 2011 +0200

    External downloading
    
    Add an 'external = True' flag to parallel_wait()
    to relay download requests to an external process.

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 45744f0..ee41d97 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2041,7 +2041,6 @@ def download_process():
         - use ProxyProgress to send _amount_read during dl.
         - abort on EOF.
     '''
-    sys.stderr.close()
     dl = _DirectDownloader()
     cnt = tout = 0
     while True:
@@ -2076,6 +2075,77 @@ def download_process():
     dl.abort()
     sys.exit(0)
 
+import subprocess
+
+class _ExternalDownloader:
+    def __init__(self):
+        self.popen = subprocess.Popen(
+            ['/usr/bin/python', __file__, 'DOWNLOADER'],
+            stdin = subprocess.PIPE,
+            stdout = subprocess.PIPE,
+        )
+        self.stdin  = self.popen.stdin.fileno()
+        self.stdout = self.popen.stdout.fileno()
+        self.running = {}
+        self.cnt = 0
+
+    # list of options we pass to downloader
+    _options = (
+        'url', 'filename',
+        'timeout', 'close_connection', 'keepalive',
+        'throttle', 'bandwidth', 'range', 'reget',
+        'user_agent', 'http_headers', 'ftp_headers',
+        'proxies', 'prefix', 'quote',
+        'username', 'password',
+        'ssl_ca_cert',
+        'ssl_cert', 'ssl_cert_type',
+        'ssl_key', 'ssl_key_type',
+        'ssl_key_pass',
+        'ssl_verify_peer', 'ssl_verify_host',
+        'size', 'max_header_size', 'ip_resolve',
+    )
+
+    def start(self, opts):
+        arg = []
+        for k in self._options:
+            v = getattr(opts, k)
+            if v is None: continue
+            arg.append('%s=%s' % (k, _dumps(v)))
+        arg = ' '.join(arg)
+        if DEBUG: DEBUG.info('external: %s', arg)
+
+        self.cnt += 1
+        self.running[self.cnt] = opts
+        os.write(self.stdin, arg +'\n')
+
+    def perform(self):
+        ret = []
+        lines = _readlines(self.stdout)
+        for line in lines:
+            # parse downloader output
+            line = line.split(' ', 3)
+            cnt, _amount_read = map(int, line[:2])
+            if len(line) == 2:
+                opts = self.running[cnt]
+                m = opts.progress_obj
+                if m:
+                    if not m.last_update_time:
+                        m.start(text = opts.text)
+                    m.update(_amount_read)
+                continue
+            # job done
+            opts = self.running.pop(cnt)
+            err = None
+            if line[2] != 'OK':
+                err = URLGrabError(int(line[2]), line[3])
+            ret.append((opts, err, _amount_read))
+        return ret
+
+    def abort(self):
+        self.popen.stdin.close()
+        self.popen.stdout.close()
+        self.popen.wait()
+
 
 #####################################################################
 #  High level async API
@@ -2083,7 +2153,7 @@ def download_process():
 
 _async = {}
 
-def parallel_wait(meter = 'text'):
+def parallel_wait(meter = 'text', external = True):
     '''Process queued requests in parallel.
     '''
 
@@ -2098,7 +2168,8 @@ def parallel_wait(meter = 'text'):
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
-    dl = _DirectDownloader()
+    if external: dl = _ExternalDownloader()
+    else: dl = _DirectDownloader()
 
     def start(opts, tries):
         opts.tries = tries
commit 40e8c87e51ab99d3ce18596c8d5d9da168010609
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Nov 4 08:35:15 2011 +0100

    Downloader process
    
    When executed with a single argument 'DOWNLOADER', grabber.py
    parses download requests on stdin, and reports the results to stdout.
    
    Conflicts:
    
    	urlgrabber/grabber.py
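
    In other words, grabber.py doubles as its own worker program: the parent
    runs "python grabber.py DOWNLOADER", writes one request per line to its
    stdin, and reads back "<id> <amount_read> <OK | errno message>" lines.
    A sketch of driving such a worker by hand; paths and request fields are
    illustrative.

        import subprocess

        worker = subprocess.Popen(
            ['/usr/bin/python', 'urlgrabber/grabber.py', 'DOWNLOADER'],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )

        # One request per line: space-separated key=value pairs, with values
        # serialized so they never contain a space or a newline.
        worker.stdin.write(b"url='http://example.com/a.rpm' filename='/tmp/a.rpm'\n")
        worker.stdin.flush()

        print(worker.stdout.readline())   # e.g. "1 12345 OK"

        worker.stdin.close()              # EOF tells the worker to exit
        worker.wait()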

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 15b1643..45744f0 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -459,7 +459,7 @@ import pycurl
 from ftplib import parse150
 from StringIO import StringIO
 from httplib import HTTPException
-import socket
+import socket, select
 from byterange import range_tuple_normalize, range_tuple_to_header, RangeError
 
 try:
@@ -2019,6 +2019,65 @@ def _loads(s):
 
 
 #####################################################################
+#  External downloader process
+#####################################################################
+
+class _ProxyProgress:
+    def start(*d1, **d2): pass
+    def update(self, _amount_read):
+        os.write(1, '%d %d\n' % (self._id, _amount_read))
+
+def _readlines(fd):
+    buf = os.read(fd, 4096)
+    if not buf: return None
+    # whole lines only, no buffering
+    while buf[-1] != '\n':
+        buf += os.read(fd, 4096)
+    return buf[:-1].split('\n')
+
+def download_process():
+    ''' Download process
+        - watch stdin for new requests, parse & issue em.
+        - use ProxyProgress to send _amount_read during dl.
+        - abort on EOF.
+    '''
+    sys.stderr.close()
+    dl = _DirectDownloader()
+    cnt = tout = 0
+    while True:
+        fdset = dl.multi.fdset()
+        fdset[0].append(0) # stdin
+        fdset = select.select(*(fdset + (tout,)))
+        if 0 in fdset[0]:
+            lines = _readlines(0)
+            if not lines: break
+            for line in lines:
+                # start new download
+                cnt += 1
+                opts = URLGrabberOptions()
+                opts._id = cnt
+                opts.progress_obj = _ProxyProgress()
+                opts.progress_obj._id = cnt
+                for k in line.split(' '):
+                    k, v = k.split('=', 1)
+                    setattr(opts, k, _loads(v))
+                dl.start(opts)
+
+            # XXX: likely a CurlMulti() bug
+            # fdset() is empty shortly after starting new request.
+            # Do some polling to work this around.
+            tout = 10e-3
+
+        # perform requests
+        for opts, ug_err, _amount_read in dl.perform():
+            ug_err = ug_err and '%d %s' % ug_err.args or 'OK'
+            os.write(1, '%d %d %s\n' % (opts._id, _amount_read, ug_err))
+        tout = min(tout * 1.1, 5)
+    dl.abort()
+    sys.exit(0)
+
+
+#####################################################################
 #  High level async API
 #####################################################################
 
@@ -2239,6 +2298,9 @@ def _test_file_object_readlines(wrapper, fo_output):
     fo_output.write(string.join(li, ''))
 
 if __name__ == '__main__':
+    if sys.argv[1:] == ['DOWNLOADER']:
+        download_process()
+
     _main_test()
     _retry_test()
     _file_object_test('test')
commit 106d66883dcf00a76aa97cde15319429f9d77d5f
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Nov 4 08:17:02 2011 +0100

    _dumps + _loads: custom serializer/parser
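
    The point of the custom format is that a serialized value never contains
    a space or a newline, so a whole request fits on one line of key=value
    pairs. A round-trip sketch against the private helpers added below
    (Python 2 tree).

        from urlgrabber.grabber import _dumps, _loads

        value = ('retry', 3, [1.5, 'a b\nc'], None, True)
        wire = _dumps(value)
        assert ' ' not in wire and '\n' not in wire   # safe for the line protocol
        assert _loads(wire) == value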

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 6563d0e..15b1643 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1944,6 +1944,79 @@ class _DirectDownloader:
             self.multi.remove_handle(curl)
             fo._do_close_fo()
 
+#####################################################################
+#  Serializer + parser: A replacement of the rather bulky Json code.
+#
+# - handles basic python literals, lists and tuples.
+# - serialized strings never contain ' ' or '\n'
+#
+#####################################################################
+
+_quoter_map = {}
+for c in '%[(,)] \n':
+    _quoter_map[c] = '%%%02x' % ord(c)
+del c
+
+def _dumps(v):
+    if v is None: return 'None'
+    if v is True: return 'True'
+    if v is False: return 'False'
+    if type(v) in (int, long, float):
+        return str(v)
+    if type(v) == unicode:
+        v = v.encode('UTF8')
+    if type(v) == str:
+        def quoter(c): return _quoter_map.get(c, c)
+        return "'%s'" % ''.join(map(quoter, v))
+    if type(v) == tuple:
+        return "(%s)" % ','.join(map(_dumps, v))
+    if type(v) == list:
+        return "[%s]" % ','.join(map(_dumps, v))
+    raise TypeError, 'Can\'t serialize %s' % v
+
+def _loads(s):
+    def decode(v):
+        if v == 'None': return None
+        if v == 'True': return True
+        if v == 'False': return False
+        try: return int(v)
+        except ValueError: pass
+        try: return float(v)
+        except ValueError: pass
+        if len(v) >= 2 and v[0] == v[-1] == "'":
+            ret = []; i = 1
+            while True:
+                j = v.find('%', i)
+                ret.append(v[i:j]) # skips the final "'"
+                if j == -1: break
+                ret.append(chr(int(v[j + 1:j + 3], 16)))
+                i = j + 3
+            v = ''.join(ret)
+        return v
+    stk = None
+    l = []
+    i = j = 0
+    while True:
+        if j == len(s) or s[j] in ',)]':
+            if j > i:
+                l.append(decode(s[i:j]))
+            if j == len(s): break
+            if s[j] in ')]':
+                if s[j] == ')':
+                    l = tuple(l)
+                stk[0].append(l)
+                l, stk = stk
+            i = j = j + 1
+        elif s[j] in '[(':
+            stk = l, stk
+            l = []
+            i = j = j + 1
+        else:
+            j += 1 # safe because '[(,)]' are quoted
+    if stk: raise ValueError
+    if len(l) == 1: l = l[0]
+    return l
+
 
 #####################################################################
 #  High level async API
commit c31c49ca9edc9d225743b62f0cde79aa7f8456c6
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Oct 25 13:54:43 2011 +0200

    Reuse curl objects (per host)

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index d2f2a54..6563d0e 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1889,11 +1889,21 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 #####################################################################
 
 class _AsyncCurlFile(PyCurlFileObject):
+    curl_cache = {}
+    def _host(self):
+        return urlparse.urlsplit(self.opts.url).netloc
+
     def _do_open(self):
-        self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+        # try to reuse curl objects
+        curl = self.curl_cache.pop(self._host(), None)
+        self.curl_obj = curl or pycurl.Curl()
         self._set_opts()
         self._do_open_fo() # open the file but don't grab
 
+    def _do_close_fo(self):
+        self.curl_cache[self._host()] = self.curl_obj
+        PyCurlFileObject._do_close_fo(self)
+
 class _DirectDownloader:
     def __init__(self):
         ''' A downloader context.
commit 4c401b7a30b670c52c336c7467b746b74023ed52
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Oct 4 17:24:20 2011 +0200

    Implement parallel urlgrab()s
    
    opts.async = (key, limit):
        async urlgrab() with conn limiting.
    
    parallel_wait():
    wait until all grabs have finished.
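
    A usage sketch on this (Python 2) tree; the URLs and the grouping key are
    made up, and 'async' here is an ordinary keyword argument, predating the
    Python 3 keyword of the same name.

        from urlgrabber.grabber import urlgrab, parallel_wait

        # Each call only queues the request, capping the
        # 'download.example.com' group at 2 connections.
        for name in ('a.rpm', 'b.rpm', 'c.rpm'):
            urlgrab('http://download.example.com/' + name, '/tmp/' + name,
                    async=('download.example.com', 2))

        parallel_wait()   # drain the whole queue in parallel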

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 2c1e929..d2f2a54 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -263,6 +263,12 @@ GENERAL ARGUMENTS (kwargs)
     What type of name to IP resolving to use, default is to do both IPV4 and
     IPV6.
 
+  async = (key, limit)
+
+    When this option is set, the urlgrab() is not processed immediately
+    but queued.  parallel_wait() then processes grabs in parallel, limiting
+    the number of connections in each 'key' group to at most 'limit'.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -923,6 +929,7 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.async = None # blocking by default
         
     def __repr__(self):
         return self.format()
@@ -1065,6 +1072,15 @@ class URLGrabber(object):
                     _run_callback(opts.checkfunc, obj)
                 return path
         
+        if opts.async:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            key, limit = opts.async
+            limit, queue = _async.setdefault(key, [limit, []])
+            queue.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
             fo = PyCurlFileObject(url, filename, opts)
             try:
@@ -1869,6 +1885,173 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
         
 #####################################################################
+#  Downloader
+#####################################################################
+
+class _AsyncCurlFile(PyCurlFileObject):
+    def _do_open(self):
+        self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+        self._set_opts()
+        self._do_open_fo() # open the file but don't grab
+
+class _DirectDownloader:
+    def __init__(self):
+        ''' A downloader context.
+        '''
+        self.running = {}
+        self.multi = pycurl.CurlMulti()
+
+    def start(self, opts):
+        ''' Start download of job 'opts'
+        '''
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        self.running[fo.curl_obj] = fo
+        self.multi.add_handle(fo.curl_obj)
+
+    def perform(self):
+        ''' Run downloads, return finished ones.
+        '''
+        while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        ret = []
+        _, finished, failed = self.multi.info_read()
+        for curl in finished + failed:
+            curl_err = None
+            if type(curl) == tuple:
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            self.multi.remove_handle(curl)
+            fo = self.running.pop(curl)
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+            ret.append((fo.opts, ug_err, fo._amount_read))
+        return ret
+
+    def abort(self):
+        while self.running:
+            curl, fo = self.running.popitem()
+            self.multi.remove_handle(curl)
+            fo._do_close_fo()
+
+
+#####################################################################
+#  High level async API
+#####################################################################
+
+_async = {}
+
+def parallel_wait(meter = 'text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if meter:
+        count = total = 0
+        for limit, queue in _async.values():
+            for opts in queue:
+                count += 1
+                total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(count, total)
+
+    dl = _DirectDownloader()
+
+    def start(opts, tries):
+        opts.tries = tries
+        opts.progress_obj = meter and meter.newMeter()
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        dl.start(opts)
+
+    def start_next(opts):
+        key, limit = opts.async
+        pos, queue = _async[key]; _async[key][0] += 1
+        if pos < len(queue):
+            start(queue[pos], 1)
+
+    def perform():
+        for opts, ug_err, _amount_read in dl.perform():
+            if meter:
+                m = opts.progress_obj
+                m.basename = os.path.basename(opts.filename)
+                if ug_err:
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += _amount_read - opts.size
+                    m.end(_amount_read)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if DEBUG: DEBUG.info('success')
+                if opts.checkfunc:
+                    try: _run_callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err:
+                        if meter:
+                            meter.numfiles += 1
+                            meter.re.total += opts.size
+                if ug_err is None:
+                    start_next(opts)
+                    continue
+
+            if DEBUG: DEBUG.info('failure: %s', ug_err)
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _run_callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err: retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                start(opts, opts.tries + 1) # simple retry
+                continue
+            start_next(opts)
+
+            if hasattr(opts, 'mirror_group'):
+                mg, gr, mirrorchoice = opts.mirror_group
+                opts.exception = ug_err
+                opts.mirror = mirrorchoice['mirror']
+                opts.relative_url = gr.url
+                try:
+                    mg._failure(gr, opts)
+                    mirrorchoice = mg._get_mirror(gr)
+                    opts.mirror_group = mg, gr, mirrorchoice
+                except URLGrabError, ug_err: pass
+                else:
+                    # use new mirrorchoice
+                    key = mirrorchoice['mirror']
+                    kwargs = mirrorchoice.get('kwargs', {})
+                    limit = kwargs.get('max_connections') or 3
+                    opts.async = key, limit
+                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+
+                    # add request to the new queue
+                    pos, queue = _async.setdefault(key, [limit, []])
+                    queue[pos:pos] = [opts] # inserting at head
+                    if len(queue) <= pos:
+                        start(opts, 1)
+                    continue
+
+            # urlgrab failed
+            opts.exception = ug_err
+            _run_callback(opts.failfunc, opts)
+
+    try:
+        for limit, queue in _async.values():
+            for opts in queue[:limit]:
+                start(opts, 1)
+        # now 'limit' is used as 'pos', index
+        # of the first request not started yet.
+        while dl.running:
+            perform()
+
+    finally:
+        dl.abort()
+        _async.clear()
+        if meter:
+            meter.end()
+
+
+#####################################################################
 #  TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 575ef7b..41c99f5 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -76,6 +76,9 @@ CUSTOMIZATION
        'grabber' is omitted, the default grabber will be used.  If
        kwargs are omitted, then (duh) they will not be used.
 
+       kwarg 'max_connections' is used to store the max connection
+       limit of this mirror, and to update the value of 'async' option.
+
     3) Pass keyword arguments when instantiating the mirror group.
        See, for example, the failure_callback argument.
 
@@ -392,6 +395,14 @@ class MirrorGroup:
             grabber = mirrorchoice.get('grabber') or self.grabber
             func_ref = getattr(grabber, func)
             if DEBUG: DEBUG.info('MIRROR: trying %s -> %s', url, fullurl)
+            if kw.get('async'):
+                # 'async' option to the 1st mirror
+                key = mirrorchoice['mirror']
+                limit = kwargs.get('max_connections') or 3
+                kwargs['async'] = key, limit
+                # async code iterates mirrors and calls failfunc
+                kwargs['mirror_group'] = self, gr, mirrorchoice
+                kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
commit 67f668261a525e07c3ee3ba9baa7b720ebb9a933
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Fri Oct 7 12:37:00 2011 +0200

    Obsolete the _make_callback() method
    
    Use _run_callback() instead.
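
    The helper accepts either a bare callable or the legacy
    (func, args, kwargs) tuple, so existing callers keep working; presumably
    something along these lines (a reconstruction, not the actual diff).

        def _run_callback(cb, obj):
            # Accept a callable, the legacy (func, args, kwargs) form, or None.
            if cb is None:
                return
            if callable(cb):
                return cb(obj)
            func, args, kwargs = cb
            return func(obj, *args, **kwargs)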

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 32de698..2c1e929 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -996,10 +996,9 @@ class URLGrabber(object):
             if DEBUG: DEBUG.info('exception: %s', exception)
             if callback:
                 if DEBUG: DEBUG.info('calling callback: %s', callback)
-                cb_func, cb_args, cb_kwargs = self._make_callback(callback)
                 obj = CallbackObject(exception=exception, url=args[0],
                                      tries=tries, retry=opts.retry)
-                cb_func(obj, *cb_args, **cb_kwargs)
+                _run_callback(callback, obj)
 
             if (opts.retry is None) or (tries == opts.retry):
                 if DEBUG: DEBUG.info('retries exceeded, re-raising')
@@ -1062,12 +1061,8 @@ class URLGrabber(object):
 
             elif not opts.range:
                 if not opts.checkfunc is None:
-                    cb_func, cb_args, cb_kwargs = \
-                       self._make_callback(opts.checkfunc)
-                    obj = CallbackObject()
-                    obj.filename = path
-                    obj.url = url
-                    apply(cb_func, (obj, )+cb_args, cb_kwargs)        
+                    obj = CallbackObject(filename=path, url=url)
+                    _run_callback(opts.checkfunc, obj)
                 return path
         
         def retryfunc(opts, url, filename):
@@ -1075,12 +1070,8 @@ class URLGrabber(object):
             try:
                 fo._do_grab()
                 if not opts.checkfunc is None:
-                    cb_func, cb_args, cb_kwargs = \
-                             self._make_callback(opts.checkfunc)
-                    obj = CallbackObject()
-                    obj.filename = filename
-                    obj.url = url
-                    apply(cb_func, (obj, )+cb_args, cb_kwargs)
+                    obj = CallbackObject(filename=filename, url=url)
+                    _run_callback(opts.checkfunc, obj)
             finally:
                 fo.close()
             return filename
@@ -1118,12 +1109,8 @@ class URLGrabber(object):
                 else: s = fo.read(limit)
 
                 if not opts.checkfunc is None:
-                    cb_func, cb_args, cb_kwargs = \
-                             self._make_callback(opts.checkfunc)
-                    obj = CallbackObject()
-                    obj.data = s
-                    obj.url = url
-                    apply(cb_func, (obj, )+cb_args, cb_kwargs)
+                    obj = CallbackObject(data=s, url=url)
+                    _run_callback(opts.checkfunc, obj)
             finally:
                 fo.close()
             return s
@@ -1138,6 +1125,7 @@ class URLGrabber(object):
         return s
         
     def _make_callback(self, callback_obj):
+        # not used, left for compatibility
         if callable(callback_obj):
             return callback_obj, (), {}
         else:
commit 1299f52e072dc1bec20039227ef289ea592e45eb
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Sep 6 14:41:27 2011 +0200

    Implement 'failfunc' callback.
    
    This callback is called when a urlgrab request fails.
    If the grab is wrapped in a mirror group, only the mirror
    group issues the callback.
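
A minimal usage sketch of the new option (URL, filename and the on_fail handler are illustrative): with failfunc set, a failed urlgrab() hands the error to the callback instead of raising URLGrabError.

    from urlgrabber.grabber import URLGrabber

    def on_fail(obj):
        # per the patch below, obj.exception carries the URLGrabError
        print 'download failed:', obj.exception

    g = URLGrabber(retry=2)
    g.urlgrab('http://example.com/foo.rpm', 'foo.rpm', failfunc=on_fail)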

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 38ae1f7..32de698 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -343,6 +343,15 @@ RETRY RELATED ARGUMENTS
     but it cannot (without severe trickiness) prevent the exception
     from being raised.
 
+  failfunc = None
+
+    The callback that gets called when a urlgrab request fails.
+    If defined, urlgrab() calls it instead of raising URLGrabError.
+    Callback syntax is identical to failure_callback.
+
+    Unlike failure_callback, it is called only once.  Its primary
+    purpose is to allow using urlgrab() without a try/except block.
+
   interrupt_callback = None
 
     This callback is called if KeyboardInterrupt is received at any
@@ -878,6 +887,7 @@ class URLGrabberOptions:
         self.retry = None
         self.retrycodes = [-1,2,4,5,6,7]
         self.checkfunc = None
+        self.failfunc = _do_raise
         self.copy_local = 0
         self.close_connection = 0
         self.range = None
@@ -932,6 +942,15 @@ class URLGrabberOptions:
         s = s + indent + '}'
         return s
 
+def _do_raise(obj):
+    raise obj.exception
+
+def _run_callback(cb, obj):
+    if callable(cb):
+        return cb(obj)
+    cb, arg, karg = cb
+    return cb(obj, *arg, **karg)
+
 class URLGrabber(object):
     """Provides easy opening of URLs with a variety of options.
     
@@ -1066,7 +1085,11 @@ class URLGrabber(object):
                 fo.close()
             return filename
         
-        return self._retry(opts, retryfunc, url, filename)
+        try:
+            return self._retry(opts, retryfunc, url, filename)
+        except URLGrabError, e:
+            opts.exception = e
+            return _run_callback(opts.failfunc, opts)
     
     def urlread(self, url, limit=None, **kwargs):
         """read the url into a string, up to 'limit' bytes
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 8731aed..575ef7b 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -91,6 +91,7 @@ import random
 import thread  # needed for locking to make this threadsafe
 
 from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8
+from grabber import _run_callback, _do_raise
 
 def _(st): 
     return st
@@ -254,7 +255,7 @@ class MirrorGroup:
     # if these values are found in **kwargs passed to one of the urlXXX
     # methods, they will be stripped before getting passed on to the
     # grabber
-    options = ['default_action', 'failure_callback']
+    options = ['default_action', 'failure_callback', 'failfunc']
     
     def _process_kwargs(self, kwargs):
         self.failure_callback = kwargs.get('failure_callback')
@@ -406,7 +407,11 @@ class MirrorGroup:
         kw = dict(kwargs)
         kw['filename'] = filename
         func = 'urlgrab'
-        return self._mirror_try(func, url, kw)
+        try:
+            return self._mirror_try(func, url, kw)
+        except URLGrabError, e:
+            obj = CallbackObject(url=url, filename=filename, exception=e, **kwargs)
+            return _run_callback(kwargs.get('failfunc', _do_raise), obj)
     
     def urlopen(self, url, **kwargs):
         kw = dict(kwargs)
commit 216b75352f672647315c96a36d1bd7f19648d5e2
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Oct 25 12:52:23 2011 +0200

    TextMultiFileMeter: minor tweaks
    
    remove _do_end(), because individual finished files were already
    handled in end_meter, and _do_update_meter(None) fails.
    
    remove _do_update_meter() at the end of _do_end_meter():
    we have already bumped the finished_files counter, and
    _do_update_meter() would report the (N+1)-th download.

diff --git a/urlgrabber/progress.py b/urlgrabber/progress.py
index 3d7e99a..4c126c5 100644
--- a/urlgrabber/progress.py
+++ b/urlgrabber/progress.py
@@ -576,7 +576,6 @@ class TextMultiFileMeter(MultiFileMeter):
             self.fo.write(out)
         finally:
             self._lock.release()
-        self._do_update_meter(meter, now)
 
     def _do_failure_meter(self, meter, message, now):
         self._lock.acquire()
@@ -599,15 +598,6 @@ class TextMultiFileMeter(MultiFileMeter):
             pass
         finally:
             self._lock.release()
-
-    def _do_end(self, now):
-        self._do_update_meter(None, now)
-        self._lock.acquire()
-        try:
-            self.fo.write('\n')
-            self.fo.flush()
-        finally:
-            self._lock.release()
         
 ######################################################################
 # support classes and functions

