1 # -*- test-case-name: allmydata.test.test_encode -*-
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 from allmydata import uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.hashtree import HashTree
10 from allmydata.util import mathutil, hashutil, base32, log, happinessutil
11 from allmydata.util.assertutil import _assert, precondition
12 from allmydata.codec import CRSEncoder
13 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
14 IEncryptedUploadable, IUploadStatus, UploadUnhappinessError
18 The goal of the encoder is to turn the original file into a series of
19 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
20 is a different host, but for small grids there may be overlap). The number
21 of shares is chosen to hit our reliability goals (more shares on more
22 machines means more reliability), and is limited by overhead (proportional to
23 numshares or log(numshares)) and the encoding technology in use (zfec permits
24 only 256 shares total). It is also constrained by the amount of data
25 we want to send to each host. For estimating purposes, think of 10 shares
26 out of which we need 3 to reconstruct the file.
28 The encoder starts by cutting the original file into segments. All segments
29 except the last are of equal size. The segment size is chosen to constrain
30 the memory footprint (which will probably vary between 1x and 4x segment
31 size) and to constrain the overhead (which will be proportional to
32 log(number of segments)).
35 Each segment (A,B,C) is read into memory, encrypted, and encoded into
36 blocks. The 'share' (say, share #1) that makes it out to a host is a
37 collection of these blocks (block A1, B1, C1), plus some hash-tree
38 information necessary to validate the data upon retrieval. Only one segment
39 is handled at a time: all blocks for segment A are delivered before any
40 work is begun on segment B.
42 As blocks are created, we retain the hash of each one. The list of block hashes
43 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
44 of a Merkle hash tree for that share, called the block hash tree.
46 This hash tree has one terminal leaf per block. The complete block hash
47 tree is sent to the shareholder after all the data has been sent. At
retrieval time, the decoder will ask for specific pieces of this tree before
asking for blocks: whichever pieces it needs to validate those blocks.
51 (Note: we don't really need to generate this whole block hash tree
52 ourselves. It would be sufficient to have the shareholder generate it and
53 just tell us the root. This gives us an extra level of validation on the
54 transfer, though, and it is relatively cheap to compute.)
Each of these block hash trees has a root hash. The root hashes for all
shares are collected into the 'share hash tree', which
58 has one terminal leaf per share. After sending the blocks and the complete
59 block hash tree to each shareholder, we send them the portion of the share
60 hash tree that is necessary to validate their share. The root of the share
61 hash tree is put into the URI.
65 class UploadAborted(Exception):
74 class Encoder(object):
77 def __init__(self, log_parent=None, upload_status=None):
79 self.uri_extension_data = {}
83 self._status = IUploadStatus(upload_status)
84 precondition(log_parent is None or isinstance(log_parent, int),
86 self._log_number = log.msg("creating Encoder %s" % self,
87 facility="tahoe.encoder", parent=log_parent)
91 if hasattr(self, "_storage_index"):
92 return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
93 return "<Encoder for unknown storage index>"
def log(self, *args, **kwargs):
    """Forward a message to log.msg, filling in this encoder's defaults.

    Unless the caller passes them explicitly, 'parent' is set to
    self._log_number and 'facility' is set to "tahoe.encoder". All other
    positional and keyword arguments are handed through untouched.
    Returns whatever log.msg returns.
    """
    if "parent" not in kwargs:
        # only read self._log_number when the caller gave no parent
        kwargs["parent"] = self._log_number
    kwargs.setdefault("facility", "tahoe.encoder")
    return log.msg(*args, **kwargs)
102 def set_encrypted_uploadable(self, uploadable):
103 eu = self._uploadable = IEncryptedUploadable(uploadable)
106 self.log(format="file size: %(size)d", size=size)
107 self.file_size = size
108 d.addCallback(_got_size)
109 d.addCallback(lambda res: eu.get_all_encoding_parameters())
110 d.addCallback(self._got_all_encoding_parameters)
111 d.addCallback(lambda res: eu.get_storage_index())
112 def _done(storage_index):
113 self._storage_index = storage_index
118 def _got_all_encoding_parameters(self, params):
119 assert not self._codec
120 k, happy, n, segsize = params
121 self.required_shares = k
122 self.servers_of_happiness = happy
124 self.segment_size = segsize
125 self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
126 self.log("now setting up codec")
128 assert self.segment_size % self.required_shares == 0
130 self.num_segments = mathutil.div_ceil(self.file_size,
133 self._codec = CRSEncoder()
134 self._codec.set_params(self.segment_size,
135 self.required_shares, self.num_shares)
137 data = self.uri_extension_data
138 data['codec_name'] = self._codec.get_encoder_type()
139 data['codec_params'] = self._codec.get_serialized_params()
141 data['size'] = self.file_size
142 data['segment_size'] = self.segment_size
143 self.share_size = mathutil.div_ceil(self.file_size,
144 self.required_shares)
145 data['num_segments'] = self.num_segments
146 data['needed_shares'] = self.required_shares
147 data['total_shares'] = self.num_shares
149 # the "tail" is the last segment. This segment may or may not be
150 # shorter than all other segments. We use the "tail codec" to handle
151 # it. If the tail is short, we use a different codec instance. In
152 # addition, the tail codec must be fed data which has been padded out
154 tail_size = self.file_size % self.segment_size
156 tail_size = self.segment_size
158 # the tail codec is responsible for encoding tail_size bytes
159 padded_tail_size = mathutil.next_multiple(tail_size,
160 self.required_shares)
161 self._tail_codec = CRSEncoder()
162 self._tail_codec.set_params(padded_tail_size,
163 self.required_shares, self.num_shares)
164 data['tail_codec_params'] = self._tail_codec.get_serialized_params()
def _get_share_size(self):
    """Return the size of one share, including overhead.

    The data portion is file_size divided by required_shares, rounded up
    (mathutil.div_ceil); the result of _compute_overhead() is added on top.
    """
    data_portion = mathutil.div_ceil(self.file_size, self.required_shares)
    return data_portion + self._compute_overhead()
171 def _compute_overhead(self):
174 def get_param(self, name):
177 if name == "storage_index":
178 return self._storage_index
179 elif name == "share_counts":
180 return (self.required_shares, self.servers_of_happiness,
182 elif name == "num_segments":
183 return self.num_segments
184 elif name == "segment_size":
185 return self.segment_size
186 elif name == "block_size":
187 return self._codec.get_block_size()
188 elif name == "share_size":
189 return self._get_share_size()
190 elif name == "serialized_params":
191 return self._codec.get_serialized_params()
193 raise KeyError("unknown parameter name '%s'" % name)
195 def set_shareholders(self, landlords, servermap):
196 assert isinstance(landlords, dict)
198 assert IStorageBucketWriter.providedBy(landlords[k])
199 self.landlords = landlords.copy()
200 assert isinstance(servermap, dict)
201 for v in servermap.itervalues():
202 assert isinstance(v, set)
203 self.servermap = servermap.copy()
206 """ Returns a Deferred that will fire with the verify cap (an instance of
207 uri.CHKFileVerifierURI)."""
208 self.log("%s starting" % (self,))
209 #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
211 self._crypttext_hasher = hashutil.crypttext_hasher()
212 self._crypttext_hashes = []
214 self.block_hashes = [[] for x in range(self.num_shares)]
# block_hashes[i] is a list that will be accumulated and then sent
216 # to landlord[i]. This list contains a hash of each segment_share
217 # that we sent to that landlord.
218 self.share_root_hashes = [None] * self.num_shares
221 "cumulative_encoding": 0.0,
222 "cumulative_sending": 0.0,
223 "hashes_and_close": 0.0,
224 "total_encode_and_push": 0.0,
226 self._start_total_timestamp = time.time()
230 d.addCallback(lambda res: self.start_all_shareholders())
232 for i in range(self.num_segments-1):
233 # note to self: this form doesn't work, because lambda only
234 # captures the slot, not the value
235 #d.addCallback(lambda res: self.do_segment(i))
236 # use this form instead:
237 d.addCallback(lambda res, i=i: self._encode_segment(i))
238 d.addCallback(self._send_segment, i)
239 d.addCallback(self._turn_barrier)
240 last_segnum = self.num_segments - 1
241 d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
242 d.addCallback(self._send_segment, last_segnum)
243 d.addCallback(self._turn_barrier)
245 d.addCallback(lambda res: self.finish_hashing())
247 d.addCallback(lambda res:
248 self.send_crypttext_hash_tree_to_all_shareholders())
249 d.addCallback(lambda res: self.send_all_block_hash_trees())
250 d.addCallback(lambda res: self.send_all_share_hash_trees())
251 d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
253 d.addCallback(lambda res: self.close_all_shareholders())
254 d.addCallbacks(self.done, self.err)
257 def set_status(self, status):
259 self._status.set_status(status)
261 def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
263 # we treat the final hash+close as an extra segment
264 if sent_segments is None:
265 sent_segments = self.num_segments
266 progress = float(sent_segments + extra) / (self.num_segments + 1)
267 self._status.set_progress(2, progress)
270 self.log("aborting upload", level=log.UNUSUAL)
271 assert self._codec, "don't call abort before start"
273 # the next segment read (in _gather_data inside _encode_segment) will
274 # raise UploadAborted(), which will bypass the rest of the upload
275 # chain. If we've sent the final segment's shares, it's too late to
276 # abort. TODO: allow abort any time up to close_all_shareholders.
278 def _turn_barrier(self, res):
279 # putting this method in a Deferred chain imposes a guaranteed
280 # reactor turn between the pre- and post- portions of that chain.
281 # This can be useful to limit memory consumption: since Deferreds do
282 # not do tail recursion, code which uses defer.succeed(result) for
283 # consistency will cause objects to live for longer than you might
286 return fireEventually(res)
289 def start_all_shareholders(self):
290 self.log("starting shareholders", level=log.NOISY)
291 self.set_status("Starting shareholders")
293 for shareid in list(self.landlords):
294 d = self.landlords[shareid].put_header()
295 d.addErrback(self._remove_shareholder, shareid, "start")
297 return self._gather_responses(dl)
299 def _encode_segment(self, segnum):
303 # the ICodecEncoder API wants to receive a total of self.segment_size
304 # bytes on each encode() call, broken up into a number of
305 # identically-sized pieces. Due to the way the codec algorithm works,
306 # these pieces need to be the same size as the share which the codec
307 # will generate. Therefore we must feed it with input_piece_size that
308 # equals the output share size.
309 input_piece_size = codec.get_block_size()
311 # as a result, the number of input pieces per encode() call will be
312 # equal to the number of required shares with which the codec was
313 # constructed. You can think of the codec as chopping up a
314 # 'segment_size' of data into 'required_shares' shares (not doing any
315 # fancy math at all, just doing a split), then creating some number
316 # of additional shares which can be substituted if the primary ones
319 # we read data from the source one segment at a time, and then chop
320 # it into 'input_piece_size' pieces before handing it to the codec
322 crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
324 # memory footprint: we only hold a tiny piece of the plaintext at any
# given time. We build up a segment's worth of crypttext, then hand
326 # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
327 # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
328 # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
329 # footprint to 430KiB at the expense of more hash-tree overhead.
331 d = self._gather_data(self.required_shares, input_piece_size,
332 crypttext_segment_hasher)
333 def _done_gathering(chunks):
335 assert len(c) == input_piece_size
336 self._crypttext_hashes.append(crypttext_segment_hasher.digest())
337 # during this call, we hit 5*segsize memory
338 return codec.encode(chunks)
339 d.addCallback(_done_gathering)
341 elapsed = time.time() - start
342 self._times["cumulative_encoding"] += elapsed
347 def _encode_tail_segment(self, segnum):
350 codec = self._tail_codec
351 input_piece_size = codec.get_block_size()
353 crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
355 d = self._gather_data(self.required_shares, input_piece_size,
356 crypttext_segment_hasher, allow_short=True)
357 def _done_gathering(chunks):
359 # a short trailing chunk will have been padded by
361 assert len(c) == input_piece_size
362 self._crypttext_hashes.append(crypttext_segment_hasher.digest())
363 return codec.encode(chunks)
364 d.addCallback(_done_gathering)
366 elapsed = time.time() - start
367 self._times["cumulative_encoding"] += elapsed
372 def _gather_data(self, num_chunks, input_chunk_size,
373 crypttext_segment_hasher,
375 """Return a Deferred that will fire when the required number of
376 chunks have been read (and hashed and encrypted). The Deferred fires
377 with a list of chunks, each of size input_chunk_size."""
379 # I originally built this to allow read_encrypted() to behave badly:
380 # to let it return more or less data than you asked for. It would
381 # stash the leftovers until later, and then recurse until it got
382 # enough. I don't think that was actually useful.
384 # who defines read_encrypted?
385 # offloaded.LocalCiphertextReader: real disk file: exact
386 # upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
387 # it exact. The return value is a list of 50KiB chunks, to reduce
388 # the memory footprint of the encryption process.
389 # repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
391 # This has been redefined to require read_encrypted() to behave like
392 # a local file: return exactly the amount requested unless it hits
397 raise UploadAborted()
399 read_size = num_chunks * input_chunk_size
400 d = self._uploadable.read_encrypted(read_size, hash_only=False)
402 assert isinstance(data, (list,tuple))
404 raise UploadAborted()
406 precondition(len(data) <= read_size, len(data), read_size)
408 precondition(len(data) == read_size, len(data), read_size)
409 crypttext_segment_hasher.update(data)
410 self._crypttext_hasher.update(data)
411 if allow_short and len(data) < read_size:
413 data += "\x00" * (read_size - len(data))
414 encrypted_pieces = [data[i:i+input_chunk_size]
415 for i in range(0, len(data), input_chunk_size)]
416 return encrypted_pieces
420 def _send_segment(self, (shares, shareids), segnum):
421 # To generate the URI, we must generate the roothash, so we must
422 # generate all shares, even if we aren't actually giving them to
423 # anybody. This means that the set of shares we create will be equal
424 # to or larger than the set of landlords. If we have any landlord who
425 # *doesn't* have a share, that's an error.
426 _assert(set(self.landlords.keys()).issubset(set(shareids)),
427 shareids=shareids, landlords=self.landlords)
430 self.set_status("Sending segment %d of %d" % (segnum+1,
432 self.set_encode_and_push_progress(segnum)
433 lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
434 for i in range(len(shares)):
436 shareid = shareids[i]
437 d = self.send_block(shareid, segnum, block, lognum)
439 block_hash = hashutil.block_hash(block)
440 #from allmydata.util import base32
441 #log.msg("creating block (shareid=%d, blocknum=%d) "
442 # "len=%d %r .. %r: %s" %
443 # (shareid, segnum, len(block),
444 # block[:50], block[-50:], base32.b2a(block_hash)))
445 self.block_hashes[shareid].append(block_hash)
447 dl = self._gather_responses(dl)
449 self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
451 self.segment_size*(segnum+1),
452 self.segment_size*self.num_segments,
453 100 * (segnum+1) / self.num_segments,
455 level=log.OPERATIONAL)
456 elapsed = time.time() - start
457 self._times["cumulative_sending"] += elapsed
459 dl.addCallback(_logit)
462 def send_block(self, shareid, segment_num, block, lognum):
463 if shareid not in self.landlords:
464 return defer.succeed(None)
465 sh = self.landlords[shareid]
466 lognum2 = self.log("put_block to %s" % self.landlords[shareid],
467 parent=lognum, level=log.NOISY)
468 d = sh.put_block(segment_num, block)
470 self.log("put_block done", parent=lognum2, level=log.NOISY)
473 d.addErrback(self._remove_shareholder, shareid,
474 "segnum=%d" % segment_num)
477 def _remove_shareholder(self, why, shareid, where):
478 ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
479 method=where, shnum=shareid,
480 level=log.UNUSUAL, failure=why)
481 if shareid in self.landlords:
482 self.landlords[shareid].abort()
483 peerid = self.landlords[shareid].get_peerid()
485 del self.landlords[shareid]
486 self.servermap[shareid].remove(peerid)
487 if not self.servermap[shareid]:
488 del self.servermap[shareid]
491 self.log("they weren't in our list of landlords", parent=ln,
492 level=log.WEIRD, umid="TQGFRw")
493 happiness = happinessutil.servers_of_happiness(self.servermap)
494 if happiness < self.servers_of_happiness:
495 peerids = set(happinessutil.shares_by_server(self.servermap).keys())
496 msg = happinessutil.failure_message(len(peerids),
497 self.required_shares,
498 self.servers_of_happiness,
500 msg = "%s: %s" % (msg, why)
501 raise UploadUnhappinessError(msg)
502 self.log("but we can still continue with %s shares, we'll be happy "
503 "with at least %s" % (happiness,
504 self.servers_of_happiness),
507 def _gather_responses(self, dl):
508 d = defer.DeferredList(dl, fireOnOneErrback=True)
509 def _eatUploadUnhappinessError(f):
510 # all exceptions that occur while talking to a peer are handled
511 # in _remove_shareholder. That might raise UploadUnhappinessError,
512 # which will cause the DeferredList to errback but which should
513 # otherwise be consumed. Allow non-UploadUnhappinessError exceptions
514 # to pass through as an unhandled errback. We use this in lieu of
515 # consumeErrors=True to allow coding errors to be logged.
516 f.trap(UploadUnhappinessError)
519 d0.addErrback(_eatUploadUnhappinessError)
def finish_hashing(self):
    """Finalize the accumulated crypttext hash and close the uploadable.

    Records the time at which the hash-and-close phase began, updates the
    status and progress reporting, stores the digest of the crypttext
    hasher in uri_extension_data["crypttext_hash"], and finally closes the
    uploadable. Returns None.
    """
    # timestamp first, so the status/progress calls are counted in the phase
    self._start_hashing_and_close_timestamp = time.time()
    self.set_status("Finishing hashes")
    self.set_encode_and_push_progress(extra=0.0)
    self.uri_extension_data["crypttext_hash"] = self._crypttext_hasher.digest()
    self._uploadable.close()
530 def send_crypttext_hash_tree_to_all_shareholders(self):
531 self.log("sending crypttext hash tree", level=log.NOISY)
532 self.set_status("Sending Crypttext Hash Tree")
533 self.set_encode_and_push_progress(extra=0.3)
534 t = HashTree(self._crypttext_hashes)
536 self.uri_extension_data["crypttext_root_hash"] = t[0]
538 for shareid in list(self.landlords):
539 dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
540 return self._gather_responses(dl)
542 def send_crypttext_hash_tree(self, shareid, all_hashes):
543 if shareid not in self.landlords:
544 return defer.succeed(None)
545 sh = self.landlords[shareid]
546 d = sh.put_crypttext_hashes(all_hashes)
547 d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
550 def send_all_block_hash_trees(self):
551 self.log("sending block hash trees", level=log.NOISY)
552 self.set_status("Sending Subshare Hash Trees")
553 self.set_encode_and_push_progress(extra=0.4)
555 for shareid,hashes in enumerate(self.block_hashes):
556 # hashes is a list of the hashes of all blocks that were sent
557 # to shareholder[shareid].
558 dl.append(self.send_one_block_hash_tree(shareid, hashes))
559 return self._gather_responses(dl)
561 def send_one_block_hash_tree(self, shareid, block_hashes):
562 t = HashTree(block_hashes)
564 # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
565 # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
566 # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
567 self.share_root_hashes[shareid] = t[0]
568 if shareid not in self.landlords:
569 return defer.succeed(None)
570 sh = self.landlords[shareid]
571 d = sh.put_block_hashes(all_hashes)
572 d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
575 def send_all_share_hash_trees(self):
576 # Each bucket gets a set of share hash tree nodes that are needed to validate their
577 # share. This includes the share hash itself, but does not include the top-level hash
578 # root (which is stored securely in the URI instead).
579 self.log("sending all share hash trees", level=log.NOISY)
580 self.set_status("Sending Share Hash Trees")
581 self.set_encode_and_push_progress(extra=0.6)
583 for h in self.share_root_hashes:
585 # create the share hash tree
586 t = HashTree(self.share_root_hashes)
587 # the root of this hash tree goes into our URI
588 self.uri_extension_data['share_root_hash'] = t[0]
589 # now send just the necessary pieces out to each shareholder
590 for i in range(self.num_shares):
591 # the HashTree is given a list of leaves: 0,1,2,3..n .
592 # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
593 needed_hash_indices = t.needed_hashes(i, include_leaf=True)
594 hashes = [(hi, t[hi]) for hi in needed_hash_indices]
595 dl.append(self.send_one_share_hash_tree(i, hashes))
596 return self._gather_responses(dl)
598 def send_one_share_hash_tree(self, shareid, needed_hashes):
599 if shareid not in self.landlords:
600 return defer.succeed(None)
601 sh = self.landlords[shareid]
602 d = sh.put_share_hashes(needed_hashes)
603 d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
606 def send_uri_extension_to_all_shareholders(self):
607 lp = self.log("sending uri_extension", level=log.NOISY)
608 self.set_status("Sending URI Extensions")
609 self.set_encode_and_push_progress(extra=0.8)
610 for k in ('crypttext_root_hash', 'crypttext_hash',
612 assert k in self.uri_extension_data
613 uri_extension = uri.pack_extension(self.uri_extension_data)
615 for k,v in self.uri_extension_data.items():
616 if k.endswith("hash"):
617 ed[k] = base32.b2a(v)
620 self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
621 self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
623 for shareid in list(self.landlords):
624 dl.append(self.send_uri_extension(shareid, uri_extension))
625 return self._gather_responses(dl)
627 def send_uri_extension(self, shareid, uri_extension):
628 sh = self.landlords[shareid]
629 d = sh.put_uri_extension(uri_extension)
630 d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
633 def close_all_shareholders(self):
634 self.log("closing shareholders", level=log.NOISY)
635 self.set_status("Closing Shareholders")
636 self.set_encode_and_push_progress(extra=0.9)
638 for shareid in list(self.landlords):
639 d = self.landlords[shareid].close()
640 d.addErrback(self._remove_shareholder, shareid, "close")
642 return self._gather_responses(dl)
645 self.log("upload done", level=log.OPERATIONAL)
646 self.set_status("Finished")
647 self.set_encode_and_push_progress(extra=1.0) # done
649 h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
650 self._times["hashes_and_close"] = h_and_c_elapsed
651 total_elapsed = now - self._start_total_timestamp
652 self._times["total_encode_and_push"] = total_elapsed
654 # update our sharemap
655 self._shares_placed = set(self.landlords.keys())
656 return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
657 self.required_shares, self.num_shares, self.file_size)
660 self.log("upload failed", failure=f, level=log.UNUSUAL)
661 self.set_status("Failed")
662 # we need to abort any remaining shareholders, so they'll delete the
663 # partial share, allowing someone else to upload it again.
664 self.log("aborting shareholders", level=log.UNUSUAL)
665 for shareid in list(self.landlords):
666 self.landlords[shareid].abort()
667 if f.check(defer.FirstError):
668 return f.value.subFailure
def get_shares_placed(self):
    """Return the set of share numbers that were successfully placed.

    This hands back the object stored in self._shares_placed itself, not
    a copy.
    """
    placed = self._shares_placed
    return placed
676 # return a dictionary of encode+push timings
def get_uri_extension_data(self):
    """Return the dictionary of URI-extension fields gathered so far.

    The caller receives the live self.uri_extension_data dict, not a copy.
    """
    extension_fields = self.uri_extension_data
    return extension_fields