]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/mutable.py
storage: more paranoid handling of bounds and palimpsests in mutable share files
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / mutable.py
1 import os, stat, struct
2
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.util.hashutil import constant_time_compare
7 from allmydata.storage.lease import LeaseInfo
8 from allmydata.storage.common import UnknownMutableContainerVersionError, \
9      DataTooLargeError
10
11 # the MutableShareFile is like the ShareFile, but used for mutable data. It
12 # has a different layout. See docs/mutable.txt for more details.
13
14 # #   offset    size    name
15 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
16 # 2   32        20      write enabler's nodeid
17 # 3   52        32      write enabler
18 # 4   84        8       data size (actual share data present) (a)
19 # 5   92        8       offset of (8) count of extra leases (after data)
20 # 6   100       368     four leases, 92 bytes each
21 #                        0    4   ownerid (0 means "no lease here")
22 #                        4    4   expiration timestamp
23 #                        8   32   renewal token
24 #                        40  32   cancel token
25 #                        72  20   nodeid which accepted the tokens
26 # 7   468       (a)     data
27 # 8   ??        4       count of extra leases
28 # 9   ??        n*92    extra leases
29
30
31 # The struct module doc says that L's are 4 bytes in size., and that Q's are
32 # 8 bytes in size. Since compatibility depends upon this, double-check it.
33 assert struct.calcsize(">L") == 4, struct.calcsize(">L")
34 assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
35
36 class MutableShareFile:
37
38     sharetype = "mutable"
39     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
40     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
41     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
42     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
43     assert LEASE_SIZE == 92
44     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
45     assert DATA_OFFSET == 468, DATA_OFFSET
46     # our sharefiles share with a recognizable string, plus some random
47     # binary data to reduce the chance that a regular text file will look
48     # like a sharefile.
49     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
50     assert len(MAGIC) == 32
51     MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
52     # TODO: decide upon a policy for max share size
53
54     def __init__(self, filename, parent=None):
55         self.home = filename
56         if os.path.exists(self.home):
57             # we don't cache anything, just check the magic
58             f = open(self.home, 'rb')
59             data = f.read(self.HEADER_SIZE)
60             (magic,
61              write_enabler_nodeid, write_enabler,
62              data_length, extra_least_offset) = \
63              struct.unpack(">32s20s32sQQ", data)
64             if magic != self.MAGIC:
65                 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
66                       (filename, magic, self.MAGIC)
67                 raise UnknownMutableContainerVersionError(msg)
68         self.parent = parent # for logging
69
70     def log(self, *args, **kwargs):
71         return self.parent.log(*args, **kwargs)
72
73     def create(self, my_nodeid, write_enabler):
74         assert not os.path.exists(self.home)
75         data_length = 0
76         extra_lease_offset = (self.HEADER_SIZE
77                               + 4 * self.LEASE_SIZE
78                               + data_length)
79         assert extra_lease_offset == self.DATA_OFFSET # true at creation
80         num_extra_leases = 0
81         f = open(self.home, 'wb')
82         header = struct.pack(">32s20s32sQQ",
83                              self.MAGIC, my_nodeid, write_enabler,
84                              data_length, extra_lease_offset,
85                              )
86         leases = ("\x00"*self.LEASE_SIZE) * 4
87         f.write(header + leases)
88         # data goes here, empty after creation
89         f.write(struct.pack(">L", num_extra_leases))
90         # extra leases go here, none at creation
91         f.close()
92
93     def unlink(self):
94         os.unlink(self.home)
95
96     def _read_data_length(self, f):
97         f.seek(self.DATA_LENGTH_OFFSET)
98         (data_length,) = struct.unpack(">Q", f.read(8))
99         return data_length
100
101     def _write_data_length(self, f, data_length):
102         f.seek(self.DATA_LENGTH_OFFSET)
103         f.write(struct.pack(">Q", data_length))
104
105     def _read_share_data(self, f, offset, length):
106         precondition(offset >= 0)
107         data_length = self._read_data_length(f)
108         if offset+length > data_length:
109             # reads beyond the end of the data are truncated. Reads that
110             # start beyond the end of the data return an empty string.
111             length = max(0, data_length-offset)
112         if length == 0:
113             return ""
114         precondition(offset+length <= data_length)
115         f.seek(self.DATA_OFFSET+offset)
116         data = f.read(length)
117         return data
118
119     def _read_extra_lease_offset(self, f):
120         f.seek(self.EXTRA_LEASE_OFFSET)
121         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
122         return extra_lease_offset
123
124     def _write_extra_lease_offset(self, f, offset):
125         f.seek(self.EXTRA_LEASE_OFFSET)
126         f.write(struct.pack(">Q", offset))
127
128     def _read_num_extra_leases(self, f):
129         offset = self._read_extra_lease_offset(f)
130         f.seek(offset)
131         (num_extra_leases,) = struct.unpack(">L", f.read(4))
132         return num_extra_leases
133
134     def _write_num_extra_leases(self, f, num_leases):
135         extra_lease_offset = self._read_extra_lease_offset(f)
136         f.seek(extra_lease_offset)
137         f.write(struct.pack(">L", num_leases))
138
139     def _change_container_size(self, f, new_container_size):
140         if new_container_size > self.MAX_SIZE:
141             raise DataTooLargeError()
142         old_extra_lease_offset = self._read_extra_lease_offset(f)
143         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
144         if new_extra_lease_offset < old_extra_lease_offset:
145             # TODO: allow containers to shrink. For now they remain large.
146             return
147         num_extra_leases = self._read_num_extra_leases(f)
148         f.seek(old_extra_lease_offset)
149         leases_size = 4 + num_extra_leases * self.LEASE_SIZE
150         extra_lease_data = f.read(leases_size)
151
152         # Zero out the old lease info (in order to minimize the chance that
153         # it could accidentally be exposed to a reader later, re #1528).
154         f.seek(old_extra_lease_offset)
155         f.write('\x00' * leases_size)
156         f.flush()
157
158         # An interrupt here will corrupt the leases.
159
160         f.seek(new_extra_lease_offset)
161         f.write(extra_lease_data)
162         self._write_extra_lease_offset(f, new_extra_lease_offset)
163
164     def _write_share_data(self, f, offset, data):
165         length = len(data)
166         precondition(offset >= 0)
167         data_length = self._read_data_length(f)
168         extra_lease_offset = self._read_extra_lease_offset(f)
169
170         if offset+length >= data_length:
171             # They are expanding their data size.
172
173             if self.DATA_OFFSET+offset+length > extra_lease_offset:
174                 # TODO: allow containers to shrink. For now, they remain
175                 # large.
176
177                 # Their new data won't fit in the current container, so we
178                 # have to move the leases. With luck, they're expanding it
179                 # more than the size of the extra lease block, which will
180                 # minimize the corrupt-the-share window
181                 self._change_container_size(f, offset+length)
182                 extra_lease_offset = self._read_extra_lease_offset(f)
183
184                 # an interrupt here is ok.. the container has been enlarged
185                 # but the data remains untouched
186
187             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
188             # Their data now fits in the current container. We must write
189             # their new data and modify the recorded data size.
190
191             # Fill any newly exposed empty space with 0's.
192             if offset > data_length:
193                 f.seek(self.DATA_OFFSET+data_length)
194                 f.write('\x00'*(offset - data_length))
195                 f.flush()
196
197             new_data_length = offset+length
198             self._write_data_length(f, new_data_length)
199             # an interrupt here will result in a corrupted share
200
201         # now all that's left to do is write out their data
202         f.seek(self.DATA_OFFSET+offset)
203         f.write(data)
204         return
205
206     def _write_lease_record(self, f, lease_number, lease_info):
207         extra_lease_offset = self._read_extra_lease_offset(f)
208         num_extra_leases = self._read_num_extra_leases(f)
209         if lease_number < 4:
210             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
211         elif (lease_number-4) < num_extra_leases:
212             offset = (extra_lease_offset
213                       + 4
214                       + (lease_number-4)*self.LEASE_SIZE)
215         else:
216             # must add an extra lease record
217             self._write_num_extra_leases(f, num_extra_leases+1)
218             offset = (extra_lease_offset
219                       + 4
220                       + (lease_number-4)*self.LEASE_SIZE)
221         f.seek(offset)
222         assert f.tell() == offset
223         f.write(lease_info.to_mutable_data())
224
225     def _read_lease_record(self, f, lease_number):
226         # returns a LeaseInfo instance, or None
227         extra_lease_offset = self._read_extra_lease_offset(f)
228         num_extra_leases = self._read_num_extra_leases(f)
229         if lease_number < 4:
230             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
231         elif (lease_number-4) < num_extra_leases:
232             offset = (extra_lease_offset
233                       + 4
234                       + (lease_number-4)*self.LEASE_SIZE)
235         else:
236             raise IndexError("No such lease number %d" % lease_number)
237         f.seek(offset)
238         assert f.tell() == offset
239         data = f.read(self.LEASE_SIZE)
240         lease_info = LeaseInfo().from_mutable_data(data)
241         if lease_info.owner_num == 0:
242             return None
243         return lease_info
244
245     def _get_num_lease_slots(self, f):
246         # how many places do we have allocated for leases? Not all of them
247         # are filled.
248         num_extra_leases = self._read_num_extra_leases(f)
249         return 4+num_extra_leases
250
251     def _get_first_empty_lease_slot(self, f):
252         # return an int with the index of an empty slot, or None if we do not
253         # currently have an empty slot
254
255         for i in range(self._get_num_lease_slots(f)):
256             if self._read_lease_record(f, i) is None:
257                 return i
258         return None
259
260     def get_leases(self):
261         """Yields a LeaseInfo instance for all leases."""
262         f = open(self.home, 'rb')
263         for i, lease in self._enumerate_leases(f):
264             yield lease
265         f.close()
266
267     def _enumerate_leases(self, f):
268         for i in range(self._get_num_lease_slots(f)):
269             try:
270                 data = self._read_lease_record(f, i)
271                 if data is not None:
272                     yield i,data
273             except IndexError:
274                 return
275
276     def add_lease(self, lease_info):
277         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
278         f = open(self.home, 'rb+')
279         num_lease_slots = self._get_num_lease_slots(f)
280         empty_slot = self._get_first_empty_lease_slot(f)
281         if empty_slot is not None:
282             self._write_lease_record(f, empty_slot, lease_info)
283         else:
284             self._write_lease_record(f, num_lease_slots, lease_info)
285         f.close()
286
287     def renew_lease(self, renew_secret, new_expire_time):
288         accepting_nodeids = set()
289         f = open(self.home, 'rb+')
290         for (leasenum,lease) in self._enumerate_leases(f):
291             if constant_time_compare(lease.renew_secret, renew_secret):
292                 # yup. See if we need to update the owner time.
293                 if new_expire_time > lease.expiration_time:
294                     # yes
295                     lease.expiration_time = new_expire_time
296                     self._write_lease_record(f, leasenum, lease)
297                 f.close()
298                 return
299             accepting_nodeids.add(lease.nodeid)
300         f.close()
301         # Return the accepting_nodeids set, to give the client a chance to
302         # update the leases on a share which has been migrated from its
303         # original server to a new one.
304         msg = ("Unable to renew non-existent lease. I have leases accepted by"
305                " nodeids: ")
306         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
307                          for anid in accepting_nodeids])
308         msg += " ."
309         raise IndexError(msg)
310
311     def add_or_renew_lease(self, lease_info):
312         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
313         try:
314             self.renew_lease(lease_info.renew_secret,
315                              lease_info.expiration_time)
316         except IndexError:
317             self.add_lease(lease_info)
318
319     def cancel_lease(self, cancel_secret):
320         """Remove any leases with the given cancel_secret. If the last lease
321         is cancelled, the file will be removed. Return the number of bytes
322         that were freed (by truncating the list of leases, and possibly by
323         deleting the file. Raise IndexError if there was no lease with the
324         given cancel_secret."""
325
326         accepting_nodeids = set()
327         modified = 0
328         remaining = 0
329         blank_lease = LeaseInfo(owner_num=0,
330                                 renew_secret="\x00"*32,
331                                 cancel_secret="\x00"*32,
332                                 expiration_time=0,
333                                 nodeid="\x00"*20)
334         f = open(self.home, 'rb+')
335         for (leasenum,lease) in self._enumerate_leases(f):
336             accepting_nodeids.add(lease.nodeid)
337             if constant_time_compare(lease.cancel_secret, cancel_secret):
338                 self._write_lease_record(f, leasenum, blank_lease)
339                 modified += 1
340             else:
341                 remaining += 1
342         if modified:
343             freed_space = self._pack_leases(f)
344             f.close()
345             if not remaining:
346                 freed_space += os.stat(self.home)[stat.ST_SIZE]
347                 self.unlink()
348             return freed_space
349
350         msg = ("Unable to cancel non-existent lease. I have leases "
351                "accepted by nodeids: ")
352         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
353                          for anid in accepting_nodeids])
354         msg += " ."
355         raise IndexError(msg)
356
357     def _pack_leases(self, f):
358         # TODO: reclaim space from cancelled leases
359         return 0
360
361     def _read_write_enabler_and_nodeid(self, f):
362         f.seek(0)
363         data = f.read(self.HEADER_SIZE)
364         (magic,
365          write_enabler_nodeid, write_enabler,
366          data_length, extra_least_offset) = \
367          struct.unpack(">32s20s32sQQ", data)
368         assert magic == self.MAGIC
369         return (write_enabler, write_enabler_nodeid)
370
371     def readv(self, readv):
372         datav = []
373         f = open(self.home, 'rb')
374         for (offset, length) in readv:
375             datav.append(self._read_share_data(f, offset, length))
376         f.close()
377         return datav
378
379 #    def remote_get_length(self):
380 #        f = open(self.home, 'rb')
381 #        data_length = self._read_data_length(f)
382 #        f.close()
383 #        return data_length
384
385     def check_write_enabler(self, write_enabler, si_s):
386         f = open(self.home, 'rb+')
387         (real_write_enabler, write_enabler_nodeid) = \
388                              self._read_write_enabler_and_nodeid(f)
389         f.close()
390         # avoid a timing attack
391         #if write_enabler != real_write_enabler:
392         if not constant_time_compare(write_enabler, real_write_enabler):
393             # accomodate share migration by reporting the nodeid used for the
394             # old write enabler.
395             self.log(format="bad write enabler on SI %(si)s,"
396                      " recorded by nodeid %(nodeid)s",
397                      facility="tahoe.storage",
398                      level=log.WEIRD, umid="cE1eBQ",
399                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
400             msg = "The write enabler was recorded by nodeid '%s'." % \
401                   (idlib.nodeid_b2a(write_enabler_nodeid),)
402             raise BadWriteEnablerError(msg)
403
404     def check_testv(self, testv):
405         test_good = True
406         f = open(self.home, 'rb+')
407         for (offset, length, operator, specimen) in testv:
408             data = self._read_share_data(f, offset, length)
409             if not testv_compare(data, operator, specimen):
410                 test_good = False
411                 break
412         f.close()
413         return test_good
414
415     def writev(self, datav, new_length):
416         f = open(self.home, 'rb+')
417         for (offset, data) in datav:
418             self._write_share_data(f, offset, data)
419         if new_length is not None:
420             cur_length = self._read_data_length(f)
421             if new_length < cur_length:
422                 self._write_data_length(f, new_length)
423                 # TODO: if we're going to shrink the share file when the
424                 # share data has shrunk, then call
425                 # self._change_container_size() here.
426         f.close()
427
428 def testv_compare(a, op, b):
429     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
430     if op == "lt":
431         return a < b
432     if op == "le":
433         return a <= b
434     if op == "eq":
435         return a == b
436     if op == "ne":
437         return a != b
438     if op == "ge":
439         return a >= b
440     if op == "gt":
441         return a > b
442     # never reached
443
444 class EmptyShare:
445
446     def check_testv(self, testv):
447         test_good = True
448         for (offset, length, operator, specimen) in testv:
449             data = ""
450             if not testv_compare(data, operator, specimen):
451                 test_good = False
452                 break
453         return test_good
454
455 def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
456     ms = MutableShareFile(filename, parent)
457     ms.create(my_nodeid, write_enabler)
458     del ms
459     return MutableShareFile(filename, parent)
460