[Rpm-metadata] [PATCH] Add delta metadata generator to createrepo

Zdenek Pavlas zpavlas at redhat.com
Tue Jul 23 14:15:23 UTC 2013


This adds the following options:

--deltamd <a persistent per-repository directory>
--deltamd-types <list of mdtypes>
--deltamd-sizes <list of delta thresholds>

We create up to N deltas for each mdtype where N is the number
of values specified with --deltamd-sizes.  The same delta file
may be used to update from more than one version.

We start with the most recent version and keep adding older ones
until the estimated delta size hits the first threshold. A single
combined delta is created.  We continue with the next threshold,
again packing as much versions as possible.

The delta files contain literals (prefixed with +<size>/n)
and old data references.  Since we don't reorder chunks the
references are interpreted as "skip until the matching
one is found".  This allows us to use prefixes and since
all source versions are known we can use the shortest one,
often just an empty string plus terminating newline.

Note:

The repomd.xml formatter is in yum.repoMDObject, and it was
necessary to update this.  Hence, yum with this commit
is needed to create delta files:

http://yum.baseurl.org/gitweb?p=yum.git;a=commitdiff;h=4b787ee

However, repomd.xml with deltas does NOT break old yum,
neither any client that properly ignores unknown XML element.
---
 createrepo/__init__.py | 137 ++++++++++++++++++++++++++++++++++++++++++++++++-
 createrepo/utils.py    |  28 ++++++++++
 docs/createrepo.8      |  11 +++-
 genpkgmetadata.py      |   3 ++
 4 files changed, 177 insertions(+), 2 deletions(-)

diff --git a/createrepo/__init__.py b/createrepo/__init__.py
index 698b7f3..6305c98 100644
--- a/createrepo/__init__.py
+++ b/createrepo/__init__.py
@@ -51,6 +51,8 @@ from utils import _gzipOpen, compressFile, compressOpen, checkAndMakeDir, GzipFi
                   checksum_and_rename, split_list_into_equal_chunks
 from utils import num_cpus_online
 import deltarpms
+import glob
+from createrepo.utils import delta_split, delta_hash
 
 __version__ = '0.9.9'
 
@@ -174,6 +176,7 @@ class MetaDataGenerator:
         # this does the dir setup we need done
         self._parse_directory()
         self._test_setup_dirs()
+        self.deltamd = {}
 
     def _parse_directory(self):
         """pick up the first directory given to us and make sure we know
@@ -747,6 +750,111 @@ class MetaDataGenerator:
             if self.conf.profile:
                 self.callback.log('deltam time: %0.3f' % (time.time() - deltam_st))
 
+        # create delta metadata if enabled
+        if getattr(self.conf, 'deltamd', None) and not self.conf.database_only:
+            for mdtype in self.conf.deltamd_types.split(','):
+                if hasattr(self.conf, mdtype + 'file'):
+                    st = time.time()
+                    self.make_deltamd(mdtype)
+                    if self.conf.profile:
+                        self.callback.log('deltamd %s time: %0.3f' % (mdtype, time.time() - st))
+
+    def make_deltamd(self, mdtype):
+        # first mdtype pass: hash only
+        name = getattr(self.conf, mdtype + 'file')
+        path = os.path.join(self.conf.outputdir, self.conf.tempdir, name + '.gz')
+        latest = []; total = 0
+        for chunk in delta_split(compressOpen(path).read):
+            # XXX keep chunks in memory instead of 2nd pass?
+            latest.append((delta_hash(chunk), len(chunk)))
+            total += len(chunk)
+
+        # find previous metadata versions
+        hist = glob.glob('%s/%s-*' % (self.conf.deltamd, mdtype))
+        hist = [(int(f.rsplit('-', 1)[1]), f) for f in hist]
+        hist.sort()
+
+        # create deltas up to the specified relative size
+        deltas = []
+        timestamps = []
+        for limit in sorted(map(float, self.conf.deltamd_sizes.split(','))):
+            old = [] # single delta to update from any of these
+            common = latest # common subsequence of the above
+
+            while hist:
+                # load the newest version
+                ver_hash = {}; ver = []
+                for h in open(hist[-1][1]).read().split():
+                    assert h not in ver_hash # should be unique
+                    ver_hash[h] = len(ver)
+                    ver.append(h)
+
+                # intersect with common to new_common
+                new_common = []
+                i = -1; size = 0
+                for h, s in common:
+                    j = ver_hash.get(h, -1)
+                    if j > i:
+                        # matched j-th chunk
+                        new_common.append((h, s))
+                        i = j; size += s
+                size = (total - size)*100.0/total
+                if size > limit:
+                    break
+
+                # we're below the limit- merge it
+                old.append(ver)
+                common = new_common
+                deltasize = size
+                timestamp = hist.pop()[0]
+            if not old:
+                continue
+
+            # minimal prefix needed to find the right chunk
+            pos = [0] * len(old)
+            def min_prefix(h):
+                plen = 0
+                n = 0
+                while n < len(old):
+                    i = pos[n]
+                    while True:
+                        old_h = old[n][i]; i += 1
+                        if old_h == h: break
+                        while old_h[:plen] == h[:plen]:
+                            plen += 1
+                    pos[n] = i
+                    n += 1
+                return plen
+
+            # calculate prefixes now so we won't need "old" later
+            common = [h for h, s in common] # drop sizes
+            dpath = '%s.delta%d.%s' % (path[:-3], len(deltas), self.conf.compress_type)
+            deltas.append((compressOpen(dpath, 'wb'), common, map(min_prefix, common)))
+            self.callback.log('deltamd: %s %.1f%% delta: %d versions merged' % (mdtype, deltasize, len(old)))
+            timestamps.append(timestamp)
+
+        # second metadata pass: write all deltas
+        signature = []
+        pos = [0] * len(deltas)
+        for chunk in delta_split(compressOpen(path).read):
+            h = delta_hash(chunk)
+            signature.append(h)
+            n = 0
+            while n < len(deltas):
+                i = pos[n]
+                dfile, common, plen = deltas[n]
+                if i < len(common) and common[i] == h:
+                    dfile.write('%s\n' % h[:plen[i]])
+                    pos[n] = i + 1
+                else:
+                    # XXX should we merge adjacent literals?
+                    dfile.write('+%d\n' % len(chunk))
+                    dfile.write(chunk)
+                n += 1
+
+        # will be used later..
+        self.deltamd[mdtype] = timestamps, signature
+
     def _do_delta_rpm_package(self, pkg):
         """makes the drpms, if possible, for this package object.
            returns the presto/delta xml metadata as a string
@@ -1067,6 +1175,33 @@ class MetaDataGenerator:
             data.location = (self.conf.baseurl, href)
             repomd.repoData[data.type] = data
 
+            # add deltas to RepoData object
+            rpm_file = getattr(self.conf, ftype + 'file')
+            timestamps, signature = self.deltamd.get(ftype) or ([], None)
+            for n, timestamp in enumerate(timestamps):
+
+                delta_file = '%s.delta%d.%s' % (rpm_file, n, self.conf.compress_type)
+                delta_path = os.path.join(repopath, delta_file)
+                csum = misc.checksum(sumtype, delta_path)
+                if self.conf.unique_md_filenames:
+                    orig = delta_path
+                    delta_file = '%s-%s' % (csum, delta_file)
+                    delta_path = os.path.join(repopath, delta_file)
+                    os.rename(orig, delta_path)
+
+                ddata = RepoData()
+                ddata.checksum = (sumtype, csum)
+                ddata.timestamp = str(timestamp)
+                ddata.size = str(os.stat(delta_path).st_size)
+                href = os.path.join(self.conf.finaldir, delta_file)
+                ddata.location = (self.conf.baseurl, href)
+                data.deltas.append(ddata)
+
+            if signature:
+                # add metadata signature to history directory
+                hfile = os.path.join(self.conf.deltamd, '%s-%s' % (ftype, data.timestamp))
+                open(hfile, 'wb').write('\n'.join(signature))
+
         if not self.conf.quiet and self.conf.database:
             self.callback.log('Sqlite DBs complete')
 
@@ -1159,7 +1294,7 @@ class MetaDataGenerator:
             for (end,lst) in (('-primary.sqlite', old_pr_db), ('-primary.xml', old_pr),
                            ('-filelists.sqlite', old_fl_db), ('-filelists.xml', old_fl),
                            ('-other.sqlite', old_ot_db), ('-other.xml', old_ot)):
-                fn = '.'.join(f.split('.')[:-1])
+                fn = f.rsplit('.', 1)[0].rsplit('.delta', 1)[0]
                 if fn.endswith(end):
                     lst.append(oldfile)
                     break
diff --git a/createrepo/utils.py b/createrepo/utils.py
index b0d92ec..67f431c 100644
--- a/createrepo/utils.py
+++ b/createrepo/utils.py
@@ -136,6 +136,34 @@ def compressOpen(fn, mode='rb', compress_type=None):
     else:
         raise MDError, "Unknown compression type %s" % compress_type
     
+def delta_split(read, pattern='<package '):
+    ''' Read the stream, splitting data at each pattern instance.
+        Split at the last '</', too, so XML with N elements below
+        the root yields exactly N+2 items.
+    '''
+    buf = ''
+    while True:
+        more = read(0x4000)
+        if not more: break
+        buf += more
+        i = 0
+        while True:
+            j = buf.find(pattern, i + len(pattern))
+            if j == -1: break
+            yield buf[i:j]
+            i = j
+        buf = buf[i:]
+    i = buf.rfind('</')
+    if i != -1:
+        yield buf[:i]
+        buf = buf[i:]
+    yield buf
+
+from hashlib import sha1 as hash_func
+
+def delta_hash(chunk):
+    return '%x%s' % (len(chunk) & 0xf, hash_func(chunk).hexdigest())
+
 def returnFD(filename):
     try:
         fdno = os.open(filename, os.O_RDONLY)
diff --git a/docs/createrepo.8 b/docs/createrepo.8
index eefd4bf..cf91e35 100644
--- a/docs/createrepo.8
+++ b/docs/createrepo.8
@@ -111,7 +111,16 @@ Specify keyword/tags about the repository itself. Can be specified more than onc
 .IP "\fB\--revision\fP"
 Arbitrary string for a repository revision.
 .IP "\fB\--deltas\fP"
-Tells createrepo to generate deltarpms and the delta metadata
+Tells createrepo to generate deltarpms and the deltarpm metadata
+.IP "\fB\--deltamd\fP"
+Set this to a persistent directory to enable creation of delta metadata.
+The directory is used to store signatures of metadata that clients cache locally.
+.IP "\fB\--deltamd-types\fP"
+A comma-separated list of mdtypes to generate delta metadata for.
+Default is `primary,filelists`.
+.IP "\fB\--deltamd-sizes\fP"
+A comma-separated list of delta metadata thresholds.  The default is `1,5`.
+This adds up to two deltas, approximately 1% and 5% the size of full metadata.
 .IP "\fB\--oldpackagedirs\fP PATH"
 paths to look for older pkgs to delta against. Can be specified multiple times
 .IP "\fB\--num-deltas\fP int"
diff --git a/genpkgmetadata.py b/genpkgmetadata.py
index 4528bf2..10f2197 100755
--- a/genpkgmetadata.py
+++ b/genpkgmetadata.py
@@ -135,6 +135,9 @@ def parse_args(args, conf):
         help=SUPPRESS_HELP)
     parser.add_option("--compress-type", default='compat', dest="compress_type",
         help="which compression type to use")
+    parser.add_option("--deltamd", help="create delta metadata, using this persistent directory")
+    parser.add_option("--deltamd-types", default="primary,filelists", help="list of deltamd types")
+    parser.add_option("--deltamd-sizes", default="1,5", help="list of deltamd size tresholds")
         
     
     (opts, argsleft) = parser.parse_args(args)
-- 
1.7.11.7



More information about the Rpm-metadata mailing list