]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/immutable.py
immutable: prevent clients from reading past the end of share data, which would allow...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / immutable.py
1 import os, stat, struct, time
2
3 from foolscap.api import Referenceable
4
5 from zope.interface import implements
6 from allmydata.interfaces import RIBucketWriter, RIBucketReader
7 from allmydata.util import base32, fileutil, log
8 from allmydata.util.assertutil import precondition
9 from allmydata.util.hashutil import constant_time_compare
10 from allmydata.storage.lease import LeaseInfo
11 from allmydata.storage.common import UnknownImmutableContainerVersionError, \
12      DataTooLargeError
13
14 # each share file (in storage/shares/$SI/$SHNUM) contains lease information
15 # and share data. The share data is accessed by RIBucketWriter.write and
16 # RIBucketReader.read . The lease information is not accessible through these
17 # interfaces.
18
19 # The share file has the following layout:
20 #  0x00: share file version number, four bytes, current version is 1
21 #  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
22 #  0x08: number of leases, four bytes big-endian
23 #  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
24 #  A+0x0c = B: first lease. Lease format is:
25 #   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
26 #   B+0x04: renew secret, 32 bytes (SHA256)
27 #   B+0x24: cancel secret, 32 bytes (SHA256)
28 #   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
29 #   B+0x48: next lease, or end of record
30
31 # Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers,
32 # but it is still filled in by storage servers in case the storage server
33 # software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the
34 # share file is moved from one storage server to another. The value stored in
35 # this field is truncated, so if the actual share data length is >= 2**32,
36 # then the value stored in this field will be the actual share data length
37 # modulo 2**32.
38
39 class ShareFile:
40     LEASE_SIZE = struct.calcsize(">L32s32sL")
41     sharetype = "immutable"
42
43     def __init__(self, filename, max_size=None, create=False):
44         """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
45         precondition((max_size is not None) or (not create), max_size, create)
46         self.home = filename
47         self._max_size = max_size
48         if create:
49             # touch the file, so later callers will see that we're working on
50             # it. Also construct the metadata.
51             assert not os.path.exists(self.home)
52             fileutil.make_dirs(os.path.dirname(self.home))
53             f = open(self.home, 'wb')
54             # The second field -- the four-byte share data length -- is no
55             # longer used as of Tahoe v1.3.0, but we continue to write it in
56             # there in case someone downgrades a storage server from >=
57             # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
58             # server to another, etc. We do saturation -- a share data length
59             # larger than 2**32-1 (what can fit into the field) is marked as
60             # the largest length that can fit into the field. That way, even
61             # if this does happen, the old < v1.3.0 server will still allow
62             # clients to read the first part of the share.
63             f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
64             f.close()
65             self._lease_offset = max_size + 0x0c
66             self._num_leases = 0
67         else:
68             f = open(self.home, 'rb')
69             filesize = os.path.getsize(self.home)
70             (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
71             f.close()
72             if version != 1:
73                 msg = "sharefile %s had version %d but we wanted 1" % \
74                       (filename, version)
75                 raise UnknownImmutableContainerVersionError(msg)
76             self._num_leases = num_leases
77             self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
78         self._data_offset = 0xc
79
80     def unlink(self):
81         os.unlink(self.home)
82
83     def read_share_data(self, offset, length):
84         precondition(offset >= 0)
85         # reads beyond the end of the data are truncated. Reads that start
86         # beyond the end of the data return an empty string.
87         seekpos = self._data_offset+offset
88         actuallength = max(0, min(length, self._lease_offset-seekpos))
89         if actuallength == 0:
90             return ""
91         f = open(self.home, 'rb')
92         f.seek(seekpos)
93         return f.read(actuallength)
94
95     def write_share_data(self, offset, data):
96         length = len(data)
97         precondition(offset >= 0, offset)
98         if self._max_size is not None and offset+length > self._max_size:
99             raise DataTooLargeError(self._max_size, offset, length)
100         f = open(self.home, 'rb+')
101         real_offset = self._data_offset+offset
102         f.seek(real_offset)
103         assert f.tell() == real_offset
104         f.write(data)
105         f.close()
106
107     def _write_lease_record(self, f, lease_number, lease_info):
108         offset = self._lease_offset + lease_number * self.LEASE_SIZE
109         f.seek(offset)
110         assert f.tell() == offset
111         f.write(lease_info.to_immutable_data())
112
113     def _read_num_leases(self, f):
114         f.seek(0x08)
115         (num_leases,) = struct.unpack(">L", f.read(4))
116         return num_leases
117
118     def _write_num_leases(self, f, num_leases):
119         f.seek(0x08)
120         f.write(struct.pack(">L", num_leases))
121
122     def _truncate_leases(self, f, num_leases):
123         f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
124
125     def get_leases(self):
126         """Yields a LeaseInfo instance for all leases."""
127         f = open(self.home, 'rb')
128         (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
129         f.seek(self._lease_offset)
130         for i in range(num_leases):
131             data = f.read(self.LEASE_SIZE)
132             if data:
133                 yield LeaseInfo().from_immutable_data(data)
134
135     def add_lease(self, lease_info):
136         f = open(self.home, 'rb+')
137         num_leases = self._read_num_leases(f)
138         self._write_lease_record(f, num_leases, lease_info)
139         self._write_num_leases(f, num_leases+1)
140         f.close()
141
142     def renew_lease(self, renew_secret, new_expire_time):
143         for i,lease in enumerate(self.get_leases()):
144             if constant_time_compare(lease.renew_secret, renew_secret):
145                 # yup. See if we need to update the owner time.
146                 if new_expire_time > lease.expiration_time:
147                     # yes
148                     lease.expiration_time = new_expire_time
149                     f = open(self.home, 'rb+')
150                     self._write_lease_record(f, i, lease)
151                     f.close()
152                 return
153         raise IndexError("unable to renew non-existent lease")
154
155     def add_or_renew_lease(self, lease_info):
156         try:
157             self.renew_lease(lease_info.renew_secret,
158                              lease_info.expiration_time)
159         except IndexError:
160             self.add_lease(lease_info)
161
162
163     def cancel_lease(self, cancel_secret):
164         """Remove a lease with the given cancel_secret. If the last lease is
165         cancelled, the file will be removed. Return the number of bytes that
166         were freed (by truncating the list of leases, and possibly by
167         deleting the file. Raise IndexError if there was no lease with the
168         given cancel_secret.
169         """
170
171         leases = list(self.get_leases())
172         num_leases_removed = 0
173         for i,lease in enumerate(leases):
174             if constant_time_compare(lease.cancel_secret, cancel_secret):
175                 leases[i] = None
176                 num_leases_removed += 1
177         if not num_leases_removed:
178             raise IndexError("unable to find matching lease to cancel")
179         if num_leases_removed:
180             # pack and write out the remaining leases. We write these out in
181             # the same order as they were added, so that if we crash while
182             # doing this, we won't lose any non-cancelled leases.
183             leases = [l for l in leases if l] # remove the cancelled leases
184             f = open(self.home, 'rb+')
185             for i,lease in enumerate(leases):
186                 self._write_lease_record(f, i, lease)
187             self._write_num_leases(f, len(leases))
188             self._truncate_leases(f, len(leases))
189             f.close()
190         space_freed = self.LEASE_SIZE * num_leases_removed
191         if not len(leases):
192             space_freed += os.stat(self.home)[stat.ST_SIZE]
193             self.unlink()
194         return space_freed
195
196
197 class BucketWriter(Referenceable):
198     implements(RIBucketWriter)
199
200     def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
201         self.ss = ss
202         self.incominghome = incominghome
203         self.finalhome = finalhome
204         self._max_size = max_size # don't allow the client to write more than this
205         self._canary = canary
206         self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
207         self.closed = False
208         self.throw_out_all_data = False
209         self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
210         # also, add our lease to the file now, so that other ones can be
211         # added by simultaneous uploaders
212         self._sharefile.add_lease(lease_info)
213
214     def allocated_size(self):
215         return self._max_size
216
217     def remote_write(self, offset, data):
218         start = time.time()
219         precondition(not self.closed)
220         if self.throw_out_all_data:
221             return
222         self._sharefile.write_share_data(offset, data)
223         self.ss.add_latency("write", time.time() - start)
224         self.ss.count("write")
225
226     def remote_close(self):
227         precondition(not self.closed)
228         start = time.time()
229
230         fileutil.make_dirs(os.path.dirname(self.finalhome))
231         fileutil.rename(self.incominghome, self.finalhome)
232         try:
233             # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
234             # We try to delete the parent (.../ab/abcde) to avoid leaving
235             # these directories lying around forever, but the delete might
236             # fail if we're working on another share for the same storage
237             # index (like ab/abcde/5). The alternative approach would be to
238             # use a hierarchy of objects (PrefixHolder, BucketHolder,
239             # ShareWriter), each of which is responsible for a single
240             # directory on disk, and have them use reference counting of
241             # their children to know when they should do the rmdir. This
242             # approach is simpler, but relies on os.rmdir refusing to delete
243             # a non-empty directory. Do *not* use fileutil.rm_dir() here!
244             os.rmdir(os.path.dirname(self.incominghome))
245             # we also delete the grandparent (prefix) directory, .../ab ,
246             # again to avoid leaving directories lying around. This might
247             # fail if there is another bucket open that shares a prefix (like
248             # ab/abfff).
249             os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
250             # we leave the great-grandparent (incoming/) directory in place.
251         except EnvironmentError:
252             # ignore the "can't rmdir because the directory is not empty"
253             # exceptions, those are normal consequences of the
254             # above-mentioned conditions.
255             pass
256         self._sharefile = None
257         self.closed = True
258         self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
259
260         filelen = os.stat(self.finalhome)[stat.ST_SIZE]
261         self.ss.bucket_writer_closed(self, filelen)
262         self.ss.add_latency("close", time.time() - start)
263         self.ss.count("close")
264
265     def _disconnected(self):
266         if not self.closed:
267             self._abort()
268
269     def remote_abort(self):
270         log.msg("storage: aborting sharefile %s" % self.incominghome,
271                 facility="tahoe.storage", level=log.UNUSUAL)
272         if not self.closed:
273             self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
274         self._abort()
275         self.ss.count("abort")
276
277     def _abort(self):
278         if self.closed:
279             return
280
281         os.remove(self.incominghome)
282         # if we were the last share to be moved, remove the incoming/
283         # directory that was our parent
284         parentdir = os.path.split(self.incominghome)[0]
285         if not os.listdir(parentdir):
286             os.rmdir(parentdir)
287         self._sharefile = None
288
289         # We are now considered closed for further writing. We must tell
290         # the storage server about this so that it stops expecting us to
291         # use the space it allocated for us earlier.
292         self.closed = True
293         self.ss.bucket_writer_closed(self, 0)
294
295
296 class BucketReader(Referenceable):
297     implements(RIBucketReader)
298
299     def __init__(self, ss, sharefname, storage_index=None, shnum=None):
300         self.ss = ss
301         self._share_file = ShareFile(sharefname)
302         self.storage_index = storage_index
303         self.shnum = shnum
304
305     def __repr__(self):
306         return "<%s %s %s>" % (self.__class__.__name__,
307                                base32.b2a_l(self.storage_index[:8], 60),
308                                self.shnum)
309
310     def remote_read(self, offset, length):
311         start = time.time()
312         data = self._share_file.read_share_data(offset, length)
313         self.ss.add_latency("read", time.time() - start)
314         self.ss.count("read")
315         return data
316
317     def remote_advise_corrupt_share(self, reason):
318         return self.ss.remote_advise_corrupt_share("immutable",
319                                                    self.storage_index,
320                                                    self.shnum,
321                                                    reason)