1 # -*- test-case-name: allmydata.test.test_encode -*-
4 from zope.interface import implements
5 from twisted.internet import defer
6 from foolscap.api import fireEventually
7 from allmydata import uri
8 from allmydata.storage.server import si_b2a
9 from allmydata.hashtree import HashTree
10 from allmydata.util import mathutil, hashutil, base32, log, happinessutil
11 from allmydata.util.assertutil import _assert, precondition
12 from allmydata.codec import CRSEncoder
13 from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
14 IEncryptedUploadable, IUploadStatus, UploadUnhappinessError
18 The goal of the encoder is to turn the original file into a series of
19 'shares'. Each share is going to a 'shareholder' (nominally each shareholder
20 is a different host, but for small grids there may be overlap). The number
21 of shares is chosen to hit our reliability goals (more shares on more
22 machines means more reliability), and is limited by overhead (proportional to
23 numshares or log(numshares)) and the encoding technology in use (zfec permits
24 only 256 shares total). It is also constrained by the amount of data
25 we want to send to each host. For estimating purposes, think of 10 shares
26 out of which we need 3 to reconstruct the file.
28 The encoder starts by cutting the original file into segments. All segments
29 except the last are of equal size. The segment size is chosen to constrain
30 the memory footprint (which will probably vary between 1x and 4x segment
31 size) and to constrain the overhead (which will be proportional to
32 log(number of segments)).
35 Each segment (A,B,C) is read into memory, encrypted, and encoded into
36 blocks. The 'share' (say, share #1) that makes it out to a host is a
37 collection of these blocks (block A1, B1, C1), plus some hash-tree
38 information necessary to validate the data upon retrieval. Only one segment
39 is handled at a time: all blocks for segment A are delivered before any
40 work is begun on segment B.
42 As blocks are created, we retain the hash of each one. The list of block hashes
43 for a single share (say, hash(A1), hash(B1), hash(C1)) is used to form the base
44 of a Merkle hash tree for that share, called the block hash tree.
46 This hash tree has one terminal leaf per block. The complete block hash
47 tree is sent to the shareholder after all the data has been sent. At
48 retrieval time, the decoder will ask for specific pieces of this tree before
49 asking for blocks, whichever it needs to validate those blocks.
51 (Note: we don't really need to generate this whole block hash tree
52 ourselves. It would be sufficient to have the shareholder generate it and
53 just tell us the root. This gives us an extra level of validation on the
54 transfer, though, and it is relatively cheap to compute.)
56 Each of these block hash trees has a root hash. The collection of these
57 root hashes for all shares are collected into the 'share hash tree', which
58 has one terminal leaf per share. After sending the blocks and the complete
59 block hash tree to each shareholder, we send them the portion of the share
60 hash tree that is necessary to validate their share. The root of the share
61 hash tree is put into the URI.
# Raised out of _gather_data() once an upload has been aborted; it bypasses
# the remaining callbacks in the upload Deferred chain (see abort()).
# NOTE(review): class body (original line 66, presumably 'pass') is elided.
65 class UploadAborted(Exception):
# Drives the erasure-coding of one IEncryptedUploadable: cuts it into
# segments, encodes each into blocks, and pushes blocks plus hash trees to a
# set of IStorageBucketWriter 'landlords'. See the module docstring above.
74 class Encoder(object):
# log_parent: optional parent log-entry number (must be None or an int, per
# the precondition below); upload_status: adapted to IUploadStatus;
# progress: optional progress-reporting object.
# NOTE(review): several __init__ lines are elided in this listing (e.g.
# original lines 78, 80-82, 85, 88) — confirm against the full source.
77 def __init__(self, log_parent=None, upload_status=None, progress=None):
79 self.uri_extension_data = {}
83 self._status = IUploadStatus(upload_status)
84 precondition(log_parent is None or isinstance(log_parent, int),
86 self._log_number = log.msg("creating Encoder %s" % self,
87 facility="tahoe.encoder", parent=log_parent)
89 self._progress = progress
# __repr__ body (def line, original 91, is elided): identify this Encoder by
# the first 5 characters of its base32 storage index once one is known
# (set late, in set_encrypted_uploadable), hence the hasattr() guard.
92 if hasattr(self, "_storage_index"):
93 return "<Encoder for %s>" % si_b2a(self._storage_index)[:5]
94 return "<Encoder for unknown storage index>"
def log(self, *args, **kwargs):
    """Emit a log message, defaulting 'parent' to this Encoder's log entry
    and 'facility' to "tahoe.encoder" when the caller did not supply them."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.encoder")
    return log.msg(*args, **kwargs)
# Accept the IEncryptedUploadable to encode. Chains three queries: file
# size -> encoding parameters -> storage index, stashing each result.
# NOTE(review): the Deferred 'd' is created, and returned, by elided lines
# (originals 105-106, 117-119) — confirm against the full source.
103 def set_encrypted_uploadable(self, uploadable):
104 eu = self._uploadable = IEncryptedUploadable(uploadable)
107 self.log(format="file size: %(size)d", size=size)
108 self.file_size = size
# presumably guarded by "if self._progress:" on an elided line — confirm
110 self._progress.set_progress_total(self.file_size)
111 d.addCallback(_got_size)
112 d.addCallback(lambda res: eu.get_all_encoding_parameters())
113 d.addCallback(self._got_all_encoding_parameters)
114 d.addCallback(lambda res: eu.get_storage_index())
115 def _done(storage_index):
116 self._storage_index = storage_index
# Record the (k, happiness, N, segment_size) tuple, build the main codec and
# the tail codec, and populate the URI-extension-block fields that depend on
# the encoding parameters. Must run exactly once (hence the assert).
# NOTE(review): elided original line 126 presumably sets self.num_shares = n
# (self.num_shares is read below) — confirm against the full source.
121 def _got_all_encoding_parameters(self, params):
122 assert not self._codec
123 k, happy, n, segsize = params
124 self.required_shares = k
125 self.servers_of_happiness = happy
127 self.segment_size = segsize
128 self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
129 self.log("now setting up codec")
# each segment is split evenly into k pieces, so segsize must divide by k
131 assert self.segment_size % self.required_shares == 0
# NOTE(review): continuation (second div_ceil argument, presumably
# self.segment_size) is on an elided line
133 self.num_segments = mathutil.div_ceil(self.file_size,
136 self._codec = CRSEncoder()
137 self._codec.set_params(self.segment_size,
138 self.required_shares, self.num_shares)
# accumulate fields destined for the URI extension block (UEB)
140 data = self.uri_extension_data
141 data['codec_name'] = self._codec.get_encoder_type()
142 data['codec_params'] = self._codec.get_serialized_params()
144 data['size'] = self.file_size
145 data['segment_size'] = self.segment_size
146 self.share_size = mathutil.div_ceil(self.file_size,
147 self.required_shares)
148 data['num_segments'] = self.num_segments
149 data['needed_shares'] = self.required_shares
150 data['total_shares'] = self.num_shares
152 # the "tail" is the last segment. This segment may or may not be
153 # shorter than all other segments. We use the "tail codec" to handle
154 # it. If the tail is short, we use a different codec instance. In
155 # addition, the tail codec must be fed data which has been padded out
# (the tail must be padded to a multiple of required_shares; the "if
# tail_size == 0:" guard below lives on an elided line — confirm)
157 tail_size = self.file_size % self.segment_size
159 tail_size = self.segment_size
161 # the tail codec is responsible for encoding tail_size bytes
162 padded_tail_size = mathutil.next_multiple(tail_size,
163 self.required_shares)
164 self._tail_codec = CRSEncoder()
165 self._tail_codec.set_params(padded_tail_size,
166 self.required_shares, self.num_shares)
167 data['tail_codec_params'] = self._tail_codec.get_serialized_params()
def _get_share_size(self):
    """Return the estimated on-disk size of one share: the ceiling of
    file_size/k plus whatever per-share overhead _compute_overhead reports."""
    base = mathutil.div_ceil(self.file_size, self.required_shares)
    return base + self._compute_overhead()
# Per-share overhead (in bytes) added to the raw erasure-coded data.
# NOTE(review): the body (original lines 175-176) is elided — presumably
# returns a small constant or 0; confirm against the full source.
174 def _compute_overhead(self):
# Introspection accessor: return one named encoding parameter, or raise
# KeyError for unknown names. NOTE(review): original lines 178-179
# (probably a docstring) and 195 (probably 'else:') are elided.
177 def get_param(self, name):
180 if name == "storage_index":
181 return self._storage_index
182 elif name == "share_counts":
# returns (k, happiness, N); the N element is on an elided continuation line
183 return (self.required_shares, self.servers_of_happiness,
185 elif name == "num_segments":
186 return self.num_segments
187 elif name == "segment_size":
188 return self.segment_size
189 elif name == "block_size":
190 return self._codec.get_block_size()
191 elif name == "share_size":
192 return self._get_share_size()
193 elif name == "serialized_params":
194 return self._codec.get_serialized_params()
196 raise KeyError("unknown parameter name '%s'" % name)
# Record where shares will go. landlords: dict of shareid ->
# IStorageBucketWriter; servermap: dict of shareid -> set of server ids.
# Both are defensively copied, since _remove_shareholder mutates them later.
# NOTE(review): the loop header over landlords (original line 200) is elided.
198 def set_shareholders(self, landlords, servermap):
199 assert isinstance(landlords, dict)
201 assert IStorageBucketWriter.providedBy(landlords[k])
202 self.landlords = landlords.copy()
203 assert isinstance(servermap, dict)
204 for v in servermap.itervalues():
205 assert isinstance(v, set)
206 self.servermap = servermap.copy()
# start() (def line, original 208, is elided): build the whole upload
# Deferred chain — start shareholders, encode+send each segment in order,
# then send the hash trees and URI extension and close the buckets.
209 """ Returns a Deferred that will fire with the verify cap (an instance of
210 uri.CHKFileVerifierURI)."""
211 self.log("%s starting" % (self,))
212 #paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
# whole-file crypttext hasher plus one digest per segment (for the
# crypttext hash tree)
214 self._crypttext_hasher = hashutil.crypttext_hasher()
215 self._crypttext_hashes = []
217 self.block_hashes = [[] for x in range(self.num_shares)]
218 # block_hashes[i] is a list that will be accumulated and then sent
219 # to landlord[i]. This list contains a hash of each segment_share
220 # that we sent to that landlord.
221 self.share_root_hashes = [None] * self.num_shares
# timing buckets for get_times(); the dict literal's open brace is on an
# elided line (original 222-223)
224 "cumulative_encoding": 0.0,
225 "cumulative_sending": 0.0,
226 "hashes_and_close": 0.0,
227 "total_encode_and_push": 0.0,
229 self._start_total_timestamp = time.time()
# NOTE(review): creation of the chain head 'd' (original ~230-232) is elided
233 d.addCallback(lambda res: self.start_all_shareholders())
# all segments but the last use the main codec; the tail gets its own pass
235 for i in range(self.num_segments-1):
236 # note to self: this form doesn't work, because lambda only
237 # captures the slot, not the value
238 #d.addCallback(lambda res: self.do_segment(i))
239 # use this form instead:
240 d.addCallback(lambda res, i=i: self._encode_segment(i))
241 d.addCallback(self._send_segment, i)
242 d.addCallback(self._turn_barrier)
243 last_segnum = self.num_segments - 1
244 d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
245 d.addCallback(self._send_segment, last_segnum)
246 d.addCallback(self._turn_barrier)
248 d.addCallback(lambda res: self.finish_hashing())
250 d.addCallback(lambda res:
251 self.send_crypttext_hash_tree_to_all_shareholders())
252 d.addCallback(lambda res: self.send_all_block_hash_trees())
253 d.addCallback(lambda res: self.send_all_share_hash_trees())
254 d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
256 d.addCallback(lambda res: self.close_all_shareholders())
257 d.addCallbacks(self.done, self.err)
# Forward a human-readable status string to the IUploadStatus object.
# NOTE(review): original line 261 (presumably "if self._status:") is elided.
260 def set_status(self, status):
262 self._status.set_status(status)
# Report phase-2 progress as a 0..1 fraction; the final hash+close work
# counts as one extra virtual segment (hence num_segments + 1).
# NOTE(review): original line 265 (presumably "if self._status:") is elided.
264 def set_encode_and_push_progress(self, sent_segments=None, extra=0.0):
266 # we treat the final hash+close as an extra segment
267 if sent_segments is None:
268 sent_segments = self.num_segments
269 progress = float(sent_segments + extra) / (self.num_segments + 1)
270 self._status.set_progress(2, progress)
# abort() body (def line, original ~272, is elided): flag the upload as
# aborted so the next read in _gather_data raises UploadAborted.
# NOTE(review): the flag assignment itself (presumably "self._aborted =
# True" on original line 275) is elided — confirm.
273 self.log("aborting upload", level=log.UNUSUAL)
274 assert self._codec, "don't call abort before start"
276 # the next segment read (in _gather_data inside _encode_segment) will
277 # raise UploadAborted(), which will bypass the rest of the upload
278 # chain. If we've sent the final segment's shares, it's too late to
279 # abort. TODO: allow abort any time up to close_all_shareholders.
def _turn_barrier(self, res):
    """Pass 'res' through after a full reactor turn.

    Inserted into a Deferred chain, this guarantees a reactor turn between
    the callbacks before and after it. Deferreds do not do tail recursion,
    so long synchronous chains (e.g. ones built on defer.succeed) can keep
    intermediate objects alive longer than expected; the barrier bounds
    that memory consumption.
    """
    return fireEventually(res)
# Ask every landlord to write its share header; failures demote that
# shareholder via _remove_shareholder. NOTE(review): "dl = []" and
# "dl.append(d)" (original lines 295, 299) are elided.
292 def start_all_shareholders(self):
293 self.log("starting shareholders", level=log.NOISY)
294 self.set_status("Starting shareholders")
# list() snapshot: _remove_shareholder may delete entries while we iterate
296 for shareid in list(self.landlords):
297 d = self.landlords[shareid].put_header()
298 d.addErrback(self._remove_shareholder, shareid, "start")
300 return self._gather_responses(dl)
# Read one full-size segment's worth of crypttext and erasure-code it.
# Fires with the codec's (shares, shareids) result, which _send_segment
# consumes. NOTE(review): several lines are elided (e.g. "codec =
# self._codec" and "start = time.time()" near originals 303-305, the
# per-chunk loop header at 337, and the timing callback plumbing at 343-348).
302 def _encode_segment(self, segnum):
306 # the ICodecEncoder API wants to receive a total of self.segment_size
307 # bytes on each encode() call, broken up into a number of
308 # identically-sized pieces. Due to the way the codec algorithm works,
309 # these pieces need to be the same size as the share which the codec
310 # will generate. Therefore we must feed it with input_piece_size that
311 # equals the output share size.
312 input_piece_size = codec.get_block_size()
314 # as a result, the number of input pieces per encode() call will be
315 # equal to the number of required shares with which the codec was
316 # constructed. You can think of the codec as chopping up a
317 # 'segment_size' of data into 'required_shares' shares (not doing any
318 # fancy math at all, just doing a split), then creating some number
319 # of additional shares which can be substituted if the primary ones
322 # we read data from the source one segment at a time, and then chop
323 # it into 'input_piece_size' pieces before handing it to the codec
325 crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
327 # memory footprint: we only hold a tiny piece of the plaintext at any
328 # given time. We build up a segment's worth of crypttext, then hand
329 # it to the encoder. Assuming 3-of-10 encoding (3.3x expansion) and
330 # 1MiB max_segment_size, we get a peak memory footprint of 4.3*1MiB =
331 # 4.3MiB. Lowering max_segment_size to, say, 100KiB would drop the
332 # footprint to 430KiB at the expense of more hash-tree overhead.
334 d = self._gather_data(self.required_shares, input_piece_size,
335 crypttext_segment_hasher)
336 def _done_gathering(chunks):
338 assert len(c) == input_piece_size
# record this segment's crypttext digest for the crypttext hash tree
339 self._crypttext_hashes.append(crypttext_segment_hasher.digest())
340 # during this call, we hit 5*segsize memory
341 return codec.encode(chunks)
342 d.addCallback(_done_gathering)
344 elapsed = time.time() - start
345 self._times["cumulative_encoding"] += elapsed
# Like _encode_segment, but for the (possibly short) final segment: uses
# the tail codec and allows _gather_data to zero-pad a short read.
# NOTE(review): "start = time.time()" and the timing-callback plumbing
# (originals ~351-352, 368-373) are elided, as in _encode_segment.
350 def _encode_tail_segment(self, segnum):
353 codec = self._tail_codec
354 input_piece_size = codec.get_block_size()
356 crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
358 d = self._gather_data(self.required_shares, input_piece_size,
359 crypttext_segment_hasher, allow_short=True)
360 def _done_gathering(chunks):
362 # a short trailing chunk will have been padded by
364 assert len(c) == input_piece_size
365 self._crypttext_hashes.append(crypttext_segment_hasher.digest())
366 return codec.encode(chunks)
367 d.addCallback(_done_gathering)
369 elapsed = time.time() - start
370 self._times["cumulative_encoding"] += elapsed
# Read num_chunks * input_chunk_size bytes of crypttext from the
# uploadable, feed them to both the per-segment and whole-file hashers,
# and split the result into equal-size pieces for the codec.
# NOTE(review): the allow_short=False default parameter (original ~377),
# the abort guard ("if self._aborted:", ~399), the _got callback header
# (~404), and the join of the returned chunk list into one string
# (presumably data = "".join(data)) are on elided lines — confirm.
375 def _gather_data(self, num_chunks, input_chunk_size,
376 crypttext_segment_hasher,
378 """Return a Deferred that will fire when the required number of
379 chunks have been read (and hashed and encrypted). The Deferred fires
380 with a list of chunks, each of size input_chunk_size."""
382 # I originally built this to allow read_encrypted() to behave badly:
383 # to let it return more or less data than you asked for. It would
384 # stash the leftovers until later, and then recurse until it got
385 # enough. I don't think that was actually useful.
387 # who defines read_encrypted?
388 # offloaded.LocalCiphertextReader: real disk file: exact
389 # upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
390 # it exact. The return value is a list of 50KiB chunks, to reduce
391 # the memory footprint of the encryption process.
392 # repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
394 # This has been redefined to require read_encrypted() to behave like
395 # a local file: return exactly the amount requested unless it hits
400 raise UploadAborted()
402 read_size = num_chunks * input_chunk_size
403 d = self._uploadable.read_encrypted(read_size, hash_only=False)
405 assert isinstance(data, (list,tuple))
407 raise UploadAborted()
# only the tail segment may come up short; everyone else must be exact
409 precondition(len(data) <= read_size, len(data), read_size)
411 precondition(len(data) == read_size, len(data), read_size)
412 crypttext_segment_hasher.update(data)
413 self._crypttext_hasher.update(data)
414 if allow_short and len(data) < read_size:
# zero-pad the short tail out to the codec's expected input size
416 data += "\x00" * (read_size - len(data))
417 encrypted_pieces = [data[i:i+input_chunk_size]
418 for i in range(0, len(data), input_chunk_size)]
419 return encrypted_pieces
# Deliver one encoded segment's blocks to the landlords and accumulate
# each block's hash for the block hash trees. Uses Python 2 tuple
# parameter unpacking for the codec's (shares, shareids) result.
# NOTE(review): "start = time.time()", "dl = []", the per-share block
# extraction and "dl.append(d)" (originals ~431-432, 438, 441-442), and
# the _logit callback header / "return res"/"return dl" lines are elided.
423 def _send_segment(self, (shares, shareids), segnum):
424 # To generate the URI, we must generate the roothash, so we must
425 # generate all shares, even if we aren't actually giving them to
426 # anybody. This means that the set of shares we create will be equal
427 # to or larger than the set of landlords. If we have any landlord who
428 # *doesn't* have a share, that's an error.
429 _assert(set(self.landlords.keys()).issubset(set(shareids)),
430 shareids=shareids, landlords=self.landlords)
433 self.set_status("Sending segment %d of %d" % (segnum+1,
435 self.set_encode_and_push_progress(segnum)
436 lognum = self.log("send_segment(%d)" % segnum, level=log.NOISY)
437 for i in range(len(shares)):
439 shareid = shareids[i]
440 d = self.send_block(shareid, segnum, block, lognum)
443 block_hash = hashutil.block_hash(block)
444 #from allmydata.util import base32
445 #log.msg("creating block (shareid=%d, blocknum=%d) "
446 # "len=%d %r .. %r: %s" %
447 # (shareid, segnum, len(block),
448 # block[:50], block[-50:], base32.b2a(block_hash)))
449 self.block_hashes[shareid].append(block_hash)
451 dl = self._gather_responses(dl)
453 def do_progress(ign):
454 done = self.segment_size * (segnum + 1)
456 self._progress.set_progress(done)
458 dl.addCallback(do_progress)
461 self.log("%s uploaded %s / %s bytes (%d%%) of your file." %
463 self.segment_size*(segnum+1),
464 self.segment_size*self.num_segments,
465 100 * (segnum+1) / self.num_segments,
467 level=log.OPERATIONAL)
468 elapsed = time.time() - start
469 self._times["cumulative_sending"] += elapsed
471 dl.addCallback(_logit)
# Write one block to one landlord; a no-op (immediate success) if that
# shareholder has already been dropped. Errors demote the shareholder.
# NOTE(review): the _done callback header, its "return res", the
# addCallback, and "return d" (originals 481, 483-484, 487-488) are elided.
474 def send_block(self, shareid, segment_num, block, lognum):
475 if shareid not in self.landlords:
476 return defer.succeed(None)
477 sh = self.landlords[shareid]
478 lognum2 = self.log("put_block to %s" % self.landlords[shareid],
479 parent=lognum, level=log.NOISY)
480 d = sh.put_block(segment_num, block)
482 self.log("put_block done", parent=lognum2, level=log.NOISY)
485 d.addErrback(self._remove_shareholder, shareid,
486 "segnum=%d" % segment_num)
# Errback handler: drop a failed shareholder from self.landlords and
# self.servermap, then re-check servers-of-happiness. Raises
# UploadUnhappinessError (aborting the upload) if we can no longer meet
# the happiness requirement; otherwise logs and continues.
# NOTE(review): a few lines are elided (e.g. the why/failure logging at
# original 496, the "else:" around 501-502, and the failure_message
# continuation at 511) — confirm against the full source.
489 def _remove_shareholder(self, why, shareid, where):
490 ln = self.log(format="error while sending %(method)s to shareholder=%(shnum)d",
491 method=where, shnum=shareid,
492 level=log.UNUSUAL, failure=why)
493 if shareid in self.landlords:
# abort the partial share so its server can reclaim the space
494 self.landlords[shareid].abort()
495 peerid = self.landlords[shareid].get_peerid()
497 del self.landlords[shareid]
498 self.servermap[shareid].remove(peerid)
499 if not self.servermap[shareid]:
500 del self.servermap[shareid]
503 self.log("they weren't in our list of landlords", parent=ln,
504 level=log.WEIRD, umid="TQGFRw")
505 happiness = happinessutil.servers_of_happiness(self.servermap)
506 if happiness < self.servers_of_happiness:
507 peerids = set(happinessutil.shares_by_server(self.servermap).keys())
508 msg = happinessutil.failure_message(len(peerids),
509 self.required_shares,
510 self.servers_of_happiness,
512 msg = "%s: %s" % (msg, why)
513 raise UploadUnhappinessError(msg)
514 self.log("but we can still continue with %s shares, we'll be happy "
515 "with at least %s" % (happiness,
516 self.servers_of_happiness),
# Wrap a list of Deferreds in a DeferredList that fails fast on the first
# error, while swallowing UploadUnhappinessError on the individual
# Deferreds so it is not reported twice.
# NOTE(review): "return None" inside the errback, the "for d0 in dl:"
# header, and the final "return d" (originals ~529-530, 532) are elided.
519 def _gather_responses(self, dl):
520 d = defer.DeferredList(dl, fireOnOneErrback=True)
521 def _eatUploadUnhappinessError(f):
522 # all exceptions that occur while talking to a peer are handled
523 # in _remove_shareholder. That might raise UploadUnhappinessError,
524 # which will cause the DeferredList to errback but which should
525 # otherwise be consumed. Allow non-UploadUnhappinessError exceptions
526 # to pass through as an unhandled errback. We use this in lieu of
527 # consumeErrors=True to allow coding errors to be logged.
528 f.trap(UploadUnhappinessError)
531 d0.addErrback(_eatUploadUnhappinessError)
def finish_hashing(self):
    """Enter the hash+close phase: record its start time, stash the
    whole-file crypttext hash in the URI-extension data, and close the
    uploadable (all crypttext has been read by now)."""
    self._start_hashing_and_close_timestamp = time.time()
    self.set_status("Finishing hashes")
    self.set_encode_and_push_progress(extra=0.0)
    self.uri_extension_data["crypttext_hash"] = self._crypttext_hasher.digest()
    self._uploadable.close()
# Build the crypttext hash tree from the per-segment digests, record its
# root in the UEB data, and send the full tree to every remaining landlord.
# NOTE(review): "all_hashes = list(t)" and "dl = []" (originals 547, 549)
# are elided.
542 def send_crypttext_hash_tree_to_all_shareholders(self):
543 self.log("sending crypttext hash tree", level=log.NOISY)
544 self.set_status("Sending Crypttext Hash Tree")
545 self.set_encode_and_push_progress(extra=0.3)
546 t = HashTree(self._crypttext_hashes)
# t[0] is the Merkle root of the crypttext hash tree
548 self.uri_extension_data["crypttext_root_hash"] = t[0]
550 for shareid in list(self.landlords):
551 dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
552 return self._gather_responses(dl)
# Send the full crypttext hash tree to a single landlord (no-op success
# if that shareholder was already dropped). NOTE(review): the trailing
# "return d" (original 560) is elided.
554 def send_crypttext_hash_tree(self, shareid, all_hashes):
555 if shareid not in self.landlords:
556 return defer.succeed(None)
557 sh = self.landlords[shareid]
558 d = sh.put_crypttext_hashes(all_hashes)
559 d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
# For every share (whether or not its landlord survived), build and send
# that share's block hash tree. NOTE(review): "dl = []" (original 566)
# is elided.
562 def send_all_block_hash_trees(self):
563 self.log("sending block hash trees", level=log.NOISY)
564 self.set_status("Sending Subshare Hash Trees")
565 self.set_encode_and_push_progress(extra=0.4)
567 for shareid,hashes in enumerate(self.block_hashes):
568 # hashes is a list of the hashes of all blocks that were sent
569 # to shareholder[shareid].
570 dl.append(self.send_one_block_hash_tree(shareid, hashes))
571 return self._gather_responses(dl)
# Build one share's block hash tree, record its root (needed for the
# share hash tree even if the landlord is gone), and send the whole tree.
# NOTE(review): "all_hashes = list(t)" (original 575) and the trailing
# "return d" (original 585) are elided.
573 def send_one_block_hash_tree(self, shareid, block_hashes):
574 t = HashTree(block_hashes)
576 # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
577 # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
578 # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
579 self.share_root_hashes[shareid] = t[0]
580 if shareid not in self.landlords:
581 return defer.succeed(None)
582 sh = self.landlords[shareid]
583 d = sh.put_block_hashes(all_hashes)
584 d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
# Build the share hash tree over all shares' block-hash-tree roots, put
# its root in the UEB data, and send each shareholder just the chain of
# nodes it needs to validate its own share.
# NOTE(review): "dl = []" and the per-root assertion (originals 594, 596)
# are elided.
587 def send_all_share_hash_trees(self):
588 # Each bucket gets a set of share hash tree nodes that are needed to validate their
589 # share. This includes the share hash itself, but does not include the top-level hash
590 # root (which is stored securely in the URI instead).
591 self.log("sending all share hash trees", level=log.NOISY)
592 self.set_status("Sending Share Hash Trees")
593 self.set_encode_and_push_progress(extra=0.6)
595 for h in self.share_root_hashes:
597 # create the share hash tree
598 t = HashTree(self.share_root_hashes)
599 # the root of this hash tree goes into our URI
600 self.uri_extension_data['share_root_hash'] = t[0]
601 # now send just the necessary pieces out to each shareholder
602 for i in range(self.num_shares):
603 # the HashTree is given a list of leaves: 0,1,2,3..n .
604 # These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
605 needed_hash_indices = t.needed_hashes(i, include_leaf=True)
606 hashes = [(hi, t[hi]) for hi in needed_hash_indices]
607 dl.append(self.send_one_share_hash_tree(i, hashes))
608 return self._gather_responses(dl)
# Send one shareholder its (index, hash) validation chain from the share
# hash tree; no-op success if it was already dropped. NOTE(review): the
# trailing "return d" (original 616) is elided.
610 def send_one_share_hash_tree(self, shareid, needed_hashes):
611 if shareid not in self.landlords:
612 return defer.succeed(None)
613 sh = self.landlords[shareid]
614 d = sh.put_share_hashes(needed_hashes)
615 d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
# Serialize the URI extension block, log a redacted copy (hashes as
# base32), remember its hash (which goes into the cap), and send it to
# every remaining landlord. NOTE(review): the field-name tuple
# continuation (original 623), "ed = {}" (~626), the non-hash branch
# (~630-631), and "dl = []" (~634) are elided.
618 def send_uri_extension_to_all_shareholders(self):
619 lp = self.log("sending uri_extension", level=log.NOISY)
620 self.set_status("Sending URI Extensions")
621 self.set_encode_and_push_progress(extra=0.8)
622 for k in ('crypttext_root_hash', 'crypttext_hash',
624 assert k in self.uri_extension_data
625 uri_extension = uri.pack_extension(self.uri_extension_data)
627 for k,v in self.uri_extension_data.items():
628 if k.endswith("hash"):
# base32-encode hash values so the log line is printable
629 ed[k] = base32.b2a(v)
632 self.log("uri_extension_data is %s" % (ed,), level=log.NOISY, parent=lp)
633 self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
635 for shareid in list(self.landlords):
636 dl.append(self.send_uri_extension(shareid, uri_extension))
637 return self._gather_responses(dl)
# Send the packed URI extension block to one landlord. Unlike the other
# per-share senders there is no "not in landlords" guard here — the
# caller iterates only over current landlords. NOTE(review): the trailing
# "return d" (original 643) is elided.
639 def send_uri_extension(self, shareid, uri_extension):
640 sh = self.landlords[shareid]
641 d = sh.put_uri_extension(uri_extension)
642 d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
# Ask every surviving landlord to finalize (close) its bucket; failures
# demote the shareholder. NOTE(review): "dl = []" and "dl.append(d)"
# (originals 649, 653) are elided.
645 def close_all_shareholders(self):
646 self.log("closing shareholders", level=log.NOISY)
647 self.set_status("Closing Shareholders")
648 self.set_encode_and_push_progress(extra=0.9)
650 for shareid in list(self.landlords):
651 d = self.landlords[shareid].close()
652 d.addErrback(self._remove_shareholder, shareid, "close")
654 return self._gather_responses(dl)
# done() body (def line, original ~656, is elided): success callback for
# the upload chain. Records timings, notes which shares were placed, and
# returns the verify cap promised by start().
# NOTE(review): "now = time.time()" (presumably original 660) is elided.
657 self.log("upload done", level=log.OPERATIONAL)
658 self.set_status("Finished")
659 self.set_encode_and_push_progress(extra=1.0) # done
661 h_and_c_elapsed = now - self._start_hashing_and_close_timestamp
662 self._times["hashes_and_close"] = h_and_c_elapsed
663 total_elapsed = now - self._start_total_timestamp
664 self._times["total_encode_and_push"] = total_elapsed
666 # update our sharemap
667 self._shares_placed = set(self.landlords.keys())
668 return uri.CHKFileVerifierURI(self._storage_index, self.uri_extension_hash,
669 self.required_shares, self.num_shares, self.file_size)
# err() body (def line, original ~671, is elided): failure errback for
# the upload chain. Aborts every remaining bucket, then unwraps
# DeferredList's FirstError to re-raise the original failure.
# NOTE(review): the non-FirstError fallthrough (presumably "return f" at
# original 681) is elided.
672 self.log("upload failed", failure=f, level=log.UNUSUAL)
673 self.set_status("Failed")
674 # we need to abort any remaining shareholders, so they'll delete the
675 # partial share, allowing someone else to upload it again.
676 self.log("aborting shareholders", level=log.UNUSUAL)
677 for shareid in list(self.landlords):
678 self.landlords[shareid].abort()
679 if f.check(defer.FirstError):
680 return f.value.subFailure
def get_shares_placed(self):
    """Return the set of share numbers that were successfully placed
    (recorded by done() from the surviving landlords)."""
    return self._shares_placed
# NOTE(review): this comment belongs to a get_times() accessor whose def
# line (original 687) and "return self._times" body (~689) are elided.
688 # return a dictionary of encode+push timings
def get_uri_extension_data(self):
    """Return the dict of fields that are packed into the URI extension
    block (codec params, sizes, hash-tree roots, ...)."""
    return self.uri_extension_data
def get_uri_extension_hash(self):
    """Return the hash of the packed URI extension block, as computed in
    send_uri_extension_to_all_shareholders(); it becomes part of the cap."""
    return self.uri_extension_hash