1 # -*- test-case-name: allmydata.test.test_encode -*-
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap import eventual
7 from allmydata import storage, uri
8 from allmydata.hashtree import HashTree
9 from allmydata.util import mathutil, hashutil, base32, log
10 from allmydata.util.assertutil import _assert, precondition
11 from allmydata.codec import CRSEncoder
12 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
13 IEncryptedUploadable, IUploadStatus, NotEnoughSharesError
16 The goal of the encoder is to turn the original file into a series of
17 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
18 is a different host, but for small grids there may be overlap). The number
19 of shares is chosen to hit our reliability goals (more shares on more
20 machines means more reliability), and is limited by overhead (proportional to
21 numshares or log(numshares)) and the encoding technology in use (zfec permits
22 only 256 shares total). It is also constrained by the amount of data
23 we want to send to each host. For estimating purposes, think of 10 shares
24 out of which we need 3 to reconstruct the file.
26 The encoder starts by cutting the original file into segments. All segments
27 except the last are of equal size. The segment size is chosen to constrain
28 the memory footprint (which will probably vary between 1x and 4x segment
29 size) and to constrain the overhead (which will be proportional to
30 log(number of segments)).
33 Each segment (A,B,C) is read into memory, encrypted, and encoded into
34 blocks. The 'share' (say, share #1) that makes it out to a host is a
35 collection of these blocks (block A1, B1, C1), plus some hash-tree
36 information necessary to validate the data upon retrieval. Only one segment
37 is handled at a time: all blocks for segment A are delivered before any
38 work is begun on segment B.
40 As blocks are created, we retain the hash of each one. The list of block hashes
41 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
42 of a Merkle hash tree for that share, called the block hash tree.
44 This hash tree has one terminal leaf per block. The complete block hash
45 tree is sent to the shareholder after all the data has been sent. At
46 retrieval time, the decoder will ask for specific pieces of this tree before
47 asking for blocks, whichever it needs to validate those blocks.
49 (Note: we don't really need to generate this whole block hash tree
50 ourselves. It would be sufficient to have the shareholder generate it and
51 just tell us the root. This gives us an extra level of validation on the
52 transfer, though, and it is relatively cheap to compute.)
54 Each of these block hash trees has a root hash. The collection of these
55 root hashes for all shares are collected into the 'share hash tree', which
56 has one terminal leaf per share. After sending the blocks and the complete
57 block hash tree to each shareholder, we send them the portion of the share
58 hash tree that is necessary to validate their share. The root of the share
59 hash tree is put into the URI.
class UploadAborted(Exception):
    """Raised to bypass the rest of the upload chain once an upload has
    been aborted (see the abort() notes and _gather_data, which raise it
    on the next segment read).

    NOTE(review): the class body (presumably a bare 'pass') is missing
    from this excerpt; this docstring stands in for it.
    """
class Encoder(object):
    # Feature flag: when True, plaintext hashes and the plaintext hash tree
    # are computed and pushed to shareholders (see the USE_PLAINTEXT_HASHES
    # branches in start(), finish_hashing(), and
    # send_uri_extension_to_all_shareholders()). Disabled by default.
    USE_PLAINTEXT_HASHES = False
    def __init__(self, log_parent=None, upload_status=None):
        # NOTE(review): several lines of this method are missing from this
        # excerpt (the precondition() call below is unterminated). Comments
        # only; no code changed.
        #
        # uri_extension_data accumulates the metadata that is later packed
        # into the URI extension block (codec params, sizes, hash roots).
        self.uri_extension_data = {}
        # upload_status may be None; IUploadStatus(...) presumably adapts
        # it -- confirm against the interface definition.
        self._status = IUploadStatus(upload_status)
        # log_parent must be a tahoe log number (int) or None
        precondition(log_parent is None or isinstance(log_parent, int),
        self._log_number = log.msg("creating Encoder %s" % self,
                                   facility="tahoe.encoder", parent=log_parent)
        # NOTE(review): the "def __repr__(self):" header for this body is
        # missing from this excerpt.
        if hasattr(self, "_storage_index"):
            # _storage_index is only set once set_encrypted_uploadable()'s
            # _done callback has run; before that, use the generic repr.
            return "<Encoder for %s>" % storage.si_b2a(self._storage_index)[:5]
        return "<Encoder for unknown storage index>"
94 def log(self, *args, **kwargs):
95 if "parent" not in kwargs:
96 kwargs["parent"] = self._log_number
97 if "facility" not in kwargs:
98 kwargs["facility"] = "tahoe.encoder"
99 return log.msg(*args, **kwargs)
    def set_encrypted_uploadable(self, uploadable):
        """Attach the IEncryptedUploadable and (via a Deferred chain) fetch
        its size, encoding parameters, and storage index.

        NOTE(review): this excerpt is missing the creation of 'd'
        (presumably d = eu.get_size()), the "def _got_size(size):" header,
        and the trailing addCallback(_done)/return. Comments only; no code
        changed.
        """
        eu = self._uploadable = IEncryptedUploadable(uploadable)
        self.log(format="file size: %(size)d", size=size)
        self.file_size = size
        d.addCallback(_got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(lambda res: eu.get_storage_index())
        def _done(storage_index):
            self._storage_index = storage_index
    def _got_all_encoding_parameters(self, params):
        """Record the (k, happiness, n, segment_size) tuple, build the main
        and tail CRS codecs, and fill in the URI-extension metadata.

        NOTE(review): lines are missing from this excerpt: nothing visible
        assigns self.num_shares (presumably 'n'; it is read below), and the
        div_ceil() computing self.num_segments is unterminated. Comments
        only; no code changed.
        """
        assert not self._codec
        k, happy, n, segsize = params
        self.required_shares = k
        self.shares_of_happiness = happy
        self.segment_size = segsize
        self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
        self.log("now setting up codec")
        # each segment is split evenly into 'required_shares' input pieces
        assert self.segment_size % self.required_shares == 0
        self.num_segments = mathutil.div_ceil(self.file_size,
        self._codec = CRSEncoder()
        self._codec.set_params(self.segment_size,
                               self.required_shares, self.num_shares)
        data = self.uri_extension_data
        data['codec_name'] = self._codec.get_encoder_type()
        data['codec_params'] = self._codec.get_serialized_params()
        data['size'] = self.file_size
        data['segment_size'] = self.segment_size
        self.share_size = mathutil.div_ceil(self.file_size,
                                            self.required_shares)
        data['num_segments'] = self.num_segments
        data['needed_shares'] = self.required_shares
        data['total_shares'] = self.num_shares
        # the "tail" is the last segment. This segment may or may not be
        # shorter than all other segments. We use the "tail codec" to handle
        # it. If the tail is short, we use a different codec instance. In
        # addition, the tail codec must be fed data which has been padded out
        # (to a multiple of required_shares -- see next_multiple below).
        self.tail_size = self.file_size % self.segment_size
        if not self.tail_size:
            # exact multiple: the tail is a full-size segment
            self.tail_size = self.segment_size
        # the tail codec is responsible for encoding tail_size bytes
        padded_tail_size = mathutil.next_multiple(self.tail_size,
                                                  self.required_shares)
        self._tail_codec = CRSEncoder()
        self._tail_codec.set_params(padded_tail_size,
                                    self.required_shares, self.num_shares)
        data['tail_codec_params'] = self._tail_codec.get_serialized_params()
165 def _get_share_size(self):
166 share_size = mathutil.div_ceil(self.file_size, self.required_shares)
167 overhead = self._compute_overhead()
168 return share_size + overhead
    def _compute_overhead(self):
        """Per-share overhead in bytes, added on top of the erasure-coded
        data by _get_share_size().

        NOTE(review): the body is missing from this excerpt; as written
        here the method returns None. Confirm the original body (likely a
        constant return) before relying on _get_share_size().
        """
173 def get_param(self, name):
176 if name == "storage_index":
177 return self._storage_index
178 elif name == "share_counts":
179 return (self.required_shares, self.shares_of_happiness,
181 elif name == "num_segments":
182 return self.num_segments
183 elif name == "segment_size":
184 return self.segment_size
185 elif name == "block_size":
186 return self._codec.get_block_size()
187 elif name == "share_size":
188 return self._get_share_size()
189 elif name == "serialized_params":
190 return self._codec.get_serialized_params()
192 raise KeyError("unknown parameter name '%s'" % name)
194 def set_shareholders(self, landlords):
195 assert isinstance(landlords, dict)
197 assert IStorageBucketWriter.providedBy(landlords[k])
198 self.landlords = landlords.copy()
        # NOTE(review): this is the body of Encoder.start(); the "def
        # start(self):" header is missing from this excerpt, as are the
        # "self._times = {" opener for the timing literals below and
        # (presumably) a final "return d". Comments only; no code changed.
        self.log("%s starting" % (self,))
        #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
        # running hash of the entire ciphertext, plus per-segment hashes
        self._crypttext_hasher = hashutil.crypttext_hasher()
        self._crypttext_hashes = []
        self.subshare_hashes = [[] for x in range(self.num_shares)]
        # subshare_hashes[i] is a list that will be accumulated and then sent
        # to landlord[i]. This list contains a hash of each segment_share
        # that we sent to that landlord.
        self.share_root_hashes = [None] * self.num_shares
        # NOTE(review): the "self._times = {" line that opens this dict
        # literal is missing from the excerpt.
        "cumulative_encoding": 0.0,
        "cumulative_sending": 0.0,
        "hashes_and_close": 0.0,
        "total_encode_and_push": 0.0,
        self._start_total_timestamp = time.time()
        # begin the chain on a fresh reactor turn
        d = eventual.fireEventually()
        d.addCallback(lambda res: self.start_all_shareholders())
        # encode and send every segment except the last ...
        for i in range(self.num_segments-1):
            # note to self: this form doesn't work, because lambda only
            # captures the slot, not the value
            #d.addCallback(lambda res: self.do_segment(i))
            # use this form instead:
            d.addCallback(lambda res, i=i: self._encode_segment(i))
            d.addCallback(self._send_segment, i)
            d.addCallback(self._turn_barrier)
        # ... then the (possibly short, padded) tail segment
        last_segnum = self.num_segments - 1
        d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
        d.addCallback(self._send_segment, last_segnum)
        d.addCallback(self._turn_barrier)
        d.addCallback(lambda res: self.finish_hashing())
        if self.USE_PLAINTEXT_HASHES:
            d.addCallback(lambda res:
                          self.send_plaintext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res:
                      self.send_crypttext_hash_tree_to_all_shareholders())
        d.addCallback(lambda res: self.send_all_subshare_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
        d.addCallback(lambda res: self.close_all_shareholders())
        d.addCallbacks(self.done, self.err)
    def set_status(self, status):
        # NOTE(review): one line is missing between the signature and this
        # call (possibly an "if self._status:" guard, matching the gap in
        # set_encode_and_push_progress). Comments only; no code changed.
        self._status.set_status(status)
    def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
        """Update phase-2 (encode+push) progress; 'extra' credits a
        fraction of the synthetic final segment.

        NOTE(review): one line is missing from this excerpt between the
        signature and the comment below (possibly an "if self._status:"
        guard). Comments only; no code changed.
        """
        # we treat the final hash+close as an extra segment
        if sent_segments is None:
            # default: all real segments have been sent
            sent_segments = self.num_segments
        progress = float(sent_segments + extra) / (self.num_segments + 1)
        self._status.set_progress(2, progress)
        # NOTE(review): this is the body of Encoder.abort(); the method
        # header (and presumably a "self._aborted = True" flag assignment,
        # which _gather_data's UploadAborted path would rely on) are
        # missing from this excerpt.
        self.log("aborting upload", level=log.UNUSUAL)
        assert self._codec, "don't call abort before start"
        # the next segment read (in _gather_data inside _encode_segment) will
        # raise UploadAborted(), which will bypass the rest of the upload
        # chain. If we've sent the final segment's shares, it's too late to
        # abort. TODO: allow abort any time up to close_all_shareholders.
274 def _turn_barrier(self, res):
275 # putting this method in a Deferred chain imposes a guaranteed
276 # reactor turn between the pre- and post- portions of that chain.
277 # This can be useful to limit memory consumption: since Deferreds do
278 # not do tail recursion, code which uses defer.succeed(result) for
279 # consistency will cause objects to live for longer than you might
282 return eventual.fireEventually(res)
285 def start_all_shareholders(self):
286 self.log("starting shareholders", level=log.NOISY)
287 self.set_status("Starting shareholders")
289 for shareid in self.landlords:
290 d = self.landlords[shareid].start()
291 d.addErrback(self._remove_shareholder, shareid, "start")
293 return self._gather_responses(dl)
    def _encode_segment(self, segnum):
        """Read, hash, and erasure-encode one full-size segment.

        NOTE(review): lines are missing from this excerpt: nothing binds
        'codec' (presumably codec = self._codec) or 'start' (presumably
        time.time()); _done_gathering lost its "for c in chunks:" loop
        header (so 'c' is unbound); and the _logit callback header plus
        the final return are gone. Comments only; no code changed.
        """
        # the ICodecEncoder API wants to receive a total of self.segment_size
        # bytes on each encode() call, broken up into a number of
        # identically-sized pieces. Due to the way the codec algorithm works,
        # these pieces need to be the same size as the share which the codec
        # will generate. Therefore we must feed it with input_piece_size that
        # equals the output share size.
        input_piece_size = codec.get_block_size()
        # as a result, the number of input pieces per encode() call will be
        # equal to the number of required shares with which the codec was
        # constructed. You can think of the codec as chopping up a
        # 'segment_size' of data into 'required_shares' shares (not doing any
        # fancy math at all, just doing a split), then creating some number
        # of additional shares which can be substituted if the primary ones
        # become unavailable.
        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
        # memory footprint: we only hold a tiny piece of the plaintext at any
        # given time. We build up a segment's worth of crypttext, then hand
        # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
        # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
        # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
        # footprint to 430KiB at the expense of more hash-tree overhead.
        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher)
        def _done_gathering(chunks):
            assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            # during this call, we hit 5*segsize memory
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        elapsed = time.time() - start
        self._times["cumulative_encoding"] += elapsed
    def _encode_tail_segment(self, segnum):
        """Like _encode_segment, but uses the tail codec, whose input was
        sized by _got_all_encoding_parameters for the padded final segment.

        NOTE(review): lines are missing from this excerpt: 'start' is
        never bound, the _gather_data call is unterminated (presumably an
        allow_short=True argument followed), _done_gathering lost its
        "for c in chunks:" header, and the _logit wrapper plus the final
        return are gone. Comments only; no code changed.
        """
        codec = self._tail_codec
        input_piece_size = codec.get_block_size()
        crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
        d = self._gather_data(self.required_shares, input_piece_size,
                              crypttext_segment_hasher,
        def _done_gathering(chunks):
            # a short trailing chunk will have been padded by
            # _gather_data (it appends NUL bytes up to input_chunk_size)
            assert len(c) == input_piece_size
            self._crypttext_hashes.append(crypttext_segment_hasher.digest())
            return codec.encode(chunks)
        d.addCallback(_done_gathering)
        elapsed = time.time() - start
        self._times["cumulative_encoding"] += elapsed
    # NOTE(review): this excerpt dropped the tail of the signature
    # (presumably allow_short and previous_chunks parameters plus the
    # closing paren -- 'allow_short' and 'previous_chunks' are both used
    # below), the abort-check and recursion-termination headers, the
    # "_got(data)" callback header and its while-loop header, and the
    # final hookup of the recursive addCallback. Comments only; no code
    # changed.
    def _gather_data(self, num_chunks, input_chunk_size,
                     crypttext_segment_hasher,
        """Return a Deferred that will fire when the required number of
        chunks have been read (and hashed and encrypted). The Deferred fires
        with the combination of any 'previous_chunks' and the new chunks
        which were gathered."""
            raise UploadAborted()
            return defer.succeed(previous_chunks)
        d = self._uploadable.read_encrypted(input_chunk_size, False)
            raise UploadAborted()
        encrypted_pieces = []
            encrypted_piece = data.pop(0)
            length += len(encrypted_piece)
            # feed both the per-segment hasher and the whole-file hasher
            crypttext_segment_hasher.update(encrypted_piece)
            self._crypttext_hasher.update(encrypted_piece)
            encrypted_pieces.append(encrypted_piece)
        if length < input_chunk_size:
            # short read: pad out to input_chunk_size with NUL bytes
            pad_size = input_chunk_size - length
            encrypted_pieces.append('\x00' * pad_size)
        # non-tail segments should be the full segment size
        if length != input_chunk_size:
            log.msg("non-tail segment should be full segment size: %d!=%d"
                    % (length, input_chunk_size),
                    level=log.BAD, umid="jNk5Yw")
            precondition(length == input_chunk_size,
                         "length=%d != input_chunk_size=%d" %
                         (length, input_chunk_size))
        encrypted_piece = "".join(encrypted_pieces)
        return previous_chunks + [encrypted_piece]
        d.addCallback(lambda chunks:
                      self._gather_data(num_chunks-1, input_chunk_size,
                                        crypttext_segment_hasher,
                                        allow_short, chunks))
    # NOTE(review): the signature uses Python 2 tuple-parameter unpacking
    # ("(shares, shareids)") -- this file is Python 2. Several lines are
    # missing from this excerpt: the 'start' and 'dl' bindings, the
    # "subshare = shares[i]" line inside the loop ('subshare' is unbound),
    # dl.append(d), part of the uploaded-bytes log call, and the _logit
    # callback header plus the final return. Comments only; no code
    # changed.
    def _send_segment(self, (shares, shareids), segnum):
        # To generate the URI, we must generate the roothash, so we must
        # generate all shares, even if we aren't actually giving them to
        # anybody. This means that the set of shares we create will be equal
        # to or larger than the set of landlords. If we have any landlord who
        # *doesn't* have a share, that's an error.
        _assert(set(self.landlords.keys()).issubset(set(shareids)),
                shareids=shareids, landlords=self.landlords)
        self.set_status("Sending segment %d of %d" % (segnum+1,
        self.set_encode_and_push_progress(segnum)
        lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
        for i in range(len(shares)):
            shareid = shareids[i]
            d = self.send_subshare(shareid, segnum, subshare, lognum)
            # retain the block hash; it seeds this share's block hash tree
            subshare_hash = hashutil.block_hash(subshare)
            #from allmydata.util import base32
            #log.msg("creating block (shareid=%d, blocknum=%d) "
            #        "len=%d %r .. %r: %s" %
            #        (shareid, segnum, len(subshare),
            #         subshare[:50], subshare[-50:], base32.b2a(subshare_hash)))
            self.subshare_hashes[shareid].append(subshare_hash)
        dl = self._gather_responses(dl)
        self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
                 self.segment_size*(segnum+1),
                 self.segment_size*self.num_segments,
                 100 * (segnum+1) / self.num_segments,
                 level=log.OPERATIONAL)
        elapsed = time.time() - start
        self._times["cumulative_sending"] += elapsed
        dl.addCallback(_logit)
    def send_subshare(self, shareid, segment_num, subshare, lognum):
        """Send one block to shareholder 'shareid'; no-op if that landlord
        has already been removed.

        NOTE(review): this excerpt lost the "def _done(res):" wrapper
        around the "put_block done" log call, its addCallback hookup, and
        the final "return d" -- as written the log fires immediately
        rather than on completion. Comments only; no code changed.
        """
        if shareid not in self.landlords:
            # already dropped by _remove_shareholder; silently skip
            return defer.succeed(None)
        sh = self.landlords[shareid]
        lognum2 = self.log("put_block to %s" % self.landlords[shareid],
                           parent=lognum, level=log.NOISY)
        d = sh.put_block(segment_num, subshare)
        self.log("put_block done", parent=lognum2, level=log.NOISY)
        # a failure drops this shareholder rather than failing the upload
        d.addErrback(self._remove_shareholder, shareid,
                     "segnum=%d" % segment_num)
    def _remove_shareholder(self, why, shareid, where):
        """Errback: drop a failing shareholder, abort its bucket, and
        raise NotEnoughSharesError if too few landlords remain.

        NOTE(review): an "else:" header appears to be missing before the
        "they weren't in our list" log (as written it runs after every
        successful removal), and the trailing log call is unterminated
        (its closing level/parenthesis line was dropped). Comments only;
        no code changed.
        """
        ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
                      method=where, shnum=shareid,
                      level=log.UNUSUAL, failure=why)
        if shareid in self.landlords:
            # tell the bucket to discard its partial share, then forget it
            self.landlords[shareid].abort()
            del self.landlords[shareid]
            self.log("they weren't in our list of landlords", parent=ln,
                     level=log.WEIRD, umid="TQGFRw")
        if len(self.landlords) < self.shares_of_happiness:
            msg = "lost too many shareholders during upload: %s" % why
            raise NotEnoughSharesError(msg)
        self.log("but we can still continue with %s shares, we'll be happy "
                 "with at least %s" % (len(self.landlords),
                                      self.shares_of_happiness),
495 def _gather_responses(self, dl):
496 d = defer.DeferredList(dl, fireOnOneErrback=True)
497 def _eatNotEnoughSharesError(f):
498 # all exceptions that occur while talking to a peer are handled
499 # in _remove_shareholder. That might raise NotEnoughSharesError,
500 # which will cause the DeferredList to errback but which should
501 # otherwise be consumed. Allow non-NotEnoughSharesError exceptions
502 # to pass through as an unhandled errback. We use this in lieu of
503 # consumeErrors=True to allow coding errors to be logged.
504 f.trap(NotEnoughSharesError)
507 d0.addErrback(_eatNotEnoughSharesError)
    def finish_hashing(self):
        """Finalize the whole-file crypttext hash (and, when enabled, the
        plaintext hash tree), then close the uploadable.

        NOTE(review): the _got callback's log call is unterminated, the
        addCallback(_got) hookup is missing, the log call inside
        _got_hashtree_leaves is unterminated, and the final return is
        gone. Comments only; no code changed.
        """
        self._start_hashing_and_close_timestamp = time.time()
        self.set_status("Finishing hashes")
        self.set_encode_and_push_progress(extra=0.0)
        crypttext_hash = self._crypttext_hasher.digest()
        self.uri_extension_data["crypttext_hash"] = crypttext_hash
        d = self._uploadable.get_plaintext_hash()
        def _got(plaintext_hash):
            self.log(format="plaintext_hash=%(plaintext_hash)s, SI=%(SI)s, size=%(size)d",
                     plaintext_hash=base32.b2a(plaintext_hash),
                     SI=storage.si_b2a(self._storage_index),
            return plaintext_hash
        if self.USE_PLAINTEXT_HASHES:
            def _use_plaintext_hash(plaintext_hash):
                self.uri_extension_data["plaintext_hash"] = plaintext_hash
                return self._uploadable.get_plaintext_hashtree_leaves(0, self.num_segments, self.num_segments)
            d.addCallback(_use_plaintext_hash)
            def _got_hashtree_leaves(leaves):
                self.log("Encoder: got plaintext_hashtree_leaves: %s" %
                         (",".join([base32.b2a(h) for h in leaves]),),
                # list(HashTree(...)) flattens the Merkle tree; node 0 is
                # the root
                ht = list(HashTree(list(leaves)))
                self.uri_extension_data["plaintext_root_hash"] = ht[0]
                self._plaintext_hashtree_nodes = ht
            d.addCallback(_got_hashtree_leaves)
        d.addCallback(lambda res: self._uploadable.close())
541 def send_plaintext_hash_tree_to_all_shareholders(self):
542 self.log("sending plaintext hash tree", level=log.NOISY)
543 self.set_status("Sending Plaintext Hash Tree")
544 self.set_encode_and_push_progress(extra=0.2)
546 for shareid in self.landlords.keys():
547 d = self.send_plaintext_hash_tree(shareid,
548 self._plaintext_hashtree_nodes)
550 return self._gather_responses(dl)
552 def send_plaintext_hash_tree(self, shareid, all_hashes):
553 if shareid not in self.landlords:
554 return defer.succeed(None)
555 sh = self.landlords[shareid]
556 d = sh.put_plaintext_hashes(all_hashes)
557 d.addErrback(self._remove_shareholder, shareid, "put_plaintext_hashes")
560 def send_crypttext_hash_tree_to_all_shareholders(self):
561 self.log("sending crypttext hash tree", level=log.NOISY)
562 self.set_status("Sending Crypttext Hash Tree")
563 self.set_encode_and_push_progress(extra=0.3)
564 t = HashTree(self._crypttext_hashes)
566 self.uri_extension_data["crypttext_root_hash"] = t[0]
568 for shareid in self.landlords.keys():
569 dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
570 return self._gather_responses(dl)
572 def send_crypttext_hash_tree(self, shareid, all_hashes):
573 if shareid not in self.landlords:
574 return defer.succeed(None)
575 sh = self.landlords[shareid]
576 d = sh.put_crypttext_hashes(all_hashes)
577 d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
580 def send_all_subshare_hash_trees(self):
581 self.log("sending subshare hash trees", level=log.NOISY)
582 self.set_status("Sending Subshare Hash Trees")
583 self.set_encode_and_push_progress(extra=0.4)
585 for shareid,hashes in enumerate(self.subshare_hashes):
586 # hashes is a list of the hashes of all subshares that were sent
587 # to shareholder[shareid].
588 dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
589 return self._gather_responses(dl)
591 def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
592 t = HashTree(subshare_hashes)
594 # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
595 # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
596 # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
597 self.share_root_hashes[shareid] = t[0]
598 if shareid not in self.landlords:
599 return defer.succeed(None)
600 sh = self.landlords[shareid]
601 d = sh.put_block_hashes(all_hashes)
602 d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
605 def send_all_share_hash_trees(self):
606 # each bucket gets a set of share hash tree nodes that are needed to
607 # validate their share. This includes the share hash itself, but does
608 # not include the top-level hash root (which is stored securely in
610 self.log("sending all share hash trees", level=log.NOISY)
611 self.set_status("Sending Share Hash Trees")
612 self.set_encode_and_push_progress(extra=0.6)
614 for h in self.share_root_hashes:
616 # create the share hash tree
617 t = HashTree(self.share_root_hashes)
618 # the root of this hash tree goes into our URI
619 self.uri_extension_data['share_root_hash'] = t[0]
620 # now send just the necessary pieces out to each shareholder
621 for i in range(self.num_shares):
622 # the HashTree is given a list of leaves: 0,1,2,3..n .
623 # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
624 needed_hash_indices = t.needed_hashes(i, include_leaf=True)
625 hashes = [(hi, t[hi]) for hi in needed_hash_indices]
626 dl.append(self.send_one_share_hash_tree(i, hashes))
627 return self._gather_responses(dl)
629 def send_one_share_hash_tree(self, shareid, needed_hashes):
630 if shareid not in self.landlords:
631 return defer.succeed(None)
632 sh = self.landlords[shareid]
633 d = sh.put_share_hashes(needed_hashes)
634 d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
    def send_uri_extension_to_all_shareholders(self):
        """Pack self.uri_extension_data, record its hash, and send the
        packed block to every remaining shareholder.

        NOTE(review): both key-checking tuples below are unterminated
        (their remaining key names were dropped), and the 'ed' and 'dl'
        bindings (presumably ed = {} / dl = []) are missing from this
        excerpt. Comments only; no code changed.
        """
        lp = self.log("sending uri_extension", level=log.NOISY)
        self.set_status("Sending URI Extensions")
        self.set_encode_and_push_progress(extra=0.8)
        # sanity-check that the required fields made it into the dict
        for k in ('crypttext_root_hash', 'crypttext_hash',
            assert k in self.uri_extension_data
        if self.USE_PLAINTEXT_HASHES:
            for k in ('plaintext_root_hash', 'plaintext_hash',
                assert k in self.uri_extension_data
        uri_extension = uri.pack_extension(self.uri_extension_data)
        # render the hash values as base32 for logging
        for k,v in self.uri_extension_data.items():
            if k.endswith("hash"):
                ed[k] = base32.b2a(v)
        self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
        self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
        for shareid in self.landlords.keys():
            dl.append(self.send_uri_extension(shareid, uri_extension))
        return self._gather_responses(dl)
662 def send_uri_extension(self, shareid, uri_extension):
663 sh = self.landlords[shareid]
664 d = sh.put_uri_extension(uri_extension)
665 d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
668 def close_all_shareholders(self):
669 self.log("closing shareholders", level=log.NOISY)
670 self.set_status("Closing Shareholders")
671 self.set_encode_and_push_progress(extra=0.9)
673 for shareid in self.landlords:
674 d = self.landlords[shareid].close()
675 d.addErrback(self._remove_shareholder, shareid, "close")
677 return self._gather_responses(dl)
        # NOTE(review): this is the body of the upload-success callback
        # (Encoder.done); the method header and the "now = time.time()"
        # binding are missing from this excerpt.
        self.log("upload done", level=log.OPERATIONAL)
        self.set_status("Done")
        self.set_encode_and_push_progress(extra=1.0) # done
        h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
        self._times["hashes_and_close"] = h_and_c_elapsed
        total_elapsed = now - self._start_total_timestamp
        self._times["total_encode_and_push"] = total_elapsed
        # update our sharemap
        self._shares_placed = set(self.landlords.keys())
        # this tuple is what the caller needs to build the final URI
        return (self.uri_extension_hash, self.required_shares,
                self.num_shares, self.file_size)
        # NOTE(review): this is the body of the upload-failure errback
        # (Encoder.err, taking a Failure 'f'); the method header is
        # missing from this excerpt, as is the fallthrough for
        # non-FirstError failures (presumably "return f").
        self.log("upload failed", failure=f, level=log.UNUSUAL)
        self.set_status("Failed")
        # we need to abort any remaining shareholders, so they'll delete the
        # partial share, allowing someone else to upload it again.
        self.log("aborting shareholders", level=log.UNUSUAL)
        for shareid in list(self.landlords.keys()):
            self.landlords[shareid].abort()
        # DeferredList wraps the underlying error in FirstError; unwrap it
        if f.check(defer.FirstError):
            return f.value.subFailure
706 def get_shares_placed(self):
707 # return a set of share numbers that were successfully placed.
708 return self._shares_placed
711 # return a dictionary of encode+push timings
714 def get_uri_extension_data(self):
715 return self.uri_extension_data