1 import os, stat, struct
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.util.hashutil import constant_time_compare
7 from allmydata.storage.lease import LeaseInfo
8 from allmydata.storage.common import UnknownMutableContainerVersionError, \
11 # the MutableShareFile is like the ShareFile, but used for mutable data. It
12 # has a different layout. See docs/mutable.txt for more details.
#  #   offset  size    name
# 1    0       32      magic verstr "tahoe mutable container v1" plus binary
# 2    32      20      write enabler's nodeid
# 3    52      32      write enabler
# 4    84      8       data size (actual share data present) (a)
# 5    92      8       offset of (8) count of extra leases (after data)
# 6    100     368     four leases, 92 bytes each
#                       0   4   ownerid (0 means "no lease here")
#                       4   4   expiration timestamp
#                       8   32  renewal token
#                       40  32  cancel token
#                       72  20  nodeid which accepted the tokens
# 7    468     (a)     data
# 8    ??      4       count of extra leases
# 9    ??      n*92    extra leases
# The struct module doc says that L's are 4 bytes in size, and that Q's are
# 8 bytes in size. Since compatibility depends upon this, double-check it.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
class MutableShareFile:
    """A single mutable share, stored in its own container file on disk.

    The on-disk layout is the offset table at the top of this module: a
    fixed header (magic, write-enabler nodeid, write enabler, data size,
    extra-lease offset), four in-line lease slots of LEASE_SIZE bytes,
    the share data itself, then a 4-byte count of extra leases followed
    by the extra lease records.
    """

    # byte offsets derived from the header struct format ">32s20s32sQQ"
    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    def __init__(self, filename, parent=None):
        """Open (or prepare to create) the mutable share at ``filename``.

        If the file already exists, read its header and raise
        UnknownMutableContainerVersionError when the magic string does not
        match.  ``parent`` is kept only as a logging destination.
        """
        # NOTE(review): self.home is read below but never assigned in this
        # listing -- presumably ``self.home = filename`` was dropped when
        # the file was excerpted; confirm against the original.
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            data = f.read(self.HEADER_SIZE)
            # NOTE(review): the opening "(magic," line of this tuple-unpack
            # is not visible in this listing.
             write_enabler_nodeid, write_enabler,
             data_length, extra_least_offset) = \
             struct.unpack(">32s20s32sQQ", data)
            if magic != self.MAGIC:
                msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
                      (filename, magic, self.MAGIC)
                raise UnknownMutableContainerVersionError(msg)
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        # delegate logging to the parent handed to __init__
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
        """Write a brand-new empty container: header, four blank lease
        slots, no share data, and a zero count of extra leases."""
        assert not os.path.exists(self.home)
        # NOTE(review): the initializations of ``data_length`` and
        # ``num_extra_leases`` (both used below) and the continuation of the
        # next expression are not visible in this listing.
        extra_lease_offset = (self.HEADER_SIZE
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        f = open(self.home, 'wb')
        header = struct.pack(">32s20s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset,
        # NOTE(review): the closing ")" of this struct.pack call is not
        # visible in this listing.
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f.write(header + leases)
        # data goes here, empty after creation
        f.write(struct.pack(">L", num_extra_leases))
        # extra leases go here, none at creation

    def _read_data_length(self, f):
        # read the 8-byte big-endian share-data length field from the header
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        # NOTE(review): the ``return data_length`` line is not visible in
        # this listing, though callers below rely on the return value.

    def _write_data_length(self, f, data_length):
        # overwrite the 8-byte share-data length field in the header
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        """Read ``length`` bytes of share data starting at ``offset``.
        Reads extending past the recorded data length are truncated."""
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        # NOTE(review): no ``return data`` is visible here; callers (readv,
        # check_testv) expect the data back.

    def _read_extra_lease_offset(self, f):
        # read the header field giving the file offset of the extra-lease
        # count (which sits immediately after the share data)
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        # update the header field pointing at the extra-lease block
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        # the 4-byte extra-lease count lives at the offset recorded in the
        # header's extra-lease-offset field
        offset = self._read_extra_lease_offset(f)
        # NOTE(review): an ``f.seek(offset)`` appears to be missing here --
        # the read below would otherwise start at the wrong position.
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        # overwrite the 4-byte extra-lease count at its recorded offset
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        """Grow the container to hold ``new_container_size`` bytes of share
        data, relocating the extra-lease block past the new data region."""
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            # NOTE(review): the early ``return`` for this shrink branch is
            # not visible in this listing.
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        leases_size = 4 + num_extra_leases * self.LEASE_SIZE
        extra_lease_data = f.read(leases_size)

        # Zero out the old lease info (in order to minimize the chance that
        # it could accidentally be exposed to a reader later, re #1528).
        f.seek(old_extra_lease_offset)
        f.write('\x00' * leases_size)

        # An interrupt here will corrupt the leases.

        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        """Write ``data`` at share-data ``offset``, enlarging the container
        (moving the extra leases) when needed, zero-filling any gap between
        the old end of data and ``offset``, and updating the recorded data
        length."""
        # NOTE(review): ``length`` is used throughout but never assigned in
        # this listing -- presumably ``length = len(data)`` was dropped.
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.

            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # TODO: allow containers to shrink. For now, they remain
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.

            # Fill any newly exposed empty space with 0's.
            if offset > data_length:
                f.seek(self.DATA_OFFSET+data_length)
                f.write('\x00'*(offset - data_length))

            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        # NOTE(review): the final ``f.write(data)`` is not visible in this
        # listing.

    def _write_lease_record(self, f, lease_number, lease_info):
        """Serialize ``lease_info`` into lease slot ``lease_number``: slots
        0-3 are the in-line slots after the header; higher slot numbers live
        in the extra-lease block, whose count is bumped when appending."""
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        # NOTE(review): the ``if lease_number < 4:`` guard, the ``else:``
        # before the append branch, the "+ 4" (skipping the extra-lease
        # count) in the offset expressions, and the ``f.seek(offset)`` are
        # not visible in this listing.
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + (lease_number-4)*self.LEASE_SIZE)
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + (lease_number-4)*self.LEASE_SIZE)
        assert f.tell() == offset
        f.write(lease_info.to_mutable_data())

    def _read_lease_record(self, f, lease_number):
        # returns a LeaseInfo instance, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        # NOTE(review): the ``if lease_number < 4:`` guard, the ``else:``
        # before the IndexError, and the ``f.seek(offset)`` are not visible
        # in this listing.
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + (lease_number-4)*self.LEASE_SIZE)
            raise IndexError("No such lease number %d" % lease_number)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = LeaseInfo().from_mutable_data(data)
        if lease_info.owner_num == 0:
            # owner number 0 marks an empty slot
            # NOTE(review): the ``return None`` / ``return lease_info``
            # lines are not visible in this listing.

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are necessarily filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot
        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                # NOTE(review): this branch's body (returning ``i``) and the
                # fall-through ``return None`` are not visible in this
                # listing.

    def get_leases(self):
        """Yields a LeaseInfo instance for all leases."""
        f = open(self.home, 'rb')
        for i, lease in self._enumerate_leases(f):
            # NOTE(review): the ``yield lease`` body and the f.close() are
            # not visible in this listing.

    def _enumerate_leases(self, f):
        # generate (slot_number, LeaseInfo) for each occupied lease slot
        for i in range(self._get_num_lease_slots(f)):
            data = self._read_lease_record(f, i)
            # NOTE(review): the rest of this loop body (skipping empty slots
            # and yielding (i, data)) is not visible in this listing.

    def add_lease(self, lease_info):
        """Store ``lease_info`` in the first empty lease slot, or append a
        brand-new slot when every existing one is occupied."""
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        f = open(self.home, 'rb+')
        num_lease_slots = self._get_num_lease_slots(f)
        empty_slot = self._get_first_empty_lease_slot(f)
        if empty_slot is not None:
            self._write_lease_record(f, empty_slot, lease_info)
            # NOTE(review): the ``else:`` that should introduce the next
            # line (append to a fresh slot) is not visible in this listing.
            self._write_lease_record(f, num_lease_slots, lease_info)

    def renew_lease(self, renew_secret, new_expire_time):
        """Extend the expiration time of the lease whose renew secret
        matches ``renew_secret``; raise IndexError (naming the nodeids that
        accepted the existing leases) when none matches."""
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        for (leasenum,lease) in self._enumerate_leases(f):
            if constant_time_compare(lease.renew_secret, renew_secret):
                # yup. See if we need to update the owner time.
                if new_expire_time > lease.expiration_time:
                    lease.expiration_time = new_expire_time
                    self._write_lease_record(f, leasenum, lease)
                # NOTE(review): the early return after a successful renewal
                # is not visible in this listing.
            accepting_nodeids.add(lease.nodeid)
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        # NOTE(review): the continuation closing this parenthesized string
        # (presumably '" nodeids: ")') is not visible in this listing.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        """Renew an existing matching lease if possible, otherwise add
        ``lease_info`` as a new lease."""
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        # NOTE(review): the try/except IndexError wrapper that should make
        # the add_lease() call the fallback path is not visible in this
        # listing.
        self.renew_lease(lease_info.renew_secret,
                         lease_info.expiration_time)
        self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file. Raise IndexError if there was no lease with the
        given cancel_secret."""
        accepting_nodeids = set()
        # a cancelled slot is overwritten with an all-zero (owner 0) record
        # NOTE(review): the remaining LeaseInfo keyword arguments and the
        # closing ")" of this call are not visible in this listing.
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret="\x00"*32,
                                cancel_secret="\x00"*32,
        f = open(self.home, 'rb+')
        for (leasenum,lease) in self._enumerate_leases(f):
            accepting_nodeids.add(lease.nodeid)
            if constant_time_compare(lease.cancel_secret, cancel_secret):
                self._write_lease_record(f, leasenum, blank_lease)
        freed_space = self._pack_leases(f)
        # presumably the file itself is removed when no leases remain and
        # its size counted as freed space -- the surrounding control flow is
        # not visible in this listing.
        freed_space += os.stat(self.home)[stat.ST_SIZE]
        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        # NOTE(review): the body (presumably returning 0 bytes freed) is
        # not visible in this listing.

    def _read_write_enabler_and_nodeid(self, f):
        # parse the header and return (write_enabler, write_enabler_nodeid)
        # NOTE(review): an ``f.seek(0)`` before this read and the "(magic,"
        # opener of the tuple-unpack are not visible in this listing.
        data = f.read(self.HEADER_SIZE)
         write_enabler_nodeid, write_enabler,
         data_length, extra_least_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        """Perform a batch of reads: one data string per (offset, length)
        pair in ``readv``."""
        # NOTE(review): the ``datav = []`` initialization and the final
        # ``return datav`` are not visible in this listing.
        f = open(self.home, 'rb')
        for (offset, length) in readv:
            datav.append(self._read_share_data(f, offset, length))

    # def remote_get_length(self):
    #     f = open(self.home, 'rb')
    #     data_length = self._read_data_length(f)

    def check_write_enabler(self, write_enabler, si_s):
        """Compare ``write_enabler`` against the one recorded in the header;
        on mismatch, log the recording nodeid and raise
        BadWriteEnablerError."""
        f = open(self.home, 'rb+')
        (real_write_enabler, write_enabler_nodeid) = \
            self._read_write_enabler_and_nodeid(f)
        # avoid a timing attack
        #if write_enabler != real_write_enabler:
        if not constant_time_compare(write_enabler, real_write_enabler):
            # accommodate share migration by reporting the nodeid used for
            # the write enabler
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        """Evaluate a list of (offset, length, operator, specimen) tests
        against the current share data."""
        # NOTE(review): the ``test_good`` bookkeeping (initialization,
        # failure branch, final return) is not visible in this listing.
        f = open(self.home, 'rb+')
        for (offset, length, operator, specimen) in testv:
            data = self._read_share_data(f, offset, length)
            if not testv_compare(data, operator, specimen):

    def writev(self, datav, new_length):
        """Apply a batch of (offset, data) writes; then, when ``new_length``
        is given and smaller than the current size, shrink the recorded data
        length (the container file itself is not shrunk)."""
        f = open(self.home, 'rb+')
        for (offset, data) in datav:
            self._write_share_data(f, offset, data)
        if new_length is not None:
            cur_length = self._read_data_length(f)
            if new_length < cur_length:
                self._write_data_length(f, new_length)
                # TODO: if we're going to shrink the share file when the
                # share data has shrunk, then call
                # self._change_container_size() here.
def testv_compare(a, op, b):
    # Evaluate the comparison named by ``op`` -- one of "lt", "le", "eq",
    # "ne", "ge", "gt" -- between ``a`` and ``b``.
    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
    # NOTE(review): the operator-dispatch branches that perform the actual
    # comparison are not visible in this listing.
    def check_testv(self, testv):
        # Evaluate (offset, length, operator, specimen) tests for a share
        # with no data.
        # NOTE(review): the assignment of ``data`` (presumably an empty
        # string, given the enclosing class is not visible) and the
        # ``test_good`` bookkeeping are not visible in this listing.
        for (offset, length, operator, specimen) in testv:
            if not testv_compare(data, operator, specimen):
                # NOTE(review): the failure branch and the final return are
                # not visible in this listing.
def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    """Create a new mutable sharefile on disk and return a handle to it.

    A throwaway MutableShareFile instance writes the initial header and
    write enabler; the caller then receives a brand-new instance, so the
    returned object is one whose __init__ has re-read and validated the
    freshly written container.
    """
    writer = MutableShareFile(filename, parent)
    writer.create(my_nodeid, write_enabler)
    # hand back a fresh handle rather than the one used for creation
    return MutableShareFile(filename, parent)