]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/mutable.py
improve the storage/mutable.py asserts even more
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / mutable.py
1 import os, stat, struct
2
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.util.hashutil import constant_time_compare
7 from allmydata.storage.lease import LeaseInfo
8 from allmydata.storage.common import UnknownMutableContainerVersionError, \
9      DataTooLargeError
10
11 # the MutableShareFile is like the ShareFile, but used for mutable data. It
12 # has a different layout. See docs/mutable.txt for more details.
13
14 # #   offset    size    name
15 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
16 # 2   32        20      write enabler's nodeid
17 # 3   52        32      write enabler
18 # 4   84        8       data size (actual share data present) (a)
19 # 5   92        8       offset of (8) count of extra leases (after data)
20 # 6   100       368     four leases, 92 bytes each
21 #                        0    4   ownerid (0 means "no lease here")
22 #                        4    4   expiration timestamp
23 #                        8   32   renewal token
24 #                        40  32   cancel token
25 #                        72  20   nodeid which accepted the tokens
26 # 7   468       (a)     data
27 # 8   ??        4       count of extra leases
28 # 9   ??        n*92    extra leases
29
30
31 # The struct module doc says that L's are 4 bytes in size., and that Q's are
32 # 8 bytes in size. Since compatibility depends upon this, double-check it.
33 assert struct.calcsize(">L") == 4, struct.calcsize(">L")
34 assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
35
36 class MutableShareFile:
37
38     sharetype = "mutable"
39     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
40     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
41     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
42     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
43     assert LEASE_SIZE == 92
44     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
45     assert DATA_OFFSET == 468, DATA_OFFSET
46     # our sharefiles share with a recognizable string, plus some random
47     # binary data to reduce the chance that a regular text file will look
48     # like a sharefile.
49     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
50     assert len(MAGIC) == 32
51     MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
52     # TODO: decide upon a policy for max share size
53
54     def __init__(self, filename, parent=None):
55         self.home = filename
56         if os.path.exists(self.home):
57             # we don't cache anything, just check the magic
58             f = open(self.home, 'rb')
59             data = f.read(self.HEADER_SIZE)
60             (magic,
61              write_enabler_nodeid, write_enabler,
62              data_length, extra_least_offset) = \
63              struct.unpack(">32s20s32sQQ", data)
64             if magic != self.MAGIC:
65                 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
66                       (filename, magic, self.MAGIC)
67                 raise UnknownMutableContainerVersionError(msg)
68         self.parent = parent # for logging
69
70     def log(self, *args, **kwargs):
71         return self.parent.log(*args, **kwargs)
72
73     def create(self, my_nodeid, write_enabler):
74         assert not os.path.exists(self.home)
75         data_length = 0
76         extra_lease_offset = (self.HEADER_SIZE
77                               + 4 * self.LEASE_SIZE
78                               + data_length)
79         assert extra_lease_offset == self.DATA_OFFSET # true at creation
80         num_extra_leases = 0
81         f = open(self.home, 'wb')
82         header = struct.pack(">32s20s32sQQ",
83                              self.MAGIC, my_nodeid, write_enabler,
84                              data_length, extra_lease_offset,
85                              )
86         leases = ("\x00"*self.LEASE_SIZE) * 4
87         f.write(header + leases)
88         # data goes here, empty after creation
89         f.write(struct.pack(">L", num_extra_leases))
90         # extra leases go here, none at creation
91         f.close()
92
93     def unlink(self):
94         os.unlink(self.home)
95
96     def _read_data_length(self, f):
97         f.seek(self.DATA_LENGTH_OFFSET)
98         (data_length,) = struct.unpack(">Q", f.read(8))
99         return data_length
100
101     def _write_data_length(self, f, data_length):
102         f.seek(self.DATA_LENGTH_OFFSET)
103         f.write(struct.pack(">Q", data_length))
104
105     def _read_share_data(self, f, offset, length):
106         precondition(offset >= 0)
107         data_length = self._read_data_length(f)
108         if offset+length > data_length:
109             # reads beyond the end of the data are truncated. Reads that
110             # start beyond the end of the data return an empty string.
111             length = max(0, data_length-offset)
112         if length == 0:
113             return ""
114         precondition(offset+length <= data_length)
115         f.seek(self.DATA_OFFSET+offset)
116         data = f.read(length)
117         return data
118
119     def _read_extra_lease_offset(self, f):
120         f.seek(self.EXTRA_LEASE_OFFSET)
121         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
122         return extra_lease_offset
123
124     def _write_extra_lease_offset(self, f, offset):
125         f.seek(self.EXTRA_LEASE_OFFSET)
126         f.write(struct.pack(">Q", offset))
127
128     def _read_num_extra_leases(self, f):
129         offset = self._read_extra_lease_offset(f)
130         f.seek(offset)
131         (num_extra_leases,) = struct.unpack(">L", f.read(4))
132         return num_extra_leases
133
134     def _write_num_extra_leases(self, f, num_leases):
135         extra_lease_offset = self._read_extra_lease_offset(f)
136         f.seek(extra_lease_offset)
137         f.write(struct.pack(">L", num_leases))
138
139     def _change_container_size(self, f, new_container_size):
140         if new_container_size > self.MAX_SIZE:
141             raise DataTooLargeError()
142         old_extra_lease_offset = self._read_extra_lease_offset(f)
143         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
144         if new_extra_lease_offset < old_extra_lease_offset:
145             # TODO: allow containers to shrink. For now they remain large.
146             return
147         num_extra_leases = self._read_num_extra_leases(f)
148         f.seek(old_extra_lease_offset)
149         extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
150         f.seek(new_extra_lease_offset)
151         f.write(extra_lease_data)
152         # an interrupt here will corrupt the leases, iff the move caused the
153         # extra leases to overlap.
154         self._write_extra_lease_offset(f, new_extra_lease_offset)
155
156     def _write_share_data(self, f, offset, data):
157         length = len(data)
158         precondition(offset >= 0)
159         data_length = self._read_data_length(f)
160         extra_lease_offset = self._read_extra_lease_offset(f)
161
162         if offset+length >= data_length:
163             # They are expanding their data size.
164             if self.DATA_OFFSET+offset+length > extra_lease_offset:
165                 # Their new data won't fit in the current container, so we
166                 # have to move the leases. With luck, they're expanding it
167                 # more than the size of the extra lease block, which will
168                 # minimize the corrupt-the-share window
169                 self._change_container_size(f, offset+length)
170                 extra_lease_offset = self._read_extra_lease_offset(f)
171
172                 # an interrupt here is ok.. the container has been enlarged
173                 # but the data remains untouched
174
175             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
176             # Their data now fits in the current container. We must write
177             # their new data and modify the recorded data size.
178             new_data_length = offset+length
179             self._write_data_length(f, new_data_length)
180             # an interrupt here will result in a corrupted share
181
182         # now all that's left to do is write out their data
183         f.seek(self.DATA_OFFSET+offset)
184         f.write(data)
185         return
186
187     def _write_lease_record(self, f, lease_number, lease_info):
188         extra_lease_offset = self._read_extra_lease_offset(f)
189         num_extra_leases = self._read_num_extra_leases(f)
190         if lease_number < 4:
191             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
192         elif (lease_number-4) < num_extra_leases:
193             offset = (extra_lease_offset
194                       + 4
195                       + (lease_number-4)*self.LEASE_SIZE)
196         else:
197             # must add an extra lease record
198             self._write_num_extra_leases(f, num_extra_leases+1)
199             offset = (extra_lease_offset
200                       + 4
201                       + (lease_number-4)*self.LEASE_SIZE)
202         f.seek(offset)
203         assert f.tell() == offset
204         f.write(lease_info.to_mutable_data())
205
206     def _read_lease_record(self, f, lease_number):
207         # returns a LeaseInfo instance, or None
208         extra_lease_offset = self._read_extra_lease_offset(f)
209         num_extra_leases = self._read_num_extra_leases(f)
210         if lease_number < 4:
211             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
212         elif (lease_number-4) < num_extra_leases:
213             offset = (extra_lease_offset
214                       + 4
215                       + (lease_number-4)*self.LEASE_SIZE)
216         else:
217             raise IndexError("No such lease number %d" % lease_number)
218         f.seek(offset)
219         assert f.tell() == offset
220         data = f.read(self.LEASE_SIZE)
221         lease_info = LeaseInfo().from_mutable_data(data)
222         if lease_info.owner_num == 0:
223             return None
224         return lease_info
225
226     def _get_num_lease_slots(self, f):
227         # how many places do we have allocated for leases? Not all of them
228         # are filled.
229         num_extra_leases = self._read_num_extra_leases(f)
230         return 4+num_extra_leases
231
232     def _get_first_empty_lease_slot(self, f):
233         # return an int with the index of an empty slot, or None if we do not
234         # currently have an empty slot
235
236         for i in range(self._get_num_lease_slots(f)):
237             if self._read_lease_record(f, i) is None:
238                 return i
239         return None
240
241     def get_leases(self):
242         """Yields a LeaseInfo instance for all leases."""
243         f = open(self.home, 'rb')
244         for i, lease in self._enumerate_leases(f):
245             yield lease
246         f.close()
247
248     def _enumerate_leases(self, f):
249         for i in range(self._get_num_lease_slots(f)):
250             try:
251                 data = self._read_lease_record(f, i)
252                 if data is not None:
253                     yield i,data
254             except IndexError:
255                 return
256
257     def add_lease(self, lease_info):
258         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
259         f = open(self.home, 'rb+')
260         num_lease_slots = self._get_num_lease_slots(f)
261         empty_slot = self._get_first_empty_lease_slot(f)
262         if empty_slot is not None:
263             self._write_lease_record(f, empty_slot, lease_info)
264         else:
265             self._write_lease_record(f, num_lease_slots, lease_info)
266         f.close()
267
268     def renew_lease(self, renew_secret, new_expire_time):
269         accepting_nodeids = set()
270         f = open(self.home, 'rb+')
271         for (leasenum,lease) in self._enumerate_leases(f):
272             if constant_time_compare(lease.renew_secret, renew_secret):
273                 # yup. See if we need to update the owner time.
274                 if new_expire_time > lease.expiration_time:
275                     # yes
276                     lease.expiration_time = new_expire_time
277                     self._write_lease_record(f, leasenum, lease)
278                 f.close()
279                 return
280             accepting_nodeids.add(lease.nodeid)
281         f.close()
282         # Return the accepting_nodeids set, to give the client a chance to
283         # update the leases on a share which has been migrated from its
284         # original server to a new one.
285         msg = ("Unable to renew non-existent lease. I have leases accepted by"
286                " nodeids: ")
287         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
288                          for anid in accepting_nodeids])
289         msg += " ."
290         raise IndexError(msg)
291
292     def add_or_renew_lease(self, lease_info):
293         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
294         try:
295             self.renew_lease(lease_info.renew_secret,
296                              lease_info.expiration_time)
297         except IndexError:
298             self.add_lease(lease_info)
299
300     def cancel_lease(self, cancel_secret):
301         """Remove any leases with the given cancel_secret. If the last lease
302         is cancelled, the file will be removed. Return the number of bytes
303         that were freed (by truncating the list of leases, and possibly by
304         deleting the file. Raise IndexError if there was no lease with the
305         given cancel_secret."""
306
307         accepting_nodeids = set()
308         modified = 0
309         remaining = 0
310         blank_lease = LeaseInfo(owner_num=0,
311                                 renew_secret="\x00"*32,
312                                 cancel_secret="\x00"*32,
313                                 expiration_time=0,
314                                 nodeid="\x00"*20)
315         f = open(self.home, 'rb+')
316         for (leasenum,lease) in self._enumerate_leases(f):
317             accepting_nodeids.add(lease.nodeid)
318             if constant_time_compare(lease.cancel_secret, cancel_secret):
319                 self._write_lease_record(f, leasenum, blank_lease)
320                 modified += 1
321             else:
322                 remaining += 1
323         if modified:
324             freed_space = self._pack_leases(f)
325             f.close()
326             if not remaining:
327                 freed_space += os.stat(self.home)[stat.ST_SIZE]
328                 self.unlink()
329             return freed_space
330
331         msg = ("Unable to cancel non-existent lease. I have leases "
332                "accepted by nodeids: ")
333         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
334                          for anid in accepting_nodeids])
335         msg += " ."
336         raise IndexError(msg)
337
338     def _pack_leases(self, f):
339         # TODO: reclaim space from cancelled leases
340         return 0
341
342     def _read_write_enabler_and_nodeid(self, f):
343         f.seek(0)
344         data = f.read(self.HEADER_SIZE)
345         (magic,
346          write_enabler_nodeid, write_enabler,
347          data_length, extra_least_offset) = \
348          struct.unpack(">32s20s32sQQ", data)
349         assert magic == self.MAGIC
350         return (write_enabler, write_enabler_nodeid)
351
352     def readv(self, readv):
353         datav = []
354         f = open(self.home, 'rb')
355         for (offset, length) in readv:
356             datav.append(self._read_share_data(f, offset, length))
357         f.close()
358         return datav
359
360 #    def remote_get_length(self):
361 #        f = open(self.home, 'rb')
362 #        data_length = self._read_data_length(f)
363 #        f.close()
364 #        return data_length
365
366     def check_write_enabler(self, write_enabler, si_s):
367         f = open(self.home, 'rb+')
368         (real_write_enabler, write_enabler_nodeid) = \
369                              self._read_write_enabler_and_nodeid(f)
370         f.close()
371         # avoid a timing attack
372         #if write_enabler != real_write_enabler:
373         if not constant_time_compare(write_enabler, real_write_enabler):
374             # accomodate share migration by reporting the nodeid used for the
375             # old write enabler.
376             self.log(format="bad write enabler on SI %(si)s,"
377                      " recorded by nodeid %(nodeid)s",
378                      facility="tahoe.storage",
379                      level=log.WEIRD, umid="cE1eBQ",
380                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
381             msg = "The write enabler was recorded by nodeid '%s'." % \
382                   (idlib.nodeid_b2a(write_enabler_nodeid),)
383             raise BadWriteEnablerError(msg)
384
385     def check_testv(self, testv):
386         test_good = True
387         f = open(self.home, 'rb+')
388         for (offset, length, operator, specimen) in testv:
389             data = self._read_share_data(f, offset, length)
390             if not testv_compare(data, operator, specimen):
391                 test_good = False
392                 break
393         f.close()
394         return test_good
395
396     def writev(self, datav, new_length):
397         f = open(self.home, 'rb+')
398         for (offset, data) in datav:
399             self._write_share_data(f, offset, data)
400         if new_length is not None:
401             self._change_container_size(f, new_length)
402             f.seek(self.DATA_LENGTH_OFFSET)
403             f.write(struct.pack(">Q", new_length))
404         f.close()
405
406 def testv_compare(a, op, b):
407     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
408     if op == "lt":
409         return a < b
410     if op == "le":
411         return a <= b
412     if op == "eq":
413         return a == b
414     if op == "ne":
415         return a != b
416     if op == "ge":
417         return a >= b
418     if op == "gt":
419         return a > b
420     # never reached
421
422 class EmptyShare:
423
424     def check_testv(self, testv):
425         test_good = True
426         for (offset, length, operator, specimen) in testv:
427             data = ""
428             if not testv_compare(data, operator, specimen):
429                 test_good = False
430                 break
431         return test_good
432
433 def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
434     ms = MutableShareFile(filename, parent)
435     ms.create(my_nodeid, write_enabler)
436     del ms
437     return MutableShareFile(filename, parent)
438