[Yum-devel] [PATCH] Dynamic mirror selection

Zdeněk Pavlas zpavlas at redhat.com
Wed Jan 25 10:23:16 UTC 2012


- Dropped the MirrorGroup's 'max_connections' option, added a global one.
- Single download queue; the mirror is selected as late as possible.
- Merged the request dispatch and flushing loops.

Also dropped the 'GrabRequest' instance, which originally contained
a copy of all mirrors, and replaced it with a set of failed mirrors.

- No need to clone the mirror list for each request.
- The master list can now be shuffled during downloads, and subsequent
  mirror selections act on the updated order (a sketch of the new
  selection policy follows below).
---
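For review, here is a rough standalone sketch of the selection policy
the new parallel_wait() loop implements.  It is illustrative only: the
names follow the patch, and DEFAULT_MIRROR_LIMIT stands in for the
hard-coded per-mirror default of 3.

    DEFAULT_MIRROR_LIMIT = 3  # per-mirror default used by the patch

    def select_mirror(mirrors, failed, host_con):
        # Pick the first non-failed mirror that still has a free
        # connection slot.  Fall back to the first non-failed mirror
        # when all are saturated; return None when all have failed.
        first = None
        for mirror in mirrors:
            key = mirror['mirror']
            if key in failed:
                continue  # masked by an earlier failure of this request
            if first is None:
                first = mirror
            limit = mirror.get('kwargs', {}).get('max_connections',
                                                 DEFAULT_MIRROR_LIMIT)
            if host_con.get(key, 0) < limit:
                return mirror  # free slot on this host
        return first

Because selection happens at dispatch time, reordering self.mirrors
(e.g. shuffling) immediately affects all requests not yet started.
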
 urlgrabber/grabber.py |  111 ++++++++++++++++++++++++++++--------------------
 urlgrabber/mirror.py  |   41 ++++++------------
 2 files changed, 78 insertions(+), 74 deletions(-)
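
Intended usage, as a sketch only: the async=True flag, the
max_connections grabber option, and the mirror URLs below are inferred
from this patch rather than from documented API, so treat them as
assumptions.

    from urlgrabber.grabber import URLGrabber, parallel_wait
    from urlgrabber.mirror import MirrorGroup

    # global limit of 10 simultaneous connections (the default is 5)
    grabber = URLGrabber(max_connections=10)

    mirrors = [
        {'mirror': 'http://mirror-a.example.com/repo/',
         'kwargs': {'max_connections': 3}},   # per-host limit
        {'mirror': 'http://mirror-b.example.com/repo/'},
    ]
    mg = MirrorGroup(grabber, mirrors)

    # async requests are only queued here; the mirror (and thus the
    # final URL) is selected later, at dispatch time
    for rpm in ('foo-1.0.rpm', 'bar-2.1.rpm'):
        mg.urlgrab(rpm, filename=rpm, async=True)

    # dispatch the queue, honoring the per-host and global limits
    parallel_wait()

When a mirror fails and the failure_callback does not report
{'fail': 1}, the mirror is only added to that request's 'failed' set
and the request is requeued, so it is retried from another mirror.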

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index e138273..00c5f6f 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -265,6 +265,10 @@ GENERAL ARGUMENTS (kwargs)
     but queued.  parallel_wait() then processes grabs in parallel, limiting
     the number of connections in each 'key' group to at most 'limit'.
 
+  max_connections
+
+    The global limit on the total number of simultaneous connections (default: 5).
+
 
 RETRY RELATED ARGUMENTS
 
@@ -889,6 +893,8 @@ class URLGrabberOptions:
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
         self.async = None # blocking by default
+        self.mirror_group = None
+        self.max_connections = 5
         
     def __repr__(self):
         return self.format()
@@ -912,6 +918,8 @@ def _do_raise(obj):
     raise obj.exception
 
 def _run_callback(cb, obj):
+    if not cb:
+        return
     if callable(cb):
         return cb(obj)
     cb, arg, karg = cb
@@ -1033,9 +1041,7 @@ class URLGrabber(object):
             opts.url = url
             opts.filename = filename
             opts.size = int(opts.size or 0)
-            key, limit = opts.async
-            limit, queue = _async.setdefault(key, [limit, []])
-            queue.append(opts)
+            _async_queue.append(opts)
             return filename
 
         def retryfunc(opts, url, filename):
@@ -2101,7 +2107,7 @@ class _ExternalDownloaderPool:
 #  High level async API
 #####################################################################
 
-_async = {}
+_async_queue = []
 
 def parallel_wait(meter = 'text'):
     '''Process queued requests in parallel.
@@ -2109,31 +2115,29 @@ def parallel_wait(meter = 'text'):
 
     if meter:
         count = total = 0
-        for limit, queue in _async.values():
-            for opts in queue:
-                count += 1
-                total += opts.size
+        for opts in _async_queue:
+            count += 1
+            total += opts.size
         if meter == 'text':
             from progress import TextMultiFileMeter
             meter = TextMultiFileMeter()
         meter.start(count, total)
 
     dl = _ExternalDownloaderPool()
+    host_con = {} # current host connection counts
 
     def start(opts, tries):
+        key, limit = opts.async
+        host_con[key] = host_con.get(key, 0) + 1
         opts.tries = tries
         opts.progress_obj = meter and meter.newMeter()
         if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
         dl.start(opts)
 
-    def start_next(opts):
-        key, limit = opts.async
-        pos, queue = _async[key]; _async[key][0] += 1
-        if pos < len(queue):
-            start(queue[pos], 1)
-
     def perform():
         for opts, size, ug_err in dl.perform():
+            key, limit = opts.async
+            host_con[key] -= 1
             if meter:
                 m = opts.progress_obj
                 m.basename = os.path.basename(opts.filename)
@@ -2153,7 +2157,6 @@ def parallel_wait(meter = 'text'):
                             meter.numfiles += 1
                             meter.re.total += opts.size
                 if ug_err is None:
-                    start_next(opts)
                     continue
 
             retry = opts.retry or 0
@@ -2164,31 +2167,16 @@ def parallel_wait(meter = 'text'):
             if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
                 start(opts, opts.tries + 1) # simple retry
                 continue
-            start_next(opts)
 
-            if hasattr(opts, 'mirror_group'):
-                mg, gr, mirrorchoice = opts.mirror_group
+            if opts.mirror_group:
+                mg, failed = opts.mirror_group
+                opts.mirror = key
                 opts.exception = ug_err
-                opts.mirror = mirrorchoice['mirror']
-                opts.relative_url = gr.url
-                try:
-                    mg._failure(gr, opts)
-                    mirrorchoice = mg._get_mirror(gr)
-                    opts.mirror_group = mg, gr, mirrorchoice
-                except URLGrabError, ug_err: pass
-                else:
-                    # use new mirrorchoice
-                    key = mirrorchoice['mirror']
-                    kwargs = mirrorchoice.get('kwargs', {})
-                    limit = kwargs.get('max_connections', 1)
-                    opts.async = key, limit
-                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
-
-                    # add request to the new queue
-                    pos, queue = _async.setdefault(key, [limit, []])
-                    queue[pos:pos] = [opts] # inserting at head
-                    if len(queue) <= pos:
-                        start(opts, 1)
+                action = _run_callback(mg.failure_callback, opts)
+                if not (action and action.get('fail')):
+                    # mask this mirror and retry
+                    failed.add(key)
+                    _async_queue.append(opts)
                     continue
 
             # urlgrab failed
@@ -2196,18 +2184,49 @@ def parallel_wait(meter = 'text'):
             _run_callback(opts.failfunc, opts)
 
     try:
-        for limit, queue in _async.values():
-            for opts in queue[:limit]:
-                start(opts, 1)
-        # now 'limit' is used as 'pos', index
-        # of the first request not started yet.
-        while dl.running:
-            perform()
+        idx = 0
+        while True:
+            if idx >= len(_async_queue):
+                if not dl.running: break
+                perform(); continue
+
+            # check global limit
+            opts = _async_queue[idx]; idx += 1
+            limit = opts.max_connections
+            while len(dl.running) >= limit: perform()
+
+            if opts.mirror_group:
+                first = None
+                mg, failed = opts.mirror_group
+                for mirror in mg.mirrors:
+                    key = mirror['mirror']
+                    if key in failed: continue
+                    if not first: first = mirror
+                    limit = mirror.get('kwargs', {}).get('max_connections', 3)
+                    if host_con.get(key, 0) < limit: break
+                else:
+                    # no mirror with free slots.
+                    if not first:
+                        opts.exception = URLGrabError(256, _('No more mirrors to try.'))
+                        _run_callback(opts.failfunc, opts)
+                        continue
+                    mirror = first # fallback
+                    key = mirror['mirror']
+                    limit = mirror.get('kwargs', {}).get('max_connections', 3)
+
+                # update the request
+                opts.async = key, limit
+                opts.url = mg._join_url(key, opts.relative_url)
+
+            # check host limit, then start
+            key, limit = opts.async
+            while host_con.get(key, 0) >= limit: perform()
+            start(opts, 1)
 
     finally:
         dl.abort()
         if meter: meter.end()
-        _async.clear()
+        del _async_queue[:]
 
 
 #####################################################################
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index da601b7..d699b61 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -77,7 +77,7 @@ CUSTOMIZATION
        kwargs are omitted, then (duh) they will not be used.
 
        kwarg 'max_connections' is used to store the max connection
-       limit of this mirror, and to update the value of 'async' option.
+       limit of this mirror.
 
     3) Pass keyword arguments when instantiating the mirror group.
        See, for example, the failure_callback argument.
@@ -94,7 +94,7 @@ import random
 import thread  # needed for locking to make this threadsafe
 
 from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8
-from grabber import _run_callback, _do_raise
+from grabber import _run_callback, _do_raise, _async_queue
 
 def _(st): 
     return st
@@ -175,13 +175,6 @@ class MirrorGroup:
                'fail': 0}              # set at instantiation, reset
                                        # from callback
 
-      max_connections
-
-        This option applies to parallel downloading only.  If specified,
-        downloads are evenly distributed to first N mirrors.  N is selected as
-        the smallest number of mirrors such that their total connection limit
-        is at least max_connections.  Retries are not such restricted.
-
       failure_callback
 
         this is a callback that will be called when a mirror "fails",
@@ -270,14 +263,6 @@ class MirrorGroup:
     def _process_kwargs(self, kwargs):
         self.failure_callback = kwargs.get('failure_callback')
         self.default_action   = kwargs.get('default_action')
-        if 'max_connections' in kwargs:
-            total = count = 0
-            for mirror in self.mirrors:
-                count += 1
-                total += mirror.get('kwargs', {}).get('max_connections', 1)
-                if total >= kwargs['max_connections']:
-                    break
-            self._spread = count
        
     def _parse_mirrors(self, mirrors):
         parsed_mirrors = []
@@ -410,17 +395,6 @@ class MirrorGroup:
             grabber = mirrorchoice.get('grabber') or self.grabber
             func_ref = getattr(grabber, func)
             if DEBUG: DEBUG.info('MIRROR: trying %s -> %s', url, fullurl)
-            if kw.get('async'):
-                # 'async' option to the 1st mirror
-                key = mirrorchoice['mirror']
-                limit = kwargs.get('max_connections', 1)
-                kwargs['async'] = key, limit
-                # async code iterates mirrors and calls failfunc
-                kwargs['mirror_group'] = self, gr, mirrorchoice
-                kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
-                if hasattr(self, '_spread'):
-                    # increment the master mirror index
-                    self._next = (self._next + 1) % self._spread
             try:
                 return func_ref( *(fullurl,), **kwargs )
             except URLGrabError, e:
@@ -433,6 +407,17 @@ class MirrorGroup:
                 self._failure(gr, obj)
 
     def urlgrab(self, url, filename=None, **kwargs):
+        if kwargs.get('async'):
+            opts = self.grabber.opts.derive(**kwargs)
+            opts.mirror_group = self, set()
+            opts.relative_url = _to_utf8(url)
+
+            opts.url = 'http://tbd' # placeholder; replaced at dispatch time in parallel_wait()
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_queue.append(opts)
+            return filename
+
         kw = dict(kwargs)
         kw['filename'] = filename
         func = 'urlgrab'
-- 
1.7.4.4


