[yum-commits] Branch 'multi-downloader' - urlgrabber/grabber.py

zpavlas at osuosl.org
Thu Jan 26 15:54:03 UTC 2012


 urlgrabber/grabber.py |  103 +++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 89 insertions(+), 14 deletions(-)

New commits:
commit d121115287f7e40a0199f9338df803bf62d4a8b2
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date:   Thu Jan 26 15:08:01 2012 +0100

    smart(er) mirror selection
    
    There's a (speed, failures, timestamp) tuple assigned to each host
    we've downloaded from.  Successful downloads update 'speed' and reset
    the failure counter to zero.  Failures just increment the counter.
    The timestamp is always updated.  Downloads smaller than 1 MB update
    the speed estimate only partially.
    
    When selecting a mirror, pick the best one using the information
    above.  The following also comes into play (see the sketch after
    the list):
    
    - each failure in a row halves the speed estimate
    - unknown or too old mirrors regress to default speed
    - already used mirrors are assumed to be slower
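
For reference, the selection rule in the patch below boils down to the
following standalone sketch.  The helper name and argument list are
illustrative only; th_dict stands for the per-host (speed, failures,
timestamp) tuples and host_con for the number of connections currently
open per mirror, as in the patch itself.

    import time
    import urlparse   # Python 2, as used by grabber.py

    def estimate_speed(mirror_url, th_dict, host_con,
                       default_speed=1e6, half_life=30*24*60*60):
        host = urlparse.urlsplit(mirror_url).netloc
        if host in th_dict:
            speed, fail, ts = th_dict[host]
            # each failure in a row halves the speed estimate
            speed *= 2.0 ** -fail
            # older measurements count for less (half_life decay)
            k = 2.0 ** ((ts - time.time()) / half_life)
        else:
            # unknown host: fall back to the default speed only
            speed = k = 0.0
        speed = k * speed + (1 - k) * default_speed
        # mirrors we are already downloading from are assumed to be slower
        return speed / (1 + host_con.get(mirror_url, 0))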

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 74c9fc7..1f7feb2 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -269,6 +269,21 @@ GENERAL ARGUMENTS (kwargs)
 
     The global connection limit.
 
+  timedhosts
+
+    The filename of the per-host download statistics file.  If defined,
+    urlgrabber updates the stats at the end of every download.
+
+  default_speed, half_life
+
+    These options only affect the async mirror selection code.
+    The `default_speed` option sets the speed estimate for mirrors
+    we have never downloaded from, and defaults to 1 MB/s.
+
+    The measured speed estimate also decays exponentially toward the
+    default speed; the `half_life` option sets the half-life of this
+    decay and defaults to 30 days.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -895,6 +910,9 @@ class URLGrabberOptions:
         self.async = None # blocking by default
         self.mirror_group = None
         self.max_connections = 5
+        self.timedhosts = None
+        self.half_life = 30*24*60*60 # 30 days
+        self.default_speed = 1e6 # 1 MB/s
         
     def __repr__(self):
         return self.format()
@@ -2136,6 +2154,7 @@ def parallel_wait(meter = 'text'):
 
     try:
         idx = 0
+        th_dict = timedhosts_get() or {}
         while True:
             if idx >= len(_async_queue):
                 if not dl.running: break
@@ -2147,25 +2166,33 @@ def parallel_wait(meter = 'text'):
             while len(dl.running) >= limit: perform()
 
             if opts.mirror_group:
-                first = None
+                best = None
                 mg, failed = opts.mirror_group
                 for mirror in mg.mirrors:
                     key = mirror['mirror']
                     if key in failed: continue
-                    if not first: first = mirror
-                    limit = mirror.get('kwargs', {}).get('max_connections', 3)
-                    if host_con.get(key, 0) < limit: break
-                else:
-                    # no mirror with free slots.
-                    if not first:
-                        opts.exception = URLGrabError(256, _('No more mirrors to try.'))
-                        _run_callback(opts.failfunc, opts)
-                        continue
-                    mirror = first # fallback
-                    key = mirror['mirror']
-                    limit = mirror.get('kwargs', {}).get('max_connections', 3)
+
+                    # estimate mirror speed
+                    host = urlparse.urlsplit(key).netloc
+                    if host in th_dict:
+                        speed, fail, ts = th_dict[host]
+                        speed *= 2**-fail
+                        k = 2**((ts - time.time()) / opts.half_life)
+                    else:
+                        speed = k = 0.0
+                    speed = k * speed + (1 - k) * opts.default_speed
+                    speed /= 1 + host_con.get(key, 0)
+                    if best is None or speed > best_speed:
+                        best, best_speed = mirror, speed
+
+                if best is None:
+                    opts.exception = URLGrabError(256, _('No more mirrors to try.'))
+                    _run_callback(opts.failfunc, opts)
+                    continue
 
                 # update the request
+                key = best['mirror']
+                limit = best.get('kwargs', {}).get('max_connections', 3)
                 opts.async = key, limit
                 opts.url = mg._join_url(key, opts.relative_url)
 
@@ -2178,15 +2205,63 @@ def parallel_wait(meter = 'text'):
         dl.abort()
         if meter: meter.end()
         del _async_queue[:]
+        timedhosts_flush()
 
 
 #####################################################################
 #  Host bandwidth estimation
 #####################################################################
 
-def timedhosts(url, dl_size, dl_time, eg_err):
+_th_dict = {}
+_th_dirty = None
+
+def timedhosts_get():
+    ''' Return the timedhosts hash, loading it if necessary '''
+
+    global _th_dirty
+    if _th_dirty is None:
+        th = default_grabber.opts.timedhosts
+        if not th: return
+        try:
+            for line in open(th):
+                host, speed, fail, ts = line.split()
+                _th_dict[host] = float(speed), int(fail), int(ts)
+        except IOError: pass
+        _th_dirty = False
+    return _th_dict
+
+def timedhosts_flush():
+    ''' Save the timedhosts hash, if dirty '''
+
+    global _th_dirty
+    if _th_dirty is True:
+        try:
+            write = open(default_grabber.opts.timedhosts, 'w').write
+            for host in _th_dict:
+                write(host + ' %d %d %d\n' % _th_dict[host])
+        except IOError: pass
+        _th_dirty = False
+
+def timedhosts(url, dl_size, dl_time, ug_err):
     ''' Called when download finishes '''
 
+    # get old state
+    th = timedhosts_get()
+    if th is None: return
+    host = urlparse.urlsplit(url).netloc
+    speed, fail, ts = th.get(host) or (0, 0, 0)
+
+    # update statistics
+    if ug_err is None:
+        delta = dl_size / dl_time - speed
+        speed += delta * min(1, dl_size / 1e6)
+        fail = 0
+    else:
+        fail += 1
+
+    th[host] = speed, fail, int(time.time())
+    global _th_dirty; _th_dirty = True
+
 #####################################################################
 #  TESTING
 def _main_test():
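
Usage note (not part of the commit): timedhosts_get() reads the statistics
file name from the default grabber's options, so a front end would typically
configure the new options once up front.  The path and values below are
illustrative only:

    from urlgrabber.grabber import default_grabber

    default_grabber.opts.timedhosts = '/var/cache/yum/timedhosts.txt'
    default_grabber.opts.default_speed = 500e3    # assume 500 kB/s for unknown mirrors
    default_grabber.opts.half_life = 7*24*60*60   # one-week half-life instead of 30 days

The statistics file is plain text with one "host speed failures timestamp"
record per line, as written by timedhosts_flush() above, and the estimates
only influence mirror choice for async requests that carry a mirror group.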

