[yum-commits] Branch 'multi-downloader' - 2 commits - urlgrabber/grabber.py

zpavlas at osuosl.org
Tue Jan 31 12:18:22 UTC 2012


 urlgrabber/grabber.py |  149 ++++++++++++++++++++++++++++----------------------
 1 file changed, 84 insertions(+), 65 deletions(-)

New commits:
commit aa040f88889073cd29f35dc3e549b2010fe0947a
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Jan 31 12:20:07 2012 +0100

    code reformat

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 4623360..096b7ca 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2134,7 +2134,8 @@ def parallel_wait(meter = 'text'):
             if opts.failure_callback:
                 opts.exception = ug_err
                 try: _run_callback(opts.failure_callback, opts)
-                except URLGrabError, ug_err: retry = 0 # no retries
+                except URLGrabError, ug_err:
+                    retry = 0 # no retries
             if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
                 start(opts, opts.tries + 1) # simple retry
                 continue
@@ -2158,17 +2159,25 @@ def parallel_wait(meter = 'text'):
         idx = 0
         while True:
             if idx >= len(_async_queue):
+                # the queue is empty
                 if not dl.running: break
-                perform(); continue
+                # pending dl may extend it
+                perform()
+                continue
+
+            # handle next request
+            opts = _async_queue[idx]
+            idx += 1
 
             # check global limit
-            opts = _async_queue[idx]; idx += 1
-            limit = opts.max_connections
-            while len(dl.running) >= limit: perform()
+            while len(dl.running) >= opts.max_connections:
+                perform()
 
             if opts.mirror_group:
-                best = None
                 mg, failed = opts.mirror_group
+
+                # find the best mirror
+                best = None
                 for mirror in mg.mirrors:
                     key = mirror['mirror']
                     if key in failed: continue
@@ -2177,7 +2186,8 @@ def parallel_wait(meter = 'text'):
                     speed = _TH.estimate(key)
                     speed /= 1 + host_con.get(key, 0)
                     if best is None or speed > best_speed:
-                        best, best_speed = mirror, speed
+                        best = mirror
+                        best_speed = speed
 
                 if best is None:
                     opts.exception = URLGrabError(256, _('No more mirrors to try.'))
@@ -2192,7 +2202,8 @@ def parallel_wait(meter = 'text'):
 
             # check host limit, then start
             key, limit = opts.async
-            while host_con.get(key, 0) >= limit: perform()
+            while host_con.get(key, 0) >= limit:
+                perform()
             start(opts, 1)
 
     finally:
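
For context, the loop reformatted above is a simple scheduler: drain the
async queue, stall while the global or per-host connection limits are
reached, and for mirrored requests pick the mirror with the best
estimated throughput per active connection. A minimal standalone sketch
of that selection heuristic (pick_mirror, estimate and host_con are
illustrative stand-ins here, not grabber API):

    def pick_mirror(mirrors, failed, estimate, host_con):
        # Among mirrors that have not failed yet, pick the one whose
        # estimated speed, divided by (1 + active connections to its
        # host), is highest.
        best = best_speed = None
        for mirror in mirrors:
            key = mirror['mirror']
            if key in failed:
                continue
            speed = estimate(key) / (1 + host_con.get(key, 0))
            if best is None or speed > best_speed:
                best = mirror
                best_speed = speed
        return best  # None means no usable mirror is left

    # hostB is nominally faster, but already has two connections
    # running, so hostA wins: 1e6/1 > 2e6/3.
    mirrors = [{'mirror': 'hostA'}, {'mirror': 'hostB'}]
    rates = {'hostA': 1e6, 'hostB': 2e6}
    print(pick_mirror(mirrors, set(), rates.get, {'hostB': 2}))
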
commit 72512c889809aea1fd7e7ab80bab53ead9d8460c
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Tue Jan 31 11:57:15 2012 +0100

    'timedhosts' option updates
    
    Global symbols are now grouped in a private _TH class.
    HTTP error 404 is not really a host failure (the host responded),
    so it now clears the failure counter instead of bumping it.
    Added a new method, estimate(url).
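
For reference, the timedhosts state file that _TH.load() and _TH.save()
below read and write is a plain whitespace-separated table, one host
per line: hostname, measured speed in bytes/s, consecutive-failure
count, and a Unix timestamp. Two example lines with invented values:

    mirrors.example.org 1048576 0 1327968000
    ftp.example.net 524288 2 1327881600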

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 2c1257c..4623360 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1069,7 +1069,7 @@ class URLGrabber(object):
             fo = PyCurlFileObject(url, filename, opts)
             try:
                 fo._do_grab()
-                timedhosts(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
+                _TH.update(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
                 if not opts.checkfunc is None:
                     obj = CallbackObject(filename=filename, url=url)
                     _run_callback(opts.checkfunc, obj)
@@ -1080,7 +1080,7 @@ class URLGrabber(object):
         try:
             return self._retry(opts, retryfunc, url, filename)
         except URLGrabError, e:
-            timedhosts(url, 0, 0, e)
+            _TH.update(url, 0, 0, e)
             opts.exception = e
             return _run_callback(opts.failfunc, opts)
     
@@ -2027,7 +2027,7 @@ class _ExternalDownloader:
             else:
                 ug_err = URLGrabError(int(line[4]), line[5])
                if DEBUG: DEBUG.info('failure: %s', ug_err)
-            timedhosts(opts.url, int(line[2]), float(line[3]), ug_err)
+            _TH.update(opts.url, int(line[2]), float(line[3]), ug_err)
             ret.append((opts, size, ug_err))
         return ret
 
@@ -2156,7 +2156,6 @@ def parallel_wait(meter = 'text'):
 
     try:
         idx = 0
-        th_load()
         while True:
             if idx >= len(_async_queue):
                 if not dl.running: break
@@ -2175,14 +2174,7 @@ def parallel_wait(meter = 'text'):
                     if key in failed: continue
 
                     # estimate mirror speed
-                    host = urlparse.urlsplit(key).netloc
-                    if host in th_dict:
-                        speed, fail, ts = th_dict[host]
-                        speed *= 2**-fail
-                        k = 2**((ts - time.time()) / opts.half_life)
-                    else:
-                        speed = k = 0
-                    speed = k * speed + (1 - k) * opts.default_speed
+                    speed = _TH.estimate(key)
                     speed /= 1 + host_con.get(key, 0)
                     if best is None or speed > best_speed:
                         best, best_speed = mirror, speed
@@ -2207,58 +2199,74 @@ def parallel_wait(meter = 'text'):
         dl.abort()
         if meter: meter.end()
         del _async_queue[:]
-        th_save()
+        _TH.save()
 
 
 #####################################################################
 #  Host bandwidth estimation
 #####################################################################
 
-th_dict = {}
+class _TH:
+    hosts = {}
+    dirty = None
 
-def th_load():
-    filename = default_grabber.opts.timedhosts
-    if filename and '_dirty' not in th_dict:
-        try:
-            for line in open(filename):
-                host, speed, fail, ts = line.split()
-                th_dict[host] = float(speed), int(fail), int(ts)
-        except IOError: pass
-        th_dict['_dirty'] = False
-
-def th_save():
-    filename = default_grabber.opts.timedhosts
-    if filename and th_dict['_dirty'] is True:
-        del th_dict['_dirty']
-        try:
-            write = open(default_grabber.opts.timedhosts, 'w').write
-            for host in th_dict:
-                write(host + ' %d %d %d\n' % th_dict[host])
-        except IOError: pass
-        th_dict['_dirty'] = False
-
-def timedhosts(url, dl_size, dl_time, ug_err):
-    ''' Called when download finishes '''
-
-    th_load()
-    host = urlparse.urlsplit(url).netloc
-    speed, fail, ts = th_dict.get(host) or \
-        (default_grabber.opts.default_speed, 1, 0)
-    now = time.time()
-
-    if ug_err is None:
-        # k1: the older, the less useful
-        # k2: if it was <1MB, don't trust it much
-        # speeds vary a lot, use 10:1 smoothing
-        k1 = 2**((ts - now) / default_grabber.opts.half_life)
-        k2 = min(dl_size / 1e6, 1) / 10.0
-        speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
-        fail = 0
-    else:
-        fail += 1
-
-    th_dict[host] = speed, fail, int(now)
-    th_dict['_dirty'] = True
+    @staticmethod
+    def load():
+        filename = default_grabber.opts.timedhosts
+        if filename and _TH.dirty is None:
+            try:
+                for line in open(filename):
+                    host, speed, fail, ts = line.split()
+                    _TH.hosts[host] = int(speed), int(fail), int(ts)
+            except IOError: pass
+            _TH.dirty = False
+
+    @staticmethod
+    def save():
+        filename = default_grabber.opts.timedhosts
+        if filename and _TH.dirty is True:
+            try:
+                write = open(filename, 'w').write
+                for host in _TH.hosts:
+                    write(host + ' %d %d %d\n' % _TH.hosts[host])
+            except IOError: pass
+            _TH.dirty = False
+
+    @staticmethod
+    def update(url, dl_size, dl_time, ug_err):
+        _TH.load()
+        host = urlparse.urlsplit(url).netloc
+        speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0)
+        now = time.time()
+
+        if ug_err is None:
+            # k1: the older, the less useful
+            # k2: if it was <1MiB, don't trust it much
+            # speeds vary, use 10:1 smoothing
+            k1 = 2**((ts - now) / default_grabber.opts.half_life)
+            k2 = min(dl_size / 1e6, 1.0) / 10
+            speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
+            fail = 0
+        elif ug_err.code == 404:
+            fail = 0 # alive, at least
+        else:
+            fail += 1 # seems dead
+
+        _TH.hosts[host] = speed, fail, now
+        _TH.dirty = True
+
+    @staticmethod
+    def estimate(url):
+        _TH.load()
+        host = urlparse.urlsplit(url).netloc
+        default_speed = default_grabber.opts.default_speed
+        try: speed, fail, ts = _TH.hosts[host]
+        except KeyError: return default_speed
+
+        speed *= 2**-fail
+        k = 2**((ts - time.time()) / default_grabber.opts.half_life)
+        speed = k * speed + (1 - k) * default_speed
+        return speed
 
 #####################################################################
 #  TESTING


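To make the decay arithmetic in _TH concrete: every recorded failure
halves a host's estimated speed, and the older the measurement, the
more the estimate is pulled back toward the configured default. A small
self-contained sketch (DEFAULT_SPEED and HALF_LIFE are illustrative
constants, not the actual urlgrabber defaults):

    import time

    DEFAULT_SPEED = 1e6          # assumed bytes/s for unknown hosts
    HALF_LIFE = 30 * 24 * 3600   # a measurement loses half its weight
                                 # per half-life elapsed

    def estimate(speed, fail, ts, now=None):
        # Mirrors the logic of _TH.estimate() above.
        if now is None:
            now = time.time()
        speed *= 2 ** -fail
        k = 2 ** ((ts - now) / HALF_LIFE)
        return k * speed + (1 - k) * DEFAULT_SPEED

    now = time.time()
    print(estimate(2e6, 0, now))              # fresh measurement: ~2.0e6
    print(estimate(2e6, 0, now - HALF_LIFE))  # one half-life old: ~1.5e6
    print(estimate(2e6, 1, now))              # one failure: ~1.0e6

On the update side, _TH.update() blends the old estimate (weight k1,
decayed with the same half-life) with the new measurement (weight k2,
capped at 0.1 by download size), so small downloads barely move the
average, while errors other than 404 only bump the failure counter.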