1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_peers, failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
27 from cStringIO import StringIO
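# (reconstructed) size-unit constant used further down; BaseUploadable's
# default segment size is expressed in KiB.
KiB = 1024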
36 class HaveAllPeersError(Exception):
37 # we use this to jump out of the loop
38 pass
40 # this wants to live in storage, not here
41 class TooFullError(Exception):
42 pass
44 class UploadResults(Copyable, RemoteCopy):
45 implements(IUploadResults)
46 # note: don't change this string, it needs to match the value used on the
47 # helper, and it does *not* need to match the fully-qualified
48 # package/module/class name
49 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
52 # also, think twice about changing the shape of any existing attribute,
53 # because instances of this class are sent from the helper to its client,
54 # so changing this may break compatibility. Consider adding new fields
55 # instead of modifying existing ones.
58 self.timings = {} # dict of name to number of seconds
59 self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
60 self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
62 self.ciphertext_fetched = None # how much the helper fetched
64 self.preexisting_shares = None # count of shares already present
65 self.pushed_shares = None # count of shares we pushed
68 # our current uri_extension is 846 bytes for small files, a few bytes
69 # more for larger ones (since the filesize is encoded in decimal in a
70 # few places). Ask for a little bit more just in case we need it. If
71 # the extension changes size, we can change EXTENSION_SIZE to
72 # allocate a more accurate amount of space.
74 # TODO: actual extensions are closer to 419 bytes, so we can probably lower this.
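# The constant below is an assumed value consistent with the sizing notes
# above (a little headroom over the ~850-byte extension); it is referenced by
# PeerTracker and Tahoe2PeerSelector further down.
EXTENSION_SIZE = 1000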
77 def pretty_print_shnum_to_servers(s):
78 return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
80 class PeerTracker:
81 def __init__(self, peerid, storage_server,
82 sharesize, blocksize, num_segments, num_share_hashes,
83 storage_index,
84 bucket_renewal_secret, bucket_cancel_secret):
85 precondition(isinstance(peerid, str), peerid)
86 precondition(len(peerid) == 20, peerid)
87 self.peerid = peerid
88 self._storageserver = storage_server # to an RIStorageServer
89 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
90 self.sharesize = sharesize
92 wbp = layout.make_write_bucket_proxy(None, sharesize,
93 blocksize, num_segments,
94 num_share_hashes,
95 EXTENSION_SIZE, peerid)
96 self.wbp_class = wbp.__class__ # to create more of them
97 self.allocated_size = wbp.get_allocated_size()
98 self.blocksize = blocksize
99 self.num_segments = num_segments
100 self.num_share_hashes = num_share_hashes
101 self.storage_index = storage_index
103 self.renew_secret = bucket_renewal_secret
104 self.cancel_secret = bucket_cancel_secret
106 def __repr__(self):
107 return ("<PeerTracker for peer %s and SI %s>"
108 % (idlib.shortnodeid_b2a(self.peerid),
109 si_b2a(self.storage_index)[:5]))
111 def query(self, sharenums):
112 d = self._storageserver.callRemote("allocate_buckets",
113 self.storage_index,
114 self.renew_secret,
115 self.cancel_secret,
116 sharenums,
117 self.allocated_size,
118 canary=Referenceable())
119 d.addCallback(self._got_reply)
120 return d
122 def ask_about_existing_shares(self):
123 return self._storageserver.callRemote("get_buckets",
124 self.storage_index)
126 def _got_reply(self, (alreadygot, buckets)):
127 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
128 b = {}
129 for sharenum, rref in buckets.iteritems():
130 bp = self.wbp_class(rref, self.sharesize,
131 self.blocksize,
132 self.num_segments,
133 self.num_share_hashes,
134 EXTENSION_SIZE,
135 self.peerid)
136 b[sharenum] = bp
137 self.buckets.update(b)
138 return (alreadygot, set(b.keys()))
141 def abort(self):
142 """
143 I abort the remote bucket writers for all shares. This is a good idea
144 to conserve space on the storage server.
145 """
146 self.abort_some_buckets(self.buckets.keys())
148 def abort_some_buckets(self, sharenums):
150 I abort the remote bucket writers for the share numbers in sharenums.
152 for sharenum in sharenums:
153 if sharenum in self.buckets:
154 self.buckets[sharenum].abort()
155 del self.buckets[sharenum]
158 def str_shareloc(shnum, bucketwriter):
159 return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
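# Overview of the selection algorithm implemented by Tahoe2PeerSelector (a
# summary of the code below, not additional behavior): read-only peers are
# first queried for shares they already hold; then _loop() asks each
# uncontacted writable peer for one homeless share, re-asks previously
# contacted peers for ceil(homeless/contacted) shares on later passes, and
# keeps cycling until every share is placed, the happiness target is met, or
# no peers remain, in which case _failed() aborts the allocated buckets.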
161 class Tahoe2PeerSelector(log.PrefixingLogMixin):
163 def __init__(self, upload_id, logparent=None, upload_status=None):
164 self.upload_id = upload_id
165 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
166 # Peers that are working normally, but full.
167 self.full_count = 0
168 self.error_count = 0
169 self.num_peers_contacted = 0
170 self.last_failure_msg = None
171 self._status = IUploadStatus(upload_status)
172 log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
173 self.log("starting", level=log.OPERATIONAL)
175 def __repr__(self):
176 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
178 def get_shareholders(self, storage_broker, secret_holder,
179 storage_index, share_size, block_size,
180 num_segments, total_shares, needed_shares,
181 servers_of_happiness):
183 @return: (upload_servers, already_peers), where upload_servers is a set of
184 PeerTracker instances that have agreed to hold some shares
185 for us (the shareids are stashed inside the PeerTracker),
186 and already_peers is a dict mapping shnum to a set of peers
187 which claim to already have the share.
191 self._status.set_status("Contacting Peers..")
193 self.total_shares = total_shares
194 self.servers_of_happiness = servers_of_happiness
195 self.needed_shares = needed_shares
197 self.homeless_shares = set(range(total_shares))
198 self.contacted_peers = [] # peers worth asking again
199 self.contacted_peers2 = [] # peers that we have asked again
200 self._started_second_pass = False
201 self.use_peers = set() # PeerTrackers that have shares assigned to them
202 self.preexisting_shares = {} # shareid => set(peerids) holding shareid
203 # We don't try to allocate shares to these servers, since they've said
204 # that they're incapable of storing shares of the size that we'd want
205 # to store. We keep them around because they may have existing shares
206 # for this storage index, which we want to know about for accurate
207 # servers_of_happiness accounting
208 # (this is eventually a list, but it is initialized later)
209 self.readonly_peers = None
210 # These peers have shares -- any shares -- for our SI. We keep
211 # track of these to write an error message with them later.
212 self.peers_with_shares = set()
214 # this needed_hashes computation should mirror
215 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
216 # (instead of a HashTree) because we don't require actual hashing
217 # just to count the levels.
218 ht = hashtree.IncompleteHashTree(total_shares)
219 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
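# Illustrative figure (not normative): for total_shares=10 the
# IncompleteHashTree rounds up to 16 leaves, so validating one share's leaf
# needs roughly log2(16)=4 uncle hashes plus the leaf itself.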
221 # figure out how much space to ask for
222 wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
223 num_share_hashes, EXTENSION_SIZE,
224 None)
225 allocated_size = wbp.get_allocated_size()
226 all_peers = storage_broker.get_servers_for_index(storage_index)
227 if not all_peers:
228 raise NoServersError("client gave us zero peers")
230 # filter the list of peers according to which ones can accommodate
231 # this request. This excludes older peers (which used a 4-byte size
232 # field) from getting large shares (for files larger than about
233 # 12GiB). See #439 for details.
234 def _get_maxsize(peer):
235 (peerid, conn) = peer
236 v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
237 return v1["maximum-immutable-share-size"]
238 writable_peers = [peer for peer in all_peers
239 if _get_maxsize(peer) >= allocated_size]
240 readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
242 # decide upon the renewal/cancel secrets, to include them in the
243 # allocate_buckets query.
244 client_renewal_secret = secret_holder.get_renewal_secret()
245 client_cancel_secret = secret_holder.get_cancel_secret()
247 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
248 storage_index)
249 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
250 storage_index)
251 def _make_trackers(peers):
252 return [PeerTracker(peerid, conn,
253 share_size, block_size,
254 num_segments, num_share_hashes,
255 storage_index,
256 bucket_renewal_secret_hash(file_renewal_secret,
257 peerid),
258 bucket_cancel_secret_hash(file_cancel_secret,
259 peerid))
260 for (peerid, conn) in peers]
261 self.uncontacted_peers = _make_trackers(writable_peers)
262 self.readonly_peers = _make_trackers(readonly_peers)
263 # We now ask peers that can't hold any new shares about existing
264 # shares that they might have for our SI. Once this is done, we
265 # start placing the shares that we haven't already accounted for.
267 ds = []
268 if self._status and self.readonly_peers:
269 self._status.set_status("Contacting readonly peers to find "
270 "any existing shares")
271 for peer in self.readonly_peers:
272 assert isinstance(peer, PeerTracker)
273 d = peer.ask_about_existing_shares()
274 d.addBoth(self._handle_existing_response, peer.peerid)
275 ds.append(d)
276 self.num_peers_contacted += 1
277 self.query_count += 1
278 self.log("asking peer %s for any existing shares" %
279 (idlib.shortnodeid_b2a(peer.peerid),),
281 dl = defer.DeferredList(ds)
282 dl.addCallback(lambda ign: self._loop())
286 def _handle_existing_response(self, res, peer):
288 I handle responses to the queries sent by
289 Tahoe2PeerSelector._existing_shares.
291 if isinstance(res, failure.Failure):
292 self.log("%s got error during existing shares check: %s"
293 % (idlib.shortnodeid_b2a(peer), res),
295 self.error_count += 1
296 self.bad_query_count += 1
297 else:
298 buckets = res
299 if buckets:
300 self.peers_with_shares.add(peer)
301 self.log("response to get_buckets() from peer %s: alreadygot=%s"
302 % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
304 for bucket in buckets:
305 self.preexisting_shares.setdefault(bucket, set()).add(peer)
306 self.homeless_shares.discard(bucket)
308 self.bad_query_count += 1
311 def _get_progress_message(self):
312 if not self.homeless_shares:
313 msg = "placed all %d shares, " % (self.total_shares)
314 else:
315 msg = ("placed %d shares out of %d total (%d homeless), " %
316 (self.total_shares - len(self.homeless_shares),
317 self.total_shares,
318 len(self.homeless_shares)))
319 return (msg + "want to place shares on at least %d servers such that "
320 "any %d of them have enough shares to recover the file, "
321 "sent %d queries to %d peers, "
322 "%d queries placed some shares, %d placed none "
323 "(of which %d placed none due to the server being"
324 " full and %d placed none due to an error)" %
325 (self.servers_of_happiness, self.needed_shares,
326 self.query_count, self.num_peers_contacted,
327 self.good_query_count, self.bad_query_count,
328 self.full_count, self.error_count))
330 def _loop(self):
332 if not self.homeless_shares:
333 merged = merge_peers(self.preexisting_shares, self.use_peers)
334 effective_happiness = servers_of_happiness(merged)
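# Worked example of the happiness measure (illustrative): with
#   merged = {0: set([A]), 1: set([A]), 2: set([B]), 3: set([C])}
# servers_of_happiness(merged) is 3, the size of a maximum matching pairing
# distinct servers with distinct shares (A->0 or A->1, B->2, C->3).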
335 if self.servers_of_happiness <= effective_happiness:
336 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
337 "self.use_peers: %s, self.preexisting_shares: %s") \
338 % (self, self._get_progress_message(),
339 pretty_print_shnum_to_servers(merged),
340 [', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
341 for p in self.use_peers],
342 pretty_print_shnum_to_servers(self.preexisting_shares))
343 self.log(msg, level=log.OPERATIONAL)
344 return (self.use_peers, self.preexisting_shares)
346 # We're not okay right now, but maybe we can fix it by
347 # redistributing some shares. In cases where one or two
348 # servers have, before the upload, all or most of the
349 # shares for a given SI, this can work by allowing _loop
350 # a chance to spread those out over the other peers.
351 delta = self.servers_of_happiness - effective_happiness
352 shares = shares_by_server(self.preexisting_shares)
353 # Each server in shares maps to a set of shares stored on it.
354 # Since we want to keep at least one share on each server
355 # that has one (otherwise we'd only be making
356 # the situation worse by removing distinct servers),
357 # each server has len(its shares) - 1 to spread around.
358 shares_to_spread = sum([len(list(sharelist)) - 1
359 for (server, sharelist)
360 in shares.items()])
361 if delta <= len(self.uncontacted_peers) and \
362 shares_to_spread >= delta:
363 items = shares.items()
364 while len(self.homeless_shares) < delta:
365 # Loop through the allocated shares, removing
366 # one from each server that has more than one
367 # and putting it back into self.homeless_shares
368 # until we've done this delta times.
369 server, sharelist = items.pop()
370 if len(sharelist) > 1:
371 share = sharelist.pop()
372 self.homeless_shares.add(share)
373 self.preexisting_shares[share].remove(server)
374 if not self.preexisting_shares[share]:
375 del self.preexisting_shares[share]
376 items.append((server, sharelist))
377 for writer in self.use_peers:
378 writer.abort_some_buckets(self.homeless_shares)
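# Illustrative arithmetic for the redistribution above: if the happiness
# target is 7 but effective_happiness is 5, delta is 2; with preexisting
# shares {A: set([0,1,2]), B: set([3])}, shares_to_spread is (3-1)+(1-1)=2,
# so two of A's shares are returned to homeless_shares and their remote
# writers (if any) are aborted, giving _loop a chance to place them elsewhere.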
381 # Redistribution won't help us; fail.
382 peer_count = len(self.peers_with_shares)
383 failmsg = failure_message(peer_count,
384 self.needed_shares,
385 self.servers_of_happiness,
386 effective_happiness)
387 servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
388 servmsg = servmsgtempl % (
389 self,
390 failmsg,
391 self._get_progress_message(),
392 pretty_print_shnum_to_servers(merged)
393 )
394 self.log(servmsg, level=log.INFREQUENT)
395 return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
397 if self.uncontacted_peers:
398 peer = self.uncontacted_peers.pop(0)
399 # TODO: don't pre-convert all peerids to PeerTrackers
400 assert isinstance(peer, PeerTracker)
402 shares_to_ask = set(sorted(self.homeless_shares)[:1])
403 self.homeless_shares -= shares_to_ask
404 self.query_count += 1
405 self.num_peers_contacted += 1
407 self._status.set_status("Contacting Peers [%s] (first query),"
409 % (idlib.shortnodeid_b2a(peer.peerid),
410 len(self.homeless_shares)))
411 d = peer.query(shares_to_ask)
412 d.addBoth(self._got_response, peer, shares_to_ask,
413 self.contacted_peers)
415 elif self.contacted_peers:
416 # ask a peer that we've already asked.
417 if not self._started_second_pass:
418 self.log("starting second pass",
420 self._started_second_pass = True
421 num_shares = mathutil.div_ceil(len(self.homeless_shares),
422 len(self.contacted_peers))
423 peer = self.contacted_peers.pop(0)
424 shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
425 self.homeless_shares -= shares_to_ask
426 self.query_count += 1
428 self._status.set_status("Contacting Peers [%s] (second query),"
430 % (idlib.shortnodeid_b2a(peer.peerid),
431 len(self.homeless_shares)))
432 d = peer.query(shares_to_ask)
433 d.addBoth(self._got_response, peer, shares_to_ask,
434 self.contacted_peers2)
436 elif self.contacted_peers2:
437 # we've finished the second-or-later pass. Move all the remaining
438 # peers back into self.contacted_peers for the next pass.
439 self.contacted_peers.extend(self.contacted_peers2)
440 self.contacted_peers2[:] = []
441 return self._loop()
442 else:
443 # no more peers. If we haven't placed enough shares, we fail.
444 merged = merge_peers(self.preexisting_shares, self.use_peers)
445 effective_happiness = servers_of_happiness(merged)
446 if effective_happiness < self.servers_of_happiness:
447 msg = failure_message(len(self.peers_with_shares),
448 self.needed_shares,
449 self.servers_of_happiness,
450 effective_happiness)
451 msg = ("peer selection failed for %s: %s (%s)" % (self,
452 msg,
453 self._get_progress_message()))
454 if self.last_failure_msg:
455 msg += " (%s)" % (self.last_failure_msg,)
456 self.log(msg, level=log.UNUSUAL)
457 return self._failed(msg)
459 # we placed enough to be happy, so we're done
461 self._status.set_status("Placed all shares")
462 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
463 self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
464 self.log(msg, level=log.OPERATIONAL)
465 return (self.use_peers, self.preexisting_shares)
467 def _got_response(self, res, peer, shares_to_ask, put_peer_here):
468 if isinstance(res, failure.Failure):
469 # This is unusual, and probably indicates a bug or a network
471 self.log("%s got error during peer selection: %s" % (peer, res),
473 self.error_count += 1
474 self.bad_query_count += 1
475 self.homeless_shares |= shares_to_ask
476 if (self.uncontacted_peers
477 or self.contacted_peers
478 or self.contacted_peers2):
479 # there is still hope, so just loop
482 # No more peers, so this upload might fail (it depends upon
483 # whether we've hit servers_of_happiness or not). Log the last
484 # failure we got: if a coding error causes all peers to fail
485 # in the same way, this allows the common failure to be seen
486 # by the uploader and should help with debugging
487 msg = ("last failure (from %s) was: %s" % (peer, res))
488 self.last_failure_msg = msg
489 else:
490 (alreadygot, allocated) = res
491 self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
492 % (idlib.shortnodeid_b2a(peer.peerid),
493 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
496 for s in alreadygot:
497 self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
498 if s in self.homeless_shares:
499 self.homeless_shares.remove(s)
501 elif s in shares_to_ask:
504 # the PeerTracker will remember which shares were allocated on
505 # that peer. We just have to remember to use them.
507 self.use_peers.add(peer)
510 if allocated or alreadygot:
511 self.peers_with_shares.add(peer.peerid)
513 not_yet_present = set(shares_to_ask) - set(alreadygot)
514 still_homeless = not_yet_present - set(allocated)
517 # They accepted at least one of the shares that we asked
518 # them to accept, or they had a share that we didn't ask
519 # them to accept but that we hadn't placed yet, so this
520 # was a productive query
521 self.good_query_count += 1
523 self.bad_query_count += 1
527 # In networks with lots of space, this is very unusual and
528 # probably indicates an error. In networks with peers that
529 # are full, it is merely unusual. In networks that are very
530 # full, it is common, and many uploads will fail. In most
531 # cases, this is obviously not fatal, and we'll just use some
532 # other peers.
534 # some shares are still homeless, keep trying to find them a
535 # home. The ones that were rejected get first priority.
536 self.homeless_shares |= still_homeless
537 # Since they were unable to accept all of our requests, it is
538 # safe to assume that asking them again won't help.
540 # if they *were* able to accept everything, they might be
541 # willing to accept even more.
542 put_peer_here.append(peer)
548 def _failed(self, msg):
550 I am called when peer selection fails. I first abort all of the
551 remote buckets that I allocated during my unsuccessful attempt to
552 place shares for this file. I then raise an
553 UploadUnhappinessError with my msg argument.
555 for peer in self.use_peers:
556 assert isinstance(peer, PeerTracker)
557 peer.abort()
560 raise UploadUnhappinessError(msg)
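# Hedged usage sketch (this mirrors CHKUploader.locate_all_shareholders
# below; the surrounding variables are assumed to exist):
#   selector = Tahoe2PeerSelector(upload_id, logparent, upload_status)
#   d = selector.get_shareholders(storage_broker, secret_holder,
#                                 storage_index, share_size, block_size,
#                                 num_segments, n, k, desired_happiness)
#   # fires with (upload_servers, already_peers), or errbacks with
#   # UploadUnhappinessError if the happiness target cannot be met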
563 class EncryptAnUploadable:
564 """This is a wrapper that takes an IUploadable and provides
565 IEncryptedUploadable."""
566 implements(IEncryptedUploadable)
567 CHUNKSIZE = 50*1024
569 def __init__(self, original, log_parent=None):
570 self.original = IUploadable(original)
571 self._log_number = log_parent
572 self._encryptor = None
573 self._plaintext_hasher = plaintext_hasher()
574 self._plaintext_segment_hasher = None
575 self._plaintext_segment_hashes = []
576 self._encoding_parameters = None
577 self._file_size = None
578 self._ciphertext_bytes_read = 0
581 def set_upload_status(self, upload_status):
582 self._status = IUploadStatus(upload_status)
583 self.original.set_upload_status(upload_status)
585 def log(self, *args, **kwargs):
586 if "facility" not in kwargs:
587 kwargs["facility"] = "upload.encryption"
588 if "parent" not in kwargs:
589 kwargs["parent"] = self._log_number
590 return log.msg(*args, **kwargs)
592 def get_size(self):
593 if self._file_size is not None:
594 return defer.succeed(self._file_size)
595 d = self.original.get_size()
596 def _got_size(size):
597 self._file_size = size
598 if self._status:
599 self._status.set_size(size)
600 return size
601 d.addCallback(_got_size)
602 return d
604 def get_all_encoding_parameters(self):
605 if self._encoding_parameters is not None:
606 return defer.succeed(self._encoding_parameters)
607 d = self.original.get_all_encoding_parameters()
608 def _got(encoding_parameters):
609 (k, happy, n, segsize) = encoding_parameters
610 self._segment_size = segsize # used by segment hashers
611 self._encoding_parameters = encoding_parameters
612 self.log("my encoding parameters: %s" % (encoding_parameters,),
614 return encoding_parameters
618 def _get_encryptor(self):
619 if self._encryptor:
620 return defer.succeed(self._encryptor)
622 d = self.original.get_encryption_key()
623 def _got(key):
624 self._encryptor = AES(key)
627 storage_index = storage_index_hash(key)
628 assert isinstance(storage_index, str)
629 # There's no point to having the SI be longer than the key, so we
630 # specify that it is truncated to the same 128 bits as the AES key.
631 assert len(storage_index) == 16 # SHA-256 truncated to 128b
632 self._storage_index = storage_index
634 self._status.set_storage_index(storage_index)
636 d.addCallback(_got)
637 return d
639 def get_storage_index(self):
640 d = self._get_encryptor()
641 d.addCallback(lambda res: self._storage_index)
644 def _get_segment_hasher(self):
645 p = self._plaintext_segment_hasher
647 left = self._segment_size - self._plaintext_segment_hashed_bytes
649 p = plaintext_segment_hasher()
650 self._plaintext_segment_hasher = p
651 self._plaintext_segment_hashed_bytes = 0
652 return p, self._segment_size
654 def _update_segment_hash(self, chunk):
656 while offset < len(chunk):
657 p, segment_left = self._get_segment_hasher()
658 chunk_left = len(chunk) - offset
659 this_segment = min(chunk_left, segment_left)
660 p.update(chunk[offset:offset+this_segment])
661 self._plaintext_segment_hashed_bytes += this_segment
663 if self._plaintext_segment_hashed_bytes == self._segment_size:
664 # we've filled this segment
665 self._plaintext_segment_hashes.append(p.digest())
666 self._plaintext_segment_hasher = None
667 self.log("closed hash [%d]: %dB" %
668 (len(self._plaintext_segment_hashes)-1,
669 self._plaintext_segment_hashed_bytes),
671 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
672 segnum=len(self._plaintext_segment_hashes)-1,
673 hash=base32.b2a(p.digest()),
676 offset += this_segment
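# Illustrative example of the chunk/segment bookkeeping above: with
# _segment_size=3, feeding the chunks "abcd" and "ef" produces one leaf hash
# over "abc" and a second over "def"; a segment hasher is closed as soon as
# its three bytes have arrived, regardless of chunk boundaries.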
679 def read_encrypted(self, length, hash_only):
680 # make sure our parameters have been set up first
681 d = self.get_all_encoding_parameters()
683 d.addCallback(lambda ignored: self.get_size())
684 d.addCallback(lambda ignored: self._get_encryptor())
685 # then fetch and encrypt the plaintext. The unusual structure here
686 # (passing a Deferred *into* a function) is needed to avoid
687 # overflowing the stack: Deferreds don't optimize out tail recursion.
688 # We also pass in a list, to which _read_encrypted will append
691 d2 = defer.Deferred()
692 d.addCallback(lambda ignored:
693 self._read_encrypted(length, ciphertext, hash_only, d2))
694 d.addCallback(lambda ignored: d2)
695 return d
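# Minimal sketch (illustrative, not part of the original code) of the
# "pass the result Deferred in" pattern used by _read_encrypted below;
# read_one_chunk() is a hypothetical stand-in for the real read+encrypt step:
#
#   def gather(remaining, pieces, done):
#       if remaining == 0:
#           done.callback(pieces)
#           return
#       d = fireEventually(None)  # stall one reactor turn; avoids deep recursion
#       d.addCallback(lambda ign: pieces.append(read_one_chunk()))
#       d.addCallback(lambda ign: gather(remaining - 1, pieces, done))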
697 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
698 if remaining == 0:
699 fire_when_done.callback(ciphertext)
700 return
701 # tolerate large length= values without consuming a lot of RAM by
702 # reading just a chunk (say 50kB) at a time. This only really matters
703 # when hash_only==True (i.e. resuming an interrupted upload), since
704 # that's the case where we will be skipping over a lot of data.
705 size = min(remaining, self.CHUNKSIZE)
706 remaining = remaining - size
707 # read a chunk of plaintext..
708 d = defer.maybeDeferred(self.original.read, size)
709 # N.B.: if read() is synchronous, then since everything else is
710 # actually synchronous too, we'd blow the stack unless we stall for a
711 # tick. Once you accept a Deferred from IUploadable.read(), you must
712 # be prepared to have it fire immediately too.
713 d.addCallback(fireEventually)
714 def _good(plaintext):
716 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
717 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
718 ciphertext.extend(ct)
719 self._read_encrypted(remaining, ciphertext, hash_only,
722 fire_when_done.errback(why)
727 def _hash_and_encrypt_plaintext(self, data, hash_only):
728 assert isinstance(data, (tuple, list)), type(data)
731 # we use data.pop(0) instead of 'for chunk in data' to save
732 # memory: each chunk is destroyed as soon as we're done with it.
736 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
738 bytes_processed += len(chunk)
739 self._plaintext_hasher.update(chunk)
740 self._update_segment_hash(chunk)
741 # TODO: we have to encrypt the data (even if hash_only==True)
742 # because pycryptopp's AES-CTR implementation doesn't offer a
743 # way to change the counter value. Once pycryptopp acquires
744 # this ability, change this to simply update the counter
745 # before each call to (hash_only==False) _encryptor.process()
746 ciphertext = self._encryptor.process(chunk)
748 self.log(" skipping encryption", level=log.NOISY)
750 cryptdata.append(ciphertext)
753 self._ciphertext_bytes_read += bytes_processed
755 progress = float(self._ciphertext_bytes_read) / self._file_size
756 self._status.set_progress(1, progress)
760 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
761 # this is currently unused, but will live again when we fix #453
762 if len(self._plaintext_segment_hashes) < num_segments:
763 # close out the last one
764 assert len(self._plaintext_segment_hashes) == num_segments-1
765 p, segment_left = self._get_segment_hasher()
766 self._plaintext_segment_hashes.append(p.digest())
767 del self._plaintext_segment_hasher
768 self.log("closing plaintext leaf hasher, hashed %d bytes" %
769 self._plaintext_segment_hashed_bytes,
771 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
772 segnum=len(self._plaintext_segment_hashes)-1,
773 hash=base32.b2a(p.digest()),
775 assert len(self._plaintext_segment_hashes) == num_segments
776 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
778 def get_plaintext_hash(self):
779 h = self._plaintext_hasher.digest()
780 return defer.succeed(h)
783 return self.original.close()
785 class UploadStatus:
786 implements(IUploadStatus)
787 statusid_counter = itertools.count(0)
789 def __init__(self):
790 self.storage_index = None
793 self.status = "Not started"
794 self.progress = [0.0, 0.0, 0.0]
797 self.counter = self.statusid_counter.next()
798 self.started = time.time()
800 def get_started(self):
802 def get_storage_index(self):
803 return self.storage_index
806 def using_helper(self):
808 def get_status(self):
810 def get_progress(self):
811 return tuple(self.progress)
812 def get_active(self):
814 def get_results(self):
816 def get_counter(self):
819 def set_storage_index(self, si):
820 self.storage_index = si
821 def set_size(self, size):
823 def set_helper(self, helper):
825 def set_status(self, status):
827 def set_progress(self, which, value):
828 # [0]: chk, [1]: ciphertext, [2]: encode+push
829 self.progress[which] = value
830 def set_active(self, value):
832 def set_results(self, value):
833 self.results = value
835 class CHKUploader:
836 peer_selector_class = Tahoe2PeerSelector
838 def __init__(self, storage_broker, secret_holder):
839 # peer_selector needs storage_broker and secret_holder
840 self._storage_broker = storage_broker
841 self._secret_holder = secret_holder
842 self._log_number = self.log("CHKUploader starting", parent=None)
843 self._encoder = None
844 self._results = UploadResults()
845 self._storage_index = None
846 self._upload_status = UploadStatus()
847 self._upload_status.set_helper(False)
848 self._upload_status.set_active(True)
849 self._upload_status.set_results(self._results)
851 # locate_all_shareholders() will create the following attribute:
852 # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
854 def log(self, *args, **kwargs):
855 if "parent" not in kwargs:
856 kwargs["parent"] = self._log_number
857 if "facility" not in kwargs:
858 kwargs["facility"] = "tahoe.upload"
859 return log.msg(*args, **kwargs)
861 def start(self, encrypted_uploadable):
862 """Start uploading the file.
864 Returns a Deferred that will fire with the UploadResults instance.
867 self._started = time.time()
868 eu = IEncryptedUploadable(encrypted_uploadable)
869 self.log("starting upload of %s" % eu)
871 eu.set_upload_status(self._upload_status)
872 d = self.start_encrypted(eu)
873 def _done(uploadresults):
874 self._upload_status.set_active(False)
875 return uploadresults
876 d.addCallback(_done)
877 return d
879 def abort(self):
880 """Call this if the upload must be abandoned before it completes.
881 This will tell the shareholders to delete their partial shares. I
882 return a Deferred that fires when these messages have been acked."""
883 if not self._encoder:
884 # how did you call abort() before calling start() ?
885 return defer.succeed(None)
886 return self._encoder.abort()
888 def start_encrypted(self, encrypted):
889 """ Returns a Deferred that will fire with the UploadResults instance. """
890 eu = IEncryptedUploadable(encrypted)
892 started = time.time()
893 self._encoder = e = encode.Encoder(self._log_number,
895 d = e.set_encrypted_uploadable(eu)
896 d.addCallback(self.locate_all_shareholders, started)
897 d.addCallback(self.set_shareholders, e)
898 d.addCallback(lambda res: e.start())
899 d.addCallback(self._encrypted_done)
902 def locate_all_shareholders(self, encoder, started):
903 peer_selection_started = now = time.time()
904 self._storage_index_elapsed = now - started
905 storage_broker = self._storage_broker
906 secret_holder = self._secret_holder
907 storage_index = encoder.get_param("storage_index")
908 self._storage_index = storage_index
909 upload_id = si_b2a(storage_index)[:5]
910 self.log("using storage index %s" % upload_id)
911 peer_selector = self.peer_selector_class(upload_id, self._log_number,
914 share_size = encoder.get_param("share_size")
915 block_size = encoder.get_param("block_size")
916 num_segments = encoder.get_param("num_segments")
917 k,desired,n = encoder.get_param("share_counts")
919 self._peer_selection_started = time.time()
920 d = peer_selector.get_shareholders(storage_broker, secret_holder,
922 share_size, block_size,
923 num_segments, n, k, desired)
925 self._peer_selection_elapsed = time.time() - peer_selection_started
930 def set_shareholders(self, (upload_servers, already_peers), encoder):
932 @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some
933 shares for us (the shareids are stashed inside the PeerTracker)
934 @param already_peers: a dict mapping sharenum to a set of peerids
935 that claim to already have this share
937 msgtempl = "set_shareholders; upload_servers is %s, already_peers is %s"
938 values = ([', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
939 for p in upload_servers], already_peers)
940 self.log(msgtempl % values, level=log.OPERATIONAL)
941 # record already-present shares in self._results
942 self._results.preexisting_shares = len(already_peers)
944 self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
945 for peer in upload_servers:
946 assert isinstance(peer, PeerTracker)
947 buckets = {}
948 servermap = already_peers.copy()
949 for peer in upload_servers:
950 buckets.update(peer.buckets)
951 for shnum in peer.buckets:
952 self._peer_trackers[shnum] = peer
953 servermap.setdefault(shnum, set()).add(peer.peerid)
954 assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), \
955 "%s (%s) != %s (%s)" % (
958 sum([len(peer.buckets) for peer in upload_servers]),
959 [(p.buckets, p.peerid) for p in upload_servers]
961 encoder.set_shareholders(buckets, servermap)
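# Illustrative shapes of the two arguments handed to the encoder above:
#   buckets   = {0: <bucket writer on peer A>, 3: <bucket writer on peer B>}
#   servermap = {0: set([A_peerid]), 3: set([B_peerid]), 7: set([C_peerid])}
# (share 7 here would be a preexisting share contributed by already_peers.)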
963 def _encrypted_done(self, verifycap):
964 """ Returns a Deferred that will fire with the UploadResults instance. """
965 r = self._results
966 for shnum in self._encoder.get_shares_placed():
967 peer_tracker = self._peer_trackers[shnum]
968 peerid = peer_tracker.peerid
969 r.sharemap.add(shnum, peerid)
970 r.servermap.add(peerid, shnum)
971 r.pushed_shares = len(self._encoder.get_shares_placed())
972 now = time.time()
973 r.file_size = self._encoder.file_size
974 r.timings["total"] = now - self._started
975 r.timings["storage_index"] = self._storage_index_elapsed
976 r.timings["peer_selection"] = self._peer_selection_elapsed
977 r.timings.update(self._encoder.get_times())
978 r.uri_extension_data = self._encoder.get_uri_extension_data()
979 r.verifycapstr = verifycap.to_string()
980 return r
982 def get_upload_status(self):
983 return self._upload_status
985 def read_this_many_bytes(uploadable, size, prepend_data=[]):
986 if size == 0:
987 return defer.succeed([])
988 d = uploadable.read(size)
989 def _got(data):
990 assert isinstance(data, list)
991 bytes = sum([len(piece) for piece in data])
994 remaining = size - bytes
995 if remaining:
996 return read_this_many_bytes(uploadable, remaining,
997 prepend_data + data)
998 return prepend_data + data
999 d.addCallback(_got)
1000 return d
1002 class LiteralUploader:
1005 self._results = UploadResults()
1006 self._status = s = UploadStatus()
1007 s.set_storage_index(None)
1009 s.set_progress(0, 1.0)
1011 s.set_results(self._results)
1013 def start(self, uploadable):
1014 uploadable = IUploadable(uploadable)
1015 d = uploadable.get_size()
1016 def _got_size(size):
1018 self._status.set_size(size)
1019 self._results.file_size = size
1020 return read_this_many_bytes(uploadable, size)
1021 d.addCallback(_got_size)
1022 d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
1023 d.addCallback(lambda u: u.to_string())
1024 d.addCallback(self._build_results)
1027 def _build_results(self, uri):
1028 self._results.uri = uri
1029 self._status.set_status("Finished")
1030 self._status.set_progress(1, 1.0)
1031 self._status.set_progress(2, 1.0)
1032 return self._results
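# Example (illustrative): uploading the 5-byte file "hello" falls under the
# LIT threshold, so the whole plaintext is base32-encoded into the cap
# itself, roughly "URI:LIT:nbswy3dp", and no shares are pushed anywhere.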
1037 def get_upload_status(self):
1038 return self._status
1040 class RemoteEncryptedUploadable(Referenceable):
1041 implements(RIEncryptedUploadable)
1043 def __init__(self, encrypted_uploadable, upload_status):
1044 self._eu = IEncryptedUploadable(encrypted_uploadable)
1045 self._offset = 0
1046 self._bytes_sent = 0
1047 self._status = IUploadStatus(upload_status)
1048 # we are responsible for updating the status string while we run, and
1049 # for setting the ciphertext-fetch progress.
1052 def get_size(self):
1053 if self._size is not None:
1054 return defer.succeed(self._size)
1055 d = self._eu.get_size()
1056 def _got_size(size):
1057 self._size = size
1058 return size
1059 d.addCallback(_got_size)
1060 return d
1062 def remote_get_size(self):
1063 return self.get_size()
1064 def remote_get_all_encoding_parameters(self):
1065 return self._eu.get_all_encoding_parameters()
1067 def _read_encrypted(self, length, hash_only):
1068 d = self._eu.read_encrypted(length, hash_only)
1069 def _read(strings):
1070 if hash_only:
1071 self._offset += length
1072 else:
1073 size = sum([len(data) for data in strings])
1074 self._offset += size
1075 return strings
1076 d.addCallback(_read)
1077 return d
1079 def remote_read_encrypted(self, offset, length):
1080 # we don't support seek backwards, but we allow skipping forwards
1081 precondition(offset >= 0, offset)
1082 precondition(length >= 0, length)
1083 lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1085 precondition(offset >= self._offset, offset, self._offset)
1086 if offset > self._offset:
1087 # read the data from disk anyways, to build up the hash tree
1088 skip = offset - self._offset
1089 log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1090 (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1091 d = self._read_encrypted(skip, hash_only=True)
1093 d = defer.succeed(None)
1095 def _at_correct_offset(res):
1096 assert offset == self._offset, "%d != %d" % (offset, self._offset)
1097 return self._read_encrypted(length, hash_only=False)
1098 d.addCallback(_at_correct_offset)
1101 size = sum([len(data) for data in strings])
1102 self._bytes_sent += size
1103 return strings
1104 d.addCallback(_read)
1105 return d
1107 def remote_close(self):
1108 return self._eu.close()
1111 class AssistedUploader:
1113 def __init__(self, helper):
1114 self._helper = helper
1115 self._log_number = log.msg("AssistedUploader starting")
1116 self._storage_index = None
1117 self._upload_status = s = UploadStatus()
1121 def log(self, *args, **kwargs):
1122 if "parent" not in kwargs:
1123 kwargs["parent"] = self._log_number
1124 return log.msg(*args, **kwargs)
1126 def start(self, encrypted_uploadable, storage_index):
1127 """Start uploading the file.
1129 Returns a Deferred that will fire with the UploadResults instance.
1131 precondition(isinstance(storage_index, str), storage_index)
1132 self._started = time.time()
1133 eu = IEncryptedUploadable(encrypted_uploadable)
1134 eu.set_upload_status(self._upload_status)
1135 self._encuploadable = eu
1136 self._storage_index = storage_index
1137 d = eu.get_size()
1138 d.addCallback(self._got_size)
1139 d.addCallback(lambda res: eu.get_all_encoding_parameters())
1140 d.addCallback(self._got_all_encoding_parameters)
1141 d.addCallback(self._contact_helper)
1142 d.addCallback(self._build_verifycap)
1143 def _done(res):
1144 self._upload_status.set_active(False)
1145 return res
1146 d.addBoth(_done)
1147 return d
1149 def _got_size(self, size):
1151 self._upload_status.set_size(size)
1153 def _got_all_encoding_parameters(self, params):
1154 k, happy, n, segment_size = params
1155 # stash these for URI generation later
1156 self._needed_shares = k
1157 self._total_shares = n
1158 self._segment_size = segment_size
1160 def _contact_helper(self, res):
1161 now = self._time_contacting_helper_start = time.time()
1162 self._storage_index_elapsed = now - self._started
1163 self.log(format="contacting helper for SI %(si)s..",
1164 si=si_b2a(self._storage_index), level=log.NOISY)
1165 self._upload_status.set_status("Contacting Helper")
1166 d = self._helper.callRemote("upload_chk", self._storage_index)
1167 d.addCallback(self._contacted_helper)
1170 def _contacted_helper(self, (upload_results, upload_helper)):
1171 now = time.time()
1172 elapsed = now - self._time_contacting_helper_start
1173 self._elapsed_time_contacting_helper = elapsed
1174 if upload_helper:
1175 self.log("helper says we need to upload", level=log.NOISY)
1176 self._upload_status.set_status("Uploading Ciphertext")
1177 # we need to upload the file
1178 reu = RemoteEncryptedUploadable(self._encuploadable,
1179 self._upload_status)
1180 # let it pre-compute the size for progress purposes
1181 d = reu.get_size()
1182 d.addCallback(lambda ignored:
1183 upload_helper.callRemote("upload", reu))
1184 # this Deferred will fire with the upload results
1185 return d
1186 self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1187 self._upload_status.set_progress(1, 1.0)
1188 self._upload_status.set_results(upload_results)
1189 return upload_results
1191 def _convert_old_upload_results(self, upload_results):
1192 # pre-1.3.0 helpers return upload results which contain a mapping
1193 # from shnum to a single human-readable string, containing things
1194 # like "Found on [x],[y],[z]" (for healthy files that were already in
1195 # the grid), "Found on [x]" (for files that needed upload but which
1196 # discovered pre-existing shares), and "Placed on [x]" (for newly
1197 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1198 # set of binary serverid strings.
1200 # the old results are too hard to deal with (they don't even contain
1201 # as much information as the new results, since the nodeids are
1202 # abbreviated), so if we detect old results, just clobber them.
1204 sharemap = upload_results.sharemap
1205 if str in [type(v) for v in sharemap.values()]:
1206 upload_results.sharemap = None
1208 def _build_verifycap(self, upload_results):
1209 self.log("upload finished, building readcap", level=log.OPERATIONAL)
1210 self._convert_old_upload_results(upload_results)
1211 self._upload_status.set_status("Building Readcap")
1212 r = upload_results
1213 assert r.uri_extension_data["needed_shares"] == self._needed_shares
1214 assert r.uri_extension_data["total_shares"] == self._total_shares
1215 assert r.uri_extension_data["segment_size"] == self._segment_size
1216 assert r.uri_extension_data["size"] == self._size
1217 r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1218 uri_extension_hash=r.uri_extension_hash,
1219 needed_shares=self._needed_shares,
1220 total_shares=self._total_shares, size=self._size
1221 ).to_string()
1222 now = time.time()
1223 r.file_size = self._size
1224 r.timings["storage_index"] = self._storage_index_elapsed
1225 r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1226 if "total" in r.timings:
1227 r.timings["helper_total"] = r.timings["total"]
1228 r.timings["total"] = now - self._started
1229 self._upload_status.set_status("Finished")
1230 self._upload_status.set_results(r)
1231 return r
1233 def get_upload_status(self):
1234 return self._upload_status
1236 class BaseUploadable:
1237 default_max_segment_size = 128*KiB # overridden by max_segment_size
1238 default_encoding_param_k = 3 # overridden by encoding_parameters
1239 default_encoding_param_happy = 7
1240 default_encoding_param_n = 10
1242 max_segment_size = None
1243 encoding_param_k = None
1244 encoding_param_happy = None
1245 encoding_param_n = None
1247 _all_encoding_parameters = None
1250 def set_upload_status(self, upload_status):
1251 self._status = IUploadStatus(upload_status)
1253 def set_default_encoding_parameters(self, default_params):
1254 assert isinstance(default_params, dict)
1255 for k,v in default_params.items():
1256 precondition(isinstance(k, str), k, v)
1257 precondition(isinstance(v, int), k, v)
1258 if "k" in default_params:
1259 self.default_encoding_param_k = default_params["k"]
1260 if "happy" in default_params:
1261 self.default_encoding_param_happy = default_params["happy"]
1262 if "n" in default_params:
1263 self.default_encoding_param_n = default_params["n"]
1264 if "max_segment_size" in default_params:
1265 self.default_max_segment_size = default_params["max_segment_size"]
1267 def get_all_encoding_parameters(self):
1268 if self._all_encoding_parameters:
1269 return defer.succeed(self._all_encoding_parameters)
1271 max_segsize = self.max_segment_size or self.default_max_segment_size
1272 k = self.encoding_param_k or self.default_encoding_param_k
1273 happy = self.encoding_param_happy or self.default_encoding_param_happy
1274 n = self.encoding_param_n or self.default_encoding_param_n
1276 d = self.get_size()
1277 def _got_size(file_size):
1278 # for small files, shrink the segment size to avoid wasting space
1279 segsize = min(max_segsize, file_size)
1280 # this must be a multiple of 'required_shares'==k
1281 segsize = mathutil.next_multiple(segsize, k)
1282 encoding_parameters = (k, happy, n, segsize)
1283 self._all_encoding_parameters = encoding_parameters
1284 return encoding_parameters
1285 d.addCallback(_got_size)
1286 return d
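# Worked example of the parameter selection above (illustrative): with k=3
# and default_max_segment_size=128*KiB, a 10000-byte file gets
# segsize=min(131072, 10000)=10000, rounded up to the next multiple of k,
# i.e. 10002; a large file keeps segsize=next_multiple(131072, 3)=131073.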
1288 class FileHandle(BaseUploadable):
1289 implements(IUploadable)
1291 def __init__(self, filehandle, convergence):
1293 Upload the data from the filehandle. If convergence is None then a
1294 random encryption key will be used, else the plaintext will be hashed,
1295 then the hash will be hashed together with the string in the
1296 "convergence" argument to form the encryption key.
1298 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1299 self._filehandle = filehandle
1300 self._key = None
1301 self.convergence = convergence
1302 self._size = None
1304 def _get_encryption_key_convergent(self):
1305 if self._key is not None:
1306 return defer.succeed(self._key)
1309 # that sets self._size as a side-effect
1310 d.addCallback(lambda size: self.get_all_encoding_parameters())
1312 k, happy, n, segsize = params
1313 f = self._filehandle
1314 enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1319 data = f.read(BLOCKSIZE)
1322 enckey_hasher.update(data)
1323 # TODO: setting progress in a non-yielding loop is kind of
1324 # pointless, but I'm anticipating (perhaps prematurely) the
1325 # day when we use a slowjob or twisted's CooperatorService to
1326 # make this yield time to other jobs.
1327 bytes_read += len(data)
1329 self._status.set_progress(0, float(bytes_read)/self._size)
1331 self._key = enckey_hasher.digest()
1333 self._status.set_progress(0, 1.0)
1334 assert len(self._key) == 16
1339 def _get_encryption_key_random(self):
1340 if self._key is None:
1341 self._key = os.urandom(16)
1342 return defer.succeed(self._key)
1344 def get_encryption_key(self):
1345 if self.convergence is not None:
1346 return self._get_encryption_key_convergent()
1348 return self._get_encryption_key_random()
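# Convergent-encryption sketch (a summary of _get_encryption_key_convergent
# above, with a hypothetical `plaintext_chunks` iterable): the key depends on
# the encoding parameters, the convergence secret, and the plaintext, so
# identical uploads converge on the same key and storage index:
#   hasher = convergence_hasher(k, n, segsize, self.convergence)
#   for chunk in plaintext_chunks:
#       hasher.update(chunk)
#   key = hasher.digest()   # 16-byte AES key
# With convergence=None, a random 16-byte key is used instead (see above).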
1350 def get_size(self):
1351 if self._size is not None:
1352 return defer.succeed(self._size)
1353 self._filehandle.seek(0,2)
1354 size = self._filehandle.tell()
1355 self._size = size
1356 self._filehandle.seek(0)
1357 return defer.succeed(size)
1359 def read(self, length):
1360 return defer.succeed([self._filehandle.read(length)])
1362 def close(self):
1363 # the originator of the filehandle reserves the right to close it
1364 pass
1366 class FileName(FileHandle):
1367 def __init__(self, filename, convergence):
1369 Upload the data from the filename. If convergence is None then a
1370 random encryption key will be used, else the plaintext will be hashed,
1371 then the hash will be hashed together with the string in the
1372 "convergence" argument to form the encryption key.
1374 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1375 FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1376 def close(self):
1377 FileHandle.close(self)
1378 self._filehandle.close()
1380 class Data(FileHandle):
1381 def __init__(self, data, convergence):
1383 Upload the data from the data argument. If convergence is None then a
1384 random encryption key will be used, else the plaintext will be hashed,
1385 then the hash will be hashed together with the string in the
1386 "convergence" argument to form the encryption key.
1388 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1389 FileHandle.__init__(self, StringIO(data), convergence=convergence)
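# Hedged usage sketch of the IUploadable wrappers defined above ("/tmp/foo"
# is a hypothetical path):
#   u1 = Data("some bytes", convergence="")                  # in-memory
#   u2 = FileName("/tmp/foo", convergence=None)              # random key
#   u3 = FileHandle(open("/tmp/foo", "rb"), convergence="")  # caller closes
# Each of these is then handed to Uploader.upload() below.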
1391 class Uploader(service.MultiService, log.PrefixingLogMixin):
1392 """I am a service that allows file uploading. I am a service-child of the
1393 Client.
1394 """
1395 implements(IUploader)
1397 URI_LIT_SIZE_THRESHOLD = 55
1399 def __init__(self, helper_furl=None, stats_provider=None):
1400 self._helper_furl = helper_furl
1401 self.stats_provider = stats_provider
1402 self._helper = None
1403 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1404 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1405 service.MultiService.__init__(self)
1407 def startService(self):
1408 service.MultiService.startService(self)
1409 if self._helper_furl:
1410 self.parent.tub.connectTo(self._helper_furl,
1411 self._got_helper)
1413 def _got_helper(self, helper):
1414 self.log("got helper connection, getting versions")
1415 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1416 { },
1417 "application-version": "unknown: no get_version()",
1418 }
1419 d = add_version_to_remote_reference(helper, default)
1420 d.addCallback(self._got_versioned_helper)
1422 def _got_versioned_helper(self, helper):
1423 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1424 if needed not in helper.version:
1425 raise InsufficientVersionError(needed, helper.version)
1426 self._helper = helper
1427 helper.notifyOnDisconnect(self._lost_helper)
1429 def _lost_helper(self):
1430 self._helper = None
1432 def get_helper_info(self):
1433 # return a tuple of (helper_furl_or_None, connected_bool)
1434 return (self._helper_furl, bool(self._helper))
1437 def upload(self, uploadable, history=None):
1439 Returns a Deferred that will fire with the UploadResults instance.
1444 uploadable = IUploadable(uploadable)
1445 d = uploadable.get_size()
1446 def _got_size(size):
1447 default_params = self.parent.get_encoding_parameters()
1448 precondition(isinstance(default_params, dict), default_params)
1449 precondition("max_segment_size" in default_params, default_params)
1450 uploadable.set_default_encoding_parameters(default_params)
1452 if self.stats_provider:
1453 self.stats_provider.count('uploader.files_uploaded', 1)
1454 self.stats_provider.count('uploader.bytes_uploaded', size)
1456 if size <= self.URI_LIT_SIZE_THRESHOLD:
1457 uploader = LiteralUploader()
1458 return uploader.start(uploadable)
1460 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1461 d2 = defer.succeed(None)
1462 if self._helper:
1463 uploader = AssistedUploader(self._helper)
1464 d2.addCallback(lambda x: eu.get_storage_index())
1465 d2.addCallback(lambda si: uploader.start(eu, si))
1466 else:
1467 storage_broker = self.parent.get_storage_broker()
1468 secret_holder = self.parent._secret_holder
1469 uploader = CHKUploader(storage_broker, secret_holder)
1470 d2.addCallback(lambda x: uploader.start(eu))
1472 self._all_uploads[uploader] = None
1473 if history:
1474 history.add_upload(uploader.get_upload_status())
1475 def turn_verifycap_into_read_cap(uploadresults):
1476 # Generate the uri from the verifycap plus the key.
1477 d3 = uploadable.get_encryption_key()
1478 def put_readcap_into_results(key):
1479 v = uri.from_string(uploadresults.verifycapstr)
1480 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1481 uploadresults.uri = r.to_string()
1482 return uploadresults
1483 d3.addCallback(put_readcap_into_results)
1484 return d3
1485 d2.addCallback(turn_verifycap_into_read_cap)
1486 return d2
1487 d.addCallback(_got_size)
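# End-to-end sketch (hedged; assumes a running client whose Uploader service
# is reachable as `uploader` and whose parent provides encoding parameters):
#   d = uploader.upload(Data("example plaintext", convergence=""))
#   def _show(results):
#       print results.uri   # LIT or CHK read-cap, depending on size
#   d.addCallback(_show)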