]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/encode.py
add upload timings and rates to the POST /uri?t=upload results page
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / encode.py
1 # -*- test-case-name: allmydata.test.test_encode -*-
2
3 import time
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap import eventual
7 from allmydata import uri
8 from allmydata.hashtree import HashTree
9 from allmydata.util import mathutil, hashutil, idlib, log
10 from allmydata.util.assertutil import _assert, precondition
11 from allmydata.codec import CRSEncoder
12 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
13      IEncryptedUploadable
14
15 """
16 The goal of the encoder is to turn the original file into a series of
17 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
18 is a different host, but for small grids there may be overlap). The number
19 of shares is chosen to hit our reliability goals (more shares on more
20 machines means more reliability), and is limited by overhead (proportional to
21 numshares or log(numshares)) and the encoding technology in use (zfec permits
22 only 256 shares total). It is also constrained by the amount of data
23 we want to send to each host. For estimating purposes, think of 10 shares
24 out of which we need 3 to reconstruct the file.
25
26 The encoder starts by cutting the original file into segments. All segments
27 except the last are of equal size. The segment size is chosen to constrain
28 the memory footprint (which will probably vary between 1x and 4x segment
29 size) and to constrain the overhead (which will be proportional to
30 log(number of segments)).
31
32
33 Each segment (A,B,C) is read into memory, encrypted, and encoded into
34 blocks. The 'share' (say, share #1) that makes it out to a host is a
35 collection of these blocks (block A1, B1, C1), plus some hash-tree
36 information necessary to validate the data upon retrieval. Only one segment
37 is handled at a time: all blocks for segment A are delivered before any
38 work is begun on segment B.
39
40 As blocks are created, we retain the hash of each one. The list of
41 block hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
42 used to form the base of a Merkle hash tree for that share (hashtrees[1]).
43 This hash tree has one terminal leaf per block. The complete block hash
44 tree is sent to the shareholder after all the data has been sent. At
45 retrieval time, the decoder will ask for specific pieces of this tree before
46 asking for blocks, whichever it needs to validate those blocks.
47
48 (Note: we don't really need to generate this whole block hash tree
49 ourselves. It would be sufficient to have the shareholder generate it and
50 just tell us the root. This gives us an extra level of validation on the
51 transfer, though, and it is relatively cheap to compute.)
52
Each of these block hash trees has a root hash. The root hashes for all
shares are collected into the 'share hash tree', which
55 has one terminal leaf per share. After sending the blocks and the complete
56 block hash tree to each shareholder, we send them the portion of the share
57 hash tree that is necessary to validate their share. The root of the share
58 hash tree is put into the URI.
59
60 """
61
class NotEnoughPeersError(Exception):
    """Raised during upload when fewer shareholders remain than
    shares_of_happiness requires."""
64
class UploadAborted(Exception):
    """Raised while gathering segment data after abort() has been called."""
67
# Binary (power-of-1024) size units, expressed in bytes.
KiB = 1024
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
73
74 class Encoder(object):
75     implements(IEncoder)
76
77     def __init__(self, log_parent=None):
78         object.__init__(self)
79         self.uri_extension_data = {}
80         self._codec = None
81         precondition(log_parent is None or isinstance(log_parent, int),
82                      log_parent)
83         self._log_number = log.msg("creating Encoder %s" % self,
84                                    facility="tahoe.encoder", parent=log_parent)
85         self._aborted = False
86
87     def __repr__(self):
88         if hasattr(self, "_storage_index"):
89             return "<Encoder for %s>" % idlib.b2a(self._storage_index)[:6]
90         return "<Encoder for unknown storage index>"
91
92     def log(self, *args, **kwargs):
93         if "parent" not in kwargs:
94             kwargs["parent"] = self._log_number
95         if "facility" not in kwargs:
96             kwargs["facility"] = "tahoe.encoder"
97         return log.msg(*args, **kwargs)
98
99     def set_encrypted_uploadable(self, uploadable):
100         eu = self._uploadable = IEncryptedUploadable(uploadable)
101         d = eu.get_size()
102         def _got_size(size):
103             self.log(format="file size: %(size)d", size=size)
104             self.file_size = size
105         d.addCallback(_got_size)
106         d.addCallback(lambda res: eu.get_all_encoding_parameters())
107         d.addCallback(self._got_all_encoding_parameters)
108         d.addCallback(lambda res: eu.get_storage_index())
109         def _done(storage_index):
110             self._storage_index = storage_index
111             return self
112         d.addCallback(_done)
113         return d
114
115     def _got_all_encoding_parameters(self, params):
116         assert not self._codec
117         k, happy, n, segsize = params
118         self.required_shares = k
119         self.shares_of_happiness = happy
120         self.num_shares = n
121         self.segment_size = segsize
122         self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
123         self.log("now setting up codec")
124
125         assert self.segment_size % self.required_shares == 0
126
127         self.num_segments = mathutil.div_ceil(self.file_size,
128                                               self.segment_size)
129
130         self._codec = CRSEncoder()
131         self._codec.set_params(self.segment_size,
132                                self.required_shares, self.num_shares)
133
134         data = self.uri_extension_data
135         data['codec_name'] = self._codec.get_encoder_type()
136         data['codec_params'] = self._codec.get_serialized_params()
137
138         data['size'] = self.file_size
139         data['segment_size'] = self.segment_size
140         self.share_size = mathutil.div_ceil(self.file_size,
141                                             self.required_shares)
142         data['num_segments'] = self.num_segments
143         data['needed_shares'] = self.required_shares
144         data['total_shares'] = self.num_shares
145
146         # the "tail" is the last segment. This segment may or may not be
147         # shorter than all other segments. We use the "tail codec" to handle
148         # it. If the tail is short, we use a different codec instance. In
149         # addition, the tail codec must be fed data which has been padded out
150         # to the right size.
151         self.tail_size = self.file_size % self.segment_size
152         if not self.tail_size:
153             self.tail_size = self.segment_size
154
155         # the tail codec is responsible for encoding tail_size bytes
156         padded_tail_size = mathutil.next_multiple(self.tail_size,
157                                                   self.required_shares)
158         self._tail_codec = CRSEncoder()
159         self._tail_codec.set_params(padded_tail_size,
160                                     self.required_shares, self.num_shares)
161         data['tail_codec_params'] = self._tail_codec.get_serialized_params()
162
163     def _get_share_size(self):
164         share_size = mathutil.div_ceil(self.file_size, self.required_shares)
165         overhead = self._compute_overhead()
166         return share_size + overhead
167
168     def _compute_overhead(self):
169         return 0
170
171     def get_param(self, name):
172         assert self._codec
173
174         if name == "storage_index":
175             return self._storage_index
176         elif name == "share_counts":
177             return (self.required_shares, self.shares_of_happiness,
178                     self.num_shares)
179         elif name == "num_segments":
180             return self.num_segments
181         elif name == "segment_size":
182             return self.segment_size
183         elif name == "block_size":
184             return self._codec.get_block_size()
185         elif name == "share_size":
186             return self._get_share_size()
187         elif name == "serialized_params":
188             return self._codec.get_serialized_params()
189         else:
190             raise KeyError("unknown parameter name '%s'" % name)
191
192     def set_shareholders(self, landlords):
193         assert isinstance(landlords, dict)
194         for k in landlords:
195             assert IStorageBucketWriter.providedBy(landlords[k])
196         self.landlords = landlords.copy()
197
    def start(self):
        """Run the whole encode-and-push pipeline.

        Resets the per-upload hashing and timing state, then builds a single
        Deferred chain that, in order: starts every shareholder, encodes and
        sends each full-size segment, encodes and sends the tail segment,
        finishes hashing, sends the plaintext, crypttext, block and share
        hash trees and the URI extension, and closes every shareholder.

        Returns a Deferred that fires with the result of done(). A failure
        anywhere earlier in the chain is routed through err().
        """
        self.log("%s starting" % (self,))
        #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
        assert self._codec
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hashes = []
        self.segment_num = 0
        self.subshare_hashes = [[] for x in range(self.num_shares)]
        # subshare_hashes[i] is a list that will be accumulated and then sent
        # to landlord[i]. This list contains a hash of each segment_share
        # that we sent to that landlord.
        self.share_root_hashes = [None] * self.num_shares

        # all timers are in seconds; the cumulative ones are added to by
        # _encode_segment/_encode_tail_segment and _send_segment, the other
        # two are filled in by done()
        self._times = {
            "cumulative_encoding": 0.0,
            "cumulative_sending": 0.0,
            "hashes_and_close": 0.0,
            "total_encode_and_push": 0.0,
            }
        self._start_total_timestamp = time.time()

        # start the chain on a later reactor turn rather than synchronously
        d = eventual.fireEventually()

        d.addCallback(lambda res: self.start_all_shareholders())

        # every segment but the last goes through the regular codec
        for i in range(self.num_segments-1):
            # note to self: this form doesn't work, because lambda only
            # captures the slot, not the value
            #d.addCallback(lambda res: self.do_segment(i))
            # use this form instead:
            d.addCallback(lambda res, i=i: self._encode_segment(i))
            d.addCallback(self._send_segment, i)
            d.addCallback(self._turn_barrier)
        # the last segment goes through the tail codec (with padding)
        last_segnum = self.num_segments - 1
        d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
        d.addCallback(self._send_segment, last_segnum)
        d.addCallback(self._turn_barrier)

        d.addCallback(lambda res: self.finish_hashing())

        d.addCallback(lambda res:
                      self.send_plaintext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res:
                      self.send_crypttext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res: self.send_all_subshare_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())

        d.addCallback(lambda res: self.close_all_shareholders())
        # err() only sees failures from the steps above, not from done()
        d.addCallbacks(lambda res: self.done(), self.err)
        return d
249
250     def abort(self):
251         self.log("aborting upload", level=log.UNUSUAL)
252         assert self._codec, "don't call abort before start"
253         self._aborted = True
254         # the next segment read (in _gather_data inside _encode_segment) will
255         # raise UploadAborted(), which will bypass the rest of the upload
256         # chain. If we've sent the final segment's shares, it's too late to
257         # abort. TODO: allow abort any time up to close_all_shareholders.
258
259     def _turn_barrier(self, res):
260         # putting this method in a Deferred chain imposes a guaranteed
261         # reactor turn between the pre- and post- portions of that chain.
262         # This can be useful to limit memory consumption: since Deferreds do
263         # not do tail recursion, code which uses defer.succeed(result) for
264         # consistency will cause objects to live for longer than you might
265         # normally expect.
266
267         return eventual.fireEventually(res)
268
269
270     def start_all_shareholders(self):
271         self.log("starting shareholders", level=log.NOISY)
272         dl = []
273         for shareid in self.landlords:
274             d = self.landlords[shareid].start()
275             d.addErrback(self._remove_shareholder, shareid, "start")
276             dl.append(d)
277         return self._gather_responses(dl)
278
279     def _encode_segment(self, segnum):
280         codec = self._codec
281         start = time.time()
282
283         # the ICodecEncoder API wants to receive a total of self.segment_size
284         # bytes on each encode() call, broken up into a number of
285         # identically-sized pieces. Due to the way the codec algorithm works,
286         # these pieces need to be the same size as the share which the codec
287         # will generate. Therefore we must feed it with input_piece_size that
288         # equals the output share size.
289         input_piece_size = codec.get_block_size()
290
291         # as a result, the number of input pieces per encode() call will be
292         # equal to the number of required shares with which the codec was
293         # constructed. You can think of the codec as chopping up a
294         # 'segment_size' of data into 'required_shares' shares (not doing any
295         # fancy math at all, just doing a split), then creating some number
296         # of additional shares which can be substituted if the primary ones
297         # are unavailable
298
299         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
300
301         # memory footprint: we only hold a tiny piece of the plaintext at any
302         # given time. We build up a segment's worth of cryptttext, then hand
303         # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
304         # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
305         # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
306         # footprint to 430KiB at the expense of more hash-tree overhead.
307
308         d = self._gather_data(self.required_shares, input_piece_size,
309                               crypttext_segment_hasher)
310         def _done_gathering(chunks):
311             for c in chunks:
312                 assert len(c) == input_piece_size
313             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
314             # during this call, we hit 5*segsize memory
315             return codec.encode(chunks)
316         d.addCallback(_done_gathering)
317         def _done(res):
318             elapsed = time.time() - start
319             self._times["cumulative_encoding"] += elapsed
320             return res
321         d.addCallback(_done)
322         return d
323
324     def _encode_tail_segment(self, segnum):
325
326         start = time.time()
327         codec = self._tail_codec
328         input_piece_size = codec.get_block_size()
329
330         crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
331
332         d = self._gather_data(self.required_shares, input_piece_size,
333                               crypttext_segment_hasher,
334                               allow_short=True)
335         def _done_gathering(chunks):
336             for c in chunks:
337                 # a short trailing chunk will have been padded by
338                 # _gather_data
339                 assert len(c) == input_piece_size
340             self._crypttext_hashes.append(crypttext_segment_hasher.digest())
341             return codec.encode(chunks)
342         d.addCallback(_done_gathering)
343         def _done(res):
344             elapsed = time.time() - start
345             self._times["cumulative_encoding"] += elapsed
346             return res
347         d.addCallback(_done)
348         return d
349
350     def _gather_data(self, num_chunks, input_chunk_size,
351                      crypttext_segment_hasher,
352                      allow_short=False,
353                      previous_chunks=[]):
354         """Return a Deferred that will fire when the required number of
355         chunks have been read (and hashed and encrypted). The Deferred fires
356         with the combination of any 'previous_chunks' and the new chunks
357         which were gathered."""
358
359         if self._aborted:
360             raise UploadAborted()
361
362         if not num_chunks:
363             return defer.succeed(previous_chunks)
364
365         d = self._uploadable.read_encrypted(input_chunk_size, False)
366         def _got(data):
367             if self._aborted:
368                 raise UploadAborted()
369             encrypted_pieces = []
370             length = 0
371             while data:
372                 encrypted_piece = data.pop(0)
373                 length += len(encrypted_piece)
374                 crypttext_segment_hasher.update(encrypted_piece)
375                 self._crypttext_hasher.update(encrypted_piece)
376                 encrypted_pieces.append(encrypted_piece)
377
378             if allow_short:
379                 if length < input_chunk_size:
380                     # padding
381                     pad_size = input_chunk_size - length
382                     encrypted_pieces.append('\x00' * pad_size)
383             else:
384                 # non-tail segments should be the full segment size
385                 if length != input_chunk_size:
386                     log.msg("non-tail segment should be full segment size: %d!=%d"
387                             % (length, input_chunk_size), level=log.BAD)
388                 precondition(length == input_chunk_size,
389                              "length=%d != input_chunk_size=%d" %
390                              (length, input_chunk_size))
391
392             encrypted_piece = "".join(encrypted_pieces)
393             return previous_chunks + [encrypted_piece]
394
395         d.addCallback(_got)
396         d.addCallback(lambda chunks:
397                       self._gather_data(num_chunks-1, input_chunk_size,
398                                         crypttext_segment_hasher,
399                                         allow_short, chunks))
400         return d
401
402     def _send_segment(self, (shares, shareids), segnum):
403         # To generate the URI, we must generate the roothash, so we must
404         # generate all shares, even if we aren't actually giving them to
405         # anybody. This means that the set of shares we create will be equal
406         # to or larger than the set of landlords. If we have any landlord who
407         # *doesn't* have a share, that's an error.
408         _assert(set(self.landlords.keys()).issubset(set(shareids)),
409                 shareids=shareids, landlords=self.landlords)
410         start = time.time()
411         dl = []
412         lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
413         for i in range(len(shares)):
414             subshare = shares[i]
415             shareid = shareids[i]
416             d = self.send_subshare(shareid, segnum, subshare, lognum)
417             dl.append(d)
418             subshare_hash = hashutil.block_hash(subshare)
419             #from allmydata.util import idlib
420             #log.msg("creating block (shareid=%d, blocknum=%d) "
421             #        "len=%d %r .. %r: %s" %
422             #        (shareid, segnum, len(subshare),
423             #         subshare[:50], subshare[-50:], idlib.b2a(subshare_hash)))
424             self.subshare_hashes[shareid].append(subshare_hash)
425
426         dl = self._gather_responses(dl)
427         def _logit(res):
428             self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
429                      (self,
430                       self.segment_size*(segnum+1),
431                       self.segment_size*self.num_segments,
432                       100 * (segnum+1) / self.num_segments,
433                       ),
434                      level=log.OPERATIONAL)
435             elapsed = time.time() - start
436             self._times["cumulative_sending"] += elapsed
437             return res
438         dl.addCallback(_logit)
439         return dl
440
441     def send_subshare(self, shareid, segment_num, subshare, lognum):
442         if shareid not in self.landlords:
443             return defer.succeed(None)
444         sh = self.landlords[shareid]
445         lognum2 = self.log("put_block to %s" % self.landlords[shareid],
446                            parent=lognum, level=log.NOISY)
447         d = sh.put_block(segment_num, subshare)
448         def _done(res):
449             self.log("put_block done", parent=lognum2, level=log.NOISY)
450             return res
451         d.addCallback(_done)
452         d.addErrback(self._remove_shareholder, shareid,
453                      "segnum=%d" % segment_num)
454         return d
455
456     def _remove_shareholder(self, why, shareid, where):
457         ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
458                       method=where, shnum=shareid,
459                       level=log.UNUSUAL, failure=why)
460         if shareid in self.landlords:
461             del self.landlords[shareid]
462         else:
463             # even more UNUSUAL
464             self.log("they weren't in our list of landlords", parent=ln,
465                      level=log.WEIRD)
466         if len(self.landlords) < self.shares_of_happiness:
467             msg = "lost too many shareholders during upload: %s" % why
468             raise NotEnoughPeersError(msg)
469         self.log("but we can still continue with %s shares, we'll be happy "
470                  "with at least %s" % (len(self.landlords),
471                                        self.shares_of_happiness),
472                  parent=ln)
473
    def _gather_responses(self, dl):
        """Combine a list of per-shareholder Deferreds into one.

        Returns a DeferredList over 'dl' created with fireOnOneErrback=True.
        After the DeferredList is built, each Deferred in 'dl' also gets an
        errback that consumes NotEnoughPeersError and lets any other failure
        pass through.
        """
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _eatNotEnoughPeersError(f):
            # all exceptions that occur while talking to a peer are handled
            # in _remove_shareholder. That might raise NotEnoughPeersError,
            # which will cause the DeferredList to errback but which should
            # otherwise be consumed. Allow non-NotEnoughPeersError exceptions
            # to pass through as an unhandled errback. We use this in lieu of
            # consumeErrors=True to allow coding errors to be logged.
            f.trap(NotEnoughPeersError)
            return None
        # these errbacks are attached after the DeferredList has hooked in
        for d0 in dl:
            d0.addErrback(_eatNotEnoughPeersError)
        return d
488
489     def finish_hashing(self):
490         self._start_hashing_and_close_timestamp = time.time()
491         crypttext_hash = self._crypttext_hasher.digest()
492         self.uri_extension_data["crypttext_hash"] = crypttext_hash
493         d = self._uploadable.get_plaintext_hash()
494         def _got(plaintext_hash):
495             self.uri_extension_data["plaintext_hash"] = plaintext_hash
496             return self._uploadable.get_plaintext_hashtree_leaves(0, self.num_segments, self.num_segments)
497         d.addCallback(_got)
498         def _got_hashtree_leaves(leaves):
499             self.log("Encoder: got plaintext_hashtree_leaves: %s" %
500                      (",".join([idlib.b2a(h) for h in leaves]),),
501                      level=log.NOISY)
502             ht = list(HashTree(list(leaves)))
503             self.uri_extension_data["plaintext_root_hash"] = ht[0]
504             self._plaintext_hashtree_nodes = ht
505         d.addCallback(_got_hashtree_leaves)
506
507         d.addCallback(lambda res: self._uploadable.close())
508         return d
509
510     def send_plaintext_hash_tree_to_all_shareholders(self):
511         self.log("sending plaintext hash tree", level=log.NOISY)
512         dl = []
513         for shareid in self.landlords.keys():
514             d = self.send_plaintext_hash_tree(shareid,
515                                               self._plaintext_hashtree_nodes)
516             dl.append(d)
517         return self._gather_responses(dl)
518
519     def send_plaintext_hash_tree(self, shareid, all_hashes):
520         if shareid not in self.landlords:
521             return defer.succeed(None)
522         sh = self.landlords[shareid]
523         d = sh.put_plaintext_hashes(all_hashes)
524         d.addErrback(self._remove_shareholder, shareid, "put_plaintext_hashes")
525         return d
526
527     def send_crypttext_hash_tree_to_all_shareholders(self):
528         self.log("sending crypttext hash tree", level=log.NOISY)
529         t = HashTree(self._crypttext_hashes)
530         all_hashes = list(t)
531         self.uri_extension_data["crypttext_root_hash"] = t[0]
532         dl = []
533         for shareid in self.landlords.keys():
534             dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
535         return self._gather_responses(dl)
536
537     def send_crypttext_hash_tree(self, shareid, all_hashes):
538         if shareid not in self.landlords:
539             return defer.succeed(None)
540         sh = self.landlords[shareid]
541         d = sh.put_crypttext_hashes(all_hashes)
542         d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
543         return d
544
545     def send_all_subshare_hash_trees(self):
546         self.log("sending subshare hash trees", level=log.NOISY)
547         dl = []
548         for shareid,hashes in enumerate(self.subshare_hashes):
549             # hashes is a list of the hashes of all subshares that were sent
550             # to shareholder[shareid].
551             dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
552         return self._gather_responses(dl)
553
554     def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
555         t = HashTree(subshare_hashes)
556         all_hashes = list(t)
557         # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
558         # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
559         # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
560         self.share_root_hashes[shareid] = t[0]
561         if shareid not in self.landlords:
562             return defer.succeed(None)
563         sh = self.landlords[shareid]
564         d = sh.put_block_hashes(all_hashes)
565         d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
566         return d
567
568     def send_all_share_hash_trees(self):
569         # each bucket gets a set of share hash tree nodes that are needed to
570         # validate their share. This includes the share hash itself, but does
571         # not include the top-level hash root (which is stored securely in
572         # the URI instead).
573         self.log("sending all share hash trees", level=log.NOISY)
574         dl = []
575         for h in self.share_root_hashes:
576             assert h
577         # create the share hash tree
578         t = HashTree(self.share_root_hashes)
579         # the root of this hash tree goes into our URI
580         self.uri_extension_data['share_root_hash'] = t[0]
581         # now send just the necessary pieces out to each shareholder
582         for i in range(self.num_shares):
583             # the HashTree is given a list of leaves: 0,1,2,3..n .
584             # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
585             needed_hash_indices = t.needed_hashes(i, include_leaf=True)
586             hashes = [(hi, t[hi]) for hi in needed_hash_indices]
587             dl.append(self.send_one_share_hash_tree(i, hashes))
588         return self._gather_responses(dl)
589
590     def send_one_share_hash_tree(self, shareid, needed_hashes):
591         if shareid not in self.landlords:
592             return defer.succeed(None)
593         sh = self.landlords[shareid]
594         d = sh.put_share_hashes(needed_hashes)
595         d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
596         return d
597
598     def send_uri_extension_to_all_shareholders(self):
599         lp = self.log("sending uri_extension", level=log.NOISY)
600         for k in ('crypttext_root_hash', 'crypttext_hash',
601                   'plaintext_root_hash', 'plaintext_hash',
602                   ):
603             assert k in self.uri_extension_data
604         uri_extension = uri.pack_extension(self.uri_extension_data)
605         ed = {}
606         for k,v in self.uri_extension_data.items():
607             if k.endswith("hash"):
608                 ed[k] = idlib.b2a(v)
609             else:
610                 ed[k] = v
611         self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
612         self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
613         dl = []
614         for shareid in self.landlords.keys():
615             dl.append(self.send_uri_extension(shareid, uri_extension))
616         return self._gather_responses(dl)
617
618     def send_uri_extension(self, shareid, uri_extension):
619         sh = self.landlords[shareid]
620         d = sh.put_uri_extension(uri_extension)
621         d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
622         return d
623
624     def close_all_shareholders(self):
625         self.log("closing shareholders", level=log.NOISY)
626         dl = []
627         for shareid in self.landlords:
628             d = self.landlords[shareid].close()
629             d.addErrback(self._remove_shareholder, shareid, "close")
630             dl.append(d)
631         return self._gather_responses(dl)
632
633     def done(self):
634         self.log("upload done", level=log.OPERATIONAL)
635         now = time.time()
636         h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
637         self._times["hashes_and_close"] = h_and_c_elapsed
638         total_elapsed = now - self._start_total_timestamp
639         self._times["total_encode_and_push"] = total_elapsed
640
641         # update our sharemap
642         self._shares_placed = set(self.landlords.keys())
643         return (self.uri_extension_hash, self.required_shares,
644                 self.num_shares, self.file_size)
645
    def err(self, f):
        """Errback for the upload chain: abort every remaining shareholder.

        'f' is the failure that stopped the upload. Returns a Deferred that,
        once the abort() calls have been gathered, fires with the original
        failure; a defer.FirstError is unwrapped to its subFailure first.
        """
        self.log("upload failed", failure=f, level=log.UNUSUAL)
        # we need to abort any remaining shareholders, so they'll delete the
        # partial share, allowing someone else to upload it again.
        self.log("aborting shareholders", level=log.UNUSUAL)
        dl = []
        # iterate over a copy: _remove_shareholder may delete entries
        for shareid in list(self.landlords.keys()):
            d = self.landlords[shareid].abort()
            d.addErrback(self._remove_shareholder, shareid, "abort")
            dl.append(d)
        d = self._gather_responses(dl)
        def _done(res):
            self.log("shareholders aborted", level=log.UNUSUAL)
            if f.check(defer.FirstError):
                return f.value.subFailure
            return f
        # NOTE(review): _done is attached with addCallback only, so if the
        # gathered Deferred itself errbacks, the original failure 'f' is not
        # what gets returned -- confirm whether that is intended.
        d.addCallback(_done)
        return d
664
665     def get_shares_placed(self):
666         # return a set of share numbers that were successfully placed.
667         return self._shares_placed
668
669     def get_times(self):
670         # return a dictionary of encode+push timings
671         return self._times
672     def get_rates(self):
673         # return a dictionary of encode+push speeds
674         rates = {
675             "encode": self.file_size / self._times["cumulative_encoding"],
676             "push": self.file_size / self._times["cumulative_sending"],
677             }
678         return rates