# src/allmydata/immutable/layout.py
import struct
from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
     FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, idlib, observer
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a

class LayoutInvalid(Exception):
    """ There is something wrong with these bytes so they can't be interpreted as the kind of
    immutable file that I know how to download. """
    pass

class RidiculouslyLargeURIExtensionBlock(LayoutInvalid):
    """ When downloading a file, the length of the URI Extension Block was given as >= 2**32.
    This means the share data must have been corrupted, or else the original uploader of the
    file wrote a ridiculous value into the URI Extension Block length. """
    pass

class ShareVersionIncompatible(LayoutInvalid):
    """ When downloading a share, its format was not one of the formats we know how to
    parse. """
    pass

"""
Share data is written in a file. At the start of the file there is a header of
four-byte big-endian fields: a version number, the block size, the share data
size, and then the offset of each section. Each offset is measured from the
beginning of the share data.

0x00: version number (=00 00 00 01)
0x04: block size # See Footnote 1 below.
0x08: share data size # See Footnote 1 below.
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree UNUSED
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
?   : start of plaintext_hash_tree UNUSED
?   : start of crypttext_hash_tree
?   : start of block_hashes
?   : start of share_hashes
       each share_hash is written as a two-byte (big-endian) hashnum
       followed by the 32-byte SHA-256 hash. We store only the hashes
       necessary to validate the share hash root
?   : start of uri_extension_length (four-byte big-endian value)
?   : start of uri_extension
"""

"""
v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
limitations described in #346.

0x00: version number (=00 00 00 02)
0x04: block size # See Footnote 1 below.
0x0c: share data size # See Footnote 1 below.
0x14: offset of data (=00 00 00 00 00 00 00 44)
0x1c: offset of plaintext_hash_tree UNUSED
0x24: offset of crypttext_hash_tree
0x2c: offset of block_hashes
0x34: offset of share_hashes
0x3c: offset of uri_extension_length + uri_extension
0x44: start of data
    : rest of share is the same as v1, above
...   ...
?   : start of uri_extension_length (eight-byte big-endian value)
?   : start of uri_extension
"""

# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but
# they are still provided when writing so that older versions of Tahoe can
# read them.
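
# Illustration only: a hypothetical, stand-alone sketch of how the header
# tables documented above map onto the struct module. It is not part of this
# module's API (the real parsing lives in ReadBucketProxy._parse_offsets
# below); the helper name and return value are made up for the example.
def _example_unpack_header(header):
    """Given the first 0x44 bytes of a share, return its header fields."""
    (version,) = struct.unpack(">L", header[:4])
    if version == 1:
        # nine four-byte fields: version, block size, data size, six offsets
        fields = struct.unpack(">LLLLLLLLL", header[:0x24])
    elif version == 2:
        # the version number stays four bytes; the remaining eight fields
        # widen to eight bytes each
        fields = struct.unpack(">LQQQQQQQQ", header[:0x44])
    else:
        raise ShareVersionIncompatible(version)
    names = ("version", "block_size", "data_size", "data",
             "plaintext_hash_tree", "crypttext_hash_tree",
             "block_hashes", "share_hashes", "uri_extension")
    return dict(zip(names, fields))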

def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
                            num_share_hashes, uri_extension_size_max, nodeid):
    # Use layout v1 for small files, so they'll be readable by older versions
    # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
    # by tahoe-1.3.0 or later.
    try:
        wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
                               num_share_hashes, uri_extension_size_max, nodeid)
    except FileTooLargeError:
        wbp = WriteBucketProxy_v2(rref, data_size, block_size, num_segments,
                                  num_share_hashes, uri_extension_size_max, nodeid)
    return wbp
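
# Illustration only: a hypothetical sketch (not part of Tahoe's upload code) of
# the order in which a caller might drive a proxy returned by
# make_write_bucket_proxy(). Each put_* call returns a Deferred for the remote
# "write"; the chain below simply serializes them. The helper name and the
# argument shapes are made up for the example.
def _example_write_sequence(wbp, blocks, crypttext_hashes, block_hashes,
                            share_hashes, uri_extension):
    d = wbp.put_header()
    for num, block in enumerate(blocks):
        d.addCallback(lambda ign, n=num, b=block: wbp.put_block(n, b))
    d.addCallback(lambda ign: wbp.put_crypttext_hashes(crypttext_hashes))
    d.addCallback(lambda ign: wbp.put_block_hashes(block_hashes))
    d.addCallback(lambda ign: wbp.put_share_hashes(share_hashes))
    d.addCallback(lambda ign: wbp.put_uri_extension(uri_extension))
    d.addCallback(lambda ign: wbp.close())
    return d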

class WriteBucketProxy:
    implements(IStorageBucketWriter)
    fieldsize = 4
    fieldstruct = ">L"

    def __init__(self, rref, data_size, block_size, num_segments,
                 num_share_hashes, uri_extension_size_max, nodeid):
        self._rref = rref
        self._data_size = data_size
        self._block_size = block_size
        self._num_segments = num_segments
        self._nodeid = nodeid

        effective_segments = mathutil.next_power_of_k(num_segments, 2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about ln2(num_shares).
        self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
        # we commit to not sending a uri extension larger than this
        self._uri_extension_size_max = uri_extension_size_max

        self._create_offsets(block_size, data_size)

    def get_allocated_size(self):
        return (self._offsets['uri_extension'] + self.fieldsize +
                self._uri_extension_size_max)

    def _create_offsets(self, block_size, data_size):
        if block_size >= 2**32 or data_size >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (data_size).")

        offsets = self._offsets = {}
        x = 0x24
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x # UNUSED
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hashtree_size
        offsets['uri_extension'] = x

        if x >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (offsets).")

        offset_data = struct.pack(">LLLLLLLLL",
                                  1, # version number
                                  block_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'], # UNUSED
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x24
        self._offset_data = offset_data

    def __repr__(self):
        if self._nodeid:
            nodeid_s = idlib.nodeid_b2a(self._nodeid)
        else:
            nodeid_s = "[None]"
        return "<WriteBucketProxy for node %s>" % nodeid_s

    def put_header(self):
        return self._write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        offset = self._offsets['data'] + segmentnum * self._block_size
        assert offset + len(data) <= self._offsets['uri_extension']
        assert isinstance(data, str)
        if segmentnum < self._num_segments-1:
            precondition(len(data) == self._block_size,
                         len(data), self._block_size)
        else:
            precondition(len(data) == (self._data_size -
                                       (self._block_size *
                                        (self._num_segments - 1))),
                         len(data), self._block_size)
        return self._write(offset, data)

    def put_crypttext_hashes(self, hashes):
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._write(offset, data)

    def put_block_hashes(self, blockhashes):
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = "".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._write(offset, data)

    def put_share_hashes(self, sharehashes):
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = "".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum, hashvalue in sharehashes])
        precondition(len(data) == self._share_hashtree_size,
                     len(data), self._share_hashtree_size)
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._write(offset, data)

    def put_uri_extension(self, data):
        offset = self._offsets['uri_extension']
        assert isinstance(data, str)
        precondition(len(data) <= self._uri_extension_size_max,
                     len(data), self._uri_extension_size_max)
        length = struct.pack(self.fieldstruct, len(data))
        return self._write(offset, length+data)

    def _write(self, offset, data):
        # TODO: for small shares, buffer the writes and do just a single call
        return self._rref.callRemote("write", offset, data)

    def close(self):
        return self._rref.callRemote("close")

    def abort(self):
        return self._rref.callRemoteOnly("abort")

class WriteBucketProxy_v2(WriteBucketProxy):
    fieldsize = 8
    fieldstruct = ">Q"

    def _create_offsets(self, block_size, data_size):
        if block_size >= 2**64 or data_size >= 2**64:
            raise FileTooLargeError("This file is too large to be uploaded (data_size).")

        offsets = self._offsets = {}
        x = 0x44
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x # UNUSED
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hashtree_size
        offsets['uri_extension'] = x

        if x >= 2**64:
            raise FileTooLargeError("This file is too large to be uploaded (offsets).")

        offset_data = struct.pack(">LQQQQQQQQ",
                                  2, # version number
                                  block_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'], # UNUSED
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x44, len(offset_data)
        self._offset_data = offset_data

class ReadBucketProxy:
    implements(IStorageBucketReader)

    MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes

    def __init__(self, rref, peerid, storage_index):
        self._rref = rref
        self._peerid = peerid
        peer_id_s = idlib.shortnodeid_b2a(peerid)
        storage_index_s = si_b2a(storage_index)
        self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
        self._started = False # sent request to server
        self._ready = observer.OneShotObserverList() # got response from server

    def get_peerid(self):
        return self._peerid

    def __repr__(self):
        return self._reprstr

    def _start_if_needed(self):
        """ Returns a deferred that will be fired when I'm ready to return data, or errbacks if
        the starting (header reading and parsing) process fails."""
        if not self._started:
            self._start()
        return self._ready.when_fired()

    def _start(self):
        self._started = True
        # TODO: for small shares, read the whole bucket in _start()
        d = self._fetch_header()
        d.addCallback(self._parse_offsets)
        # XXX The following two callbacks implement a slightly faster/nicer way to get the ueb
        # and sharehashtree, but it requires that the storage server be >= v1.3.0.
        # d.addCallback(self._fetch_sharehashtree_and_ueb)
        # d.addCallback(self._parse_sharehashtree_and_ueb)
        def _fail_waiters(f):
            self._ready.fire(f)
        def _notify_waiters(result):
            self._ready.fire(result)
        d.addCallbacks(_notify_waiters, _fail_waiters)
        return d

    def _fetch_header(self):
        return self._read(0, 0x44)

    def _parse_offsets(self, data):
        precondition(len(data) >= 0x4)
        self._offsets = {}
        (version,) = struct.unpack(">L", data[0:4])
        if version != 1 and version != 2:
            raise ShareVersionIncompatible(version)

        if version == 1:
            precondition(len(data) >= 0x24)
            x = 0x0c
            fieldsize = 0x4
            fieldstruct = ">L"
        else:
            precondition(len(data) >= 0x44)
            x = 0x14
            fieldsize = 0x8
            fieldstruct = ">Q"

        self._version = version
        self._fieldsize = fieldsize
        self._fieldstruct = fieldstruct

        for field in ( 'data',
                       'plaintext_hash_tree', # UNUSED
                       'crypttext_hash_tree',
                       'block_hashes',
                       'share_hashes',
                       'uri_extension',
                       ):
            offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
            x += fieldsize
            self._offsets[field] = offset
        return self._offsets

    def _fetch_sharehashtree_and_ueb(self, offsets):
        sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
        return self._read(offsets['share_hashes'], self.MAX_UEB_SIZE+sharehashtree_size)

    def _parse_sharehashtree_and_ueb(self, data):
        sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
        if len(data) < sharehashtree_size:
            raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
        if sharehashtree_size % (2+HASH_SIZE) != 0:
            raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
        self._share_hashes = []
        for i in range(0, sharehashtree_size, 2+HASH_SIZE):
            hashnum = struct.unpack(">H", data[i:i+2])[0]
            hashvalue = data[i+2:i+2+HASH_SIZE]
            self._share_hashes.append( (hashnum, hashvalue) )

        i = self._offsets['uri_extension']-self._offsets['share_hashes']
        if len(data) < i+self._fieldsize:
            raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
        length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
        self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]

    def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
        offset = self._offsets['data'] + blocknum * blocksize
        return self._read(offset, thisblocksize)

    def get_block_data(self, blocknum, blocksize, thisblocksize):
        d = self._start_if_needed()
        d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
        return d

    def _str2l(self, s):
        """ split string (pulled from storage) into a list of blockids """
        return [ s[i:i+HASH_SIZE]
                 for i in range(0, len(s), HASH_SIZE) ]

    def _get_crypttext_hashes(self, unused=None):
        offset = self._offsets['crypttext_hash_tree']
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        d = self._start_if_needed()
        d.addCallback(self._get_crypttext_hashes)
        return d

    def _get_block_hashes(self, unused=None, at_least_these=()):
        # TODO: fetch only at_least_these instead of all of them.
        offset = self._offsets['block_hashes']
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self, at_least_these=()):
        if at_least_these:
            d = self._start_if_needed()
            d.addCallback(self._get_block_hashes, at_least_these)
            return d
        else:
            return defer.succeed([])

    def _get_share_hashes(self, unused=None):
        if hasattr(self, '_share_hashes'):
            return self._share_hashes
        else:
            return self._get_share_hashes_the_old_way()

    def get_share_hashes(self):
        d = self._start_if_needed()
        d.addCallback(self._get_share_hashes)
        return d

    def _get_share_hashes_the_old_way(self):
        """ Tahoe storage servers < v1.3.0 would return an error if you tried to read past the
        end of the share, so we need to use the offset and read just that much."""
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        if size % (2+HASH_SIZE) != 0:
            raise LayoutInvalid("share hash tree corrupted -- should occupy a multiple of %d bytes, not %d bytes" % ((2+HASH_SIZE), size))
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            if len(data) != size:
                raise LayoutInvalid("share hash tree corrupted -- got a short read of the share data -- should have gotten %d, not %d bytes" % (size, len(data)))
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def _get_uri_extension_the_old_way(self, unused=None):
        """ Tahoe storage servers < v1.3.0 would return an error if you tried to read past the
        end of the share, so we need to fetch the UEB size and then read just that much."""
        offset = self._offsets['uri_extension']
        d = self._read(offset, self._fieldsize)
        def _got_length(data):
            if len(data) != self._fieldsize:
                raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
            length = struct.unpack(self._fieldstruct, data)[0]
            if length >= 2**31:
                # URI extension blocks are around 419 bytes long, so this must be corrupted.
                # Anyway, the foolscap interface schema for "read" will not allow >= 2**31 bytes
                # length.
                raise RidiculouslyLargeURIExtensionBlock(length)

            return self._read(offset+self._fieldsize, length)
        d.addCallback(_got_length)
        return d

    def _get_uri_extension(self, unused=None):
        if hasattr(self, '_ueb_data'):
            return self._ueb_data
        else:
            return self._get_uri_extension_the_old_way()

    def get_uri_extension(self):
        d = self._start_if_needed()
        d.addCallback(self._get_uri_extension)
        return d

    def _read(self, offset, length):
        return self._rref.callRemote("read", offset, length)
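
# Illustration only: a hypothetical sketch (not part of Tahoe's download code)
# of how a caller might drive a ReadBucketProxy: fetch the URI extension first,
# then each block. All of these methods return Deferreds; the helper name and
# its arguments are made up for the example.
def _example_read_sequence(rbp, num_segments, block_size, tail_block_size):
    d = rbp.get_uri_extension()
    def _got_ueb(ueb_data):
        # kick off one read per block; the last block may be shorter
        block_ds = []
        for num in range(num_segments):
            if num < num_segments - 1:
                this_size = block_size
            else:
                this_size = tail_block_size
            block_ds.append(rbp.get_block_data(num, block_size, this_size))
        return defer.gatherResults(block_ds)
    d.addCallback(_got_ueb)
    return d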