import os, stat, struct, time

from foolscap.api import Referenceable

from zope.interface import implements

from allmydata.interfaces import RIBucketWriter, RIBucketReader
from allmydata.util import base32, fileutil, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import constant_time_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownImmutableContainerVersionError, \
     DataTooLargeError
# each share file (in storage/shares/$SI/$SHNUM) contains lease information
# and share data. The share data is accessed by RIBucketWriter.write and
# RIBucketReader.read . The lease information is not accessible through
# these interfaces.

# The share file has the following layout:
#  0x00: share file version number, four bytes, current version is 1
#  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
#  0x08: number of leases, four bytes big-endian
#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
#  A+0x0c = B: first lease. Lease format is:
#   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
#   B+0x04: renew secret, 32 bytes (SHA256)
#   B+0x24: cancel secret, 32 bytes (SHA256)
#   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
#   B+0x48: next lease, or end of record
# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers,
# but it is still filled in by storage servers in case the storage server
# software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the
# share file is moved from one storage server to another. The value stored in
# this field is truncated, so if the actual share data length is >= 2**32,
# then the value stored in this field will be the actual share data length
# modulo 2**32.
40 LEASE_SIZE = struct.calcsize(">L32s32sL")
41 sharetype = "immutable"
43 def __init__(self, filename, max_size=None, create=False):
44 """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
45 precondition((max_size is not None) or (not create), max_size, create)
47 self._max_size = max_size
49 # touch the file, so later callers will see that we're working on
50 # it. Also construct the metadata.
51 assert not os.path.exists(self.home)
52 fileutil.make_dirs(os.path.dirname(self.home))
53 f = open(self.home, 'wb')
54 # The second field -- the four-byte share data length -- is no
55 # longer used as of Tahoe v1.3.0, but we continue to write it in
56 # there in case someone downgrades a storage server from >=
57 # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
58 # server to another, etc. We do saturation -- a share data length
59 # larger than 2**32-1 (what can fit into the field) is marked as
60 # the largest length that can fit into the field. That way, even
61 # if this does happen, the old < v1.3.0 server will still allow
62 # clients to read the first part of the share.
63 f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
65 self._lease_offset = max_size + 0x0c
68 f = open(self.home, 'rb')
69 filesize = os.path.getsize(self.home)
70 (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
73 msg = "sharefile %s had version %d but we wanted 1" % \
75 raise UnknownImmutableContainerVersionError(msg)
76 self._num_leases = num_leases
77 self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
78 self._data_offset = 0xc
83 def read_share_data(self, offset, length):
84 precondition(offset >= 0)
85 # reads beyond the end of the data are truncated. Reads that start
86 # beyond the end of the data return an empty string.
87 seekpos = self._data_offset+offset
88 actuallength = max(0, min(length, self._lease_offset-seekpos))
91 f = open(self.home, 'rb')
93 return f.read(actuallength)
95 def write_share_data(self, offset, data):
97 precondition(offset >= 0, offset)
98 if self._max_size is not None and offset+length > self._max_size:
99 raise DataTooLargeError(self._max_size, offset, length)
100 f = open(self.home, 'rb+')
101 real_offset = self._data_offset+offset
103 assert f.tell() == real_offset
107 def _write_lease_record(self, f, lease_number, lease_info):
108 offset = self._lease_offset + lease_number * self.LEASE_SIZE
110 assert f.tell() == offset
111 f.write(lease_info.to_immutable_data())
113 def _read_num_leases(self, f):
115 (num_leases,) = struct.unpack(">L", f.read(4))
118 def _write_num_leases(self, f, num_leases):
120 f.write(struct.pack(">L", num_leases))
122 def _truncate_leases(self, f, num_leases):
123 f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
125 def get_leases(self):
126 """Yields a LeaseInfo instance for all leases."""
127 f = open(self.home, 'rb')
128 (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
129 f.seek(self._lease_offset)
130 for i in range(num_leases):
131 data = f.read(self.LEASE_SIZE)
133 yield LeaseInfo().from_immutable_data(data)
135 def add_lease(self, lease_info):
136 f = open(self.home, 'rb+')
137 num_leases = self._read_num_leases(f)
138 self._write_lease_record(f, num_leases, lease_info)
139 self._write_num_leases(f, num_leases+1)
142 def renew_lease(self, renew_secret, new_expire_time):
143 for i,lease in enumerate(self.get_leases()):
144 if constant_time_compare(lease.renew_secret, renew_secret):
145 # yup. See if we need to update the owner time.
146 if new_expire_time > lease.expiration_time:
148 lease.expiration_time = new_expire_time
149 f = open(self.home, 'rb+')
150 self._write_lease_record(f, i, lease)
153 raise IndexError("unable to renew non-existent lease")
155 def add_or_renew_lease(self, lease_info):
157 self.renew_lease(lease_info.renew_secret,
158 lease_info.expiration_time)
160 self.add_lease(lease_info)
163 def cancel_lease(self, cancel_secret):
164 """Remove a lease with the given cancel_secret. If the last lease is
165 cancelled, the file will be removed. Return the number of bytes that
166 were freed (by truncating the list of leases, and possibly by
167 deleting the file. Raise IndexError if there was no lease with the
171 leases = list(self.get_leases())
172 num_leases_removed = 0
173 for i,lease in enumerate(leases):
174 if constant_time_compare(lease.cancel_secret, cancel_secret):
176 num_leases_removed += 1
177 if not num_leases_removed:
178 raise IndexError("unable to find matching lease to cancel")
179 if num_leases_removed:
180 # pack and write out the remaining leases. We write these out in
181 # the same order as they were added, so that if we crash while
182 # doing this, we won't lose any non-cancelled leases.
183 leases = [l for l in leases if l] # remove the cancelled leases
184 f = open(self.home, 'rb+')
185 for i,lease in enumerate(leases):
186 self._write_lease_record(f, i, lease)
187 self._write_num_leases(f, len(leases))
188 self._truncate_leases(f, len(leases))
190 space_freed = self.LEASE_SIZE * num_leases_removed
192 space_freed += os.stat(self.home)[stat.ST_SIZE]
197 class BucketWriter(Referenceable):
198 implements(RIBucketWriter)
200 def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
202 self.incominghome = incominghome
203 self.finalhome = finalhome
204 self._max_size = max_size # don't allow the client to write more than this
205 self._canary = canary
206 self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
208 self.throw_out_all_data = False
209 self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
210 # also, add our lease to the file now, so that other ones can be
211 # added by simultaneous uploaders
212 self._sharefile.add_lease(lease_info)
214 def allocated_size(self):
215 return self._max_size
217 def remote_write(self, offset, data):
219 precondition(not self.closed)
220 if self.throw_out_all_data:
222 self._sharefile.write_share_data(offset, data)
223 self.ss.add_latency("write", time.time() - start)
224 self.ss.count("write")
226 def remote_close(self):
227 precondition(not self.closed)
230 fileutil.make_dirs(os.path.dirname(self.finalhome))
231 fileutil.rename(self.incominghome, self.finalhome)
233 # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
234 # We try to delete the parent (.../ab/abcde) to avoid leaving
235 # these directories lying around forever, but the delete might
236 # fail if we're working on another share for the same storage
237 # index (like ab/abcde/5). The alternative approach would be to
238 # use a hierarchy of objects (PrefixHolder, BucketHolder,
239 # ShareWriter), each of which is responsible for a single
240 # directory on disk, and have them use reference counting of
241 # their children to know when they should do the rmdir. This
242 # approach is simpler, but relies on os.rmdir refusing to delete
243 # a non-empty directory. Do *not* use fileutil.rm_dir() here!
244 os.rmdir(os.path.dirname(self.incominghome))
245 # we also delete the grandparent (prefix) directory, .../ab ,
246 # again to avoid leaving directories lying around. This might
247 # fail if there is another bucket open that shares a prefix (like
249 os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
250 # we leave the great-grandparent (incoming/) directory in place.
251 except EnvironmentError:
252 # ignore the "can't rmdir because the directory is not empty"
253 # exceptions, those are normal consequences of the
254 # above-mentioned conditions.
256 self._sharefile = None
258 self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
260 filelen = os.stat(self.finalhome)[stat.ST_SIZE]
261 self.ss.bucket_writer_closed(self, filelen)
262 self.ss.add_latency("close", time.time() - start)
263 self.ss.count("close")
265 def _disconnected(self):
269 def remote_abort(self):
270 log.msg("storage: aborting sharefile %s" % self.incominghome,
271 facility="tahoe.storage", level=log.UNUSUAL)
273 self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
275 self.ss.count("abort")
281 os.remove(self.incominghome)
282 # if we were the last share to be moved, remove the incoming/
283 # directory that was our parent
284 parentdir = os.path.split(self.incominghome)[0]
285 if not os.listdir(parentdir):
287 self._sharefile = None
289 # We are now considered closed for further writing. We must tell
290 # the storage server about this so that it stops expecting us to
291 # use the space it allocated for us earlier.
293 self.ss.bucket_writer_closed(self, 0)
296 class BucketReader(Referenceable):
297 implements(RIBucketReader)
299 def __init__(self, ss, sharefname, storage_index=None, shnum=None):
301 self._share_file = ShareFile(sharefname)
302 self.storage_index = storage_index
306 return "<%s %s %s>" % (self.__class__.__name__,
307 base32.b2a_l(self.storage_index[:8], 60),
310 def remote_read(self, offset, length):
312 data = self._share_file.read_share_data(offset, length)
313 self.ss.add_latency("read", time.time() - start)
314 self.ss.count("read")
317 def remote_advise_corrupt_share(self, reason):
318 return self.ss.remote_advise_corrupt_share("immutable",