]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/mutable.py
Change the maximum mutable share size to 69105 TB, and add a maximum-mutable-share...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / mutable.py
1 import os, stat, struct
2
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.util.hashutil import constant_time_compare
7 from allmydata.storage.lease import LeaseInfo
8 from allmydata.storage.common import UnknownMutableContainerVersionError, \
9      DataTooLargeError
10 from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
11
12
13 # the MutableShareFile is like the ShareFile, but used for mutable data. It
14 # has a different layout. See docs/mutable.txt for more details.
15
16 # #   offset    size    name
17 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
18 # 2   32        20      write enabler's nodeid
19 # 3   52        32      write enabler
20 # 4   84        8       data size (actual share data present) (a)
21 # 5   92        8       offset of (8) count of extra leases (after data)
22 # 6   100       368     four leases, 92 bytes each
23 #                        0    4   ownerid (0 means "no lease here")
24 #                        4    4   expiration timestamp
25 #                        8   32   renewal token
26 #                        40  32   cancel token
27 #                        72  20   nodeid which accepted the tokens
28 # 7   468       (a)     data
29 # 8   ??        4       count of extra leases
30 # 9   ??        n*92    extra leases
31
32
33 # The struct module doc says that L's are 4 bytes in size., and that Q's are
34 # 8 bytes in size. Since compatibility depends upon this, double-check it.
35 assert struct.calcsize(">L") == 4, struct.calcsize(">L")
36 assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
37
38 class MutableShareFile:
39
40     sharetype = "mutable"
41     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
42     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
43     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
44     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
45     assert LEASE_SIZE == 92
46     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
47     assert DATA_OFFSET == 468, DATA_OFFSET
48     # our sharefiles share with a recognizable string, plus some random
49     # binary data to reduce the chance that a regular text file will look
50     # like a sharefile.
51     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
52     assert len(MAGIC) == 32
53     MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
54     # TODO: decide upon a policy for max share size
55
56     def __init__(self, filename, parent=None):
57         self.home = filename
58         if os.path.exists(self.home):
59             # we don't cache anything, just check the magic
60             f = open(self.home, 'rb')
61             data = f.read(self.HEADER_SIZE)
62             (magic,
63              write_enabler_nodeid, write_enabler,
64              data_length, extra_least_offset) = \
65              struct.unpack(">32s20s32sQQ", data)
66             if magic != self.MAGIC:
67                 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
68                       (filename, magic, self.MAGIC)
69                 raise UnknownMutableContainerVersionError(msg)
70         self.parent = parent # for logging
71
72     def log(self, *args, **kwargs):
73         return self.parent.log(*args, **kwargs)
74
75     def create(self, my_nodeid, write_enabler):
76         assert not os.path.exists(self.home)
77         data_length = 0
78         extra_lease_offset = (self.HEADER_SIZE
79                               + 4 * self.LEASE_SIZE
80                               + data_length)
81         assert extra_lease_offset == self.DATA_OFFSET # true at creation
82         num_extra_leases = 0
83         f = open(self.home, 'wb')
84         header = struct.pack(">32s20s32sQQ",
85                              self.MAGIC, my_nodeid, write_enabler,
86                              data_length, extra_lease_offset,
87                              )
88         leases = ("\x00"*self.LEASE_SIZE) * 4
89         f.write(header + leases)
90         # data goes here, empty after creation
91         f.write(struct.pack(">L", num_extra_leases))
92         # extra leases go here, none at creation
93         f.close()
94
95     def unlink(self):
96         os.unlink(self.home)
97
98     def _read_data_length(self, f):
99         f.seek(self.DATA_LENGTH_OFFSET)
100         (data_length,) = struct.unpack(">Q", f.read(8))
101         return data_length
102
103     def _write_data_length(self, f, data_length):
104         f.seek(self.DATA_LENGTH_OFFSET)
105         f.write(struct.pack(">Q", data_length))
106
107     def _read_share_data(self, f, offset, length):
108         precondition(offset >= 0)
109         data_length = self._read_data_length(f)
110         if offset+length > data_length:
111             # reads beyond the end of the data are truncated. Reads that
112             # start beyond the end of the data return an empty string.
113             length = max(0, data_length-offset)
114         if length == 0:
115             return ""
116         precondition(offset+length <= data_length)
117         f.seek(self.DATA_OFFSET+offset)
118         data = f.read(length)
119         return data
120
121     def _read_extra_lease_offset(self, f):
122         f.seek(self.EXTRA_LEASE_OFFSET)
123         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
124         return extra_lease_offset
125
126     def _write_extra_lease_offset(self, f, offset):
127         f.seek(self.EXTRA_LEASE_OFFSET)
128         f.write(struct.pack(">Q", offset))
129
130     def _read_num_extra_leases(self, f):
131         offset = self._read_extra_lease_offset(f)
132         f.seek(offset)
133         (num_extra_leases,) = struct.unpack(">L", f.read(4))
134         return num_extra_leases
135
136     def _write_num_extra_leases(self, f, num_leases):
137         extra_lease_offset = self._read_extra_lease_offset(f)
138         f.seek(extra_lease_offset)
139         f.write(struct.pack(">L", num_leases))
140
141     def _change_container_size(self, f, new_container_size):
142         if new_container_size > self.MAX_SIZE:
143             raise DataTooLargeError()
144         old_extra_lease_offset = self._read_extra_lease_offset(f)
145         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
146         if new_extra_lease_offset < old_extra_lease_offset:
147             # TODO: allow containers to shrink. For now they remain large.
148             return
149         num_extra_leases = self._read_num_extra_leases(f)
150         f.seek(old_extra_lease_offset)
151         leases_size = 4 + num_extra_leases * self.LEASE_SIZE
152         extra_lease_data = f.read(leases_size)
153
154         # Zero out the old lease info (in order to minimize the chance that
155         # it could accidentally be exposed to a reader later, re #1528).
156         f.seek(old_extra_lease_offset)
157         f.write('\x00' * leases_size)
158         f.flush()
159
160         # An interrupt here will corrupt the leases.
161
162         f.seek(new_extra_lease_offset)
163         f.write(extra_lease_data)
164         self._write_extra_lease_offset(f, new_extra_lease_offset)
165
166     def _write_share_data(self, f, offset, data):
167         length = len(data)
168         precondition(offset >= 0)
169         data_length = self._read_data_length(f)
170         extra_lease_offset = self._read_extra_lease_offset(f)
171
172         if offset+length >= data_length:
173             # They are expanding their data size.
174
175             if self.DATA_OFFSET+offset+length > extra_lease_offset:
176                 # TODO: allow containers to shrink. For now, they remain
177                 # large.
178
179                 # Their new data won't fit in the current container, so we
180                 # have to move the leases. With luck, they're expanding it
181                 # more than the size of the extra lease block, which will
182                 # minimize the corrupt-the-share window
183                 self._change_container_size(f, offset+length)
184                 extra_lease_offset = self._read_extra_lease_offset(f)
185
186                 # an interrupt here is ok.. the container has been enlarged
187                 # but the data remains untouched
188
189             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
190             # Their data now fits in the current container. We must write
191             # their new data and modify the recorded data size.
192
193             # Fill any newly exposed empty space with 0's.
194             if offset > data_length:
195                 f.seek(self.DATA_OFFSET+data_length)
196                 f.write('\x00'*(offset - data_length))
197                 f.flush()
198
199             new_data_length = offset+length
200             self._write_data_length(f, new_data_length)
201             # an interrupt here will result in a corrupted share
202
203         # now all that's left to do is write out their data
204         f.seek(self.DATA_OFFSET+offset)
205         f.write(data)
206         return
207
208     def _write_lease_record(self, f, lease_number, lease_info):
209         extra_lease_offset = self._read_extra_lease_offset(f)
210         num_extra_leases = self._read_num_extra_leases(f)
211         if lease_number < 4:
212             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
213         elif (lease_number-4) < num_extra_leases:
214             offset = (extra_lease_offset
215                       + 4
216                       + (lease_number-4)*self.LEASE_SIZE)
217         else:
218             # must add an extra lease record
219             self._write_num_extra_leases(f, num_extra_leases+1)
220             offset = (extra_lease_offset
221                       + 4
222                       + (lease_number-4)*self.LEASE_SIZE)
223         f.seek(offset)
224         assert f.tell() == offset
225         f.write(lease_info.to_mutable_data())
226
227     def _read_lease_record(self, f, lease_number):
228         # returns a LeaseInfo instance, or None
229         extra_lease_offset = self._read_extra_lease_offset(f)
230         num_extra_leases = self._read_num_extra_leases(f)
231         if lease_number < 4:
232             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
233         elif (lease_number-4) < num_extra_leases:
234             offset = (extra_lease_offset
235                       + 4
236                       + (lease_number-4)*self.LEASE_SIZE)
237         else:
238             raise IndexError("No such lease number %d" % lease_number)
239         f.seek(offset)
240         assert f.tell() == offset
241         data = f.read(self.LEASE_SIZE)
242         lease_info = LeaseInfo().from_mutable_data(data)
243         if lease_info.owner_num == 0:
244             return None
245         return lease_info
246
247     def _get_num_lease_slots(self, f):
248         # how many places do we have allocated for leases? Not all of them
249         # are filled.
250         num_extra_leases = self._read_num_extra_leases(f)
251         return 4+num_extra_leases
252
253     def _get_first_empty_lease_slot(self, f):
254         # return an int with the index of an empty slot, or None if we do not
255         # currently have an empty slot
256
257         for i in range(self._get_num_lease_slots(f)):
258             if self._read_lease_record(f, i) is None:
259                 return i
260         return None
261
262     def get_leases(self):
263         """Yields a LeaseInfo instance for all leases."""
264         f = open(self.home, 'rb')
265         for i, lease in self._enumerate_leases(f):
266             yield lease
267         f.close()
268
269     def _enumerate_leases(self, f):
270         for i in range(self._get_num_lease_slots(f)):
271             try:
272                 data = self._read_lease_record(f, i)
273                 if data is not None:
274                     yield i,data
275             except IndexError:
276                 return
277
278     def add_lease(self, lease_info):
279         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
280         f = open(self.home, 'rb+')
281         num_lease_slots = self._get_num_lease_slots(f)
282         empty_slot = self._get_first_empty_lease_slot(f)
283         if empty_slot is not None:
284             self._write_lease_record(f, empty_slot, lease_info)
285         else:
286             self._write_lease_record(f, num_lease_slots, lease_info)
287         f.close()
288
289     def renew_lease(self, renew_secret, new_expire_time):
290         accepting_nodeids = set()
291         f = open(self.home, 'rb+')
292         for (leasenum,lease) in self._enumerate_leases(f):
293             if constant_time_compare(lease.renew_secret, renew_secret):
294                 # yup. See if we need to update the owner time.
295                 if new_expire_time > lease.expiration_time:
296                     # yes
297                     lease.expiration_time = new_expire_time
298                     self._write_lease_record(f, leasenum, lease)
299                 f.close()
300                 return
301             accepting_nodeids.add(lease.nodeid)
302         f.close()
303         # Return the accepting_nodeids set, to give the client a chance to
304         # update the leases on a share which has been migrated from its
305         # original server to a new one.
306         msg = ("Unable to renew non-existent lease. I have leases accepted by"
307                " nodeids: ")
308         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
309                          for anid in accepting_nodeids])
310         msg += " ."
311         raise IndexError(msg)
312
313     def add_or_renew_lease(self, lease_info):
314         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
315         try:
316             self.renew_lease(lease_info.renew_secret,
317                              lease_info.expiration_time)
318         except IndexError:
319             self.add_lease(lease_info)
320
321     def cancel_lease(self, cancel_secret):
322         """Remove any leases with the given cancel_secret. If the last lease
323         is cancelled, the file will be removed. Return the number of bytes
324         that were freed (by truncating the list of leases, and possibly by
325         deleting the file. Raise IndexError if there was no lease with the
326         given cancel_secret."""
327
328         accepting_nodeids = set()
329         modified = 0
330         remaining = 0
331         blank_lease = LeaseInfo(owner_num=0,
332                                 renew_secret="\x00"*32,
333                                 cancel_secret="\x00"*32,
334                                 expiration_time=0,
335                                 nodeid="\x00"*20)
336         f = open(self.home, 'rb+')
337         for (leasenum,lease) in self._enumerate_leases(f):
338             accepting_nodeids.add(lease.nodeid)
339             if constant_time_compare(lease.cancel_secret, cancel_secret):
340                 self._write_lease_record(f, leasenum, blank_lease)
341                 modified += 1
342             else:
343                 remaining += 1
344         if modified:
345             freed_space = self._pack_leases(f)
346             f.close()
347             if not remaining:
348                 freed_space += os.stat(self.home)[stat.ST_SIZE]
349                 self.unlink()
350             return freed_space
351
352         msg = ("Unable to cancel non-existent lease. I have leases "
353                "accepted by nodeids: ")
354         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
355                          for anid in accepting_nodeids])
356         msg += " ."
357         raise IndexError(msg)
358
359     def _pack_leases(self, f):
360         # TODO: reclaim space from cancelled leases
361         return 0
362
363     def _read_write_enabler_and_nodeid(self, f):
364         f.seek(0)
365         data = f.read(self.HEADER_SIZE)
366         (magic,
367          write_enabler_nodeid, write_enabler,
368          data_length, extra_least_offset) = \
369          struct.unpack(">32s20s32sQQ", data)
370         assert magic == self.MAGIC
371         return (write_enabler, write_enabler_nodeid)
372
373     def readv(self, readv):
374         datav = []
375         f = open(self.home, 'rb')
376         for (offset, length) in readv:
377             datav.append(self._read_share_data(f, offset, length))
378         f.close()
379         return datav
380
381 #    def remote_get_length(self):
382 #        f = open(self.home, 'rb')
383 #        data_length = self._read_data_length(f)
384 #        f.close()
385 #        return data_length
386
387     def check_write_enabler(self, write_enabler, si_s):
388         f = open(self.home, 'rb+')
389         (real_write_enabler, write_enabler_nodeid) = \
390                              self._read_write_enabler_and_nodeid(f)
391         f.close()
392         # avoid a timing attack
393         #if write_enabler != real_write_enabler:
394         if not constant_time_compare(write_enabler, real_write_enabler):
395             # accomodate share migration by reporting the nodeid used for the
396             # old write enabler.
397             self.log(format="bad write enabler on SI %(si)s,"
398                      " recorded by nodeid %(nodeid)s",
399                      facility="tahoe.storage",
400                      level=log.WEIRD, umid="cE1eBQ",
401                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
402             msg = "The write enabler was recorded by nodeid '%s'." % \
403                   (idlib.nodeid_b2a(write_enabler_nodeid),)
404             raise BadWriteEnablerError(msg)
405
406     def check_testv(self, testv):
407         test_good = True
408         f = open(self.home, 'rb+')
409         for (offset, length, operator, specimen) in testv:
410             data = self._read_share_data(f, offset, length)
411             if not testv_compare(data, operator, specimen):
412                 test_good = False
413                 break
414         f.close()
415         return test_good
416
417     def writev(self, datav, new_length):
418         f = open(self.home, 'rb+')
419         for (offset, data) in datav:
420             self._write_share_data(f, offset, data)
421         if new_length is not None:
422             cur_length = self._read_data_length(f)
423             if new_length < cur_length:
424                 self._write_data_length(f, new_length)
425                 # TODO: if we're going to shrink the share file when the
426                 # share data has shrunk, then call
427                 # self._change_container_size() here.
428         f.close()
429
430 def testv_compare(a, op, b):
431     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
432     if op == "lt":
433         return a < b
434     if op == "le":
435         return a <= b
436     if op == "eq":
437         return a == b
438     if op == "ne":
439         return a != b
440     if op == "ge":
441         return a >= b
442     if op == "gt":
443         return a > b
444     # never reached
445
446 class EmptyShare:
447
448     def check_testv(self, testv):
449         test_good = True
450         for (offset, length, operator, specimen) in testv:
451             data = ""
452             if not testv_compare(data, operator, specimen):
453                 test_good = False
454                 break
455         return test_good
456
457 def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
458     ms = MutableShareFile(filename, parent)
459     ms.create(my_nodeid, write_enabler)
460     del ms
461     return MutableShareFile(filename, parent)
462