git.rkrishnan.org Git - tahoe-lafs/zfec.git/blob - setuptools-0.6c16dev3.egg/setuptools/archive_util.py
zfec: rearrange files
[tahoe-lafs/zfec.git] / setuptools-0.6c16dev3.egg / setuptools / archive_util.py
1 """Utilities for extracting common archive formats"""
2
3
# Public names exported by a star-import of this module.
__all__ = [
    "unpack_archive",
    "unpack_zipfile",
    "unpack_tarfile",
    "default_filter",
    "UnrecognizedFormat",
    "extraction_drivers",
    "unpack_directory",
]
8
9 import zipfile, tarfile, os, shutil
10 from pkg_resources import ensure_directory
11 from distutils.errors import DistutilsError
12
class UnrecognizedFormat(DistutilsError):
    """Raised when an archive's type cannot be recognized.

    The extraction drivers in this module raise it to signal that they do
    not handle the file they were given.
    """
15
def default_filter(src, dst):
    """The default progress/filter callback; accepts every item unchanged.

    `src` is the item's '/'-separated path inside the archive and `dst` is
    the filesystem path it would be extracted to.  Returns `dst` itself, so
    no item is skipped and no extraction path is altered.
    """
    return dst
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def unpack_archive(filename, extract_dir, progress_filter=default_filter,
                   drivers=None):
    """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``

    `progress_filter` is a function taking two arguments: a source path
    internal to the archive ('/'-separated), and a filesystem path where it
    will be extracted.  The callback must return the desired extract path
    (which may be the same as the one passed in), or else ``None`` to skip
    that file or directory.  The callback can thus be used to report on the
    progress of the extraction, as well as to filter the items extracted or
    alter their extraction paths.

    `drivers`, if supplied, must be a non-empty sequence of functions with the
    same signature as this function (minus the `drivers` argument), that raise
    ``UnrecognizedFormat`` if they do not support extracting the designated
    archive type.  The `drivers` are tried in sequence until one is found that
    does not raise ``UnrecognizedFormat``, or until all are exhausted (in
    which case ``UnrecognizedFormat`` is raised).  Any other exception raised
    by a driver propagates to the caller.  If you do not supply a sequence of
    drivers (or supply an empty one), the module's ``extraction_drivers``
    constant will be used, which means that ``unpack_directory``,
    ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that order.
    """
    for driver in drivers or extraction_drivers:
        try:
            driver(filename, extract_dir, progress_filter)
        except UnrecognizedFormat:
            # Not this driver's format; give the next one a chance.
            continue
        # The first driver that accepts the archive ends the search.
        return

    # Every driver rejected the file.
    raise UnrecognizedFormat(
        "Not a recognized archive type: %s" % filename
    )
76
77
78
79
80
81
82
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """"Unpack" a directory, using the same interface as for archives

    Every file found by walking `filename` is offered to `progress_filter`
    as (archive-style '/'-separated relative path, proposed target path) and
    copied, together with its stat information, to whatever path the filter
    returns.  A false return value from the filter skips that file.  Only
    files are copied; directories are created as needed for those files.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % (filename,))

    # Maps each walked directory to a pair: its '/'-terminated archive-style
    # prefix ('' for the root) and the matching destination directory.
    paths = {filename: ('', extract_dir)}
    for base, dirs, files in os.walk(filename):
        src, dst = paths[base]
        # Register the subdirectories before os.walk descends into them.
        for d in dirs:
            paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
        for f in files:
            name = src + f
            target = progress_filter(name, os.path.join(dst, f))
            if not target:
                continue    # the filter rejected this file
            ensure_directory(target)
            source = os.path.join(base, f)
            shutil.copyfile(source, target)
            shutil.copystat(source, target)
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack zip `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
    by ``zipfile.is_zipfile()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Members whose name is absolute or contains a ``..`` path component are
    silently skipped, so nothing is written outside `extract_dir` through
    the member name itself.
    """

    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))

    z = zipfile.ZipFile(filename)
    try:
        for info in z.infolist():
            name = info.filename

            # Don't extract absolute paths or ones with a '..' component.
            # Compare whole components (a plain substring test would also
            # drop harmless names such as 'foo..bar'); backslashes count as
            # separators so '..\\' style traversal is still rejected.
            components = name.replace('\\', '/').split('/')
            if name.startswith('/') or '..' in components:
                continue

            target = os.path.join(extract_dir, *name.split('/'))
            target = progress_filter(name, target)
            if not target:
                continue

            # Needed for directory entries and files alike.
            # NOTE(review): ensure_directory comes from pkg_resources and is
            # presumed to create the parent directory of `target` -- confirm.
            ensure_directory(target)
            if name.endswith('/'):
                # directory entry: nothing to write
                continue

            # file
            data = z.read(info.filename)
            f = open(target, 'wb')
            try:
                f.write(data)
            finally:
                f.close()
                del data
    finally:
        z.close()
163
164
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
    by ``tarfile.open()``).  See ``unpack_archive()`` for an explanation
    of the `progress_filter` argument.

    Only regular files and directories are extracted.  Members whose name is
    absolute or contains a ``..`` path component are silently skipped.
    Returns ``True`` once the archive has been processed.
    """

    try:
        tarobj = tarfile.open(filename)
    except tarfile.TarError:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        )

    try:
        tarobj.chown = lambda *args: None   # don't do any chowning!
        for member in tarobj:
            # Anything other than a regular file or directory is ignored.
            if not (member.isfile() or member.isdir()):
                continue

            name = member.name
            # Don't extract absolute paths or ones with a '..' component.
            # Compare whole components (a plain substring test would also
            # drop harmless names such as 'foo..bar'); backslashes count as
            # separators so '..\\' style traversal is still rejected.
            components = name.replace('\\', '/').split('/')
            if name.startswith('/') or '..' in components:
                continue

            dst = os.path.join(extract_dir, *name.split('/'))
            dst = progress_filter(name, dst)
            if not dst:
                continue
            # Drop a single trailing separator before extraction.
            if dst.endswith(os.sep):
                dst = dst[:-1]
            try:
                # XXX Ugh: private tarfile API, used so that the member can be
                # written to the (possibly filter-altered) path `dst`.
                tarobj._extract_member(member, dst)
            except tarfile.ExtractError:
                pass    # chown/chmod/mkfifo/mknode/makedev failed
        return True
    finally:
        tarobj.close()
199
200
201
202
# Default drivers for unpack_archive(), tried in this order.
extraction_drivers = (unpack_directory, unpack_zipfile, unpack_tarfile)
204
205