]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/encode.py
f6be4b023554ddc3e7977c9f61852ca2c3d603fa
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / encode.py
1 # -*- test-case-name: allmydata.test.test_encode -*-
2
3 import time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 from allmydata import uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.hashtree import HashTree
10 from allmydata.util import mathutil, hashutil, base32, log
11 from allmydata.util.assertutil import _assert, precondition
12 from allmydata.codec import CRSEncoder
13 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
14      IEncryptedUploadable, IUploadStatus, UploadUnhappinessError
15
16
17 """
18 The goal of the encoder is to turn the original file into a series of
19 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
20 is a different host, but for small grids there may be overlap). The number
21 of shares is chosen to hit our reliability goals (more shares on more
22 machines means more reliability), and is limited by overhead (proportional to
23 numshares or log(numshares)) and the encoding technology in use (zfec permits
24 only 256 shares total). It is also constrained by the amount of data
25 we want to send to each host. For estimating purposes, think of 10 shares
26 out of which we need 3 to reconstruct the file.
27
28 The encoder starts by cutting the original file into segments. All segments
29 except the last are of equal size. The segment size is chosen to constrain
30 the memory footprint (which will probably vary between 1x and 4x segment
31 size) and to constrain the overhead (which will be proportional to
32 log(number of segments)).
33
34
35 Each segment (A,B,C) is read into memory, encrypted, and encoded into
36 blocks. The 'share' (say, share #1) that makes it out to a host is a
37 collection of these blocks (block A1, B1, C1), plus some hash-tree
38 information necessary to validate the data upon retrieval. Only one segment
39 is handled at a time: all blocks for segment A are delivered before any
40 work is begun on segment B.
41
42 As blocks are created, we retain the hash of each one. The list of block hashes
43 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
44 of a Merkle hash tree for that share, called the block hash tree.
45
46 This hash tree has one terminal leaf per block. The complete block hash
47 tree is sent to the shareholder after all the data has been sent. At
48 retrieval time, the decoder will ask for specific pieces of this tree before
49 asking for blocks, whichever it needs to validate those blocks.
50
51 (Note: we don't really need to generate this whole block hash tree
52 ourselves. It would be sufficient to have the shareholder generate it and
53 just tell us the root. This gives us an extra level of validation on the
54 transfer, though, and it is relatively cheap to compute.)
55
56 Each of these block hash trees has a root hash. The collection of these
57 root hashes for all shares are collected into the 'share hash tree', which
58 has one terminal leaf per share. After sending the blocks and the complete
59 block hash tree to each shareholder, we send them the portion of the share
60 hash tree that is necessary to validate their share. The root of the share
61 hash tree is put into the URI.
62
63 """
64
class UploadAborted(Exception):
    """Raised from the segment-read path (see Encoder._gather_data) after
    Encoder.abort() has been called; it unwinds the remainder of the
    upload's Deferred chain."""
    pass
67
# binary size multipliers (powers of 1024)
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
73
74 class Encoder(object):
75     implements(IEncoder)
76
    def __init__(self, log_parent=None, upload_status=None):
        """Create an Encoder.

        log_parent: foolscap log number (int) or None; becomes the parent
            of every log message this encoder emits.
        upload_status: optional object adapted to IUploadStatus, used for
            progress/status reporting.
        """
        object.__init__(self)
        self.uri_extension_data = {}
        self._codec = None  # set by _got_all_encoding_parameters
        self._status = None
        if upload_status:
            self._status = IUploadStatus(upload_status)
        precondition(log_parent is None or isinstance(log_parent, int),
                     log_parent)
        # log.msg returns the message number, used as parent for later logs
        self._log_number = log.msg("creating Encoder %s" % self,
                                   facility="tahoe.encoder", parent=log_parent)
        self._aborted = False
89
90     def __repr__(self):
91         if hasattr(self, "_storage_index"):
92             return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
93         return "<Encoder for unknown storage index>"
94
95     def log(self, *args, **kwargs):
96         if "parent" not in kwargs:
97             kwargs["parent"] = self._log_number
98         if "facility" not in kwargs:
99             kwargs["facility"] = "tahoe.encoder"
100         return log.msg(*args, **kwargs)
101
    def set_encrypted_uploadable(self, uploadable):
        """Adapt 'uploadable' to IEncryptedUploadable, then (in order) fetch
        its size, its encoding parameters, and its storage index. Returns a
        Deferred that fires with self once everything has been recorded."""
        eu = self._uploadable = IEncryptedUploadable(uploadable)
        d = eu.get_size()
        def _got_size(size):
            self.log(format="file size: %(size)d", size=size)
            self.file_size = size
        d.addCallback(_got_size)
        # file_size must be known before _got_all_encoding_parameters runs
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(lambda res: eu.get_storage_index())
        def _done(storage_index):
            self._storage_index = storage_index
            return self
        d.addCallback(_done)
        return d
117
    def _got_all_encoding_parameters(self, params):
        """Record the (k, happiness, n, segment_size) tuple, build the main
        codec plus the tail-segment codec, and fill in the codec/size fields
        of uri_extension_data. Must run after self.file_size is known."""
        assert not self._codec
        k, happy, n, segsize = params
        self.required_shares = k
        self.servers_of_happiness = happy
        self.num_shares = n
        self.segment_size = segsize
        self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
        self.log("now setting up codec")

        # each segment must split evenly into 'required_shares' input pieces
        assert self.segment_size % self.required_shares == 0

        self.num_segments = mathutil.div_ceil(self.file_size,
                                              self.segment_size)

        self._codec = CRSEncoder()
        self._codec.set_params(self.segment_size,
                               self.required_shares, self.num_shares)

        data = self.uri_extension_data
        data['codec_name'] = self._codec.get_encoder_type()
        data['codec_params'] = self._codec.get_serialized_params()

        data['size'] = self.file_size
        data['segment_size'] = self.segment_size
        self.share_size = mathutil.div_ceil(self.file_size,
                                            self.required_shares)
        data['num_segments'] = self.num_segments
        data['needed_shares'] = self.required_shares
        data['total_shares'] = self.num_shares

        # the "tail" is the last segment. This segment may or may not be
        # shorter than all other segments. We use the "tail codec" to handle
        # it. If the tail is short, we use a different codec instance. In
        # addition, the tail codec must be fed data which has been padded out
        # to the right size.
        tail_size = self.file_size % self.segment_size
        if not tail_size:
            # file is an exact multiple of segment_size: full-sized tail
            tail_size = self.segment_size

        # the tail codec is responsible for encoding tail_size bytes
        padded_tail_size = mathutil.next_multiple(tail_size,
                                                  self.required_shares)
        self._tail_codec = CRSEncoder()
        self._tail_codec.set_params(padded_tail_size,
                                    self.required_shares, self.num_shares)
        data['tail_codec_params'] = self._tail_codec.get_serialized_params()
165
166     def _get_share_size(self):
167         share_size = mathutil.div_ceil(self.file_size, self.required_shares)
168         overhead = self._compute_overhead()
169         return share_size + overhead
170
171     def _compute_overhead(self):
172         return 0
173
174     def get_param(self, name):
175         assert self._codec
176
177         if name == "storage_index":
178             return self._storage_index
179         elif name == "share_counts":
180             return (self.required_shares, self.servers_of_happiness,
181                     self.num_shares)
182         elif name == "num_segments":
183             return self.num_segments
184         elif name == "segment_size":
185             return self.segment_size
186         elif name == "block_size":
187             return self._codec.get_block_size()
188         elif name == "share_size":
189             return self._get_share_size()
190         elif name == "serialized_params":
191             return self._codec.get_serialized_params()
192         else:
193             raise KeyError("unknown parameter name '%s'" % name)
194
195     def set_shareholders(self, landlords, servermap):
196         assert isinstance(landlords, dict)
197         for k in landlords:
198             assert IStorageBucketWriter.providedBy(landlords[k])
199         self.landlords = landlords.copy()
200         assert isinstance(servermap, dict)
201         self.servermap = servermap.copy()
202
    def start(self):
        """ Returns a Deferred that will fire with the verify cap (an instance of
        uri.CHKFileVerifierURI)."""
        self.log("%s starting" % (self,))
        #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
        assert self._codec
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hashes = []
        self.segment_num = 0
        self.block_hashes = [[] for x in range(self.num_shares)]
        # block_hashes[i] is a list that will be accumulated and then sent
        # to landlord[i]. This list contains a hash of each segment_share
        # that we sent to that landlord.
        self.share_root_hashes = [None] * self.num_shares

        self._times = {
            "cumulative_encoding": 0.0,
            "cumulative_sending": 0.0,
            "hashes_and_close": 0.0,
            "total_encode_and_push": 0.0,
            }
        self._start_total_timestamp = time.time()

        # fireEventually ensures we start from a fresh reactor turn
        d = fireEventually()

        d.addCallback(lambda res: self.start_all_shareholders())

        # all segments except the last use the main codec; the last uses the
        # tail codec (and may be padded), hence the separate chain tail below
        for i in range(self.num_segments-1):
            # note to self: this form doesn't work, because lambda only
            # captures the slot, not the value
            #d.addCallback(lambda res: self.do_segment(i))
            # use this form instead:
            d.addCallback(lambda res, i=i: self._encode_segment(i))
            d.addCallback(self._send_segment, i)
            d.addCallback(self._turn_barrier)
        last_segnum = self.num_segments - 1
        d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
        d.addCallback(self._send_segment, last_segnum)
        d.addCallback(self._turn_barrier)

        d.addCallback(lambda res: self.finish_hashing())

        # after all blocks are out: hash trees, URI extension, then close
        d.addCallback(lambda res:
                      self.send_crypttext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res: self.send_all_block_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())

        d.addCallback(lambda res: self.close_all_shareholders())
        d.addCallbacks(self.done, self.err)
        return d
254
255     def set_status(self, status):
256         if self._status:
257             self._status.set_status(status)
258
259     def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
260         if self._status:
261             # we treat the final hash+close as an extra segment
262             if sent_segments is None:
263                 sent_segments = self.num_segments
264             progress = float(sent_segments + extra) / (self.num_segments + 1)
265             self._status.set_progress(2, progress)
266
    def abort(self):
        """Request that the upload stop at the next opportunity. Only valid
        after start() has been called."""
        self.log("aborting upload", level=log.UNUSUAL)
        assert self._codec, "don't call abort before start"
        self._aborted = True
        # the next segment read (in _gather_data inside _encode_segment) will
        # raise UploadAborted(), which will bypass the rest of the upload
        # chain. If we've sent the final segment's shares, it's too late to
        # abort. TODO: allow abort any time up to close_all_shareholders.
275
    def _turn_barrier(self, res):
        """Pass 'res' through after at least one reactor turn."""
        # putting this method in a Deferred chain imposes a guaranteed
        # reactor turn between the pre- and post- portions of that chain.
        # This can be useful to limit memory consumption: since Deferreds do
        # not do tail recursion, code which uses defer.succeed(result) for
        # consistency will cause objects to live for longer than you might
        # normally expect.

        return fireEventually(res)
285
286
287     def start_all_shareholders(self):
288         self.log("starting shareholders", level=log.NOISY)
289         self.set_status("Starting shareholders")
290         dl = []
291         for shareid in list(self.landlords):
292             d = self.landlords[shareid].put_header()
293             d.addErrback(self._remove_shareholder, shareid, "start")
294             dl.append(d)
295         return self._gather_responses(dl)
296
    def _encode_segment(self, segnum):
        """Read one full-sized segment of crypttext and erasure-code it.
        Returns a Deferred that fires with the codec's (shares, shareids)
        result. Also appends this segment's crypttext hash to
        self._crypttext_hashes and accumulates encoding time."""
        codec = self._codec
        start = time.time()

        # the ICodecEncoder API wants to receive a total of self.segment_size
        # bytes on each encode() call, broken up into a number of
        # identically-sized pieces. Due to the way the codec algorithm works,
        # these pieces need to be the same size as the share which the codec
        # will generate. Therefore we must feed it with input_piece_size that
        # equals the output share size.
        input_piece_size = codec.get_block_size()

        # as a result, the number of input pieces per encode() call will be
        # equal to the number of required shares with which the codec was
        # constructed. You can think of the codec as chopping up a
        # 'segment_size' of data into 'required_shares' shares (not doing any
        # fancy math at all, just doing a split), then creating some number
        # of additional shares which can be substituted if the primary ones
        # are unavailable

        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()

        # memory footprint: we only hold a tiny piece of the plaintext at any
        # given time. We build up a segment's worth of crypttext, then hand
        # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
        # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
        # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
        # footprint to 430KiB at the expense of more hash-tree overhead.

        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher)
        def _done_gathering(chunks):
            for c in chunks:
                assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            # during this call, we hit 5*segsize memory
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        def _done(res):
            elapsed = time.time() - start
            self._times["cumulative_encoding"] += elapsed
            return res
        d.addCallback(_done)
        return d
341
    def _encode_tail_segment(self, segnum):
        """Like _encode_segment, but for the final (possibly short) segment:
        uses the tail codec and allows _gather_data to zero-pad a short
        trailing read up to the codec's block size."""
        start = time.time()
        codec = self._tail_codec
        input_piece_size = codec.get_block_size()

        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()

        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher,
                              allow_short=True)
        def _done_gathering(chunks):
            for c in chunks:
                # a short trailing chunk will have been padded by
                # _gather_data
                assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        def _done(res):
            elapsed = time.time() - start
            self._times["cumulative_encoding"] += elapsed
            return res
        d.addCallback(_done)
        return d
367
368     def _gather_data(self, num_chunks, input_chunk_size,
369                      crypttext_segment_hasher,
370                      allow_short=False,
371                      previous_chunks=[]):
372         """Return a Deferred that will fire when the required number of
373         chunks have been read (and hashed and encrypted). The Deferred fires
374         with the combination of any 'previous_chunks' and the new chunks
375         which were gathered."""
376
377         if self._aborted:
378             raise UploadAborted()
379
380         if not num_chunks:
381             return defer.succeed(previous_chunks)
382
383         d = self._uploadable.read_encrypted(input_chunk_size, False)
384         def _got(data):
385             if self._aborted:
386                 raise UploadAborted()
387             encrypted_pieces = []
388             length = 0
389             while data:
390                 encrypted_piece = data.pop(0)
391                 length += len(encrypted_piece)
392                 crypttext_segment_hasher.update(encrypted_piece)
393                 self._crypttext_hasher.update(encrypted_piece)
394                 encrypted_pieces.append(encrypted_piece)
395
396             precondition(length <= input_chunk_size,
397                          "length=%d > input_chunk_size=%d" %
398                          (length, input_chunk_size))
399             if allow_short:
400                 if length < input_chunk_size:
401                     # padding
402                     pad_size = input_chunk_size - length
403                     encrypted_pieces.append('\x00' * pad_size)
404             else:
405                 # non-tail segments should be the full segment size
406                 if length != input_chunk_size:
407                     log.msg("non-tail segment should be full segment size: %d!=%d"
408                             % (length, input_chunk_size),
409                             level=log.BAD, umid="jNk5Yw")
410                 precondition(length == input_chunk_size,
411                              "length=%d != input_chunk_size=%d" %
412                              (length, input_chunk_size))
413
414             encrypted_piece = "".join(encrypted_pieces)
415             return previous_chunks + [encrypted_piece]
416
417         d.addCallback(_got)
418         d.addCallback(lambda chunks:
419                       self._gather_data(num_chunks-1, input_chunk_size,
420                                         crypttext_segment_hasher,
421                                         allow_short, chunks))
422         return d
423
    def _send_segment(self, (shares, shareids), segnum):
        # NOTE: the first parameter uses Python 2 tuple-parameter unpacking;
        # it receives the (shares, shareids) result of _encode_segment.
        #
        # To generate the URI, we must generate the roothash, so we must
        # generate all shares, even if we aren't actually giving them to
        # anybody. This means that the set of shares we create will be equal
        # to or larger than the set of landlords. If we have any landlord who
        # *doesn't* have a share, that's an error.
        _assert(set(self.landlords.keys()).issubset(set(shareids)),
                shareids=shareids, landlords=self.landlords)
        start = time.time()
        dl = []
        self.set_status("Sending segment %d of %d" % (segnum+1,
                                                      self.num_segments))
        self.set_encode_and_push_progress(segnum)
        lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
        for i in range(len(shares)):
            block = shares[i]
            shareid = shareids[i]
            d = self.send_block(shareid, segnum, block, lognum)
            dl.append(d)
            # hash every block, even those without a landlord: the block
            # hash trees (and thus the URI) cover all shares
            block_hash = hashutil.block_hash(block)
            #from allmydata.util import base32
            #log.msg("creating block (shareid=%d, blocknum=%d) "
            #        "len=%d %r .. %r: %s" %
            #        (shareid, segnum, len(block),
            #         block[:50], block[-50:], base32.b2a(block_hash)))
            self.block_hashes[shareid].append(block_hash)

        dl = self._gather_responses(dl)
        def _logit(res):
            self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
                     (self,
                      self.segment_size*(segnum+1),
                      self.segment_size*self.num_segments,
                      100 * (segnum+1) / self.num_segments,
                      ),
                     level=log.OPERATIONAL)
            elapsed = time.time() - start
            self._times["cumulative_sending"] += elapsed
            return res
        dl.addCallback(_logit)
        return dl
465
466     def send_block(self, shareid, segment_num, block, lognum):
467         if shareid not in self.landlords:
468             return defer.succeed(None)
469         sh = self.landlords[shareid]
470         lognum2 = self.log("put_block to %s" % self.landlords[shareid],
471                            parent=lognum, level=log.NOISY)
472         d = sh.put_block(segment_num, block)
473         def _done(res):
474             self.log("put_block done", parent=lognum2, level=log.NOISY)
475             return res
476         d.addCallback(_done)
477         d.addErrback(self._remove_shareholder, shareid,
478                      "segnum=%d" % segment_num)
479         return d
480
    def _remove_shareholder(self, why, shareid, where):
        """Errback: drop a failed shareholder from landlords/servermap,
        aborting its bucket. Raises UploadUnhappinessError (propagating
        through the Deferred chain) if too few distinct servers remain."""
        ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
                      method=where, shnum=shareid,
                      level=log.UNUSUAL, failure=why)
        if shareid in self.landlords:
            self.landlords[shareid].abort()
            del self.landlords[shareid]
        else:
            # even more UNUSUAL
            self.log("they weren't in our list of landlords", parent=ln,
                     level=log.WEIRD, umid="TQGFRw")
        del(self.servermap[shareid])
        # happiness is counted in distinct servers, not shares
        servers_left = list(set(self.servermap.values()))
        if len(servers_left) < self.servers_of_happiness:
            msg = "lost too many servers during upload (still have %d, want %d): %s" % \
                  (len(servers_left),
                   self.servers_of_happiness, why)
            raise UploadUnhappinessError(msg)
        self.log("but we can still continue with %s shares, we'll be happy "
                 "with at least %s" % (len(servers_left),
                                       self.servers_of_happiness),
                 parent=ln)
503
    def _gather_responses(self, dl):
        """Wrap a list of per-shareholder Deferreds in a DeferredList that
        errbacks on the first failure, while consuming the expected
        UploadUnhappinessError on the individual Deferreds."""
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _eatNotEnoughSharesError(f):
            # all exceptions that occur while talking to a peer are handled
            # in _remove_shareholder. That might raise UploadUnhappinessError,
            # which will cause the DeferredList to errback but which should
            # otherwise be consumed. Allow non-UploadUnhappinessError exceptions
            # to pass through as an unhandled errback. We use this in lieu of
            # consumeErrors=True to allow coding errors to be logged.
            f.trap(UploadUnhappinessError)
            return None
        for d0 in dl:
            d0.addErrback(_eatNotEnoughSharesError)
        return d
518
519     def finish_hashing(self):
520         self._start_hashing_and_close_timestamp = time.time()
521         self.set_status("Finishing hashes")
522         self.set_encode_and_push_progress(extra=0.0)
523         crypttext_hash = self._crypttext_hasher.digest()
524         self.uri_extension_data["crypttext_hash"] = crypttext_hash
525         self._uploadable.close()
526
527     def send_crypttext_hash_tree_to_all_shareholders(self):
528         self.log("sending crypttext hash tree", level=log.NOISY)
529         self.set_status("Sending Crypttext Hash Tree")
530         self.set_encode_and_push_progress(extra=0.3)
531         t = HashTree(self._crypttext_hashes)
532         all_hashes = list(t)
533         self.uri_extension_data["crypttext_root_hash"] = t[0]
534         dl = []
535         for shareid in list(self.landlords):
536             dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
537         return self._gather_responses(dl)
538
539     def send_crypttext_hash_tree(self, shareid, all_hashes):
540         if shareid not in self.landlords:
541             return defer.succeed(None)
542         sh = self.landlords[shareid]
543         d = sh.put_crypttext_hashes(all_hashes)
544         d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
545         return d
546
547     def send_all_block_hash_trees(self):
548         self.log("sending block hash trees", level=log.NOISY)
549         self.set_status("Sending Subshare Hash Trees")
550         self.set_encode_and_push_progress(extra=0.4)
551         dl = []
552         for shareid,hashes in enumerate(self.block_hashes):
553             # hashes is a list of the hashes of all blocks that were sent
554             # to shareholder[shareid].
555             dl.append(self.send_one_block_hash_tree(shareid, hashes))
556         return self._gather_responses(dl)
557
    def send_one_block_hash_tree(self, shareid, block_hashes):
        """Build one share's block hash tree, record its root in
        share_root_hashes (needed for the share hash tree even when the
        share has no landlord), and send the tree to the shareholder."""
        t = HashTree(block_hashes)
        all_hashes = list(t)
        # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
        # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
        # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
        self.share_root_hashes[shareid] = t[0]
        if shareid not in self.landlords:
            return defer.succeed(None)
        sh = self.landlords[shareid]
        d = sh.put_block_hashes(all_hashes)
        d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
        return d
571
    def send_all_share_hash_trees(self):
        # Each bucket gets a set of share hash tree nodes that are needed to validate their
        # share. This includes the share hash itself, but does not include the top-level hash
        # root (which is stored securely in the URI instead).
        self.log("sending all share hash trees", level=log.NOISY)
        self.set_status("Sending Share Hash Trees")
        self.set_encode_and_push_progress(extra=0.6)
        dl = []
        # every root must have been filled in by send_one_block_hash_tree
        for h in self.share_root_hashes:
            assert h
        # create the share hash tree
        t = HashTree(self.share_root_hashes)
        # the root of this hash tree goes into our URI
        self.uri_extension_data['share_root_hash'] = t[0]
        # now send just the necessary pieces out to each shareholder
        for i in range(self.num_shares):
            # the HashTree is given a list of leaves: 0,1,2,3..n .
            # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
            needed_hash_indices = t.needed_hashes(i, include_leaf=True)
            hashes = [(hi, t[hi]) for hi in needed_hash_indices]
            dl.append(self.send_one_share_hash_tree(i, hashes))
        return self._gather_responses(dl)
594
595     def send_one_share_hash_tree(self, shareid, needed_hashes):
596         if shareid not in self.landlords:
597             return defer.succeed(None)
598         sh = self.landlords[shareid]
599         d = sh.put_share_hashes(needed_hashes)
600         d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
601         return d
602
    def send_uri_extension_to_all_shareholders(self):
        """Pack uri_extension_data, record its hash (which goes into the
        verify cap), and send the packed block to every shareholder."""
        lp = self.log("sending uri_extension", level=log.NOISY)
        self.set_status("Sending URI Extensions")
        self.set_encode_and_push_progress(extra=0.8)
        for k in ('crypttext_root_hash', 'crypttext_hash',
                  ):
            assert k in self.uri_extension_data
        uri_extension = uri.pack_extension(self.uri_extension_data)
        # log a copy with the binary hashes rendered as base32
        ed = {}
        for k,v in self.uri_extension_data.items():
            if k.endswith("hash"):
                ed[k] = base32.b2a(v)
            else:
                ed[k] = v
        self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
        self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
        dl = []
        for shareid in list(self.landlords):
            dl.append(self.send_uri_extension(shareid, uri_extension))
        return self._gather_responses(dl)
623
624     def send_uri_extension(self, shareid, uri_extension):
625         sh = self.landlords[shareid]
626         d = sh.put_uri_extension(uri_extension)
627         d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
628         return d
629
630     def close_all_shareholders(self):
631         self.log("closing shareholders", level=log.NOISY)
632         self.set_status("Closing Shareholders")
633         self.set_encode_and_push_progress(extra=0.9)
634         dl = []
635         for shareid in list(self.landlords):
636             d = self.landlords[shareid].close()
637             d.addErrback(self._remove_shareholder, shareid, "close")
638             dl.append(d)
639         return self._gather_responses(dl)
640
    def done(self, res):
        """Success callback for start(): record timings and the set of
        placed shares, then return the CHKFileVerifierURI verify cap."""
        self.log("upload done", level=log.OPERATIONAL)
        self.set_status("Finished")
        self.set_encode_and_push_progress(extra=1.0) # done
        now = time.time()
        h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
        self._times["hashes_and_close"] = h_and_c_elapsed
        total_elapsed = now - self._start_total_timestamp
        self._times["total_encode_and_push"] = total_elapsed

        # update our sharemap
        self._shares_placed = set(self.landlords.keys())
        return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
                                      self.required_shares, self.num_shares, self.file_size)
655
    def err(self, f):
        """Failure errback for start(): abort all remaining shareholders and
        re-raise the underlying failure (unwrapping defer.FirstError from
        the DeferredList)."""
        self.log("upload failed", failure=f, level=log.UNUSUAL)
        self.set_status("Failed")
        # we need to abort any remaining shareholders, so they'll delete the
        # partial share, allowing someone else to upload it again.
        self.log("aborting shareholders", level=log.UNUSUAL)
        for shareid in list(self.landlords):
            self.landlords[shareid].abort()
        if f.check(defer.FirstError):
            return f.value.subFailure
        return f
667
    def get_shares_placed(self):
        """Return the set of share numbers that were successfully placed
        (only valid after the upload has finished)."""
        return self._shares_placed
671
    def get_times(self):
        """Return the dictionary of encode+push timings (seconds), as
        accumulated in self._times."""
        return self._times
675
    def get_uri_extension_data(self):
        """Return the (mutable) dict of URI-extension fields built during
        encoding."""
        return self.uri_extension_data