]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/mutable.py
storage/mutable: raise a specific error upon seeing bad magic, instead of using assert
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / mutable.py
1 import os, stat, struct
2
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.storage.lease import LeaseInfo
7 from allmydata.storage.common import UnknownMutableContainerVersionError, \
8      DataTooLargeError
9
10 # the MutableShareFile is like the ShareFile, but used for mutable data. It
11 # has a different layout. See docs/mutable.txt for more details.
12
13 # #   offset    size    name
14 # 1   0         32      magic verstr "tahoe mutable container v1" plus binary
15 # 2   32        20      write enabler's nodeid
16 # 3   52        32      write enabler
17 # 4   84        8       data size (actual share data present) (a)
18 # 5   92        8       offset of (8) count of extra leases (after data)
19 # 6   100       368     four leases, 92 bytes each
20 #                        0    4   ownerid (0 means "no lease here")
21 #                        4    4   expiration timestamp
22 #                        8   32   renewal token
23 #                        40  32   cancel token
24 #                        72  20   nodeid which accepted the tokens
25 # 7   468       (a)     data
26 # 8   ??        4       count of extra leases
27 # 9   ??        n*92    extra leases
28
29
30 assert struct.calcsize("L"), 4 # The struct module doc says that L's are 4 bytes in size.
31 assert struct.calcsize("Q"), 8 # The struct module doc says that Q's are 8 bytes in size (at least with big-endian ordering).
32
33 class MutableShareFile:
34
35     DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
36     EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
37     HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
38     LEASE_SIZE = struct.calcsize(">LL32s32s20s")
39     assert LEASE_SIZE == 92
40     DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
41     assert DATA_OFFSET == 468, DATA_OFFSET
42     # our sharefiles share with a recognizable string, plus some random
43     # binary data to reduce the chance that a regular text file will look
44     # like a sharefile.
45     MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
46     assert len(MAGIC) == 32
47     MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
48     # TODO: decide upon a policy for max share size
49
50     def __init__(self, filename, parent=None):
51         self.home = filename
52         if os.path.exists(self.home):
53             # we don't cache anything, just check the magic
54             f = open(self.home, 'rb')
55             data = f.read(self.HEADER_SIZE)
56             (magic,
57              write_enabler_nodeid, write_enabler,
58              data_length, extra_least_offset) = \
59              struct.unpack(">32s20s32sQQ", data)
60             if magic != self.MAGIC:
61                 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
62                       (filename, magic, self.MAGIC)
63                 raise UnknownMutableContainerVersionError(msg)
64         self.parent = parent # for logging
65
66     def log(self, *args, **kwargs):
67         return self.parent.log(*args, **kwargs)
68
69     def create(self, my_nodeid, write_enabler):
70         assert not os.path.exists(self.home)
71         data_length = 0
72         extra_lease_offset = (self.HEADER_SIZE
73                               + 4 * self.LEASE_SIZE
74                               + data_length)
75         assert extra_lease_offset == self.DATA_OFFSET # true at creation
76         num_extra_leases = 0
77         f = open(self.home, 'wb')
78         header = struct.pack(">32s20s32sQQ",
79                              self.MAGIC, my_nodeid, write_enabler,
80                              data_length, extra_lease_offset,
81                              )
82         leases = ("\x00"*self.LEASE_SIZE) * 4
83         f.write(header + leases)
84         # data goes here, empty after creation
85         f.write(struct.pack(">L", num_extra_leases))
86         # extra leases go here, none at creation
87         f.close()
88
89     def unlink(self):
90         os.unlink(self.home)
91
92     def _read_data_length(self, f):
93         f.seek(self.DATA_LENGTH_OFFSET)
94         (data_length,) = struct.unpack(">Q", f.read(8))
95         return data_length
96
97     def _write_data_length(self, f, data_length):
98         f.seek(self.DATA_LENGTH_OFFSET)
99         f.write(struct.pack(">Q", data_length))
100
101     def _read_share_data(self, f, offset, length):
102         precondition(offset >= 0)
103         data_length = self._read_data_length(f)
104         if offset+length > data_length:
105             # reads beyond the end of the data are truncated. Reads that
106             # start beyond the end of the data return an empty string.
107             length = max(0, data_length-offset)
108         if length == 0:
109             return ""
110         precondition(offset+length <= data_length)
111         f.seek(self.DATA_OFFSET+offset)
112         data = f.read(length)
113         return data
114
115     def _read_extra_lease_offset(self, f):
116         f.seek(self.EXTRA_LEASE_OFFSET)
117         (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
118         return extra_lease_offset
119
120     def _write_extra_lease_offset(self, f, offset):
121         f.seek(self.EXTRA_LEASE_OFFSET)
122         f.write(struct.pack(">Q", offset))
123
124     def _read_num_extra_leases(self, f):
125         offset = self._read_extra_lease_offset(f)
126         f.seek(offset)
127         (num_extra_leases,) = struct.unpack(">L", f.read(4))
128         return num_extra_leases
129
130     def _write_num_extra_leases(self, f, num_leases):
131         extra_lease_offset = self._read_extra_lease_offset(f)
132         f.seek(extra_lease_offset)
133         f.write(struct.pack(">L", num_leases))
134
135     def _change_container_size(self, f, new_container_size):
136         if new_container_size > self.MAX_SIZE:
137             raise DataTooLargeError()
138         old_extra_lease_offset = self._read_extra_lease_offset(f)
139         new_extra_lease_offset = self.DATA_OFFSET + new_container_size
140         if new_extra_lease_offset < old_extra_lease_offset:
141             # TODO: allow containers to shrink. For now they remain large.
142             return
143         num_extra_leases = self._read_num_extra_leases(f)
144         f.seek(old_extra_lease_offset)
145         extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
146         f.seek(new_extra_lease_offset)
147         f.write(extra_lease_data)
148         # an interrupt here will corrupt the leases, iff the move caused the
149         # extra leases to overlap.
150         self._write_extra_lease_offset(f, new_extra_lease_offset)
151
152     def _write_share_data(self, f, offset, data):
153         length = len(data)
154         precondition(offset >= 0)
155         data_length = self._read_data_length(f)
156         extra_lease_offset = self._read_extra_lease_offset(f)
157
158         if offset+length >= data_length:
159             # They are expanding their data size.
160             if self.DATA_OFFSET+offset+length > extra_lease_offset:
161                 # Their new data won't fit in the current container, so we
162                 # have to move the leases. With luck, they're expanding it
163                 # more than the size of the extra lease block, which will
164                 # minimize the corrupt-the-share window
165                 self._change_container_size(f, offset+length)
166                 extra_lease_offset = self._read_extra_lease_offset(f)
167
168                 # an interrupt here is ok.. the container has been enlarged
169                 # but the data remains untouched
170
171             assert self.DATA_OFFSET+offset+length <= extra_lease_offset
172             # Their data now fits in the current container. We must write
173             # their new data and modify the recorded data size.
174             new_data_length = offset+length
175             self._write_data_length(f, new_data_length)
176             # an interrupt here will result in a corrupted share
177
178         # now all that's left to do is write out their data
179         f.seek(self.DATA_OFFSET+offset)
180         f.write(data)
181         return
182
183     def _write_lease_record(self, f, lease_number, lease_info):
184         extra_lease_offset = self._read_extra_lease_offset(f)
185         num_extra_leases = self._read_num_extra_leases(f)
186         if lease_number < 4:
187             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
188         elif (lease_number-4) < num_extra_leases:
189             offset = (extra_lease_offset
190                       + 4
191                       + (lease_number-4)*self.LEASE_SIZE)
192         else:
193             # must add an extra lease record
194             self._write_num_extra_leases(f, num_extra_leases+1)
195             offset = (extra_lease_offset
196                       + 4
197                       + (lease_number-4)*self.LEASE_SIZE)
198         f.seek(offset)
199         assert f.tell() == offset
200         f.write(lease_info.to_mutable_data())
201
202     def _read_lease_record(self, f, lease_number):
203         # returns a LeaseInfo instance, or None
204         extra_lease_offset = self._read_extra_lease_offset(f)
205         num_extra_leases = self._read_num_extra_leases(f)
206         if lease_number < 4:
207             offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
208         elif (lease_number-4) < num_extra_leases:
209             offset = (extra_lease_offset
210                       + 4
211                       + (lease_number-4)*self.LEASE_SIZE)
212         else:
213             raise IndexError("No such lease number %d" % lease_number)
214         f.seek(offset)
215         assert f.tell() == offset
216         data = f.read(self.LEASE_SIZE)
217         lease_info = LeaseInfo().from_mutable_data(data)
218         if lease_info.owner_num == 0:
219             return None
220         return lease_info
221
222     def _get_num_lease_slots(self, f):
223         # how many places do we have allocated for leases? Not all of them
224         # are filled.
225         num_extra_leases = self._read_num_extra_leases(f)
226         return 4+num_extra_leases
227
228     def _get_first_empty_lease_slot(self, f):
229         # return an int with the index of an empty slot, or None if we do not
230         # currently have an empty slot
231
232         for i in range(self._get_num_lease_slots(f)):
233             if self._read_lease_record(f, i) is None:
234                 return i
235         return None
236
237     def get_leases(self):
238         """Yields a LeaseInfo instance for all leases."""
239         f = open(self.home, 'rb')
240         for i, lease in self._enumerate_leases(f):
241             yield lease
242         f.close()
243
244     def _enumerate_leases(self, f):
245         for i in range(self._get_num_lease_slots(f)):
246             try:
247                 data = self._read_lease_record(f, i)
248                 if data is not None:
249                     yield i,data
250             except IndexError:
251                 return
252
253     def add_lease(self, lease_info):
254         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
255         f = open(self.home, 'rb+')
256         num_lease_slots = self._get_num_lease_slots(f)
257         empty_slot = self._get_first_empty_lease_slot(f)
258         if empty_slot is not None:
259             self._write_lease_record(f, empty_slot, lease_info)
260         else:
261             self._write_lease_record(f, num_lease_slots, lease_info)
262         f.close()
263
264     def renew_lease(self, renew_secret, new_expire_time):
265         accepting_nodeids = set()
266         f = open(self.home, 'rb+')
267         for (leasenum,lease) in self._enumerate_leases(f):
268             if lease.renew_secret == renew_secret:
269                 # yup. See if we need to update the owner time.
270                 if new_expire_time > lease.expiration_time:
271                     # yes
272                     lease.expiration_time = new_expire_time
273                     self._write_lease_record(f, leasenum, lease)
274                 f.close()
275                 return
276             accepting_nodeids.add(lease.nodeid)
277         f.close()
278         # Return the accepting_nodeids set, to give the client a chance to
279         # update the leases on a share which has been migrated from its
280         # original server to a new one.
281         msg = ("Unable to renew non-existent lease. I have leases accepted by"
282                " nodeids: ")
283         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
284                          for anid in accepting_nodeids])
285         msg += " ."
286         raise IndexError(msg)
287
288     def add_or_renew_lease(self, lease_info):
289         precondition(lease_info.owner_num != 0) # 0 means "no lease here"
290         try:
291             self.renew_lease(lease_info.renew_secret,
292                              lease_info.expiration_time)
293         except IndexError:
294             self.add_lease(lease_info)
295
296     def cancel_lease(self, cancel_secret):
297         """Remove any leases with the given cancel_secret. If the last lease
298         is cancelled, the file will be removed. Return the number of bytes
299         that were freed (by truncating the list of leases, and possibly by
300         deleting the file. Raise IndexError if there was no lease with the
301         given cancel_secret."""
302
303         accepting_nodeids = set()
304         modified = 0
305         remaining = 0
306         blank_lease = LeaseInfo(owner_num=0,
307                                 renew_secret="\x00"*32,
308                                 cancel_secret="\x00"*32,
309                                 expiration_time=0,
310                                 nodeid="\x00"*20)
311         f = open(self.home, 'rb+')
312         for (leasenum,lease) in self._enumerate_leases(f):
313             accepting_nodeids.add(lease.nodeid)
314             if lease.cancel_secret == cancel_secret:
315                 self._write_lease_record(f, leasenum, blank_lease)
316                 modified += 1
317             else:
318                 remaining += 1
319         if modified:
320             freed_space = self._pack_leases(f)
321             f.close()
322             if not remaining:
323                 freed_space += os.stat(self.home)[stat.ST_SIZE]
324                 self.unlink()
325             return freed_space
326
327         msg = ("Unable to cancel non-existent lease. I have leases "
328                "accepted by nodeids: ")
329         msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
330                          for anid in accepting_nodeids])
331         msg += " ."
332         raise IndexError(msg)
333
334     def _pack_leases(self, f):
335         # TODO: reclaim space from cancelled leases
336         return 0
337
338     def _read_write_enabler_and_nodeid(self, f):
339         f.seek(0)
340         data = f.read(self.HEADER_SIZE)
341         (magic,
342          write_enabler_nodeid, write_enabler,
343          data_length, extra_least_offset) = \
344          struct.unpack(">32s20s32sQQ", data)
345         assert magic == self.MAGIC
346         return (write_enabler, write_enabler_nodeid)
347
348     def readv(self, readv):
349         datav = []
350         f = open(self.home, 'rb')
351         for (offset, length) in readv:
352             datav.append(self._read_share_data(f, offset, length))
353         f.close()
354         return datav
355
356 #    def remote_get_length(self):
357 #        f = open(self.home, 'rb')
358 #        data_length = self._read_data_length(f)
359 #        f.close()
360 #        return data_length
361
362     def check_write_enabler(self, write_enabler, si_s):
363         f = open(self.home, 'rb+')
364         (real_write_enabler, write_enabler_nodeid) = \
365                              self._read_write_enabler_and_nodeid(f)
366         f.close()
367         if write_enabler != real_write_enabler:
368             # accomodate share migration by reporting the nodeid used for the
369             # old write enabler.
370             self.log(format="bad write enabler on SI %(si)s,"
371                      " recorded by nodeid %(nodeid)s",
372                      facility="tahoe.storage",
373                      level=log.WEIRD, umid="cE1eBQ",
374                      si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
375             msg = "The write enabler was recorded by nodeid '%s'." % \
376                   (idlib.nodeid_b2a(write_enabler_nodeid),)
377             raise BadWriteEnablerError(msg)
378
379     def check_testv(self, testv):
380         test_good = True
381         f = open(self.home, 'rb+')
382         for (offset, length, operator, specimen) in testv:
383             data = self._read_share_data(f, offset, length)
384             if not testv_compare(data, operator, specimen):
385                 test_good = False
386                 break
387         f.close()
388         return test_good
389
390     def writev(self, datav, new_length):
391         f = open(self.home, 'rb+')
392         for (offset, data) in datav:
393             self._write_share_data(f, offset, data)
394         if new_length is not None:
395             self._change_container_size(f, new_length)
396             f.seek(self.DATA_LENGTH_OFFSET)
397             f.write(struct.pack(">Q", new_length))
398         f.close()
399
400 def testv_compare(a, op, b):
401     assert op in ("lt", "le", "eq", "ne", "ge", "gt")
402     if op == "lt":
403         return a < b
404     if op == "le":
405         return a <= b
406     if op == "eq":
407         return a == b
408     if op == "ne":
409         return a != b
410     if op == "ge":
411         return a >= b
412     if op == "gt":
413         return a > b
414     # never reached
415
416 class EmptyShare:
417
418     def check_testv(self, testv):
419         test_good = True
420         for (offset, length, operator, specimen) in testv:
421             data = ""
422             if not testv_compare(data, operator, specimen):
423                 test_good = False
424                 break
425         return test_good
426
427 def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
428     ms = MutableShareFile(filename, parent)
429     ms.create(my_nodeid, write_enabler)
430     del ms
431     return MutableShareFile(filename, parent)
432