[yum-commits] Branch 'multi-downloader' - 2 commits - urlgrabber/grabber.py
zpavlas at osuosl.org
zpavlas at osuosl.org
Tue Jan 31 12:18:22 UTC 2012
urlgrabber/grabber.py | 149 ++++++++++++++++++++++++++++----------------------
1 file changed, 84 insertions(+), 65 deletions(-)
New commits:
commit aa040f88889073cd29f35dc3e549b2010fe0947a
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date: Tue Jan 31 12:20:07 2012 +0100
code reformat
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 4623360..096b7ca 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -2134,7 +2134,8 @@ def parallel_wait(meter = 'text'):
if opts.failure_callback:
opts.exception = ug_err
try: _run_callback(opts.failure_callback, opts)
- except URLGrabError, ug_err: retry = 0 # no retries
+ except URLGrabError, ug_err:
+ retry = 0 # no retries
if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
start(opts, opts.tries + 1) # simple retry
continue
@@ -2158,17 +2159,25 @@ def parallel_wait(meter = 'text'):
idx = 0
while True:
if idx >= len(_async_queue):
+ # the queue is empty
if not dl.running: break
- perform(); continue
+ # pending dl may extend it
+ perform()
+ continue
+
+ # handle next request
+ opts = _async_queue[idx]
+ idx += 1
# check global limit
- opts = _async_queue[idx]; idx += 1
- limit = opts.max_connections
- while len(dl.running) >= limit: perform()
+ while len(dl.running) >= opts.max_connections:
+ perform()
if opts.mirror_group:
- best = None
mg, failed = opts.mirror_group
+
+ # find the best mirror
+ best = None
for mirror in mg.mirrors:
key = mirror['mirror']
if key in failed: continue
@@ -2177,7 +2186,8 @@ def parallel_wait(meter = 'text'):
speed = _TH.estimate(key)
speed /= 1 + host_con.get(key, 0)
if best is None or speed > best_speed:
- best, best_speed = mirror, speed
+ best = mirror
+ best_speed = speed
if best is None:
opts.exception = URLGrabError(256, _('No more mirrors to try.'))
@@ -2192,7 +2202,8 @@ def parallel_wait(meter = 'text'):
# check host limit, then start
key, limit = opts.async
- while host_con.get(key, 0) >= limit: perform()
+ while host_con.get(key, 0) >= limit:
+ perform()
start(opts, 1)
finally:
commit 72512c889809aea1fd7e7ab80bab53ead9d8460c
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date: Tue Jan 31 11:57:15 2012 +0100
'timedhosts' option updates
Global symbols now in a private _TH class.
HTTP error 404 is not really a host failure.
Added a new method estimate(url).
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 2c1257c..4623360 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1069,7 +1069,7 @@ class URLGrabber(object):
fo = PyCurlFileObject(url, filename, opts)
try:
fo._do_grab()
- timedhosts(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
+ _TH.update(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
if not opts.checkfunc is None:
obj = CallbackObject(filename=filename, url=url)
_run_callback(opts.checkfunc, obj)
@@ -1080,7 +1080,7 @@ class URLGrabber(object):
try:
return self._retry(opts, retryfunc, url, filename)
except URLGrabError, e:
- timedhosts(url, 0, 0, e)
+ _TH.update(url, 0, 0, e)
opts.exception = e
return _run_callback(opts.failfunc, opts)
@@ -2027,7 +2027,7 @@ class _ExternalDownloader:
else:
ug_err = URLGrabError(int(line[4]), line[5])
if DEBUG: DEBUG.info('failure: %s', err)
- timedhosts(opts.url, int(line[2]), float(line[3]), ug_err)
+ _TH.update(opts.url, int(line[2]), float(line[3]), ug_err)
ret.append((opts, size, ug_err))
return ret
@@ -2156,7 +2156,6 @@ def parallel_wait(meter = 'text'):
try:
idx = 0
- th_load()
while True:
if idx >= len(_async_queue):
if not dl.running: break
@@ -2175,14 +2174,7 @@ def parallel_wait(meter = 'text'):
if key in failed: continue
# estimate mirror speed
- host = urlparse.urlsplit(key).netloc
- if host in th_dict:
- speed, fail, ts = th_dict[host]
- speed *= 2**-fail
- k = 2**((ts - time.time()) / opts.half_life)
- else:
- speed = k = 0
- speed = k * speed + (1 - k) * opts.default_speed
+ speed = _TH.estimate(key)
speed /= 1 + host_con.get(key, 0)
if best is None or speed > best_speed:
best, best_speed = mirror, speed
@@ -2207,58 +2199,74 @@ def parallel_wait(meter = 'text'):
dl.abort()
if meter: meter.end()
del _async_queue[:]
- th_save()
+ _TH.save()
#####################################################################
# Host bandwidth estimation
#####################################################################
-th_dict = {}
+class _TH:
+ hosts = {}
+ dirty = None
-def th_load():
- filename = default_grabber.opts.timedhosts
- if filename and '_dirty' not in th_dict:
- try:
- for line in open(filename):
- host, speed, fail, ts = line.split()
- th_dict[host] = float(speed), int(fail), int(ts)
- except IOError: pass
- th_dict['_dirty'] = False
-
-def th_save():
- filename = default_grabber.opts.timedhosts
- if filename and th_dict['_dirty'] is True:
- del th_dict['_dirty']
- try:
- write = open(default_grabber.opts.timedhosts, 'w').write
- for host in th_dict:
- write(host + ' %d %d %d\n' % th_dict[host])
- except IOError: pass
- th_dict['_dirty'] = False
-
-def timedhosts(url, dl_size, dl_time, ug_err):
- ''' Called when download finishes '''
-
- th_load()
- host = urlparse.urlsplit(url).netloc
- speed, fail, ts = th_dict.get(host) or \
- (default_grabber.opts.default_speed, 1, 0)
- now = time.time()
-
- if ug_err is None:
- # k1: the older, the less useful
- # k2: if it was <1MB, don't trust it much
- # speeds vary a lot, use 10:1 smoothing
- k1 = 2**((ts - now) / default_grabber.opts.half_life)
- k2 = min(dl_size / 1e6, 1) / 10.0
- speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
- fail = 0
- else:
- fail += 1
-
- th_dict[host] = speed, fail, int(now)
- th_dict['_dirty'] = True
+ @staticmethod
+ def load():
+ filename = default_grabber.opts.timedhosts
+ if filename and _TH.dirty is None:
+ try:
+ for line in open(filename):
+ host, speed, fail, ts = line.split()
+ _TH.hosts[host] = int(speed), int(fail), int(ts)
+ except IOError: pass
+ _TH.dirty = False
+
+ @staticmethod
+ def save():
+ filename = default_grabber.opts.timedhosts
+ if filename and _TH.dirty is True:
+ try:
+ write = open(filename, 'w').write
+ for host in _TH.hosts:
+ write(host + ' %d %d %d\n' % _TH.hosts[host])
+ except IOError: pass
+ _TH.dirty = False
+
+ @staticmethod
+ def update(url, dl_size, dl_time, ug_err):
+ _TH.load()
+ host = urlparse.urlsplit(url).netloc
+ speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0)
+ now = time.time()
+
+ if ug_err is None:
+ # k1: the older, the less useful
+ # k2: if it was <1MiB, don't trust it much
+ # speeds vary, use 10:1 smoothing
+ k1 = 2**((ts - now) / default_grabber.opts.half_life)
+ k2 = min(dl_size / 1e6, 1.0) / 10
+ speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
+ fail = 0
+ elif ug_err.code == 404:
+ fail = 0 # alive, at least
+ else:
+ fail += 1 # seems dead
+
+ _TH.hosts[host] = speed, fail, now
+ _TH.dirty = True
+
+ @staticmethod
+ def estimate(url):
+ _TH.load()
+ host = urlparse.urlsplit(url).netloc
+ default_speed = default_grabber.opts.default_speed
+ try: speed, fail, ts = _TH.hosts[host]
+ except KeyError: return default_speed
+
+ speed *= 2**-fail
+ k = 2**((ts - time.time()) / default_grabber.opts.half_life)
+ speed = k * speed + (1 - k) * default_speed
+ return speed
#####################################################################
# TESTING
More information about the Yum-commits
mailing list