]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/mutable.py
storage: add a lease-checker-and-expirer crawler, plus web status page.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / mutable.py
1 import os, stat, struct
2
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.storage.lease import LeaseInfo
7 from allmydata.storage.common import DataTooLargeError
8
9 # the MutableShareFile is like the ShareFile, but used for mutable data. It
10 # has a different layout. See docs/mutable.txt for more details.
11
12 # #   offset    size    name
13 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
14 # 2   32        20      write enabler's nodeid
15 # 3   52        32      write enabler
16 # 4   84        8       data size (actual share data present) (a)
17 # 5   92        8       offset of (8) count of extra leases (after data)
18 # 6   100       368     four leases, 92 bytes each
19 #                        0    4   ownerid (0 means "no lease here")
20 #                        4    4   expiration timestamp
21 #                        8   32   renewal token
22 #                        40  32   cancel token
23 #                        72  20   nodeid which accepted the tokens
24 # 7   468       (a)     data
25 # 8   ??        4       count of extra leases
26 # 9   ??        n*92    extra leases
27
28
29 assert struct.calcsize("L"), 4 # The struct module doc says that L's are 4 bytes in size.
30 assert struct.calcsize("Q"), 8 # The struct module doc says that Q's are 8 bytes in size (at least with big-endian ordering).
31
32 class MutableShareFile:
33
34     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
35     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
36     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
37     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
38     assert LEASE_SIZE == 92
39     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
40     assert DATA_OFFSET == 468, DATA_OFFSET
41     # our sharefiles share with a recognizable string, plus some random
42     # binary data to reduce the chance that a regular text file will look
43     # like a sharefile.
44     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
45     assert len(MAGIC) == 32
46     MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
47     # TODO: decide upon a policy for max share size
48
49     def __init__(self, filename, parent=None):
50         self.home = filename
51         if os.path.exists(self.home):
52             # we don't cache anything, just check the magic
53             f = open(self.home, 'rb')
54             data = f.read(self.HEADER_SIZE)
55             (magic,
56              write_enabler_nodeid, write_enabler,
57              data_length, extra_least_offset) = \
58              struct.unpack(">32s20s32sQQ", data)
59             assert magic == self.MAGIC
60         self.parent = parent # for logging
61
62     def log(self, *args, **kwargs):
63         return self.parent.log(*args, **kwargs)
64
65     def create(self, my_nodeid, write_enabler):
66         assert not os.path.exists(self.home)
67         data_length = 0
68         extra_lease_offset = (self.HEADER_SIZE
69                               + 4 * self.LEASE_SIZE
70                               + data_length)
71         assert extra_lease_offset == self.DATA_OFFSET # true at creation
72         num_extra_leases = 0
73         f = open(self.home, 'wb')
74         header = struct.pack(">32s20s32sQQ",
75                              self.MAGIC, my_nodeid, write_enabler,
76                              data_length, extra_lease_offset,
77                              )
78         leases = ("\x00"*self.LEASE_SIZE) * 4
79         f.write(header + leases)
80         # data goes here, empty after creation
81         f.write(struct.pack(">L", num_extra_leases))
82         # extra leases go here, none at creation
83         f.close()
84
85     def unlink(self):
86         os.unlink(self.home)
87
88     def _read_data_length(self, f):
89         f.seek(self.DATA_LENGTH_OFFSET)
90         (data_length,) = struct.unpack(">Q", f.read(8))
91         return data_length
92
93     def _write_data_length(self, f, data_length):
94         f.seek(self.DATA_LENGTH_OFFSET)
95         f.write(struct.pack(">Q", data_length))
96
97     def _read_share_data(self, f, offset, length):
98         precondition(offset >= 0)
99         data_length = self._read_data_length(f)
100         if offset+length > data_length:
101             # reads beyond the end of the data are truncated. Reads that
102             # start beyond the end of the data return an empty string.
103             length = max(0, data_length-offset)
104         if length == 0:
105             return ""
106         precondition(offset+length <= data_length)
107         f.seek(self.DATA_OFFSET+offset)
108         data = f.read(length)
109         return data
110
111     def _read_extra_lease_offset(self, f):
112         f.seek(self.EXTRA_LEASE_OFFSET)
113         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
114         return extra_lease_offset
115
116     def _write_extra_lease_offset(self, f, offset):
117         f.seek(self.EXTRA_LEASE_OFFSET)
118         f.write(struct.pack(">Q", offset))
119
120     def _read_num_extra_leases(self, f):
121         offset = self._read_extra_lease_offset(f)
122         f.seek(offset)
123         (num_extra_leases,) = struct.unpack(">L", f.read(4))
124         return num_extra_leases
125
126     def _write_num_extra_leases(self, f, num_leases):
127         extra_lease_offset = self._read_extra_lease_offset(f)
128         f.seek(extra_lease_offset)
129         f.write(struct.pack(">L", num_leases))
130
131     def _change_container_size(self, f, new_container_size):
132         if new_container_size > self.MAX_SIZE:
133             raise DataTooLargeError()
134         old_extra_lease_offset = self._read_extra_lease_offset(f)
135         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
136         if new_extra_lease_offset < old_extra_lease_offset:
137             # TODO: allow containers to shrink. For now they remain large.
138             return
139         num_extra_leases = self._read_num_extra_leases(f)
140         f.seek(old_extra_lease_offset)
141         extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
142         f.seek(new_extra_lease_offset)
143         f.write(extra_lease_data)
144         # an interrupt here will corrupt the leases, iff the move caused the
145         # extra leases to overlap.
146         self._write_extra_lease_offset(f, new_extra_lease_offset)
147
148     def _write_share_data(self, f, offset, data):
149         length = len(data)
150         precondition(offset >= 0)
151         data_length = self._read_data_length(f)
152         extra_lease_offset = self._read_extra_lease_offset(f)
153
154         if offset+length >= data_length:
155             # They are expanding their data size.
156             if self.DATA_OFFSET+offset+length > extra_lease_offset:
157                 # Their new data won't fit in the current container, so we
158                 # have to move the leases. With luck, they're expanding it
159                 # more than the size of the extra lease block, which will
160                 # minimize the corrupt-the-share window
161                 self._change_container_size(f, offset+length)
162                 extra_lease_offset = self._read_extra_lease_offset(f)
163
164                 # an interrupt here is ok.. the container has been enlarged
165                 # but the data remains untouched
166
167             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
168             # Their data now fits in the current container. We must write
169             # their new data and modify the recorded data size.
170             new_data_length = offset+length
171             self._write_data_length(f, new_data_length)
172             # an interrupt here will result in a corrupted share
173
174         # now all that's left to do is write out their data
175         f.seek(self.DATA_OFFSET+offset)
176         f.write(data)
177         return
178
179     def _write_lease_record(self, f, lease_number, lease_info):
180         extra_lease_offset = self._read_extra_lease_offset(f)
181         num_extra_leases = self._read_num_extra_leases(f)
182         if lease_number < 4:
183             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
184         elif (lease_number-4) < num_extra_leases:
185             offset = (extra_lease_offset
186                       + 4
187                       + (lease_number-4)*self.LEASE_SIZE)
188         else:
189             # must add an extra lease record
190             self._write_num_extra_leases(f, num_extra_leases+1)
191             offset = (extra_lease_offset
192                       + 4
193                       + (lease_number-4)*self.LEASE_SIZE)
194         f.seek(offset)
195         assert f.tell() == offset
196         f.write(lease_info.to_mutable_data())
197
198     def _read_lease_record(self, f, lease_number):
199         # returns a LeaseInfo instance, or None
200         extra_lease_offset = self._read_extra_lease_offset(f)
201         num_extra_leases = self._read_num_extra_leases(f)
202         if lease_number < 4:
203             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
204         elif (lease_number-4) < num_extra_leases:
205             offset = (extra_lease_offset
206                       + 4
207                       + (lease_number-4)*self.LEASE_SIZE)
208         else:
209             raise IndexError("No such lease number %d" % lease_number)
210         f.seek(offset)
211         assert f.tell() == offset
212         data = f.read(self.LEASE_SIZE)
213         lease_info = LeaseInfo().from_mutable_data(data)
214         if lease_info.owner_num == 0:
215             return None
216         return lease_info
217
218     def _get_num_lease_slots(self, f):
219         # how many places do we have allocated for leases? Not all of them
220         # are filled.
221         num_extra_leases = self._read_num_extra_leases(f)
222         return 4+num_extra_leases
223
224     def _get_first_empty_lease_slot(self, f):
225         # return an int with the index of an empty slot, or None if we do not
226         # currently have an empty slot
227
228         for i in range(self._get_num_lease_slots(f)):
229             if self._read_lease_record(f, i) is None:
230                 return i
231         return None
232
233     def get_leases(self):
234         """Yields a LeaseInfo instance for all leases."""
235         f = open(self.home, 'rb')
236         for i, lease in self._enumerate_leases(f):
237             yield lease
238         f.close()
239
240     def _enumerate_leases(self, f):
241         for i in range(self._get_num_lease_slots(f)):
242             try:
243                 data = self._read_lease_record(f, i)
244                 if data is not None:
245                     yield i,data
246             except IndexError:
247                 return
248
249     def add_lease(self, lease_info):
250         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
251         f = open(self.home, 'rb+')
252         num_lease_slots = self._get_num_lease_slots(f)
253         empty_slot = self._get_first_empty_lease_slot(f)
254         if empty_slot is not None:
255             self._write_lease_record(f, empty_slot, lease_info)
256         else:
257             self._write_lease_record(f, num_lease_slots, lease_info)
258         f.close()
259
260     def renew_lease(self, renew_secret, new_expire_time):
261         accepting_nodeids = set()
262         f = open(self.home, 'rb+')
263         for (leasenum,lease) in self._enumerate_leases(f):
264             if lease.renew_secret == renew_secret:
265                 # yup. See if we need to update the owner time.
266                 if new_expire_time > lease.expiration_time:
267                     # yes
268                     lease.expiration_time = new_expire_time
269                     self._write_lease_record(f, leasenum, lease)
270                 f.close()
271                 return
272             accepting_nodeids.add(lease.nodeid)
273         f.close()
274         # Return the accepting_nodeids set, to give the client a chance to
275         # update the leases on a share which has been migrated from its
276         # original server to a new one.
277         msg = ("Unable to renew non-existent lease. I have leases accepted by"
278                " nodeids: ")
279         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
280                          for anid in accepting_nodeids])
281         msg += " ."
282         raise IndexError(msg)
283
284     def add_or_renew_lease(self, lease_info):
285         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
286         try:
287             self.renew_lease(lease_info.renew_secret,
288                              lease_info.expiration_time)
289         except IndexError:
290             self.add_lease(lease_info)
291
292     def cancel_lease(self, cancel_secret):
293         """Remove any leases with the given cancel_secret. If the last lease
294         is cancelled, the file will be removed. Return the number of bytes
295         that were freed (by truncating the list of leases, and possibly by
296         deleting the file. Raise IndexError if there was no lease with the
297         given cancel_secret."""
298
299         accepting_nodeids = set()
300         modified = 0
301         remaining = 0
302         blank_lease = LeaseInfo(owner_num=0,
303                                 renew_secret="\x00"*32,
304                                 cancel_secret="\x00"*32,
305                                 expiration_time=0,
306                                 nodeid="\x00"*20)
307         f = open(self.home, 'rb+')
308         for (leasenum,lease) in self._enumerate_leases(f):
309             accepting_nodeids.add(lease.nodeid)
310             if lease.cancel_secret == cancel_secret:
311                 self._write_lease_record(f, leasenum, blank_lease)
312                 modified += 1
313             else:
314                 remaining += 1
315         if modified:
316             freed_space = self._pack_leases(f)
317             f.close()
318             if not remaining:
319                 freed_space += os.stat(self.home)[stat.ST_SIZE]
320                 self.unlink()
321             return freed_space
322
323         msg = ("Unable to cancel non-existent lease. I have leases "
324                "accepted by nodeids: ")
325         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
326                          for anid in accepting_nodeids])
327         msg += " ."
328         raise IndexError(msg)
329
330     def _pack_leases(self, f):
331         # TODO: reclaim space from cancelled leases
332         return 0
333
334     def _read_write_enabler_and_nodeid(self, f):
335         f.seek(0)
336         data = f.read(self.HEADER_SIZE)
337         (magic,
338          write_enabler_nodeid, write_enabler,
339          data_length, extra_least_offset) = \
340          struct.unpack(">32s20s32sQQ", data)
341         assert magic == self.MAGIC
342         return (write_enabler, write_enabler_nodeid)
343
344     def readv(self, readv):
345         datav = []
346         f = open(self.home, 'rb')
347         for (offset, length) in readv:
348             datav.append(self._read_share_data(f, offset, length))
349         f.close()
350         return datav
351
352 #    def remote_get_length(self):
353 #        f = open(self.home, 'rb')
354 #        data_length = self._read_data_length(f)
355 #        f.close()
356 #        return data_length
357
358     def check_write_enabler(self, write_enabler, si_s):
359         f = open(self.home, 'rb+')
360         (real_write_enabler, write_enabler_nodeid) = \
361                              self._read_write_enabler_and_nodeid(f)
362         f.close()
363         if write_enabler != real_write_enabler:
364             # accomodate share migration by reporting the nodeid used for the
365             # old write enabler.
366             self.log(format="bad write enabler on SI %(si)s,"
367                      " recorded by nodeid %(nodeid)s",
368                      facility="tahoe.storage",
369                      level=log.WEIRD, umid="cE1eBQ",
370                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
371             msg = "The write enabler was recorded by nodeid '%s'." % \
372                   (idlib.nodeid_b2a(write_enabler_nodeid),)
373             raise BadWriteEnablerError(msg)
374
375     def check_testv(self, testv):
376         test_good = True
377         f = open(self.home, 'rb+')
378         for (offset, length, operator, specimen) in testv:
379             data = self._read_share_data(f, offset, length)
380             if not testv_compare(data, operator, specimen):
381                 test_good = False
382                 break
383         f.close()
384         return test_good
385
386     def writev(self, datav, new_length):
387         f = open(self.home, 'rb+')
388         for (offset, data) in datav:
389             self._write_share_data(f, offset, data)
390         if new_length is not None:
391             self._change_container_size(f, new_length)
392             f.seek(self.DATA_LENGTH_OFFSET)
393             f.write(struct.pack(">Q", new_length))
394         f.close()
395
396 def testv_compare(a, op, b):
397     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
398     if op == "lt":
399         return a < b
400     if op == "le":
401         return a <= b
402     if op == "eq":
403         return a == b
404     if op == "ne":
405         return a != b
406     if op == "ge":
407         return a >= b
408     if op == "gt":
409         return a > b
410     # never reached
411
412 class EmptyShare:
413
414     def check_testv(self, testv):
415         test_good = True
416         for (offset, length, operator, specimen) in testv:
417             data = ""
418             if not testv_compare(data, operator, specimen):
419                 test_good = False
420                 break
421         return test_good
422
423 def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
424     ms = MutableShareFile(filename, parent)
425     ms.create(my_nodeid, write_enabler)
426     del ms
427     return MutableShareFile(filename, parent)
428