[yum-commits] Branch 'multi-downloader' - 5 commits - urlgrabber/grabber.py urlgrabber/mirror.py
zpavlas at osuosl.org
Fri Dec 9 13:16:33 UTC 2011
urlgrabber/grabber.py | 248 ++++++++++++--------------------------------------
urlgrabber/mirror.py | 2
2 files changed, 63 insertions(+), 187 deletions(-)
New commits:
commit d735e596821f292fcb30426dfe5a2b884566b903
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date: Fri Dec 9 14:14:47 2011 +0100
Use first 5 mirrors instead of just the 1st one.
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 41c99f5..853790b 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -403,6 +403,8 @@ class MirrorGroup:
# async code iterates mirrors and calls failfunc
kwargs['mirror_group'] = self, gr, mirrorchoice
kwargs['failfunc'] = gr.kw.get('failfunc', _do_raise)
+ # increment master
+ self._next = (self._next + 1) % min(len(self.mirrors), 5)
try:
return func_ref( *(fullurl,), **kwargs )
except URLGrabError, e:
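A minimal sketch of the rotation this hunk introduces (the RoundRobin
class and its names are hypothetical, not urlgrabber API): a shared
index advances modulo min(len(mirrors), 5), so successive requests
cycle through the first five mirrors instead of all starting at
mirrors[0].

class RoundRobin(object):
    def __init__(self, mirrors, width=5):
        self.mirrors = mirrors
        self._width = min(len(mirrors), width)
        self._next = 0

    def choose(self):
        # pick the current mirror, then advance the shared index
        m = self.mirrors[self._next]
        self._next = (self._next + 1) % self._width
        return m

rr = RoundRobin(['m0', 'm1', 'm2', 'm3', 'm4', 'm5', 'm6'])
print([rr.choose() for _ in range(7)])  # m0..m4, then m0, m1 again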
commit e7917b12e871e700eed09ea89852ad69bc4a6b76
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date: Fri Dec 9 14:13:44 2011 +0100
Revert "move pycurl.error handling to _do_perform_exc()"
This reverts commit 54ee979ada490cdae12fff038a838602fa404075.
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 8c45ff2..88bada6 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1340,14 +1340,8 @@ class PyCurlFileObject(object):
return
try:
- e = None
self.curl_obj.perform()
- except pycurl.error, e: pass
- self._do_perform_exc(e)
-
- def _do_perform_exc(self, e):
- # handle pycurl exception 'e'
- if e:
+ except pycurl.error, e:
# XXX - break some of these out a bit more clearly
# to other URLGrabErrors from
# http://curl.haxx.se/libcurl/c/libcurl-errors.html
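Context for this and the next two reverts: the _do_perform_exc /
_do_open_fo / _do_close_fo helpers existed so the CurlMulti-based
downloader (deleted in commit 64de088 below) could reuse them; with
only the 'pooled' mode left they have no other callers, so the logic
moves back inline. Schematically, the two shapes of _do_perform
(simplified; handle_exc stands in for the detailed libcurl-error
mapping and is hypothetical):

import pycurl

def perform_split(curl_obj, handle_exc):
    err = None
    try:
        curl_obj.perform()
    except pycurl.error as e:
        err = e
    handle_exc(err)    # pre-revert: handling lives in a helper

def perform_inline(curl_obj, handle_exc):
    try:
        curl_obj.perform()
    except pycurl.error as e:
        handle_exc(e)  # post-revert: handled right in the except block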
commit 6292510b70b16513449d602d9cea4ffc3625f5df
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date: Fri Dec 9 14:13:42 2011 +0100
Revert "move opening of target file to _do_open_fo()."
This reverts commit eb365596a44bd8ca15fe290d50290f0f84a268dd.
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 05a5c51..8c45ff2 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1601,7 +1601,22 @@ class PyCurlFileObject(object):
_was_filename = False
if type(self.filename) in types.StringTypes and self.filename:
_was_filename = True
- self._do_open_fo()
+ self._prog_reportname = str(self.filename)
+ self._prog_basename = os.path.basename(self.filename)
+
+ if self.append: mode = 'ab'
+ else: mode = 'wb'
+
+ if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
+ (self.filename, mode))
+ try:
+ self.fo = open(self.filename, mode)
+ except IOError, e:
+ err = URLGrabError(16, _(\
+ 'error opening local file from %s, IOError: %s') % (self.url, e))
+ err.url = self.url
+ raise err
+
else:
self._prog_reportname = 'MEMORY'
self._prog_basename = 'MEMORY'
@@ -1659,22 +1674,6 @@ class PyCurlFileObject(object):
self._complete = True
- def _do_open_fo(self):
- self._prog_reportname = str(self.filename)
- self._prog_basename = os.path.basename(self.filename)
- if self.append: mode = 'ab'
- else: mode = 'wb'
-
- if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
- (self.filename, mode))
- try:
- self.fo = open(self.filename, mode)
- except IOError, e:
- err = URLGrabError(16, _(\
- 'error opening local file from %s, IOError: %s') % (self.url, e))
- err.url = self.url
- raise err
-
def _fill_buffer(self, amt=None):
"""fill the buffer to contain at least 'amt' bytes by reading
from the underlying file object. If amt is None, then it will
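Reduced to a standalone form, the inlined open step is just a mode
choice plus error wrapping ('ab' resumes an existing file, 'wb'
truncates); open_target is a hypothetical name for illustration:

def open_target(filename, append=False):
    mode = 'ab' if append else 'wb'  # resume vs. truncate
    try:
        return open(filename, mode)
    except IOError as e:
        # grabber.py wraps this into URLGrabError(16, ...)
        raise RuntimeError('error opening local file %s: %s' % (filename, e))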
commit cceba103aa81e18b4dbe454092bdb1cb10aad8c8
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date: Fri Dec 9 14:13:33 2011 +0100
Revert "move closing of target file to _do_close_fo()."
This reverts commit 5443754de676ed4b173502522143460ae8a50013.
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 649d235..05a5c51 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -1621,7 +1621,29 @@ class PyCurlFileObject(object):
raise e
if _was_filename:
- self._do_close_fo()
+ # close it up
+ self.fo.flush()
+ self.fo.close()
+
+ # Set the URL where we got it from:
+ if xattr is not None:
+ # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
+ try:
+ xattr.set(self.filename, 'user.xdg.origin.url', self.url)
+ except:
+ pass # URL too long. = IOError ... ignore everything.
+
+ # set the time
+ mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
+ if mod_time != -1:
+ try:
+ os.utime(self.filename, (mod_time, mod_time))
+ except OSError, e:
+ err = URLGrabError(16, _(\
+ 'error setting timestamp on file %s from %s, OSError: %s')
+ % (self.filename, self.url, e))
+ err.url = self.url
+ raise err
# re open it
try:
self.fo = open(self.filename, 'r')
@@ -1653,31 +1675,6 @@ class PyCurlFileObject(object):
err.url = self.url
raise err
- def _do_close_fo(self):
- # close it up
- self.fo.flush()
- self.fo.close()
-
- # Set the URL where we got it from:
- if xattr is not None:
- # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
- try:
- xattr.set(self.filename, 'user.xdg.origin.url', self.url)
- except:
- pass # URL too long. = IOError ... ignore everything.
-
- # set the time
- mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
- if mod_time != -1:
- try:
- os.utime(self.filename, (mod_time, mod_time))
- except OSError, e:
- err = URLGrabError(16, _(\
- 'error setting timestamp on file %s from %s, OSError: %s')
- % (self.filename, self.url, e))
- err.url = self.url
- raise err
-
def _fill_buffer(self, amt=None):
"""fill the buffer to contain at least 'amt' bytes by reading
from the underlying file object. If amt is None, then it will
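The inlined close step does three things: flush and close the target,
tag it with its origin URL via the user.xdg.origin.url extended
attribute, and copy the server-reported modification time onto the
file. A standalone reduction (finish_download is hypothetical;
xattr.set is the same pyxattr call grabber.py uses):

import os
try:
    import xattr          # pyxattr, optional at runtime
except ImportError:
    xattr = None

def finish_download(fo, filename, url, mod_time):
    fo.flush()
    fo.close()
    if xattr is not None:
        try:
            # freedesktop.org convention: record where the file came from
            xattr.set(filename, 'user.xdg.origin.url', url)
        except IOError:
            pass          # no user xattrs on this fs, URL too long, ...
    if mod_time != -1:    # -1 means libcurl had no file time
        os.utime(filename, (mod_time, mod_time))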
commit 64de088cdeda737541d40f07f0bb5228cdbddc39
Author: Zdeněk Pavlas <zpavlas at redhat.com>
Date: Fri Dec 9 14:11:36 2011 +0100
assume ug_mode == 'pooled'
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 0400e8f..649d235 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -563,13 +563,6 @@ _log_package_state()
def _(st):
return st
-# Downloader modes:
-# 'compat': no parallel downloads
-# 'direct': use CurlMulti directly
-# 'extern': fork+exec a process that uses CurlMulti
-# 'pooled': multiple fork+exec'd processes
-ug_mode = os.environ.get('URLGRABBER_MODE', 'pooled')
-
########################################################################
# END MODULE INITIALIZATION
########################################################################
@@ -1036,7 +1029,7 @@ class URLGrabber(object):
_run_callback(opts.checkfunc, obj)
return path
- if opts.async and ug_mode != 'compat':
+ if opts.async:
opts.url = url
opts.filename = filename
opts.size = int(opts.size or 0)
@@ -1859,105 +1852,6 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
#####################################################################
-# Downloader
-#####################################################################
-
-class _AsyncCurlFile(PyCurlFileObject):
- curl_cache = {}
- def _host(self):
- return urlparse.urlsplit(self.opts.url).netloc
-
- def _do_open(self):
- # try to reuse curl objects
- curl = self.curl_cache.pop(self._host(), None)
- self.curl_obj = curl or pycurl.Curl()
- self._set_opts()
- self._do_open_fo() # open the file but don't grab
-
- def _do_close_fo(self):
- self.curl_cache[self._host()] = self.curl_obj
- PyCurlFileObject._do_close_fo(self)
-
-class _DirectDownloaderMulti:
- def __init__(self):
- ''' A downloader context.
- '''
- self.running = {}
- self.multi = pycurl.CurlMulti()
- self.tout = 0
-
- def select(self):
- ''' Block for some time. Return True if 'stdin' readable,
- False when just internal processing needed.
- '''
- fdset = self.multi.fdset()
- fdset[0].append(0) # stdin
- fdset = select.select(*(fdset + (self.tout,)))
- self.tout = min(self.tout * 1.1, 5)
- return 0 in fdset[0]
-
- def start(self, opts):
- ''' Start download of job 'opts'
- '''
- fo = _AsyncCurlFile(opts.url, opts.filename, opts)
- self.running[fo.curl_obj] = fo
- self.multi.add_handle(fo.curl_obj)
-
- # XXX: likely a CurlMulti() bug
- # fdset() is empty shortly after starting new request.
- # Do some polling to work this around.
- self.tout = 10e-3
-
- def perform(self):
- ''' Run downloads, return finished ones.
- '''
- while self.multi.perform()[0] == pycurl.E_CALL_MULTI_PERFORM:
- pass
- ret = []
- _, finished, failed = self.multi.info_read()
- for curl in finished + failed:
- curl_err = None
- if type(curl) == tuple:
- curl, code, msg = curl
- curl_err = pycurl.error(code, msg)
- self.multi.remove_handle(curl)
- fo = self.running.pop(curl)
- try: ug_err = None; fo._do_perform_exc(curl_err)
- except URLGrabError, ug_err: pass
- fo._do_close_fo()
- ret.append((fo.opts, ug_err, fo._amount_read))
- return ret
-
- def abort(self):
- ''' Abort currently active downloads.
- '''
- while self.running:
- curl, fo = self.running.popitem()
- self.multi.remove_handle(curl)
- fo._do_close_fo()
-
-class _DirectDownloaderSingle:
- ''' _DirectDownloader downloading one file at a time.
- '''
- def select(self):
- return True
-
- def start(self, opts):
- try:
- fo = PyCurlFileObject(opts.url, opts.filename, opts)
- fo._do_grab(); _amount_read = fo._amount_read
- fo.fo.close(); ug_err = None
- except URLGrabError, ug_err:
- _amount_read = 0
- self.result = opts, ug_err, _amount_read
-
- def perform(self):
- return [self.result]
-
- def abort(self):
- pass
-
-#####################################################################
# Serializer + parser: A replacement of the rather bulky Json code.
#
# - handles basic python literals, lists and tuples.
@@ -2058,33 +1952,28 @@ def download_process():
- use ProxyProgress to send _amount_read during dl.
- abort on EOF.
'''
- if ug_mode == 'extern': dl = _DirectDownloaderMulti()
- elif ug_mode == 'pooled': dl = _DirectDownloaderSingle()
- else: raise ValueError, 'Unknown ext mode %s' % ug_mode
-
cnt = 0
while True:
- if dl.select():
- lines = _readlines(0)
- if not lines: break
- for line in lines:
- # start new download
- cnt += 1
- opts = URLGrabberOptions()
- opts._id = cnt
- for k in line.split(' '):
- k, v = k.split('=', 1)
- setattr(opts, k, _loads(v))
- if opts.progress_obj:
- opts.progress_obj = _ProxyProgress()
- opts.progress_obj._id = cnt
- dl.start(opts)
-
- # perform requests
- for opts, ug_err, _amount_read in dl.perform():
- ug_err = ug_err and '%d %s' % ug_err.args or 'OK'
+ lines = _readlines(0)
+ if not lines: break
+ for line in lines:
+ cnt += 1
+ opts = URLGrabberOptions()
+ opts._id = cnt
+ for k in line.split(' '):
+ k, v = k.split('=', 1)
+ setattr(opts, k, _loads(v))
+ if opts.progress_obj:
+ opts.progress_obj = _ProxyProgress()
+ opts.progress_obj._id = cnt
+ try:
+ fo = PyCurlFileObject(opts.url, opts.filename, opts)
+ fo._do_grab(); _amount_read = fo._amount_read
+ fo.fo.close(); ug_err = 'OK'
+ except URLGrabError, e:
+ _amount_read = 0
+ ug_err = '%d %s' % e.args
os.write(1, '%d %d %s\n' % (opts._id, _amount_read, ug_err))
- dl.abort()
sys.exit(0)
import subprocess
@@ -2217,8 +2106,6 @@ def parallel_wait(meter = 'text'):
'''Process queued requests in parallel.
'''
- if ug_mode == 'compat':
- return
if meter:
count = total = 0
for limit, queue in _async.values():
@@ -2230,10 +2117,7 @@ def parallel_wait(meter = 'text'):
meter = TextMultiFileMeter()
meter.start(count, total)
- if ug_mode == 'direct': dl = _DirectDownloaderMulti()
- elif ug_mode == 'extern': dl = _ExternalDownloader()
- elif ug_mode == 'pooled': dl = _ExternalDownloaderPool()
- else: raise ValueError, 'Unknown mode %s' % ug_mode
+ dl = _ExternalDownloaderPool()
def start(opts, tries):
opts.tries = tries
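With the alternative modes gone, download_process speaks only the
pooled wire protocol visible in this hunk: each request arrives on
stdin as one line of space-separated key=value pairs, and each result
leaves on fd 1 as 'id bytes status'. A sketch (parse_request and
format_reply are hypothetical names; the stub _loads stands in for
grabber.py's own literal parser):

def _loads(v):
    return v                          # real parser decodes python literals

def parse_request(line):
    opts = {}
    for pair in line.split(' '):
        k, v = pair.split('=', 1)     # values may contain further '='
        opts[k] = _loads(v)
    return opts

def format_reply(req_id, amount_read, status='OK'):
    # on failure grabber.py sends '%d %s' % ug_err.args instead of 'OK'
    return '%d %d %s\n' % (req_id, amount_read, status)

print(format_reply(1, 4096))          # -> '1 4096 OK\n'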