import os, stat, struct

from allmydata.interfaces import BadWriteEnablerError
from allmydata.util import idlib, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import constant_time_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownMutableContainerVersionError, \
     DataTooLargeError
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
13 # the MutableShareFile is like the ShareFile, but used for mutable data. It
14 # has a different layout. See docs/mutable.txt for more details.
17 # 1 0 32 magic verstr "tahoe mutable container v1" plus binary
18 # 2 32 20 write enabler's nodeid
19 # 3 52 32 write enabler
20 # 4 84 8 data size (actual share data present) (a)
21 # 5 92 8 offset of (8) count of extra leases (after data)
22 # 6 100 368 four leases, 92 bytes each
23 # 0 4 ownerid (0 means "no lease here")
24 # 4 4 expiration timestamp
27 # 72 20 nodeid which accepted the tokens
29 # 8 ?? 4 count of extra leases
30 # 9 ?? n*92 extra leases
# The struct module doc says that L's are 4 bytes in size, and that Q's are
# 8 bytes in size. Since compatibility depends upon this, double-check it.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
class MutableShareFile:
    """On-disk container for one mutable share: a fixed header, four
    built-in lease slots, the share data, then a count of extra leases
    followed by the extra lease records (see the layout table above)."""

    # offset of the 8-byte data-length field: past magic+nodeid+write-enabler
    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    # offset of the 8-byte "where do the extra leases live" pointer
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles share with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
    # TODO: decide upon a policy for max share size
56 def __init__(self, filename, parent=None):
58 if os.path.exists(self.home):
59 # we don't cache anything, just check the magic
60 f = open(self.home, 'rb')
61 data = f.read(self.HEADER_SIZE)
63 write_enabler_nodeid, write_enabler,
64 data_length, extra_least_offset) = \
65 struct.unpack(">32s20s32sQQ", data)
66 if magic != self.MAGIC:
67 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
68 (filename, magic, self.MAGIC)
69 raise UnknownMutableContainerVersionError(msg)
70 self.parent = parent # for logging
72 def log(self, *args, **kwargs):
73 return self.parent.log(*args, **kwargs)
75 def create(self, my_nodeid, write_enabler):
76 assert not os.path.exists(self.home)
78 extra_lease_offset = (self.HEADER_SIZE
81 assert extra_lease_offset == self.DATA_OFFSET # true at creation
83 f = open(self.home, 'wb')
84 header = struct.pack(">32s20s32sQQ",
85 self.MAGIC, my_nodeid, write_enabler,
86 data_length, extra_lease_offset,
88 leases = ("\x00"*self.LEASE_SIZE) * 4
89 f.write(header + leases)
90 # data goes here, empty after creation
91 f.write(struct.pack(">L", num_extra_leases))
92 # extra leases go here, none at creation
98 def _read_data_length(self, f):
99 f.seek(self.DATA_LENGTH_OFFSET)
100 (data_length,) = struct.unpack(">Q", f.read(8))
103 def _write_data_length(self, f, data_length):
104 f.seek(self.DATA_LENGTH_OFFSET)
105 f.write(struct.pack(">Q", data_length))
107 def _read_share_data(self, f, offset, length):
108 precondition(offset >= 0)
109 data_length = self._read_data_length(f)
110 if offset+length > data_length:
111 # reads beyond the end of the data are truncated. Reads that
112 # start beyond the end of the data return an empty string.
113 length = max(0, data_length-offset)
116 precondition(offset+length <= data_length)
117 f.seek(self.DATA_OFFSET+offset)
118 data = f.read(length)
121 def _read_extra_lease_offset(self, f):
122 f.seek(self.EXTRA_LEASE_OFFSET)
123 (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
124 return extra_lease_offset
126 def _write_extra_lease_offset(self, f, offset):
127 f.seek(self.EXTRA_LEASE_OFFSET)
128 f.write(struct.pack(">Q", offset))
130 def _read_num_extra_leases(self, f):
131 offset = self._read_extra_lease_offset(f)
133 (num_extra_leases,) = struct.unpack(">L", f.read(4))
134 return num_extra_leases
136 def _write_num_extra_leases(self, f, num_leases):
137 extra_lease_offset = self._read_extra_lease_offset(f)
138 f.seek(extra_lease_offset)
139 f.write(struct.pack(">L", num_leases))
141 def _change_container_size(self, f, new_container_size):
142 if new_container_size > self.MAX_SIZE:
143 raise DataTooLargeError()
144 old_extra_lease_offset = self._read_extra_lease_offset(f)
145 new_extra_lease_offset = self.DATA_OFFSET + new_container_size
146 if new_extra_lease_offset < old_extra_lease_offset:
147 # TODO: allow containers to shrink. For now they remain large.
149 num_extra_leases = self._read_num_extra_leases(f)
150 f.seek(old_extra_lease_offset)
151 leases_size = 4 + num_extra_leases * self.LEASE_SIZE
152 extra_lease_data = f.read(leases_size)
154 # Zero out the old lease info (in order to minimize the chance that
155 # it could accidentally be exposed to a reader later, re #1528).
156 f.seek(old_extra_lease_offset)
157 f.write('\x00' * leases_size)
160 # An interrupt here will corrupt the leases.
162 f.seek(new_extra_lease_offset)
163 f.write(extra_lease_data)
164 self._write_extra_lease_offset(f, new_extra_lease_offset)
166 def _write_share_data(self, f, offset, data):
168 precondition(offset >= 0)
169 data_length = self._read_data_length(f)
170 extra_lease_offset = self._read_extra_lease_offset(f)
172 if offset+length >= data_length:
173 # They are expanding their data size.
175 if self.DATA_OFFSET+offset+length > extra_lease_offset:
176 # TODO: allow containers to shrink. For now, they remain
179 # Their new data won't fit in the current container, so we
180 # have to move the leases. With luck, they're expanding it
181 # more than the size of the extra lease block, which will
182 # minimize the corrupt-the-share window
183 self._change_container_size(f, offset+length)
184 extra_lease_offset = self._read_extra_lease_offset(f)
186 # an interrupt here is ok.. the container has been enlarged
187 # but the data remains untouched
189 assert self.DATA_OFFSET+offset+length <= extra_lease_offset
190 # Their data now fits in the current container. We must write
191 # their new data and modify the recorded data size.
193 # Fill any newly exposed empty space with 0's.
194 if offset > data_length:
195 f.seek(self.DATA_OFFSET+data_length)
196 f.write('\x00'*(offset - data_length))
199 new_data_length = offset+length
200 self._write_data_length(f, new_data_length)
201 # an interrupt here will result in a corrupted share
203 # now all that's left to do is write out their data
204 f.seek(self.DATA_OFFSET+offset)
208 def _write_lease_record(self, f, lease_number, lease_info):
209 extra_lease_offset = self._read_extra_lease_offset(f)
210 num_extra_leases = self._read_num_extra_leases(f)
212 offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
213 elif (lease_number-4) < num_extra_leases:
214 offset = (extra_lease_offset
216 + (lease_number-4)*self.LEASE_SIZE)
218 # must add an extra lease record
219 self._write_num_extra_leases(f, num_extra_leases+1)
220 offset = (extra_lease_offset
222 + (lease_number-4)*self.LEASE_SIZE)
224 assert f.tell() == offset
225 f.write(lease_info.to_mutable_data())
227 def _read_lease_record(self, f, lease_number):
228 # returns a LeaseInfo instance, or None
229 extra_lease_offset = self._read_extra_lease_offset(f)
230 num_extra_leases = self._read_num_extra_leases(f)
232 offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
233 elif (lease_number-4) < num_extra_leases:
234 offset = (extra_lease_offset
236 + (lease_number-4)*self.LEASE_SIZE)
238 raise IndexError("No such lease number %d" % lease_number)
240 assert f.tell() == offset
241 data = f.read(self.LEASE_SIZE)
242 lease_info = LeaseInfo().from_mutable_data(data)
243 if lease_info.owner_num == 0:
247 def _get_num_lease_slots(self, f):
248 # how many places do we have allocated for leases? Not all of them
250 num_extra_leases = self._read_num_extra_leases(f)
251 return 4+num_extra_leases
253 def _get_first_empty_lease_slot(self, f):
254 # return an int with the index of an empty slot, or None if we do not
255 # currently have an empty slot
257 for i in range(self._get_num_lease_slots(f)):
258 if self._read_lease_record(f, i) is None:
262 def get_leases(self):
263 """Yields a LeaseInfo instance for all leases."""
264 f = open(self.home, 'rb')
265 for i, lease in self._enumerate_leases(f):
269 def _enumerate_leases(self, f):
270 for i in range(self._get_num_lease_slots(f)):
272 data = self._read_lease_record(f, i)
278 def add_lease(self, lease_info):
279 precondition(lease_info.owner_num != 0) # 0 means "no lease here"
280 f = open(self.home, 'rb+')
281 num_lease_slots = self._get_num_lease_slots(f)
282 empty_slot = self._get_first_empty_lease_slot(f)
283 if empty_slot is not None:
284 self._write_lease_record(f, empty_slot, lease_info)
286 self._write_lease_record(f, num_lease_slots, lease_info)
289 def renew_lease(self, renew_secret, new_expire_time):
290 accepting_nodeids = set()
291 f = open(self.home, 'rb+')
292 for (leasenum,lease) in self._enumerate_leases(f):
293 if constant_time_compare(lease.renew_secret, renew_secret):
294 # yup. See if we need to update the owner time.
295 if new_expire_time > lease.expiration_time:
297 lease.expiration_time = new_expire_time
298 self._write_lease_record(f, leasenum, lease)
301 accepting_nodeids.add(lease.nodeid)
303 # Return the accepting_nodeids set, to give the client a chance to
304 # update the leases on a share which has been migrated from its
305 # original server to a new one.
306 msg = ("Unable to renew non-existent lease. I have leases accepted by"
308 msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
309 for anid in accepting_nodeids])
311 raise IndexError(msg)
313 def add_or_renew_lease(self, lease_info):
314 precondition(lease_info.owner_num != 0) # 0 means "no lease here"
316 self.renew_lease(lease_info.renew_secret,
317 lease_info.expiration_time)
319 self.add_lease(lease_info)
321 def cancel_lease(self, cancel_secret):
322 """Remove any leases with the given cancel_secret. If the last lease
323 is cancelled, the file will be removed. Return the number of bytes
324 that were freed (by truncating the list of leases, and possibly by
325 deleting the file. Raise IndexError if there was no lease with the
326 given cancel_secret."""
328 accepting_nodeids = set()
331 blank_lease = LeaseInfo(owner_num=0,
332 renew_secret="\x00"*32,
333 cancel_secret="\x00"*32,
336 f = open(self.home, 'rb+')
337 for (leasenum,lease) in self._enumerate_leases(f):
338 accepting_nodeids.add(lease.nodeid)
339 if constant_time_compare(lease.cancel_secret, cancel_secret):
340 self._write_lease_record(f, leasenum, blank_lease)
345 freed_space = self._pack_leases(f)
348 freed_space += os.stat(self.home)[stat.ST_SIZE]
352 msg = ("Unable to cancel non-existent lease. I have leases "
353 "accepted by nodeids: ")
354 msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
355 for anid in accepting_nodeids])
357 raise IndexError(msg)
359 def _pack_leases(self, f):
360 # TODO: reclaim space from cancelled leases
363 def _read_write_enabler_and_nodeid(self, f):
365 data = f.read(self.HEADER_SIZE)
367 write_enabler_nodeid, write_enabler,
368 data_length, extra_least_offset) = \
369 struct.unpack(">32s20s32sQQ", data)
370 assert magic == self.MAGIC
371 return (write_enabler, write_enabler_nodeid)
373 def readv(self, readv):
375 f = open(self.home, 'rb')
376 for (offset, length) in readv:
377 datav.append(self._read_share_data(f, offset, length))
381 # def remote_get_length(self):
382 # f = open(self.home, 'rb')
383 # data_length = self._read_data_length(f)
387 def check_write_enabler(self, write_enabler, si_s):
388 f = open(self.home, 'rb+')
389 (real_write_enabler, write_enabler_nodeid) = \
390 self._read_write_enabler_and_nodeid(f)
392 # avoid a timing attack
393 #if write_enabler != real_write_enabler:
394 if not constant_time_compare(write_enabler, real_write_enabler):
395 # accomodate share migration by reporting the nodeid used for the
397 self.log(format="bad write enabler on SI %(si)s,"
398 " recorded by nodeid %(nodeid)s",
399 facility="tahoe.storage",
400 level=log.WEIRD, umid="cE1eBQ",
401 si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
402 msg = "The write enabler was recorded by nodeid '%s'." % \
403 (idlib.nodeid_b2a(write_enabler_nodeid),)
404 raise BadWriteEnablerError(msg)
406 def check_testv(self, testv):
408 f = open(self.home, 'rb+')
409 for (offset, length, operator, specimen) in testv:
410 data = self._read_share_data(f, offset, length)
411 if not testv_compare(data, operator, specimen):
417 def writev(self, datav, new_length):
418 f = open(self.home, 'rb+')
419 for (offset, data) in datav:
420 self._write_share_data(f, offset, data)
421 if new_length is not None:
422 cur_length = self._read_data_length(f)
423 if new_length < cur_length:
424 self._write_data_length(f, new_length)
425 # TODO: if we're going to shrink the share file when the
426 # share data has shrunk, then call
427 # self._change_container_size() here.
430 def testv_compare(a, op, b):
431 assert op in ("lt", "le", "eq", "ne", "ge", "gt")
448 def check_testv(self, testv):
450 for (offset, length, operator, specimen) in testv:
452 if not testv_compare(data, operator, specimen):
def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    """Initialize a new mutable share on disk and return a fresh
    MutableShareFile handle for it."""
    new_share = MutableShareFile(filename, parent)
    new_share.create(my_nodeid, write_enabler)
    return MutableShareFile(filename, parent)