1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_peers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
27 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    """Internal control-flow exception: raised to break out of the peer-query loop."""
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised when a storage server is too full to accept our request."""
class UploadResults(Copyable, RemoteCopy):
    """Outcome of an upload; instances are serialized over the wire from the
    helper to its client via foolscap Copyable/RemoteCopy."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    # NOTE(review): the `def __init__(self):` header is not visible in this
    # chunk of the file; the assignments below are its body.
    self.timings = {} # dict of name to number of seconds
    self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
    self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
    self.ciphertext_fetched = None # how much the helper fetched
    self.preexisting_shares = None # count of shares already present
    self.pushed_shares = None # count of shares we pushed
68 # our current uri_extension is 846 bytes for small files, a few bytes
69 # more for larger ones (since the filesize is encoded in decimal in a
70 # few places). Ask for a little bit more just in case we need it. If
71 # the extension changes size, we can change EXTENSION_SIZE to
72 # allocate a more accurate amount of space.
74 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: set(serverid)} map as a short human-readable string
    for log messages, e.g. "sh0: abcde+fghij, sh1: klmno"."""
    descriptions = []
    for shnum, serverids in s.iteritems():
        nodenames = '+'.join([idlib.shortnodeid_b2a(serverid) for serverid in serverids])
        descriptions.append("sh%s: %s" % (shnum, nodenames))
    return ', '.join(descriptions)
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 bucket_renewal_secret, bucket_cancel_secret):
        # Tracks one storage server during share placement.
        # NOTE(review): at least one parameter line (the storage_index
        # parameter, assigned below) and the `self.peerid = peerid`
        # assignment are not visible in this chunk.
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        # throwaway write-bucket proxy, used only to compute the allocation
        # size and to remember the proxy class for later instantiation
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret
    # NOTE(review): the `def __repr__(self):` header is not visible in this
    # chunk; this is its body — a short identifier for log messages.
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))
    def query(self, sharenums):
        # Ask the server to allocate buckets for the given share numbers.
        # NOTE(review): several callRemote() argument lines (storage index,
        # secrets, sharenums, allocated size) and the trailing `return d`
        # are not visible in this chunk.
        d = self._storageserver.callRemote("allocate_buckets",
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
    def ask_about_existing_shares(self):
        # Read-only query: which shares does this server already hold for
        # our storage index? (the remaining callRemote argument lines are
        # not visible in this chunk)
        return self._storageserver.callRemote("get_buckets",
    def _got_reply(self, (alreadygot, buckets)):
        # Python 2 tuple-parameter unpacking of the allocate_buckets response.
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # Wrap each raw bucket-writer reference in a write-bucket proxy.
        # NOTE(review): the creation of the local dict `b` and some
        # proxy-constructor argument lines are not visible in this chunk.
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.num_share_hashes,
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
    # NOTE(review): the `def abort(self):` header and the docstring
    # delimiters are not visible in this chunk.
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        self.abort_some_buckets(self.buckets.keys())
148 def abort_some_buckets(self, sharenums):
150 I abort the remote bucket writers for the share numbers in sharenums.
152 for sharenum in sharenums:
153 if sharenum in self.buckets:
154 self.buckets[sharenum].abort()
155 del self.buckets[sharenum]
class Tahoe2PeerSelector(log.PrefixingLogMixin):
    """Decides which storage servers will hold the shares of one upload."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query statistics, reported via _get_progress_message()
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        # NOTE(review): some counter initializations (e.g. full_count,
        # error_count, both read elsewhere in this class) are not visible
        # in this chunk.
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    # NOTE(review): the `def __repr__(self):` header is not visible in this
    # chunk; this is its body.
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_servers, already_peers), where upload_servers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shareids are stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a set of peers
                 which claim to already have the share.
        """
        self._status.set_status("Contacting Peers..")
        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
        # We don't try to allocate shares to these servers, since they've said
        # that they're incapable of storing shares of the size that we'd want
        # to store. We keep them around because they may have existing shares
        # for this storage index, which we want to know about for accurate
        # servers_of_happiness accounting
        # (this is eventually a list, but it is initialized later)
        self.readonly_peers = None
        # These peers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.peers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
        allocated_size = wbp.get_allocated_size()
        all_peers = storage_broker.get_servers_for_index(storage_index)
        # NOTE(review): the `if not all_peers:` guard introducing this raise
        # is not visible in this chunk.
        raise NoServersError("client gave us zero peers")

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_peers = [peer for peer in all_peers
                          if _get_maxsize(peer) >= allocated_size]
        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
        def _make_trackers(peers):
            # one PeerTracker per (peerid, connection) pair, each with
            # per-bucket renewal/cancel secrets derived from the file secrets
            return [PeerTracker(peerid, conn,
                                share_size, block_size,
                                num_segments, num_share_hashes,
                                bucket_renewal_secret_hash(file_renewal_secret,
                                bucket_cancel_secret_hash(file_cancel_secret,
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(writable_peers)
        self.readonly_peers = _make_trackers(readonly_peers)
        # We now ask peers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        if self._status and self.readonly_peers:
            self._status.set_status("Contacting readonly peers to find "
                                    "any existing shares")
        for peer in self.readonly_peers:
            assert isinstance(peer, PeerTracker)
            d = peer.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, peer.peerid)
            self.num_peers_contacted += 1
            self.query_count += 1
            self.log("asking peer %s for any existing shares" %
                     (idlib.shortnodeid_b2a(peer.peerid),),
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
    def _handle_existing_response(self, res, peer):
        """
        I handle responses to the queries sent by
        Tahoe2PeerSelector._existing_shares.
        """
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                     % (idlib.shortnodeid_b2a(peer), res),
            self.error_count += 1
            self.bad_query_count += 1
            # NOTE(review): the success-branch `else:` introducer and the
            # binding of `buckets` from res are not visible in this chunk.
            self.peers_with_shares.add(peer)
            self.log("response to get_buckets() from peer %s: alreadygot=%s"
                     % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
            # record every share this server already holds, and stop
            # treating those shares as homeless
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(peer)
                self.homeless_shares.discard(bucket)
            self.bad_query_count += 1
    def _get_progress_message(self):
        # One-line human-readable placement summary, used in both success
        # and failure log/error messages.
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
            # NOTE(review): the `else:` introducer and the first tuple
            # element of the format arguments are not visible in this chunk.
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                "any %d of them have enough shares to recover the file, "
                "sent %d queries to %d peers, "
                "%d queries placed some shares, %d placed none "
                "(of which %d placed none due to the server being"
                " full and %d placed none due to an error)" %
                (self.servers_of_happiness, self.needed_shares,
                 self.query_count, self.num_peers_contacted,
                 self.good_query_count, self.bad_query_count,
                 self.full_count, self.error_count))
    # NOTE(review): the `def _loop(self):` header is not visible in this
    # chunk; the following is the body of the main share-placement loop,
    # re-invoked after each query response until all shares are placed or
    # no peers remain.
        if not self.homeless_shares:
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                # happiness target met: selection is done
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_peers, self.preexisting_shares)
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other peers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                        items.append((server, sharelist))
                    for writer in self.use_peers:
                        writer.abort_some_buckets(self.homeless_shares)
                    # Redistribution won't help us; fail.
                    peer_count = len(self.peers_with_shares)
                    msg = failure_message(peer_count,
                                          self.servers_of_happiness,
                    self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (msg, self._get_progress_message()))
        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)
            # on the first pass, ask each peer for only one share
            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_peers_contacted += 1
            self._status.set_status("Contacting Peers [%s] (first query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                self._started_second_pass = True
            # spread the remaining shares evenly across the peers left
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self._status.set_status("Contacting Peers [%s] (second query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            # no more peers. If we haven't placed enough shares, we fail.
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.peers_with_shares),
                                      self.servers_of_happiness,
                msg = ("peer selection failed for %s: %s (%s)" % (self,
                       self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
                # we placed enough to be happy, so we're done
                self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                       self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_peers, self.preexisting_shares)
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        # Handle one allocate_buckets() result (or failure) from `peer`;
        # on qualified success, append the peer to `put_peer_here` so a
        # later pass can ask it again.
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            self.log("%s got error during peer selection: %s" % (peer, res),
            self.error_count += 1
            self.bad_query_count += 1
            # the shares we asked this peer about become homeless again
            self.homeless_shares |= shares_to_ask
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
                     % (idlib.shortnodeid_b2a(peer.peerid),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
            # record shares the server already had
            self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
            if s in self.homeless_shares:
                self.homeless_shares.remove(s)
            elif s in shares_to_ask:
                # the PeerTracker will remember which shares were allocated on
                # that peer. We just have to remember to use them.
                self.use_peers.add(peer)
            if allocated or alreadygot:
                self.peers_with_shares.add(peer.peerid)
            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
                self.bad_query_count += 1
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)
    def _failed(self, msg):
        """
        I am called when peer selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for peer in self.use_peers:
            assert isinstance(peer, PeerTracker)
            # NOTE(review): the per-peer abort call inside this loop is not
            # visible in this chunk.
        raise UploadUnhappinessError(msg)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): the CHUNKSIZE class attribute (referenced by
    # _read_encrypted below) is not visible in this chunk.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None               # created lazily by _get_encryptor
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None     # cached (k, happy, n, segsize)
        self._file_size = None               # cached size from the uploadable
        self._ciphertext_bytes_read = 0
565 def set_upload_status(self, upload_status):
566 self._status = IUploadStatus(upload_status)
567 self.original.set_upload_status(upload_status)
569 def log(self, *args, **kwargs):
570 if "facility" not in kwargs:
571 kwargs["facility"] = "upload.encryption"
572 if "parent" not in kwargs:
573 kwargs["parent"] = self._log_number
574 return log.msg(*args, **kwargs)
    # NOTE(review): the `def get_size(self):` header and the inner
    # `def _got_size(size):` header are not visible in this chunk. The
    # result is cached in self._file_size.
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
            self._file_size = size
            self._status.set_size(size)
        d.addCallback(_got_size)
    def get_all_encoding_parameters(self):
        # Fires with the (k, happy, n, segment_size) tuple from the wrapped
        # uploadable; cached after the first call.
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            return encoding_parameters
    def _get_encryptor(self):
        # Lazily build the encryptor from the uploadable's encryption key,
        # and derive self._storage_index from the same key.
        # NOTE(review): several lines (the cached-encryptor guard, the
        # key-handling callback header, and the cipher construction) are
        # not visible in this chunk.
            return defer.succeed(self._encryptor)
        d = self.original.get_encryption_key()
            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
            self._status.set_storage_index(storage_index)
    def get_storage_index(self):
        # Fires with the derived storage index once the encryption key is
        # known. (NOTE(review): the trailing `return d` is not visible in
        # this chunk.)
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
    def _get_segment_hasher(self):
        # Returns (hasher, bytes_left_in_current_segment), creating a fresh
        # per-segment hasher when none is active.
        # NOTE(review): the branch introducers around the `left` computation
        # and its early return are not visible in this chunk.
        p = self._plaintext_segment_hasher
            left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size
    def _update_segment_hash(self, chunk):
        # Feed `chunk` into the per-segment plaintext hashers, closing out
        # each hasher as its segment fills.
        # NOTE(review): the initial `offset = 0` line is not visible in
        # this chunk.
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment
            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
            offset += this_segment
    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext chunks.
        # NOTE(review): the `ciphertext = []` initialization is not visible
        # in this chunk.
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # NOTE(review): the termination guard introducing this callback()
        # (presumably `if not remaining:`) is not visible in this chunk.
            fire_when_done.callback(ciphertext)
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            # recurse (via the Deferred) for the rest of the bytes
            self._read_encrypted(remaining, ciphertext, hash_only,
            # NOTE(review): the errback helper's `def _bad(why):` header is
            # not visible in this chunk.
            fire_when_done.errback(why)
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        # NOTE(review): the loop header, the `cryptdata`/`bytes_processed`
        # initializations, and the hash_only branch introducers are not
        # visible in this chunk.
                self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
                ciphertext = self._encryptor.process(chunk)
                self.log(" skipping encryption", level=log.NOISY)
            cryptdata.append(ciphertext)
        self._ciphertext_bytes_read += bytes_processed
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
762 def get_plaintext_hash(self):
763 h = self._plaintext_hasher.digest()
764 return defer.succeed(h)
    # NOTE(review): the `def close(self):` header is not visible in this
    # chunk; this is its body — delegate close() to the wrapped uploadable.
        return self.original.close()
# NOTE(review): the `class UploadStatus:` header and most of the one-line
# getter/setter bodies are not visible in this chunk. This is a simple
# status-tracking record implementing IUploadStatus.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    # __init__ body (its `def __init__(self):` header is not visible):
        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
# NOTE(review): the `class CHKUploader:` header is not visible in this chunk.
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        # NOTE(review): an `self._encoder = None` initialization (tested by
        # abort()) appears to be missing from this chunk.
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
838 def log(self, *args, **kwargs):
839 if "parent" not in kwargs:
840 kwargs["parent"] = self._log_number
841 if "facility" not in kwargs:
842 kwargs["facility"] = "tahoe.upload"
843 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # mark the status object inactive once the upload resolves
            self._upload_status.set_active(False)
    # NOTE(review): the `def abort(self):` header is not visible in this
    # chunk.
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)
        started = time.time()
        # NOTE(review): part of the Encoder() constructor arguments and the
        # trailing `return d` are not visible in this chunk.
        self._encoder = e = encode.Encoder(self._log_number,
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
    def locate_all_shareholders(self, encoder, started):
        # Run peer selection with parameters taken from the encoder; records
        # timing information as a side effect.
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           share_size, block_size,
                                           num_segments, n, k, desired)
        # NOTE(review): the inner `def _done(res):` header is not visible in
        # this chunk.
            self._peer_selection_elapsed = time.time() - peer_selection_started
    def set_shareholders(self, (upload_servers, already_peers), encoder):
        """
        @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some shares for us (the shareids are stashed inside the PeerTracker)
        @param already_peers: a dict mapping sharenum to a set of peerids
                              that claim to already have this share
        """
        self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in upload_servers:
            assert isinstance(peer, PeerTracker)
        # NOTE(review): the `buckets = {}` initialization is not visible in
        # this chunk.
        servermap = already_peers.copy()
        for peer in upload_servers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap.setdefault(shnum, set()).add(peer.peerid)
        self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]))
        assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
        encoder.set_shareholders(buckets, servermap)
    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # NOTE(review): 'r = self._results' appears to be missing here;
        # 'r' is used throughout the body below.
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            # record the placement in both directions: share->servers
            # and server->shares
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        # NOTE(review): 'now = time.time()' appears to be missing here;
        # 'now' is used in the timing computation below.
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        # NOTE(review): 'return r' appears to be missing here.
957 def get_upload_status(self):
958 return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from an uploadable.

    Calls uploadable.read() repeatedly (it may return fewer bytes than
    requested) until 'size' bytes have been gathered.

    @param uploadable: an object whose read(length) returns a Deferred
                       that fires with a list of strings
    @param size: total number of bytes to collect
    @param prepend_data: list of strings already collected, placed before
                         the newly read data (used by the recursive calls)
    @return: a Deferred that fires with prepend_data plus the list of
             strings read here
    """
    # avoid the shared-mutable-default pitfall: use None as the sentinel
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        # a zero-byte read would recurse forever; the uploadable must
        # make progress on every call
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            # short read: recurse for the rest, carrying what we have
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
class LiteralUploader:
    """Upload a small file by embedding its entire contents in a
    LiteralFileURI; no shares are created."""

    # NOTE(review): the 'def __init__(self):' header for the following
    # initialization appears to be missing from the visible code.
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        # NOTE(review): the 'def _got_size(size):' header for the next
        # three lines appears to be missing from the visible code.
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        # fold the whole file into a LIT uri string
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        # NOTE(review): 'return d' appears to be missing here.

    def _build_results(self, uri):
        # note: this parameter shadows the module-level 'uri' import
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def get_upload_status(self):
        # NOTE(review): the body (presumably 'return self._status') is
        # missing from the visible code.
class RemoteEncryptedUploadable(Referenceable):
    """Expose an IEncryptedUploadable over foolscap so the helper can
    pull ciphertext from us; tracks the read offset so backwards seeks
    can be refused and progress can be reported."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # NOTE(review): initialization of self._offset and self._size
        # (both read by methods below) appears to be missing here.
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

    # NOTE(review): the 'def get_size(self):' header for the following
    # body appears to be missing from the visible code.
        if self._size is not None:
            # cached by a previous call
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            # NOTE(review): the body (presumably caching into self._size
            # and returning it) is missing from the visible code.
        d.addCallback(_got_size)

    def remote_get_size(self):
        # foolscap-visible wrapper around get_size()
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # fetch (or, for hash_only, merely hash) the next 'length' bytes of
        # ciphertext, advancing self._offset
        d = self._eu.read_encrypted(length, hash_only)
        # NOTE(review): the branch structure here is incomplete in the
        # visible code; an 'if hash_only:' and the 'def _read(strings):'
        # callback header appear to be missing.
            self._offset += length
            size = sum([len(data) for data in strings])
            self._offset += size
        d.addCallback(_read)

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        # NOTE(review): the remainder of this log.msg() call (keyword args
        # and closing paren) is missing from the visible code.
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): an 'else:' appears to be missing before the next
        # line; as written it would unconditionally clobber 'd'.
            d = defer.succeed(None)
        def _at_correct_offset(res):
            # any skip has now completed, so the offsets must agree
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): the 'def _read(strings):' header for the next two
        # lines appears to be missing from the visible code.
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
        d.addCallback(_read)
        # NOTE(review): 'return d' appears to be missing here.

    def remote_close(self):
        # the helper is done pulling ciphertext
        return self._eu.close()
class AssistedUploader:
    """Upload a file by way of a helper service.

    The helper (self._helper, a remote reference) receives our ciphertext
    and performs encoding/placement; the encryption key never leaves this
    node, and we build the readcap from the verifycap info the helper
    returns."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): the lines that further configure 's' appear to be
        # missing from the visible code.

    def log(self, *args, **kwargs):
        # attach our own log entry as the parent unless told otherwise
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # NOTE(review): the creation of 'd' (presumably d = eu.get_size())
        # is missing from the visible code.
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        # NOTE(review): the '_done' wrapper that deactivates the status and
        # returns the Deferred appears to be partially missing here.
        self._upload_status.set_active(False)

    def _got_size(self, size):
        # NOTE(review): 'self._size = size' appears to be missing here;
        # self._size is read by _build_verifycap below.
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        # NOTE(review): 'return d' appears to be missing here.

    def _contacted_helper(self, (upload_results, upload_helper)):
        # NOTE(review): 'now = time.time()' appears to be missing here.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        # NOTE(review): an 'if upload_helper:' test appears to be missing
        # here; the first half below handles the still-needs-upload case,
        # the tail handles the already-uploaded case.
        self.log("helper says we need to upload")
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.
        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        # NOTE(review): 'r = upload_results' and 'now = time.time()' appear
        # to be missing here; both names are used below.
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                         uri_extension_hash=r.uri_extension_hash,
                         needed_shares=self._needed_shares,
                         total_shares=self._total_shares, size=self._size
        # NOTE(review): the ').to_string()' closing this call is missing
        # from the visible code.
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own total under a different key; ours
            # becomes the overall total
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        # NOTE(review): 'return r' appears to be missing here.

    def get_upload_status(self):
        return self._upload_status
class BaseUploadable:
    """Shared encoding-parameter plumbing for the IUploadable
    implementations below (FileHandle, FileName, Data)."""
    # class-level fallbacks, replaced per-instance by
    # set_default_encoding_parameters()
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # explicit per-instance overrides; when None, the defaults above win
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None # cached (k, happy, n, segsize) tuple

    def set_upload_status(self, upload_status):
        # adapt and stash the status object used for progress reporting
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install fallback values for k/happy/n/max_segment_size; explicit
        per-instance encoding_param_* settings still take precedence."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred that fires with (k, happy, n, segment_size)."""
        if self._all_encoding_parameters:
            # computed once already; reuse the cached tuple
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        # NOTE(review): 'd = self.get_size()' appears to be missing here;
        # 'd' is used below.
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        # NOTE(review): 'return d' appears to be missing here.
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        # NOTE(review): initialization of self._key and self._size (read by
        # the methods below) appears to be missing from the visible code.
        self.convergence = convergence

    def _get_encryption_key_convergent(self):
        # derive the key from a hash of the plaintext plus the convergence
        # secret, so identical plaintexts produce identical keys
        if self._key is not None:
            return defer.succeed(self._key)

        # NOTE(review): 'd = self.get_size()' appears to be missing here.
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        # NOTE(review): the callback header (presumably 'def _got(params):')
        # for the following hashing loop is missing from the visible code.
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            # NOTE(review): the seek-to-start, BLOCKSIZE definition, and the
            # read-loop header appear to be missing around here.
            data = f.read(BLOCKSIZE)
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                self._status.set_progress(0, float(bytes_read)/self._size)
            self._key = enckey_hasher.digest()
            self._status.set_progress(0, 1.0)
            assert len(self._key) == 16 # 16-byte AES key
        # NOTE(review): the tail of this method (rewinding the file and
        # returning the key through the Deferred chain) is missing.

    def _get_encryption_key_random(self):
        if self._key is None:
            # no convergence secret: pick a fresh random 16-byte key
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        # choose convergent or random keying based on the convergence secret
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

    # NOTE(review): the 'def get_size(self):' header for the following
    # body appears to be missing from the visible code.
        if self._size is not None:
            return defer.succeed(self._size)
        # measure the file by seeking to its end
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        # NOTE(review): 'self._size = size' appears to be missing here.
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        # per the IUploadable contract, fires with a list of strings
        return defer.succeed([self._filehandle.read(length)])

    # NOTE(review): the 'def close(self):' header (with an empty body)
    # appears to be missing around this comment.
        # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # unlike FileHandle's caller, we opened the file, so we own it
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    # NOTE(review): the 'def close(self):' header for the following two
    # lines appears to be missing from the visible code.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # wrap the in-memory bytes in a StringIO and reuse FileHandle
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    node (NOTE(review): the remainder of this docstring is missing from
    the visible code)."""
    implements(IUploader)

    URI_LIT_SIZE_THRESHOLD = 55 # files at or below this size become LIT URIs

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        # NOTE(review): 'self._helper = None' appears to be missing here;
        # get_helper_info() and upload() read self._helper.
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            # establish (and maintain) a connection to the helper
            self.parent.tub.connectTo(self._helper_furl,
            # NOTE(review): the callback argument (presumably
            # self._got_helper) and the closing paren of this call are
            # missing from the visible code.
    def _got_helper(self, helper):
        # decorate the raw remote reference with version info before use
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
        # NOTE(review): part of this default-version dict literal (the
        # nested value dict and its closing braces) is missing from the
        # visible code.
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1397 def _got_versioned_helper(self, helper):
1398 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1399 if needed not in helper.version:
1400 raise InsufficientVersionError(needed, helper.version)
1401 self._helper = helper
1402 helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        # NOTE(review): the body (presumably clearing self._helper) is
        # missing from the visible code.
1407 def get_helper_info(self):
1408 # return a tuple of (helper_furl_or_None, connected_bool)
1409 return (self._helper_furl, bool(self._helper))
1412 def upload(self, uploadable, history=None):
1414 Returns a Deferred that will fire with the UploadResults instance.
1419 uploadable = IUploadable(uploadable)
1420 d = uploadable.get_size()
1421 def _got_size(size):
1422 default_params = self.parent.get_encoding_parameters()
1423 precondition(isinstance(default_params, dict), default_params)
1424 precondition("max_segment_size" in default_params, default_params)
1425 uploadable.set_default_encoding_parameters(default_params)
1427 if self.stats_provider:
1428 self.stats_provider.count('uploader.files_uploaded', 1)
1429 self.stats_provider.count('uploader.bytes_uploaded', size)
1431 if size <= self.URI_LIT_SIZE_THRESHOLD:
1432 uploader = LiteralUploader()
1433 return uploader.start(uploadable)
1435 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1436 d2 = defer.succeed(None)
1438 uploader = AssistedUploader(self._helper)
1439 d2.addCallback(lambda x: eu.get_storage_index())
1440 d2.addCallback(lambda si: uploader.start(eu, si))
1442 storage_broker = self.parent.get_storage_broker()
1443 secret_holder = self.parent._secret_holder
1444 uploader = CHKUploader(storage_broker, secret_holder)
1445 d2.addCallback(lambda x: uploader.start(eu))
1447 self._all_uploads[uploader] = None
1449 history.add_upload(uploader.get_upload_status())
1450 def turn_verifycap_into_read_cap(uploadresults):
1451 # Generate the uri from the verifycap plus the key.
1452 d3 = uploadable.get_encryption_key()
1453 def put_readcap_into_results(key):
1454 v = uri.from_string(uploadresults.verifycapstr)
1455 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1456 uploadresults.uri = r.to_string()
1457 return uploadresults
1458 d3.addCallback(put_readcap_into_results)
1460 d2.addCallback(turn_verifycap_into_read_cap)
1462 d.addCallback(_got_size)