import struct
from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
     FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, idlib, observer
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a

class LayoutInvalid(Exception):
    """ There is something wrong with these bytes so they can't be interpreted as the kind of
    immutable file that I know how to download. """
    pass

class RidiculouslyLargeURIExtensionBlock(LayoutInvalid):
    """ When downloading a file, the length of the URI Extension Block was given as >= 2**32.
    This means the share data must have been corrupted, or else the original uploader of the
    file wrote a ridiculous value into the URI Extension Block length. """
    pass

class ShareVersionIncompatible(LayoutInvalid):
    """ When downloading a share, its format was not one of the formats we know how to
    parse. """
    pass

"""
Share data is written in a file. At the start of the file, there is a series of four-byte
big-endian offset values, which indicate where each section starts. Each offset is measured from
the beginning of the share data.

0x00: version number (=00 00 00 01)
0x04: block size # See Footnote 1 below.
0x08: share data size # See Footnote 1 below.
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree UNUSED
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
?   : start of plaintext_hash_tree UNUSED
?   : start of crypttext_hash_tree
?   : start of block_hashes
?   : start of share_hashes
       each share_hash is written as a two-byte (big-endian) hashnum
       followed by the 32-byte SHA-256 hash. We store only the hashes
       necessary to validate the share hash root
?   : start of uri_extension_length (four-byte big-endian value)
?   : start of uri_extension
"""
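
# Illustrative sketch only (not part of the original module): the v1 header
# described above is nine big-endian uint32 fields, 0x24 (36) bytes in all.
# The helper name and the `header` argument are hypothetical; the real
# parsing is done in ReadBucketProxy._parse_offsets below, and nothing else
# in this module calls this function.
def _example_parse_v1_header(header):
    """ Unpack the nine uint32 fields of a v1 share header into a dict. """
    fields = struct.unpack(">LLLLLLLLL", header[:0x24])
    names = ("version", "block_size", "data_size", "data",
             "plaintext_hash_tree", "crypttext_hash_tree",
             "block_hashes", "share_hashes", "uri_extension")
    return dict(zip(names, fields))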

"""
v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
limitations described in #346.

0x00: version number (=00 00 00 02)
0x04: block size # See Footnote 1 below.
0x0c: share data size # See Footnote 1 below.
0x14: offset of data (=00 00 00 00 00 00 00 44)
0x1c: offset of plaintext_hash_tree UNUSED
0x24: offset of crypttext_hash_tree
0x2c: offset of block_hashes
0x34: offset of share_hashes
0x3c: offset of uri_extension_length + uri_extension
0x44: start of data
    : rest of share is the same as v1, above

?   : start of uri_extension_length (eight-byte big-endian value)
?   : start of uri_extension
"""
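
# Illustrative sketch only (not part of the original module): both layouts
# begin with the same four-byte version field, so a reader can sniff the
# version and pick the offset width (4 bytes for v1, 8 bytes for v2, making
# the v2 header 4 + 8*8 = 0x44 bytes). The helper name is hypothetical and
# unused; the real logic lives in ReadBucketProxy._parse_offsets below.
def _example_header_fieldstruct(header):
    """ Return (fieldsize, fieldstruct) for the offsets in this header. """
    (version,) = struct.unpack(">L", header[:4])
    if version == 1:
        return (0x4, ">L")
    elif version == 2:
        return (0x8, ">Q")
    raise ShareVersionIncompatible(version)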

# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but
# they are still provided when writing so that older versions of Tahoe can
# read them.

def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
                            num_share_hashes, uri_extension_size_max, nodeid):
    # Use layout v1 for small files, so they'll be readable by older versions
    # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
    # by tahoe-1.3.0 or later.
    try:
        wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
                               num_share_hashes, uri_extension_size_max,
                               nodeid)
    except FileTooLargeError:
        wbp = WriteBucketProxy_v2(rref, data_size, block_size, num_segments,
                                  num_share_hashes, uri_extension_size_max,
                                  nodeid)
    return wbp
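
# Illustrative usage sketch (not from the original module): the v1 layout
# raises FileTooLargeError as soon as any field needs more than 32 bits, so a
# data_size of 2**32 or more makes make_write_bucket_proxy fall back to the
# v2 proxy. `rref` would normally be a foolscap RemoteReference to a storage
# bucket; it is only stored, not used, during construction.
#
#     wbp = make_write_bucket_proxy(rref, data_size=2**33, block_size=2**17,
#                                   num_segments=2**16, num_share_hashes=16,
#                                   uri_extension_size_max=1000, nodeid=None)
#     assert isinstance(wbp, WriteBucketProxy_v2)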

class WriteBucketProxy:
    implements(IStorageBucketWriter)
    fieldsize = 4
    fieldstruct = ">L"

    def __init__(self, rref, data_size, block_size, num_segments,
                 num_share_hashes, uri_extension_size_max, nodeid):
        self._rref = rref
        self._data_size = data_size
        self._block_size = block_size
        self._num_segments = num_segments
        self._nodeid = nodeid

        effective_segments = mathutil.next_power_of_k(num_segments, 2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about ln2(num_shares).
        self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
        # we commit to not sending a uri extension larger than this
        self._uri_extension_size_max = uri_extension_size_max

        self._create_offsets(block_size, data_size)

    def get_allocated_size(self):
        return (self._offsets['uri_extension'] + self.fieldsize +
                self._uri_extension_size_max)

    def _create_offsets(self, block_size, data_size):
        if block_size >= 2**32 or data_size >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (data_size).")

        offsets = self._offsets = {}
        x = 0x24
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x # UNUSED
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hashtree_size
        offsets['uri_extension'] = x

        if x >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (offsets).")

        offset_data = struct.pack(">LLLLLLLLL",
                                  1, # version number
                                  block_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'], # UNUSED
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x24
        self._offset_data = offset_data

    def __repr__(self):
        if self._nodeid:
            nodeid_s = idlib.nodeid_b2a(self._nodeid)
        else:
            nodeid_s = "[None]"
        return "<WriteBucketProxy for node %s>" % nodeid_s

    def put_header(self):
        return self._write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        offset = self._offsets['data'] + segmentnum * self._block_size
        assert offset + len(data) <= self._offsets['uri_extension']
        assert isinstance(data, str)
        if segmentnum < self._num_segments-1:
            precondition(len(data) == self._block_size,
                         len(data), self._block_size)
        else:
            precondition(len(data) == (self._data_size -
                                       (self._block_size *
                                        (self._num_segments - 1))),
                         len(data), self._block_size)
        return self._write(offset, data)

    def put_crypttext_hashes(self, hashes):
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = "".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._write(offset, data)

    def put_block_hashes(self, blockhashes):
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = "".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._write(offset, data)

    def put_share_hashes(self, sharehashes):
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = "".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum, hashvalue in sharehashes])
        precondition(len(data) == self._share_hashtree_size,
                     len(data), self._share_hashtree_size)
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._write(offset, data)

    def put_uri_extension(self, data):
        offset = self._offsets['uri_extension']
        assert isinstance(data, str)
        precondition(len(data) <= self._uri_extension_size_max,
                     len(data), self._uri_extension_size_max)
        length = struct.pack(self.fieldstruct, len(data))
        return self._write(offset, length+data)

    def _write(self, offset, data):
        # TODO: for small shares, buffer the writes and do just a single call
        return self._rref.callRemote("write", offset, data)

    def close(self):
        return self._rref.callRemote("close")

    def abort(self):
        return self._rref.callRemoteOnly("abort")

class WriteBucketProxy_v2(WriteBucketProxy):
    fieldsize = 8
    fieldstruct = ">Q"

    def _create_offsets(self, block_size, data_size):
        if block_size >= 2**64 or data_size >= 2**64:
            raise FileTooLargeError("This file is too large to be uploaded (data_size).")

        offsets = self._offsets = {}
        x = 0x44
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x # UNUSED
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hashtree_size
        offsets['uri_extension'] = x

        if x >= 2**64:
            raise FileTooLargeError("This file is too large to be uploaded (offsets).")

        offset_data = struct.pack(">LQQQQQQQQ",
                                  2, # version number
                                  block_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'], # UNUSED
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x44, len(offset_data)
        self._offset_data = offset_data

class ReadBucketProxy:
    implements(IStorageBucketReader)

    MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes

    def __init__(self, rref, peerid, storage_index):
        self._rref = rref
        self._peerid = peerid
        peer_id_s = idlib.shortnodeid_b2a(peerid)
        storage_index_s = si_b2a(storage_index)
        self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
        self._started = False # sent request to server
        self._ready = observer.OneShotObserverList() # got response from server

    def get_peerid(self):
        return self._peerid

    def __repr__(self):
        return self._reprstr

    def _start_if_needed(self):
        """ Returns a deferred that will be fired when I'm ready to return data, or errbacks if
        the starting (header reading and parsing) process fails."""
        if not self._started:
            self._start()
        return self._ready.when_fired()

    def _start(self):
        self._started = True
        # TODO: for small shares, read the whole bucket in _start()
        d = self._fetch_header()
        d.addCallback(self._parse_offsets)
        # XXX The following two callbacks implement a slightly faster/nicer way to get the ueb
        # and sharehashtree, but it requires that the storage server be >= v1.3.0.
        # d.addCallback(self._fetch_sharehashtree_and_ueb)
        # d.addCallback(self._parse_sharehashtree_and_ueb)
        def _fail_waiters(f):
            self._ready.fire(f)
        def _notify_waiters(result):
            self._ready.fire(result)
        d.addCallbacks(_notify_waiters, _fail_waiters)
        return d

    def _fetch_header(self):
        return self._read(0, 0x44)

    def _parse_offsets(self, data):
        precondition(len(data) >= 0x4)
        self._offsets = {}
        (version,) = struct.unpack(">L", data[0:4])
        if version != 1 and version != 2:
            raise ShareVersionIncompatible(version)

        if version == 1:
            precondition(len(data) >= 0x24)
            x = 0x0c
            fieldsize = 0x4
            fieldstruct = ">L"
        else:
            precondition(len(data) >= 0x44)
            x = 0x14
            fieldsize = 0x8
            fieldstruct = ">Q"

        self._version = version
        self._fieldsize = fieldsize
        self._fieldstruct = fieldstruct

        for field in ( 'data',
                       'plaintext_hash_tree', # UNUSED
                       'crypttext_hash_tree',
                       'block_hashes',
                       'share_hashes',
                       'uri_extension',
                       ):
            offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
            x += fieldsize
            self._offsets[field] = offset
        return self._offsets

    def _fetch_sharehashtree_and_ueb(self, offsets):
        sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
        return self._read(offsets['share_hashes'],
                          self.MAX_UEB_SIZE+sharehashtree_size)

    def _parse_sharehashtree_and_ueb(self, data):
        sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
        if len(data) < sharehashtree_size:
            raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
        if sharehashtree_size % (2+HASH_SIZE) != 0:
            raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
        self._share_hashes = []
        for i in range(0, sharehashtree_size, 2+HASH_SIZE):
            hashnum = struct.unpack(">H", data[i:i+2])[0]
            hashvalue = data[i+2:i+2+HASH_SIZE]
            self._share_hashes.append( (hashnum, hashvalue) )

        i = self._offsets['uri_extension'] - self._offsets['share_hashes']
        if len(data) < i+self._fieldsize:
            raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
        length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
        self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]

    def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
        offset = self._offsets['data'] + blocknum * blocksize
        return self._read(offset, thisblocksize)

    def get_block_data(self, blocknum, blocksize, thisblocksize):
        d = self._start_if_needed()
        d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
        return d
380 """ split string (pulled from storage) into a list of blockids """
381 return [ s[i:i+HASH_SIZE]
382 for i in range(0, len(s), HASH_SIZE) ]

    def _get_crypttext_hashes(self, unused=None):
        offset = self._offsets['crypttext_hash_tree']
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        d = self._start_if_needed()
        d.addCallback(self._get_crypttext_hashes)
        return d

    def _get_block_hashes(self, unused=None, at_least_these=()):
        # TODO: fetch only at_least_these instead of all of them.
        offset = self._offsets['block_hashes']
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self, at_least_these=()):
        if at_least_these:
            d = self._start_if_needed()
            d.addCallback(self._get_block_hashes, at_least_these)
            return d
        else:
            return defer.succeed([])

    def _get_share_hashes(self, unused=None):
        if hasattr(self, '_share_hashes'):
            return self._share_hashes
        else:
            return self._get_share_hashes_the_old_way()

    def get_share_hashes(self):
        d = self._start_if_needed()
        d.addCallback(self._get_share_hashes)
        return d

    def _get_share_hashes_the_old_way(self):
        """ Tahoe storage servers < v1.3.0 would return an error if you tried to read past the
        end of the share, so we need to use the offset and read just that much."""
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        if size % (2+HASH_SIZE) != 0:
            raise LayoutInvalid("share hash tree corrupted -- should occupy a multiple of %d bytes, not %d bytes" % ((2+HASH_SIZE), size))
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            if len(data) != size:
                raise LayoutInvalid("share hash tree corrupted -- got a short read of the share data -- should have gotten %d, not %d bytes" % (size, len(data)))
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def _get_uri_extension_the_old_way(self, unused=None):
        """ Tahoe storage servers < v1.3.0 would return an error if you tried to read past the
        end of the share, so we need to fetch the UEB size and then read just that much."""
        offset = self._offsets['uri_extension']
        d = self._read(offset, self._fieldsize)
        def _got_length(data):
            if len(data) != self._fieldsize:
                raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
            length = struct.unpack(self._fieldstruct, data)[0]
            if length >= 2**31:
                # URI extension blocks are around 419 bytes long, so this must be corrupted.
                # Anyway, the foolscap interface schema for "read" will not allow >= 2**31 bytes
                # length.
                raise RidiculouslyLargeURIExtensionBlock(length)

            return self._read(offset+self._fieldsize, length)
        d.addCallback(_got_length)
        return d

    def _get_uri_extension(self, unused=None):
        if hasattr(self, '_ueb_data'):
            return self._ueb_data
        else:
            return self._get_uri_extension_the_old_way()

    def get_uri_extension(self):
        d = self._start_if_needed()
        d.addCallback(self._get_uri_extension)
        return d

    def _read(self, offset, length):
        return self._rref.callRemote("read", offset, length)