1 # -*- test-case-name: allmydata.test.test_encode -*-
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap import eventual
7 from allmydata import uri
8 from allmydata.hashtree import HashTree
9 from allmydata.util import mathutil, hashutil, idlib, log
10 from allmydata.util.assertutil import _assert, precondition
11 from allmydata.codec import CRSEncoder
12 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
16 The goal of the encoder is to turn the original file into a series of
17 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
18 is a different host, but for small grids there may be overlap). The number
19 of shares is chosen to hit our reliability goals (more shares on more
20 machines means more reliability), and is limited by overhead (proportional to
21 numshares or log(numshares)) and the encoding technology in use (zfec permits
22 only 256 shares total). It is also constrained by the amount of data
23 we want to send to each host. For estimating purposes, think of 10 shares
24 out of which we need 3 to reconstruct the file.
26 The encoder starts by cutting the original file into segments. All segments
27 except the last are of equal size. The segment size is chosen to constrain
28 the memory footprint (which will probably vary between 1x and 4x segment
29 size) and to constrain the overhead (which will be proportional to
30 log(number of segments)).
33 Each segment (A,B,C) is read into memory, encrypted, and encoded into
34 blocks. The 'share' (say, share #1) that makes it out to a host is a
35 collection of these blocks (block A1, B1, C1), plus some hash-tree
36 information necessary to validate the data upon retrieval. Only one segment
37 is handled at a time: all blocks for segment A are delivered before any
38 work is begun on segment B.
40 As blocks are created, we retain the hash of each one. The list of
41 block hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
42 used to form the base of a Merkle hash tree for that share (hashtrees[1]).
43 This hash tree has one terminal leaf per block. The complete block hash
44 tree is sent to the shareholder after all the data has been sent. At
45 retrieval time, the decoder will ask for specific pieces of this tree before
46 asking for blocks, whichever it needs to validate those blocks.
48 (Note: we don't really need to generate this whole block hash tree
49 ourselves. It would be sufficient to have the shareholder generate it and
50 just tell us the root. This gives us an extra level of validation on the
51 transfer, though, and it is relatively cheap to compute.)
53 Each of these block hash trees has a root hash. The collection of these
54 root hashes for all shares are collected into the 'share hash tree', which
55 has one terminal leaf per share. After sending the blocks and the complete
56 block hash tree to each shareholder, we send them the portion of the share
57 hash tree that is necessary to validate their share. The root of the share
58 hash tree is put into the URI.
class NotEnoughPeersError(Exception):
    """Raised (by _remove_shareholder) when so many shareholders have been
    lost that fewer than shares_of_happiness shares can still be placed."""
class UploadAborted(Exception):
    """Raised from _gather_data after abort() to bypass the rest of the
    upload Deferred chain."""
class Encoder(object):
    """Erasure-codes an encrypted file (segment by segment, via CRSEncoder)
    into blocks, and pushes blocks, hash trees, and the URI extension to
    per-share 'landlord' buckets (IStorageBucketWriter)."""
    def __init__(self, log_parent=None):
        # log_parent: optional parent log-message id (an int) for log.msg.
        # NOTE(review): this listing is missing interleaved lines (original
        # 78/80/82), so some initialization (and the precondition's message
        # argument) is not visible here.
        self.uri_extension_data = {}
        precondition(log_parent is None or isinstance(log_parent, int),
        self._log_number = log.msg("creating Encoder %s" % self,
                                   facility="tahoe.encoder", parent=log_parent)
        # NOTE(review): the enclosing 'def __repr__(self):' line (original 87)
        # is missing from this listing; this is presumably the __repr__ body.
        if hasattr(self, "_storage_index"):
            return "<Encoder for %s>" % idlib.b2a(self._storage_index)[:6]
        return "<Encoder for unknown storage index>"
92 def log(self, *args, **kwargs):
93 if "parent" not in kwargs:
94 kwargs["parent"] = self._log_number
95 if "facility" not in kwargs:
96 kwargs["facility"] = "tahoe.encoder"
97 return log.msg(*args, **kwargs)
    def set_encrypted_uploadable(self, uploadable):
        # Adopt the IEncryptedUploadable and start a Deferred chain that
        # records file size, encoding parameters, and the storage index.
        # NOTE(review): listing gaps -- the creation of `d`/`_got_size`
        # (original 101-102) and the chain tail (111-114) are missing here.
        eu = self._uploadable = IEncryptedUploadable(uploadable)
            self.log(format="file size: %(size)d", size=size)
            self.file_size = size
        d.addCallback(_got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(lambda res: eu.get_storage_index())
        def _done(storage_index):
            # remembered for __repr__ and get_param("storage_index")
            self._storage_index = storage_index
    def _got_all_encoding_parameters(self, params):
        # Record (k, happy, n, segsize) and configure both the main codec and
        # the tail codec, filling in uri_extension_data as we go.
        # NOTE(review): listing gaps -- e.g. the `self.num_shares = n`
        # assignment and the continuation of the num_segments div_ceil call
        # are missing from this excerpt.
        assert not self._codec
        k, happy, n, segsize = params
        self.required_shares = k
        self.shares_of_happiness = happy
        self.segment_size = segsize
        self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
        self.log("now setting up codec")
        # segment size must be a multiple of k so each segment splits evenly
        # into k input pieces
        assert self.segment_size % self.required_shares == 0
        self.num_segments = mathutil.div_ceil(self.file_size,
        self._codec = CRSEncoder()
        self._codec.set_params(self.segment_size,
                               self.required_shares, self.num_shares)
        data = self.uri_extension_data
        data['codec_name'] = self._codec.get_encoder_type()
        data['codec_params'] = self._codec.get_serialized_params()
        data['size'] = self.file_size
        data['segment_size'] = self.segment_size
        self.share_size = mathutil.div_ceil(self.file_size,
                                            self.required_shares)
        data['num_segments'] = self.num_segments
        data['needed_shares'] = self.required_shares
        data['total_shares'] = self.num_shares
        # the "tail" is the last segment. This segment may or may not be
        # shorter than all other segments. We use the "tail codec" to handle
        # it. If the tail is short, we use a different codec instance. In
        # addition, the tail codec must be fed data which has been padded out
        # to the right size.
        self.tail_size = self.file_size % self.segment_size
        if not self.tail_size:
            self.tail_size = self.segment_size
        # the tail codec is responsible for encoding tail_size bytes
        padded_tail_size = mathutil.next_multiple(self.tail_size,
                                                  self.required_shares)
        self._tail_codec = CRSEncoder()
        self._tail_codec.set_params(padded_tail_size,
                                    self.required_shares, self.num_shares)
        data['tail_codec_params'] = self._tail_codec.get_serialized_params()
163 def _get_share_size(self):
164 share_size = mathutil.div_ceil(self.file_size, self.required_shares)
165 overhead = self._compute_overhead()
166 return share_size + overhead
    def _compute_overhead(self):
        # NOTE(review): the body (original line 169) is missing from this
        # listing; _get_share_size adds the result to the base share size,
        # so it is presumably a byte count.
    def get_param(self, name):
        """Return the named encoding parameter; raise KeyError for an
        unknown name."""
        # NOTE(review): listing gaps -- the continuation of the
        # 'share_counts' tuple (presumably self.num_shares) and a leading
        # assertion are missing from this excerpt.
        if name == "storage_index":
            return self._storage_index
        elif name == "share_counts":
            return (self.required_shares, self.shares_of_happiness,
        elif name == "num_segments":
            return self.num_segments
        elif name == "segment_size":
            return self.segment_size
        elif name == "block_size":
            return self._codec.get_block_size()
        elif name == "share_size":
            return self._get_share_size()
        elif name == "serialized_params":
            return self._codec.get_serialized_params()
        raise KeyError("unknown parameter name '%s'" % name)
    def set_shareholders(self, landlords):
        # landlords maps shareid -> IStorageBucketWriter; keep a private copy
        # so _remove_shareholder can delete entries safely.
        # NOTE(review): the loop header for the per-landlord assertion
        # (original line 194) is missing from this listing.
        assert isinstance(landlords, dict)
            assert IStorageBucketWriter.providedBy(landlords[k])
        self.landlords = landlords.copy()
    # NOTE(review): the 'def start(self):' header (original ~197-198) and the
    # `self._times = {` opener are missing from this listing; the following is
    # the body of start(), which builds the whole upload Deferred chain.
        self.log("%s starting" % (self,))
        #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hashes = []
        self.subshare_hashes = [[] for x in range(self.num_shares)]
        # subshare_hashes[i] is a list that will be accumulated and then sent
        # to landlord[i]. This list contains a hash of each segment_share
        # that we sent to that landlord.
        self.share_root_hashes = [None] * self.num_shares
            "cumulative_encoding": 0.0,
            "cumulative_sending": 0.0,
            "hashes_and_close": 0.0,
            "total_encode_and_push": 0.0,
        self._start_total_timestamp = time.time()
        d = eventual.fireEventually()
        d.addCallback(lambda res: self.start_all_shareholders())
        for i in range(self.num_segments-1):
            # note to self: this form doesn't work, because lambda only
            # captures the slot, not the value
            #d.addCallback(lambda res: self.do_segment(i))
            # use this form instead:
            d.addCallback(lambda res, i=i: self._encode_segment(i))
            d.addCallback(self._send_segment, i)
            d.addCallback(self._turn_barrier)
        last_segnum = self.num_segments - 1
        d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
        d.addCallback(self._send_segment, last_segnum)
        d.addCallback(self._turn_barrier)
        d.addCallback(lambda res: self.finish_hashing())
        d.addCallback(lambda res:
                      self.send_plaintext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res:
                      self.send_crypttext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res: self.send_all_subshare_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
        d.addCallback(lambda res: self.close_all_shareholders())
        d.addCallbacks(lambda res: self.done(), self.err)
    # NOTE(review): the 'def abort(self):' header (original ~249-250) and the
    # line that actually sets the abort flag (253) are missing from this
    # listing; the following is the body of abort().
        self.log("aborting upload", level=log.UNUSUAL)
        assert self._codec, "don't call abort before start"
        # the next segment read (in _gather_data inside _encode_segment) will
        # raise UploadAborted(), which will bypass the rest of the upload
        # chain. If we've sent the final segment's shares, it's too late to
        # abort. TODO: allow abort any time up to close_all_shareholders.
    def _turn_barrier(self, res):
        # putting this method in a Deferred chain imposes a guaranteed
        # reactor turn between the pre- and post- portions of that chain.
        # This can be useful to limit memory consumption: since Deferreds do
        # not do tail recursion, code which uses defer.succeed(result) for
        # consistency will cause objects to live for longer than you might
        # expect. Passes `res` through unchanged.
        return eventual.fireEventually(res)
    def start_all_shareholders(self):
        # Tell every remaining landlord bucket to start(); a failure removes
        # that shareholder. NOTE(review): the `dl` list setup and append
        # lines (original 272, 276) are missing from this listing.
        self.log("starting shareholders", level=log.NOISY)
        for shareid in self.landlords:
            d = self.landlords[shareid].start()
            d.addErrback(self._remove_shareholder, shareid, "start")
        return self._gather_responses(dl)
    def _encode_segment(self, segnum):
        # Read one full segment of crypttext and erasure-code it with the
        # main codec. NOTE(review): listing gaps -- the bindings for `codec`
        # and `start`, the `for c in chunks:` loop header, and the timing
        # callback wrapper are missing from this excerpt.
        # the ICodecEncoder API wants to receive a total of self.segment_size
        # bytes on each encode() call, broken up into a number of
        # identically-sized pieces. Due to the way the codec algorithm works,
        # these pieces need to be the same size as the share which the codec
        # will generate. Therefore we must feed it with input_piece_size that
        # equals the output share size.
        input_piece_size = codec.get_block_size()
        # as a result, the number of input pieces per encode() call will be
        # equal to the number of required shares with which the codec was
        # constructed. You can think of the codec as chopping up a
        # 'segment_size' of data into 'required_shares' shares (not doing any
        # fancy math at all, just doing a split), then creating some number
        # of additional shares which can be substituted if the primary ones
        # are lost.
        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
        # memory footprint: we only hold a tiny piece of the plaintext at any
        # given time. We build up a segment's worth of crypttext, then hand
        # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
        # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
        # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
        # footprint to 430KiB at the expense of more hash-tree overhead.
        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher)
        def _done_gathering(chunks):
            assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            # during this call, we hit 5*segsize memory
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        elapsed = time.time() - start
        self._times["cumulative_encoding"] += elapsed
    def _encode_tail_segment(self, segnum):
        # Like _encode_segment but uses the tail codec and allows a short
        # (padded) final read. NOTE(review): listing gaps -- e.g. the
        # `start = time.time()` binding, the allow_short argument line, the
        # `for c in chunks:` header, and the timing wrapper are missing here.
        codec = self._tail_codec
        input_piece_size = codec.get_block_size()
        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher,
        def _done_gathering(chunks):
            # a short trailing chunk will have been padded by
            # _gather_data (presumably; see its allow_short branch)
            assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        elapsed = time.time() - start
        self._times["cumulative_encoding"] += elapsed
    def _gather_data(self, num_chunks, input_chunk_size,
                     crypttext_segment_hasher,
        """Return a Deferred that will fire when the required number of
        chunks have been read (and hashed and encrypted). The Deferred fires
        with the combination of any 'previous_chunks' and the new chunks
        which were gathered."""
        # NOTE(review): this listing is missing many lines: the remaining
        # keyword parameters (presumably allow_short and previous_chunks),
        # abort-flag checks guarding the raise statements, the recursion
        # base case, the _got callback header and its per-piece loop, and
        # the allow_short if/else branch headers. Indentation below is a
        # best-effort reconstruction of the visible lines only.
            raise UploadAborted()
            return defer.succeed(previous_chunks)
        d = self._uploadable.read_encrypted(input_chunk_size, False)
            raise UploadAborted()
            encrypted_pieces = []
            encrypted_piece = data.pop(0)
            length += len(encrypted_piece)
            # feed both the per-segment hasher and the whole-file hasher
            crypttext_segment_hasher.update(encrypted_piece)
            self._crypttext_hasher.update(encrypted_piece)
            encrypted_pieces.append(encrypted_piece)
            if length < input_chunk_size:
                # short read: pad with NUL bytes up to the full chunk size
                pad_size = input_chunk_size - length
                encrypted_pieces.append('\x00' * pad_size)
            # non-tail segments should be the full segment size
            if length != input_chunk_size:
                log.msg("non-tail segment should be full segment size: %d!=%d"
                        % (length, input_chunk_size), level=log.BAD)
                precondition(length == input_chunk_size,
                             "length=%d != input_chunk_size=%d" %
                             (length, input_chunk_size))
            encrypted_piece = "".join(encrypted_pieces)
            return previous_chunks + [encrypted_piece]
        # recurse until num_chunks pieces have been accumulated
        d.addCallback(lambda chunks:
                      self._gather_data(num_chunks-1, input_chunk_size,
                                        crypttext_segment_hasher,
                                        allow_short, chunks))
    def _send_segment(self, (shares, shareids), segnum):
        # (Python 2 tuple-parameter syntax.) Send one segment's blocks to the
        # landlords and record each block hash for the hash trees.
        # NOTE(review): listing gaps -- e.g. `start = time.time()`, the `dl`
        # list setup, the binding of `subshare`, and the `_logit` callback
        # header are missing from this excerpt.
        # To generate the URI, we must generate the roothash, so we must
        # generate all shares, even if we aren't actually giving them to
        # anybody. This means that the set of shares we create will be equal
        # to or larger than the set of landlords. If we have any landlord who
        # *doesn't* have a share, that's an error.
        _assert(set(self.landlords.keys()).issubset(set(shareids)),
                shareids=shareids, landlords=self.landlords)
        lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
        for i in range(len(shares)):
            shareid = shareids[i]
            d = self.send_subshare(shareid, segnum, subshare, lognum)
            subshare_hash = hashutil.block_hash(subshare)
            #from allmydata.util import idlib
            #log.msg("creating block (shareid=%d, blocknum=%d) "
            #        "len=%d %r .. %r: %s" %
            #        (shareid, segnum, len(subshare),
            #         subshare[:50], subshare[-50:], idlib.b2a(subshare_hash)))
            self.subshare_hashes[shareid].append(subshare_hash)
        dl = self._gather_responses(dl)
            self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
                     self.segment_size*(segnum+1),
                     self.segment_size*self.num_segments,
                     100 * (segnum+1) / self.num_segments,
                     level=log.OPERATIONAL)
            elapsed = time.time() - start
            self._times["cumulative_sending"] += elapsed
        dl.addCallback(_logit)
    def send_subshare(self, shareid, segment_num, subshare, lognum):
        # Deliver one block to one landlord; a share with no landlord is a
        # no-op (we still computed it for the hash trees).
        # NOTE(review): the `_done` callback header and its registration
        # (original 448, 450-451) are missing from this listing.
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        lognum2 = self.log("put_block to %s" % self.landlords[shareid],
                           parent=lognum, level=log.NOISY)
        d = sh.put_block(segment_num, subshare)
            self.log("put_block done", parent=lognum2, level=log.NOISY)
        d.addErrback(self._remove_shareholder, shareid,
                     "segnum=%d" % segment_num)
    def _remove_shareholder(self, why, shareid, where):
        # Errback handler: drop the failed shareholder; if we can no longer
        # place shares_of_happiness shares, re-raise as NotEnoughPeersError.
        # NOTE(review): listing gaps -- an `else:` branch (to which the
        # "they weren't in our list" log presumably belongs), the closing
        # arguments of two log calls, and the final parent/level kwargs are
        # missing from this excerpt.
        ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
                      method=where, shnum=shareid,
                      level=log.UNUSUAL, failure=why)
        if shareid in self.landlords:
            del self.landlords[shareid]
            self.log("they weren't in our list of landlords", parent=ln,
        if len(self.landlords) < self.shares_of_happiness:
            msg = "lost too many shareholders during upload: %s" % why
            raise NotEnoughPeersError(msg)
        self.log("but we can still continue with %s shares, we'll be happy "
                 "with at least %s" % (len(self.landlords),
                                       self.shares_of_happiness),
    def _gather_responses(self, dl):
        # Wrap a list of Deferreds so that a NotEnoughPeersError raised in
        # _remove_shareholder errbacks the whole chain exactly once.
        # NOTE(review): the `d0` Deferred plumbing (original 484-485,
        # 487-488, presumably including the return) is incomplete in this
        # listing.
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _eatNotEnoughPeersError(f):
            # all exceptions that occur while talking to a peer are handled
            # in _remove_shareholder. That might raise NotEnoughPeersError,
            # which will cause the DeferredList to errback but which should
            # otherwise be consumed. Allow non-NotEnoughPeersError exceptions
            # to pass through as an unhandled errback. We use this in lieu of
            # consumeErrors=True to allow coding errors to be logged.
            f.trap(NotEnoughPeersError)
        d0.addErrback(_eatNotEnoughPeersError)
    def finish_hashing(self):
        # Finalize the whole-file crypttext hash and build the plaintext hash
        # tree, storing roots into uri_extension_data.
        # NOTE(review): listing gaps -- the registration of `_got`, a log
        # kwarg continuation, and the final `return d` are missing here.
        self._start_hashing_and_close_timestamp = time.time()
        crypttext_hash = self._crypttext_hasher.digest()
        self.uri_extension_data["crypttext_hash"] = crypttext_hash
        d = self._uploadable.get_plaintext_hash()
        def _got(plaintext_hash):
            self.uri_extension_data["plaintext_hash"] = plaintext_hash
            return self._uploadable.get_plaintext_hashtree_leaves(0, self.num_segments, self.num_segments)
        def _got_hashtree_leaves(leaves):
            self.log("Encoder: got plaintext_hashtree_leaves: %s" %
                     (",".join([idlib.b2a(h) for h in leaves]),),
            # flatten the Merkle tree; node 0 is the root
            ht = list(HashTree(list(leaves)))
            self.uri_extension_data["plaintext_root_hash"] = ht[0]
            self._plaintext_hashtree_nodes = ht
        d.addCallback(_got_hashtree_leaves)
        d.addCallback(lambda res: self._uploadable.close())
    def send_plaintext_hash_tree_to_all_shareholders(self):
        # NOTE(review): the `dl` list setup and append lines (original 512,
        # 516) are missing from this listing.
        self.log("sending plaintext hash tree", level=log.NOISY)
        for shareid in self.landlords.keys():
            d = self.send_plaintext_hash_tree(shareid,
                                              self._plaintext_hashtree_nodes)
        return self._gather_responses(dl)
    def send_plaintext_hash_tree(self, shareid, all_hashes):
        # Send the plaintext hash tree to one landlord; no-op for shares no
        # longer being placed. NOTE(review): the trailing `return d`
        # (original 525) appears to be missing from this listing.
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_plaintext_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_plaintext_hashes")
    def send_crypttext_hash_tree_to_all_shareholders(self):
        # Build the crypttext hash tree from the per-segment hashes; its root
        # goes into the URI extension. NOTE(review): the binding of
        # `all_hashes` and the `dl` list setup (original 530, 532) are
        # missing from this listing.
        self.log("sending crypttext hash tree", level=log.NOISY)
        t = HashTree(self._crypttext_hashes)
        self.uri_extension_data["crypttext_root_hash"] = t[0]
        for shareid in self.landlords.keys():
            dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
        return self._gather_responses(dl)
    def send_crypttext_hash_tree(self, shareid, all_hashes):
        # Send the crypttext hash tree to one landlord; no-op for shares no
        # longer being placed. NOTE(review): the trailing `return d`
        # (original 543) appears to be missing from this listing.
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_crypttext_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
    def send_all_subshare_hash_trees(self):
        # One block hash tree per share (indexed by shareid), even for shares
        # without a landlord. NOTE(review): the `dl` list setup (original
        # 547) is missing from this listing.
        self.log("sending subshare hash trees", level=log.NOISY)
        for shareid,hashes in enumerate(self.subshare_hashes):
            # hashes is a list of the hashes of all subshares that were sent
            # to shareholder[shareid].
            dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
        return self._gather_responses(dl)
    def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
        # Build this share's block hash tree, record its root (it becomes a
        # leaf of the share hash tree), and send the tree to the landlord.
        # NOTE(review): the binding of `all_hashes` (presumably list(t),
        # original 556) and the trailing `return d` are missing here.
        t = HashTree(subshare_hashes)
        # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
        # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
        # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
        self.share_root_hashes[shareid] = t[0]
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_block_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
    def send_all_share_hash_trees(self):
        # each bucket gets a set of share hash tree nodes that are needed to
        # validate their share. This includes the share hash itself, but does
        # not include the top-level hash root (which is stored securely in
        # the URI instead -- see the module docstring).
        # NOTE(review): the `dl` list setup and the body of the per-root-hash
        # assertion loop (original 574, 576) are missing from this listing.
        self.log("sending all share hash trees", level=log.NOISY)
        for h in self.share_root_hashes:
        # create the share hash tree
        t = HashTree(self.share_root_hashes)
        # the root of this hash tree goes into our URI
        self.uri_extension_data['share_root_hash'] = t[0]
        # now send just the necessary pieces out to each shareholder
        for i in range(self.num_shares):
            # the HashTree is given a list of leaves: 0,1,2,3..n .
            # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
            needed_hash_indices = t.needed_hashes(i, include_leaf=True)
            hashes = [(hi, t[hi]) for hi in needed_hash_indices]
            dl.append(self.send_one_share_hash_tree(i, hashes))
        return self._gather_responses(dl)
    def send_one_share_hash_tree(self, shareid, needed_hashes):
        # needed_hashes: list of (index, hash) pairs for this share's chain.
        # NOTE(review): the trailing `return d` (original 596) appears to be
        # missing from this listing.
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_share_hashes(needed_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
    def send_uri_extension_to_all_shareholders(self):
        # Pack uri_extension_data, remember its hash (which goes into the
        # URI), and send the packed bytes to every remaining landlord.
        # NOTE(review): listing gaps -- the rest of the required-key tuple,
        # the construction of the `ed` debug dict, and the `dl` list setup
        # are missing from this excerpt.
        lp = self.log("sending uri_extension", level=log.NOISY)
        for k in ('crypttext_root_hash', 'crypttext_hash',
                  'plaintext_root_hash', 'plaintext_hash',
            assert k in self.uri_extension_data
        uri_extension = uri.pack_extension(self.uri_extension_data)
        for k,v in self.uri_extension_data.items():
            if k.endswith("hash"):
        self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
        self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
        for shareid in self.landlords.keys():
            dl.append(self.send_uri_extension(shareid, uri_extension))
        return self._gather_responses(dl)
    def send_uri_extension(self, shareid, uri_extension):
        # NOTE(review): unlike its sibling senders, no 'if shareid not in
        # self.landlords' guard is visible here, and the trailing `return d`
        # (original 622) appears to be missing from this listing.
        sh = self.landlords[shareid]
        d = sh.put_uri_extension(uri_extension)
        d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
    def close_all_shareholders(self):
        # NOTE(review): the `dl` list setup and append lines (original 626,
        # 630) are missing from this listing.
        self.log("closing shareholders", level=log.NOISY)
        for shareid in self.landlords:
            d = self.landlords[shareid].close()
            d.addErrback(self._remove_shareholder, shareid, "close")
        return self._gather_responses(dl)
    # NOTE(review): the 'def done(self):' header and the `now = time.time()`
    # binding (original ~633, 635) are missing from this listing; the
    # following is the body of done(), the success callback of start().
        self.log("upload done", level=log.OPERATIONAL)
        h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
        self._times["hashes_and_close"] = h_and_c_elapsed
        total_elapsed = now - self._start_total_timestamp
        self._times["total_encode_and_push"] = total_elapsed
        # update our sharemap
        self._shares_placed = set(self.landlords.keys())
        return (self.uri_extension_hash, self.required_shares,
                self.num_shares, self.file_size)
    # NOTE(review): the 'def err(self, f):' header, the `dl` list setup, and
    # the `_done` callback header/registration are missing from this listing;
    # the following is the body of err(), the failure callback of start().
        self.log("upload failed", failure=f, level=log.UNUSUAL)
        # we need to abort any remaining shareholders, so they'll delete the
        # partial share, allowing someone else to upload it again.
        self.log("aborting shareholders", level=log.UNUSUAL)
        for shareid in list(self.landlords.keys()):
            d = self.landlords[shareid].abort()
            d.addErrback(self._remove_shareholder, shareid, "abort")
        d = self._gather_responses(dl)
            self.log("shareholders aborted", level=log.UNUSUAL)
            # unwrap DeferredList's FirstError to re-raise the real failure
            if f.check(defer.FirstError):
                return f.value.subFailure
665 def get_shares_placed(self):
666 # return a set of share numbers that were successfully placed.
667 return self._shares_placed
670 # return a dictionary of encode+push timings