1 import os, stat, struct, time
3 from foolscap.api import Referenceable
5 from zope.interface import implements
6 from allmydata.interfaces import RIBucketWriter, RIBucketReader
7 from allmydata.util import base32, fileutil, log
8 from allmydata.util.assertutil import precondition
9 from allmydata.util.hashutil import constant_time_compare
10 from allmydata.storage.lease import LeaseInfo
11 from allmydata.storage.common import UnknownImmutableContainerVersionError, \
14 # each share file (in storage/shares/$SI/$SHNUM) contains lease information
15 # and share data. The share data is accessed by RIBucketWriter.write and
# RIBucketReader.read . The lease information is not accessible through these
# interfaces.
19 # The share file has the following layout:
20 # 0x00: share file version number, four bytes, current version is 1
21 # 0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
22 # 0x08: number of leases, four bytes big-endian
23 # 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
24 # A+0x0c = B: first lease. Lease format is:
25 # B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
26 # B+0x04: renew secret, 32 bytes (SHA256)
27 # B+0x24: cancel secret, 32 bytes (SHA256)
28 # B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
29 # B+0x48: next lease, or end of record
31 # Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers,
32 # but it is still filled in by storage servers in case the storage server
33 # software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the
34 # share file is moved from one storage server to another. The value stored in
# this field is saturated, so if the actual share data length is >= 2**32,
# then the value stored in this field will be 2**32-1, the largest value
# that fits in the four-byte field.
40 LEASE_SIZE = struct.calcsize(">L32s32sL")
41 sharetype = "immutable"
43 def __init__(self, filename, max_size=None, create=False):
44 """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
45 precondition((max_size is not None) or (not create), max_size, create)
47 self._max_size = max_size
49 # touch the file, so later callers will see that we're working on
50 # it. Also construct the metadata.
51 assert not os.path.exists(self.home)
52 fileutil.make_dirs(os.path.dirname(self.home))
53 f = open(self.home, 'wb')
54 # The second field -- the four-byte share data length -- is no
55 # longer used as of Tahoe v1.3.0, but we continue to write it in
56 # there in case someone downgrades a storage server from >=
57 # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
58 # server to another, etc. We do saturation -- a share data length
59 # larger than 2**32-1 (what can fit into the field) is marked as
60 # the largest length that can fit into the field. That way, even
61 # if this does happen, the old < v1.3.0 server will still allow
62 # clients to read the first part of the share.
63 f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
65 self._lease_offset = max_size + 0x0c
68 f = open(self.home, 'rb')
69 filesize = os.path.getsize(self.home)
70 (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
73 msg = "sharefile %s had version %d but we wanted 1" % \
75 raise UnknownImmutableContainerVersionError(msg)
76 self._num_leases = num_leases
77 self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
78 self._data_offset = 0xc
83 def read_share_data(self, offset, length):
84 precondition(offset >= 0)
85 # reads beyond the end of the data are truncated. Reads that start
86 # beyond the end of the data return an empty string. I wonder why
87 # Python doesn't do the following computation for me?
88 seekpos = self._data_offset+offset
89 fsize = os.path.getsize(self.home)
90 actuallength = max(0, min(length, fsize-seekpos))
93 f = open(self.home, 'rb')
95 return f.read(actuallength)
97 def write_share_data(self, offset, data):
99 precondition(offset >= 0, offset)
100 if self._max_size is not None and offset+length > self._max_size:
101 raise DataTooLargeError(self._max_size, offset, length)
102 f = open(self.home, 'rb+')
103 real_offset = self._data_offset+offset
105 assert f.tell() == real_offset
109 def _write_lease_record(self, f, lease_number, lease_info):
110 offset = self._lease_offset + lease_number * self.LEASE_SIZE
112 assert f.tell() == offset
113 f.write(lease_info.to_immutable_data())
115 def _read_num_leases(self, f):
117 (num_leases,) = struct.unpack(">L", f.read(4))
120 def _write_num_leases(self, f, num_leases):
122 f.write(struct.pack(">L", num_leases))
124 def _truncate_leases(self, f, num_leases):
125 f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
127 def get_leases(self):
128 """Yields a LeaseInfo instance for all leases."""
129 f = open(self.home, 'rb')
130 (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
131 f.seek(self._lease_offset)
132 for i in range(num_leases):
133 data = f.read(self.LEASE_SIZE)
135 yield LeaseInfo().from_immutable_data(data)
137 def add_lease(self, lease_info):
138 f = open(self.home, 'rb+')
139 num_leases = self._read_num_leases(f)
140 self._write_lease_record(f, num_leases, lease_info)
141 self._write_num_leases(f, num_leases+1)
144 def renew_lease(self, renew_secret, new_expire_time):
145 for i,lease in enumerate(self.get_leases()):
146 if constant_time_compare(lease.renew_secret, renew_secret):
147 # yup. See if we need to update the owner time.
148 if new_expire_time > lease.expiration_time:
150 lease.expiration_time = new_expire_time
151 f = open(self.home, 'rb+')
152 self._write_lease_record(f, i, lease)
155 raise IndexError("unable to renew non-existent lease")
157 def add_or_renew_lease(self, lease_info):
159 self.renew_lease(lease_info.renew_secret,
160 lease_info.expiration_time)
162 self.add_lease(lease_info)
165 def cancel_lease(self, cancel_secret):
166 """Remove a lease with the given cancel_secret. If the last lease is
167 cancelled, the file will be removed. Return the number of bytes that
168 were freed (by truncating the list of leases, and possibly by
169 deleting the file. Raise IndexError if there was no lease with the
173 leases = list(self.get_leases())
174 num_leases_removed = 0
175 for i,lease in enumerate(leases):
176 if constant_time_compare(lease.cancel_secret, cancel_secret):
178 num_leases_removed += 1
179 if not num_leases_removed:
180 raise IndexError("unable to find matching lease to cancel")
181 if num_leases_removed:
182 # pack and write out the remaining leases. We write these out in
183 # the same order as they were added, so that if we crash while
184 # doing this, we won't lose any non-cancelled leases.
185 leases = [l for l in leases if l] # remove the cancelled leases
186 f = open(self.home, 'rb+')
187 for i,lease in enumerate(leases):
188 self._write_lease_record(f, i, lease)
189 self._write_num_leases(f, len(leases))
190 self._truncate_leases(f, len(leases))
192 space_freed = self.LEASE_SIZE * num_leases_removed
194 space_freed += os.stat(self.home)[stat.ST_SIZE]
199 class BucketWriter(Referenceable):
200 implements(RIBucketWriter)
202 def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
204 self.incominghome = incominghome
205 self.finalhome = finalhome
206 self._max_size = max_size # don't allow the client to write more than this
207 self._canary = canary
208 self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
210 self.throw_out_all_data = False
211 self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
212 # also, add our lease to the file now, so that other ones can be
213 # added by simultaneous uploaders
214 self._sharefile.add_lease(lease_info)
216 def allocated_size(self):
217 return self._max_size
219 def remote_write(self, offset, data):
221 precondition(not self.closed)
222 if self.throw_out_all_data:
224 self._sharefile.write_share_data(offset, data)
225 self.ss.add_latency("write", time.time() - start)
226 self.ss.count("write")
228 def remote_close(self):
229 precondition(not self.closed)
232 fileutil.make_dirs(os.path.dirname(self.finalhome))
233 fileutil.rename(self.incominghome, self.finalhome)
235 # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
236 # We try to delete the parent (.../ab/abcde) to avoid leaving
237 # these directories lying around forever, but the delete might
238 # fail if we're working on another share for the same storage
239 # index (like ab/abcde/5). The alternative approach would be to
240 # use a hierarchy of objects (PrefixHolder, BucketHolder,
241 # ShareWriter), each of which is responsible for a single
242 # directory on disk, and have them use reference counting of
243 # their children to know when they should do the rmdir. This
244 # approach is simpler, but relies on os.rmdir refusing to delete
245 # a non-empty directory. Do *not* use fileutil.rm_dir() here!
246 os.rmdir(os.path.dirname(self.incominghome))
247 # we also delete the grandparent (prefix) directory, .../ab ,
248 # again to avoid leaving directories lying around. This might
249 # fail if there is another bucket open that shares a prefix (like
251 os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
252 # we leave the great-grandparent (incoming/) directory in place.
253 except EnvironmentError:
254 # ignore the "can't rmdir because the directory is not empty"
255 # exceptions, those are normal consequences of the
256 # above-mentioned conditions.
258 self._sharefile = None
260 self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
262 filelen = os.stat(self.finalhome)[stat.ST_SIZE]
263 self.ss.bucket_writer_closed(self, filelen)
264 self.ss.add_latency("close", time.time() - start)
265 self.ss.count("close")
267 def _disconnected(self):
271 def remote_abort(self):
272 log.msg("storage: aborting sharefile %s" % self.incominghome,
273 facility="tahoe.storage", level=log.UNUSUAL)
275 self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
277 self.ss.count("abort")
283 os.remove(self.incominghome)
284 # if we were the last share to be moved, remove the incoming/
285 # directory that was our parent
286 parentdir = os.path.split(self.incominghome)[0]
287 if not os.listdir(parentdir):
289 self._sharefile = None
291 # We are now considered closed for further writing. We must tell
292 # the storage server about this so that it stops expecting us to
293 # use the space it allocated for us earlier.
295 self.ss.bucket_writer_closed(self, 0)
298 class BucketReader(Referenceable):
299 implements(RIBucketReader)
301 def __init__(self, ss, sharefname, storage_index=None, shnum=None):
303 self._share_file = ShareFile(sharefname)
304 self.storage_index = storage_index
308 return "<%s %s %s>" % (self.__class__.__name__,
309 base32.b2a_l(self.storage_index[:8], 60),
312 def remote_read(self, offset, length):
314 data = self._share_file.read_share_data(offset, length)
315 self.ss.add_latency("read", time.time() - start)
316 self.ss.count("read")
319 def remote_advise_corrupt_share(self, reason):
320 return self.ss.remote_advise_corrupt_share("immutable",