import os, stat, struct

from allmydata.interfaces import BadWriteEnablerError
from allmydata.util import idlib, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import constant_time_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownMutableContainerVersionError, \
     DataTooLargeError

# The MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.

# #   offset    size    name
# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
# 2   32        20      write enabler's nodeid
# 3   52        32      write enabler
# 4   84        8       data size (actual share data present) (a)
# 5   92        8       offset of (8) count of extra leases (after data)
# 6   100       368     four leases, 92 bytes each
#                        0    4   ownerid (0 means "no lease here")
#                        4    4   expiration timestamp
#                        8   32   renewal token
#                        40  32   cancel token
#                        72  20   nodeid which accepted the tokens
# 7   468       (a)     data
# 8   ??        4       count of extra leases
# 9   ??        n*92    extra leases

# The struct module docs say that L's are 4 bytes in size, and that Q's are
# 8 bytes in size. Since compatibility depends upon this, double-check it.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
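
# As a rough illustration of the layout above (not part of the original
# module, and the variable names are arbitrary), a header could be unpacked
# from an open share file 'f' like this:
#
#   f.seek(0)
#   header = f.read(struct.calcsize(">32s20s32sQQ"))
#   (magic, write_enabler_nodeid, write_enabler,
#    data_length, extra_lease_offset) = struct.unpack(">32s20s32sQQ", header)
#   # 'extra_lease_offset' points at the 4-byte count of extra leases, which
#   # is followed by the extra lease records themselves.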

class MutableShareFile:

    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    def __init__(self, filename, parent=None):
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            data = f.read(self.HEADER_SIZE)
            (magic,
             write_enabler_nodeid, write_enabler,
             data_length, extra_lease_offset) = \
             struct.unpack(">32s20s32sQQ", data)
            if magic != self.MAGIC:
                msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
                      (filename, magic, self.MAGIC)
                raise UnknownMutableContainerVersionError(msg)
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
        assert not os.path.exists(self.home)
        data_length = 0
        extra_lease_offset = (self.HEADER_SIZE
                              + 4 * self.LEASE_SIZE
                              + data_length)
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        num_extra_leases = 0
        f = open(self.home, 'wb')
        header = struct.pack(">32s20s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset,
                             )
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f.write(header + leases)
        # data goes here, empty after creation
        f.write(struct.pack(">L", num_extra_leases))
        # extra leases go here, none at creation
        f.close()

    def unlink(self):
        os.unlink(self.home)

    def _read_data_length(self, f):
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return ""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

    def _read_extra_lease_offset(self, f):
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        offset = self._read_extra_lease_offset(f)
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        # an interrupt here will corrupt the leases, iff the move caused the
        # extra leases to overlap.
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.
            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.
            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)
        return
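
    # Worked example (illustration only, not part of the original module):
    # with DATA_OFFSET == 468, a share holding 100 bytes of data in a
    # container sized exactly to that data keeps its extra-lease block at
    # offset 468+100 == 568. Writing 50 bytes at offset 90 requires the
    # container to reach 468+90+50 == 608 > 568, so _change_container_size()
    # first copies the extra-lease block out to offset 608, and only then is
    # the new data written and the recorded data size updated.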

    def _write_lease_record(self, f, lease_number, lease_info):
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(lease_info.to_mutable_data())

    def _read_lease_record(self, f, lease_number):
        # returns a LeaseInfo instance, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = LeaseInfo().from_mutable_data(data)
        if lease_info.owner_num == 0:
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot
        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def get_leases(self):
        """Yields a LeaseInfo instance for each lease."""
        f = open(self.home, 'rb')
        for i, lease in self._enumerate_leases(f):
            yield lease
        f.close()

    def _enumerate_leases(self, f):
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield i, data
            except IndexError:
                return

    def add_lease(self, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        f = open(self.home, 'rb+')
        num_lease_slots = self._get_num_lease_slots(f)
        empty_slot = self._get_first_empty_lease_slot(f)
        if empty_slot is not None:
            self._write_lease_record(f, empty_slot, lease_info)
        else:
            self._write_lease_record(f, num_lease_slots, lease_info)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        for (leasenum, lease) in self._enumerate_leases(f):
            if constant_time_compare(lease.renew_secret, renew_secret):
                # yup. See if we need to update the owner time.
                if new_expire_time > lease.expiration_time:
                    lease.expiration_time = new_expire_time
                    self._write_lease_record(f, leasenum, lease)
                f.close()
                return
            accepting_nodeids.add(lease.nodeid)
        f.close()
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.expiration_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret."""

        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret="\x00"*32,
                                cancel_secret="\x00"*32,
                                expiration_time=0,
                                nodeid="\x00"*20)
        f = open(self.home, 'rb+')
        for (leasenum, lease) in self._enumerate_leases(f):
            accepting_nodeids.add(lease.nodeid)
            if constant_time_compare(lease.cancel_secret, cancel_secret):
                self._write_lease_record(f, leasenum, blank_lease)
                modified += 1
            else:
                remaining += 1
        if modified:
            freed_space = self._pack_leases(f)
            f.close()
            if not remaining:
                freed_space += os.stat(self.home)[stat.ST_SIZE]
                self.unlink()
            return freed_space

        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        datav = []
        f = open(self.home, 'rb')
        for (offset, length) in readv:
            datav.append(self._read_share_data(f, offset, length))
        f.close()
        return datav

#    def remote_get_length(self):
#        f = open(self.home, 'rb')
#        data_length = self._read_data_length(f)
#        f.close()
#        return data_length

    def check_write_enabler(self, write_enabler, si_s):
        f = open(self.home, 'rb+')
        (real_write_enabler, write_enabler_nodeid) = \
         self._read_write_enabler_and_nodeid(f)
        f.close()
        # avoid a timing attack
        #if write_enabler != real_write_enabler:
        if not constant_time_compare(write_enabler, real_write_enabler):
            # accommodate share migration by reporting the nodeid used for the
            # old write enabler.
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        test_good = True
        f = open(self.home, 'rb+')
        for (offset, length, operator, specimen) in testv:
            data = self._read_share_data(f, offset, length)
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        f.close()
        return test_good

    def writev(self, datav, new_length):
        f = open(self.home, 'rb+')
        for (offset, data) in datav:
            self._write_share_data(f, offset, data)
        if new_length is not None:
            self._change_container_size(f, new_length)
            f.seek(self.DATA_LENGTH_OFFSET)
            f.write(struct.pack(">Q", new_length))
        f.close()

def testv_compare(a, op, b):
    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
    # dispatch on the operator name; the assert above guarantees a match
    return {"lt": a < b, "le": a <= b, "eq": a == b,
            "ne": a != b, "ge": a >= b, "gt": a > b}[op]
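
# Illustration only (not part of the original module): a test vector compares
# a specimen against the data currently stored in the share, e.g.
#   testv_compare("alpha", "eq", "alpha")  -> True
#   testv_compare("",      "eq", "alpha")  -> False  (empty share data)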

class EmptyShare:

    def check_testv(self, testv):
        test_good = True
        for (offset, length, operator, specimen) in testv:
            data = ""
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        return test_good

def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    ms = MutableShareFile(filename, parent)
    ms.create(my_nodeid, write_enabler)
    del ms
    return MutableShareFile(filename, parent)
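
# A minimal usage sketch (illustration only, not part of the original module);
# the path is hypothetical, and the node ID and write enabler are made-up
# placeholder values of the required lengths (20 and 32 bytes):
#
#   ms = create_mutable_sharefile("/path/to/share", "N"*20, "W"*32, parent=None)
#   ms.writev([(0, "hello world")], new_length=None)
#   assert ms.readv([(0, 5)]) == ["hello"]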