[Yum-devel] [PATCH] Dynamic mirror selection
Zdeněk Pavlas
zpavlas at redhat.com
Wed Jan 25 10:23:16 UTC 2012
- Dropped the MG's 'max_connections' option, added a global one.
- Single download queue, mirror selected as late as possible.
- Merged request dispatch & flushing loops.
Also, dropped the 'GrabRequest' instance, which originally contained
a copy of all mirrors. Replaced that with a set of failed mirrors.
- No need to clone the mirror list to each request.
- Now the master list could be shuffled during downloads,
and further mirror selections could act on that.
---
urlgrabber/grabber.py | 111 ++++++++++++++++++++++++++++--------------------
urlgrabber/mirror.py | 41 ++++++------------
2 files changed, 78 insertions(+), 74 deletions(-)
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index e138273..00c5f6f 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -265,6 +265,10 @@ GENERAL ARGUMENTS (kwargs)
but queued. parallel_wait() then processes grabs in parallel, limiting
the number of connections in each 'key' group to at most 'limit'.
+ max_connections
+
+ The global connection limit.
+
RETRY RELATED ARGUMENTS
@@ -889,6 +893,8 @@ class URLGrabberOptions:
# to be. this is ultimately a MAXIMUM size for the file
self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
self.async = None # blocking by default
+ self.mirror_group = None
+ self.max_connections = 5
def __repr__(self):
return self.format()
@@ -912,6 +918,8 @@ def _do_raise(obj):
raise obj.exception
def _run_callback(cb, obj):
+ if not cb:
+ return
if callable(cb):
return cb(obj)
cb, arg, karg = cb
@@ -1033,9 +1041,7 @@ class URLGrabber(object):
opts.url = url
opts.filename = filename
opts.size = int(opts.size or 0)
- key, limit = opts.async
- limit, queue = _async.setdefault(key, [limit, []])
- queue.append(opts)
+ _async_queue.append(opts)
return filename
def retryfunc(opts, url, filename):
@@ -2101,7 +2107,7 @@ class _ExternalDownloaderPool:
# High level async API
#####################################################################
-_async = {}
+_async_queue = []
def parallel_wait(meter = 'text'):
'''Process queued requests in parallel.
@@ -2109,31 +2115,29 @@ def parallel_wait(meter = 'text'):
if meter:
count = total = 0
- for limit, queue in _async.values():
- for opts in queue:
- count += 1
- total += opts.size
+ for opts in _async_queue:
+ count += 1
+ total += opts.size
if meter == 'text':
from progress import TextMultiFileMeter
meter = TextMultiFileMeter()
meter.start(count, total)
dl = _ExternalDownloaderPool()
+ host_con = {} # current host connection counts
def start(opts, tries):
+ key, limit = opts.async
+ host_con[key] = host_con.get(key, 0) + 1
opts.tries = tries
opts.progress_obj = meter and meter.newMeter()
if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
dl.start(opts)
- def start_next(opts):
- key, limit = opts.async
- pos, queue = _async[key]; _async[key][0] += 1
- if pos < len(queue):
- start(queue[pos], 1)
-
def perform():
for opts, size, ug_err in dl.perform():
+ key, limit = opts.async
+ host_con[key] -= 1
if meter:
m = opts.progress_obj
m.basename = os.path.basename(opts.filename)
@@ -2153,7 +2157,6 @@ def parallel_wait(meter = 'text'):
meter.numfiles += 1
meter.re.total += opts.size
if ug_err is None:
- start_next(opts)
continue
retry = opts.retry or 0
@@ -2164,31 +2167,16 @@ def parallel_wait(meter = 'text'):
if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
start(opts, opts.tries + 1) # simple retry
continue
- start_next(opts)
- if hasattr(opts, 'mirror_group'):
- mg, gr, mirrorchoice = opts.mirror_group
+ if opts.mirror_group:
+ mg, failed = opts.mirror_group
+ opts.mirror = key
opts.exception = ug_err
- opts.mirror = mirrorchoice['mirror']
- opts.relative_url = gr.url
- try:
- mg._failure(gr, opts)
- mirrorchoice = mg._get_mirror(gr)
- opts.mirror_group = mg, gr, mirrorchoice
- except URLGrabError, ug_err: pass
- else:
- # use new mirrorchoice
- key = mirrorchoice['mirror']
- kwargs = mirrorchoice.get('kwargs', {})
- limit = kwargs.get('max_connections', 1)
- opts.async = key, limit
- opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
-
- # add request to the new queue
- pos, queue = _async.setdefault(key, [limit, []])
- queue[pos:pos] = [opts] # inserting at head
- if len(queue) <= pos:
- start(opts, 1)
+ action = _run_callback(mg.failure_callback, opts)
+ if not (action and action.get('fail')):
+ # mask this mirror and retry
+ failed.add(key)
+ _async_queue.append(opts)
continue
# urlgrab failed
@@ -2196,18 +2184,49 @@ def parallel_wait(meter = 'text'):
_run_callback(opts.failfunc, opts)
try:
- for limit, queue in _async.values():
- for opts in queue[:limit]:
- start(opts, 1)
- # now 'limit' is used as 'pos', index
- # of the first request not started yet.
- while dl.running:
- perform()
+ idx = 0
+ while True:
+ if idx >= len(_async_queue):
+ if not dl.running: break
+ perform(); continue
+
+ # check global limit
+ opts = _async_queue[idx]; idx += 1
+ limit = opts.max_connections
+ while len(dl.running) >= limit: perform()
+
+ if opts.mirror_group:
+ first = None
+ mg, failed = opts.mirror_group
+ for mirror in mg.mirrors:
+ key = mirror['mirror']
+ if key in failed: continue
+ if not first: first = mirror
+ limit = mirror.get('kwargs', {}).get('max_connections', 3)
+ if host_con.get(key, 0) < limit: break
+ else:
+ # no mirror with free slots.
+ if not first:
+ opts.exception = URLGrabError(256, _('No more mirrors to try.'))
+ _run_callback(opts.failfunc, opts)
+ continue
+ mirror = first # fallback
+ key = mirror['mirror']
+ limit = mirror.get('kwargs', {}).get('max_connections', 3)
+
+ # update the request
+ opts.async = key, limit
+ opts.url = mg._join_url(key, opts.relative_url)
+
+ # check host limit, then start
+ key, limit = opts.async
+ while host_con.get(key, 0) >= limit: perform()
+ start(opts, 1)
finally:
dl.abort()
if meter: meter.end()
- _async.clear()
+ del _async_queue[:]
#####################################################################
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index da601b7..d699b61 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -77,7 +77,7 @@ CUSTOMIZATION
kwargs are omitted, then (duh) they will not be used.
kwarg 'max_connections' is used to store the max connection
- limit of this mirror, and to update the value of 'async' option.
+ limit of this mirror.
3) Pass keyword arguments when instantiating the mirror group.
See, for example, the failure_callback argument.
@@ -94,7 +94,7 @@ import random
import thread # needed for locking to make this threadsafe
from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8
-from grabber import _run_callback, _do_raise
+from grabber import _run_callback, _do_raise, _async_queue
def _(st):
return st
@@ -175,13 +175,6 @@ class MirrorGroup:
'fail': 0} # set at instantiation, reset
# from callback
- max_connections
-
- This option applies to parallel downloading only. If specified,
- downloads are evenly distributed to first N mirrors. N is selected as
- the smallest number of mirrors such that their total connection limit
- is at least max_connections. Retries are not such restricted.
-
failure_callback
this is a callback that will be called when a mirror "fails",
@@ -270,14 +263,6 @@ class MirrorGroup:
def _process_kwargs(self, kwargs):
self.failure_callback = kwargs.get('failure_callback')
self.default_action = kwargs.get('default_action')
- if 'max_connections' in kwargs:
- total = count = 0
- for mirror in self.mirrors:
- count += 1
- total += mirror.get('kwargs', {}).get('max_connections', 1)
- if total >= kwargs['max_connections']:
- break
- self._spread = count
def _parse_mirrors(self, mirrors):
parsed_mirrors = []
@@ -410,17 +395,6 @@ class MirrorGroup:
grabber = mirrorchoice.get('grabber') or self.grabber
func_ref = getattr(grabber, func)
if DEBUG: DEBUG.info('MIRROR: trying %s -> %s', url, fullurl)
- if kw.get('async'):
- # 'async' option to the 1st mirror
- key = mirrorchoice['mirror']
- limit = kwargs.get('max_connections', 1)
- kwargs['async'] = key, limit
- # async code iterates mirrors and calls failfunc
- kwargs['mirror_group'] = self, gr, mirrorchoice
- kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
- if hasattr(self, '_spread'):
- # increment the master mirror index
- self._next = (self._next + 1) % self._spread
try:
return func_ref( *(fullurl,), **kwargs )
except URLGrabError, e:
@@ -433,6 +407,17 @@ class MirrorGroup:
self._failure(gr, obj)
def urlgrab(self, url, filename=None, **kwargs):
+ if kwargs.get('async'):
+ opts = self.grabber.opts.derive(**kwargs)
+ opts.mirror_group = self, set()
+ opts.relative_url = _to_utf8(url)
+
+ opts.url = 'http://tbd'
+ opts.filename = filename
+ opts.size = int(opts.size or 0)
+ _async_queue.append(opts)
+ return filename
+
kw = dict(kwargs)
kw['filename'] = filename
func = 'urlgrab'
--
1.7.4.4
More information about the Yum-devel
mailing list