[Yum-devel] [PATCH 8/9] Implement parallel urlgrab()s

Zdeněk Pavlas zpavlas at redhat.com
Mon Sep 26 16:06:24 UTC 2011


        default_grabber.opts.parallel:
            limit on the number of concurrent connections.
            0 = always use blocking urlgrab()

        parallel_begin():
            start queueing urlgrab() requests

        parallel_end():
            process the queue in parallel
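
        Example usage, as a minimal sketch ('urls' is a hypothetical
        list of download URLs; error handling is elided):

            from urlgrabber.grabber import urlgrab, parallel_begin, parallel_end

            parallel_begin()    # urlgrab() calls below only queue requests
            for url in urls:
                urlgrab(url)    # returns the target filename immediately
            parallel_end()      # run the queue, opts.parallel grabs at a time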
---
 urlgrabber/grabber.py |  171 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 171 insertions(+), 0 deletions(-)

diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 303428c..f0f4c6b 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -872,6 +872,7 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.parallel = 5 # max connections for parallel grabs
         
     def __repr__(self):
         return self.format()
@@ -1008,6 +1009,13 @@ class URLGrabber(object):
                     apply(cb_func, (obj, )+cb_args, cb_kwargs)        
                 return path
         
+        if _async_on:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_list.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
             fo = PyCurlFileObject(url, filename, opts)
             try:
@@ -1827,6 +1835,169 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
         
 #####################################################################
+#  Helpers
+#####################################################################
+
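+# PyCurlFileObject variant for the multi interface: prepare the curl
+# handle and open the output file, but let CurlMulti drive the transfer.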
+class _AsyncCurlFile(PyCurlFileObject):
+    def _do_open(self):
+        self.curl_obj = pycurl.Curl() # don't reuse _curl_cache
+        self._set_opts()
+        self._do_open_fo() # open the file but don't grab
+
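+# A callback may be a bare callable or a (function, args, kwargs) tuple.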
+def _callback(cb, obj):
+    if callable(cb):
+        return cb(obj)
+    cb, arg, karg = cb
+    return cb(obj, *arg, **karg)
+
+#####################################################################
+#  High level async API
+#####################################################################
+
+_async_on = False
+_async_list = []
+
+def parallel_begin():
+    '''Start queuing urlgrab() requests.
+    '''
+
+    if default_grabber.opts.parallel == 0:
+        return
+
+    global _async_on
+    assert not _async_on
+    _async_on = True
+
+def parallel_end(meter='text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if default_grabber.opts.parallel == 0:
+        return
+
+    global _async_on
+    assert _async_on
+    _async_on = False
+
+    global _async_list
+    if not _async_list: return
+    todo = _async_list; _async_list = []
+    limit = default_grabber.opts.parallel
+
+    if meter:
+        total = 0
+        for opts in todo:
+            total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(len(todo), total)
+
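+    # in-flight transfers: maps each pycurl handle to its _AsyncCurlFile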
+    running = {}
+    multi = pycurl.CurlMulti()
+
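+    # Start one transfer: attach a per-file progress meter, open the
+    # target file, and register the curl handle with the multi object.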
+    def start(opts, tries):
+        opts.tries = tries
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        opts.progress_obj = meter and meter.newMeter()
+        fo = _AsyncCurlFile(opts.url, opts.filename, opts)
+        running[fo.curl_obj] = fo
+        multi.add_handle(fo.curl_obj)
+
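+    # Drive the multi handle, then reap every transfer that has finished
+    # or failed since the last call; retries and mirror failover restart
+    # transfers from here.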
+    def perform():
+        while multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
+            pass
+        q, finished, failed = multi.info_read()
+        for curl in finished + failed:
+
+            # curl layer
+            curl_err = None
+            if isinstance(curl, tuple):
+                curl, code, msg = curl
+                curl_err = pycurl.error(code, msg)
+            multi.remove_handle(curl)
+
+            # grabber layer
+            fo = running.pop(curl); opts = fo.opts
+            try: ug_err = None; fo._do_perform_exc(curl_err)
+            except URLGrabError, ug_err: pass
+            fo._do_close_fo()
+
+            # do progress before callbacks to show retries
+            if meter:
+                m = opts.progress_obj
+                if ug_err:
+                    # progress_obj might not have start()ed yet
+                    m.basename = fo._prog_basename
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += fo._amount_read - opts.size
+                    m.end(fo._amount_read)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if DEBUG: DEBUG.info('success')
+                if opts.checkfunc:
+                    try: _callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err:
+                        if meter:
+                            meter.numfiles += 1
+                            meter.re.total += opts.size
+                if ug_err is None:
+                    # download & checkfunc: OK
+                    continue
+
+            if DEBUG: DEBUG.info('failure: %s', ug_err)
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err: retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                # simple retry
+                start(opts, opts.tries + 1)
+                continue
+
+            if hasattr(opts, 'mirror_group'):
+                mg, gr, mirrorchoice = opts.mirror_group
+                opts.exception = ug_err
+                opts.mirror = mirrorchoice['mirror']
+                opts.relative_url = gr.url
+                try:
+                    mg._failure(gr, opts)
+                    mirrorchoice = mg._get_mirror(gr)
+                    opts.url = mg._join_url(mirrorchoice['mirror'], gr.url)
+                    opts.mirror_group = mg, gr, mirrorchoice
+                    # retry next mirror
+                    start(opts, 1)
+                    continue
+                except URLGrabError, ug_err: pass
+
+            # urlgrab failed
+            if not hasattr(opts, 'failfunc'): raise ug_err
+            opts.exception = ug_err
+            _callback(opts.failfunc, opts)
+
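+    # Feed the queue, keeping at most 'limit' transfers in flight.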
+    try:
+        for opts in todo:
+            start(opts, 1)
+            while len(running) >= limit:
+                perform()
+        while running:
+            perform()
+
+    finally:
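+        # 'running' is empty unless an exception escaped above; abort any
+        # remaining transfers and remove their partial output files.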
+        while running:
+            curl, fo = running.popitem()
+            multi.remove_handle(curl)
+            fo._do_close_fo()
+            os.unlink(fo.opts.filename)
+
+        if meter:
+            meter.end()
+
+#####################################################################
 #  TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
-- 
1.7.4.4
