]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/mutable.py
e21c41dd98e60f771697f45ceb7332340a7aa663
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / mutable.py
1 import os, stat, struct
2
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.util.hashutil import constant_time_compare
7 from allmydata.storage.lease import LeaseInfo
8 from allmydata.storage.common import UnknownMutableContainerVersionError, \
9      DataTooLargeError
10
11 # the MutableShareFile is like the ShareFile, but used for mutable data. It
12 # has a different layout. See docs/mutable.txt for more details.
13
14 # #   offset    size    name
15 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
16 # 2   32        20      write enabler's nodeid
17 # 3   52        32      write enabler
18 # 4   84        8       data size (actual share data present) (a)
19 # 5   92        8       offset of (8) count of extra leases (after data)
20 # 6   100       368     four leases, 92 bytes each
21 #                        0    4   ownerid (0 means "no lease here")
22 #                        4    4   expiration timestamp
23 #                        8   32   renewal token
24 #                        40  32   cancel token
25 #                        72  20   nodeid which accepted the tokens
26 # 7   468       (a)     data
27 # 8   ??        4       count of extra leases
28 # 9   ??        n*92    extra leases
29
30
31 assert struct.calcsize(">L") == 4 # The struct module doc says that L's are 4 bytes in size.
32 assert struct.calcsize(">Q") == 8 # The struct module doc says that Q's are 8 bytes in size (at least with big-endian ordering).
33
34 class MutableShareFile:
35
36     sharetype = "mutable"
37     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
38     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
39     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
40     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
41     assert LEASE_SIZE == 92
42     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
43     assert DATA_OFFSET == 468, DATA_OFFSET
44     # our sharefiles share with a recognizable string, plus some random
45     # binary data to reduce the chance that a regular text file will look
46     # like a sharefile.
47     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
48     assert len(MAGIC) == 32
49     MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
50     # TODO: decide upon a policy for max share size
51
52     def __init__(self, filename, parent=None):
53         self.home = filename
54         if os.path.exists(self.home):
55             # we don't cache anything, just check the magic
56             f = open(self.home, 'rb')
57             data = f.read(self.HEADER_SIZE)
58             (magic,
59              write_enabler_nodeid, write_enabler,
60              data_length, extra_least_offset) = \
61              struct.unpack(">32s20s32sQQ", data)
62             if magic != self.MAGIC:
63                 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
64                       (filename, magic, self.MAGIC)
65                 raise UnknownMutableContainerVersionError(msg)
66         self.parent = parent # for logging
67
68     def log(self, *args, **kwargs):
69         return self.parent.log(*args, **kwargs)
70
71     def create(self, my_nodeid, write_enabler):
72         assert not os.path.exists(self.home)
73         data_length = 0
74         extra_lease_offset = (self.HEADER_SIZE
75                               + 4 * self.LEASE_SIZE
76                               + data_length)
77         assert extra_lease_offset == self.DATA_OFFSET # true at creation
78         num_extra_leases = 0
79         f = open(self.home, 'wb')
80         header = struct.pack(">32s20s32sQQ",
81                              self.MAGIC, my_nodeid, write_enabler,
82                              data_length, extra_lease_offset,
83                              )
84         leases = ("\x00"*self.LEASE_SIZE) * 4
85         f.write(header + leases)
86         # data goes here, empty after creation
87         f.write(struct.pack(">L", num_extra_leases))
88         # extra leases go here, none at creation
89         f.close()
90
91     def unlink(self):
92         os.unlink(self.home)
93
94     def _read_data_length(self, f):
95         f.seek(self.DATA_LENGTH_OFFSET)
96         (data_length,) = struct.unpack(">Q", f.read(8))
97         return data_length
98
99     def _write_data_length(self, f, data_length):
100         f.seek(self.DATA_LENGTH_OFFSET)
101         f.write(struct.pack(">Q", data_length))
102
103     def _read_share_data(self, f, offset, length):
104         precondition(offset >= 0)
105         data_length = self._read_data_length(f)
106         if offset+length > data_length:
107             # reads beyond the end of the data are truncated. Reads that
108             # start beyond the end of the data return an empty string.
109             length = max(0, data_length-offset)
110         if length == 0:
111             return ""
112         precondition(offset+length <= data_length)
113         f.seek(self.DATA_OFFSET+offset)
114         data = f.read(length)
115         return data
116
117     def _read_extra_lease_offset(self, f):
118         f.seek(self.EXTRA_LEASE_OFFSET)
119         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
120         return extra_lease_offset
121
122     def _write_extra_lease_offset(self, f, offset):
123         f.seek(self.EXTRA_LEASE_OFFSET)
124         f.write(struct.pack(">Q", offset))
125
126     def _read_num_extra_leases(self, f):
127         offset = self._read_extra_lease_offset(f)
128         f.seek(offset)
129         (num_extra_leases,) = struct.unpack(">L", f.read(4))
130         return num_extra_leases
131
132     def _write_num_extra_leases(self, f, num_leases):
133         extra_lease_offset = self._read_extra_lease_offset(f)
134         f.seek(extra_lease_offset)
135         f.write(struct.pack(">L", num_leases))
136
137     def _change_container_size(self, f, new_container_size):
138         if new_container_size > self.MAX_SIZE:
139             raise DataTooLargeError()
140         old_extra_lease_offset = self._read_extra_lease_offset(f)
141         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
142         if new_extra_lease_offset < old_extra_lease_offset:
143             # TODO: allow containers to shrink. For now they remain large.
144             return
145         num_extra_leases = self._read_num_extra_leases(f)
146         f.seek(old_extra_lease_offset)
147         extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
148         f.seek(new_extra_lease_offset)
149         f.write(extra_lease_data)
150         # an interrupt here will corrupt the leases, iff the move caused the
151         # extra leases to overlap.
152         self._write_extra_lease_offset(f, new_extra_lease_offset)
153
154     def _write_share_data(self, f, offset, data):
155         length = len(data)
156         precondition(offset >= 0)
157         data_length = self._read_data_length(f)
158         extra_lease_offset = self._read_extra_lease_offset(f)
159
160         if offset+length >= data_length:
161             # They are expanding their data size.
162             if self.DATA_OFFSET+offset+length > extra_lease_offset:
163                 # Their new data won't fit in the current container, so we
164                 # have to move the leases. With luck, they're expanding it
165                 # more than the size of the extra lease block, which will
166                 # minimize the corrupt-the-share window
167                 self._change_container_size(f, offset+length)
168                 extra_lease_offset = self._read_extra_lease_offset(f)
169
170                 # an interrupt here is ok.. the container has been enlarged
171                 # but the data remains untouched
172
173             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
174             # Their data now fits in the current container. We must write
175             # their new data and modify the recorded data size.
176             new_data_length = offset+length
177             self._write_data_length(f, new_data_length)
178             # an interrupt here will result in a corrupted share
179
180         # now all that's left to do is write out their data
181         f.seek(self.DATA_OFFSET+offset)
182         f.write(data)
183         return
184
185     def _write_lease_record(self, f, lease_number, lease_info):
186         extra_lease_offset = self._read_extra_lease_offset(f)
187         num_extra_leases = self._read_num_extra_leases(f)
188         if lease_number < 4:
189             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
190         elif (lease_number-4) < num_extra_leases:
191             offset = (extra_lease_offset
192                       + 4
193                       + (lease_number-4)*self.LEASE_SIZE)
194         else:
195             # must add an extra lease record
196             self._write_num_extra_leases(f, num_extra_leases+1)
197             offset = (extra_lease_offset
198                       + 4
199                       + (lease_number-4)*self.LEASE_SIZE)
200         f.seek(offset)
201         assert f.tell() == offset
202         f.write(lease_info.to_mutable_data())
203
204     def _read_lease_record(self, f, lease_number):
205         # returns a LeaseInfo instance, or None
206         extra_lease_offset = self._read_extra_lease_offset(f)
207         num_extra_leases = self._read_num_extra_leases(f)
208         if lease_number < 4:
209             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
210         elif (lease_number-4) < num_extra_leases:
211             offset = (extra_lease_offset
212                       + 4
213                       + (lease_number-4)*self.LEASE_SIZE)
214         else:
215             raise IndexError("No such lease number %d" % lease_number)
216         f.seek(offset)
217         assert f.tell() == offset
218         data = f.read(self.LEASE_SIZE)
219         lease_info = LeaseInfo().from_mutable_data(data)
220         if lease_info.owner_num == 0:
221             return None
222         return lease_info
223
224     def _get_num_lease_slots(self, f):
225         # how many places do we have allocated for leases? Not all of them
226         # are filled.
227         num_extra_leases = self._read_num_extra_leases(f)
228         return 4+num_extra_leases
229
230     def _get_first_empty_lease_slot(self, f):
231         # return an int with the index of an empty slot, or None if we do not
232         # currently have an empty slot
233
234         for i in range(self._get_num_lease_slots(f)):
235             if self._read_lease_record(f, i) is None:
236                 return i
237         return None
238
239     def get_leases(self):
240         """Yields a LeaseInfo instance for all leases."""
241         f = open(self.home, 'rb')
242         for i, lease in self._enumerate_leases(f):
243             yield lease
244         f.close()
245
246     def _enumerate_leases(self, f):
247         for i in range(self._get_num_lease_slots(f)):
248             try:
249                 data = self._read_lease_record(f, i)
250                 if data is not None:
251                     yield i,data
252             except IndexError:
253                 return
254
255     def add_lease(self, lease_info):
256         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
257         f = open(self.home, 'rb+')
258         num_lease_slots = self._get_num_lease_slots(f)
259         empty_slot = self._get_first_empty_lease_slot(f)
260         if empty_slot is not None:
261             self._write_lease_record(f, empty_slot, lease_info)
262         else:
263             self._write_lease_record(f, num_lease_slots, lease_info)
264         f.close()
265
266     def renew_lease(self, renew_secret, new_expire_time):
267         accepting_nodeids = set()
268         f = open(self.home, 'rb+')
269         for (leasenum,lease) in self._enumerate_leases(f):
270             if constant_time_compare(lease.renew_secret, renew_secret):
271                 # yup. See if we need to update the owner time.
272                 if new_expire_time > lease.expiration_time:
273                     # yes
274                     lease.expiration_time = new_expire_time
275                     self._write_lease_record(f, leasenum, lease)
276                 f.close()
277                 return
278             accepting_nodeids.add(lease.nodeid)
279         f.close()
280         # Return the accepting_nodeids set, to give the client a chance to
281         # update the leases on a share which has been migrated from its
282         # original server to a new one.
283         msg = ("Unable to renew non-existent lease. I have leases accepted by"
284                " nodeids: ")
285         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
286                          for anid in accepting_nodeids])
287         msg += " ."
288         raise IndexError(msg)
289
290     def add_or_renew_lease(self, lease_info):
291         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
292         try:
293             self.renew_lease(lease_info.renew_secret,
294                              lease_info.expiration_time)
295         except IndexError:
296             self.add_lease(lease_info)
297
298     def cancel_lease(self, cancel_secret):
299         """Remove any leases with the given cancel_secret. If the last lease
300         is cancelled, the file will be removed. Return the number of bytes
301         that were freed (by truncating the list of leases, and possibly by
302         deleting the file. Raise IndexError if there was no lease with the
303         given cancel_secret."""
304
305         accepting_nodeids = set()
306         modified = 0
307         remaining = 0
308         blank_lease = LeaseInfo(owner_num=0,
309                                 renew_secret="\x00"*32,
310                                 cancel_secret="\x00"*32,
311                                 expiration_time=0,
312                                 nodeid="\x00"*20)
313         f = open(self.home, 'rb+')
314         for (leasenum,lease) in self._enumerate_leases(f):
315             accepting_nodeids.add(lease.nodeid)
316             if constant_time_compare(lease.cancel_secret, cancel_secret):
317                 self._write_lease_record(f, leasenum, blank_lease)
318                 modified += 1
319             else:
320                 remaining += 1
321         if modified:
322             freed_space = self._pack_leases(f)
323             f.close()
324             if not remaining:
325                 freed_space += os.stat(self.home)[stat.ST_SIZE]
326                 self.unlink()
327             return freed_space
328
329         msg = ("Unable to cancel non-existent lease. I have leases "
330                "accepted by nodeids: ")
331         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
332                          for anid in accepting_nodeids])
333         msg += " ."
334         raise IndexError(msg)
335
336     def _pack_leases(self, f):
337         # TODO: reclaim space from cancelled leases
338         return 0
339
340     def _read_write_enabler_and_nodeid(self, f):
341         f.seek(0)
342         data = f.read(self.HEADER_SIZE)
343         (magic,
344          write_enabler_nodeid, write_enabler,
345          data_length, extra_least_offset) = \
346          struct.unpack(">32s20s32sQQ", data)
347         assert magic == self.MAGIC
348         return (write_enabler, write_enabler_nodeid)
349
350     def readv(self, readv):
351         datav = []
352         f = open(self.home, 'rb')
353         for (offset, length) in readv:
354             datav.append(self._read_share_data(f, offset, length))
355         f.close()
356         return datav
357
358 #    def remote_get_length(self):
359 #        f = open(self.home, 'rb')
360 #        data_length = self._read_data_length(f)
361 #        f.close()
362 #        return data_length
363
364     def check_write_enabler(self, write_enabler, si_s):
365         f = open(self.home, 'rb+')
366         (real_write_enabler, write_enabler_nodeid) = \
367                              self._read_write_enabler_and_nodeid(f)
368         f.close()
369         # avoid a timing attack
370         #if write_enabler != real_write_enabler:
371         if not constant_time_compare(write_enabler, real_write_enabler):
372             # accomodate share migration by reporting the nodeid used for the
373             # old write enabler.
374             self.log(format="bad write enabler on SI %(si)s,"
375                      " recorded by nodeid %(nodeid)s",
376                      facility="tahoe.storage",
377                      level=log.WEIRD, umid="cE1eBQ",
378                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
379             msg = "The write enabler was recorded by nodeid '%s'." % \
380                   (idlib.nodeid_b2a(write_enabler_nodeid),)
381             raise BadWriteEnablerError(msg)
382
383     def check_testv(self, testv):
384         test_good = True
385         f = open(self.home, 'rb+')
386         for (offset, length, operator, specimen) in testv:
387             data = self._read_share_data(f, offset, length)
388             if not testv_compare(data, operator, specimen):
389                 test_good = False
390                 break
391         f.close()
392         return test_good
393
394     def writev(self, datav, new_length):
395         f = open(self.home, 'rb+')
396         for (offset, data) in datav:
397             self._write_share_data(f, offset, data)
398         if new_length is not None:
399             self._change_container_size(f, new_length)
400             f.seek(self.DATA_LENGTH_OFFSET)
401             f.write(struct.pack(">Q", new_length))
402         f.close()
403
404 def testv_compare(a, op, b):
405     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
406     if op == "lt":
407         return a < b
408     if op == "le":
409         return a <= b
410     if op == "eq":
411         return a == b
412     if op == "ne":
413         return a != b
414     if op == "ge":
415         return a >= b
416     if op == "gt":
417         return a > b
418     # never reached
419
420 class EmptyShare:
421
422     def check_testv(self, testv):
423         test_good = True
424         for (offset, length, operator, specimen) in testv:
425             data = ""
426             if not testv_compare(data, operator, specimen):
427                 test_good = False
428                 break
429         return test_good
430
431 def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
432     ms = MutableShareFile(filename, parent)
433     ms.create(my_nodeid, write_enabler)
434     del ms
435     return MutableShareFile(filename, parent)
436