]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage/immutable.py
b7c4a9f9eaa0a5c29861c22d8219bb8b90bfccfc
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage / immutable.py
1 import os, stat, struct, time
2
3 from foolscap.api import Referenceable
4
5 from zope.interface import implements
6 from allmydata.interfaces import RIBucketWriter, RIBucketReader
7 from allmydata.util import base32, fileutil, log
8 from allmydata.util.assertutil import precondition
9 from allmydata.util.hashutil import constant_time_compare
10 from allmydata.storage.lease import LeaseInfo
11 from allmydata.storage.common import UnknownImmutableContainerVersionError, \
12      DataTooLargeError
13
14 # each share file (in storage/shares/$SI/$SHNUM) contains lease information
15 # and share data. The share data is accessed by RIBucketWriter.write and
16 # RIBucketReader.read . The lease information is not accessible through these
17 # interfaces.
18
19 # The share file has the following layout:
20 #  0x00: share file version number, four bytes, current version is 1
21 #  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
22 #  0x08: number of leases, four bytes big-endian
23 #  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
24 #  A+0x0c = B: first lease. Lease format is:
25 #   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
26 #   B+0x04: renew secret, 32 bytes (SHA256)
27 #   B+0x24: cancel secret, 32 bytes (SHA256)
28 #   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
29 #   B+0x48: next lease, or end of record
30
31 # Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers,
32 # but it is still filled in by storage servers in case the storage server
33 # software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the
34 # share file is moved from one storage server to another. The value stored in
35 # this field is truncated, so if the actual share data length is >= 2**32,
36 # then the value stored in this field will be the actual share data length
37 # modulo 2**32.
38
39 class ShareFile:
40     LEASE_SIZE = struct.calcsize(">L32s32sL")
41     sharetype = "immutable"
42
43     def __init__(self, filename, max_size=None, create=False):
44         """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
45         precondition((max_size is not None) or (not create), max_size, create)
46         self.home = filename
47         self._max_size = max_size
48         if create:
49             # touch the file, so later callers will see that we're working on
50             # it. Also construct the metadata.
51             assert not os.path.exists(self.home)
52             fileutil.make_dirs(os.path.dirname(self.home))
53             f = open(self.home, 'wb')
54             # The second field -- the four-byte share data length -- is no
55             # longer used as of Tahoe v1.3.0, but we continue to write it in
56             # there in case someone downgrades a storage server from >=
57             # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
58             # server to another, etc. We do saturation -- a share data length
59             # larger than 2**32-1 (what can fit into the field) is marked as
60             # the largest length that can fit into the field. That way, even
61             # if this does happen, the old < v1.3.0 server will still allow
62             # clients to read the first part of the share.
63             f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
64             f.close()
65             self._lease_offset = max_size + 0x0c
66             self._num_leases = 0
67         else:
68             f = open(self.home, 'rb')
69             filesize = os.path.getsize(self.home)
70             (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
71             f.close()
72             if version != 1:
73                 msg = "sharefile %s had version %d but we wanted 1" % \
74                       (filename, version)
75                 raise UnknownImmutableContainerVersionError(msg)
76             self._num_leases = num_leases
77             self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
78         self._data_offset = 0xc
79
80     def unlink(self):
81         os.unlink(self.home)
82
83     def read_share_data(self, offset, length):
84         precondition(offset >= 0)
85         # reads beyond the end of the data are truncated. Reads that start
86         # beyond the end of the data return an empty string. I wonder why
87         # Python doesn't do the following computation for me?
88         seekpos = self._data_offset+offset
89         fsize = os.path.getsize(self.home)
90         actuallength = max(0, min(length, fsize-seekpos))
91         if actuallength == 0:
92             return ""
93         f = open(self.home, 'rb')
94         f.seek(seekpos)
95         return f.read(actuallength)
96
97     def write_share_data(self, offset, data):
98         length = len(data)
99         precondition(offset >= 0, offset)
100         if self._max_size is not None and offset+length > self._max_size:
101             raise DataTooLargeError(self._max_size, offset, length)
102         f = open(self.home, 'rb+')
103         real_offset = self._data_offset+offset
104         f.seek(real_offset)
105         assert f.tell() == real_offset
106         f.write(data)
107         f.close()
108
109     def _write_lease_record(self, f, lease_number, lease_info):
110         offset = self._lease_offset + lease_number * self.LEASE_SIZE
111         f.seek(offset)
112         assert f.tell() == offset
113         f.write(lease_info.to_immutable_data())
114
115     def _read_num_leases(self, f):
116         f.seek(0x08)
117         (num_leases,) = struct.unpack(">L", f.read(4))
118         return num_leases
119
120     def _write_num_leases(self, f, num_leases):
121         f.seek(0x08)
122         f.write(struct.pack(">L", num_leases))
123
124     def _truncate_leases(self, f, num_leases):
125         f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
126
127     def get_leases(self):
128         """Yields a LeaseInfo instance for all leases."""
129         f = open(self.home, 'rb')
130         (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
131         f.seek(self._lease_offset)
132         for i in range(num_leases):
133             data = f.read(self.LEASE_SIZE)
134             if data:
135                 yield LeaseInfo().from_immutable_data(data)
136
137     def add_lease(self, lease_info):
138         f = open(self.home, 'rb+')
139         num_leases = self._read_num_leases(f)
140         self._write_lease_record(f, num_leases, lease_info)
141         self._write_num_leases(f, num_leases+1)
142         f.close()
143
144     def renew_lease(self, renew_secret, new_expire_time):
145         for i,lease in enumerate(self.get_leases()):
146             if constant_time_compare(lease.renew_secret, renew_secret):
147                 # yup. See if we need to update the owner time.
148                 if new_expire_time > lease.expiration_time:
149                     # yes
150                     lease.expiration_time = new_expire_time
151                     f = open(self.home, 'rb+')
152                     self._write_lease_record(f, i, lease)
153                     f.close()
154                 return
155         raise IndexError("unable to renew non-existent lease")
156
157     def add_or_renew_lease(self, lease_info):
158         try:
159             self.renew_lease(lease_info.renew_secret,
160                              lease_info.expiration_time)
161         except IndexError:
162             self.add_lease(lease_info)
163
164
165     def cancel_lease(self, cancel_secret):
166         """Remove a lease with the given cancel_secret. If the last lease is
167         cancelled, the file will be removed. Return the number of bytes that
168         were freed (by truncating the list of leases, and possibly by
169         deleting the file. Raise IndexError if there was no lease with the
170         given cancel_secret.
171         """
172
173         leases = list(self.get_leases())
174         num_leases_removed = 0
175         for i,lease in enumerate(leases):
176             if constant_time_compare(lease.cancel_secret, cancel_secret):
177                 leases[i] = None
178                 num_leases_removed += 1
179         if not num_leases_removed:
180             raise IndexError("unable to find matching lease to cancel")
181         if num_leases_removed:
182             # pack and write out the remaining leases. We write these out in
183             # the same order as they were added, so that if we crash while
184             # doing this, we won't lose any non-cancelled leases.
185             leases = [l for l in leases if l] # remove the cancelled leases
186             f = open(self.home, 'rb+')
187             for i,lease in enumerate(leases):
188                 self._write_lease_record(f, i, lease)
189             self._write_num_leases(f, len(leases))
190             self._truncate_leases(f, len(leases))
191             f.close()
192         space_freed = self.LEASE_SIZE * num_leases_removed
193         if not len(leases):
194             space_freed += os.stat(self.home)[stat.ST_SIZE]
195             self.unlink()
196         return space_freed
197
198
199 class BucketWriter(Referenceable):
200     implements(RIBucketWriter)
201
202     def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
203         self.ss = ss
204         self.incominghome = incominghome
205         self.finalhome = finalhome
206         self._max_size = max_size # don't allow the client to write more than this
207         self._canary = canary
208         self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
209         self.closed = False
210         self.throw_out_all_data = False
211         self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
212         # also, add our lease to the file now, so that other ones can be
213         # added by simultaneous uploaders
214         self._sharefile.add_lease(lease_info)
215
216     def allocated_size(self):
217         return self._max_size
218
219     def remote_write(self, offset, data):
220         start = time.time()
221         precondition(not self.closed)
222         if self.throw_out_all_data:
223             return
224         self._sharefile.write_share_data(offset, data)
225         self.ss.add_latency("write", time.time() - start)
226         self.ss.count("write")
227
228     def remote_close(self):
229         precondition(not self.closed)
230         start = time.time()
231
232         fileutil.make_dirs(os.path.dirname(self.finalhome))
233         fileutil.rename(self.incominghome, self.finalhome)
234         try:
235             # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
236             # We try to delete the parent (.../ab/abcde) to avoid leaving
237             # these directories lying around forever, but the delete might
238             # fail if we're working on another share for the same storage
239             # index (like ab/abcde/5). The alternative approach would be to
240             # use a hierarchy of objects (PrefixHolder, BucketHolder,
241             # ShareWriter), each of which is responsible for a single
242             # directory on disk, and have them use reference counting of
243             # their children to know when they should do the rmdir. This
244             # approach is simpler, but relies on os.rmdir refusing to delete
245             # a non-empty directory. Do *not* use fileutil.rm_dir() here!
246             os.rmdir(os.path.dirname(self.incominghome))
247             # we also delete the grandparent (prefix) directory, .../ab ,
248             # again to avoid leaving directories lying around. This might
249             # fail if there is another bucket open that shares a prefix (like
250             # ab/abfff).
251             os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
252             # we leave the great-grandparent (incoming/) directory in place.
253         except EnvironmentError:
254             # ignore the "can't rmdir because the directory is not empty"
255             # exceptions, those are normal consequences of the
256             # above-mentioned conditions.
257             pass
258         self._sharefile = None
259         self.closed = True
260         self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
261
262         filelen = os.stat(self.finalhome)[stat.ST_SIZE]
263         self.ss.bucket_writer_closed(self, filelen)
264         self.ss.add_latency("close", time.time() - start)
265         self.ss.count("close")
266
267     def _disconnected(self):
268         if not self.closed:
269             self._abort()
270
271     def remote_abort(self):
272         log.msg("storage: aborting sharefile %s" % self.incominghome,
273                 facility="tahoe.storage", level=log.UNUSUAL)
274         if not self.closed:
275             self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
276         self._abort()
277         self.ss.count("abort")
278
279     def _abort(self):
280         if self.closed:
281             return
282
283         os.remove(self.incominghome)
284         # if we were the last share to be moved, remove the incoming/
285         # directory that was our parent
286         parentdir = os.path.split(self.incominghome)[0]
287         if not os.listdir(parentdir):
288             os.rmdir(parentdir)
289         self._sharefile = None
290
291         # We are now considered closed for further writing. We must tell
292         # the storage server about this so that it stops expecting us to
293         # use the space it allocated for us earlier.
294         self.closed = True
295         self.ss.bucket_writer_closed(self, 0)
296
297
298 class BucketReader(Referenceable):
299     implements(RIBucketReader)
300
301     def __init__(self, ss, sharefname, storage_index=None, shnum=None):
302         self.ss = ss
303         self._share_file = ShareFile(sharefname)
304         self.storage_index = storage_index
305         self.shnum = shnum
306
307     def __repr__(self):
308         return "<%s %s %s>" % (self.__class__.__name__,
309                                base32.b2a_l(self.storage_index[:8], 60),
310                                self.shnum)
311
312     def remote_read(self, offset, length):
313         start = time.time()
314         data = self._share_file.read_share_data(offset, length)
315         self.ss.add_latency("read", time.time() - start)
316         self.ss.count("read")
317         return data
318
319     def remote_advise_corrupt_share(self, reason):
320         return self.ss.remote_advise_corrupt_share("immutable",
321                                                    self.storage_index,
322                                                    self.shnum,
323                                                    reason)