1 import os, stat, struct
3 from allmydata.interfaces import BadWriteEnablerError
4 from allmydata.util import idlib, log
5 from allmydata.util.assertutil import precondition
6 from allmydata.storage.lease import LeaseInfo
7 from allmydata.storage.common import DataTooLargeError
9 # the MutableShareFile is like the ShareFile, but used for mutable data. It
10 # has a different layout. See docs/mutable.txt for more details.
13 # 1 0 32 magic verstr "tahoe mutable container v1" plus binary
14 # 2 32 20 write enabler's nodeid
15 # 3 52 32 write enabler
16 # 4 84 8 data size (actual share data present) (a)
17 # 5 92 8 offset of (8) count of extra leases (after data)
18 # 6 100 368 four leases, 92 bytes each
19 # 0 4 ownerid (0 means "no lease here")
20 # 4 4 expiration timestamp
23 # 72 20 nodeid which accepted the tokens
25 # 8 ?? 4 count of extra leases
26 # 9 ?? n*92 extra leases
# Sanity-check the struct format codes this module relies on. The original
# form `assert struct.calcsize("L"), 4` was a no-op: the comma made the
# literal 4 the assertion *message*, so equality was never actually checked.
# It was also checking the native-size "L" (platform-dependent, often 8 on
# 64-bit builds) even though this file only ever uses the big-endian
# standard-size forms ">L" (4 bytes) and ">Q" (8 bytes), so assert on those.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
class MutableShareFile:
    """On-disk container for one mutable share.

    Layout (per the header comment above): 32-byte magic, write-enabler
    nodeid, write enabler, 8-byte data length, 8-byte offset of the
    extra-lease block, four fixed 92-byte lease slots, the share data
    itself, then a 4-byte count of extra leases followed by those leases.

    NOTE(review): this listing appears to have lines elided throughout --
    several statements the visible code clearly depends on (assignments,
    returns, f.close() calls, if/else headers) are not shown. Comments
    below document only what the visible code does and flag the gaps.
    """

    # Fixed byte offsets derived from the big-endian header struct format.
    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    # Share data begins after the header plus the four built-in lease slots.
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles share with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a share file by accident
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    def __init__(self, filename, parent=None):
        # NOTE(review): the assignment of self.home (presumably from
        # `filename`) is not visible in this listing -- confirm upstream.
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            data = f.read(self.HEADER_SIZE)
            # NOTE(review): the opening "(magic," of this unpack tuple is
            # not visible in the listing.
             write_enabler_nodeid, write_enabler,
             data_length, extra_least_offset) = \
             struct.unpack(">32s20s32sQQ", data)
            assert magic == self.MAGIC
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        # Delegate logging to our parent (set for that purpose in __init__).
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
        """Write a fresh, empty container to self.home.

        NOTE(review): the lines defining data_length and num_extra_leases,
        the remainder of the extra_lease_offset expression, the close of
        the struct.pack call, and a final f.close() are not visible here.
        """
        assert not os.path.exists(self.home)
        extra_lease_offset = (self.HEADER_SIZE
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        f = open(self.home, 'wb')
        header = struct.pack(">32s20s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset,
        # four empty (all-zero) built-in lease slots follow the header
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f.write(header + leases)
        # data goes here, empty after creation
        f.write(struct.pack(">L", num_extra_leases))
        # extra leases go here, none at creation

    def _read_data_length(self, f):
        # Read the 8-byte big-endian share-data length from the header.
        # NOTE(review): the `return data_length` line is not visible here.
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))

    def _write_data_length(self, f, data_length):
        # Record a new share-data length in the header.
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        """Read `length` bytes of share data starting at `offset`.

        NOTE(review): the final `return data` line is not visible here.
        """
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)

    def _read_extra_lease_offset(self, f):
        # Read the file offset at which the extra-lease block begins.
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        # Record a new extra-lease-block offset in the header.
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        # Read the 4-byte count stored at the start of the extra-lease block.
        # NOTE(review): an `f.seek(offset)` between these two statements
        # appears to have been elided from this listing.
        offset = self._read_extra_lease_offset(f)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        # Overwrite the extra-lease count at the start of the extra block.
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        """Grow the container, relocating the extra-lease block to the end.

        Raises DataTooLargeError when the requested size exceeds MAX_SIZE.
        NOTE(review): the statement handling the shrink case (after the
        TODO) is not visible in this listing.
        """
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
        # copy the count + extra leases to their new, later position
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        # an interrupt here will corrupt the leases, iff the move caused the
        # extra leases to overlap.
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        """Write `data` at `offset` within the share-data region, growing
        the container (and moving the leases) when necessary.

        NOTE(review): the assignment of `length` (presumably len(data))
        and the final `f.write(data)` are not visible in this listing.
        """
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)
        if offset+length >= data_length:
            # They are expanding their data size.
            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)
                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched
            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.
            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share
        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)

    def _write_lease_record(self, f, lease_number, lease_info):
        """Write `lease_info` into slot `lease_number` (0-3 live in the
        header area; higher slots live in the extra-lease block).

        NOTE(review): the `if lease_number < 4:` header, a `+ 4` term in
        the first extra-slot offset expression, the `else:` header, and an
        `f.seek(offset)` before the assert are not visible here.
        """
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + (lease_number-4)*self.LEASE_SIZE)
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + (lease_number-4)*self.LEASE_SIZE)
        assert f.tell() == offset
        f.write(lease_info.to_mutable_data())

    def _read_lease_record(self, f, lease_number):
        # returns a LeaseInfo instance, or None
        # NOTE(review): the `if lease_number < 4:` header, the `else:`
        # header before the raise, an `f.seek(offset)` before the assert,
        # and the `return None` / `return lease_info` lines after the
        # owner_num check are not visible in this listing.
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + (lease_number-4)*self.LEASE_SIZE)
            raise IndexError("No such lease number %d" % lease_number)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = LeaseInfo().from_mutable_data(data)
        if lease_info.owner_num == 0:

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot
        # NOTE(review): the `return i` inside the loop and the trailing
        # `return None` are not visible in this listing.
        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:

    def _enumerate_leases(self, f):
        """Yields (leasenum, (ownerid, expiration_time, renew_secret,
        cancel_secret, accepting_nodeid)) for all leases."""
        # NOTE(review): the `yield` statement (emitting non-None records)
        # is not visible in this listing.
        for i in range(self._get_num_lease_slots(f)):
            data = self._read_lease_record(f, i)

    def debug_get_leases(self):
        # Debug helper: read every lease slot from disk.
        # NOTE(review): f.close() and `return leases` are not visible here.
        f = open(self.home, 'rb')
        leases = list(self._enumerate_leases(f))

    def add_lease(self, lease_info):
        """Store `lease_info` in the first empty slot, or append a new slot.

        NOTE(review): the `else:` header before the second write and a
        final f.close() are not visible in this listing.
        """
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        f = open(self.home, 'rb+')
        num_lease_slots = self._get_num_lease_slots(f)
        empty_slot = self._get_first_empty_lease_slot(f)
        if empty_slot is not None:
            self._write_lease_record(f, empty_slot, lease_info)
            self._write_lease_record(f, num_lease_slots, lease_info)

    def renew_lease(self, renew_secret, new_expire_time):
        """Extend the expiration of the lease matching `renew_secret`.

        Raises IndexError (with the accepting nodeids in the message) when
        no lease matches, so a migrated share's client can re-lease.
        NOTE(review): the early return after a successful renewal, the
        f.close() calls, and the " nodeids: " continuation of the message
        string are not visible in this listing.
        """
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        for (leasenum,lease) in self._enumerate_leases(f):
            if lease.renew_secret == renew_secret:
                # yup. See if we need to update the owner time.
                if new_expire_time > lease.expiration_time:
                    lease.expiration_time = new_expire_time
                    self._write_lease_record(f, leasenum, lease)
            accepting_nodeids.add(lease.nodeid)
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        # Renew if a lease with this renew_secret exists, otherwise add.
        # NOTE(review): the `try:` / `except IndexError:` headers that the
        # indentation of these two calls implies are not visible here.
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
            self.renew_lease(lease_info.renew_secret,
                             lease_info.expiration_time)
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file. Raise IndexError if there was no lease with the
        given cancel_secret."""
        # NOTE(review): the remaining LeaseInfo kwargs (presumably
        # expiration_time and nodeid), the bookkeeping that decides whether
        # anything was cancelled, the f.close()/unlink around the
        # freed-space accounting, and the returns are not visible here.
        accepting_nodeids = set()
        # a blank record marks a slot as "no lease here"
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret="\x00"*32,
                                cancel_secret="\x00"*32,
        f = open(self.home, 'rb+')
        for (leasenum,lease) in self._enumerate_leases(f):
            accepting_nodeids.add(lease.nodeid)
            if lease.cancel_secret == cancel_secret:
                self._write_lease_record(f, leasenum, blank_lease)
            freed_space = self._pack_leases(f)
            freed_space += os.stat(self.home)[stat.ST_SIZE]
        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        # NOTE(review): the `return 0` line is not visible in this listing.

    def _read_write_enabler_and_nodeid(self, f):
        # Read and validate the header; return (write_enabler, nodeid).
        # NOTE(review): an f.seek(0) before the read and the opening
        # "(magic," of the unpack tuple are not visible in this listing.
        data = f.read(self.HEADER_SIZE)
         write_enabler_nodeid, write_enabler,
         data_length, extra_least_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        # Batch read: one result per (offset, length) pair.
        # NOTE(review): the `datav = []` initialization, f.close(), and
        # `return datav` are not visible in this listing.
        f = open(self.home, 'rb')
        for (offset, length) in readv:
            datav.append(self._read_share_data(f, offset, length))

#    def remote_get_length(self):
#        f = open(self.home, 'rb')
#        data_length = self._read_data_length(f)

    def check_write_enabler(self, write_enabler, si_s):
        """Raise BadWriteEnablerError unless `write_enabler` matches the
        one recorded in the container; log the recording nodeid so a
        client can recover from share migration.
        """
        f = open(self.home, 'rb+')
        (real_write_enabler, write_enabler_nodeid) = \
            self._read_write_enabler_and_nodeid(f)
        if write_enabler != real_write_enabler:
            # accomodate share migration by reporting the nodeid used for the
            # old write enabler
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        """Evaluate a test vector of (offset, length, operator, specimen)
        against this share's data.

        NOTE(review): the boolean accumulator, break, f.close(), and
        return are not visible in this listing.
        """
        f = open(self.home, 'rb+')
        for (offset, length, operator, specimen) in testv:
            data = self._read_share_data(f, offset, length)
            if not testv_compare(data, operator, specimen):

    def writev(self, datav, new_length):
        """Apply a write vector of (offset, data) pairs, then optionally
        resize the container and record the new data length.

        NOTE(review): a final f.close() is not visible in this listing.
        """
        f = open(self.home, 'rb+')
        for (offset, data) in datav:
            self._write_share_data(f, offset, data)
        if new_length is not None:
            self._change_container_size(f, new_length)
            f.seek(self.DATA_LENGTH_OFFSET)
            f.write(struct.pack(">Q", new_length))
397 def testv_compare(a, op, b):
398 assert op in ("lt", "le", "eq", "ne", "ge", "gt")
    def check_testv(self, testv):
        # Evaluate a test vector of (offset, length, operator, specimen)
        # entries via testv_compare.
        # NOTE(review): this listing appears to be missing lines -- the
        # enclosing class header is not visible, `data` is referenced but
        # never assigned here (presumably the empty string for a share
        # with no data -- confirm), and there is no boolean accumulator,
        # break, or return. Verify against the complete source.
        for (offset, length, operator, specimen) in testv:
            if not testv_compare(data, operator, specimen):
def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    """Initialize a brand-new mutable share container on disk.

    A throwaway MutableShareFile handle writes the fresh container
    (via create()), and a separate handle to the same file is returned
    to the caller.
    """
    writer = MutableShareFile(filename, parent)
    writer.create(my_nodeid, write_enabler)
    return MutableShareFile(filename, parent)