"""Utilities for extracting common archive formats"""
# Names exported by a star-import of this module.
__all__ = [
    "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
    "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
]
import os
import shutil
import tarfile
import zipfile
from distutils.errors import DistutilsError

from pkg_resources import ensure_directory
class UnrecognizedFormat(DistutilsError):
    """Couldn't recognize the archive type

    Raised by each extraction driver in this module when the given path is
    not something it handles, and by ``unpack_archive()`` once every driver
    it tried has declined.
    """
def default_filter(src, dst):
    """The default progress/filter callback; accepts every item unchanged

    `src` is the path of the item inside the archive and `dst` is the
    filesystem path it is about to be extracted to.  The extraction
    functions use the callback's return value as the path to write to and
    skip the item when it is false, so returning `dst` as-is means nothing
    is skipped and nothing is relocated.
    """
    return dst
def unpack_archive(filename, extract_dir, progress_filter=default_filter,
                   drivers=None):
    """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``

    `progress_filter` is a function taking two arguments: a source path
    internal to the archive ('/'-separated), and a filesystem path where it
    will be extracted.  The callback must return the desired extract path
    (which may be the same as the one passed in), or else ``None`` to skip
    that file or directory.  The callback can thus be used to report on the
    progress of the extraction, as well as to filter the items extracted or
    alter their extraction paths.

    `drivers`, if supplied, must be a non-empty sequence of functions with the
    same signature as this function (minus the `drivers` argument), that raise
    ``UnrecognizedFormat`` if they do not support extracting the designated
    archive type.  The `drivers` are tried in sequence until one is found that
    does not raise an error, or until all are exhausted (in which case
    ``UnrecognizedFormat`` is raised).  If you do not supply a sequence of
    drivers, the module's ``extraction_drivers`` constant will be used, which
    means that ``unpack_directory``, ``unpack_zipfile`` and
    ``unpack_tarfile`` will be tried, in that order.
    """
    for driver in drivers or extraction_drivers:
        try:
            driver(filename, extract_dir, progress_filter)
        except UnrecognizedFormat:
            # This driver does not handle the input; fall through to the next.
            continue
        else:
            # First driver that accepts the input wins.
            return

    # Every driver declined.
    raise UnrecognizedFormat(
        "Not a recognized archive type: %s" % (filename,)
    )
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """Copy a directory tree, using the same interface as for archives

    Every file found under `filename` is offered to `progress_filter` as
    ``(archive_style_path, target_path)``, where the first argument is the
    '/'-separated path relative to `filename`.  Files for which the callback
    returns a false value are skipped; the others are copied, together with
    their stat information, to the path the callback returned.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % (filename,))

    # Maps each walked directory to (archive-style prefix, destination dir).
    # os.walk() visits parents first, so a directory's entry is always
    # registered before the directory itself is reached.
    paths = {filename: ('', extract_dir)}
    for base, dirs, files in os.walk(filename):
        src, dst = paths[base]
        for d in dirs:
            paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
        for f in files:
            target = progress_filter(src + f, os.path.join(dst, f))
            if not target:
                continue    # the filter asked for this file to be skipped
            ensure_directory(target)
            source = os.path.join(base, f)
            shutil.copyfile(source, target)
            shutil.copystat(source, target)
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack zip `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
    by ``zipfile.is_zipfile()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Entries whose name is absolute or contains a ``..`` path component are
    silently skipped so that nothing is written outside `extract_dir`.
    """
    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))

    with zipfile.ZipFile(filename) as archive:
        for info in archive.infolist():
            name = info.filename
            parts = name.split('/')

            # don't extract absolute paths or ones with a '..' component;
            # checking components (not substrings) keeps names like 'a..b'
            if name.startswith('/') or '..' in parts:
                continue

            target = progress_filter(name, os.path.join(extract_dir, *parts))
            if not target:
                continue    # the filter asked for this entry to be skipped

            # Called for directory and file entries alike.
            ensure_directory(target)
            if name.endswith('/'):
                continue    # directory entry: nothing to write

            with open(target, 'wb') as out:
                out.write(archive.read(info.filename))
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Only regular files and directories are extracted.  Members whose name is
    absolute or contains a ``..`` path component are silently skipped so
    that nothing is written outside `extract_dir`.
    """
    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )

    try:
        tarobj.chown = lambda *args: None   # don't do any chowning!
        for member in tarobj:
            if not (member.isfile() or member.isdir()):
                continue    # other member types are not extracted

            name = member.name
            parts = name.split('/')

            # don't extract absolute paths or ones with a '..' component;
            # checking components (not substrings) keeps names like 'a..b'
            if name.startswith('/') or '..' in parts:
                continue

            dst = progress_filter(name, os.path.join(extract_dir, *parts))
            if not dst:
                continue    # the filter asked for this member to be skipped

            if dst.endswith(os.sep):
                dst = dst[:-1]      # drop the trailing separator
            try:
                tarobj._extract_member(member, dst)  # XXX Ugh
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed

        # NOTE(review): truthy result restored on inference -- confirm
        # whether any caller relies on it.
        return True
    finally:
        tarobj.close()
# Drivers that unpack_archive() tries, in this order, when the caller does
# not pass its own `drivers` sequence.
extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile