import os, stat, struct

from allmydata.interfaces import BadWriteEnablerError
from allmydata.util import idlib, log
from allmydata.util.assertutil import precondition
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import DataTooLargeError

# The MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.

# #   offset    size    name
# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
# 2   32        20      write enabler's nodeid
# 3   52        32      write enabler
# 4   84        8       data size (actual share data present) (a)
# 5   92        8       offset of (8) count of extra leases (after data)
# 6   100       368     four leases, 92 bytes each
#                        0    4   ownerid (0 means "no lease here")
#                        4    4   expiration timestamp
#                        8   32   renewal token
#                        40  32   cancel token
#                        72  20   nodeid which accepted the tokens
# 7   468       (a)     data
# 8   ??        4       count of extra leases
# 9   ??        n*92    extra leases

# The struct module docs say that "L" is 4 bytes and "Q" is 8 bytes, at
# least with big-endian (">") ordering, which is what this layout uses.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")
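
# Illustrative sketch (not part of the original module): the fixed-size
# header described above can be read back with the same ">32s20s32sQQ"
# format string that MutableShareFile uses below. The helper name is
# hypothetical and exists only to make the layout table concrete.
def _example_read_header(filename):
    f = open(filename, 'rb')
    header = f.read(struct.calcsize(">32s20s32sQQ"))  # the first 100 bytes
    (magic, write_enabler_nodeid, write_enabler,
     data_length, extra_lease_offset) = struct.unpack(">32s20s32sQQ", header)
    f.close()
    return (magic, write_enabler_nodeid, write_enabler,
            data_length, extra_lease_offset)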

class MutableShareFile:

    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e"
    assert len(MAGIC) == 32
    MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary
    # TODO: decide upon a policy for max share size

    def __init__(self, filename, parent=None):
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            f = open(self.home, 'rb')
            data = f.read(self.HEADER_SIZE)
            (magic,
             write_enabler_nodeid, write_enabler,
             data_length, extra_lease_offset) = \
             struct.unpack(">32s20s32sQQ", data)
            f.close()
            assert magic == self.MAGIC
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
        assert not os.path.exists(self.home)
        data_length = 0
        extra_lease_offset = (self.HEADER_SIZE
                              + 4 * self.LEASE_SIZE
                              + data_length)
        assert extra_lease_offset == self.DATA_OFFSET # true at creation
        num_extra_leases = 0
        f = open(self.home, 'wb')
        header = struct.pack(">32s20s32sQQ",
                             self.MAGIC, my_nodeid, write_enabler,
                             data_length, extra_lease_offset)
        leases = ("\x00"*self.LEASE_SIZE) * 4
        f.write(header + leases)
        # data goes here, empty after creation
        f.write(struct.pack(">L", num_extra_leases))
        # extra leases go here, none at creation
        f.close()

    def unlink(self):
        os.unlink(self.home)

    def _read_data_length(self, f):
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # Reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return ""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

    def _read_extra_lease_offset(self, f):
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        offset = self._read_extra_lease_offset(f)
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        # An interrupt here will corrupt the leases, iff the move caused the
        # extra leases to overlap.
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.
            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it by
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window.
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # An interrupt here is ok: the container has been enlarged
                # but the data remains untouched.

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.
            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # An interrupt here will result in a corrupted share.

        # Now all that's left to do is write out their data.
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)

    def _write_lease_record(self, f, lease_number, lease_info):
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(lease_info.to_mutable_data())

    def _read_lease_record(self, f, lease_number):
        # returns a LeaseInfo instance, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = LeaseInfo().from_mutable_data(data)
        if lease_info.owner_num == 0:
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        # How many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # Return an int with the index of an empty slot, or None if we do
        # not currently have an empty slot.
        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def get_leases(self):
        """Yields a LeaseInfo instance for each lease."""
        f = open(self.home, 'rb')
        for i, lease in self._enumerate_leases(f):
            yield lease
        f.close()

    def _enumerate_leases(self, f):
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield (i, data)
            except IndexError:
                return

    def add_lease(self, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        f = open(self.home, 'rb+')
        num_lease_slots = self._get_num_lease_slots(f)
        empty_slot = self._get_first_empty_lease_slot(f)
        if empty_slot is not None:
            self._write_lease_record(f, empty_slot, lease_info)
        else:
            self._write_lease_record(f, num_lease_slots, lease_info)
        f.close()

    def renew_lease(self, renew_secret, new_expire_time):
        accepting_nodeids = set()
        f = open(self.home, 'rb+')
        for (leasenum, lease) in self._enumerate_leases(f):
            if lease.renew_secret == renew_secret:
                # Yup. See if we need to update the owner time.
                if new_expire_time > lease.expiration_time:
                    # yes
                    lease.expiration_time = new_expire_time
                    self._write_lease_record(f, leasenum, lease)
                f.close()
                return
            accepting_nodeids.add(lease.nodeid)
        f.close()
        # Include the set of accepting nodeids in the error, to give the
        # client a chance to update the leases on a share which has been
        # migrated from its original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        raise IndexError(msg)

    def add_or_renew_lease(self, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.expiration_time)
        except IndexError:
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret."""

        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret="\x00"*32,
                                cancel_secret="\x00"*32,
                                expiration_time=0,
                                nodeid="\x00"*20)
        f = open(self.home, 'rb+')
        for (leasenum, lease) in self._enumerate_leases(f):
            accepting_nodeids.add(lease.nodeid)
            if lease.cancel_secret == cancel_secret:
                self._write_lease_record(f, leasenum, blank_lease)
                modified += 1
            else:
                remaining += 1
        if modified:
            freed_space = self._pack_leases(f)
            f.close()
            if not remaining:
                freed_space += os.stat(self.home)[stat.ST_SIZE]
                self.unlink()
            return freed_space
        f.close()

        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert magic == self.MAGIC
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        datav = []
        f = open(self.home, 'rb')
        for (offset, length) in readv:
            datav.append(self._read_share_data(f, offset, length))
        f.close()
        return datav

#    def remote_get_length(self):
#        f = open(self.home, 'rb')
#        data_length = self._read_data_length(f)
#        f.close()
#        return data_length

    def check_write_enabler(self, write_enabler, si_s):
        f = open(self.home, 'rb+')
        (real_write_enabler, write_enabler_nodeid) = \
                             self._read_write_enabler_and_nodeid(f)
        f.close()
        if write_enabler != real_write_enabler:
            # Accommodate share migration by reporting the nodeid used for
            # the old write enabler.
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        test_good = True
        f = open(self.home, 'rb+')
        for (offset, length, operator, specimen) in testv:
            data = self._read_share_data(f, offset, length)
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        f.close()
        return test_good

    def writev(self, datav, new_length):
        f = open(self.home, 'rb+')
        for (offset, data) in datav:
            self._write_share_data(f, offset, data)
        if new_length is not None:
            self._change_container_size(f, new_length)
            f.seek(self.DATA_LENGTH_OFFSET)
            f.write(struct.pack(">Q", new_length))
        f.close()

def testv_compare(a, op, b):
    assert op in ("lt", "le", "eq", "ne", "ge", "gt")
    # Map the test-vector operator name onto the corresponding comparison.
    return {"lt": a < b,
            "le": a <= b,
            "eq": a == b,
            "ne": a != b,
            "ge": a >= b,
            "gt": a > b}[op]

class EmptyShare:

    def check_testv(self, testv):
        test_good = True
        for (offset, length, operator, specimen) in testv:
            data = ""
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        return test_good

def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    ms = MutableShareFile(filename, parent)
    ms.create(my_nodeid, write_enabler)
    del ms
    return MutableShareFile(filename, parent)
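
# Example usage (a sketch; the path, the 20-byte nodeid, and the 32-byte
# write enabler below are arbitrary placeholders, and `parent` is only
# needed for logging):
#
#   share = create_mutable_sharefile("/tmp/example.share", "N"*20, "W"*32, None)
#   share.writev([(0, "hello mutable world")], new_length=None)
#   print share.readv([(0, 5)])   # -> ["hello"]
#
# writev() applies each (offset, data) pair in order, and readv() returns a
# list with one string per (offset, length) request, as defined above.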