]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/encode.py
cf308dd3ef861171546d5eb71f42bdc46729b891
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / encode.py
1 # -*- test-case-name: allmydata.test.test_encode -*-
2
3 import time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 from allmydata import uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.hashtree import HashTree
10 from allmydata.util import mathutil, hashutil, base32, log, happinessutil
11 from allmydata.util.assertutil import _assert, precondition
12 from allmydata.codec import CRSEncoder
13 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
14      IEncryptedUploadable, IUploadStatus, UploadUnhappinessError
15
16
17 """
18 The goal of the encoder is to turn the original file into a series of
19 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
20 is a different host, but for small grids there may be overlap). The number
21 of shares is chosen to hit our reliability goals (more shares on more
22 machines means more reliability), and is limited by overhead (proportional to
23 numshares or log(numshares)) and the encoding technology in use (zfec permits
24 only 256 shares total). It is also constrained by the amount of data
25 we want to send to each host. For estimating purposes, think of 10 shares
26 out of which we need 3 to reconstruct the file.
27
28 The encoder starts by cutting the original file into segments. All segments
29 except the last are of equal size. The segment size is chosen to constrain
30 the memory footprint (which will probably vary between 1x and 4x segment
31 size) and to constrain the overhead (which will be proportional to
32 log(number of segments)).
33
34
35 Each segment (A,B,C) is read into memory, encrypted, and encoded into
36 blocks. The 'share' (say, share #1) that makes it out to a host is a
37 collection of these blocks (block A1, B1, C1), plus some hash-tree
38 information necessary to validate the data upon retrieval. Only one segment
39 is handled at a time: all blocks for segment A are delivered before any
40 work is begun on segment B.
41
42 As blocks are created, we retain the hash of each one. The list of block hashes
43 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
44 of a Merkle hash tree for that share, called the block hash tree.
45
46 This hash tree has one terminal leaf per block. The complete block hash
47 tree is sent to the shareholder after all the data has been sent. At
48 retrieval time, the decoder will ask for specific pieces of this tree before
49 asking for blocks, whichever it needs to validate those blocks.
50
51 (Note: we don't really need to generate this whole block hash tree
52 ourselves. It would be sufficient to have the shareholder generate it and
53 just tell us the root. This gives us an extra level of validation on the
54 transfer, though, and it is relatively cheap to compute.)
55
56 Each of these block hash trees has a root hash. The collection of these
57 root hashes for all shares are collected into the 'share hash tree', which
58 has one terminal leaf per share. After sending the blocks and the complete
59 block hash tree to each shareholder, we send them the portion of the share
60 hash tree that is necessary to validate their share. The root of the share
61 hash tree is put into the URI.
62
63 """
64
65 class UploadAborted(Exception):
66     pass
67
68 KiB=1024
69 MiB=1024*KiB
70 GiB=1024*MiB
71 TiB=1024*GiB
72 PiB=1024*TiB
73
74 class Encoder(object):
75     implements(IEncoder)
76
77     def __init__(self, log_parent=None, upload_status=None):
78         object.__init__(self)
79         self.uri_extension_data = {}
80         self._codec = None
81         self._status = None
82         if upload_status:
83             self._status = IUploadStatus(upload_status)
84         precondition(log_parent is None or isinstance(log_parent, int),
85                      log_parent)
86         self._log_number = log.msg("creating Encoder %s" % self,
87                                    facility="tahoe.encoder", parent=log_parent)
88         self._aborted = False
89
90     def __repr__(self):
91         if hasattr(self, "_storage_index"):
92             return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
93         return "<Encoder for unknown storage index>"
94
95     def log(self, *args, **kwargs):
96         if "parent" not in kwargs:
97             kwargs["parent"] = self._log_number
98         if "facility" not in kwargs:
99             kwargs["facility"] = "tahoe.encoder"
100         return log.msg(*args, **kwargs)
101
102     def set_encrypted_uploadable(self, uploadable):
103         eu = self._uploadable = IEncryptedUploadable(uploadable)
104         d = eu.get_size()
105         def _got_size(size):
106             self.log(format="file size: %(size)d", size=size)
107             self.file_size = size
108         d.addCallback(_got_size)
109         d.addCallback(lambda res: eu.get_all_encoding_parameters())
110         d.addCallback(self._got_all_encoding_parameters)
111         d.addCallback(lambda res: eu.get_storage_index())
112         def _done(storage_index):
113             self._storage_index = storage_index
114             return self
115         d.addCallback(_done)
116         return d
117
118     def _got_all_encoding_parameters(self, params):
119         assert not self._codec
120         k, happy, n, segsize = params
121         self.required_shares = k
122         self.servers_of_happiness = happy
123         self.num_shares = n
124         self.segment_size = segsize
125         self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
126         self.log("now setting up codec")
127
128         assert self.segment_size % self.required_shares == 0
129
130         self.num_segments = mathutil.div_ceil(self.file_size,
131                                               self.segment_size)
132
133         self._codec = CRSEncoder()
134         self._codec.set_params(self.segment_size,
135                                self.required_shares, self.num_shares)
136
137         data = self.uri_extension_data
138         data['codec_name'] = self._codec.get_encoder_type()
139         data['codec_params'] = self._codec.get_serialized_params()
140
141         data['size'] = self.file_size
142         data['segment_size'] = self.segment_size
143         self.share_size = mathutil.div_ceil(self.file_size,
144                                             self.required_shares)
145         data['num_segments'] = self.num_segments
146         data['needed_shares'] = self.required_shares
147         data['total_shares'] = self.num_shares
148
149         # the "tail" is the last segment. This segment may or may not be
150         # shorter than all other segments. We use the "tail codec" to handle
151         # it. If the tail is short, we use a different codec instance. In
152         # addition, the tail codec must be fed data which has been padded out
153         # to the right size.
154         tail_size = self.file_size % self.segment_size
155         if not tail_size:
156             tail_size = self.segment_size
157
158         # the tail codec is responsible for encoding tail_size bytes
159         padded_tail_size = mathutil.next_multiple(tail_size,
160                                                   self.required_shares)
161         self._tail_codec = CRSEncoder()
162         self._tail_codec.set_params(padded_tail_size,
163                                     self.required_shares, self.num_shares)
164         data['tail_codec_params'] = self._tail_codec.get_serialized_params()
165
166     def _get_share_size(self):
167         share_size = mathutil.div_ceil(self.file_size, self.required_shares)
168         overhead = self._compute_overhead()
169         return share_size + overhead
170
171     def _compute_overhead(self):
172         return 0
173
174     def get_param(self, name):
175         assert self._codec
176
177         if name == "storage_index":
178             return self._storage_index
179         elif name == "share_counts":
180             return (self.required_shares, self.servers_of_happiness,
181                     self.num_shares)
182         elif name == "num_segments":
183             return self.num_segments
184         elif name == "segment_size":
185             return self.segment_size
186         elif name == "block_size":
187             return self._codec.get_block_size()
188         elif name == "share_size":
189             return self._get_share_size()
190         elif name == "serialized_params":
191             return self._codec.get_serialized_params()
192         else:
193             raise KeyError("unknown parameter name '%s'" % name)
194
195     def set_shareholders(self, landlords, servermap):
196         assert isinstance(landlords, dict)
197         for k in landlords:
198             assert IStorageBucketWriter.providedBy(landlords[k])
199         self.landlords = landlords.copy()
200         assert isinstance(servermap, dict)
201         for v in servermap.itervalues():
202             assert isinstance(v, set)
203         self.servermap = servermap.copy()
204
205     def start(self):
206         """ Returns a Deferred that will fire with the verify cap (an instance of
207         uri.CHKFileVerifierURI)."""
208         self.log("%s starting" % (self,))
209         #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
210         assert self._codec
211         self._crypttext_hasher = hashutil.crypttext_hasher()
212         self._crypttext_hashes = []
213         self.segment_num = 0
214         self.block_hashes = [[] for x in range(self.num_shares)]
215         # block_hashes[i] is a list that will be accumulated and then send
216         # to landlord[i]. This list contains a hash of each segment_share
217         # that we sent to that landlord.
218         self.share_root_hashes = [None] * self.num_shares
219
220         self._times = {
221             "cumulative_encoding": 0.0,
222             "cumulative_sending": 0.0,
223             "hashes_and_close": 0.0,
224             "total_encode_and_push": 0.0,
225             }
226         self._start_total_timestamp = time.time()
227
228         d = fireEventually()
229
230         d.addCallback(lambda res: self.start_all_shareholders())
231
232         for i in range(self.num_segments-1):
233             # note to self: this form doesn't work, because lambda only
234             # captures the slot, not the value
235             #d.addCallback(lambda res: self.do_segment(i))
236             # use this form instead:
237             d.addCallback(lambda res, i=i: self._encode_segment(i))
238             d.addCallback(self._send_segment, i)
239             d.addCallback(self._turn_barrier)
240         last_segnum = self.num_segments - 1
241         d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
242         d.addCallback(self._send_segment, last_segnum)
243         d.addCallback(self._turn_barrier)
244
245         d.addCallback(lambda res: self.finish_hashing())
246
247         d.addCallback(lambda res:
248                       self.send_crypttext_hash_tree_to_all_shareholders())
249         d.addCallback(lambda res: self.send_all_block_hash_trees())
250         d.addCallback(lambda res: self.send_all_share_hash_trees())
251         d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
252
253         d.addCallback(lambda res: self.close_all_shareholders())
254         d.addCallbacks(self.done, self.err)
255         return d
256
257     def set_status(self, status):
258         if self._status:
259             self._status.set_status(status)
260
261     def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
262         if self._status:
263             # we treat the final hash+close as an extra segment
264             if sent_segments is None:
265                 sent_segments = self.num_segments
266             progress = float(sent_segments + extra) / (self.num_segments + 1)
267             self._status.set_progress(2, progress)
268
269     def abort(self):
270         self.log("aborting upload", level=log.UNUSUAL)
271         assert self._codec, "don't call abort before start"
272         self._aborted = True
273         # the next segment read (in _gather_data inside _encode_segment) will
274         # raise UploadAborted(), which will bypass the rest of the upload
275         # chain. If we've sent the final segment's shares, it's too late to
276         # abort. TODO: allow abort any time up to close_all_shareholders.
277
278     def _turn_barrier(self, res):
279         # putting this method in a Deferred chain imposes a guaranteed
280         # reactor turn between the pre- and post- portions of that chain.
281         # This can be useful to limit memory consumption: since Deferreds do
282         # not do tail recursion, code which uses defer.succeed(result) for
283         # consistency will cause objects to live for longer than you might
284         # normally expect.
285
286         return fireEventually(res)
287
288
289     def start_all_shareholders(self):
290         self.log("starting shareholders", level=log.NOISY)
291         self.set_status("Starting shareholders")
292         dl = []
293         for shareid in list(self.landlords):
294             d = self.landlords[shareid].put_header()
295             d.addErrback(self._remove_shareholder, shareid, "start")
296             dl.append(d)
297         return self._gather_responses(dl)
298
299     def _encode_segment(self, segnum):
300         codec = self._codec
301         start = time.time()
302
303         # the ICodecEncoder API wants to receive a total of self.segment_size
304         # bytes on each encode() call, broken up into a number of
305         # identically-sized pieces. Due to the way the codec algorithm works,
306         # these pieces need to be the same size as the share which the codec
307         # will generate. Therefore we must feed it with input_piece_size that
308         # equals the output share size.
309         input_piece_size = codec.get_block_size()
310
311         # as a result, the number of input pieces per encode() call will be
312         # equal to the number of required shares with which the codec was
313         # constructed. You can think of the codec as chopping up a
314         # 'segment_size' of data into 'required_shares' shares (not doing any
315         # fancy math at all, just doing a split), then creating some number
316         # of additional shares which can be substituted if the primary ones
317         # are unavailable
318
319         # we read data from the source one segment at a time, and then chop
320         # it into 'input_piece_size' pieces before handing it to the codec
321
322         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
323
324         # memory footprint: we only hold a tiny piece of the plaintext at any
325         # given time. We build up a segment's worth of cryptttext, then hand
326         # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
327         # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
328         # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
329         # footprint to 430KiB at the expense of more hash-tree overhead.
330
331         d = self._gather_data(self.required_shares, input_piece_size,
332                               crypttext_segment_hasher)
333         def _done_gathering(chunks):
334             for c in chunks:
335                 assert len(c) == input_piece_size
336             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
337             # during this call, we hit 5*segsize memory
338             return codec.encode(chunks)
339         d.addCallback(_done_gathering)
340         def _done(res):
341             elapsed = time.time() - start
342             self._times["cumulative_encoding"] += elapsed
343             return res
344         d.addCallback(_done)
345         return d
346
347     def _encode_tail_segment(self, segnum):
348
349         start = time.time()
350         codec = self._tail_codec
351         input_piece_size = codec.get_block_size()
352
353         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
354
355         d = self._gather_data(self.required_shares, input_piece_size,
356                               crypttext_segment_hasher, allow_short=True)
357         def _done_gathering(chunks):
358             for c in chunks:
359                 # a short trailing chunk will have been padded by
360                 # _gather_data
361                 assert len(c) == input_piece_size
362             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
363             return codec.encode(chunks)
364         d.addCallback(_done_gathering)
365         def _done(res):
366             elapsed = time.time() - start
367             self._times["cumulative_encoding"] += elapsed
368             return res
369         d.addCallback(_done)
370         return d
371
372     def _gather_data(self, num_chunks, input_chunk_size,
373                      crypttext_segment_hasher,
374                      allow_short=False):
375         """Return a Deferred that will fire when the required number of
376         chunks have been read (and hashed and encrypted). The Deferred fires
377         with a list of chunks, each of size input_chunk_size."""
378
379         # I originally built this to allow read_encrypted() to behave badly:
380         # to let it return more or less data than you asked for. It would
381         # stash the leftovers until later, and then recurse until it got
382         # enough. I don't think that was actually useful.
383         #
384         # who defines read_encrypted?
385         #  offloaded.LocalCiphertextReader: real disk file: exact
386         #  upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
387         #    it exact. The return value is a list of 50KiB chunks, to reduce
388         #    the memory footprint of the encryption process.
389         #  repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
390         #
391         # This has been redefined to require read_encrypted() to behave like
392         # a local file: return exactly the amount requested unless it hits
393         # EOF.
394         #  -warner
395
396         if self._aborted:
397             raise UploadAborted()
398
399         read_size = num_chunks * input_chunk_size
400         d = self._uploadable.read_encrypted(read_size, hash_only=False)
401         def _got(data):
402             assert isinstance(data, (list,tuple))
403             if self._aborted:
404                 raise UploadAborted()
405             data = "".join(data)
406             precondition(len(data) <= read_size, len(data), read_size)
407             if not allow_short:
408                 precondition(len(data) == read_size, len(data), read_size)
409             crypttext_segment_hasher.update(data)
410             self._crypttext_hasher.update(data)
411             if allow_short and len(data) < read_size:
412                 # padding
413                 data += "\x00" * (read_size - len(data))
414             encrypted_pieces = [data[i:i+input_chunk_size]
415                                 for i in range(0, len(data), input_chunk_size)]
416             return encrypted_pieces
417         d.addCallback(_got)
418         return d
419
420     def _send_segment(self, (shares, shareids), segnum):
421         # To generate the URI, we must generate the roothash, so we must
422         # generate all shares, even if we aren't actually giving them to
423         # anybody. This means that the set of shares we create will be equal
424         # to or larger than the set of landlords. If we have any landlord who
425         # *doesn't* have a share, that's an error.
426         _assert(set(self.landlords.keys()).issubset(set(shareids)),
427                 shareids=shareids, landlords=self.landlords)
428         start = time.time()
429         dl = []
430         self.set_status("Sending segment %d of %d" % (segnum+1,
431                                                       self.num_segments))
432         self.set_encode_and_push_progress(segnum)
433         lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
434         for i in range(len(shares)):
435             block = shares[i]
436             shareid = shareids[i]
437             d = self.send_block(shareid, segnum, block, lognum)
438             dl.append(d)
439             block_hash = hashutil.block_hash(block)
440             #from allmydata.util import base32
441             #log.msg("creating block (shareid=%d, blocknum=%d) "
442             #        "len=%d %r .. %r: %s" %
443             #        (shareid, segnum, len(block),
444             #         block[:50], block[-50:], base32.b2a(block_hash)))
445             self.block_hashes[shareid].append(block_hash)
446
447         dl = self._gather_responses(dl)
448         def _logit(res):
449             self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
450                      (self,
451                       self.segment_size*(segnum+1),
452                       self.segment_size*self.num_segments,
453                       100 * (segnum+1) / self.num_segments,
454                       ),
455                      level=log.OPERATIONAL)
456             elapsed = time.time() - start
457             self._times["cumulative_sending"] += elapsed
458             return res
459         dl.addCallback(_logit)
460         return dl
461
462     def send_block(self, shareid, segment_num, block, lognum):
463         if shareid not in self.landlords:
464             return defer.succeed(None)
465         sh = self.landlords[shareid]
466         lognum2 = self.log("put_block to %s" % self.landlords[shareid],
467                            parent=lognum, level=log.NOISY)
468         d = sh.put_block(segment_num, block)
469         def _done(res):
470             self.log("put_block done", parent=lognum2, level=log.NOISY)
471             return res
472         d.addCallback(_done)
473         d.addErrback(self._remove_shareholder, shareid,
474                      "segnum=%d" % segment_num)
475         return d
476
477     def _remove_shareholder(self, why, shareid, where):
478         ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
479                       method=where, shnum=shareid,
480                       level=log.UNUSUAL, failure=why)
481         if shareid in self.landlords:
482             self.landlords[shareid].abort()
483             peerid = self.landlords[shareid].get_peerid()
484             assert peerid
485             del self.landlords[shareid]
486             self.servermap[shareid].remove(peerid)
487             if not self.servermap[shareid]:
488                 del self.servermap[shareid]
489         else:
490             # even more UNUSUAL
491             self.log("they weren't in our list of landlords", parent=ln,
492                      level=log.WEIRD, umid="TQGFRw")
493         happiness = happinessutil.servers_of_happiness(self.servermap)
494         if happiness < self.servers_of_happiness:
495             peerids = set(happinessutil.shares_by_server(self.servermap).keys())
496             msg = happinessutil.failure_message(len(peerids),
497                                                 self.required_shares,
498                                                 self.servers_of_happiness,
499                                                 happiness)
500             msg = "%s: %s" % (msg, why)
501             raise UploadUnhappinessError(msg)
502         self.log("but we can still continue with %s shares, we'll be happy "
503                  "with at least %s" % (happiness,
504                                        self.servers_of_happiness),
505                  parent=ln)
506
507     def _gather_responses(self, dl):
508         d = defer.DeferredList(dl, fireOnOneErrback=True)
509         def _eatUploadUnhappinessError(f):
510             # all exceptions that occur while talking to a peer are handled
511             # in _remove_shareholder. That might raise UploadUnhappinessError,
512             # which will cause the DeferredList to errback but which should
513             # otherwise be consumed. Allow non-UploadUnhappinessError exceptions
514             # to pass through as an unhandled errback. We use this in lieu of
515             # consumeErrors=True to allow coding errors to be logged.
516             f.trap(UploadUnhappinessError)
517             return None
518         for d0 in dl:
519             d0.addErrback(_eatUploadUnhappinessError)
520         return d
521
522     def finish_hashing(self):
523         self._start_hashing_and_close_timestamp = time.time()
524         self.set_status("Finishing hashes")
525         self.set_encode_and_push_progress(extra=0.0)
526         crypttext_hash = self._crypttext_hasher.digest()
527         self.uri_extension_data["crypttext_hash"] = crypttext_hash
528         self._uploadable.close()
529
530     def send_crypttext_hash_tree_to_all_shareholders(self):
531         self.log("sending crypttext hash tree", level=log.NOISY)
532         self.set_status("Sending Crypttext Hash Tree")
533         self.set_encode_and_push_progress(extra=0.3)
534         t = HashTree(self._crypttext_hashes)
535         all_hashes = list(t)
536         self.uri_extension_data["crypttext_root_hash"] = t[0]
537         dl = []
538         for shareid in list(self.landlords):
539             dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
540         return self._gather_responses(dl)
541
542     def send_crypttext_hash_tree(self, shareid, all_hashes):
543         if shareid not in self.landlords:
544             return defer.succeed(None)
545         sh = self.landlords[shareid]
546         d = sh.put_crypttext_hashes(all_hashes)
547         d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
548         return d
549
550     def send_all_block_hash_trees(self):
551         self.log("sending block hash trees", level=log.NOISY)
552         self.set_status("Sending Subshare Hash Trees")
553         self.set_encode_and_push_progress(extra=0.4)
554         dl = []
555         for shareid,hashes in enumerate(self.block_hashes):
556             # hashes is a list of the hashes of all blocks that were sent
557             # to shareholder[shareid].
558             dl.append(self.send_one_block_hash_tree(shareid, hashes))
559         return self._gather_responses(dl)
560
561     def send_one_block_hash_tree(self, shareid, block_hashes):
562         t = HashTree(block_hashes)
563         all_hashes = list(t)
564         # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
565         # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
566         # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
567         self.share_root_hashes[shareid] = t[0]
568         if shareid not in self.landlords:
569             return defer.succeed(None)
570         sh = self.landlords[shareid]
571         d = sh.put_block_hashes(all_hashes)
572         d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
573         return d
574
575     def send_all_share_hash_trees(self):
576         # Each bucket gets a set of share hash tree nodes that are needed to validate their
577         # share. This includes the share hash itself, but does not include the top-level hash
578         # root (which is stored securely in the URI instead).
579         self.log("sending all share hash trees", level=log.NOISY)
580         self.set_status("Sending Share Hash Trees")
581         self.set_encode_and_push_progress(extra=0.6)
582         dl = []
583         for h in self.share_root_hashes:
584             assert h
585         # create the share hash tree
586         t = HashTree(self.share_root_hashes)
587         # the root of this hash tree goes into our URI
588         self.uri_extension_data['share_root_hash'] = t[0]
589         # now send just the necessary pieces out to each shareholder
590         for i in range(self.num_shares):
591             # the HashTree is given a list of leaves: 0,1,2,3..n .
592             # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
593             needed_hash_indices = t.needed_hashes(i, include_leaf=True)
594             hashes = [(hi, t[hi]) for hi in needed_hash_indices]
595             dl.append(self.send_one_share_hash_tree(i, hashes))
596         return self._gather_responses(dl)
597
598     def send_one_share_hash_tree(self, shareid, needed_hashes):
599         if shareid not in self.landlords:
600             return defer.succeed(None)
601         sh = self.landlords[shareid]
602         d = sh.put_share_hashes(needed_hashes)
603         d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
604         return d
605
606     def send_uri_extension_to_all_shareholders(self):
607         lp = self.log("sending uri_extension", level=log.NOISY)
608         self.set_status("Sending URI Extensions")
609         self.set_encode_and_push_progress(extra=0.8)
610         for k in ('crypttext_root_hash', 'crypttext_hash',
611                   ):
612             assert k in self.uri_extension_data
613         uri_extension = uri.pack_extension(self.uri_extension_data)
614         ed = {}
615         for k,v in self.uri_extension_data.items():
616             if k.endswith("hash"):
617                 ed[k] = base32.b2a(v)
618             else:
619                 ed[k] = v
620         self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
621         self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
622         dl = []
623         for shareid in list(self.landlords):
624             dl.append(self.send_uri_extension(shareid, uri_extension))
625         return self._gather_responses(dl)
626
627     def send_uri_extension(self, shareid, uri_extension):
628         sh = self.landlords[shareid]
629         d = sh.put_uri_extension(uri_extension)
630         d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
631         return d
632
633     def close_all_shareholders(self):
634         self.log("closing shareholders", level=log.NOISY)
635         self.set_status("Closing Shareholders")
636         self.set_encode_and_push_progress(extra=0.9)
637         dl = []
638         for shareid in list(self.landlords):
639             d = self.landlords[shareid].close()
640             d.addErrback(self._remove_shareholder, shareid, "close")
641             dl.append(d)
642         return self._gather_responses(dl)
643
644     def done(self, res):
645         self.log("upload done", level=log.OPERATIONAL)
646         self.set_status("Finished")
647         self.set_encode_and_push_progress(extra=1.0) # done
648         now = time.time()
649         h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
650         self._times["hashes_and_close"] = h_and_c_elapsed
651         total_elapsed = now - self._start_total_timestamp
652         self._times["total_encode_and_push"] = total_elapsed
653
654         # update our sharemap
655         self._shares_placed = set(self.landlords.keys())
656         return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
657                                       self.required_shares, self.num_shares, self.file_size)
658
659     def err(self, f):
660         self.log("upload failed", failure=f, level=log.UNUSUAL)
661         self.set_status("Failed")
662         # we need to abort any remaining shareholders, so they'll delete the
663         # partial share, allowing someone else to upload it again.
664         self.log("aborting shareholders", level=log.UNUSUAL)
665         for shareid in list(self.landlords):
666             self.landlords[shareid].abort()
667         if f.check(defer.FirstError):
668             return f.value.subFailure
669         return f
670
671     def get_shares_placed(self):
672         # return a set of share numbers that were successfully placed.
673         return self._shares_placed
674
675     def get_times(self):
676         # return a dictionary of encode+push timings
677         return self._times
678
679     def get_uri_extension_data(self):
680         return self.uri_extension_data
681     def get_uri_extension_hash(self):
682         return self.uri_extension_hash