# -*- test-case-name: allmydata.test.test_encode -*-

import time
from zope.interface import implements
from twisted.internet import defer
from foolscap import eventual
from allmydata import storage, uri
from allmydata.hashtree import HashTree
from allmydata.util import mathutil, hashutil, base32, log
from allmydata.util.assertutil import _assert, precondition
from allmydata.codec import CRSEncoder
from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
     IEncryptedUploadable, IUploadStatus, NotEnoughSharesError

15 """
16 The goal of the encoder is to turn the original file into a series of
17 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
18 is a different host, but for small grids there may be overlap). The number
19 of shares is chosen to hit our reliability goals (more shares on more
20 machines means more reliability), and is limited by overhead (proportional to
21 numshares or log(numshares)) and the encoding technology in use (zfec permits
22 only 256 shares total). It is also constrained by the amount of data
23 we want to send to each host. For estimating purposes, think of 10 shares
24 out of which we need 3 to reconstruct the file.
25
26 The encoder starts by cutting the original file into segments. All segments
27 except the last are of equal size. The segment size is chosen to constrain
28 the memory footprint (which will probably vary between 1x and 4x segment
29 size) and to constrain the overhead (which will be proportional to
30 log(number of segments)).
31
32
33 Each segment (A,B,C) is read into memory, encrypted, and encoded into
34 blocks. The 'share' (say, share #1) that makes it out to a host is a
35 collection of these blocks (block A1, B1, C1), plus some hash-tree
36 information necessary to validate the data upon retrieval. Only one segment
37 is handled at a time: all blocks for segment A are delivered before any
38 work is begun on segment B.
39
40 As blocks are created, we retain the hash of each one. The list of block hashes
41 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
42 of a Merkle hash tree for that share, called the block hash tree.
43
44 This hash tree has one terminal leaf per block. The complete block hash
45 tree is sent to the shareholder after all the data has been sent. At
46 retrieval time, the decoder will ask for specific pieces of this tree before
47 asking for blocks, whichever it needs to validate those blocks.
48
49 (Note: we don't really need to generate this whole block hash tree
50 ourselves. It would be sufficient to have the shareholder generate it and
51 just tell us the root. This gives us an extra level of validation on the
52 transfer, though, and it is relatively cheap to compute.)
53
54 Each of these block hash trees has a root hash. The collection of these
55 root hashes for all shares are collected into the 'share hash tree', which
56 has one terminal leaf per share. After sending the blocks and the complete
57 block hash tree to each shareholder, we send them the portion of the share
58 hash tree that is necessary to validate their share. The root of the share
59 hash tree is put into the URI.
60
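As a rough, illustrative sketch (the real parameters come from the
IEncryptedUploadable and may differ), consider 3-of-10 encoding of a
3000-byte file with a 300-byte segment size:

    num_segments = ceil(3000 / 300)    = 10
    block_size   = segment_size / k    = 300 / 3 = 100 bytes
    share_size  ~= ceil(file_size / k) = 1000 bytes, plus hash overhead

Each encode() call consumes one 300-byte segment (fed in as three
100-byte pieces) and produces ten 100-byte blocks, one per share. Each
share's block hash tree therefore has 10 leaves, and the share hash tree
always has one leaf per share (10 here).
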
61 """
62
63 class UploadAborted(Exception):
64     pass
65
66 KiB=1024
67 MiB=1024*KiB
68 GiB=1024*MiB
69 TiB=1024*GiB
70 PiB=1024*TiB
71
class Encoder(object):
    implements(IEncoder)
    USE_PLAINTEXT_HASHES = False

    def __init__(self, log_parent=None, upload_status=None):
        object.__init__(self)
        self.uri_extension_data = {}
        self._codec = None
        self._status = None
        if upload_status:
            self._status = IUploadStatus(upload_status)
        precondition(log_parent is None or isinstance(log_parent, int),
                     log_parent)
        self._log_number = log.msg("creating Encoder %s" % self,
                                   facility="tahoe.encoder", parent=log_parent)
        self._aborted = False

    def __repr__(self):
        if hasattr(self, "_storage_index"):
            return "<Encoder for %s>" % storage.si_b2a(self._storage_index)[:5]
        return "<Encoder for unknown storage index>"

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.encoder"
        return log.msg(*args, **kwargs)

    def set_encrypted_uploadable(self, uploadable):
        eu = self._uploadable = IEncryptedUploadable(uploadable)
        d = eu.get_size()
        def _got_size(size):
            self.log(format="file size: %(size)d", size=size)
            self.file_size = size
        d.addCallback(_got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(lambda res: eu.get_storage_index())
        def _done(storage_index):
            self._storage_index = storage_index
            return self
        d.addCallback(_done)
        return d

    def _got_all_encoding_parameters(self, params):
        assert not self._codec
        k, happy, n, segsize = params
        self.required_shares = k
        self.shares_of_happiness = happy
        self.num_shares = n
        self.segment_size = segsize
        self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
        self.log("now setting up codec")

        assert self.segment_size % self.required_shares == 0

        self.num_segments = mathutil.div_ceil(self.file_size,
                                              self.segment_size)

        self._codec = CRSEncoder()
        self._codec.set_params(self.segment_size,
                               self.required_shares, self.num_shares)

        data = self.uri_extension_data
        data['codec_name'] = self._codec.get_encoder_type()
        data['codec_params'] = self._codec.get_serialized_params()

        data['size'] = self.file_size
        data['segment_size'] = self.segment_size
        self.share_size = mathutil.div_ceil(self.file_size,
                                            self.required_shares)
        data['num_segments'] = self.num_segments
        data['needed_shares'] = self.required_shares
        data['total_shares'] = self.num_shares

        # the "tail" is the last segment. This segment may or may not be
        # shorter than all other segments. We use the "tail codec" to handle
        # it. If the tail is short, we use a different codec instance. In
        # addition, the tail codec must be fed data which has been padded out
        # to the right size.
        self.tail_size = self.file_size % self.segment_size
        if not self.tail_size:
            self.tail_size = self.segment_size

        # the tail codec is responsible for encoding tail_size bytes
        padded_tail_size = mathutil.next_multiple(self.tail_size,
                                                  self.required_shares)
        self._tail_codec = CRSEncoder()
        self._tail_codec.set_params(padded_tail_size,
                                    self.required_shares, self.num_shares)
        data['tail_codec_params'] = self._tail_codec.get_serialized_params()

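    # Illustrative tail-padding sketch (hypothetical numbers, not taken
    # from a real upload): with required_shares=3 and segment_size=300, a
    # 1000-byte file has num_segments=4 and tail_size = 1000 % 300 = 100.
    # The tail codec is then built for next_multiple(100, 3) = 102 bytes,
    # and _gather_data() below appends 2 NUL ('\x00') bytes so that the
    # final encode() call still sees equally-sized input pieces.
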
    def _get_share_size(self):
        share_size = mathutil.div_ceil(self.file_size, self.required_shares)
        overhead = self._compute_overhead()
        return share_size + overhead

    def _compute_overhead(self):
        return 0

    def get_param(self, name):
        assert self._codec

        if name == "storage_index":
            return self._storage_index
        elif name == "share_counts":
            return (self.required_shares, self.shares_of_happiness,
                    self.num_shares)
        elif name == "num_segments":
            return self.num_segments
        elif name == "segment_size":
            return self.segment_size
        elif name == "block_size":
            return self._codec.get_block_size()
        elif name == "share_size":
            return self._get_share_size()
        elif name == "serialized_params":
            return self._codec.get_serialized_params()
        else:
            raise KeyError("unknown parameter name '%s'" % name)

    def set_shareholders(self, landlords):
        assert isinstance(landlords, dict)
        for k in landlords:
            assert IStorageBucketWriter.providedBy(landlords[k])
        self.landlords = landlords.copy()

    def start(self):
        self.log("%s starting" % (self,))
        #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
        assert self._codec
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hashes = []
        self.segment_num = 0
        self.subshare_hashes = [[] for x in range(self.num_shares)]
        # subshare_hashes[i] is a list that will be accumulated and then
        # sent to landlord[i]. It contains a hash of each segment_share
        # that we sent to that landlord.
        self.share_root_hashes = [None] * self.num_shares

        self._times = {
            "cumulative_encoding": 0.0,
            "cumulative_sending": 0.0,
            "hashes_and_close": 0.0,
            "total_encode_and_push": 0.0,
            }
        self._start_total_timestamp = time.time()

        d = eventual.fireEventually()

        d.addCallback(lambda res: self.start_all_shareholders())

        for i in range(self.num_segments-1):
            # note to self: this form doesn't work, because lambda only
            # captures the slot, not the value
            #d.addCallback(lambda res: self.do_segment(i))
            # use this form instead:
            d.addCallback(lambda res, i=i: self._encode_segment(i))
            d.addCallback(self._send_segment, i)
            d.addCallback(self._turn_barrier)
        last_segnum = self.num_segments - 1
        d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
        d.addCallback(self._send_segment, last_segnum)
        d.addCallback(self._turn_barrier)

        d.addCallback(lambda res: self.finish_hashing())

        if self.USE_PLAINTEXT_HASHES:
            d.addCallback(lambda res:
                          self.send_plaintext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res:
                      self.send_crypttext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res: self.send_all_subshare_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())

        d.addCallback(lambda res: self.close_all_shareholders())
        d.addCallbacks(self.done, self.err)
        return d

    def set_status(self, status):
        if self._status:
            self._status.set_status(status)

    def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
        if self._status:
            # we treat the final hash+close as an extra segment
            if sent_segments is None:
                sent_segments = self.num_segments
            progress = float(sent_segments + extra) / (self.num_segments + 1)
            self._status.set_progress(2, progress)

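    # Progress sketch (illustrative numbers only): with num_segments=4,
    # sending segment index 1 reports (1 + 0.0) / 5 = 0.20, and the
    # post-segment extra= values (0.0 through 1.0) used below walk the
    # final fifth of the bar from 0.80 up to 1.00.
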
    def abort(self):
        self.log("aborting upload", level=log.UNUSUAL)
        assert self._codec, "don't call abort before start"
        self._aborted = True
        # the next segment read (in _gather_data inside _encode_segment) will
        # raise UploadAborted(), which will bypass the rest of the upload
        # chain. If we've sent the final segment's shares, it's too late to
        # abort. TODO: allow abort any time up to close_all_shareholders.

    def _turn_barrier(self, res):
        # putting this method in a Deferred chain imposes a guaranteed
        # reactor turn between the pre- and post- portions of that chain.
        # This can be useful to limit memory consumption: since Deferreds do
        # not do tail recursion, code which uses defer.succeed(result) for
        # consistency will cause objects to live for longer than you might
        # normally expect.

        return eventual.fireEventually(res)

    def start_all_shareholders(self):
        self.log("starting shareholders", level=log.NOISY)
        self.set_status("Starting shareholders")
        dl = []
        for shareid in self.landlords:
            d = self.landlords[shareid].start()
            d.addErrback(self._remove_shareholder, shareid, "start")
            dl.append(d)
        return self._gather_responses(dl)

    def _encode_segment(self, segnum):
        codec = self._codec
        start = time.time()

        # the ICodecEncoder API wants to receive a total of self.segment_size
        # bytes on each encode() call, broken up into a number of
        # identically-sized pieces. Due to the way the codec algorithm works,
        # these pieces need to be the same size as the shares which the codec
        # will generate. Therefore we must feed it pieces whose size
        # (input_piece_size) equals the output share size.
        input_piece_size = codec.get_block_size()

        # as a result, the number of input pieces per encode() call will be
        # equal to the number of required shares with which the codec was
        # constructed. You can think of the codec as chopping up a
        # 'segment_size' of data into 'required_shares' shares (not doing any
        # fancy math at all, just doing a split), then creating some number
        # of additional shares which can be substituted if the primary ones
        # are unavailable.

        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()

        # memory footprint: we only hold a tiny piece of the plaintext at any
        # given time. We build up a segment's worth of crypttext, then hand
        # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
        # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
        # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
        # footprint to 430KiB at the expense of more hash-tree overhead.

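        # Illustrative sketch (hypothetical 3-of-10 parameters): with
        # required_shares=3 and segment_size=300000, get_block_size()
        # returns 100000, so _gather_data() collects 3 chunks of 100000
        # bytes and codec.encode() returns 10 blocks of 100000 bytes
        # each: 300000 bytes in, 1000000 bytes out, the n/k = 3.3x
        # expansion mentioned above.
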
        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher)
        def _done_gathering(chunks):
            for c in chunks:
                assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            # during this call, we hit 5*segsize memory
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        def _done(res):
            elapsed = time.time() - start
            self._times["cumulative_encoding"] += elapsed
            return res
        d.addCallback(_done)
        return d

    def _encode_tail_segment(self, segnum):

        start = time.time()
        codec = self._tail_codec
        input_piece_size = codec.get_block_size()

        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()

        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher,
                              allow_short=True)
        def _done_gathering(chunks):
            for c in chunks:
                # a short trailing chunk will have been padded by
                # _gather_data
                assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        def _done(res):
            elapsed = time.time() - start
            self._times["cumulative_encoding"] += elapsed
            return res
        d.addCallback(_done)
        return d

    def _gather_data(self, num_chunks, input_chunk_size,
                     crypttext_segment_hasher,
                     allow_short=False,
                     previous_chunks=[]):
        """Return a Deferred that will fire when the required number of
        chunks have been read (and hashed and encrypted). The Deferred fires
        with the combination of any 'previous_chunks' and the new chunks
        which were gathered."""

        if self._aborted:
            raise UploadAborted()

        if not num_chunks:
            return defer.succeed(previous_chunks)

        d = self._uploadable.read_encrypted(input_chunk_size, False)
        def _got(data):
            if self._aborted:
                raise UploadAborted()
            encrypted_pieces = []
            length = 0
            while data:
                encrypted_piece = data.pop(0)
                length += len(encrypted_piece)
                crypttext_segment_hasher.update(encrypted_piece)
                self._crypttext_hasher.update(encrypted_piece)
                encrypted_pieces.append(encrypted_piece)

            if allow_short:
                if length < input_chunk_size:
                    # padding
                    pad_size = input_chunk_size - length
                    encrypted_pieces.append('\x00' * pad_size)
            else:
                # non-tail segments should be the full segment size
                if length != input_chunk_size:
                    log.msg("non-tail segment should be full segment size: %d!=%d"
                            % (length, input_chunk_size),
                            level=log.BAD, umid="jNk5Yw")
                precondition(length == input_chunk_size,
                             "length=%d != input_chunk_size=%d" %
                             (length, input_chunk_size))

            encrypted_piece = "".join(encrypted_pieces)
            return previous_chunks + [encrypted_piece]

        d.addCallback(_got)
        d.addCallback(lambda chunks:
                      self._gather_data(num_chunks-1, input_chunk_size,
                                        crypttext_segment_hasher,
                                        allow_short, chunks))
        return d

    def _send_segment(self, (shares, shareids), segnum):
        # To generate the URI, we must generate the roothash, so we must
        # generate all shares, even if we aren't actually giving them to
        # anybody. This means that the set of shares we create will be equal
        # to or larger than the set of landlords. If we have any landlord who
        # *doesn't* have a share, that's an error.
        _assert(set(self.landlords.keys()).issubset(set(shareids)),
                shareids=shareids, landlords=self.landlords)
        start = time.time()
        dl = []
        self.set_status("Sending segment %d of %d" % (segnum+1,
                                                      self.num_segments))
        self.set_encode_and_push_progress(segnum)
        lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
        for i in range(len(shares)):
            subshare = shares[i]
            shareid = shareids[i]
            d = self.send_subshare(shareid, segnum, subshare, lognum)
            dl.append(d)
            subshare_hash = hashutil.block_hash(subshare)
            #from allmydata.util import base32
            #log.msg("creating block (shareid=%d, blocknum=%d) "
            #        "len=%d %r .. %r: %s" %
            #        (shareid, segnum, len(subshare),
            #         subshare[:50], subshare[-50:], base32.b2a(subshare_hash)))
            self.subshare_hashes[shareid].append(subshare_hash)

        dl = self._gather_responses(dl)
        def _logit(res):
            self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
                     (self,
                      self.segment_size*(segnum+1),
                      self.segment_size*self.num_segments,
                      100 * (segnum+1) / self.num_segments,
                      ),
                     level=log.OPERATIONAL)
            elapsed = time.time() - start
            self._times["cumulative_sending"] += elapsed
            return res
        dl.addCallback(_logit)
        return dl

    def send_subshare(self, shareid, segment_num, subshare, lognum):
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        lognum2 = self.log("put_block to %s" % self.landlords[shareid],
                           parent=lognum, level=log.NOISY)
        d = sh.put_block(segment_num, subshare)
        def _done(res):
            self.log("put_block done", parent=lognum2, level=log.NOISY)
            return res
        d.addCallback(_done)
        d.addErrback(self._remove_shareholder, shareid,
                     "segnum=%d" % segment_num)
        return d

    def _remove_shareholder(self, why, shareid, where):
        ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
                      method=where, shnum=shareid,
                      level=log.UNUSUAL, failure=why)
        if shareid in self.landlords:
            self.landlords[shareid].abort()
            del self.landlords[shareid]
        else:
            # even more UNUSUAL
            self.log("they weren't in our list of landlords", parent=ln,
                     level=log.WEIRD, umid="TQGFRw")
        if len(self.landlords) < self.shares_of_happiness:
            msg = "lost too many shareholders during upload: %s" % why
            raise NotEnoughSharesError(msg)
        self.log("but we can still continue with %s shares, we'll be happy "
                 "with at least %s" % (len(self.landlords),
                                       self.shares_of_happiness),
                 parent=ln)

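    # Failure-threshold sketch (hypothetical 3-of-10 upload with
    # shares_of_happiness=7): losing two landlords leaves 8 >= 7, so the
    # upload continues; losing a fourth drops us to 6 < 7 and
    # _remove_shareholder raises NotEnoughSharesError, which
    # _gather_responses() traps on the remaining outstanding Deferreds.
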
    def _gather_responses(self, dl):
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _eatNotEnoughSharesError(f):
            # all exceptions that occur while talking to a peer are handled
            # in _remove_shareholder. That might raise NotEnoughSharesError,
            # which will cause the DeferredList to errback but which should
            # otherwise be consumed. Allow non-NotEnoughSharesError exceptions
            # to pass through as an unhandled errback. We use this in lieu of
            # consumeErrors=True to allow coding errors to be logged.
            f.trap(NotEnoughSharesError)
            return None
        for d0 in dl:
            d0.addErrback(_eatNotEnoughSharesError)
        return d

    def finish_hashing(self):
        self._start_hashing_and_close_timestamp = time.time()
        self.set_status("Finishing hashes")
        self.set_encode_and_push_progress(extra=0.0)
        crypttext_hash = self._crypttext_hasher.digest()
        self.uri_extension_data["crypttext_hash"] = crypttext_hash
        d = self._uploadable.get_plaintext_hash()
        def _got(plaintext_hash):
            self.log(format="plaintext_hash=%(plaintext_hash)s, SI=%(SI)s, size=%(size)d",
                     plaintext_hash=base32.b2a(plaintext_hash),
                     SI=storage.si_b2a(self._storage_index),
                     size=self.file_size)
            return plaintext_hash
        d.addCallback(_got)
        if self.USE_PLAINTEXT_HASHES:
            def _use_plaintext_hash(plaintext_hash):
                self.uri_extension_data["plaintext_hash"] = plaintext_hash
                return self._uploadable.get_plaintext_hashtree_leaves(0, self.num_segments, self.num_segments)
            d.addCallback(_use_plaintext_hash)
            def _got_hashtree_leaves(leaves):
                self.log("Encoder: got plaintext_hashtree_leaves: %s" %
                         (",".join([base32.b2a(h) for h in leaves]),),
                         level=log.NOISY)
                ht = list(HashTree(list(leaves)))
                self.uri_extension_data["plaintext_root_hash"] = ht[0]
                self._plaintext_hashtree_nodes = ht
            d.addCallback(_got_hashtree_leaves)

        d.addCallback(lambda res: self._uploadable.close())
        return d

    def send_plaintext_hash_tree_to_all_shareholders(self):
        self.log("sending plaintext hash tree", level=log.NOISY)
        self.set_status("Sending Plaintext Hash Tree")
        self.set_encode_and_push_progress(extra=0.2)
        dl = []
        for shareid in self.landlords.keys():
            d = self.send_plaintext_hash_tree(shareid,
                                              self._plaintext_hashtree_nodes)
            dl.append(d)
        return self._gather_responses(dl)

    def send_plaintext_hash_tree(self, shareid, all_hashes):
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_plaintext_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_plaintext_hashes")
        return d

    def send_crypttext_hash_tree_to_all_shareholders(self):
        self.log("sending crypttext hash tree", level=log.NOISY)
        self.set_status("Sending Crypttext Hash Tree")
        self.set_encode_and_push_progress(extra=0.3)
        t = HashTree(self._crypttext_hashes)
        all_hashes = list(t)
        self.uri_extension_data["crypttext_root_hash"] = t[0]
        dl = []
        for shareid in self.landlords.keys():
            dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
        return self._gather_responses(dl)

    def send_crypttext_hash_tree(self, shareid, all_hashes):
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_crypttext_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
        return d

    def send_all_subshare_hash_trees(self):
        self.log("sending subshare hash trees", level=log.NOISY)
        self.set_status("Sending Subshare Hash Trees")
        self.set_encode_and_push_progress(extra=0.4)
        dl = []
        for shareid,hashes in enumerate(self.subshare_hashes):
            # hashes is a list of the hashes of all subshares that were sent
            # to shareholder[shareid].
            dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
        return self._gather_responses(dl)

    def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
        t = HashTree(subshare_hashes)
        all_hashes = list(t)
        # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
        # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
        # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
        self.share_root_hashes[shareid] = t[0]
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_block_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
        return d

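    # HashTree layout sketch (assuming four leaves, purely for
    # illustration): the flattened list is
    #   [root, left, right, leaf0, leaf1, leaf2, leaf3]
    # where node n is the hash of nodes 2n+1 and 2n+2. To validate leaf 2
    # you need leaf 2 itself, its sibling leaf 3, and the 'left' internal
    # node covering leaves 0 and 1; combined with the externally-known
    # root, that is what needed_hashes(2, include_leaf=True) is asked to
    # supply in send_all_share_hash_trees() below.
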
    def send_all_share_hash_trees(self):
        # each bucket gets a set of share hash tree nodes that are needed to
        # validate their share. This includes the share hash itself, but does
        # not include the top-level hash root (which is stored securely in
        # the URI instead).
        self.log("sending all share hash trees", level=log.NOISY)
        self.set_status("Sending Share Hash Trees")
        self.set_encode_and_push_progress(extra=0.6)
        dl = []
        for h in self.share_root_hashes:
            assert h
        # create the share hash tree
        t = HashTree(self.share_root_hashes)
        # the root of this hash tree goes into our URI
        self.uri_extension_data['share_root_hash'] = t[0]
        # now send just the necessary pieces out to each shareholder
        for i in range(self.num_shares):
            # the HashTree is given a list of n leaves (0..n-1). These
            # become nodes A+0,A+1,..A+(n-1) of the tree, where A=n-1
            needed_hash_indices = t.needed_hashes(i, include_leaf=True)
            hashes = [(hi, t[hi]) for hi in needed_hash_indices]
            dl.append(self.send_one_share_hash_tree(i, hashes))
        return self._gather_responses(dl)

    def send_one_share_hash_tree(self, shareid, needed_hashes):
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_share_hashes(needed_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
        return d

    def send_uri_extension_to_all_shareholders(self):
        lp = self.log("sending uri_extension", level=log.NOISY)
        self.set_status("Sending URI Extensions")
        self.set_encode_and_push_progress(extra=0.8)
        for k in ('crypttext_root_hash', 'crypttext_hash',
                  ):
            assert k in self.uri_extension_data
        if self.USE_PLAINTEXT_HASHES:
            for k in ('plaintext_root_hash', 'plaintext_hash',
                      ):
                assert k in self.uri_extension_data
        uri_extension = uri.pack_extension(self.uri_extension_data)
        ed = {}
        for k,v in self.uri_extension_data.items():
            if k.endswith("hash"):
                ed[k] = base32.b2a(v)
            else:
                ed[k] = v
        self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
        self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
        dl = []
        for shareid in self.landlords.keys():
            dl.append(self.send_uri_extension(shareid, uri_extension))
        return self._gather_responses(dl)

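    # Sketch of what gets packed above (every key is filled in earlier in
    # this file; the exact serialization is up to uri.pack_extension):
    #   codec_name, codec_params, tail_codec_params,
    #   size, segment_size, num_segments, needed_shares, total_shares,
    #   crypttext_hash, crypttext_root_hash, share_root_hash,
    #   plus plaintext_hash / plaintext_root_hash when USE_PLAINTEXT_HASHES.
    # hashutil.uri_extension_hash() of the packed bytes becomes
    # self.uri_extension_hash, which done() returns to the caller along
    # with the share counts and file size.
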
    def send_uri_extension(self, shareid, uri_extension):
        sh = self.landlords[shareid]
        d = sh.put_uri_extension(uri_extension)
        d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
        return d

    def close_all_shareholders(self):
        self.log("closing shareholders", level=log.NOISY)
        self.set_status("Closing Shareholders")
        self.set_encode_and_push_progress(extra=0.9)
        dl = []
        for shareid in self.landlords:
            d = self.landlords[shareid].close()
            d.addErrback(self._remove_shareholder, shareid, "close")
            dl.append(d)
        return self._gather_responses(dl)

    def done(self, res):
        self.log("upload done", level=log.OPERATIONAL)
        self.set_status("Done")
        self.set_encode_and_push_progress(extra=1.0) # done
        now = time.time()
        h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
        self._times["hashes_and_close"] = h_and_c_elapsed
        total_elapsed = now - self._start_total_timestamp
        self._times["total_encode_and_push"] = total_elapsed

        # update our sharemap
        self._shares_placed = set(self.landlords.keys())
        return (self.uri_extension_hash, self.required_shares,
                self.num_shares, self.file_size)

    def err(self, f):
        self.log("upload failed", failure=f, level=log.UNUSUAL)
        self.set_status("Failed")
        # we need to abort any remaining shareholders, so they'll delete the
        # partial share, allowing someone else to upload it again.
        self.log("aborting shareholders", level=log.UNUSUAL)
        for shareid in list(self.landlords.keys()):
            self.landlords[shareid].abort()
        if f.check(defer.FirstError):
            return f.value.subFailure
        return f

    def get_shares_placed(self):
        # return a set of share numbers that were successfully placed.
        return self._shares_placed

    def get_times(self):
        # return a dictionary of encode+push timings
        return self._times

    def get_uri_extension_data(self):
        return self.uri_extension_data