import os, stat, struct

from allmydata.interfaces import BadWriteEnablerError
from allmydata.util import idlib, log
from allmydata.util.assertutil import precondition
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownMutableContainerVersionError, \
     DataTooLargeError
10 # the MutableShareFile is like the ShareFile, but used for mutable data. It
11 # has a different layout. See docs/mutable.txt for more details.
14 # 1 0 32 magic verstr "tahoe mutable container v1" plus binary
15 # 2 32 20 write enabler's nodeid
16 # 3 52 32 write enabler
17 # 4 84 8 data size (actual share data present) (a)
18 # 5 92 8 offset of (8) count of extra leases (after data)
19 # 6 100 368 four leases, 92 bytes each
#      0    4   ownerid (0 means "no lease here")
#      4    4   expiration timestamp
#      8   32   renewal token
#     40   32   cancel token
#     72   20   nodeid which accepted the tokens
26 # 8 ?? 4 count of extra leases
27 # 9 ?? n*92 extra leases
# This file's header/lease layout relies on ">L" being 4 bytes and ">Q"
# being 8 bytes. The original `assert struct.calcsize("L"), 4` form was a
# classic assert bug: the 4 was the assert *message*, so the check always
# passed (and native "L" is platform-dependent anyway). Assert for real,
# on the big-endian codes actually used below.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
class MutableShareFile:
    """One mutable share stored in a single on-disk container.

    The container holds a fixed header (magic, write-enabler nodeid, write
    enabler, data size, offset of the extra-lease count), four built-in
    lease slots, the share data itself, a 4-byte count of extra leases, and
    then any extra leases. See the layout table above.
    """

    # byte offset of the 8-byte "data size" field within the header
    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    # byte offset of the 8-byte "offset of extra-lease count" field
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    # one lease record: ownerid, expiration, renew secret, cancel secret,
    # accepting nodeid
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    # share data starts after the header and the four built-in lease slots
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like one
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size
50 def __init__(self, filename, parent=None):
52 if os.path.exists(self.home):
53 # we don't cache anything, just check the magic
54 f = open(self.home, 'rb')
55 data = f.read(self.HEADER_SIZE)
57 write_enabler_nodeid, write_enabler,
58 data_length, extra_least_offset) = \
59 struct.unpack(">32s20s32sQQ", data)
60 if magic != self.MAGIC:
61 msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
62 (filename, magic, self.MAGIC)
63 raise UnknownMutableContainerVersionError(msg)
64 self.parent = parent # for logging
66 def log(self, *args, **kwargs):
67 return self.parent.log(*args, **kwargs)
69 def create(self, my_nodeid, write_enabler):
70 assert not os.path.exists(self.home)
72 extra_lease_offset = (self.HEADER_SIZE
75 assert extra_lease_offset == self.DATA_OFFSET # true at creation
77 f = open(self.home, 'wb')
78 header = struct.pack(">32s20s32sQQ",
79 self.MAGIC, my_nodeid, write_enabler,
80 data_length, extra_lease_offset,
82 leases = ("\x00"*self.LEASE_SIZE) * 4
83 f.write(header + leases)
84 # data goes here, empty after creation
85 f.write(struct.pack(">L", num_extra_leases))
86 # extra leases go here, none at creation
92 def _read_data_length(self, f):
93 f.seek(self.DATA_LENGTH_OFFSET)
94 (data_length,) = struct.unpack(">Q", f.read(8))
97 def _write_data_length(self, f, data_length):
98 f.seek(self.DATA_LENGTH_OFFSET)
99 f.write(struct.pack(">Q", data_length))
101 def _read_share_data(self, f, offset, length):
102 precondition(offset >= 0)
103 data_length = self._read_data_length(f)
104 if offset+length > data_length:
105 # reads beyond the end of the data are truncated. Reads that
106 # start beyond the end of the data return an empty string.
107 length = max(0, data_length-offset)
110 precondition(offset+length <= data_length)
111 f.seek(self.DATA_OFFSET+offset)
112 data = f.read(length)
115 def _read_extra_lease_offset(self, f):
116 f.seek(self.EXTRA_LEASE_OFFSET)
117 (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
118 return extra_lease_offset
120 def _write_extra_lease_offset(self, f, offset):
121 f.seek(self.EXTRA_LEASE_OFFSET)
122 f.write(struct.pack(">Q", offset))
124 def _read_num_extra_leases(self, f):
125 offset = self._read_extra_lease_offset(f)
127 (num_extra_leases,) = struct.unpack(">L", f.read(4))
128 return num_extra_leases
130 def _write_num_extra_leases(self, f, num_leases):
131 extra_lease_offset = self._read_extra_lease_offset(f)
132 f.seek(extra_lease_offset)
133 f.write(struct.pack(">L", num_leases))
135 def _change_container_size(self, f, new_container_size):
136 if new_container_size > self.MAX_SIZE:
137 raise DataTooLargeError()
138 old_extra_lease_offset = self._read_extra_lease_offset(f)
139 new_extra_lease_offset = self.DATA_OFFSET + new_container_size
140 if new_extra_lease_offset < old_extra_lease_offset:
141 # TODO: allow containers to shrink. For now they remain large.
143 num_extra_leases = self._read_num_extra_leases(f)
144 f.seek(old_extra_lease_offset)
145 extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
146 f.seek(new_extra_lease_offset)
147 f.write(extra_lease_data)
148 # an interrupt here will corrupt the leases, iff the move caused the
149 # extra leases to overlap.
150 self._write_extra_lease_offset(f, new_extra_lease_offset)
152 def _write_share_data(self, f, offset, data):
154 precondition(offset >= 0)
155 data_length = self._read_data_length(f)
156 extra_lease_offset = self._read_extra_lease_offset(f)
158 if offset+length >= data_length:
159 # They are expanding their data size.
160 if self.DATA_OFFSET+offset+length > extra_lease_offset:
161 # Their new data won't fit in the current container, so we
162 # have to move the leases. With luck, they're expanding it
163 # more than the size of the extra lease block, which will
164 # minimize the corrupt-the-share window
165 self._change_container_size(f, offset+length)
166 extra_lease_offset = self._read_extra_lease_offset(f)
168 # an interrupt here is ok.. the container has been enlarged
169 # but the data remains untouched
171 assert self.DATA_OFFSET+offset+length <= extra_lease_offset
172 # Their data now fits in the current container. We must write
173 # their new data and modify the recorded data size.
174 new_data_length = offset+length
175 self._write_data_length(f, new_data_length)
176 # an interrupt here will result in a corrupted share
178 # now all that's left to do is write out their data
179 f.seek(self.DATA_OFFSET+offset)
183 def _write_lease_record(self, f, lease_number, lease_info):
184 extra_lease_offset = self._read_extra_lease_offset(f)
185 num_extra_leases = self._read_num_extra_leases(f)
187 offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
188 elif (lease_number-4) < num_extra_leases:
189 offset = (extra_lease_offset
191 + (lease_number-4)*self.LEASE_SIZE)
193 # must add an extra lease record
194 self._write_num_extra_leases(f, num_extra_leases+1)
195 offset = (extra_lease_offset
197 + (lease_number-4)*self.LEASE_SIZE)
199 assert f.tell() == offset
200 f.write(lease_info.to_mutable_data())
202 def _read_lease_record(self, f, lease_number):
203 # returns a LeaseInfo instance, or None
204 extra_lease_offset = self._read_extra_lease_offset(f)
205 num_extra_leases = self._read_num_extra_leases(f)
207 offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
208 elif (lease_number-4) < num_extra_leases:
209 offset = (extra_lease_offset
211 + (lease_number-4)*self.LEASE_SIZE)
213 raise IndexError("No such lease number %d" % lease_number)
215 assert f.tell() == offset
216 data = f.read(self.LEASE_SIZE)
217 lease_info = LeaseInfo().from_mutable_data(data)
218 if lease_info.owner_num == 0:
222 def _get_num_lease_slots(self, f):
223 # how many places do we have allocated for leases? Not all of them
225 num_extra_leases = self._read_num_extra_leases(f)
226 return 4+num_extra_leases
228 def _get_first_empty_lease_slot(self, f):
229 # return an int with the index of an empty slot, or None if we do not
230 # currently have an empty slot
232 for i in range(self._get_num_lease_slots(f)):
233 if self._read_lease_record(f, i) is None:
237 def get_leases(self):
238 """Yields a LeaseInfo instance for all leases."""
239 f = open(self.home, 'rb')
240 for i, lease in self._enumerate_leases(f):
244 def _enumerate_leases(self, f):
245 for i in range(self._get_num_lease_slots(f)):
247 data = self._read_lease_record(f, i)
253 def add_lease(self, lease_info):
254 precondition(lease_info.owner_num != 0) # 0 means "no lease here"
255 f = open(self.home, 'rb+')
256 num_lease_slots = self._get_num_lease_slots(f)
257 empty_slot = self._get_first_empty_lease_slot(f)
258 if empty_slot is not None:
259 self._write_lease_record(f, empty_slot, lease_info)
261 self._write_lease_record(f, num_lease_slots, lease_info)
264 def renew_lease(self, renew_secret, new_expire_time):
265 accepting_nodeids = set()
266 f = open(self.home, 'rb+')
267 for (leasenum,lease) in self._enumerate_leases(f):
268 if lease.renew_secret == renew_secret:
269 # yup. See if we need to update the owner time.
270 if new_expire_time > lease.expiration_time:
272 lease.expiration_time = new_expire_time
273 self._write_lease_record(f, leasenum, lease)
276 accepting_nodeids.add(lease.nodeid)
278 # Return the accepting_nodeids set, to give the client a chance to
279 # update the leases on a share which has been migrated from its
280 # original server to a new one.
281 msg = ("Unable to renew non-existent lease. I have leases accepted by"
283 msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
284 for anid in accepting_nodeids])
286 raise IndexError(msg)
288 def add_or_renew_lease(self, lease_info):
289 precondition(lease_info.owner_num != 0) # 0 means "no lease here"
291 self.renew_lease(lease_info.renew_secret,
292 lease_info.expiration_time)
294 self.add_lease(lease_info)
296 def cancel_lease(self, cancel_secret):
297 """Remove any leases with the given cancel_secret. If the last lease
298 is cancelled, the file will be removed. Return the number of bytes
299 that were freed (by truncating the list of leases, and possibly by
300 deleting the file. Raise IndexError if there was no lease with the
301 given cancel_secret."""
303 accepting_nodeids = set()
306 blank_lease = LeaseInfo(owner_num=0,
307 renew_secret="\x00"*32,
308 cancel_secret="\x00"*32,
311 f = open(self.home, 'rb+')
312 for (leasenum,lease) in self._enumerate_leases(f):
313 accepting_nodeids.add(lease.nodeid)
314 if lease.cancel_secret == cancel_secret:
315 self._write_lease_record(f, leasenum, blank_lease)
320 freed_space = self._pack_leases(f)
323 freed_space += os.stat(self.home)[stat.ST_SIZE]
327 msg = ("Unable to cancel non-existent lease. I have leases "
328 "accepted by nodeids: ")
329 msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
330 for anid in accepting_nodeids])
332 raise IndexError(msg)
334 def _pack_leases(self, f):
335 # TODO: reclaim space from cancelled leases
338 def _read_write_enabler_and_nodeid(self, f):
340 data = f.read(self.HEADER_SIZE)
342 write_enabler_nodeid, write_enabler,
343 data_length, extra_least_offset) = \
344 struct.unpack(">32s20s32sQQ", data)
345 assert magic == self.MAGIC
346 return (write_enabler, write_enabler_nodeid)
348 def readv(self, readv):
350 f = open(self.home, 'rb')
351 for (offset, length) in readv:
352 datav.append(self._read_share_data(f, offset, length))
356 # def remote_get_length(self):
357 # f = open(self.home, 'rb')
358 # data_length = self._read_data_length(f)
362 def check_write_enabler(self, write_enabler, si_s):
363 f = open(self.home, 'rb+')
364 (real_write_enabler, write_enabler_nodeid) = \
365 self._read_write_enabler_and_nodeid(f)
367 if write_enabler != real_write_enabler:
368 # accomodate share migration by reporting the nodeid used for the
370 self.log(format="bad write enabler on SI %(si)s,"
371 " recorded by nodeid %(nodeid)s",
372 facility="tahoe.storage",
373 level=log.WEIRD, umid="cE1eBQ",
374 si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
375 msg = "The write enabler was recorded by nodeid '%s'." % \
376 (idlib.nodeid_b2a(write_enabler_nodeid),)
377 raise BadWriteEnablerError(msg)
379 def check_testv(self, testv):
381 f = open(self.home, 'rb+')
382 for (offset, length, operator, specimen) in testv:
383 data = self._read_share_data(f, offset, length)
384 if not testv_compare(data, operator, specimen):
390 def writev(self, datav, new_length):
391 f = open(self.home, 'rb+')
392 for (offset, data) in datav:
393 self._write_share_data(f, offset, data)
394 if new_length is not None:
395 self._change_container_size(f, new_length)
396 f.seek(self.DATA_LENGTH_OFFSET)
397 f.write(struct.pack(">Q", new_length))
400 def testv_compare(a, op, b):
401 assert op in ("lt", "le", "eq", "ne", "ge", "gt")
    def check_testv(self, testv):
        # Evaluate a test vector of (offset, length, operator, specimen)
        # tuples against this (apparently data-less) share.
        # NOTE(review): 'data' is not assigned anywhere in the visible
        # lines, and no result is accumulated or returned -- the extraction
        # appears to have dropped statements here (presumably data is the
        # empty string for an empty share, with a pass/fail flag returned,
        # mirroring MutableShareFile.check_testv). Confirm against the
        # full file.
        for (offset, length, operator, specimen) in testv:
            if not testv_compare(data, operator, specimen):
def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    """Initialize a brand-new mutable share container on disk, then hand
    back a fresh MutableShareFile bound to it."""
    bootstrap = MutableShareFile(filename, parent)
    bootstrap.create(my_nodeid, write_enabler)
    return MutableShareFile(filename, parent)