import os, stat, struct

from allmydata.interfaces import BadWriteEnablerError
from allmydata.util import idlib, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import constant_time_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownMutableContainerVersionError, \
     DataTooLargeError
# the MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.

# #   offset    size    name
# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
# 2   32        20      write enabler's nodeid
# 3   52        32      write enabler
# 4   84        8       data size (actual share data present) (a)
# 5   92        8       offset of (8) count of extra leases (after data)
# 6   100       368     four leases, 92 bytes each
#                        0    4   ownerid (0 means "no lease here")
#                        4    4   expiration timestamp
#                        8   32   renewal token
#                       40   32   cancel token
#                       72   20   nodeid which accepted the tokens
# 7   468       (a)     data
# 8   ??        4       count of extra leases
# 9   ??        n*92    extra leases
# The layout arithmetic below depends on 4-byte "L" and 8-byte "Q" fields.
# Bare "L" is platform-dependent, so force standard sizing with ">".
# (The originals used "assert expr, 4" -- a comma makes the number an assert
# *message*, not an equality check, so they could never fail.)
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
class MutableShareFile:
    """On-disk container for one mutable share: fixed header, write
    enabler, four built-in lease slots, share data, then any extra
    leases. Offsets are defined by the layout table above."""

    # byte offset of the data-size field (the ">Q" after magic+nodeid+enabler)
    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    # byte offset of the field that records where the extra-lease count lives
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    # share data starts after the header and the four built-in lease slots
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles share with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size
52 def __init__(self, filename, parent=None):
54 if os.path.exists(self.home):
55 # we don't cache anything, just check the magic
56 f = open(self.home, 'rb')
57 data = f.read(self.HEADER_SIZE)
59 write_enabler_nodeid, write_enabler,
60 data_length, extra_least_offset) = \
61 struct.unpack(">32s20s32sQQ", data)
62 if magic != self.MAGIC:
63 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
64 (filename, magic, self.MAGIC)
65 raise UnknownMutableContainerVersionError(msg)
66 self.parent = parent # for logging
68 def log(self, *args, **kwargs):
69 return self.parent.log(*args, **kwargs)
71 def create(self, my_nodeid, write_enabler):
72 assert not os.path.exists(self.home)
74 extra_lease_offset = (self.HEADER_SIZE
77 assert extra_lease_offset == self.DATA_OFFSET # true at creation
79 f = open(self.home, 'wb')
80 header = struct.pack(">32s20s32sQQ",
81 self.MAGIC, my_nodeid, write_enabler,
82 data_length, extra_lease_offset,
84 leases = ("\x00"*self.LEASE_SIZE) * 4
85 f.write(header + leases)
86 # data goes here, empty after creation
87 f.write(struct.pack(">L", num_extra_leases))
88 # extra leases go here, none at creation
94 def _read_data_length(self, f):
95 f.seek(self.DATA_LENGTH_OFFSET)
96 (data_length,) = struct.unpack(">Q", f.read(8))
99 def _write_data_length(self, f, data_length):
100 f.seek(self.DATA_LENGTH_OFFSET)
101 f.write(struct.pack(">Q", data_length))
103 def _read_share_data(self, f, offset, length):
104 precondition(offset >= 0)
105 data_length = self._read_data_length(f)
106 if offset+length > data_length:
107 # reads beyond the end of the data are truncated. Reads that
108 # start beyond the end of the data return an empty string.
109 length = max(0, data_length-offset)
112 precondition(offset+length <= data_length)
113 f.seek(self.DATA_OFFSET+offset)
114 data = f.read(length)
117 def _read_extra_lease_offset(self, f):
118 f.seek(self.EXTRA_LEASE_OFFSET)
119 (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
120 return extra_lease_offset
122 def _write_extra_lease_offset(self, f, offset):
123 f.seek(self.EXTRA_LEASE_OFFSET)
124 f.write(struct.pack(">Q", offset))
126 def _read_num_extra_leases(self, f):
127 offset = self._read_extra_lease_offset(f)
129 (num_extra_leases,) = struct.unpack(">L", f.read(4))
130 return num_extra_leases
132 def _write_num_extra_leases(self, f, num_leases):
133 extra_lease_offset = self._read_extra_lease_offset(f)
134 f.seek(extra_lease_offset)
135 f.write(struct.pack(">L", num_leases))
137 def _change_container_size(self, f, new_container_size):
138 if new_container_size > self.MAX_SIZE:
139 raise DataTooLargeError()
140 old_extra_lease_offset = self._read_extra_lease_offset(f)
141 new_extra_lease_offset = self.DATA_OFFSET + new_container_size
142 if new_extra_lease_offset < old_extra_lease_offset:
143 # TODO: allow containers to shrink. For now they remain large.
145 num_extra_leases = self._read_num_extra_leases(f)
146 f.seek(old_extra_lease_offset)
147 extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
148 f.seek(new_extra_lease_offset)
149 f.write(extra_lease_data)
150 # an interrupt here will corrupt the leases, iff the move caused the
151 # extra leases to overlap.
152 self._write_extra_lease_offset(f, new_extra_lease_offset)
154 def _write_share_data(self, f, offset, data):
156 precondition(offset >= 0)
157 data_length = self._read_data_length(f)
158 extra_lease_offset = self._read_extra_lease_offset(f)
160 if offset+length >= data_length:
161 # They are expanding their data size.
162 if self.DATA_OFFSET+offset+length > extra_lease_offset:
163 # Their new data won't fit in the current container, so we
164 # have to move the leases. With luck, they're expanding it
165 # more than the size of the extra lease block, which will
166 # minimize the corrupt-the-share window
167 self._change_container_size(f, offset+length)
168 extra_lease_offset = self._read_extra_lease_offset(f)
170 # an interrupt here is ok.. the container has been enlarged
171 # but the data remains untouched
173 assert self.DATA_OFFSET+offset+length <= extra_lease_offset
174 # Their data now fits in the current container. We must write
175 # their new data and modify the recorded data size.
176 new_data_length = offset+length
177 self._write_data_length(f, new_data_length)
178 # an interrupt here will result in a corrupted share
180 # now all that's left to do is write out their data
181 f.seek(self.DATA_OFFSET+offset)
185 def _write_lease_record(self, f, lease_number, lease_info):
186 extra_lease_offset = self._read_extra_lease_offset(f)
187 num_extra_leases = self._read_num_extra_leases(f)
189 offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
190 elif (lease_number-4) < num_extra_leases:
191 offset = (extra_lease_offset
193 + (lease_number-4)*self.LEASE_SIZE)
195 # must add an extra lease record
196 self._write_num_extra_leases(f, num_extra_leases+1)
197 offset = (extra_lease_offset
199 + (lease_number-4)*self.LEASE_SIZE)
201 assert f.tell() == offset
202 f.write(lease_info.to_mutable_data())
204 def _read_lease_record(self, f, lease_number):
205 # returns a LeaseInfo instance, or None
206 extra_lease_offset = self._read_extra_lease_offset(f)
207 num_extra_leases = self._read_num_extra_leases(f)
209 offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
210 elif (lease_number-4) < num_extra_leases:
211 offset = (extra_lease_offset
213 + (lease_number-4)*self.LEASE_SIZE)
215 raise IndexError("No such lease number %d" % lease_number)
217 assert f.tell() == offset
218 data = f.read(self.LEASE_SIZE)
219 lease_info = LeaseInfo().from_mutable_data(data)
220 if lease_info.owner_num == 0:
224 def _get_num_lease_slots(self, f):
225 # how many places do we have allocated for leases? Not all of them
227 num_extra_leases = self._read_num_extra_leases(f)
228 return 4+num_extra_leases
230 def _get_first_empty_lease_slot(self, f):
231 # return an int with the index of an empty slot, or None if we do not
232 # currently have an empty slot
234 for i in range(self._get_num_lease_slots(f)):
235 if self._read_lease_record(f, i) is None:
239 def get_leases(self):
240 """Yields a LeaseInfo instance for all leases."""
241 f = open(self.home, 'rb')
242 for i, lease in self._enumerate_leases(f):
246 def _enumerate_leases(self, f):
247 for i in range(self._get_num_lease_slots(f)):
249 data = self._read_lease_record(f, i)
255 def add_lease(self, lease_info):
256 precondition(lease_info.owner_num != 0) # 0 means "no lease here"
257 f = open(self.home, 'rb+')
258 num_lease_slots = self._get_num_lease_slots(f)
259 empty_slot = self._get_first_empty_lease_slot(f)
260 if empty_slot is not None:
261 self._write_lease_record(f, empty_slot, lease_info)
263 self._write_lease_record(f, num_lease_slots, lease_info)
266 def renew_lease(self, renew_secret, new_expire_time):
267 accepting_nodeids = set()
268 f = open(self.home, 'rb+')
269 for (leasenum,lease) in self._enumerate_leases(f):
270 if constant_time_compare(lease.renew_secret, renew_secret):
271 # yup. See if we need to update the owner time.
272 if new_expire_time > lease.expiration_time:
274 lease.expiration_time = new_expire_time
275 self._write_lease_record(f, leasenum, lease)
278 accepting_nodeids.add(lease.nodeid)
280 # Return the accepting_nodeids set, to give the client a chance to
281 # update the leases on a share which has been migrated from its
282 # original server to a new one.
283 msg = ("Unable to renew non-existent lease. I have leases accepted by"
285 msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
286 for anid in accepting_nodeids])
288 raise IndexError(msg)
290 def add_or_renew_lease(self, lease_info):
291 precondition(lease_info.owner_num != 0) # 0 means "no lease here"
293 self.renew_lease(lease_info.renew_secret,
294 lease_info.expiration_time)
296 self.add_lease(lease_info)
298 def cancel_lease(self, cancel_secret):
299 """Remove any leases with the given cancel_secret. If the last lease
300 is cancelled, the file will be removed. Return the number of bytes
301 that were freed (by truncating the list of leases, and possibly by
302 deleting the file. Raise IndexError if there was no lease with the
303 given cancel_secret."""
305 accepting_nodeids = set()
308 blank_lease = LeaseInfo(owner_num=0,
309 renew_secret="\x00"*32,
310 cancel_secret="\x00"*32,
313 f = open(self.home, 'rb+')
314 for (leasenum,lease) in self._enumerate_leases(f):
315 accepting_nodeids.add(lease.nodeid)
316 if constant_time_compare(lease.cancel_secret, cancel_secret):
317 self._write_lease_record(f, leasenum, blank_lease)
322 freed_space = self._pack_leases(f)
325 freed_space += os.stat(self.home)[stat.ST_SIZE]
329 msg = ("Unable to cancel non-existent lease. I have leases "
330 "accepted by nodeids: ")
331 msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
332 for anid in accepting_nodeids])
334 raise IndexError(msg)
336 def _pack_leases(self, f):
337 # TODO: reclaim space from cancelled leases
340 def _read_write_enabler_and_nodeid(self, f):
342 data = f.read(self.HEADER_SIZE)
344 write_enabler_nodeid, write_enabler,
345 data_length, extra_least_offset) = \
346 struct.unpack(">32s20s32sQQ", data)
347 assert magic == self.MAGIC
348 return (write_enabler, write_enabler_nodeid)
350 def readv(self, readv):
352 f = open(self.home, 'rb')
353 for (offset, length) in readv:
354 datav.append(self._read_share_data(f, offset, length))
358 # def remote_get_length(self):
359 # f = open(self.home, 'rb')
360 # data_length = self._read_data_length(f)
364 def check_write_enabler(self, write_enabler, si_s):
365 f = open(self.home, 'rb+')
366 (real_write_enabler, write_enabler_nodeid) = \
367 self._read_write_enabler_and_nodeid(f)
369 # avoid a timing attack
370 #if write_enabler != real_write_enabler:
371 if not constant_time_compare(write_enabler, real_write_enabler):
372 # accomodate share migration by reporting the nodeid used for the
374 self.log(format="bad write enabler on SI %(si)s,"
375 " recorded by nodeid %(nodeid)s",
376 facility="tahoe.storage",
377 level=log.WEIRD, umid="cE1eBQ",
378 si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
379 msg = "The write enabler was recorded by nodeid '%s'." % \
380 (idlib.nodeid_b2a(write_enabler_nodeid),)
381 raise BadWriteEnablerError(msg)
383 def check_testv(self, testv):
385 f = open(self.home, 'rb+')
386 for (offset, length, operator, specimen) in testv:
387 data = self._read_share_data(f, offset, length)
388 if not testv_compare(data, operator, specimen):
394 def writev(self, datav, new_length):
395 f = open(self.home, 'rb+')
396 for (offset, data) in datav:
397 self._write_share_data(f, offset, data)
398 if new_length is not None:
399 self._change_container_size(f, new_length)
400 f.seek(self.DATA_LENGTH_OFFSET)
401 f.write(struct.pack(">Q", new_length))
404 def testv_compare(a, op, b):
405 assert op in ("lt", "le", "eq", "ne", "ge", "gt")
422 def check_testv(self, testv):
424 for (offset, length, operator, specimen) in testv:
426 if not testv_compare(data, operator, specimen):
def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    """Initialize a brand-new mutable sharefile on disk at 'filename'
    and return a freshly-bound MutableShareFile for it."""
    sharefile = MutableShareFile(filename, parent)
    sharefile.create(my_nodeid, write_enabler)
    # hand back a clean instance rather than the one used for creation
    return MutableShareFile(filename, parent)