]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/mutable.py
break storage.py into smaller pieces in storage/*.py . No behavioral changes.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / mutable.py
1 import os, stat, struct
2
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.storage.lease import LeaseInfo
7 from allmydata.storage.common import DataTooLargeError
8
9 # the MutableShareFile is like the ShareFile, but used for mutable data. It
10 # has a different layout. See docs/mutable.txt for more details.
11
12 # #   offset    size    name
13 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
14 # 2   32        20      write enabler's nodeid
15 # 3   52        32      write enabler
16 # 4   84        8       data size (actual share data present) (a)
17 # 5   92        8       offset of (8) count of extra leases (after data)
18 # 6   100       368     four leases, 92 bytes each
19 #                        0    4   ownerid (0 means "no lease here")
20 #                        4    4   expiration timestamp
21 #                        8   32   renewal token
22 #                        40  32   cancel token
23 #                        72  20   nodeid which accepted the tokens
24 # 7   468       (a)     data
25 # 8   ??        4       count of extra leases
26 # 9   ??        n*92    extra leases
27
28
29 assert struct.calcsize("L"), 4 # The struct module doc says that L's are 4 bytes in size.
30 assert struct.calcsize("Q"), 8 # The struct module doc says that Q's are 8 bytes in size (at least with big-endian ordering).
31
32 class MutableShareFile:
33
34     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
35     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
36     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
37     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
38     assert LEASE_SIZE == 92
39     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
40     assert DATA_OFFSET == 468, DATA_OFFSET
41     # our sharefiles share with a recognizable string, plus some random
42     # binary data to reduce the chance that a regular text file will look
43     # like a sharefile.
44     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
45     assert len(MAGIC) == 32
46     MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
47     # TODO: decide upon a policy for max share size
48
49     def __init__(self, filename, parent=None):
50         self.home = filename
51         if os.path.exists(self.home):
52             # we don't cache anything, just check the magic
53             f = open(self.home, 'rb')
54             data = f.read(self.HEADER_SIZE)
55             (magic,
56              write_enabler_nodeid, write_enabler,
57              data_length, extra_least_offset) = \
58              struct.unpack(">32s20s32sQQ", data)
59             assert magic == self.MAGIC
60         self.parent = parent # for logging
61
62     def log(self, *args, **kwargs):
63         return self.parent.log(*args, **kwargs)
64
65     def create(self, my_nodeid, write_enabler):
66         assert not os.path.exists(self.home)
67         data_length = 0
68         extra_lease_offset = (self.HEADER_SIZE
69                               + 4 * self.LEASE_SIZE
70                               + data_length)
71         assert extra_lease_offset == self.DATA_OFFSET # true at creation
72         num_extra_leases = 0
73         f = open(self.home, 'wb')
74         header = struct.pack(">32s20s32sQQ",
75                              self.MAGIC, my_nodeid, write_enabler,
76                              data_length, extra_lease_offset,
77                              )
78         leases = ("\x00"*self.LEASE_SIZE) * 4
79         f.write(header + leases)
80         # data goes here, empty after creation
81         f.write(struct.pack(">L", num_extra_leases))
82         # extra leases go here, none at creation
83         f.close()
84
85     def unlink(self):
86         os.unlink(self.home)
87
88     def _read_data_length(self, f):
89         f.seek(self.DATA_LENGTH_OFFSET)
90         (data_length,) = struct.unpack(">Q", f.read(8))
91         return data_length
92
93     def _write_data_length(self, f, data_length):
94         f.seek(self.DATA_LENGTH_OFFSET)
95         f.write(struct.pack(">Q", data_length))
96
97     def _read_share_data(self, f, offset, length):
98         precondition(offset >= 0)
99         data_length = self._read_data_length(f)
100         if offset+length > data_length:
101             # reads beyond the end of the data are truncated. Reads that
102             # start beyond the end of the data return an empty string.
103             length = max(0, data_length-offset)
104         if length == 0:
105             return ""
106         precondition(offset+length <= data_length)
107         f.seek(self.DATA_OFFSET+offset)
108         data = f.read(length)
109         return data
110
111     def _read_extra_lease_offset(self, f):
112         f.seek(self.EXTRA_LEASE_OFFSET)
113         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
114         return extra_lease_offset
115
116     def _write_extra_lease_offset(self, f, offset):
117         f.seek(self.EXTRA_LEASE_OFFSET)
118         f.write(struct.pack(">Q", offset))
119
120     def _read_num_extra_leases(self, f):
121         offset = self._read_extra_lease_offset(f)
122         f.seek(offset)
123         (num_extra_leases,) = struct.unpack(">L", f.read(4))
124         return num_extra_leases
125
126     def _write_num_extra_leases(self, f, num_leases):
127         extra_lease_offset = self._read_extra_lease_offset(f)
128         f.seek(extra_lease_offset)
129         f.write(struct.pack(">L", num_leases))
130
131     def _change_container_size(self, f, new_container_size):
132         if new_container_size > self.MAX_SIZE:
133             raise DataTooLargeError()
134         old_extra_lease_offset = self._read_extra_lease_offset(f)
135         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
136         if new_extra_lease_offset < old_extra_lease_offset:
137             # TODO: allow containers to shrink. For now they remain large.
138             return
139         num_extra_leases = self._read_num_extra_leases(f)
140         f.seek(old_extra_lease_offset)
141         extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
142         f.seek(new_extra_lease_offset)
143         f.write(extra_lease_data)
144         # an interrupt here will corrupt the leases, iff the move caused the
145         # extra leases to overlap.
146         self._write_extra_lease_offset(f, new_extra_lease_offset)
147
148     def _write_share_data(self, f, offset, data):
149         length = len(data)
150         precondition(offset >= 0)
151         data_length = self._read_data_length(f)
152         extra_lease_offset = self._read_extra_lease_offset(f)
153
154         if offset+length >= data_length:
155             # They are expanding their data size.
156             if self.DATA_OFFSET+offset+length > extra_lease_offset:
157                 # Their new data won't fit in the current container, so we
158                 # have to move the leases. With luck, they're expanding it
159                 # more than the size of the extra lease block, which will
160                 # minimize the corrupt-the-share window
161                 self._change_container_size(f, offset+length)
162                 extra_lease_offset = self._read_extra_lease_offset(f)
163
164                 # an interrupt here is ok.. the container has been enlarged
165                 # but the data remains untouched
166
167             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
168             # Their data now fits in the current container. We must write
169             # their new data and modify the recorded data size.
170             new_data_length = offset+length
171             self._write_data_length(f, new_data_length)
172             # an interrupt here will result in a corrupted share
173
174         # now all that's left to do is write out their data
175         f.seek(self.DATA_OFFSET+offset)
176         f.write(data)
177         return
178
179     def _write_lease_record(self, f, lease_number, lease_info):
180         extra_lease_offset = self._read_extra_lease_offset(f)
181         num_extra_leases = self._read_num_extra_leases(f)
182         if lease_number < 4:
183             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
184         elif (lease_number-4) < num_extra_leases:
185             offset = (extra_lease_offset
186                       + 4
187                       + (lease_number-4)*self.LEASE_SIZE)
188         else:
189             # must add an extra lease record
190             self._write_num_extra_leases(f, num_extra_leases+1)
191             offset = (extra_lease_offset
192                       + 4
193                       + (lease_number-4)*self.LEASE_SIZE)
194         f.seek(offset)
195         assert f.tell() == offset
196         f.write(lease_info.to_mutable_data())
197
198     def _read_lease_record(self, f, lease_number):
199         # returns a LeaseInfo instance, or None
200         extra_lease_offset = self._read_extra_lease_offset(f)
201         num_extra_leases = self._read_num_extra_leases(f)
202         if lease_number < 4:
203             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
204         elif (lease_number-4) < num_extra_leases:
205             offset = (extra_lease_offset
206                       + 4
207                       + (lease_number-4)*self.LEASE_SIZE)
208         else:
209             raise IndexError("No such lease number %d" % lease_number)
210         f.seek(offset)
211         assert f.tell() == offset
212         data = f.read(self.LEASE_SIZE)
213         lease_info = LeaseInfo().from_mutable_data(data)
214         if lease_info.owner_num == 0:
215             return None
216         return lease_info
217
218     def _get_num_lease_slots(self, f):
219         # how many places do we have allocated for leases? Not all of them
220         # are filled.
221         num_extra_leases = self._read_num_extra_leases(f)
222         return 4+num_extra_leases
223
224     def _get_first_empty_lease_slot(self, f):
225         # return an int with the index of an empty slot, or None if we do not
226         # currently have an empty slot
227
228         for i in range(self._get_num_lease_slots(f)):
229             if self._read_lease_record(f, i) is None:
230                 return i
231         return None
232
233     def _enumerate_leases(self, f):
234         """Yields (leasenum, (ownerid, expiration_time, renew_secret,
235         cancel_secret, accepting_nodeid)) for all leases."""
236         for i in range(self._get_num_lease_slots(f)):
237             try:
238                 data = self._read_lease_record(f, i)
239                 if data is not None:
240                     yield (i,data)
241             except IndexError:
242                 return
243
244     def debug_get_leases(self):
245         f = open(self.home, 'rb')
246         leases = list(self._enumerate_leases(f))
247         f.close()
248         return leases
249
250     def add_lease(self, lease_info):
251         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
252         f = open(self.home, 'rb+')
253         num_lease_slots = self._get_num_lease_slots(f)
254         empty_slot = self._get_first_empty_lease_slot(f)
255         if empty_slot is not None:
256             self._write_lease_record(f, empty_slot, lease_info)
257         else:
258             self._write_lease_record(f, num_lease_slots, lease_info)
259         f.close()
260
261     def renew_lease(self, renew_secret, new_expire_time):
262         accepting_nodeids = set()
263         f = open(self.home, 'rb+')
264         for (leasenum,lease) in self._enumerate_leases(f):
265             if lease.renew_secret == renew_secret:
266                 # yup. See if we need to update the owner time.
267                 if new_expire_time > lease.expiration_time:
268                     # yes
269                     lease.expiration_time = new_expire_time
270                     self._write_lease_record(f, leasenum, lease)
271                 f.close()
272                 return
273             accepting_nodeids.add(lease.nodeid)
274         f.close()
275         # Return the accepting_nodeids set, to give the client a chance to
276         # update the leases on a share which has been migrated from its
277         # original server to a new one.
278         msg = ("Unable to renew non-existent lease. I have leases accepted by"
279                " nodeids: ")
280         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
281                          for anid in accepting_nodeids])
282         msg += " ."
283         raise IndexError(msg)
284
285     def add_or_renew_lease(self, lease_info):
286         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
287         try:
288             self.renew_lease(lease_info.renew_secret,
289                              lease_info.expiration_time)
290         except IndexError:
291             self.add_lease(lease_info)
292
293     def cancel_lease(self, cancel_secret):
294         """Remove any leases with the given cancel_secret. If the last lease
295         is cancelled, the file will be removed. Return the number of bytes
296         that were freed (by truncating the list of leases, and possibly by
297         deleting the file. Raise IndexError if there was no lease with the
298         given cancel_secret."""
299
300         accepting_nodeids = set()
301         modified = 0
302         remaining = 0
303         blank_lease = LeaseInfo(owner_num=0,
304                                 renew_secret="\x00"*32,
305                                 cancel_secret="\x00"*32,
306                                 expiration_time=0,
307                                 nodeid="\x00"*20)
308         f = open(self.home, 'rb+')
309         for (leasenum,lease) in self._enumerate_leases(f):
310             accepting_nodeids.add(lease.nodeid)
311             if lease.cancel_secret == cancel_secret:
312                 self._write_lease_record(f, leasenum, blank_lease)
313                 modified += 1
314             else:
315                 remaining += 1
316         if modified:
317             freed_space = self._pack_leases(f)
318             f.close()
319             if not remaining:
320                 freed_space += os.stat(self.home)[stat.ST_SIZE]
321                 self.unlink()
322             return freed_space
323
324         msg = ("Unable to cancel non-existent lease. I have leases "
325                "accepted by nodeids: ")
326         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
327                          for anid in accepting_nodeids])
328         msg += " ."
329         raise IndexError(msg)
330
331     def _pack_leases(self, f):
332         # TODO: reclaim space from cancelled leases
333         return 0
334
335     def _read_write_enabler_and_nodeid(self, f):
336         f.seek(0)
337         data = f.read(self.HEADER_SIZE)
338         (magic,
339          write_enabler_nodeid, write_enabler,
340          data_length, extra_least_offset) = \
341          struct.unpack(">32s20s32sQQ", data)
342         assert magic == self.MAGIC
343         return (write_enabler, write_enabler_nodeid)
344
345     def readv(self, readv):
346         datav = []
347         f = open(self.home, 'rb')
348         for (offset, length) in readv:
349             datav.append(self._read_share_data(f, offset, length))
350         f.close()
351         return datav
352
353 #    def remote_get_length(self):
354 #        f = open(self.home, 'rb')
355 #        data_length = self._read_data_length(f)
356 #        f.close()
357 #        return data_length
358
359     def check_write_enabler(self, write_enabler, si_s):
360         f = open(self.home, 'rb+')
361         (real_write_enabler, write_enabler_nodeid) = \
362                              self._read_write_enabler_and_nodeid(f)
363         f.close()
364         if write_enabler != real_write_enabler:
365             # accomodate share migration by reporting the nodeid used for the
366             # old write enabler.
367             self.log(format="bad write enabler on SI %(si)s,"
368                      " recorded by nodeid %(nodeid)s",
369                      facility="tahoe.storage",
370                      level=log.WEIRD, umid="cE1eBQ",
371                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
372             msg = "The write enabler was recorded by nodeid '%s'." % \
373                   (idlib.nodeid_b2a(write_enabler_nodeid),)
374             raise BadWriteEnablerError(msg)
375
376     def check_testv(self, testv):
377         test_good = True
378         f = open(self.home, 'rb+')
379         for (offset, length, operator, specimen) in testv:
380             data = self._read_share_data(f, offset, length)
381             if not testv_compare(data, operator, specimen):
382                 test_good = False
383                 break
384         f.close()
385         return test_good
386
387     def writev(self, datav, new_length):
388         f = open(self.home, 'rb+')
389         for (offset, data) in datav:
390             self._write_share_data(f, offset, data)
391         if new_length is not None:
392             self._change_container_size(f, new_length)
393             f.seek(self.DATA_LENGTH_OFFSET)
394             f.write(struct.pack(">Q", new_length))
395         f.close()
396
397 def testv_compare(a, op, b):
398     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
399     if op == "lt":
400         return a < b
401     if op == "le":
402         return a <= b
403     if op == "eq":
404         return a == b
405     if op == "ne":
406         return a != b
407     if op == "ge":
408         return a >= b
409     if op == "gt":
410         return a > b
411     # never reached
412
413 class EmptyShare:
414
415     def check_testv(self, testv):
416         test_good = True
417         for (offset, length, operator, specimen) in testv:
418             data = ""
419             if not testv_compare(data, operator, specimen):
420                 test_good = False
421                 break
422         return test_good
423
424 def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
425     ms = MutableShareFile(filename, parent)
426     ms.create(my_nodeid, write_enabler)
427     del ms
428     return MutableShareFile(filename, parent)
429