]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/encode.py
Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / encode.py
1 # -*- test-case-name: allmydata.test.test_encode -*-
2
3 import time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 from allmydata import uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.hashtree import HashTree
10 from allmydata.util import mathutil, hashutil, base32, log
11 from allmydata.util.assertutil import _assert, precondition
12 from allmydata.codec import CRSEncoder
13 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
14      IEncryptedUploadable, IUploadStatus, NotEnoughSharesError, NoSharesError
15
16 """
17 The goal of the encoder is to turn the original file into a series of
18 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
19 is a different host, but for small grids there may be overlap). The number
20 of shares is chosen to hit our reliability goals (more shares on more
21 machines means more reliability), and is limited by overhead (proportional to
22 numshares or log(numshares)) and the encoding technology in use (zfec permits
23 only 256 shares total). It is also constrained by the amount of data
24 we want to send to each host. For estimating purposes, think of 10 shares
25 out of which we need 3 to reconstruct the file.
26
27 The encoder starts by cutting the original file into segments. All segments
28 except the last are of equal size. The segment size is chosen to constrain
29 the memory footprint (which will probably vary between 1x and 4x segment
30 size) and to constrain the overhead (which will be proportional to
31 log(number of segments)).
32
33
34 Each segment (A,B,C) is read into memory, encrypted, and encoded into
35 blocks. The 'share' (say, share #1) that makes it out to a host is a
36 collection of these blocks (block A1, B1, C1), plus some hash-tree
37 information necessary to validate the data upon retrieval. Only one segment
38 is handled at a time: all blocks for segment A are delivered before any
39 work is begun on segment B.
40
41 As blocks are created, we retain the hash of each one. The list of block hashes
42 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
43 of a Merkle hash tree for that share, called the block hash tree.
44
45 This hash tree has one terminal leaf per block. The complete block hash
46 tree is sent to the shareholder after all the data has been sent. At
47 retrieval time, the decoder will ask for specific pieces of this tree before
48 asking for blocks, whichever it needs to validate those blocks.
49
50 (Note: we don't really need to generate this whole block hash tree
51 ourselves. It would be sufficient to have the shareholder generate it and
52 just tell us the root. This gives us an extra level of validation on the
53 transfer, though, and it is relatively cheap to compute.)
54
55 Each of these block hash trees has a root hash. The collection of these
56 root hashes for all shares are collected into the 'share hash tree', which
57 has one terminal leaf per share. After sending the blocks and the complete
58 block hash tree to each shareholder, we send them the portion of the share
59 hash tree that is necessary to validate their share. The root of the share
60 hash tree is put into the URI.
61
62 """
63
64 class UploadAborted(Exception):
65     pass
66
67 KiB=1024
68 MiB=1024*KiB
69 GiB=1024*MiB
70 TiB=1024*GiB
71 PiB=1024*TiB
72
73 class Encoder(object):
74     implements(IEncoder)
75
76     def __init__(self, log_parent=None, upload_status=None):
77         object.__init__(self)
78         self.uri_extension_data = {}
79         self._codec = None
80         self._status = None
81         if upload_status:
82             self._status = IUploadStatus(upload_status)
83         precondition(log_parent is None or isinstance(log_parent, int),
84                      log_parent)
85         self._log_number = log.msg("creating Encoder %s" % self,
86                                    facility="tahoe.encoder", parent=log_parent)
87         self._aborted = False
88
89     def __repr__(self):
90         if hasattr(self, "_storage_index"):
91             return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
92         return "<Encoder for unknown storage index>"
93
94     def log(self, *args, **kwargs):
95         if "parent" not in kwargs:
96             kwargs["parent"] = self._log_number
97         if "facility" not in kwargs:
98             kwargs["facility"] = "tahoe.encoder"
99         return log.msg(*args, **kwargs)
100
101     def set_encrypted_uploadable(self, uploadable):
102         eu = self._uploadable = IEncryptedUploadable(uploadable)
103         d = eu.get_size()
104         def _got_size(size):
105             self.log(format="file size: %(size)d", size=size)
106             self.file_size = size
107         d.addCallback(_got_size)
108         d.addCallback(lambda res: eu.get_all_encoding_parameters())
109         d.addCallback(self._got_all_encoding_parameters)
110         d.addCallback(lambda res: eu.get_storage_index())
111         def _done(storage_index):
112             self._storage_index = storage_index
113             return self
114         d.addCallback(_done)
115         return d
116
117     def _got_all_encoding_parameters(self, params):
118         assert not self._codec
119         k, happy, n, segsize = params
120         self.required_shares = k
121         self.servers_of_happiness = happy
122         self.num_shares = n
123         self.segment_size = segsize
124         self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
125         self.log("now setting up codec")
126
127         assert self.segment_size % self.required_shares == 0
128
129         self.num_segments = mathutil.div_ceil(self.file_size,
130                                               self.segment_size)
131
132         self._codec = CRSEncoder()
133         self._codec.set_params(self.segment_size,
134                                self.required_shares, self.num_shares)
135
136         data = self.uri_extension_data
137         data['codec_name'] = self._codec.get_encoder_type()
138         data['codec_params'] = self._codec.get_serialized_params()
139
140         data['size'] = self.file_size
141         data['segment_size'] = self.segment_size
142         self.share_size = mathutil.div_ceil(self.file_size,
143                                             self.required_shares)
144         data['num_segments'] = self.num_segments
145         data['needed_shares'] = self.required_shares
146         data['total_shares'] = self.num_shares
147
148         # the "tail" is the last segment. This segment may or may not be
149         # shorter than all other segments. We use the "tail codec" to handle
150         # it. If the tail is short, we use a different codec instance. In
151         # addition, the tail codec must be fed data which has been padded out
152         # to the right size.
153         tail_size = self.file_size % self.segment_size
154         if not tail_size:
155             tail_size = self.segment_size
156
157         # the tail codec is responsible for encoding tail_size bytes
158         padded_tail_size = mathutil.next_multiple(tail_size,
159                                                   self.required_shares)
160         self._tail_codec = CRSEncoder()
161         self._tail_codec.set_params(padded_tail_size,
162                                     self.required_shares, self.num_shares)
163         data['tail_codec_params'] = self._tail_codec.get_serialized_params()
164
165     def _get_share_size(self):
166         share_size = mathutil.div_ceil(self.file_size, self.required_shares)
167         overhead = self._compute_overhead()
168         return share_size + overhead
169
170     def _compute_overhead(self):
171         return 0
172
173     def get_param(self, name):
174         assert self._codec
175
176         if name == "storage_index":
177             return self._storage_index
178         elif name == "share_counts":
179             return (self.required_shares, self.servers_of_happiness,
180                     self.num_shares)
181         elif name == "num_segments":
182             return self.num_segments
183         elif name == "segment_size":
184             return self.segment_size
185         elif name == "block_size":
186             return self._codec.get_block_size()
187         elif name == "share_size":
188             return self._get_share_size()
189         elif name == "serialized_params":
190             return self._codec.get_serialized_params()
191         else:
192             raise KeyError("unknown parameter name '%s'" % name)
193
194     def set_shareholders(self, landlords, servermap):
195         assert isinstance(landlords, dict)
196         for k in landlords:
197             assert IStorageBucketWriter.providedBy(landlords[k])
198         self.landlords = landlords.copy()
199         assert isinstance(servermap, dict)
200         self.servermap = servermap.copy()
201
202     def start(self):
203         """ Returns a Deferred that will fire with the verify cap (an instance of
204         uri.CHKFileVerifierURI)."""
205         self.log("%s starting" % (self,))
206         #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
207         assert self._codec
208         self._crypttext_hasher = hashutil.crypttext_hasher()
209         self._crypttext_hashes = []
210         self.segment_num = 0
211         self.block_hashes = [[] for x in range(self.num_shares)]
212         # block_hashes[i] is a list that will be accumulated and then send
213         # to landlord[i]. This list contains a hash of each segment_share
214         # that we sent to that landlord.
215         self.share_root_hashes = [None] * self.num_shares
216
217         self._times = {
218             "cumulative_encoding": 0.0,
219             "cumulative_sending": 0.0,
220             "hashes_and_close": 0.0,
221             "total_encode_and_push": 0.0,
222             }
223         self._start_total_timestamp = time.time()
224
225         d = fireEventually()
226
227         d.addCallback(lambda res: self.start_all_shareholders())
228
229         for i in range(self.num_segments-1):
230             # note to self: this form doesn't work, because lambda only
231             # captures the slot, not the value
232             #d.addCallback(lambda res: self.do_segment(i))
233             # use this form instead:
234             d.addCallback(lambda res, i=i: self._encode_segment(i))
235             d.addCallback(self._send_segment, i)
236             d.addCallback(self._turn_barrier)
237         last_segnum = self.num_segments - 1
238         d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
239         d.addCallback(self._send_segment, last_segnum)
240         d.addCallback(self._turn_barrier)
241
242         d.addCallback(lambda res: self.finish_hashing())
243
244         d.addCallback(lambda res:
245                       self.send_crypttext_hash_tree_to_all_shareholders())
246         d.addCallback(lambda res: self.send_all_block_hash_trees())
247         d.addCallback(lambda res: self.send_all_share_hash_trees())
248         d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
249
250         d.addCallback(lambda res: self.close_all_shareholders())
251         d.addCallbacks(self.done, self.err)
252         return d
253
254     def set_status(self, status):
255         if self._status:
256             self._status.set_status(status)
257
258     def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
259         if self._status:
260             # we treat the final hash+close as an extra segment
261             if sent_segments is None:
262                 sent_segments = self.num_segments
263             progress = float(sent_segments + extra) / (self.num_segments + 1)
264             self._status.set_progress(2, progress)
265
266     def abort(self):
267         self.log("aborting upload", level=log.UNUSUAL)
268         assert self._codec, "don't call abort before start"
269         self._aborted = True
270         # the next segment read (in _gather_data inside _encode_segment) will
271         # raise UploadAborted(), which will bypass the rest of the upload
272         # chain. If we've sent the final segment's shares, it's too late to
273         # abort. TODO: allow abort any time up to close_all_shareholders.
274
275     def _turn_barrier(self, res):
276         # putting this method in a Deferred chain imposes a guaranteed
277         # reactor turn between the pre- and post- portions of that chain.
278         # This can be useful to limit memory consumption: since Deferreds do
279         # not do tail recursion, code which uses defer.succeed(result) for
280         # consistency will cause objects to live for longer than you might
281         # normally expect.
282
283         return fireEventually(res)
284
285
286     def start_all_shareholders(self):
287         self.log("starting shareholders", level=log.NOISY)
288         self.set_status("Starting shareholders")
289         dl = []
290         for shareid in list(self.landlords):
291             d = self.landlords[shareid].put_header()
292             d.addErrback(self._remove_shareholder, shareid, "start")
293             dl.append(d)
294         return self._gather_responses(dl)
295
296     def _encode_segment(self, segnum):
297         codec = self._codec
298         start = time.time()
299
300         # the ICodecEncoder API wants to receive a total of self.segment_size
301         # bytes on each encode() call, broken up into a number of
302         # identically-sized pieces. Due to the way the codec algorithm works,
303         # these pieces need to be the same size as the share which the codec
304         # will generate. Therefore we must feed it with input_piece_size that
305         # equals the output share size.
306         input_piece_size = codec.get_block_size()
307
308         # as a result, the number of input pieces per encode() call will be
309         # equal to the number of required shares with which the codec was
310         # constructed. You can think of the codec as chopping up a
311         # 'segment_size' of data into 'required_shares' shares (not doing any
312         # fancy math at all, just doing a split), then creating some number
313         # of additional shares which can be substituted if the primary ones
314         # are unavailable
315
316         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
317
318         # memory footprint: we only hold a tiny piece of the plaintext at any
319         # given time. We build up a segment's worth of cryptttext, then hand
320         # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
321         # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
322         # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
323         # footprint to 430KiB at the expense of more hash-tree overhead.
324
325         d = self._gather_data(self.required_shares, input_piece_size,
326                               crypttext_segment_hasher)
327         def _done_gathering(chunks):
328             for c in chunks:
329                 assert len(c) == input_piece_size
330             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
331             # during this call, we hit 5*segsize memory
332             return codec.encode(chunks)
333         d.addCallback(_done_gathering)
334         def _done(res):
335             elapsed = time.time() - start
336             self._times["cumulative_encoding"] += elapsed
337             return res
338         d.addCallback(_done)
339         return d
340
341     def _encode_tail_segment(self, segnum):
342
343         start = time.time()
344         codec = self._tail_codec
345         input_piece_size = codec.get_block_size()
346
347         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
348
349         d = self._gather_data(self.required_shares, input_piece_size,
350                               crypttext_segment_hasher,
351                               allow_short=True)
352         def _done_gathering(chunks):
353             for c in chunks:
354                 # a short trailing chunk will have been padded by
355                 # _gather_data
356                 assert len(c) == input_piece_size
357             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
358             return codec.encode(chunks)
359         d.addCallback(_done_gathering)
360         def _done(res):
361             elapsed = time.time() - start
362             self._times["cumulative_encoding"] += elapsed
363             return res
364         d.addCallback(_done)
365         return d
366
367     def _gather_data(self, num_chunks, input_chunk_size,
368                      crypttext_segment_hasher,
369                      allow_short=False,
370                      previous_chunks=[]):
371         """Return a Deferred that will fire when the required number of
372         chunks have been read (and hashed and encrypted). The Deferred fires
373         with the combination of any 'previous_chunks' and the new chunks
374         which were gathered."""
375
376         if self._aborted:
377             raise UploadAborted()
378
379         if not num_chunks:
380             return defer.succeed(previous_chunks)
381
382         d = self._uploadable.read_encrypted(input_chunk_size, False)
383         def _got(data):
384             if self._aborted:
385                 raise UploadAborted()
386             encrypted_pieces = []
387             length = 0
388             while data:
389                 encrypted_piece = data.pop(0)
390                 length += len(encrypted_piece)
391                 crypttext_segment_hasher.update(encrypted_piece)
392                 self._crypttext_hasher.update(encrypted_piece)
393                 encrypted_pieces.append(encrypted_piece)
394
395             precondition(length <= input_chunk_size,
396                          "length=%d > input_chunk_size=%d" %
397                          (length, input_chunk_size))
398             if allow_short:
399                 if length < input_chunk_size:
400                     # padding
401                     pad_size = input_chunk_size - length
402                     encrypted_pieces.append('\x00' * pad_size)
403             else:
404                 # non-tail segments should be the full segment size
405                 if length != input_chunk_size:
406                     log.msg("non-tail segment should be full segment size: %d!=%d"
407                             % (length, input_chunk_size),
408                             level=log.BAD, umid="jNk5Yw")
409                 precondition(length == input_chunk_size,
410                              "length=%d != input_chunk_size=%d" %
411                              (length, input_chunk_size))
412
413             encrypted_piece = "".join(encrypted_pieces)
414             return previous_chunks + [encrypted_piece]
415
416         d.addCallback(_got)
417         d.addCallback(lambda chunks:
418                       self._gather_data(num_chunks-1, input_chunk_size,
419                                         crypttext_segment_hasher,
420                                         allow_short, chunks))
421         return d
422
423     def _send_segment(self, (shares, shareids), segnum):
424         # To generate the URI, we must generate the roothash, so we must
425         # generate all shares, even if we aren't actually giving them to
426         # anybody. This means that the set of shares we create will be equal
427         # to or larger than the set of landlords. If we have any landlord who
428         # *doesn't* have a share, that's an error.
429         _assert(set(self.landlords.keys()).issubset(set(shareids)),
430                 shareids=shareids, landlords=self.landlords)
431         start = time.time()
432         dl = []
433         self.set_status("Sending segment %d of %d" % (segnum+1,
434                                                       self.num_segments))
435         self.set_encode_and_push_progress(segnum)
436         lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
437         for i in range(len(shares)):
438             block = shares[i]
439             shareid = shareids[i]
440             d = self.send_block(shareid, segnum, block, lognum)
441             dl.append(d)
442             block_hash = hashutil.block_hash(block)
443             #from allmydata.util import base32
444             #log.msg("creating block (shareid=%d, blocknum=%d) "
445             #        "len=%d %r .. %r: %s" %
446             #        (shareid, segnum, len(block),
447             #         block[:50], block[-50:], base32.b2a(block_hash)))
448             self.block_hashes[shareid].append(block_hash)
449
450         dl = self._gather_responses(dl)
451         def _logit(res):
452             self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
453                      (self,
454                       self.segment_size*(segnum+1),
455                       self.segment_size*self.num_segments,
456                       100 * (segnum+1) / self.num_segments,
457                       ),
458                      level=log.OPERATIONAL)
459             elapsed = time.time() - start
460             self._times["cumulative_sending"] += elapsed
461             return res
462         dl.addCallback(_logit)
463         return dl
464
465     def send_block(self, shareid, segment_num, block, lognum):
466         if shareid not in self.landlords:
467             return defer.succeed(None)
468         sh = self.landlords[shareid]
469         lognum2 = self.log("put_block to %s" % self.landlords[shareid],
470                            parent=lognum, level=log.NOISY)
471         d = sh.put_block(segment_num, block)
472         def _done(res):
473             self.log("put_block done", parent=lognum2, level=log.NOISY)
474             return res
475         d.addCallback(_done)
476         d.addErrback(self._remove_shareholder, shareid,
477                      "segnum=%d" % segment_num)
478         return d
479
480     def _remove_shareholder(self, why, shareid, where):
481         ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
482                       method=where, shnum=shareid,
483                       level=log.UNUSUAL, failure=why)
484         if shareid in self.landlords:
485             self.landlords[shareid].abort()
486             del self.landlords[shareid]
487         else:
488             # even more UNUSUAL
489             self.log("they weren't in our list of landlords", parent=ln,
490                      level=log.WEIRD, umid="TQGFRw")
491         del(self.servermap[shareid])
492         servers_left = list(set(self.servermap.values()))
493         if len(servers_left) < self.servers_of_happiness:
494             msg = "lost too many servers during upload (still have %d, want %d): %s" % \
495                   (len(servers_left),
496                    self.servers_of_happiness, why)
497             if servers_left:
498                 raise NotEnoughSharesError(msg)
499             else:
500                 raise NoSharesError(msg)
501         self.log("but we can still continue with %s shares, we'll be happy "
502                  "with at least %s" % (len(servers_left),
503                                        self.servers_of_happiness),
504                  parent=ln)
505
506     def _gather_responses(self, dl):
507         d = defer.DeferredList(dl, fireOnOneErrback=True)
508         def _eatNotEnoughSharesError(f):
509             # all exceptions that occur while talking to a peer are handled
510             # in _remove_shareholder. That might raise NotEnoughSharesError,
511             # which will cause the DeferredList to errback but which should
512             # otherwise be consumed. Allow non-NotEnoughSharesError exceptions
513             # to pass through as an unhandled errback. We use this in lieu of
514             # consumeErrors=True to allow coding errors to be logged.
515             f.trap(NotEnoughSharesError, NoSharesError)
516             return None
517         for d0 in dl:
518             d0.addErrback(_eatNotEnoughSharesError)
519         return d
520
521     def finish_hashing(self):
522         self._start_hashing_and_close_timestamp = time.time()
523         self.set_status("Finishing hashes")
524         self.set_encode_and_push_progress(extra=0.0)
525         crypttext_hash = self._crypttext_hasher.digest()
526         self.uri_extension_data["crypttext_hash"] = crypttext_hash
527         self._uploadable.close()
528
529     def send_crypttext_hash_tree_to_all_shareholders(self):
530         self.log("sending crypttext hash tree", level=log.NOISY)
531         self.set_status("Sending Crypttext Hash Tree")
532         self.set_encode_and_push_progress(extra=0.3)
533         t = HashTree(self._crypttext_hashes)
534         all_hashes = list(t)
535         self.uri_extension_data["crypttext_root_hash"] = t[0]
536         dl = []
537         for shareid in list(self.landlords):
538             dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
539         return self._gather_responses(dl)
540
541     def send_crypttext_hash_tree(self, shareid, all_hashes):
542         if shareid not in self.landlords:
543             return defer.succeed(None)
544         sh = self.landlords[shareid]
545         d = sh.put_crypttext_hashes(all_hashes)
546         d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
547         return d
548
549     def send_all_block_hash_trees(self):
550         self.log("sending block hash trees", level=log.NOISY)
551         self.set_status("Sending Subshare Hash Trees")
552         self.set_encode_and_push_progress(extra=0.4)
553         dl = []
554         for shareid,hashes in enumerate(self.block_hashes):
555             # hashes is a list of the hashes of all blocks that were sent
556             # to shareholder[shareid].
557             dl.append(self.send_one_block_hash_tree(shareid, hashes))
558         return self._gather_responses(dl)
559
560     def send_one_block_hash_tree(self, shareid, block_hashes):
561         t = HashTree(block_hashes)
562         all_hashes = list(t)
563         # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
564         # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
565         # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
566         self.share_root_hashes[shareid] = t[0]
567         if shareid not in self.landlords:
568             return defer.succeed(None)
569         sh = self.landlords[shareid]
570         d = sh.put_block_hashes(all_hashes)
571         d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
572         return d
573
574     def send_all_share_hash_trees(self):
575         # Each bucket gets a set of share hash tree nodes that are needed to validate their
576         # share. This includes the share hash itself, but does not include the top-level hash
577         # root (which is stored securely in the URI instead).
578         self.log("sending all share hash trees", level=log.NOISY)
579         self.set_status("Sending Share Hash Trees")
580         self.set_encode_and_push_progress(extra=0.6)
581         dl = []
582         for h in self.share_root_hashes:
583             assert h
584         # create the share hash tree
585         t = HashTree(self.share_root_hashes)
586         # the root of this hash tree goes into our URI
587         self.uri_extension_data['share_root_hash'] = t[0]
588         # now send just the necessary pieces out to each shareholder
589         for i in range(self.num_shares):
590             # the HashTree is given a list of leaves: 0,1,2,3..n .
591             # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
592             needed_hash_indices = t.needed_hashes(i, include_leaf=True)
593             hashes = [(hi, t[hi]) for hi in needed_hash_indices]
594             dl.append(self.send_one_share_hash_tree(i, hashes))
595         return self._gather_responses(dl)
596
597     def send_one_share_hash_tree(self, shareid, needed_hashes):
598         if shareid not in self.landlords:
599             return defer.succeed(None)
600         sh = self.landlords[shareid]
601         d = sh.put_share_hashes(needed_hashes)
602         d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
603         return d
604
605     def send_uri_extension_to_all_shareholders(self):
606         lp = self.log("sending uri_extension", level=log.NOISY)
607         self.set_status("Sending URI Extensions")
608         self.set_encode_and_push_progress(extra=0.8)
609         for k in ('crypttext_root_hash', 'crypttext_hash',
610                   ):
611             assert k in self.uri_extension_data
612         uri_extension = uri.pack_extension(self.uri_extension_data)
613         ed = {}
614         for k,v in self.uri_extension_data.items():
615             if k.endswith("hash"):
616                 ed[k] = base32.b2a(v)
617             else:
618                 ed[k] = v
619         self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
620         self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
621         dl = []
622         for shareid in list(self.landlords):
623             dl.append(self.send_uri_extension(shareid, uri_extension))
624         return self._gather_responses(dl)
625
626     def send_uri_extension(self, shareid, uri_extension):
627         sh = self.landlords[shareid]
628         d = sh.put_uri_extension(uri_extension)
629         d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
630         return d
631
632     def close_all_shareholders(self):
633         self.log("closing shareholders", level=log.NOISY)
634         self.set_status("Closing Shareholders")
635         self.set_encode_and_push_progress(extra=0.9)
636         dl = []
637         for shareid in list(self.landlords):
638             d = self.landlords[shareid].close()
639             d.addErrback(self._remove_shareholder, shareid, "close")
640             dl.append(d)
641         return self._gather_responses(dl)
642
643     def done(self, res):
644         self.log("upload done", level=log.OPERATIONAL)
645         self.set_status("Finished")
646         self.set_encode_and_push_progress(extra=1.0) # done
647         now = time.time()
648         h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
649         self._times["hashes_and_close"] = h_and_c_elapsed
650         total_elapsed = now - self._start_total_timestamp
651         self._times["total_encode_and_push"] = total_elapsed
652
653         # update our sharemap
654         self._shares_placed = set(self.landlords.keys())
655         return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
656                                       self.required_shares, self.num_shares, self.file_size)
657
658     def err(self, f):
659         self.log("upload failed", failure=f, level=log.UNUSUAL)
660         self.set_status("Failed")
661         # we need to abort any remaining shareholders, so they'll delete the
662         # partial share, allowing someone else to upload it again.
663         self.log("aborting shareholders", level=log.UNUSUAL)
664         for shareid in list(self.landlords):
665             self.landlords[shareid].abort()
666         if f.check(defer.FirstError):
667             return f.value.subFailure
668         return f
669
670     def get_shares_placed(self):
671         # return a set of share numbers that were successfully placed.
672         return self._shares_placed
673
674     def get_times(self):
675         # return a dictionary of encode+push timings
676         return self._times
677
678     def get_uri_extension_data(self):
679         return self.uri_extension_data