]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/encode.py
Flesh out "tahoe magic-folder status" command
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / encode.py
1 # -*- test-case-name: allmydata.test.test_encode -*-
2
3 import time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 from allmydata import uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.hashtree import HashTree
10 from allmydata.util import mathutil, hashutil, base32, log, happinessutil
11 from allmydata.util.assertutil import _assert, precondition
12 from allmydata.codec import CRSEncoder
13 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
14      IEncryptedUploadable, IUploadStatus, UploadUnhappinessError
15
16
17 """
18 The goal of the encoder is to turn the original file into a series of
19 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
20 is a different host, but for small grids there may be overlap). The number
21 of shares is chosen to hit our reliability goals (more shares on more
22 machines means more reliability), and is limited by overhead (proportional to
23 numshares or log(numshares)) and the encoding technology in use (zfec permits
24 only 256 shares total). It is also constrained by the amount of data
25 we want to send to each host. For estimating purposes, think of 10 shares
26 out of which we need 3 to reconstruct the file.
27
28 The encoder starts by cutting the original file into segments. All segments
29 except the last are of equal size. The segment size is chosen to constrain
30 the memory footprint (which will probably vary between 1x and 4x segment
31 size) and to constrain the overhead (which will be proportional to
32 log(number of segments)).
33
34
35 Each segment (A,B,C) is read into memory, encrypted, and encoded into
36 blocks. The 'share' (say, share #1) that makes it out to a host is a
37 collection of these blocks (block A1, B1, C1), plus some hash-tree
38 information necessary to validate the data upon retrieval. Only one segment
39 is handled at a time: all blocks for segment A are delivered before any
40 work is begun on segment B.
41
42 As blocks are created, we retain the hash of each one. The list of block hashes
43 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
44 of a Merkle hash tree for that share, called the block hash tree.
45
46 This hash tree has one terminal leaf per block. The complete block hash
47 tree is sent to the shareholder after all the data has been sent. At
48 retrieval time, the decoder will ask for specific pieces of this tree before
49 asking for blocks, whichever it needs to validate those blocks.
50
51 (Note: we don't really need to generate this whole block hash tree
52 ourselves. It would be sufficient to have the shareholder generate it and
53 just tell us the root. This gives us an extra level of validation on the
54 transfer, though, and it is relatively cheap to compute.)
55
56 Each of these block hash trees has a root hash. The collection of these
57 root hashes for all shares are collected into the 'share hash tree', which
58 has one terminal leaf per share. After sending the blocks and the complete
59 block hash tree to each shareholder, we send them the portion of the share
60 hash tree that is necessary to validate their share. The root of the share
61 hash tree is put into the URI.
62
63 """
64
65 class UploadAborted(Exception):
66     pass
67
68 KiB=1024
69 MiB=1024*KiB
70 GiB=1024*MiB
71 TiB=1024*GiB
72 PiB=1024*TiB
73
74 class Encoder(object):
75     implements(IEncoder)
76
77     def __init__(self, log_parent=None, upload_status=None, progress=None):
78         object.__init__(self)
79         self.uri_extension_data = {}
80         self._codec = None
81         self._status = None
82         if upload_status:
83             self._status = IUploadStatus(upload_status)
84         precondition(log_parent is None or isinstance(log_parent, int),
85                      log_parent)
86         self._log_number = log.msg("creating Encoder %s" % self,
87                                    facility="tahoe.encoder", parent=log_parent)
88         self._aborted = False
89         self._progress = progress
90
91     def __repr__(self):
92         if hasattr(self, "_storage_index"):
93             return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
94         return "<Encoder for unknown storage index>"
95
96     def log(self, *args, **kwargs):
97         if "parent" not in kwargs:
98             kwargs["parent"] = self._log_number
99         if "facility" not in kwargs:
100             kwargs["facility"] = "tahoe.encoder"
101         return log.msg(*args, **kwargs)
102
103     def set_encrypted_uploadable(self, uploadable):
104         eu = self._uploadable = IEncryptedUploadable(uploadable)
105         d = eu.get_size()
106         def _got_size(size):
107             self.log(format="file size: %(size)d", size=size)
108             self.file_size = size
109             if self._progress:
110                 self._progress.set_progress_total(self.file_size)
111         d.addCallback(_got_size)
112         d.addCallback(lambda res: eu.get_all_encoding_parameters())
113         d.addCallback(self._got_all_encoding_parameters)
114         d.addCallback(lambda res: eu.get_storage_index())
115         def _done(storage_index):
116             self._storage_index = storage_index
117             return self
118         d.addCallback(_done)
119         return d
120
121     def _got_all_encoding_parameters(self, params):
122         assert not self._codec
123         k, happy, n, segsize = params
124         self.required_shares = k
125         self.servers_of_happiness = happy
126         self.num_shares = n
127         self.segment_size = segsize
128         self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
129         self.log("now setting up codec")
130
131         assert self.segment_size % self.required_shares == 0
132
133         self.num_segments = mathutil.div_ceil(self.file_size,
134                                               self.segment_size)
135
136         self._codec = CRSEncoder()
137         self._codec.set_params(self.segment_size,
138                                self.required_shares, self.num_shares)
139
140         data = self.uri_extension_data
141         data['codec_name'] = self._codec.get_encoder_type()
142         data['codec_params'] = self._codec.get_serialized_params()
143
144         data['size'] = self.file_size
145         data['segment_size'] = self.segment_size
146         self.share_size = mathutil.div_ceil(self.file_size,
147                                             self.required_shares)
148         data['num_segments'] = self.num_segments
149         data['needed_shares'] = self.required_shares
150         data['total_shares'] = self.num_shares
151
152         # the "tail" is the last segment. This segment may or may not be
153         # shorter than all other segments. We use the "tail codec" to handle
154         # it. If the tail is short, we use a different codec instance. In
155         # addition, the tail codec must be fed data which has been padded out
156         # to the right size.
157         tail_size = self.file_size % self.segment_size
158         if not tail_size:
159             tail_size = self.segment_size
160
161         # the tail codec is responsible for encoding tail_size bytes
162         padded_tail_size = mathutil.next_multiple(tail_size,
163                                                   self.required_shares)
164         self._tail_codec = CRSEncoder()
165         self._tail_codec.set_params(padded_tail_size,
166                                     self.required_shares, self.num_shares)
167         data['tail_codec_params'] = self._tail_codec.get_serialized_params()
168
169     def _get_share_size(self):
170         share_size = mathutil.div_ceil(self.file_size, self.required_shares)
171         overhead = self._compute_overhead()
172         return share_size + overhead
173
174     def _compute_overhead(self):
175         return 0
176
177     def get_param(self, name):
178         assert self._codec
179
180         if name == "storage_index":
181             return self._storage_index
182         elif name == "share_counts":
183             return (self.required_shares, self.servers_of_happiness,
184                     self.num_shares)
185         elif name == "num_segments":
186             return self.num_segments
187         elif name == "segment_size":
188             return self.segment_size
189         elif name == "block_size":
190             return self._codec.get_block_size()
191         elif name == "share_size":
192             return self._get_share_size()
193         elif name == "serialized_params":
194             return self._codec.get_serialized_params()
195         else:
196             raise KeyError("unknown parameter name '%s'" % name)
197
198     def set_shareholders(self, landlords, servermap):
199         assert isinstance(landlords, dict)
200         for k in landlords:
201             assert IStorageBucketWriter.providedBy(landlords[k])
202         self.landlords = landlords.copy()
203         assert isinstance(servermap, dict)
204         for v in servermap.itervalues():
205             assert isinstance(v, set)
206         self.servermap = servermap.copy()
207
208     def start(self):
209         """ Returns a Deferred that will fire with the verify cap (an instance of
210         uri.CHKFileVerifierURI)."""
211         self.log("%s starting" % (self,))
212         #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
213         assert self._codec
214         self._crypttext_hasher = hashutil.crypttext_hasher()
215         self._crypttext_hashes = []
216         self.segment_num = 0
217         self.block_hashes = [[] for x in range(self.num_shares)]
218         # block_hashes[i] is a list that will be accumulated and then send
219         # to landlord[i]. This list contains a hash of each segment_share
220         # that we sent to that landlord.
221         self.share_root_hashes = [None] * self.num_shares
222
223         self._times = {
224             "cumulative_encoding": 0.0,
225             "cumulative_sending": 0.0,
226             "hashes_and_close": 0.0,
227             "total_encode_and_push": 0.0,
228             }
229         self._start_total_timestamp = time.time()
230
231         d = fireEventually()
232
233         d.addCallback(lambda res: self.start_all_shareholders())
234
235         for i in range(self.num_segments-1):
236             # note to self: this form doesn't work, because lambda only
237             # captures the slot, not the value
238             #d.addCallback(lambda res: self.do_segment(i))
239             # use this form instead:
240             d.addCallback(lambda res, i=i: self._encode_segment(i))
241             d.addCallback(self._send_segment, i)
242             d.addCallback(self._turn_barrier)
243         last_segnum = self.num_segments - 1
244         d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
245         d.addCallback(self._send_segment, last_segnum)
246         d.addCallback(self._turn_barrier)
247
248         d.addCallback(lambda res: self.finish_hashing())
249
250         d.addCallback(lambda res:
251                       self.send_crypttext_hash_tree_to_all_shareholders())
252         d.addCallback(lambda res: self.send_all_block_hash_trees())
253         d.addCallback(lambda res: self.send_all_share_hash_trees())
254         d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
255
256         d.addCallback(lambda res: self.close_all_shareholders())
257         d.addCallbacks(self.done, self.err)
258         return d
259
260     def set_status(self, status):
261         if self._status:
262             self._status.set_status(status)
263
264     def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
265         if self._status:
266             # we treat the final hash+close as an extra segment
267             if sent_segments is None:
268                 sent_segments = self.num_segments
269             progress = float(sent_segments + extra) / (self.num_segments + 1)
270             self._status.set_progress(2, progress)
271
272     def abort(self):
273         self.log("aborting upload", level=log.UNUSUAL)
274         assert self._codec, "don't call abort before start"
275         self._aborted = True
276         # the next segment read (in _gather_data inside _encode_segment) will
277         # raise UploadAborted(), which will bypass the rest of the upload
278         # chain. If we've sent the final segment's shares, it's too late to
279         # abort. TODO: allow abort any time up to close_all_shareholders.
280
281     def _turn_barrier(self, res):
282         # putting this method in a Deferred chain imposes a guaranteed
283         # reactor turn between the pre- and post- portions of that chain.
284         # This can be useful to limit memory consumption: since Deferreds do
285         # not do tail recursion, code which uses defer.succeed(result) for
286         # consistency will cause objects to live for longer than you might
287         # normally expect.
288
289         return fireEventually(res)
290
291
292     def start_all_shareholders(self):
293         self.log("starting shareholders", level=log.NOISY)
294         self.set_status("Starting shareholders")
295         dl = []
296         for shareid in list(self.landlords):
297             d = self.landlords[shareid].put_header()
298             d.addErrback(self._remove_shareholder, shareid, "start")
299             dl.append(d)
300         return self._gather_responses(dl)
301
302     def _encode_segment(self, segnum):
303         codec = self._codec
304         start = time.time()
305
306         # the ICodecEncoder API wants to receive a total of self.segment_size
307         # bytes on each encode() call, broken up into a number of
308         # identically-sized pieces. Due to the way the codec algorithm works,
309         # these pieces need to be the same size as the share which the codec
310         # will generate. Therefore we must feed it with input_piece_size that
311         # equals the output share size.
312         input_piece_size = codec.get_block_size()
313
314         # as a result, the number of input pieces per encode() call will be
315         # equal to the number of required shares with which the codec was
316         # constructed. You can think of the codec as chopping up a
317         # 'segment_size' of data into 'required_shares' shares (not doing any
318         # fancy math at all, just doing a split), then creating some number
319         # of additional shares which can be substituted if the primary ones
320         # are unavailable
321
322         # we read data from the source one segment at a time, and then chop
323         # it into 'input_piece_size' pieces before handing it to the codec
324
325         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
326
327         # memory footprint: we only hold a tiny piece of the plaintext at any
328         # given time. We build up a segment's worth of cryptttext, then hand
329         # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
330         # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
331         # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
332         # footprint to 430KiB at the expense of more hash-tree overhead.
333
334         d = self._gather_data(self.required_shares, input_piece_size,
335                               crypttext_segment_hasher)
336         def _done_gathering(chunks):
337             for c in chunks:
338                 assert len(c) == input_piece_size
339             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
340             # during this call, we hit 5*segsize memory
341             return codec.encode(chunks)
342         d.addCallback(_done_gathering)
343         def _done(res):
344             elapsed = time.time() - start
345             self._times["cumulative_encoding"] += elapsed
346             return res
347         d.addCallback(_done)
348         return d
349
350     def _encode_tail_segment(self, segnum):
351
352         start = time.time()
353         codec = self._tail_codec
354         input_piece_size = codec.get_block_size()
355
356         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
357
358         d = self._gather_data(self.required_shares, input_piece_size,
359                               crypttext_segment_hasher, allow_short=True)
360         def _done_gathering(chunks):
361             for c in chunks:
362                 # a short trailing chunk will have been padded by
363                 # _gather_data
364                 assert len(c) == input_piece_size
365             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
366             return codec.encode(chunks)
367         d.addCallback(_done_gathering)
368         def _done(res):
369             elapsed = time.time() - start
370             self._times["cumulative_encoding"] += elapsed
371             return res
372         d.addCallback(_done)
373         return d
374
375     def _gather_data(self, num_chunks, input_chunk_size,
376                      crypttext_segment_hasher,
377                      allow_short=False):
378         """Return a Deferred that will fire when the required number of
379         chunks have been read (and hashed and encrypted). The Deferred fires
380         with a list of chunks, each of size input_chunk_size."""
381
382         # I originally built this to allow read_encrypted() to behave badly:
383         # to let it return more or less data than you asked for. It would
384         # stash the leftovers until later, and then recurse until it got
385         # enough. I don't think that was actually useful.
386         #
387         # who defines read_encrypted?
388         #  offloaded.LocalCiphertextReader: real disk file: exact
389         #  upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
390         #    it exact. The return value is a list of 50KiB chunks, to reduce
391         #    the memory footprint of the encryption process.
392         #  repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
393         #
394         # This has been redefined to require read_encrypted() to behave like
395         # a local file: return exactly the amount requested unless it hits
396         # EOF.
397         #  -warner
398
399         if self._aborted:
400             raise UploadAborted()
401
402         read_size = num_chunks * input_chunk_size
403         d = self._uploadable.read_encrypted(read_size, hash_only=False)
404         def _got(data):
405             assert isinstance(data, (list,tuple))
406             if self._aborted:
407                 raise UploadAborted()
408             data = "".join(data)
409             precondition(len(data) <= read_size, len(data), read_size)
410             if not allow_short:
411                 precondition(len(data) == read_size, len(data), read_size)
412             crypttext_segment_hasher.update(data)
413             self._crypttext_hasher.update(data)
414             if allow_short and len(data) < read_size:
415                 # padding
416                 data += "\x00" * (read_size - len(data))
417             encrypted_pieces = [data[i:i+input_chunk_size]
418                                 for i in range(0, len(data), input_chunk_size)]
419             return encrypted_pieces
420         d.addCallback(_got)
421         return d
422
423     def _send_segment(self, (shares, shareids), segnum):
424         # To generate the URI, we must generate the roothash, so we must
425         # generate all shares, even if we aren't actually giving them to
426         # anybody. This means that the set of shares we create will be equal
427         # to or larger than the set of landlords. If we have any landlord who
428         # *doesn't* have a share, that's an error.
429         _assert(set(self.landlords.keys()).issubset(set(shareids)),
430                 shareids=shareids, landlords=self.landlords)
431         start = time.time()
432         dl = []
433         self.set_status("Sending segment %d of %d" % (segnum+1,
434                                                       self.num_segments))
435         self.set_encode_and_push_progress(segnum)
436         lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
437         for i in range(len(shares)):
438             block = shares[i]
439             shareid = shareids[i]
440             d = self.send_block(shareid, segnum, block, lognum)
441             dl.append(d)
442
443             block_hash = hashutil.block_hash(block)
444             #from allmydata.util import base32
445             #log.msg("creating block (shareid=%d, blocknum=%d) "
446             #        "len=%d %r .. %r: %s" %
447             #        (shareid, segnum, len(block),
448             #         block[:50], block[-50:], base32.b2a(block_hash)))
449             self.block_hashes[shareid].append(block_hash)
450
451         dl = self._gather_responses(dl)
452
453         def do_progress(ign):
454             done = self.segment_size * (segnum + 1)
455             if self._progress:
456                 self._progress.set_progress(done)
457             return ign
458         dl.addCallback(do_progress)
459
460         def _logit(res):
461             self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
462                      (self,
463                       self.segment_size*(segnum+1),
464                       self.segment_size*self.num_segments,
465                       100 * (segnum+1) / self.num_segments,
466                       ),
467                      level=log.OPERATIONAL)
468             elapsed = time.time() - start
469             self._times["cumulative_sending"] += elapsed
470             return res
471         dl.addCallback(_logit)
472         return dl
473
474     def send_block(self, shareid, segment_num, block, lognum):
475         if shareid not in self.landlords:
476             return defer.succeed(None)
477         sh = self.landlords[shareid]
478         lognum2 = self.log("put_block to %s" % self.landlords[shareid],
479                            parent=lognum, level=log.NOISY)
480         d = sh.put_block(segment_num, block)
481         def _done(res):
482             self.log("put_block done", parent=lognum2, level=log.NOISY)
483             return res
484         d.addCallback(_done)
485         d.addErrback(self._remove_shareholder, shareid,
486                      "segnum=%d" % segment_num)
487         return d
488
489     def _remove_shareholder(self, why, shareid, where):
490         ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
491                       method=where, shnum=shareid,
492                       level=log.UNUSUAL, failure=why)
493         if shareid in self.landlords:
494             self.landlords[shareid].abort()
495             peerid = self.landlords[shareid].get_peerid()
496             assert peerid
497             del self.landlords[shareid]
498             self.servermap[shareid].remove(peerid)
499             if not self.servermap[shareid]:
500                 del self.servermap[shareid]
501         else:
502             # even more UNUSUAL
503             self.log("they weren't in our list of landlords", parent=ln,
504                      level=log.WEIRD, umid="TQGFRw")
505         happiness = happinessutil.servers_of_happiness(self.servermap)
506         if happiness < self.servers_of_happiness:
507             peerids = set(happinessutil.shares_by_server(self.servermap).keys())
508             msg = happinessutil.failure_message(len(peerids),
509                                                 self.required_shares,
510                                                 self.servers_of_happiness,
511                                                 happiness)
512             msg = "%s: %s" % (msg, why)
513             raise UploadUnhappinessError(msg)
514         self.log("but we can still continue with %s shares, we'll be happy "
515                  "with at least %s" % (happiness,
516                                        self.servers_of_happiness),
517                  parent=ln)
518
519     def _gather_responses(self, dl):
520         d = defer.DeferredList(dl, fireOnOneErrback=True)
521         def _eatUploadUnhappinessError(f):
522             # all exceptions that occur while talking to a peer are handled
523             # in _remove_shareholder. That might raise UploadUnhappinessError,
524             # which will cause the DeferredList to errback but which should
525             # otherwise be consumed. Allow non-UploadUnhappinessError exceptions
526             # to pass through as an unhandled errback. We use this in lieu of
527             # consumeErrors=True to allow coding errors to be logged.
528             f.trap(UploadUnhappinessError)
529             return None
530         for d0 in dl:
531             d0.addErrback(_eatUploadUnhappinessError)
532         return d
533
534     def finish_hashing(self):
535         self._start_hashing_and_close_timestamp = time.time()
536         self.set_status("Finishing hashes")
537         self.set_encode_and_push_progress(extra=0.0)
538         crypttext_hash = self._crypttext_hasher.digest()
539         self.uri_extension_data["crypttext_hash"] = crypttext_hash
540         self._uploadable.close()
541
542     def send_crypttext_hash_tree_to_all_shareholders(self):
543         self.log("sending crypttext hash tree", level=log.NOISY)
544         self.set_status("Sending Crypttext Hash Tree")
545         self.set_encode_and_push_progress(extra=0.3)
546         t = HashTree(self._crypttext_hashes)
547         all_hashes = list(t)
548         self.uri_extension_data["crypttext_root_hash"] = t[0]
549         dl = []
550         for shareid in list(self.landlords):
551             dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
552         return self._gather_responses(dl)
553
554     def send_crypttext_hash_tree(self, shareid, all_hashes):
555         if shareid not in self.landlords:
556             return defer.succeed(None)
557         sh = self.landlords[shareid]
558         d = sh.put_crypttext_hashes(all_hashes)
559         d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
560         return d
561
562     def send_all_block_hash_trees(self):
563         self.log("sending block hash trees", level=log.NOISY)
564         self.set_status("Sending Subshare Hash Trees")
565         self.set_encode_and_push_progress(extra=0.4)
566         dl = []
567         for shareid,hashes in enumerate(self.block_hashes):
568             # hashes is a list of the hashes of all blocks that were sent
569             # to shareholder[shareid].
570             dl.append(self.send_one_block_hash_tree(shareid, hashes))
571         return self._gather_responses(dl)
572
573     def send_one_block_hash_tree(self, shareid, block_hashes):
574         t = HashTree(block_hashes)
575         all_hashes = list(t)
576         # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
577         # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
578         # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
579         self.share_root_hashes[shareid] = t[0]
580         if shareid not in self.landlords:
581             return defer.succeed(None)
582         sh = self.landlords[shareid]
583         d = sh.put_block_hashes(all_hashes)
584         d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
585         return d
586
587     def send_all_share_hash_trees(self):
588         # Each bucket gets a set of share hash tree nodes that are needed to validate their
589         # share. This includes the share hash itself, but does not include the top-level hash
590         # root (which is stored securely in the URI instead).
591         self.log("sending all share hash trees", level=log.NOISY)
592         self.set_status("Sending Share Hash Trees")
593         self.set_encode_and_push_progress(extra=0.6)
594         dl = []
595         for h in self.share_root_hashes:
596             assert h
597         # create the share hash tree
598         t = HashTree(self.share_root_hashes)
599         # the root of this hash tree goes into our URI
600         self.uri_extension_data['share_root_hash'] = t[0]
601         # now send just the necessary pieces out to each shareholder
602         for i in range(self.num_shares):
603             # the HashTree is given a list of leaves: 0,1,2,3..n .
604             # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
605             needed_hash_indices = t.needed_hashes(i, include_leaf=True)
606             hashes = [(hi, t[hi]) for hi in needed_hash_indices]
607             dl.append(self.send_one_share_hash_tree(i, hashes))
608         return self._gather_responses(dl)
609
610     def send_one_share_hash_tree(self, shareid, needed_hashes):
611         if shareid not in self.landlords:
612             return defer.succeed(None)
613         sh = self.landlords[shareid]
614         d = sh.put_share_hashes(needed_hashes)
615         d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
616         return d
617
618     def send_uri_extension_to_all_shareholders(self):
619         lp = self.log("sending uri_extension", level=log.NOISY)
620         self.set_status("Sending URI Extensions")
621         self.set_encode_and_push_progress(extra=0.8)
622         for k in ('crypttext_root_hash', 'crypttext_hash',
623                   ):
624             assert k in self.uri_extension_data
625         uri_extension = uri.pack_extension(self.uri_extension_data)
626         ed = {}
627         for k,v in self.uri_extension_data.items():
628             if k.endswith("hash"):
629                 ed[k] = base32.b2a(v)
630             else:
631                 ed[k] = v
632         self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
633         self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
634         dl = []
635         for shareid in list(self.landlords):
636             dl.append(self.send_uri_extension(shareid, uri_extension))
637         return self._gather_responses(dl)
638
639     def send_uri_extension(self, shareid, uri_extension):
640         sh = self.landlords[shareid]
641         d = sh.put_uri_extension(uri_extension)
642         d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
643         return d
644
645     def close_all_shareholders(self):
646         self.log("closing shareholders", level=log.NOISY)
647         self.set_status("Closing Shareholders")
648         self.set_encode_and_push_progress(extra=0.9)
649         dl = []
650         for shareid in list(self.landlords):
651             d = self.landlords[shareid].close()
652             d.addErrback(self._remove_shareholder, shareid, "close")
653             dl.append(d)
654         return self._gather_responses(dl)
655
656     def done(self, res):
657         self.log("upload done", level=log.OPERATIONAL)
658         self.set_status("Finished")
659         self.set_encode_and_push_progress(extra=1.0) # done
660         now = time.time()
661         h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
662         self._times["hashes_and_close"] = h_and_c_elapsed
663         total_elapsed = now - self._start_total_timestamp
664         self._times["total_encode_and_push"] = total_elapsed
665
666         # update our sharemap
667         self._shares_placed = set(self.landlords.keys())
668         return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
669                                       self.required_shares, self.num_shares, self.file_size)
670
671     def err(self, f):
672         self.log("upload failed", failure=f, level=log.UNUSUAL)
673         self.set_status("Failed")
674         # we need to abort any remaining shareholders, so they'll delete the
675         # partial share, allowing someone else to upload it again.
676         self.log("aborting shareholders", level=log.UNUSUAL)
677         for shareid in list(self.landlords):
678             self.landlords[shareid].abort()
679         if f.check(defer.FirstError):
680             return f.value.subFailure
681         return f
682
683     def get_shares_placed(self):
684         # return a set of share numbers that were successfully placed.
685         return self._shares_placed
686
687     def get_times(self):
688         # return a dictionary of encode+push timings
689         return self._times
690
691     def get_uri_extension_data(self):
692         return self.uri_extension_data
693     def get_uri_extension_hash(self):
694         return self.uri_extension_hash