]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/encode.py
310e446e4e338e407c43c9a4ef7061f01f4d2697
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / encode.py
1 # -*- test-case-name: allmydata.test.test_encode -*-
2
3 import time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap import eventual
7 from allmydata import uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.hashtree import HashTree
10 from allmydata.util import mathutil, hashutil, base32, log
11 from allmydata.util.assertutil import _assert, precondition
12 from allmydata.codec import CRSEncoder
13 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
14      IEncryptedUploadable, IUploadStatus, NotEnoughSharesError
15
16 """
17 The goal of the encoder is to turn the original file into a series of
18 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
19 is a different host, but for small grids there may be overlap). The number
20 of shares is chosen to hit our reliability goals (more shares on more
21 machines means more reliability), and is limited by overhead (proportional to
22 numshares or log(numshares)) and the encoding technology in use (zfec permits
23 only 256 shares total). It is also constrained by the amount of data
24 we want to send to each host. For estimating purposes, think of 10 shares
25 out of which we need 3 to reconstruct the file.
26
27 The encoder starts by cutting the original file into segments. All segments
28 except the last are of equal size. The segment size is chosen to constrain
29 the memory footprint (which will probably vary between 1x and 4x segment
30 size) and to constrain the overhead (which will be proportional to
31 log(number of segments)).
32
33
34 Each segment (A,B,C) is read into memory, encrypted, and encoded into
35 blocks. The 'share' (say, share #1) that makes it out to a host is a
36 collection of these blocks (block A1, B1, C1), plus some hash-tree
37 information necessary to validate the data upon retrieval. Only one segment
38 is handled at a time: all blocks for segment A are delivered before any
39 work is begun on segment B.
40
41 As blocks are created, we retain the hash of each one. The list of block hashes
42 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
43 of a Merkle hash tree for that share, called the block hash tree.
44
45 This hash tree has one terminal leaf per block. The complete block hash
46 tree is sent to the shareholder after all the data has been sent. At
47 retrieval time, the decoder will ask for specific pieces of this tree before
48 asking for blocks, whichever it needs to validate those blocks.
49
50 (Note: we don't really need to generate this whole block hash tree
51 ourselves. It would be sufficient to have the shareholder generate it and
52 just tell us the root. This gives us an extra level of validation on the
53 transfer, though, and it is relatively cheap to compute.)
54
55 Each of these block hash trees has a root hash. The collection of these
56 root hashes for all shares are collected into the 'share hash tree', which
57 has one terminal leaf per share. After sending the blocks and the complete
58 block hash tree to each shareholder, we send them the portion of the share
59 hash tree that is necessary to validate their share. The root of the share
60 hash tree is put into the URI.
61
62 """
63
class UploadAborted(Exception):
    """Raised out of _gather_data once Encoder.abort() has been called."""
    pass
66
# binary size-unit constants (powers of 1024)
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
72
73 class Encoder(object):
74     implements(IEncoder)
75
76     def __init__(self, log_parent=None, upload_status=None):
77         object.__init__(self)
78         self.uri_extension_data = {}
79         self._codec = None
80         self._status = None
81         if upload_status:
82             self._status = IUploadStatus(upload_status)
83         precondition(log_parent is None or isinstance(log_parent, int),
84                      log_parent)
85         self._log_number = log.msg("creating Encoder %s" % self,
86                                    facility="tahoe.encoder", parent=log_parent)
87         self._aborted = False
88
89     def __repr__(self):
90         if hasattr(self, "_storage_index"):
91             return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
92         return "<Encoder for unknown storage index>"
93
94     def log(self, *args, **kwargs):
95         if "parent" not in kwargs:
96             kwargs["parent"] = self._log_number
97         if "facility" not in kwargs:
98             kwargs["facility"] = "tahoe.encoder"
99         return log.msg(*args, **kwargs)
100
101     def set_encrypted_uploadable(self, uploadable):
102         eu = self._uploadable = IEncryptedUploadable(uploadable)
103         d = eu.get_size()
104         def _got_size(size):
105             self.log(format="file size: %(size)d", size=size)
106             self.file_size = size
107         d.addCallback(_got_size)
108         d.addCallback(lambda res: eu.get_all_encoding_parameters())
109         d.addCallback(self._got_all_encoding_parameters)
110         d.addCallback(lambda res: eu.get_storage_index())
111         def _done(storage_index):
112             self._storage_index = storage_index
113             return self
114         d.addCallback(_done)
115         return d
116
    def _got_all_encoding_parameters(self, params):
        """Record (k, happiness, n, segsize) and build the two codecs.

        Populates the codec/size fields of uri_extension_data, and creates
        self._codec (for full segments) plus self._tail_codec (for the
        possibly-short final segment, padded to a multiple of k).
        """
        assert not self._codec
        k, happy, n, segsize = params
        self.required_shares = k
        self.shares_of_happiness = happy
        self.num_shares = n
        self.segment_size = segsize
        self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
        self.log("now setting up codec")

        # each segment must split evenly into the k required shares
        assert self.segment_size % self.required_shares == 0

        self.num_segments = mathutil.div_ceil(self.file_size,
                                              self.segment_size)

        self._codec = CRSEncoder()
        self._codec.set_params(self.segment_size,
                               self.required_shares, self.num_shares)

        data = self.uri_extension_data
        data['codec_name'] = self._codec.get_encoder_type()
        data['codec_params'] = self._codec.get_serialized_params()

        data['size'] = self.file_size
        data['segment_size'] = self.segment_size
        self.share_size = mathutil.div_ceil(self.file_size,
                                            self.required_shares)
        data['num_segments'] = self.num_segments
        data['needed_shares'] = self.required_shares
        data['total_shares'] = self.num_shares

        # the "tail" is the last segment. This segment may or may not be
        # shorter than all other segments. We use the "tail codec" to handle
        # it. If the tail is short, we use a different codec instance. In
        # addition, the tail codec must be fed data which has been padded out
        # to the right size.
        tail_size = self.file_size % self.segment_size
        if not tail_size:
            tail_size = self.segment_size

        # the tail codec is responsible for encoding tail_size bytes
        padded_tail_size = mathutil.next_multiple(tail_size,
                                                  self.required_shares)
        self._tail_codec = CRSEncoder()
        self._tail_codec.set_params(padded_tail_size,
                                    self.required_shares, self.num_shares)
        data['tail_codec_params'] = self._tail_codec.get_serialized_params()
164
165     def _get_share_size(self):
166         share_size = mathutil.div_ceil(self.file_size, self.required_shares)
167         overhead = self._compute_overhead()
168         return share_size + overhead
169
    def _compute_overhead(self):
        # per-share overhead beyond the erasure-coded data itself; none is
        # currently accounted for here
        return 0
172
173     def get_param(self, name):
174         assert self._codec
175
176         if name == "storage_index":
177             return self._storage_index
178         elif name == "share_counts":
179             return (self.required_shares, self.shares_of_happiness,
180                     self.num_shares)
181         elif name == "num_segments":
182             return self.num_segments
183         elif name == "segment_size":
184             return self.segment_size
185         elif name == "block_size":
186             return self._codec.get_block_size()
187         elif name == "share_size":
188             return self._get_share_size()
189         elif name == "serialized_params":
190             return self._codec.get_serialized_params()
191         else:
192             raise KeyError("unknown parameter name '%s'" % name)
193
194     def set_shareholders(self, landlords):
195         assert isinstance(landlords, dict)
196         for k in landlords:
197             assert IStorageBucketWriter.providedBy(landlords[k])
198         self.landlords = landlords.copy()
199
    def start(self):
        """ Returns a Deferred that will fire with the verify cap (an instance of
        uri.CHKFileVerifierURI)."""
        self.log("%s starting" % (self,))
        #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
        assert self._codec
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hashes = []
        self.segment_num = 0
        self.block_hashes = [[] for x in range(self.num_shares)]
        # block_hashes[i] is a list that will be accumulated and then send
        # to landlord[i]. This list contains a hash of each segment_share
        # that we sent to that landlord.
        self.share_root_hashes = [None] * self.num_shares

        # timing buckets, filled in as the upload proceeds
        self._times = {
            "cumulative_encoding": 0.0,
            "cumulative_sending": 0.0,
            "hashes_and_close": 0.0,
            "total_encode_and_push": 0.0,
            }
        self._start_total_timestamp = time.time()

        d = eventual.fireEventually()

        d.addCallback(lambda res: self.start_all_shareholders())

        # encode and send every segment but the last; each iteration is
        # separated by a _turn_barrier to bound memory held by the chain
        for i in range(self.num_segments-1):
            # note to self: this form doesn't work, because lambda only
            # captures the slot, not the value
            #d.addCallback(lambda res: self.do_segment(i))
            # use this form instead:
            d.addCallback(lambda res, i=i: self._encode_segment(i))
            d.addCallback(self._send_segment, i)
            d.addCallback(self._turn_barrier)
        # the final segment may be short, and uses the tail codec
        last_segnum = self.num_segments - 1
        d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
        d.addCallback(self._send_segment, last_segnum)
        d.addCallback(self._turn_barrier)

        d.addCallback(lambda res: self.finish_hashing())

        # once all data is placed, deliver hash trees and the URI extension
        d.addCallback(lambda res:
                      self.send_crypttext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res: self.send_all_block_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())

        d.addCallback(lambda res: self.close_all_shareholders())
        d.addCallbacks(self.done, self.err)
        return d
251
252     def set_status(self, status):
253         if self._status:
254             self._status.set_status(status)
255
256     def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
257         if self._status:
258             # we treat the final hash+close as an extra segment
259             if sent_segments is None:
260                 sent_segments = self.num_segments
261             progress = float(sent_segments + extra) / (self.num_segments + 1)
262             self._status.set_progress(2, progress)
263
    def abort(self):
        """Stop the upload as soon as possible (only valid after start)."""
        self.log("aborting upload", level=log.UNUSUAL)
        assert self._codec, "don't call abort before start"
        self._aborted = True
        # the next segment read (in _gather_data inside _encode_segment) will
        # raise UploadAborted(), which will bypass the rest of the upload
        # chain. If we've sent the final segment's shares, it's too late to
        # abort. TODO: allow abort any time up to close_all_shareholders.
272
    def _turn_barrier(self, res):
        """Pass 'res' through after a full reactor turn.

        Putting this method in a Deferred chain imposes a guaranteed
        reactor turn between the pre- and post- portions of that chain.
        This can be useful to limit memory consumption: since Deferreds do
        not do tail recursion, code which uses defer.succeed(result) for
        consistency will cause objects to live for longer than you might
        normally expect.
        """
        return eventual.fireEventually(res)
282
283
284     def start_all_shareholders(self):
285         self.log("starting shareholders", level=log.NOISY)
286         self.set_status("Starting shareholders")
287         dl = []
288         for shareid in self.landlords:
289             d = self.landlords[shareid].put_header()
290             d.addErrback(self._remove_shareholder, shareid, "start")
291             dl.append(d)
292         return self._gather_responses(dl)
293
294     def _encode_segment(self, segnum):
295         codec = self._codec
296         start = time.time()
297
298         # the ICodecEncoder API wants to receive a total of self.segment_size
299         # bytes on each encode() call, broken up into a number of
300         # identically-sized pieces. Due to the way the codec algorithm works,
301         # these pieces need to be the same size as the share which the codec
302         # will generate. Therefore we must feed it with input_piece_size that
303         # equals the output share size.
304         input_piece_size = codec.get_block_size()
305
306         # as a result, the number of input pieces per encode() call will be
307         # equal to the number of required shares with which the codec was
308         # constructed. You can think of the codec as chopping up a
309         # 'segment_size' of data into 'required_shares' shares (not doing any
310         # fancy math at all, just doing a split), then creating some number
311         # of additional shares which can be substituted if the primary ones
312         # are unavailable
313
314         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
315
316         # memory footprint: we only hold a tiny piece of the plaintext at any
317         # given time. We build up a segment's worth of cryptttext, then hand
318         # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
319         # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
320         # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
321         # footprint to 430KiB at the expense of more hash-tree overhead.
322
323         d = self._gather_data(self.required_shares, input_piece_size,
324                               crypttext_segment_hasher)
325         def _done_gathering(chunks):
326             for c in chunks:
327                 assert len(c) == input_piece_size
328             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
329             # during this call, we hit 5*segsize memory
330             return codec.encode(chunks)
331         d.addCallback(_done_gathering)
332         def _done(res):
333             elapsed = time.time() - start
334             self._times["cumulative_encoding"] += elapsed
335             return res
336         d.addCallback(_done)
337         return d
338
339     def _encode_tail_segment(self, segnum):
340
341         start = time.time()
342         codec = self._tail_codec
343         input_piece_size = codec.get_block_size()
344
345         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
346
347         d = self._gather_data(self.required_shares, input_piece_size,
348                               crypttext_segment_hasher,
349                               allow_short=True)
350         def _done_gathering(chunks):
351             for c in chunks:
352                 # a short trailing chunk will have been padded by
353                 # _gather_data
354                 assert len(c) == input_piece_size
355             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
356             return codec.encode(chunks)
357         d.addCallback(_done_gathering)
358         def _done(res):
359             elapsed = time.time() - start
360             self._times["cumulative_encoding"] += elapsed
361             return res
362         d.addCallback(_done)
363         return d
364
365     def _gather_data(self, num_chunks, input_chunk_size,
366                      crypttext_segment_hasher,
367                      allow_short=False,
368                      previous_chunks=[]):
369         """Return a Deferred that will fire when the required number of
370         chunks have been read (and hashed and encrypted). The Deferred fires
371         with the combination of any 'previous_chunks' and the new chunks
372         which were gathered."""
373
374         if self._aborted:
375             raise UploadAborted()
376
377         if not num_chunks:
378             return defer.succeed(previous_chunks)
379
380         d = self._uploadable.read_encrypted(input_chunk_size, False)
381         def _got(data):
382             if self._aborted:
383                 raise UploadAborted()
384             encrypted_pieces = []
385             length = 0
386             while data:
387                 encrypted_piece = data.pop(0)
388                 length += len(encrypted_piece)
389                 crypttext_segment_hasher.update(encrypted_piece)
390                 self._crypttext_hasher.update(encrypted_piece)
391                 encrypted_pieces.append(encrypted_piece)
392
393             precondition(length <= input_chunk_size,
394                          "length=%d > input_chunk_size=%d" %
395                          (length, input_chunk_size))
396             if allow_short:
397                 if length < input_chunk_size:
398                     # padding
399                     pad_size = input_chunk_size - length
400                     encrypted_pieces.append('\x00' * pad_size)
401             else:
402                 # non-tail segments should be the full segment size
403                 if length != input_chunk_size:
404                     log.msg("non-tail segment should be full segment size: %d!=%d"
405                             % (length, input_chunk_size),
406                             level=log.BAD, umid="jNk5Yw")
407                 precondition(length == input_chunk_size,
408                              "length=%d != input_chunk_size=%d" %
409                              (length, input_chunk_size))
410
411             encrypted_piece = "".join(encrypted_pieces)
412             return previous_chunks + [encrypted_piece]
413
414         d.addCallback(_got)
415         d.addCallback(lambda chunks:
416                       self._gather_data(num_chunks-1, input_chunk_size,
417                                         crypttext_segment_hasher,
418                                         allow_short, chunks))
419         return d
420
421     def _send_segment(self, (shares, shareids), segnum):
422         # To generate the URI, we must generate the roothash, so we must
423         # generate all shares, even if we aren't actually giving them to
424         # anybody. This means that the set of shares we create will be equal
425         # to or larger than the set of landlords. If we have any landlord who
426         # *doesn't* have a share, that's an error.
427         _assert(set(self.landlords.keys()).issubset(set(shareids)),
428                 shareids=shareids, landlords=self.landlords)
429         start = time.time()
430         dl = []
431         self.set_status("Sending segment %d of %d" % (segnum+1,
432                                                       self.num_segments))
433         self.set_encode_and_push_progress(segnum)
434         lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
435         for i in range(len(shares)):
436             block = shares[i]
437             shareid = shareids[i]
438             d = self.send_block(shareid, segnum, block, lognum)
439             dl.append(d)
440             block_hash = hashutil.block_hash(block)
441             #from allmydata.util import base32
442             #log.msg("creating block (shareid=%d, blocknum=%d) "
443             #        "len=%d %r .. %r: %s" %
444             #        (shareid, segnum, len(block),
445             #         block[:50], block[-50:], base32.b2a(block_hash)))
446             self.block_hashes[shareid].append(block_hash)
447
448         dl = self._gather_responses(dl)
449         def _logit(res):
450             self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
451                      (self,
452                       self.segment_size*(segnum+1),
453                       self.segment_size*self.num_segments,
454                       100 * (segnum+1) / self.num_segments,
455                       ),
456                      level=log.OPERATIONAL)
457             elapsed = time.time() - start
458             self._times["cumulative_sending"] += elapsed
459             return res
460         dl.addCallback(_logit)
461         return dl
462
463     def send_block(self, shareid, segment_num, block, lognum):
464         if shareid not in self.landlords:
465             return defer.succeed(None)
466         sh = self.landlords[shareid]
467         lognum2 = self.log("put_block to %s" % self.landlords[shareid],
468                            parent=lognum, level=log.NOISY)
469         d = sh.put_block(segment_num, block)
470         def _done(res):
471             self.log("put_block done", parent=lognum2, level=log.NOISY)
472             return res
473         d.addCallback(_done)
474         d.addErrback(self._remove_shareholder, shareid,
475                      "segnum=%d" % segment_num)
476         return d
477
478     def _remove_shareholder(self, why, shareid, where):
479         ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
480                       method=where, shnum=shareid,
481                       level=log.UNUSUAL, failure=why)
482         if shareid in self.landlords:
483             self.landlords[shareid].abort()
484             del self.landlords[shareid]
485         else:
486             # even more UNUSUAL
487             self.log("they weren't in our list of landlords", parent=ln,
488                      level=log.WEIRD, umid="TQGFRw")
489         if len(self.landlords) < self.shares_of_happiness:
490             msg = "lost too many shareholders during upload: %s" % why
491             raise NotEnoughSharesError(msg, len(self.landlords),
492                                        self.shares_of_happiness)
493         self.log("but we can still continue with %s shares, we'll be happy "
494                  "with at least %s" % (len(self.landlords),
495                                        self.shares_of_happiness),
496                  parent=ln)
497
498     def _gather_responses(self, dl):
499         d = defer.DeferredList(dl, fireOnOneErrback=True)
500         def _eatNotEnoughSharesError(f):
501             # all exceptions that occur while talking to a peer are handled
502             # in _remove_shareholder. That might raise NotEnoughSharesError,
503             # which will cause the DeferredList to errback but which should
504             # otherwise be consumed. Allow non-NotEnoughSharesError exceptions
505             # to pass through as an unhandled errback. We use this in lieu of
506             # consumeErrors=True to allow coding errors to be logged.
507             f.trap(NotEnoughSharesError)
508             return None
509         for d0 in dl:
510             d0.addErrback(_eatNotEnoughSharesError)
511         return d
512
513     def finish_hashing(self):
514         self._start_hashing_and_close_timestamp = time.time()
515         self.set_status("Finishing hashes")
516         self.set_encode_and_push_progress(extra=0.0)
517         crypttext_hash = self._crypttext_hasher.digest()
518         self.uri_extension_data["crypttext_hash"] = crypttext_hash
519         self._uploadable.close()
520
521     def send_crypttext_hash_tree_to_all_shareholders(self):
522         self.log("sending crypttext hash tree", level=log.NOISY)
523         self.set_status("Sending Crypttext Hash Tree")
524         self.set_encode_and_push_progress(extra=0.3)
525         t = HashTree(self._crypttext_hashes)
526         all_hashes = list(t)
527         self.uri_extension_data["crypttext_root_hash"] = t[0]
528         dl = []
529         for shareid in self.landlords.keys():
530             dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
531         return self._gather_responses(dl)
532
533     def send_crypttext_hash_tree(self, shareid, all_hashes):
534         if shareid not in self.landlords:
535             return defer.succeed(None)
536         sh = self.landlords[shareid]
537         d = sh.put_crypttext_hashes(all_hashes)
538         d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
539         return d
540
541     def send_all_block_hash_trees(self):
542         self.log("sending block hash trees", level=log.NOISY)
543         self.set_status("Sending Subshare Hash Trees")
544         self.set_encode_and_push_progress(extra=0.4)
545         dl = []
546         for shareid,hashes in enumerate(self.block_hashes):
547             # hashes is a list of the hashes of all blocks that were sent
548             # to shareholder[shareid].
549             dl.append(self.send_one_block_hash_tree(shareid, hashes))
550         return self._gather_responses(dl)
551
552     def send_one_block_hash_tree(self, shareid, block_hashes):
553         t = HashTree(block_hashes)
554         all_hashes = list(t)
555         # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
556         # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
557         # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
558         self.share_root_hashes[shareid] = t[0]
559         if shareid not in self.landlords:
560             return defer.succeed(None)
561         sh = self.landlords[shareid]
562         d = sh.put_block_hashes(all_hashes)
563         d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
564         return d
565
566     def send_all_share_hash_trees(self):
567         # Each bucket gets a set of share hash tree nodes that are needed to validate their
568         # share. This includes the share hash itself, but does not include the top-level hash
569         # root (which is stored securely in the URI instead).
570         self.log("sending all share hash trees", level=log.NOISY)
571         self.set_status("Sending Share Hash Trees")
572         self.set_encode_and_push_progress(extra=0.6)
573         dl = []
574         for h in self.share_root_hashes:
575             assert h
576         # create the share hash tree
577         t = HashTree(self.share_root_hashes)
578         # the root of this hash tree goes into our URI
579         self.uri_extension_data['share_root_hash'] = t[0]
580         # now send just the necessary pieces out to each shareholder
581         for i in range(self.num_shares):
582             # the HashTree is given a list of leaves: 0,1,2,3..n .
583             # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
584             needed_hash_indices = t.needed_hashes(i, include_leaf=True)
585             hashes = [(hi, t[hi]) for hi in needed_hash_indices]
586             dl.append(self.send_one_share_hash_tree(i, hashes))
587         return self._gather_responses(dl)
588
589     def send_one_share_hash_tree(self, shareid, needed_hashes):
590         if shareid not in self.landlords:
591             return defer.succeed(None)
592         sh = self.landlords[shareid]
593         d = sh.put_share_hashes(needed_hashes)
594         d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
595         return d
596
597     def send_uri_extension_to_all_shareholders(self):
598         lp = self.log("sending uri_extension", level=log.NOISY)
599         self.set_status("Sending URI Extensions")
600         self.set_encode_and_push_progress(extra=0.8)
601         for k in ('crypttext_root_hash', 'crypttext_hash',
602                   ):
603             assert k in self.uri_extension_data
604         uri_extension = uri.pack_extension(self.uri_extension_data)
605         ed = {}
606         for k,v in self.uri_extension_data.items():
607             if k.endswith("hash"):
608                 ed[k] = base32.b2a(v)
609             else:
610                 ed[k] = v
611         self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
612         self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
613         dl = []
614         for shareid in self.landlords.keys():
615             dl.append(self.send_uri_extension(shareid, uri_extension))
616         return self._gather_responses(dl)
617
618     def send_uri_extension(self, shareid, uri_extension):
619         sh = self.landlords[shareid]
620         d = sh.put_uri_extension(uri_extension)
621         d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
622         return d
623
624     def close_all_shareholders(self):
625         self.log("closing shareholders", level=log.NOISY)
626         self.set_status("Closing Shareholders")
627         self.set_encode_and_push_progress(extra=0.9)
628         dl = []
629         for shareid in self.landlords:
630             d = self.landlords[shareid].close()
631             d.addErrback(self._remove_shareholder, shareid, "close")
632             dl.append(d)
633         return self._gather_responses(dl)
634
635     def done(self, res):
636         self.log("upload done", level=log.OPERATIONAL)
637         self.set_status("Done")
638         self.set_encode_and_push_progress(extra=1.0) # done
639         now = time.time()
640         h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
641         self._times["hashes_and_close"] = h_and_c_elapsed
642         total_elapsed = now - self._start_total_timestamp
643         self._times["total_encode_and_push"] = total_elapsed
644
645         # update our sharemap
646         self._shares_placed = set(self.landlords.keys())
647         return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
648                                       self.required_shares, self.num_shares, self.file_size)
649
650     def err(self, f):
651         self.log("upload failed", failure=f, level=log.UNUSUAL)
652         self.set_status("Failed")
653         # we need to abort any remaining shareholders, so they'll delete the
654         # partial share, allowing someone else to upload it again.
655         self.log("aborting shareholders", level=log.UNUSUAL)
656         for shareid in list(self.landlords.keys()):
657             self.landlords[shareid].abort()
658         if f.check(defer.FirstError):
659             return f.value.subFailure
660         return f
661
662     def get_shares_placed(self):
663         # return a set of share numbers that were successfully placed.
664         return self._shares_placed
665
666     def get_times(self):
667         # return a dictionary of encode+push timings
668         return self._times
669
670     def get_uri_extension_data(self):
671         return self.uri_extension_data