1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import add_version_to_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20 NoServersError, InsufficientVersionError, UploadUnhappinessError
21 from allmydata.immutable import layout
22 from pycryptopp.cipher.aes import AES
24 from cStringIO import StringIO
33 class HaveAllPeersError(Exception):
34 # we use this to jump out of the loop
37 # this wants to live in storage, not here
38 class TooFullError(Exception):
41 class UploadResults(Copyable, RemoteCopy):
42 implements(IUploadResults)
43 # note: don't change this string, it needs to match the value used on the
44 # helper, and it does *not* need to match the fully-qualified
45 # package/module/class name
46 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
49 # also, think twice about changing the shape of any existing attribute,
50 # because instances of this class are sent from the helper to its client,
51 # so changing this may break compatibility. Consider adding new fields
52 # instead of modifying existing ones.
55 self.timings = {} # dict of name to number of seconds
56 self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
57 self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
59 self.ciphertext_fetched = None # how much the helper fetched
61 self.preexisting_shares = None # count of shares already present
62 self.pushed_shares = None # count of shares we pushed
65 # our current uri_extension is 846 bytes for small files, a few bytes
66 # more for larger ones (since the filesize is encoded in decimal in a
67 # few places). Ask for a little bit more just in case we need it. If
68 # the extension changes size, we can change EXTENSION_SIZE to
69 # allocate a more accurate amount of space.
71 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
75 def __init__(self, peerid, storage_server,
76 sharesize, blocksize, num_segments, num_share_hashes,
78 bucket_renewal_secret, bucket_cancel_secret):
79 precondition(isinstance(peerid, str), peerid)
80 precondition(len(peerid) == 20, peerid)
82 self._storageserver = storage_server # to an RIStorageServer
83 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
84 self.sharesize = sharesize
86 wbp = layout.make_write_bucket_proxy(None, sharesize,
87 blocksize, num_segments,
89 EXTENSION_SIZE, peerid)
90 self.wbp_class = wbp.__class__ # to create more of them
91 self.allocated_size = wbp.get_allocated_size()
92 self.blocksize = blocksize
93 self.num_segments = num_segments
94 self.num_share_hashes = num_share_hashes
95 self.storage_index = storage_index
97 self.renew_secret = bucket_renewal_secret
98 self.cancel_secret = bucket_cancel_secret
101 return ("<PeerTracker for peer %s and SI %s>"
102 % (idlib.shortnodeid_b2a(self.peerid),
103 si_b2a(self.storage_index)[:5]))
105 def query(self, sharenums):
106 d = self._storageserver.callRemote("allocate_buckets",
112 canary=Referenceable())
113 d.addCallback(self._got_reply)
116 def query_allocated(self):
117 d = self._storageserver.callRemote("get_buckets",
121 def _got_reply(self, (alreadygot, buckets)):
122 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
124 for sharenum, rref in buckets.iteritems():
125 bp = self.wbp_class(rref, self.sharesize,
128 self.num_share_hashes,
132 self.buckets.update(b)
133 return (alreadygot, set(b.keys()))
135 def servers_with_unique_shares(existing_shares, used_peers=None):
137 I accept a dict of shareid -> peerid mappings (and optionally a list
138 of PeerTracker instances) and return a list of servers that have shares.
141 existing_shares = existing_shares.copy()
144 for peer in used_peers:
145 peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
146 for k in peerdict.keys():
147 if existing_shares.has_key(k):
148 # Prevent overcounting; favor the bucket, and not the
150 del(existing_shares[k])
151 peers = list(used_peers.copy())
152 # We do this because the preexisting shares list goes by peerid.
153 peers = [x.peerid for x in peers]
154 servers.extend(peers)
155 servers.extend(existing_shares.values())
156 return list(set(servers))
158 def shares_by_server(existing_shares):
160 I accept a dict of shareid -> peerid mappings, and return a dict
161 of peerid -> shareid mappings
164 for server in set(existing_shares.values()):
165 servers[server] = set([x for x in existing_shares.keys()
166 if existing_shares[x] == server])
169 def should_add_server(existing_shares, server, bucket):
171 I tell my caller whether the servers_of_happiness number will be
172 increased or decreased if a particular server is added as the peer
173 already holding a particular share. I take a dictionary, a peerid,
174 and a bucket as arguments, and return a boolean.
176 old_size = len(servers_with_unique_shares(existing_shares))
177 new_candidate = existing_shares.copy()
178 new_candidate[bucket] = server
179 new_size = len(servers_with_unique_shares(new_candidate))
180 return old_size < new_size
182 class Tahoe2PeerSelector:
184 def __init__(self, upload_id, logparent=None, upload_status=None):
185 self.upload_id = upload_id
186 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
187 # Peers that are working normally, but full.
190 self.num_peers_contacted = 0
191 self.last_failure_msg = None
192 self._status = IUploadStatus(upload_status)
193 self._log_parent = log.msg("%s starting" % self, parent=logparent)
196 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
198 def get_shareholders(self, storage_broker, secret_holder,
199 storage_index, share_size, block_size,
200 num_segments, total_shares, needed_shares,
201 servers_of_happiness):
203 @return: (used_peers, already_peers), where used_peers is a set of
204 PeerTracker instances that have agreed to hold some shares
205 for us (the shnum is stashed inside the PeerTracker),
206 and already_peers is a dict mapping shnum to a peer
207 which claims to already have the share.
211 self._status.set_status("Contacting Peers..")
213 self.total_shares = total_shares
214 self.servers_of_happiness = servers_of_happiness
215 self.needed_shares = needed_shares
217 self.homeless_shares = range(total_shares)
218 # self.uncontacted_peers = list() # peers we haven't asked yet
219 self.contacted_peers = [] # peers worth asking again
220 self.contacted_peers2 = [] # peers that we have asked again
221 self._started_second_pass = False
222 self.use_peers = set() # PeerTrackers that have shares assigned to them
223 self.preexisting_shares = {} # sharenum -> peerid holding the share
224 # We don't try to allocate shares to these servers, since they've
225 # said that they're incapable of storing shares of the size that
226 # we'd want to store. We keep them around because they may have
227 # existing shares for this storage index, which we want to know
228 # about for accurate servers_of_happiness accounting
229 self.readonly_peers = []
230 # These peers have shares -- any shares -- for our SI. We keep track
231 # of these to write an error message with them later.
232 self.peers_with_shares = []
234 peers = storage_broker.get_servers_for_index(storage_index)
236 raise NoServersError("client gave us zero peers")
238 # this needed_hashes computation should mirror
239 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
240 # (instead of a HashTree) because we don't require actual hashing
241 # just to count the levels.
242 ht = hashtree.IncompleteHashTree(total_shares)
243 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
245 # figure out how much space to ask for
246 wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
247 num_share_hashes, EXTENSION_SIZE,
249 allocated_size = wbp.get_allocated_size()
251 # filter the list of peers according to which ones can accomodate
252 # this request. This excludes older peers (which used a 4-byte size
253 # field) from getting large shares (for files larger than about
254 # 12GiB). See #439 for details.
255 def _get_maxsize(peer):
256 (peerid, conn) = peer
257 v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
258 return v1["maximum-immutable-share-size"]
259 new_peers = [peer for peer in peers
260 if _get_maxsize(peer) >= allocated_size]
261 old_peers = list(set(peers).difference(set(new_peers)))
264 # decide upon the renewal/cancel secrets, to include them in the
265 # allocate_buckets query.
266 client_renewal_secret = secret_holder.get_renewal_secret()
267 client_cancel_secret = secret_holder.get_cancel_secret()
269 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
271 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
273 def _make_trackers(peers):
274 return [ PeerTracker(peerid, conn,
275 share_size, block_size,
276 num_segments, num_share_hashes,
278 bucket_renewal_secret_hash(file_renewal_secret,
280 bucket_cancel_secret_hash(file_cancel_secret,
282 for (peerid, conn) in peers]
283 self.uncontacted_peers = _make_trackers(peers)
284 self.readonly_peers = _make_trackers(old_peers)
285 # Talk to the readonly servers to get an idea of what servers
286 # have what shares (if any) for this storage index
287 d = defer.maybeDeferred(self._existing_shares)
288 d.addCallback(lambda ign: self._loop())
291 def _existing_shares(self):
292 if self.readonly_peers:
293 peer = self.readonly_peers.pop()
294 assert isinstance(peer, PeerTracker)
295 d = peer.query_allocated()
296 d.addBoth(self._handle_existing_response, peer.peerid)
297 self.num_peers_contacted += 1
298 self.query_count += 1
299 log.msg("asking peer %s for any existing shares for upload id %s"
300 % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
301 level=log.NOISY, parent=self._log_parent)
303 self._status.set_status("Contacting Peer %s to find "
304 "any existing shares"
305 % idlib.shortnodeid_b2a(peer.peerid))
308 def _handle_existing_response(self, res, peer):
309 if isinstance(res, failure.Failure):
310 log.msg("%s got error during existing shares check: %s"
311 % (idlib.shortnodeid_b2a(peer), res),
312 level=log.UNUSUAL, parent=self._log_parent)
313 self.error_count += 1
314 self.bad_query_count += 1
318 self.peers_with_shares.append(peer)
319 log.msg("response from peer %s: alreadygot=%s"
320 % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
321 level=log.NOISY, parent=self._log_parent)
322 for bucket in buckets:
323 if should_add_server(self.preexisting_shares, peer, bucket):
324 self.preexisting_shares[bucket] = peer
325 if self.homeless_shares and bucket in self.homeless_shares:
326 self.homeless_shares.remove(bucket)
328 self.bad_query_count += 1
329 return self._existing_shares()
331 def _get_progress_message(self):
332 if not self.homeless_shares:
333 msg = "placed all %d shares, " % (self.total_shares)
335 msg = ("placed %d shares out of %d total (%d homeless), " %
336 (self.total_shares - len(self.homeless_shares),
338 len(self.homeless_shares)))
339 return (msg + "want to place shares on at least %d servers such that "
340 "any %d of them have enough shares to recover the file, "
341 "sent %d queries to %d peers, "
342 "%d queries placed some shares, %d placed none "
343 "(of which %d placed none due to the server being"
344 " full and %d placed none due to an error)" %
345 (self.servers_of_happiness, self.needed_shares,
346 self.query_count, self.num_peers_contacted,
347 self.good_query_count, self.bad_query_count,
348 self.full_count, self.error_count))
352 if not self.homeless_shares:
353 effective_happiness = servers_with_unique_shares(
354 self.preexisting_shares,
356 if self.servers_of_happiness <= len(effective_happiness):
357 msg = ("peer selection successful for %s: %s" % (self,
358 self._get_progress_message()))
359 log.msg(msg, parent=self._log_parent)
360 return (self.use_peers, self.preexisting_shares)
362 delta = self.servers_of_happiness - len(effective_happiness)
363 shares = shares_by_server(self.preexisting_shares)
364 # Each server in shares maps to a set of shares stored on it.
365 # Since we want to keep at least one share on each server
366 # that has one (otherwise we'd only be making
367 # the situation worse by removing distinct servers),
368 # each server has len(its shares) - 1 to spread around.
369 shares_to_spread = sum([len(list(sharelist)) - 1
370 for (server, sharelist)
372 if delta <= len(self.uncontacted_peers) and \
373 shares_to_spread >= delta:
374 # Loop through the allocated shares, removing
375 # one from each server that has more than one and putting
376 # it back into self.homeless_shares until we've done
378 items = shares.items()
379 while len(self.homeless_shares) < delta:
380 servernum, sharelist = items.pop()
381 if len(sharelist) > 1:
382 share = sharelist.pop()
383 self.homeless_shares.append(share)
384 del(self.preexisting_shares[share])
385 items.append((servernum, sharelist))
388 peer_count = len(list(set(self.peers_with_shares)))
389 # If peer_count < needed_shares, then the second error
390 # message is nonsensical, so we use this one.
391 if peer_count < self.needed_shares:
392 msg = ("shares could only be placed or found on %d "
394 "We were asked to place shares on at least %d "
395 "server(s) such that any %d of them have "
396 "enough shares to recover the file." %
398 self.servers_of_happiness,
400 # Otherwise, if we've placed on at least needed_shares
401 # peers, but there isn't an x-happy subset of those peers
402 # for x < needed_shares, we use this error message.
403 elif len(effective_happiness) < self.needed_shares:
404 msg = ("shares could be placed or found on %d "
405 "server(s), but they are not spread out evenly "
406 "enough to ensure that any %d of these servers "
407 "would have enough shares to recover the file. "
408 "We were asked to place "
409 "shares on at least %d servers such that any "
410 "%d of them have enough shares to recover the "
414 self.servers_of_happiness,
416 # Otherwise, if there is an x-happy subset of peers where
417 # x >= needed_shares, but x < shares_of_happiness, then
418 # we use this message.
420 msg = ("shares could only be placed on %d server(s) "
421 "such that any %d of them have enough shares "
422 "to recover the file, but we were asked to use "
423 "at least %d such servers." %
424 (len(effective_happiness),
426 self.servers_of_happiness))
427 raise UploadUnhappinessError(msg)
429 if self.uncontacted_peers:
430 peer = self.uncontacted_peers.pop(0)
431 # TODO: don't pre-convert all peerids to PeerTrackers
432 assert isinstance(peer, PeerTracker)
434 shares_to_ask = set([self.homeless_shares.pop(0)])
435 self.query_count += 1
436 self.num_peers_contacted += 1
438 self._status.set_status("Contacting Peers [%s] (first query),"
440 % (idlib.shortnodeid_b2a(peer.peerid),
441 len(self.homeless_shares)))
442 d = peer.query(shares_to_ask)
443 d.addBoth(self._got_response, peer, shares_to_ask,
444 self.contacted_peers)
446 elif self.contacted_peers:
447 # ask a peer that we've already asked.
448 if not self._started_second_pass:
449 log.msg("starting second pass", parent=self._log_parent,
451 self._started_second_pass = True
452 num_shares = mathutil.div_ceil(len(self.homeless_shares),
453 len(self.contacted_peers))
454 peer = self.contacted_peers.pop(0)
455 shares_to_ask = set(self.homeless_shares[:num_shares])
456 self.homeless_shares[:num_shares] = []
457 self.query_count += 1
459 self._status.set_status("Contacting Peers [%s] (second query),"
461 % (idlib.shortnodeid_b2a(peer.peerid),
462 len(self.homeless_shares)))
463 d = peer.query(shares_to_ask)
464 d.addBoth(self._got_response, peer, shares_to_ask,
465 self.contacted_peers2)
467 elif self.contacted_peers2:
468 # we've finished the second-or-later pass. Move all the remaining
469 # peers back into self.contacted_peers for the next pass.
470 self.contacted_peers.extend(self.contacted_peers2)
471 self.contacted_peers2[:] = []
474 # no more peers. If we haven't placed enough shares, we fail.
475 placed_shares = self.total_shares - len(self.homeless_shares)
476 effective_happiness = servers_with_unique_shares(
477 self.preexisting_shares,
479 if len(effective_happiness) < self.servers_of_happiness:
480 msg = ("peer selection failed for %s: %s" % (self,
481 self._get_progress_message()))
482 if self.last_failure_msg:
483 msg += " (%s)" % (self.last_failure_msg,)
484 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
485 raise UploadUnhappinessError(msg)
487 # we placed enough to be happy, so we're done
489 self._status.set_status("Placed all shares")
490 return (self.use_peers, self.preexisting_shares)
492 def _got_response(self, res, peer, shares_to_ask, put_peer_here):
493 if isinstance(res, failure.Failure):
494 # This is unusual, and probably indicates a bug or a network
496 log.msg("%s got error during peer selection: %s" % (peer, res),
497 level=log.UNUSUAL, parent=self._log_parent)
498 self.error_count += 1
499 self.bad_query_count += 1
500 self.homeless_shares = list(shares_to_ask) + self.homeless_shares
501 if (self.uncontacted_peers
502 or self.contacted_peers
503 or self.contacted_peers2):
504 # there is still hope, so just loop
507 # No more peers, so this upload might fail (it depends upon
508 # whether we've hit servers_of_happiness or not). Log the last
509 # failure we got: if a coding error causes all peers to fail
510 # in the same way, this allows the common failure to be seen
511 # by the uploader and should help with debugging
512 msg = ("last failure (from %s) was: %s" % (peer, res))
513 self.last_failure_msg = msg
515 (alreadygot, allocated) = res
516 log.msg("response from peer %s: alreadygot=%s, allocated=%s"
517 % (idlib.shortnodeid_b2a(peer.peerid),
518 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
519 level=log.NOISY, parent=self._log_parent)
522 if should_add_server(self.preexisting_shares,
524 self.preexisting_shares[s] = peer.peerid
525 if s in self.homeless_shares:
526 self.homeless_shares.remove(s)
528 # the PeerTracker will remember which shares were allocated on
529 # that peer. We just have to remember to use them.
531 self.use_peers.add(peer)
534 if allocated or alreadygot:
535 self.peers_with_shares.append(peer.peerid)
537 not_yet_present = set(shares_to_ask) - set(alreadygot)
538 still_homeless = not_yet_present - set(allocated)
541 # they accepted or already had at least one share, so
542 # progress has been made
543 self.good_query_count += 1
545 self.bad_query_count += 1
549 # In networks with lots of space, this is very unusual and
550 # probably indicates an error. In networks with peers that
551 # are full, it is merely unusual. In networks that are very
552 # full, it is common, and many uploads will fail. In most
553 # cases, this is obviously not fatal, and we'll just use some
556 # some shares are still homeless, keep trying to find them a
557 # home. The ones that were rejected get first priority.
558 self.homeless_shares = (list(still_homeless)
559 + self.homeless_shares)
560 # Since they were unable to accept all of our requests, so it
561 # is safe to assume that asking them again won't help.
563 # if they *were* able to accept everything, they might be
564 # willing to accept even more.
565 put_peer_here.append(peer)
571 class EncryptAnUploadable:
572 """This is a wrapper that takes an IUploadable and provides
573 IEncryptedUploadable."""
574 implements(IEncryptedUploadable)
577 def __init__(self, original, log_parent=None):
578 self.original = IUploadable(original)
579 self._log_number = log_parent
580 self._encryptor = None
581 self._plaintext_hasher = plaintext_hasher()
582 self._plaintext_segment_hasher = None
583 self._plaintext_segment_hashes = []
584 self._encoding_parameters = None
585 self._file_size = None
586 self._ciphertext_bytes_read = 0
589 def set_upload_status(self, upload_status):
590 self._status = IUploadStatus(upload_status)
591 self.original.set_upload_status(upload_status)
593 def log(self, *args, **kwargs):
594 if "facility" not in kwargs:
595 kwargs["facility"] = "upload.encryption"
596 if "parent" not in kwargs:
597 kwargs["parent"] = self._log_number
598 return log.msg(*args, **kwargs)
601 if self._file_size is not None:
602 return defer.succeed(self._file_size)
603 d = self.original.get_size()
605 self._file_size = size
607 self._status.set_size(size)
609 d.addCallback(_got_size)
612 def get_all_encoding_parameters(self):
613 if self._encoding_parameters is not None:
614 return defer.succeed(self._encoding_parameters)
615 d = self.original.get_all_encoding_parameters()
616 def _got(encoding_parameters):
617 (k, happy, n, segsize) = encoding_parameters
618 self._segment_size = segsize # used by segment hashers
619 self._encoding_parameters = encoding_parameters
620 self.log("my encoding parameters: %s" % (encoding_parameters,),
622 return encoding_parameters
626 def _get_encryptor(self):
628 return defer.succeed(self._encryptor)
630 d = self.original.get_encryption_key()
635 storage_index = storage_index_hash(key)
636 assert isinstance(storage_index, str)
637 # There's no point to having the SI be longer than the key, so we
638 # specify that it is truncated to the same 128 bits as the AES key.
639 assert len(storage_index) == 16 # SHA-256 truncated to 128b
640 self._storage_index = storage_index
642 self._status.set_storage_index(storage_index)
647 def get_storage_index(self):
648 d = self._get_encryptor()
649 d.addCallback(lambda res: self._storage_index)
652 def _get_segment_hasher(self):
653 p = self._plaintext_segment_hasher
655 left = self._segment_size - self._plaintext_segment_hashed_bytes
657 p = plaintext_segment_hasher()
658 self._plaintext_segment_hasher = p
659 self._plaintext_segment_hashed_bytes = 0
660 return p, self._segment_size
662 def _update_segment_hash(self, chunk):
664 while offset < len(chunk):
665 p, segment_left = self._get_segment_hasher()
666 chunk_left = len(chunk) - offset
667 this_segment = min(chunk_left, segment_left)
668 p.update(chunk[offset:offset+this_segment])
669 self._plaintext_segment_hashed_bytes += this_segment
671 if self._plaintext_segment_hashed_bytes == self._segment_size:
672 # we've filled this segment
673 self._plaintext_segment_hashes.append(p.digest())
674 self._plaintext_segment_hasher = None
675 self.log("closed hash [%d]: %dB" %
676 (len(self._plaintext_segment_hashes)-1,
677 self._plaintext_segment_hashed_bytes),
679 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
680 segnum=len(self._plaintext_segment_hashes)-1,
681 hash=base32.b2a(p.digest()),
684 offset += this_segment
687 def read_encrypted(self, length, hash_only):
688 # make sure our parameters have been set up first
689 d = self.get_all_encoding_parameters()
691 d.addCallback(lambda ignored: self.get_size())
692 d.addCallback(lambda ignored: self._get_encryptor())
693 # then fetch and encrypt the plaintext. The unusual structure here
694 # (passing a Deferred *into* a function) is needed to avoid
695 # overflowing the stack: Deferreds don't optimize out tail recursion.
696 # We also pass in a list, to which _read_encrypted will append
699 d2 = defer.Deferred()
700 d.addCallback(lambda ignored:
701 self._read_encrypted(length, ciphertext, hash_only, d2))
702 d.addCallback(lambda ignored: d2)
705 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
707 fire_when_done.callback(ciphertext)
709 # tolerate large length= values without consuming a lot of RAM by
710 # reading just a chunk (say 50kB) at a time. This only really matters
711 # when hash_only==True (i.e. resuming an interrupted upload), since
712 # that's the case where we will be skipping over a lot of data.
713 size = min(remaining, self.CHUNKSIZE)
714 remaining = remaining - size
715 # read a chunk of plaintext..
716 d = defer.maybeDeferred(self.original.read, size)
717 # N.B.: if read() is synchronous, then since everything else is
718 # actually synchronous too, we'd blow the stack unless we stall for a
719 # tick. Once you accept a Deferred from IUploadable.read(), you must
720 # be prepared to have it fire immediately too.
721 d.addCallback(fireEventually)
722 def _good(plaintext):
724 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
725 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
726 ciphertext.extend(ct)
727 self._read_encrypted(remaining, ciphertext, hash_only,
730 fire_when_done.errback(why)
735 def _hash_and_encrypt_plaintext(self, data, hash_only):
736 assert isinstance(data, (tuple, list)), type(data)
739 # we use data.pop(0) instead of 'for chunk in data' to save
740 # memory: each chunk is destroyed as soon as we're done with it.
744 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
746 bytes_processed += len(chunk)
747 self._plaintext_hasher.update(chunk)
748 self._update_segment_hash(chunk)
749 # TODO: we have to encrypt the data (even if hash_only==True)
750 # because pycryptopp's AES-CTR implementation doesn't offer a
751 # way to change the counter value. Once pycryptopp acquires
752 # this ability, change this to simply update the counter
753 # before each call to (hash_only==False) _encryptor.process()
754 ciphertext = self._encryptor.process(chunk)
756 self.log(" skipping encryption", level=log.NOISY)
758 cryptdata.append(ciphertext)
761 self._ciphertext_bytes_read += bytes_processed
763 progress = float(self._ciphertext_bytes_read) / self._file_size
764 self._status.set_progress(1, progress)
768 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
769 # this is currently unused, but will live again when we fix #453
770 if len(self._plaintext_segment_hashes) < num_segments:
771 # close out the last one
772 assert len(self._plaintext_segment_hashes) == num_segments-1
773 p, segment_left = self._get_segment_hasher()
774 self._plaintext_segment_hashes.append(p.digest())
775 del self._plaintext_segment_hasher
776 self.log("closing plaintext leaf hasher, hashed %d bytes" %
777 self._plaintext_segment_hashed_bytes,
779 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
780 segnum=len(self._plaintext_segment_hashes)-1,
781 hash=base32.b2a(p.digest()),
783 assert len(self._plaintext_segment_hashes) == num_segments
784 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
786 def get_plaintext_hash(self):
787 h = self._plaintext_hasher.digest()
788 return defer.succeed(h)
791 return self.original.close()
794 implements(IUploadStatus)
795 statusid_counter = itertools.count(0)
798 self.storage_index = None
801 self.status = "Not started"
802 self.progress = [0.0, 0.0, 0.0]
805 self.counter = self.statusid_counter.next()
806 self.started = time.time()
808 def get_started(self):
810 def get_storage_index(self):
811 return self.storage_index
814 def using_helper(self):
816 def get_status(self):
818 def get_progress(self):
819 return tuple(self.progress)
820 def get_active(self):
822 def get_results(self):
824 def get_counter(self):
827 def set_storage_index(self, si):
828 self.storage_index = si
829 def set_size(self, size):
831 def set_helper(self, helper):
833 def set_status(self, status):
835 def set_progress(self, which, value):
836 # [0]: chk, [1]: ciphertext, [2]: encode+push
837 self.progress[which] = value
838 def set_active(self, value):
840 def set_results(self, value):
844 peer_selector_class = Tahoe2PeerSelector
846 def __init__(self, storage_broker, secret_holder):
847 # peer_selector needs storage_broker and secret_holder
848 self._storage_broker = storage_broker
849 self._secret_holder = secret_holder
850 self._log_number = self.log("CHKUploader starting", parent=None)
852 self._results = UploadResults()
853 self._storage_index = None
854 self._upload_status = UploadStatus()
855 self._upload_status.set_helper(False)
856 self._upload_status.set_active(True)
857 self._upload_status.set_results(self._results)
859 # locate_all_shareholders() will create the following attribute:
860 # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
862 def log(self, *args, **kwargs):
863 if "parent" not in kwargs:
864 kwargs["parent"] = self._log_number
865 if "facility" not in kwargs:
866 kwargs["facility"] = "tahoe.upload"
867 return log.msg(*args, **kwargs)
869 def start(self, encrypted_uploadable):
870 """Start uploading the file.
872 Returns a Deferred that will fire with the UploadResults instance.
875 self._started = time.time()
876 eu = IEncryptedUploadable(encrypted_uploadable)
877 self.log("starting upload of %s" % eu)
879 eu.set_upload_status(self._upload_status)
880 d = self.start_encrypted(eu)
881 def _done(uploadresults):
882 self._upload_status.set_active(False)
888 """Call this if the upload must be abandoned before it completes.
889 This will tell the shareholders to delete their partial shares. I
890 return a Deferred that fires when these messages have been acked."""
891 if not self._encoder:
892 # how did you call abort() before calling start() ?
893 return defer.succeed(None)
894 return self._encoder.abort()
896 def start_encrypted(self, encrypted):
897 """ Returns a Deferred that will fire with the UploadResults instance. """
898 eu = IEncryptedUploadable(encrypted)
900 started = time.time()
901 self._encoder = e = encode.Encoder(self._log_number,
903 d = e.set_encrypted_uploadable(eu)
904 d.addCallback(self.locate_all_shareholders, started)
905 d.addCallback(self.set_shareholders, e)
906 d.addCallback(lambda res: e.start())
907 d.addCallback(self._encrypted_done)
910 def locate_all_shareholders(self, encoder, started):
911 peer_selection_started = now = time.time()
912 self._storage_index_elapsed = now - started
913 storage_broker = self._storage_broker
914 secret_holder = self._secret_holder
915 storage_index = encoder.get_param("storage_index")
916 self._storage_index = storage_index
917 upload_id = si_b2a(storage_index)[:5]
918 self.log("using storage index %s" % upload_id)
919 peer_selector = self.peer_selector_class(upload_id, self._log_number,
922 share_size = encoder.get_param("share_size")
923 block_size = encoder.get_param("block_size")
924 num_segments = encoder.get_param("num_segments")
925 k,desired,n = encoder.get_param("share_counts")
927 self._peer_selection_started = time.time()
928 d = peer_selector.get_shareholders(storage_broker, secret_holder,
930 share_size, block_size,
931 num_segments, n, k, desired)
933 self._peer_selection_elapsed = time.time() - peer_selection_started
938 def set_shareholders(self, (used_peers, already_peers), encoder):
940 @param used_peers: a sequence of PeerTracker objects
941 @paran already_peers: a dict mapping sharenum to a peerid that
942 claims to already have this share
944 self.log("_send_shares, used_peers is %s" % (used_peers,))
945 # record already-present shares in self._results
946 self._results.preexisting_shares = len(already_peers)
948 self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
949 for peer in used_peers:
950 assert isinstance(peer, PeerTracker)
952 servermap = already_peers.copy()
953 for peer in used_peers:
954 buckets.update(peer.buckets)
955 for shnum in peer.buckets:
956 self._peer_trackers[shnum] = peer
957 servermap[shnum] = peer.peerid
958 assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
959 encoder.set_shareholders(buckets, servermap)
961 def _encrypted_done(self, verifycap):
962 """ Returns a Deferred that will fire with the UploadResults instance. """
964 for shnum in self._encoder.get_shares_placed():
965 peer_tracker = self._peer_trackers[shnum]
966 peerid = peer_tracker.peerid
967 r.sharemap.add(shnum, peerid)
968 r.servermap.add(peerid, shnum)
969 r.pushed_shares = len(self._encoder.get_shares_placed())
971 r.file_size = self._encoder.file_size
972 r.timings["total"] = now - self._started
973 r.timings["storage_index"] = self._storage_index_elapsed
974 r.timings["peer_selection"] = self._peer_selection_elapsed
975 r.timings.update(self._encoder.get_times())
976 r.uri_extension_data = self._encoder.get_uri_extension_data()
977 r.verifycapstr = verifycap.to_string()
980 def get_upload_status(self):
981 return self._upload_status
983 def read_this_many_bytes(uploadable, size, prepend_data=[]):
985 return defer.succeed([])
986 d = uploadable.read(size)
988 assert isinstance(data, list)
989 bytes = sum([len(piece) for piece in data])
992 remaining = size - bytes
994 return read_this_many_bytes(uploadable, remaining,
996 return prepend_data + data
1000 class LiteralUploader:
1003 self._results = UploadResults()
1004 self._status = s = UploadStatus()
1005 s.set_storage_index(None)
1007 s.set_progress(0, 1.0)
1009 s.set_results(self._results)
1011 def start(self, uploadable):
1012 uploadable = IUploadable(uploadable)
1013 d = uploadable.get_size()
1014 def _got_size(size):
1016 self._status.set_size(size)
1017 self._results.file_size = size
1018 return read_this_many_bytes(uploadable, size)
1019 d.addCallback(_got_size)
1020 d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
1021 d.addCallback(lambda u: u.to_string())
1022 d.addCallback(self._build_results)
1025 def _build_results(self, uri):
1026 self._results.uri = uri
1027 self._status.set_status("Finished")
1028 self._status.set_progress(1, 1.0)
1029 self._status.set_progress(2, 1.0)
1030 return self._results
1035 def get_upload_status(self):
1038 class RemoteEncryptedUploadable(Referenceable):
1039 implements(RIEncryptedUploadable)
1041 def __init__(self, encrypted_uploadable, upload_status):
1042 self._eu = IEncryptedUploadable(encrypted_uploadable)
1044 self._bytes_sent = 0
1045 self._status = IUploadStatus(upload_status)
1046 # we are responsible for updating the status string while we run, and
1047 # for setting the ciphertext-fetch progress.
1051 if self._size is not None:
1052 return defer.succeed(self._size)
1053 d = self._eu.get_size()
1054 def _got_size(size):
1057 d.addCallback(_got_size)
1060 def remote_get_size(self):
1061 return self.get_size()
1062 def remote_get_all_encoding_parameters(self):
1063 return self._eu.get_all_encoding_parameters()
1065 def _read_encrypted(self, length, hash_only):
1066 d = self._eu.read_encrypted(length, hash_only)
1069 self._offset += length
1071 size = sum([len(data) for data in strings])
1072 self._offset += size
1074 d.addCallback(_read)
1077 def remote_read_encrypted(self, offset, length):
1078 # we don't support seek backwards, but we allow skipping forwards
1079 precondition(offset >= 0, offset)
1080 precondition(length >= 0, length)
1081 lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1083 precondition(offset >= self._offset, offset, self._offset)
1084 if offset > self._offset:
1085 # read the data from disk anyways, to build up the hash tree
1086 skip = offset - self._offset
1087 log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1088 (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1089 d = self._read_encrypted(skip, hash_only=True)
1091 d = defer.succeed(None)
1093 def _at_correct_offset(res):
1094 assert offset == self._offset, "%d != %d" % (offset, self._offset)
1095 return self._read_encrypted(length, hash_only=False)
1096 d.addCallback(_at_correct_offset)
1099 size = sum([len(data) for data in strings])
1100 self._bytes_sent += size
1102 d.addCallback(_read)
1105 def remote_close(self):
1106 return self._eu.close()
1109 class AssistedUploader:
1111 def __init__(self, helper):
1112 self._helper = helper
1113 self._log_number = log.msg("AssistedUploader starting")
1114 self._storage_index = None
1115 self._upload_status = s = UploadStatus()
1119 def log(self, *args, **kwargs):
1120 if "parent" not in kwargs:
1121 kwargs["parent"] = self._log_number
1122 return log.msg(*args, **kwargs)
1124 def start(self, encrypted_uploadable, storage_index):
1125 """Start uploading the file.
1127 Returns a Deferred that will fire with the UploadResults instance.
1129 precondition(isinstance(storage_index, str), storage_index)
1130 self._started = time.time()
1131 eu = IEncryptedUploadable(encrypted_uploadable)
1132 eu.set_upload_status(self._upload_status)
1133 self._encuploadable = eu
1134 self._storage_index = storage_index
1136 d.addCallback(self._got_size)
1137 d.addCallback(lambda res: eu.get_all_encoding_parameters())
1138 d.addCallback(self._got_all_encoding_parameters)
1139 d.addCallback(self._contact_helper)
1140 d.addCallback(self._build_verifycap)
1142 self._upload_status.set_active(False)
1147 def _got_size(self, size):
1149 self._upload_status.set_size(size)
1151 def _got_all_encoding_parameters(self, params):
1152 k, happy, n, segment_size = params
1153 # stash these for URI generation later
1154 self._needed_shares = k
1155 self._total_shares = n
1156 self._segment_size = segment_size
1158 def _contact_helper(self, res):
1159 now = self._time_contacting_helper_start = time.time()
1160 self._storage_index_elapsed = now - self._started
1161 self.log(format="contacting helper for SI %(si)s..",
1162 si=si_b2a(self._storage_index))
1163 self._upload_status.set_status("Contacting Helper")
1164 d = self._helper.callRemote("upload_chk", self._storage_index)
1165 d.addCallback(self._contacted_helper)
1168 def _contacted_helper(self, (upload_results, upload_helper)):
1170 elapsed = now - self._time_contacting_helper_start
1171 self._elapsed_time_contacting_helper = elapsed
1173 self.log("helper says we need to upload")
1174 self._upload_status.set_status("Uploading Ciphertext")
1175 # we need to upload the file
1176 reu = RemoteEncryptedUploadable(self._encuploadable,
1177 self._upload_status)
1178 # let it pre-compute the size for progress purposes
1180 d.addCallback(lambda ignored:
1181 upload_helper.callRemote("upload", reu))
1182 # this Deferred will fire with the upload results
1184 self.log("helper says file is already uploaded")
1185 self._upload_status.set_progress(1, 1.0)
1186 self._upload_status.set_results(upload_results)
1187 return upload_results
1189 def _convert_old_upload_results(self, upload_results):
1190 # pre-1.3.0 helpers return upload results which contain a mapping
1191 # from shnum to a single human-readable string, containing things
1192 # like "Found on [x],[y],[z]" (for healthy files that were already in
1193 # the grid), "Found on [x]" (for files that needed upload but which
1194 # discovered pre-existing shares), and "Placed on [x]" (for newly
1195 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1196 # set of binary serverid strings.
1198 # the old results are too hard to deal with (they don't even contain
1199 # as much information as the new results, since the nodeids are
1200 # abbreviated), so if we detect old results, just clobber them.
1202 sharemap = upload_results.sharemap
1203 if str in [type(v) for v in sharemap.values()]:
1204 upload_results.sharemap = None
1206 def _build_verifycap(self, upload_results):
1207 self.log("upload finished, building readcap")
1208 self._convert_old_upload_results(upload_results)
1209 self._upload_status.set_status("Building Readcap")
1211 assert r.uri_extension_data["needed_shares"] == self._needed_shares
1212 assert r.uri_extension_data["total_shares"] == self._total_shares
1213 assert r.uri_extension_data["segment_size"] == self._segment_size
1214 assert r.uri_extension_data["size"] == self._size
1215 r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1216 uri_extension_hash=r.uri_extension_hash,
1217 needed_shares=self._needed_shares,
1218 total_shares=self._total_shares, size=self._size
1221 r.file_size = self._size
1222 r.timings["storage_index"] = self._storage_index_elapsed
1223 r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1224 if "total" in r.timings:
1225 r.timings["helper_total"] = r.timings["total"]
1226 r.timings["total"] = now - self._started
1227 self._upload_status.set_status("Finished")
1228 self._upload_status.set_results(r)
1231 def get_upload_status(self):
1232 return self._upload_status
1234 class BaseUploadable:
1235 default_max_segment_size = 128*KiB # overridden by max_segment_size
1236 default_encoding_param_k = 3 # overridden by encoding_parameters
1237 default_encoding_param_happy = 7
1238 default_encoding_param_n = 10
1240 max_segment_size = None
1241 encoding_param_k = None
1242 encoding_param_happy = None
1243 encoding_param_n = None
1245 _all_encoding_parameters = None
1248 def set_upload_status(self, upload_status):
1249 self._status = IUploadStatus(upload_status)
1251 def set_default_encoding_parameters(self, default_params):
1252 assert isinstance(default_params, dict)
1253 for k,v in default_params.items():
1254 precondition(isinstance(k, str), k, v)
1255 precondition(isinstance(v, int), k, v)
1256 if "k" in default_params:
1257 self.default_encoding_param_k = default_params["k"]
1258 if "happy" in default_params:
1259 self.default_encoding_param_happy = default_params["happy"]
1260 if "n" in default_params:
1261 self.default_encoding_param_n = default_params["n"]
1262 if "max_segment_size" in default_params:
1263 self.default_max_segment_size = default_params["max_segment_size"]
1265 def get_all_encoding_parameters(self):
1266 if self._all_encoding_parameters:
1267 return defer.succeed(self._all_encoding_parameters)
1269 max_segsize = self.max_segment_size or self.default_max_segment_size
1270 k = self.encoding_param_k or self.default_encoding_param_k
1271 happy = self.encoding_param_happy or self.default_encoding_param_happy
1272 n = self.encoding_param_n or self.default_encoding_param_n
1275 def _got_size(file_size):
1276 # for small files, shrink the segment size to avoid wasting space
1277 segsize = min(max_segsize, file_size)
1278 # this must be a multiple of 'required_shares'==k
1279 segsize = mathutil.next_multiple(segsize, k)
1280 encoding_parameters = (k, happy, n, segsize)
1281 self._all_encoding_parameters = encoding_parameters
1282 return encoding_parameters
1283 d.addCallback(_got_size)
1286 class FileHandle(BaseUploadable):
1287 implements(IUploadable)
1289 def __init__(self, filehandle, convergence):
1291 Upload the data from the filehandle. If convergence is None then a
1292 random encryption key will be used, else the plaintext will be hashed,
1293 then the hash will be hashed together with the string in the
1294 "convergence" argument to form the encryption key.
1296 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1297 self._filehandle = filehandle
1299 self.convergence = convergence
1302 def _get_encryption_key_convergent(self):
1303 if self._key is not None:
1304 return defer.succeed(self._key)
1307 # that sets self._size as a side-effect
1308 d.addCallback(lambda size: self.get_all_encoding_parameters())
1310 k, happy, n, segsize = params
1311 f = self._filehandle
1312 enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1317 data = f.read(BLOCKSIZE)
1320 enckey_hasher.update(data)
1321 # TODO: setting progress in a non-yielding loop is kind of
1322 # pointless, but I'm anticipating (perhaps prematurely) the
1323 # day when we use a slowjob or twisted's CooperatorService to
1324 # make this yield time to other jobs.
1325 bytes_read += len(data)
1327 self._status.set_progress(0, float(bytes_read)/self._size)
1329 self._key = enckey_hasher.digest()
1331 self._status.set_progress(0, 1.0)
1332 assert len(self._key) == 16
1337 def _get_encryption_key_random(self):
1338 if self._key is None:
1339 self._key = os.urandom(16)
1340 return defer.succeed(self._key)
1342 def get_encryption_key(self):
1343 if self.convergence is not None:
1344 return self._get_encryption_key_convergent()
1346 return self._get_encryption_key_random()
1349 if self._size is not None:
1350 return defer.succeed(self._size)
1351 self._filehandle.seek(0,2)
1352 size = self._filehandle.tell()
1354 self._filehandle.seek(0)
1355 return defer.succeed(size)
1357 def read(self, length):
1358 return defer.succeed([self._filehandle.read(length)])
1361 # the originator of the filehandle reserves the right to close it
1364 class FileName(FileHandle):
1365 def __init__(self, filename, convergence):
1367 Upload the data from the filename. If convergence is None then a
1368 random encryption key will be used, else the plaintext will be hashed,
1369 then the hash will be hashed together with the string in the
1370 "convergence" argument to form the encryption key.
1372 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1373 FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1375 FileHandle.close(self)
1376 self._filehandle.close()
1378 class Data(FileHandle):
1379 def __init__(self, data, convergence):
1381 Upload the data from the data argument. If convergence is None then a
1382 random encryption key will be used, else the plaintext will be hashed,
1383 then the hash will be hashed together with the string in the
1384 "convergence" argument to form the encryption key.
1386 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1387 FileHandle.__init__(self, StringIO(data), convergence=convergence)
1389 class Uploader(service.MultiService, log.PrefixingLogMixin):
1390 """I am a service that allows file uploading. I am a service-child of the
1393 implements(IUploader)
1395 URI_LIT_SIZE_THRESHOLD = 55
1397 def __init__(self, helper_furl=None, stats_provider=None):
1398 self._helper_furl = helper_furl
1399 self.stats_provider = stats_provider
1401 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1402 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1403 service.MultiService.__init__(self)
1405 def startService(self):
1406 service.MultiService.startService(self)
1407 if self._helper_furl:
1408 self.parent.tub.connectTo(self._helper_furl,
1411 def _got_helper(self, helper):
1412 self.log("got helper connection, getting versions")
1413 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1415 "application-version": "unknown: no get_version()",
1417 d = add_version_to_remote_reference(helper, default)
1418 d.addCallback(self._got_versioned_helper)
1420 def _got_versioned_helper(self, helper):
1421 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1422 if needed not in helper.version:
1423 raise InsufficientVersionError(needed, helper.version)
1424 self._helper = helper
1425 helper.notifyOnDisconnect(self._lost_helper)
1427 def _lost_helper(self):
1430 def get_helper_info(self):
1431 # return a tuple of (helper_furl_or_None, connected_bool)
1432 return (self._helper_furl, bool(self._helper))
1435 def upload(self, uploadable, history=None):
1437 Returns a Deferred that will fire with the UploadResults instance.
1442 uploadable = IUploadable(uploadable)
1443 d = uploadable.get_size()
1444 def _got_size(size):
1445 default_params = self.parent.get_encoding_parameters()
1446 precondition(isinstance(default_params, dict), default_params)
1447 precondition("max_segment_size" in default_params, default_params)
1448 uploadable.set_default_encoding_parameters(default_params)
1450 if self.stats_provider:
1451 self.stats_provider.count('uploader.files_uploaded', 1)
1452 self.stats_provider.count('uploader.bytes_uploaded', size)
1454 if size <= self.URI_LIT_SIZE_THRESHOLD:
1455 uploader = LiteralUploader()
1456 return uploader.start(uploadable)
1458 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1459 d2 = defer.succeed(None)
1461 uploader = AssistedUploader(self._helper)
1462 d2.addCallback(lambda x: eu.get_storage_index())
1463 d2.addCallback(lambda si: uploader.start(eu, si))
1465 storage_broker = self.parent.get_storage_broker()
1466 secret_holder = self.parent._secret_holder
1467 uploader = CHKUploader(storage_broker, secret_holder)
1468 d2.addCallback(lambda x: uploader.start(eu))
1470 self._all_uploads[uploader] = None
1472 history.add_upload(uploader.get_upload_status())
1473 def turn_verifycap_into_read_cap(uploadresults):
1474 # Generate the uri from the verifycap plus the key.
1475 d3 = uploadable.get_encryption_key()
1476 def put_readcap_into_results(key):
1477 v = uri.from_string(uploadresults.verifycapstr)
1478 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1479 uploadresults.uri = r.to_string()
1480 return uploadresults
1481 d3.addCallback(put_readcap_into_results)
1483 d2.addCallback(turn_verifycap_into_read_cap)
1485 d.addCallback(_got_size)