1 # -*- test-case-name: allmydata.test.test_encode -*-
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 from allmydata import uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.hashtree import HashTree
10 from allmydata.util import mathutil, hashutil, base32, log
11 from allmydata.util.assertutil import _assert, precondition
12 from allmydata.codec import CRSEncoder
13 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
14 IEncryptedUploadable, IUploadStatus, NotEnoughSharesError, NoSharesError
17 The goal of the encoder is to turn the original file into a series of
18 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
19 is a different host, but for small grids there may be overlap). The number
20 of shares is chosen to hit our reliability goals (more shares on more
21 machines means more reliability), and is limited by overhead (proportional to
22 numshares or log(numshares)) and the encoding technology in use (zfec permits
23 only 256 shares total). It is also constrained by the amount of data
24 we want to send to each host. For estimating purposes, think of 10 shares
25 out of which we need 3 to reconstruct the file.
27 The encoder starts by cutting the original file into segments. All segments
28 except the last are of equal size. The segment size is chosen to constrain
29 the memory footprint (which will probably vary between 1x and 4x segment
30 size) and to constrain the overhead (which will be proportional to
31 log(number of segments)).
34 Each segment (A,B,C) is read into memory, encrypted, and encoded into
35 blocks. The 'share' (say, share #1) that makes it out to a host is a
36 collection of these blocks (block A1, B1, C1), plus some hash-tree
37 information necessary to validate the data upon retrieval. Only one segment
38 is handled at a time: all blocks for segment A are delivered before any
39 work is begun on segment B.
41 As blocks are created, we retain the hash of each one. The list of block hashes
42 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
43 of a Merkle hash tree for that share, called the block hash tree.
45 This hash tree has one terminal leaf per block. The complete block hash
46 tree is sent to the shareholder after all the data has been sent. At
47 retrieval time, the decoder will ask for specific pieces of this tree before
48 asking for blocks, whichever it needs to validate those blocks.
50 (Note: we don't really need to generate this whole block hash tree
51 ourselves. It would be sufficient to have the shareholder generate it and
52 just tell us the root. This gives us an extra level of validation on the
53 transfer, though, and it is relatively cheap to compute.)
55 Each of these block hash trees has a root hash. The collection of these
56 root hashes for all shares are collected into the 'share hash tree', which
57 has one terminal leaf per share. After sending the blocks and the complete
58 block hash tree to each shareholder, we send them the portion of the share
59 hash tree that is necessary to validate their share. The root of the share
60 hash tree is put into the URI.
class UploadAborted(Exception):
    """Raised (from _gather_data) when the upload has been aborted; it
    bypasses the rest of the Deferred upload chain.

    NOTE(review): the class body was elided in this copy; a bare `pass`
    restores the dangling header to valid Python without changing behavior.
    """
    pass
class Encoder(object):
    # NOTE(review): interface declaration line(s) (e.g. implements(IEncoder))
    # appear to be elided between the class header and __init__ in this copy.

    def __init__(self, log_parent=None, upload_status=None):
        """Create an Encoder.

        log_parent: optional int log-entry id to parent our log messages to
                    (enforced by the precondition below).
        upload_status: object adapted to IUploadStatus for progress reporting.

        NOTE(review): several __init__ lines are elided in this copy
        (e.g. self._codec initialization, the precondition continuation).
        """
        self.uri_extension_data = {}
        self._status = IUploadStatus(upload_status)
        # NOTE(review): the continuation/closing of this precondition() call
        # is elided here — the visible line is truncated.
        precondition(log_parent is None or isinstance(log_parent, int),
        self._log_number = log.msg("creating Encoder %s" % self,
                                   facility="tahoe.encoder", parent=log_parent)
        # NOTE(review): the `def __repr__(self):` header is elided just above;
        # these lines are its body. Uses a 5-char storage-index prefix once
        # set_encrypted_uploadable() has stored self._storage_index.
        if hasattr(self, "_storage_index"):
            return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
        return "<Encoder for unknown storage index>"
94 def log(self, *args, **kwargs):
95 if "parent" not in kwargs:
96 kwargs["parent"] = self._log_number
97 if "facility" not in kwargs:
98 kwargs["facility"] = "tahoe.encoder"
99 return log.msg(*args, **kwargs)
    def set_encrypted_uploadable(self, uploadable):
        """Adapt `uploadable` to IEncryptedUploadable and asynchronously fetch
        its size, encoding parameters, and storage index, recording each.

        NOTE(review): the creation of the Deferred `d` (presumably from
        eu.get_size()), the `def _got_size(size):` header, and the trailing
        addCallback/return lines are elided in this copy.
        """
        eu = self._uploadable = IEncryptedUploadable(uploadable)
            self.log(format="file size: %(size)d", size=size)
            self.file_size = size
        d.addCallback(_got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(lambda res: eu.get_storage_index())
        def _done(storage_index):
            self._storage_index = storage_index
    def _got_all_encoding_parameters(self, params):
        """Record the (k, happy, n, segsize) encoding parameters, construct
        the main and tail CRS codecs, and stash codec metadata into
        self.uri_extension_data."""
        assert not self._codec
        k, happy, n, segsize = params
        self.required_shares = k
        self.servers_of_happiness = happy
        # NOTE(review): the `self.num_shares = n` assignment appears to be
        # elided here (self.num_shares is used below).
        self.segment_size = segsize
        self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
        self.log("now setting up codec")

        # the segment size must divide evenly by k, so each segment can be
        # split into k equal-size input pieces for the codec
        assert self.segment_size % self.required_shares == 0

        # NOTE(review): the continuation of this div_ceil call (presumably
        # self.segment_size) is elided here.
        self.num_segments = mathutil.div_ceil(self.file_size,

        self._codec = CRSEncoder()
        self._codec.set_params(self.segment_size,
                               self.required_shares, self.num_shares)

        data = self.uri_extension_data
        data['codec_name'] = self._codec.get_encoder_type()
        data['codec_params'] = self._codec.get_serialized_params()

        data['size'] = self.file_size
        data['segment_size'] = self.segment_size
        self.share_size = mathutil.div_ceil(self.file_size,
                                            self.required_shares)
        data['num_segments'] = self.num_segments
        data['needed_shares'] = self.required_shares
        data['total_shares'] = self.num_shares

        # the "tail" is the last segment. This segment may or may not be
        # shorter than all other segments. We use the "tail codec" to handle
        # it. If the tail is short, we use a different codec instance. In
        # addition, the tail codec must be fed data which has been padded out
        # to the right size.
        tail_size = self.file_size % self.segment_size
        # NOTE(review): the guard for an exact multiple (presumably
        # `if tail_size == 0:`) is elided here.
            tail_size = self.segment_size

        # the tail codec is responsible for encoding tail_size bytes
        padded_tail_size = mathutil.next_multiple(tail_size,
                                                  self.required_shares)
        self._tail_codec = CRSEncoder()
        self._tail_codec.set_params(padded_tail_size,
                                    self.required_shares, self.num_shares)
        data['tail_codec_params'] = self._tail_codec.get_serialized_params()
165 def _get_share_size(self):
166 share_size = mathutil.div_ceil(self.file_size, self.required_shares)
167 overhead = self._compute_overhead()
168 return share_size + overhead
    def _compute_overhead(self):
        # NOTE(review): the body of this method is elided in this copy.
        # _get_share_size() adds its return value to the raw share size, so
        # it presumably returns a (possibly zero) per-share byte overhead —
        # confirm against the upstream source.
    def get_param(self, name):
        """Return the encoding parameter called `name`.

        Known names: storage_index, share_counts, num_segments,
        segment_size, block_size, share_size, serialized_params.
        Raises KeyError for anything else.

        NOTE(review): one or two precondition/assert lines are elided near
        the top of this method in this copy.
        """
        if name == "storage_index":
            return self._storage_index
        elif name == "share_counts":
            # NOTE(review): the continuation of this tuple (presumably
            # self.num_shares) is elided here.
            return (self.required_shares, self.servers_of_happiness,
        elif name == "num_segments":
            return self.num_segments
        elif name == "segment_size":
            return self.segment_size
        elif name == "block_size":
            return self._codec.get_block_size()
        elif name == "share_size":
            return self._get_share_size()
        elif name == "serialized_params":
            return self._codec.get_serialized_params()
        # NOTE(review): the `else:` line is elided here.
            raise KeyError("unknown parameter name '%s'" % name)
    def set_shareholders(self, landlords, servermap):
        """Record the bucket writers and share->server map for this upload.

        landlords: dict mapping share number -> IStorageBucketWriter.
        servermap: dict describing which server holds which share.
        Both are defensively copied.
        """
        assert isinstance(landlords, dict)
        # NOTE(review): the loop header (presumably `for k in landlords:`) is
        # elided here — the assert below references `k`.
            assert IStorageBucketWriter.providedBy(landlords[k])
        self.landlords = landlords.copy()
        assert isinstance(servermap, dict)
        self.servermap = servermap.copy()
        # NOTE(review): the `def start(self):` header is elided just above
        # this docstring; the following lines are the body of start(), which
        # drives the entire encode-and-push pipeline as a Deferred chain.
        """ Returns a Deferred that will fire with the verify cap (an instance of
        uri.CHKFileVerifierURI)."""
        self.log("%s starting" % (self,))
        #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hashes = []
        self.block_hashes = [[] for x in range(self.num_shares)]
        # block_hashes[i] is a list that will be accumulated and then sent
        # to landlord[i]. This list contains a hash of each segment_share
        # that we sent to that landlord.
        self.share_root_hashes = [None] * self.num_shares
        # NOTE(review): the `self._times = {` header (and possibly other
        # lines) are elided here; these keys accumulate wall-clock timings.
            "cumulative_encoding": 0.0,
            "cumulative_sending": 0.0,
            "hashes_and_close": 0.0,
            "total_encode_and_push": 0.0,
        self._start_total_timestamp = time.time()
        # NOTE(review): the creation of the initial Deferred `d` is elided.
        d.addCallback(lambda res: self.start_all_shareholders())
        for i in range(self.num_segments-1):
            # note to self: this form doesn't work, because lambda only
            # captures the slot, not the value
            #d.addCallback(lambda res: self.do_segment(i))
            # use this form instead:
            d.addCallback(lambda res, i=i: self._encode_segment(i))
            d.addCallback(self._send_segment, i)
            d.addCallback(self._turn_barrier)
        last_segnum = self.num_segments - 1
        d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
        d.addCallback(self._send_segment, last_segnum)
        d.addCallback(self._turn_barrier)
        d.addCallback(lambda res: self.finish_hashing())
        d.addCallback(lambda res:
                      self.send_crypttext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res: self.send_all_block_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
        d.addCallback(lambda res: self.close_all_shareholders())
        d.addCallbacks(self.done, self.err)
        # NOTE(review): the trailing `return d` is elided here.
    def set_status(self, status):
        """Forward a human-readable status string to the upload-status object."""
        # NOTE(review): a guard line (presumably `if self._status:`) appears
        # to be elided here — confirm against the upstream source.
            self._status.set_status(status)
    def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
        """Report fractional progress of the encode+push phase.

        sent_segments: number of segments delivered so far; defaults to all.
        extra: fractional credit for the post-segment hash/close work.
        """
        # NOTE(review): a guard line (presumably `if self._status:`) appears
        # to be elided here.
        # we treat the final hash+close as an extra segment
        if sent_segments is None:
            sent_segments = self.num_segments
        progress = float(sent_segments + extra) / (self.num_segments + 1)
        self._status.set_progress(2, progress)
        # NOTE(review): the `def abort(self):` header is elided just above;
        # these lines are its body. A line actually setting the abort flag
        # (checked later by _gather_data) also appears to be elided — confirm.
        self.log("aborting upload", level=log.UNUSUAL)
        assert self._codec, "don't call abort before start"
        # the next segment read (in _gather_data inside _encode_segment) will
        # raise UploadAborted(), which will bypass the rest of the upload
        # chain. If we've sent the final segment's shares, it's too late to
        # abort. TODO: allow abort any time up to close_all_shareholders.
    def _turn_barrier(self, res):
        # putting this method in a Deferred chain imposes a guaranteed
        # reactor turn between the pre- and post- portions of that chain.
        # This can be useful to limit memory consumption: since Deferreds do
        # not do tail recursion, code which uses defer.succeed(result) for
        # consistency will cause objects to live for longer than you might
        # expect; fireEventually() schedules delivery of `res` on a later
        # reactor turn, letting the earlier chain objects be released.
        return fireEventually(res)
    def start_all_shareholders(self):
        """Send put_header() to every landlord; fires when all have
        responded (failures remove the shareholder)."""
        self.log("starting shareholders", level=log.NOISY)
        self.set_status("Starting shareholders")
        # NOTE(review): the `dl = []` initialization is elided here.
        for shareid in list(self.landlords):
            d = self.landlords[shareid].put_header()
            d.addErrback(self._remove_shareholder, shareid, "start")
            # NOTE(review): a `dl.append(d)` line is elided here.
        return self._gather_responses(dl)
    def _encode_segment(self, segnum):
        """Read, hash, and erasure-code segment `segnum` using the main codec.

        NOTE(review): several lines are elided in this copy: binding
        `codec`/`start` at the top, the `for c in chunks:` header, the timing
        wrapper's `def` header, and the final return of the encoded shares.
        """
        # the ICodecEncoder API wants to receive a total of self.segment_size
        # bytes on each encode() call, broken up into a number of
        # identically-sized pieces. Due to the way the codec algorithm works,
        # these pieces need to be the same size as the share which the codec
        # will generate. Therefore we must feed it with input_piece_size that
        # equals the output share size.
        input_piece_size = codec.get_block_size()

        # as a result, the number of input pieces per encode() call will be
        # equal to the number of required shares with which the codec was
        # constructed. You can think of the codec as chopping up a
        # 'segment_size' of data into 'required_shares' shares (not doing any
        # fancy math at all, just doing a split), then creating some number
        # of additional shares which can be substituted if the primary ones
        # are unavailable.

        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()

        # memory footprint: we only hold a tiny piece of the plaintext at any
        # given time. We build up a segment's worth of crypttext, then hand
        # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
        # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
        # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
        # footprint to 430KiB at the expense of more hash-tree overhead.

        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher)
        def _done_gathering(chunks):
            # NOTE(review): the `for c in chunks:` header is elided here.
                assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            # during this call, we hit 5*segsize memory
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        # NOTE(review): the timing callback's `def` header is elided here.
            elapsed = time.time() - start
            self._times["cumulative_encoding"] += elapsed
    def _encode_tail_segment(self, segnum):
        """Like _encode_segment, but for the final (possibly short, padded)
        segment, using the tail codec.

        NOTE(review): elided lines include the start-time binding, the
        continuation of the _gather_data call (presumably allow_short=True),
        the `for c in chunks:` header, the timing wrapper's `def` header,
        and the final return — confirm against upstream.
        """
        codec = self._tail_codec
        input_piece_size = codec.get_block_size()

        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()

        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher,
        def _done_gathering(chunks):
            # NOTE(review): `for c in chunks:` header elided here.
                # a short trailing chunk will have been padded by
                # _gather_data, so every chunk is full-size by now
                assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        # NOTE(review): the timing callback's `def` header is elided here.
            elapsed = time.time() - start
            self._times["cumulative_encoding"] += elapsed
    def _gather_data(self, num_chunks, input_chunk_size,
                     crypttext_segment_hasher,
        # NOTE(review): the remaining parameters (presumably allow_short and
        # previous_chunks with defaults) are elided from this signature.
        """Return a Deferred that will fire when the required number of
        chunks have been read (and hashed and encrypted). The Deferred fires
        with the combination of any 'previous_chunks' and the new chunks
        which were gathered."""

        # NOTE(review): an abort-check guard (leading to the raise below) is
        # elided here.
            raise UploadAborted()

        # NOTE(review): the recursion base-case header (presumably
        # `if not num_chunks:`) is elided here.
            return defer.succeed(previous_chunks)

        d = self._uploadable.read_encrypted(input_chunk_size, False)
        # NOTE(review): the read-callback `def` header and a second
        # abort-check guard are elided here.
            raise UploadAborted()
            encrypted_pieces = []
            # NOTE(review): `length = 0` and the `while data:` header are
            # elided here; the loop drains the list returned by
            # read_encrypted while hashing each piece twice (per-segment
            # hasher and whole-file hasher).
                encrypted_piece = data.pop(0)
                length += len(encrypted_piece)
                crypttext_segment_hasher.update(encrypted_piece)
                self._crypttext_hasher.update(encrypted_piece)
                encrypted_pieces.append(encrypted_piece)

            # NOTE(review): an `if allow_short:` header is elided here; this
            # branch zero-pads a short final chunk up to input_chunk_size.
                precondition(length <= input_chunk_size,
                             "length=%d > input_chunk_size=%d" %
                             (length, input_chunk_size))
                if length < input_chunk_size:
                    # zero-pad the short tail up to the full chunk size
                    pad_size = input_chunk_size - length
                    encrypted_pieces.append('\x00' * pad_size)
            # NOTE(review): the matching `else:` is elided here; non-tail
            # chunks must arrive exactly full-size.
                # non-tail segments should be the full segment size
                if length != input_chunk_size:
                    log.msg("non-tail segment should be full segment size: %d!=%d"
                            % (length, input_chunk_size),
                            level=log.BAD, umid="jNk5Yw")
                precondition(length == input_chunk_size,
                             "length=%d != input_chunk_size=%d" %
                             (length, input_chunk_size))

            encrypted_piece = "".join(encrypted_pieces)
            return previous_chunks + [encrypted_piece]
        # NOTE(review): the addCallback attaching the callback above is
        # elided here; the recursion below gathers the remaining chunks.
        d.addCallback(lambda chunks:
                      self._gather_data(num_chunks-1, input_chunk_size,
                                        crypttext_segment_hasher,
                                        allow_short, chunks))
        # NOTE(review): the trailing `return d` is elided here.
    def _send_segment(self, (shares, shareids), segnum):
        """Push segment `segnum`'s encoded blocks to the landlords and record
        each block's hash. (Note: Python 2 tuple-parameter syntax.)

        NOTE(review): elided lines include the start-time and `dl = []`
        bookkeeping, `block = shares[i]`, `dl.append(d)`, the logging
        callback's `def` header and its return, and the final `return dl`.
        """
        # To generate the URI, we must generate the roothash, so we must
        # generate all shares, even if we aren't actually giving them to
        # anybody. This means that the set of shares we create will be equal
        # to or larger than the set of landlords. If we have any landlord who
        # *doesn't* have a share, that's an error.
        _assert(set(self.landlords.keys()).issubset(set(shareids)),
                shareids=shareids, landlords=self.landlords)
        # NOTE(review): the continuation of this set_status call (presumably
        # self.num_segments)) is elided here.
        self.set_status("Sending segment %d of %d" % (segnum+1,
        self.set_encode_and_push_progress(segnum)
        lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
        for i in range(len(shares)):
            # NOTE(review): the `block = shares[i]` binding is elided here.
            shareid = shareids[i]
            d = self.send_block(shareid, segnum, block, lognum)
            # NOTE(review): a `dl.append(d)` line is elided here.
            block_hash = hashutil.block_hash(block)
            #from allmydata.util import base32
            #log.msg("creating block (shareid=%d, blocknum=%d) "
            # "len=%d %r .. %r: %s" %
            # (shareid, segnum, len(block),
            # block[:50], block[-50:], base32.b2a(block_hash)))
            self.block_hashes[shareid].append(block_hash)

        dl = self._gather_responses(dl)

        # NOTE(review): the logging callback's `def` header (and the first
        # element of the format tuple) are elided around this log call.
            self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
                     self.segment_size*(segnum+1),
                     self.segment_size*self.num_segments,
                     100 * (segnum+1) / self.num_segments,
                     level=log.OPERATIONAL)
            elapsed = time.time() - start
            self._times["cumulative_sending"] += elapsed
        dl.addCallback(_logit)
        # NOTE(review): the trailing `return dl` is elided here.
    def send_block(self, shareid, segment_num, block, lognum):
        """Send one encoded block to the landlord for `shareid`; a failure
        removes that shareholder. Returns a Deferred (fires immediately if
        the landlord is already gone)."""
        if shareid not in self.landlords:
            # this shareholder was already removed; nothing to send
            return defer.succeed(None)
        sh = self.landlords[shareid]
        lognum2 = self.log("put_block to %s" % self.landlords[shareid],
                           parent=lognum, level=log.NOISY)
        d = sh.put_block(segment_num, block)
        # NOTE(review): the success callback's `def` header is elided here.
            self.log("put_block done", parent=lognum2, level=log.NOISY)
        # NOTE(review): the callback's return and its addCallback attachment
        # are elided here.
        d.addErrback(self._remove_shareholder, shareid,
                     "segnum=%d" % segment_num)
        # NOTE(review): the trailing `return d` is elided here.
    def _remove_shareholder(self, why, shareid, where):
        """Errback: log the failure `why` from operation `where`, drop the
        failing shareholder, and raise NotEnoughSharesError / NoSharesError
        when too few distinct servers remain to satisfy happiness."""
        ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
                      method=where, shnum=shareid,
                      level=log.UNUSUAL, failure=why)
        if shareid in self.landlords:
            # tell the (still-reachable) bucket writer to discard its data
            self.landlords[shareid].abort()
            del self.landlords[shareid]
        # NOTE(review): the `else:` header for this branch is elided here.
            self.log("they weren't in our list of landlords", parent=ln,
                     level=log.WEIRD, umid="TQGFRw")
        del(self.servermap[shareid])
        servers_left = list(set(self.servermap.values()))
        if len(servers_left) < self.servers_of_happiness:
            # NOTE(review): the continuation of this format tuple (presumably
            # (len(servers_left),) and an `if servers_left:`/`else:` pair
            # selecting between the two raises are elided around these lines.
            msg = "lost too many servers during upload (still have %d, want %d): %s" % \
                  self.servers_of_happiness, why)
            raise NotEnoughSharesError(msg)
            raise NoSharesError(msg)
        self.log("but we can still continue with %s shares, we'll be happy "
                 "with at least %s" % (len(servers_left),
                                       self.servers_of_happiness),
        # NOTE(review): the closing argument(s) of this log call are elided.
    def _gather_responses(self, dl):
        """Wrap a list of Deferreds in a DeferredList that errbacks on the
        first failure, while consuming the expected NotEnoughSharesError /
        NoSharesError failures on the individual Deferreds."""
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _eatNotEnoughSharesError(f):
            # all exceptions that occur while talking to a peer are handled
            # in _remove_shareholder. That might raise NotEnoughSharesError,
            # which will cause the DeferredList to errback but which should
            # otherwise be consumed. Allow non-NotEnoughSharesError exceptions
            # to pass through as an unhandled errback. We use this in lieu of
            # consumeErrors=True to allow coding errors to be logged.
            f.trap(NotEnoughSharesError, NoSharesError)
        # NOTE(review): the callback's `return None` and the `for d0 in dl:`
        # loop header are elided here.
            d0.addErrback(_eatNotEnoughSharesError)
        # NOTE(review): the trailing `return d` is elided here.
521 def finish_hashing(self):
522 self._start_hashing_and_close_timestamp = time.time()
523 self.set_status("Finishing hashes")
524 self.set_encode_and_push_progress(extra=0.0)
525 crypttext_hash = self._crypttext_hasher.digest()
526 self.uri_extension_data["crypttext_hash"] = crypttext_hash
527 self._uploadable.close()
    def send_crypttext_hash_tree_to_all_shareholders(self):
        """Build the Merkle tree over the per-segment crypttext hashes,
        record its root in the URI extension data, and send the tree to every
        remaining landlord."""
        self.log("sending crypttext hash tree", level=log.NOISY)
        self.set_status("Sending Crypttext Hash Tree")
        self.set_encode_and_push_progress(extra=0.3)
        t = HashTree(self._crypttext_hashes)
        # NOTE(review): the line producing `all_hashes` from the tree
        # (presumably all_hashes = list(t)) is elided here.
        self.uri_extension_data["crypttext_root_hash"] = t[0]
        # NOTE(review): the `dl = []` initialization is elided here.
        for shareid in list(self.landlords):
            dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
        return self._gather_responses(dl)
    def send_crypttext_hash_tree(self, shareid, all_hashes):
        """Send the full crypttext hash list to one shareholder; no-op if
        that shareholder has already been removed."""
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_crypttext_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
        # NOTE(review): the trailing `return d` is elided here.
    def send_all_block_hash_trees(self):
        """Send each share's accumulated block hash tree to its landlord."""
        self.log("sending block hash trees", level=log.NOISY)
        self.set_status("Sending Subshare Hash Trees")
        self.set_encode_and_push_progress(extra=0.4)
        # NOTE(review): the `dl = []` initialization is elided here.
        for shareid,hashes in enumerate(self.block_hashes):
            # hashes is a list of the hashes of all blocks that were sent
            # to shareholder[shareid].
            dl.append(self.send_one_block_hash_tree(shareid, hashes))
        return self._gather_responses(dl)
    def send_one_block_hash_tree(self, shareid, block_hashes):
        """Build the block hash tree for one share, remember its root in
        share_root_hashes, and send the tree's nodes to the landlord
        (no-op if that shareholder is gone)."""
        t = HashTree(block_hashes)
        # NOTE(review): the line materializing the tree's node list
        # (`all_hashes`, used below) is elided here.
        # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
        # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
        # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
        self.share_root_hashes[shareid] = t[0]
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_block_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
        # NOTE(review): the trailing `return d` is elided here.
    def send_all_share_hash_trees(self):
        # Each bucket gets a set of share hash tree nodes that are needed to validate their
        # share. This includes the share hash itself, but does not include the top-level hash
        # root (which is stored securely in the URI instead).
        self.log("sending all share hash trees", level=log.NOISY)
        self.set_status("Sending Share Hash Trees")
        self.set_encode_and_push_progress(extra=0.6)
        # NOTE(review): the `dl = []` initialization is elided here.
        for h in self.share_root_hashes:
            # NOTE(review): this loop's body is elided (presumably a sanity
            # assertion that every share's root hash was set — confirm).
        # create the share hash tree
        t = HashTree(self.share_root_hashes)
        # the root of this hash tree goes into our URI
        self.uri_extension_data['share_root_hash'] = t[0]
        # now send just the necessary pieces out to each shareholder
        for i in range(self.num_shares):
            # the HashTree is given a list of leaves: 0,1,2,3..n .
            # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
            needed_hash_indices = t.needed_hashes(i, include_leaf=True)
            hashes = [(hi, t[hi]) for hi in needed_hash_indices]
            dl.append(self.send_one_share_hash_tree(i, hashes))
        return self._gather_responses(dl)
    def send_one_share_hash_tree(self, shareid, needed_hashes):
        """Send the (index, hash) share-tree nodes needed to validate one
        share to its landlord; no-op if that shareholder is gone."""
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_share_hashes(needed_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
        # NOTE(review): the trailing `return d` is elided here.
    def send_uri_extension_to_all_shareholders(self):
        """Pack uri_extension_data, record its hash (which ends up in the
        URI), and send the packed block to every remaining landlord."""
        lp = self.log("sending uri_extension", level=log.NOISY)
        self.set_status("Sending URI Extensions")
        self.set_encode_and_push_progress(extra=0.8)
        # NOTE(review): the continuation of this tuple of required keys is
        # elided here.
        for k in ('crypttext_root_hash', 'crypttext_hash',
            assert k in self.uri_extension_data
        uri_extension = uri.pack_extension(self.uri_extension_data)
        # NOTE(review): the `ed = {}` initialization (used for logging only)
        # is elided here.
        for k,v in self.uri_extension_data.items():
            if k.endswith("hash"):
                # log hashes in base32 rather than raw bytes
                ed[k] = base32.b2a(v)
        # NOTE(review): the non-hash `else:` branch is elided here.
        self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
        self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
        # NOTE(review): the `dl = []` initialization is elided here.
        for shareid in list(self.landlords):
            dl.append(self.send_uri_extension(shareid, uri_extension))
        return self._gather_responses(dl)
    def send_uri_extension(self, shareid, uri_extension):
        """Send the packed URI extension block to one landlord. Only called
        for shareids currently in self.landlords (see the sending loop), so
        no missing-landlord guard appears here."""
        sh = self.landlords[shareid]
        d = sh.put_uri_extension(uri_extension)
        d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
        # NOTE(review): the trailing `return d` is elided here.
    def close_all_shareholders(self):
        """Ask every remaining landlord to close its bucket; fires when all
        have responded (failures remove the shareholder)."""
        self.log("closing shareholders", level=log.NOISY)
        self.set_status("Closing Shareholders")
        self.set_encode_and_push_progress(extra=0.9)
        # NOTE(review): the `dl = []` initialization is elided here.
        for shareid in list(self.landlords):
            d = self.landlords[shareid].close()
            d.addErrback(self._remove_shareholder, shareid, "close")
            # NOTE(review): a `dl.append(d)` line is elided here.
        return self._gather_responses(dl)
        # NOTE(review): the `def done(self, res):` header and the
        # `now = time.time()` binding are elided just above this fragment;
        # these lines finish the upload, record timings, and build the
        # verify cap that start()'s Deferred fires with.
        self.log("upload done", level=log.OPERATIONAL)
        self.set_status("Finished")
        self.set_encode_and_push_progress(extra=1.0) # done
        h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
        self._times["hashes_and_close"] = h_and_c_elapsed
        total_elapsed = now - self._start_total_timestamp
        self._times["total_encode_and_push"] = total_elapsed

        # update our sharemap
        self._shares_placed = set(self.landlords.keys())
        return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
                                      self.required_shares, self.num_shares, self.file_size)
        # NOTE(review): the `def err(self, f):` header is elided just above;
        # these lines are the errback for start()'s chain.
        self.log("upload failed", failure=f, level=log.UNUSUAL)
        self.set_status("Failed")
        # we need to abort any remaining shareholders, so they'll delete the
        # partial share, allowing someone else to upload it again.
        self.log("aborting shareholders", level=log.UNUSUAL)
        for shareid in list(self.landlords):
            self.landlords[shareid].abort()
        if f.check(defer.FirstError):
            # unwrap the DeferredList's FirstError to expose the real failure
            return f.value.subFailure
        # NOTE(review): the fallback `return f` is elided here.
670 def get_shares_placed(self):
671 # return a set of share numbers that were successfully placed.
672 return self._shares_placed
675 # return a dictionary of encode+push timings
678 def get_uri_extension_data(self):
679 return self.uri_extension_data