1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_peers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24 DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
28 from cStringIO import StringIO
37 class HaveAllPeersError(Exception):
38 # we use this to jump out of the loop
41 # this wants to live in storage, not here
42 class TooFullError(Exception):
45 class UploadResults(Copyable, RemoteCopy):
46 implements(IUploadResults)
47 # note: don't change this string, it needs to match the value used on the
48 # helper, and it does *not* need to match the fully-qualified
49 # package/module/class name
50 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
53 # also, think twice about changing the shape of any existing attribute,
54 # because instances of this class are sent from the helper to its client,
55 # so changing this may break compatibility. Consider adding new fields
56 # instead of modifying existing ones.
59 self.timings = {} # dict of name to number of seconds
60 self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
61 self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
63 self.ciphertext_fetched = None # how much the helper fetched
65 self.preexisting_shares = None # count of shares already present
66 self.pushed_shares = None # count of shares we pushed
69 # our current uri_extension is 846 bytes for small files, a few bytes
70 # more for larger ones (since the filesize is encoded in decimal in a
71 # few places). Ask for a little bit more just in case we need it. If
72 # the extension changes size, we can change EXTENSION_SIZE to
73 # allocate a more accurate amount of space.
75 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: collection-of-peerids} map as a one-line summary.

    Each entry becomes "sh<shnum>: <peer>+<peer>+...", with each peerid
    abbreviated via idlib.shortnodeid_b2a; entries are joined with ", ".
    """
    entries = []
    for shnum, peerids in s.iteritems():
        abbreviated = '+'.join([idlib.shortnodeid_b2a(peerid) for peerid in peerids])
        entries.append("sh%s: %s" % (shnum, abbreviated))
    return ', '.join(entries)
82 def __init__(self, peerid, storage_server,
83 sharesize, blocksize, num_segments, num_share_hashes,
85 bucket_renewal_secret, bucket_cancel_secret):
86 precondition(isinstance(peerid, str), peerid)
87 precondition(len(peerid) == 20, peerid)
89 self._storageserver = storage_server # to an RIStorageServer
90 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
91 self.sharesize = sharesize
93 wbp = layout.make_write_bucket_proxy(None, sharesize,
94 blocksize, num_segments,
96 EXTENSION_SIZE, peerid)
97 self.wbp_class = wbp.__class__ # to create more of them
98 self.allocated_size = wbp.get_allocated_size()
99 self.blocksize = blocksize
100 self.num_segments = num_segments
101 self.num_share_hashes = num_share_hashes
102 self.storage_index = storage_index
104 self.renew_secret = bucket_renewal_secret
105 self.cancel_secret = bucket_cancel_secret
108 return ("<PeerTracker for peer %s and SI %s>"
109 % (idlib.shortnodeid_b2a(self.peerid),
110 si_b2a(self.storage_index)[:5]))
112 def query(self, sharenums):
113 d = self._storageserver.callRemote("allocate_buckets",
119 canary=Referenceable())
120 d.addCallback(self._got_reply)
123 def ask_about_existing_shares(self):
124 return self._storageserver.callRemote("get_buckets",
127 def _got_reply(self, (alreadygot, buckets)):
128 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
130 for sharenum, rref in buckets.iteritems():
131 bp = self.wbp_class(rref, self.sharesize,
134 self.num_share_hashes,
138 self.buckets.update(b)
139 return (alreadygot, set(b.keys()))
144 I abort the remote bucket writers for all shares. This is a good idea
145 to conserve space on the storage server.
147 self.abort_some_buckets(self.buckets.keys())
149 def abort_some_buckets(self, sharenums):
151 I abort the remote bucket writers for the share numbers in sharenums.
153 for sharenum in sharenums:
154 if sharenum in self.buckets:
155 self.buckets[sharenum].abort()
156 del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
    """Return "<shnum>: <abbreviated-nodeid>" describing where one share lives."""
    node_s = idlib.shortnodeid_b2a(bucketwriter._nodeid)
    return "%s: %s" % (shnum, node_s)
162 class Tahoe2PeerSelector(log.PrefixingLogMixin):
164 def __init__(self, upload_id, logparent=None, upload_status=None):
165 self.upload_id = upload_id
166 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
167 # Peers that are working normally, but full.
170 self.num_peers_contacted = 0
171 self.last_failure_msg = None
172 self._status = IUploadStatus(upload_status)
173 log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
174 self.log("starting", level=log.OPERATIONAL)
177 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
179 def get_shareholders(self, storage_broker, secret_holder,
180 storage_index, share_size, block_size,
181 num_segments, total_shares, needed_shares,
182 servers_of_happiness):
184 @return: (upload_servers, already_peers), where upload_servers is a set of
185 PeerTracker instances that have agreed to hold some shares
186 for us (the shareids are stashed inside the PeerTracker),
187 and already_peers is a dict mapping shnum to a set of peers
188 which claim to already have the share.
192 self._status.set_status("Contacting Peers..")
194 self.total_shares = total_shares
195 self.servers_of_happiness = servers_of_happiness
196 self.needed_shares = needed_shares
198 self.homeless_shares = set(range(total_shares))
199 self.contacted_peers = [] # peers worth asking again
200 self.contacted_peers2 = [] # peers that we have asked again
201 self._started_second_pass = False
202 self.use_peers = set() # PeerTrackers that have shares assigned to them
203 self.preexisting_shares = {} # shareid => set(peerids) holding shareid
204 # We don't try to allocate shares to these servers, since they've said
205 # that they're incapable of storing shares of the size that we'd want
206 # to store. We keep them around because they may have existing shares
207 # for this storage index, which we want to know about for accurate
208 # servers_of_happiness accounting
209 # (this is eventually a list, but it is initialized later)
210 self.readonly_peers = None
211 # These peers have shares -- any shares -- for our SI. We keep
212 # track of these to write an error message with them later.
213 self.peers_with_shares = set()
215 # this needed_hashes computation should mirror
216 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
217 # (instead of a HashTree) because we don't require actual hashing
218 # just to count the levels.
219 ht = hashtree.IncompleteHashTree(total_shares)
220 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
222 # figure out how much space to ask for
223 wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
224 num_share_hashes, EXTENSION_SIZE,
226 allocated_size = wbp.get_allocated_size()
227 all_peers = [(s.get_serverid(), s.get_rref())
228 for s in storage_broker.get_servers_for_psi(storage_index)]
230 raise NoServersError("client gave us zero peers")
232 # filter the list of peers according to which ones can accomodate
233 # this request. This excludes older peers (which used a 4-byte size
234 # field) from getting large shares (for files larger than about
235 # 12GiB). See #439 for details.
236 def _get_maxsize(peer):
237 (peerid, conn) = peer
238 v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
239 return v1["maximum-immutable-share-size"]
240 writable_peers = [peer for peer in all_peers
241 if _get_maxsize(peer) >= allocated_size]
242 readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
244 # decide upon the renewal/cancel secrets, to include them in the
245 # allocate_buckets query.
246 client_renewal_secret = secret_holder.get_renewal_secret()
247 client_cancel_secret = secret_holder.get_cancel_secret()
249 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
251 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
253 def _make_trackers(peers):
254 return [PeerTracker(peerid, conn,
255 share_size, block_size,
256 num_segments, num_share_hashes,
258 bucket_renewal_secret_hash(file_renewal_secret,
260 bucket_cancel_secret_hash(file_cancel_secret,
262 for (peerid, conn) in peers]
263 self.uncontacted_peers = _make_trackers(writable_peers)
264 self.readonly_peers = _make_trackers(readonly_peers)
265 # We now ask peers that can't hold any new shares about existing
266 # shares that they might have for our SI. Once this is done, we
267 # start placing the shares that we haven't already accounted
270 if self._status and self.readonly_peers:
271 self._status.set_status("Contacting readonly peers to find "
272 "any existing shares")
273 for peer in self.readonly_peers:
274 assert isinstance(peer, PeerTracker)
275 d = peer.ask_about_existing_shares()
276 d.addBoth(self._handle_existing_response, peer.peerid)
278 self.num_peers_contacted += 1
279 self.query_count += 1
280 self.log("asking peer %s for any existing shares" %
281 (idlib.shortnodeid_b2a(peer.peerid),),
283 dl = defer.DeferredList(ds)
284 dl.addCallback(lambda ign: self._loop())
288 def _handle_existing_response(self, res, peer):
290 I handle responses to the queries sent by
291 Tahoe2PeerSelector._existing_shares.
293 if isinstance(res, failure.Failure):
294 self.log("%s got error during existing shares check: %s"
295 % (idlib.shortnodeid_b2a(peer), res),
297 self.error_count += 1
298 self.bad_query_count += 1
302 self.peers_with_shares.add(peer)
303 self.log("response to get_buckets() from peer %s: alreadygot=%s"
304 % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
306 for bucket in buckets:
307 self.preexisting_shares.setdefault(bucket, set()).add(peer)
308 self.homeless_shares.discard(bucket)
310 self.bad_query_count += 1
313 def _get_progress_message(self):
314 if not self.homeless_shares:
315 msg = "placed all %d shares, " % (self.total_shares)
317 msg = ("placed %d shares out of %d total (%d homeless), " %
318 (self.total_shares - len(self.homeless_shares),
320 len(self.homeless_shares)))
321 return (msg + "want to place shares on at least %d servers such that "
322 "any %d of them have enough shares to recover the file, "
323 "sent %d queries to %d peers, "
324 "%d queries placed some shares, %d placed none "
325 "(of which %d placed none due to the server being"
326 " full and %d placed none due to an error)" %
327 (self.servers_of_happiness, self.needed_shares,
328 self.query_count, self.num_peers_contacted,
329 self.good_query_count, self.bad_query_count,
330 self.full_count, self.error_count))
334 if not self.homeless_shares:
335 merged = merge_peers(self.preexisting_shares, self.use_peers)
336 effective_happiness = servers_of_happiness(merged)
337 if self.servers_of_happiness <= effective_happiness:
338 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
339 "self.use_peers: %s, self.preexisting_shares: %s") \
340 % (self, self._get_progress_message(),
341 pretty_print_shnum_to_servers(merged),
342 [', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
343 for p in self.use_peers],
344 pretty_print_shnum_to_servers(self.preexisting_shares))
345 self.log(msg, level=log.OPERATIONAL)
346 return (self.use_peers, self.preexisting_shares)
348 # We're not okay right now, but maybe we can fix it by
349 # redistributing some shares. In cases where one or two
350 # servers has, before the upload, all or most of the
351 # shares for a given SI, this can work by allowing _loop
352 # a chance to spread those out over the other peers,
353 delta = self.servers_of_happiness - effective_happiness
354 shares = shares_by_server(self.preexisting_shares)
355 # Each server in shares maps to a set of shares stored on it.
356 # Since we want to keep at least one share on each server
357 # that has one (otherwise we'd only be making
358 # the situation worse by removing distinct servers),
359 # each server has len(its shares) - 1 to spread around.
360 shares_to_spread = sum([len(list(sharelist)) - 1
361 for (server, sharelist)
363 if delta <= len(self.uncontacted_peers) and \
364 shares_to_spread >= delta:
365 items = shares.items()
366 while len(self.homeless_shares) < delta:
367 # Loop through the allocated shares, removing
368 # one from each server that has more than one
369 # and putting it back into self.homeless_shares
370 # until we've done this delta times.
371 server, sharelist = items.pop()
372 if len(sharelist) > 1:
373 share = sharelist.pop()
374 self.homeless_shares.add(share)
375 self.preexisting_shares[share].remove(server)
376 if not self.preexisting_shares[share]:
377 del self.preexisting_shares[share]
378 items.append((server, sharelist))
379 for writer in self.use_peers:
380 writer.abort_some_buckets(self.homeless_shares)
383 # Redistribution won't help us; fail.
384 peer_count = len(self.peers_with_shares)
385 failmsg = failure_message(peer_count,
387 self.servers_of_happiness,
389 servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
390 servmsg = servmsgtempl % (
393 self._get_progress_message(),
394 pretty_print_shnum_to_servers(merged)
396 self.log(servmsg, level=log.INFREQUENT)
397 return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
399 if self.uncontacted_peers:
400 peer = self.uncontacted_peers.pop(0)
401 # TODO: don't pre-convert all peerids to PeerTrackers
402 assert isinstance(peer, PeerTracker)
404 shares_to_ask = set(sorted(self.homeless_shares)[:1])
405 self.homeless_shares -= shares_to_ask
406 self.query_count += 1
407 self.num_peers_contacted += 1
409 self._status.set_status("Contacting Peers [%s] (first query),"
411 % (idlib.shortnodeid_b2a(peer.peerid),
412 len(self.homeless_shares)))
413 d = peer.query(shares_to_ask)
414 d.addBoth(self._got_response, peer, shares_to_ask,
415 self.contacted_peers)
417 elif self.contacted_peers:
418 # ask a peer that we've already asked.
419 if not self._started_second_pass:
420 self.log("starting second pass",
422 self._started_second_pass = True
423 num_shares = mathutil.div_ceil(len(self.homeless_shares),
424 len(self.contacted_peers))
425 peer = self.contacted_peers.pop(0)
426 shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
427 self.homeless_shares -= shares_to_ask
428 self.query_count += 1
430 self._status.set_status("Contacting Peers [%s] (second query),"
432 % (idlib.shortnodeid_b2a(peer.peerid),
433 len(self.homeless_shares)))
434 d = peer.query(shares_to_ask)
435 d.addBoth(self._got_response, peer, shares_to_ask,
436 self.contacted_peers2)
438 elif self.contacted_peers2:
439 # we've finished the second-or-later pass. Move all the remaining
440 # peers back into self.contacted_peers for the next pass.
441 self.contacted_peers.extend(self.contacted_peers2)
442 self.contacted_peers2[:] = []
445 # no more peers. If we haven't placed enough shares, we fail.
446 merged = merge_peers(self.preexisting_shares, self.use_peers)
447 effective_happiness = servers_of_happiness(merged)
448 if effective_happiness < self.servers_of_happiness:
449 msg = failure_message(len(self.peers_with_shares),
451 self.servers_of_happiness,
453 msg = ("peer selection failed for %s: %s (%s)" % (self,
455 self._get_progress_message()))
456 if self.last_failure_msg:
457 msg += " (%s)" % (self.last_failure_msg,)
458 self.log(msg, level=log.UNUSUAL)
459 return self._failed(msg)
461 # we placed enough to be happy, so we're done
463 self._status.set_status("Placed all shares")
464 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
465 self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
466 self.log(msg, level=log.OPERATIONAL)
467 return (self.use_peers, self.preexisting_shares)
469 def _got_response(self, res, peer, shares_to_ask, put_peer_here):
470 if isinstance(res, failure.Failure):
471 # This is unusual, and probably indicates a bug or a network
473 self.log("%s got error during peer selection: %s" % (peer, res),
475 self.error_count += 1
476 self.bad_query_count += 1
477 self.homeless_shares |= shares_to_ask
478 if (self.uncontacted_peers
479 or self.contacted_peers
480 or self.contacted_peers2):
481 # there is still hope, so just loop
484 # No more peers, so this upload might fail (it depends upon
485 # whether we've hit servers_of_happiness or not). Log the last
486 # failure we got: if a coding error causes all peers to fail
487 # in the same way, this allows the common failure to be seen
488 # by the uploader and should help with debugging
489 msg = ("last failure (from %s) was: %s" % (peer, res))
490 self.last_failure_msg = msg
492 (alreadygot, allocated) = res
493 self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s"
494 % (idlib.shortnodeid_b2a(peer.peerid),
495 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
499 self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
500 if s in self.homeless_shares:
501 self.homeless_shares.remove(s)
503 elif s in shares_to_ask:
506 # the PeerTracker will remember which shares were allocated on
507 # that peer. We just have to remember to use them.
509 self.use_peers.add(peer)
512 if allocated or alreadygot:
513 self.peers_with_shares.add(peer.peerid)
515 not_yet_present = set(shares_to_ask) - set(alreadygot)
516 still_homeless = not_yet_present - set(allocated)
519 # They accepted at least one of the shares that we asked
520 # them to accept, or they had a share that we didn't ask
521 # them to accept but that we hadn't placed yet, so this
522 # was a productive query
523 self.good_query_count += 1
525 self.bad_query_count += 1
529 # In networks with lots of space, this is very unusual and
530 # probably indicates an error. In networks with peers that
531 # are full, it is merely unusual. In networks that are very
532 # full, it is common, and many uploads will fail. In most
533 # cases, this is obviously not fatal, and we'll just use some
536 # some shares are still homeless, keep trying to find them a
537 # home. The ones that were rejected get first priority.
538 self.homeless_shares |= still_homeless
539 # Since they were unable to accept all of our requests, so it
540 # is safe to assume that asking them again won't help.
542 # if they *were* able to accept everything, they might be
543 # willing to accept even more.
544 put_peer_here.append(peer)
550 def _failed(self, msg):
552 I am called when peer selection fails. I first abort all of the
553 remote buckets that I allocated during my unsuccessful attempt to
554 place shares for this file. I then raise an
555 UploadUnhappinessError with my msg argument.
557 for peer in self.use_peers:
558 assert isinstance(peer, PeerTracker)
562 raise UploadUnhappinessError(msg)
565 class EncryptAnUploadable:
566 """This is a wrapper that takes an IUploadable and provides
567 IEncryptedUploadable."""
568 implements(IEncryptedUploadable)
571 def __init__(self, original, log_parent=None):
572 self.original = IUploadable(original)
573 self._log_number = log_parent
574 self._encryptor = None
575 self._plaintext_hasher = plaintext_hasher()
576 self._plaintext_segment_hasher = None
577 self._plaintext_segment_hashes = []
578 self._encoding_parameters = None
579 self._file_size = None
580 self._ciphertext_bytes_read = 0
583 def set_upload_status(self, upload_status):
584 self._status = IUploadStatus(upload_status)
585 self.original.set_upload_status(upload_status)
587 def log(self, *args, **kwargs):
588 if "facility" not in kwargs:
589 kwargs["facility"] = "upload.encryption"
590 if "parent" not in kwargs:
591 kwargs["parent"] = self._log_number
592 return log.msg(*args, **kwargs)
595 if self._file_size is not None:
596 return defer.succeed(self._file_size)
597 d = self.original.get_size()
599 self._file_size = size
601 self._status.set_size(size)
603 d.addCallback(_got_size)
606 def get_all_encoding_parameters(self):
607 if self._encoding_parameters is not None:
608 return defer.succeed(self._encoding_parameters)
609 d = self.original.get_all_encoding_parameters()
610 def _got(encoding_parameters):
611 (k, happy, n, segsize) = encoding_parameters
612 self._segment_size = segsize # used by segment hashers
613 self._encoding_parameters = encoding_parameters
614 self.log("my encoding parameters: %s" % (encoding_parameters,),
616 return encoding_parameters
620 def _get_encryptor(self):
622 return defer.succeed(self._encryptor)
624 d = self.original.get_encryption_key()
629 storage_index = storage_index_hash(key)
630 assert isinstance(storage_index, str)
631 # There's no point to having the SI be longer than the key, so we
632 # specify that it is truncated to the same 128 bits as the AES key.
633 assert len(storage_index) == 16 # SHA-256 truncated to 128b
634 self._storage_index = storage_index
636 self._status.set_storage_index(storage_index)
641 def get_storage_index(self):
642 d = self._get_encryptor()
643 d.addCallback(lambda res: self._storage_index)
646 def _get_segment_hasher(self):
647 p = self._plaintext_segment_hasher
649 left = self._segment_size - self._plaintext_segment_hashed_bytes
651 p = plaintext_segment_hasher()
652 self._plaintext_segment_hasher = p
653 self._plaintext_segment_hashed_bytes = 0
654 return p, self._segment_size
656 def _update_segment_hash(self, chunk):
658 while offset < len(chunk):
659 p, segment_left = self._get_segment_hasher()
660 chunk_left = len(chunk) - offset
661 this_segment = min(chunk_left, segment_left)
662 p.update(chunk[offset:offset+this_segment])
663 self._plaintext_segment_hashed_bytes += this_segment
665 if self._plaintext_segment_hashed_bytes == self._segment_size:
666 # we've filled this segment
667 self._plaintext_segment_hashes.append(p.digest())
668 self._plaintext_segment_hasher = None
669 self.log("closed hash [%d]: %dB" %
670 (len(self._plaintext_segment_hashes)-1,
671 self._plaintext_segment_hashed_bytes),
673 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
674 segnum=len(self._plaintext_segment_hashes)-1,
675 hash=base32.b2a(p.digest()),
678 offset += this_segment
681 def read_encrypted(self, length, hash_only):
682 # make sure our parameters have been set up first
683 d = self.get_all_encoding_parameters()
685 d.addCallback(lambda ignored: self.get_size())
686 d.addCallback(lambda ignored: self._get_encryptor())
687 # then fetch and encrypt the plaintext. The unusual structure here
688 # (passing a Deferred *into* a function) is needed to avoid
689 # overflowing the stack: Deferreds don't optimize out tail recursion.
690 # We also pass in a list, to which _read_encrypted will append
693 d2 = defer.Deferred()
694 d.addCallback(lambda ignored:
695 self._read_encrypted(length, ciphertext, hash_only, d2))
696 d.addCallback(lambda ignored: d2)
699 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
701 fire_when_done.callback(ciphertext)
703 # tolerate large length= values without consuming a lot of RAM by
704 # reading just a chunk (say 50kB) at a time. This only really matters
705 # when hash_only==True (i.e. resuming an interrupted upload), since
706 # that's the case where we will be skipping over a lot of data.
707 size = min(remaining, self.CHUNKSIZE)
708 remaining = remaining - size
709 # read a chunk of plaintext..
710 d = defer.maybeDeferred(self.original.read, size)
711 # N.B.: if read() is synchronous, then since everything else is
712 # actually synchronous too, we'd blow the stack unless we stall for a
713 # tick. Once you accept a Deferred from IUploadable.read(), you must
714 # be prepared to have it fire immediately too.
715 d.addCallback(fireEventually)
716 def _good(plaintext):
718 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
719 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
720 ciphertext.extend(ct)
721 self._read_encrypted(remaining, ciphertext, hash_only,
724 fire_when_done.errback(why)
729 def _hash_and_encrypt_plaintext(self, data, hash_only):
730 assert isinstance(data, (tuple, list)), type(data)
733 # we use data.pop(0) instead of 'for chunk in data' to save
734 # memory: each chunk is destroyed as soon as we're done with it.
738 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
740 bytes_processed += len(chunk)
741 self._plaintext_hasher.update(chunk)
742 self._update_segment_hash(chunk)
743 # TODO: we have to encrypt the data (even if hash_only==True)
744 # because pycryptopp's AES-CTR implementation doesn't offer a
745 # way to change the counter value. Once pycryptopp acquires
746 # this ability, change this to simply update the counter
747 # before each call to (hash_only==False) _encryptor.process()
748 ciphertext = self._encryptor.process(chunk)
750 self.log(" skipping encryption", level=log.NOISY)
752 cryptdata.append(ciphertext)
755 self._ciphertext_bytes_read += bytes_processed
757 progress = float(self._ciphertext_bytes_read) / self._file_size
758 self._status.set_progress(1, progress)
762 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
763 # this is currently unused, but will live again when we fix #453
764 if len(self._plaintext_segment_hashes) < num_segments:
765 # close out the last one
766 assert len(self._plaintext_segment_hashes) == num_segments-1
767 p, segment_left = self._get_segment_hasher()
768 self._plaintext_segment_hashes.append(p.digest())
769 del self._plaintext_segment_hasher
770 self.log("closing plaintext leaf hasher, hashed %d bytes" %
771 self._plaintext_segment_hashed_bytes,
773 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
774 segnum=len(self._plaintext_segment_hashes)-1,
775 hash=base32.b2a(p.digest()),
777 assert len(self._plaintext_segment_hashes) == num_segments
778 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
780 def get_plaintext_hash(self):
781 h = self._plaintext_hasher.digest()
782 return defer.succeed(h)
785 return self.original.close()
788 implements(IUploadStatus)
789 statusid_counter = itertools.count(0)
792 self.storage_index = None
795 self.status = "Not started"
796 self.progress = [0.0, 0.0, 0.0]
799 self.counter = self.statusid_counter.next()
800 self.started = time.time()
802 def get_started(self):
804 def get_storage_index(self):
805 return self.storage_index
808 def using_helper(self):
810 def get_status(self):
812 def get_progress(self):
813 return tuple(self.progress)
814 def get_active(self):
816 def get_results(self):
818 def get_counter(self):
821 def set_storage_index(self, si):
822 self.storage_index = si
823 def set_size(self, size):
825 def set_helper(self, helper):
827 def set_status(self, status):
829 def set_progress(self, which, value):
830 # [0]: chk, [1]: ciphertext, [2]: encode+push
831 self.progress[which] = value
832 def set_active(self, value):
834 def set_results(self, value):
838 peer_selector_class = Tahoe2PeerSelector
840 def __init__(self, storage_broker, secret_holder):
841 # peer_selector needs storage_broker and secret_holder
842 self._storage_broker = storage_broker
843 self._secret_holder = secret_holder
844 self._log_number = self.log("CHKUploader starting", parent=None)
846 self._results = UploadResults()
847 self._storage_index = None
848 self._upload_status = UploadStatus()
849 self._upload_status.set_helper(False)
850 self._upload_status.set_active(True)
851 self._upload_status.set_results(self._results)
853 # locate_all_shareholders() will create the following attribute:
854 # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
856 def log(self, *args, **kwargs):
857 if "parent" not in kwargs:
858 kwargs["parent"] = self._log_number
859 if "facility" not in kwargs:
860 kwargs["facility"] = "tahoe.upload"
861 return log.msg(*args, **kwargs)
863 def start(self, encrypted_uploadable):
864 """Start uploading the file.
866 Returns a Deferred that will fire with the UploadResults instance.
869 self._started = time.time()
870 eu = IEncryptedUploadable(encrypted_uploadable)
871 self.log("starting upload of %s" % eu)
873 eu.set_upload_status(self._upload_status)
874 d = self.start_encrypted(eu)
875 def _done(uploadresults):
876 self._upload_status.set_active(False)
882 """Call this if the upload must be abandoned before it completes.
883 This will tell the shareholders to delete their partial shares. I
884 return a Deferred that fires when these messages have been acked."""
885 if not self._encoder:
886 # how did you call abort() before calling start() ?
887 return defer.succeed(None)
888 return self._encoder.abort()
890 def start_encrypted(self, encrypted):
891 """ Returns a Deferred that will fire with the UploadResults instance. """
892 eu = IEncryptedUploadable(encrypted)
894 started = time.time()
895 self._encoder = e = encode.Encoder(self._log_number,
897 d = e.set_encrypted_uploadable(eu)
898 d.addCallback(self.locate_all_shareholders, started)
899 d.addCallback(self.set_shareholders, e)
900 d.addCallback(lambda res: e.start())
901 d.addCallback(self._encrypted_done)
904 def locate_all_shareholders(self, encoder, started):
905 peer_selection_started = now = time.time()
906 self._storage_index_elapsed = now - started
907 storage_broker = self._storage_broker
908 secret_holder = self._secret_holder
909 storage_index = encoder.get_param("storage_index")
910 self._storage_index = storage_index
911 upload_id = si_b2a(storage_index)[:5]
912 self.log("using storage index %s" % upload_id)
913 peer_selector = self.peer_selector_class(upload_id, self._log_number,
916 share_size = encoder.get_param("share_size")
917 block_size = encoder.get_param("block_size")
918 num_segments = encoder.get_param("num_segments")
919 k,desired,n = encoder.get_param("share_counts")
921 self._peer_selection_started = time.time()
922 d = peer_selector.get_shareholders(storage_broker, secret_holder,
924 share_size, block_size,
925 num_segments, n, k, desired)
927 self._peer_selection_elapsed = time.time() - peer_selection_started
932 def set_shareholders(self, (upload_servers, already_peers), encoder):
934 @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some
935 shares for us (the shareids are stashed inside the PeerTracker)
936 @paran already_peers: a dict mapping sharenum to a set of peerids
937 that claim to already have this share
939 msgtempl = "set_shareholders; upload_servers is %s, already_peers is %s"
940 values = ([', '.join([str_shareloc(k,v) for k,v in p.buckets.iteritems()])
941 for p in upload_servers], already_peers)
942 self.log(msgtempl % values, level=log.OPERATIONAL)
943 # record already-present shares in self._results
944 self._results.preexisting_shares = len(already_peers)
946 self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
947 for peer in upload_servers:
948 assert isinstance(peer, PeerTracker)
950 servermap = already_peers.copy()
951 for peer in upload_servers:
952 buckets.update(peer.buckets)
953 for shnum in peer.buckets:
954 self._peer_trackers[shnum] = peer
955 servermap.setdefault(shnum, set()).add(peer.peerid)
956 assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), \
957 "%s (%s) != %s (%s)" % (
960 sum([len(peer.buckets) for peer in upload_servers]),
961 [(p.buckets, p.peerid) for p in upload_servers]
963 encoder.set_shareholders(buckets, servermap)
965 def _encrypted_done(self, verifycap):
966 """ Returns a Deferred that will fire with the UploadResults instance. """
968 for shnum in self._encoder.get_shares_placed():
969 peer_tracker = self._peer_trackers[shnum]
970 peerid = peer_tracker.peerid
971 r.sharemap.add(shnum, peerid)
972 r.servermap.add(peerid, shnum)
973 r.pushed_shares = len(self._encoder.get_shares_placed())
975 r.file_size = self._encoder.file_size
976 r.timings["total"] = now - self._started
977 r.timings["storage_index"] = self._storage_index_elapsed
978 r.timings["peer_selection"] = self._peer_selection_elapsed
979 r.timings.update(self._encoder.get_times())
980 r.uri_extension_data = self._encoder.get_uri_extension_data()
981 r.verifycapstr = verifycap.to_string()
984 def get_upload_status(self):
985 return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    """Read exactly 'size' bytes from an IUploadable, recursing until the
    uploadable has produced them all.

    Returns a Deferred that fires with a list of data strings whose total
    length is len(prepend_data pieces) + size. NOTE: prepend_data uses a
    mutable default, but it is never mutated (only concatenated), so the
    shared default list is safe.
    """
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            # uploadable.read() may return less than we asked for: loop
            # (via recursion) until we have everything
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
class LiteralUploader:
    """Upload a small file by embedding its entire contents in a LIT URI.

    No shares are pushed to any server; the 'upload' is just the
    construction of the literal URI, so progress jumps straight to done.
    """

    def __init__(self):
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        """Return a Deferred that fires with the UploadResults instance."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status
class RemoteEncryptedUploadable(Referenceable):
    """Expose a local IEncryptedUploadable over foolscap so a remote helper
    can pull ciphertext from us."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0       # how far into the ciphertext we have read
        self._bytes_sent = 0   # bytes actually shipped to the helper
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            # cache it so later calls (and progress math) are cheap
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                # hash-only reads return no data; just advance our cursor
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                assert size == length
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()
1113 class AssistedUploader:
1115 def __init__(self, helper):
1116 self._helper = helper
1117 self._log_number = log.msg("AssistedUploader starting")
1118 self._storage_index = None
1119 self._upload_status = s = UploadStatus()
1123 def log(self, *args, **kwargs):
1124 if "parent" not in kwargs:
1125 kwargs["parent"] = self._log_number
1126 return log.msg(*args, **kwargs)
1128 def start(self, encrypted_uploadable, storage_index):
1129 """Start uploading the file.
1131 Returns a Deferred that will fire with the UploadResults instance.
1133 precondition(isinstance(storage_index, str), storage_index)
1134 self._started = time.time()
1135 eu = IEncryptedUploadable(encrypted_uploadable)
1136 eu.set_upload_status(self._upload_status)
1137 self._encuploadable = eu
1138 self._storage_index = storage_index
1140 d.addCallback(self._got_size)
1141 d.addCallback(lambda res: eu.get_all_encoding_parameters())
1142 d.addCallback(self._got_all_encoding_parameters)
1143 d.addCallback(self._contact_helper)
1144 d.addCallback(self._build_verifycap)
1146 self._upload_status.set_active(False)
1151 def _got_size(self, size):
1153 self._upload_status.set_size(size)
1155 def _got_all_encoding_parameters(self, params):
1156 k, happy, n, segment_size = params
1157 # stash these for URI generation later
1158 self._needed_shares = k
1159 self._total_shares = n
1160 self._segment_size = segment_size
1162 def _contact_helper(self, res):
1163 now = self._time_contacting_helper_start = time.time()
1164 self._storage_index_elapsed = now - self._started
1165 self.log(format="contacting helper for SI %(si)s..",
1166 si=si_b2a(self._storage_index), level=log.NOISY)
1167 self._upload_status.set_status("Contacting Helper")
1168 d = self._helper.callRemote("upload_chk", self._storage_index)
1169 d.addCallback(self._contacted_helper)
1172 def _contacted_helper(self, (upload_results, upload_helper)):
1174 elapsed = now - self._time_contacting_helper_start
1175 self._elapsed_time_contacting_helper = elapsed
1177 self.log("helper says we need to upload", level=log.NOISY)
1178 self._upload_status.set_status("Uploading Ciphertext")
1179 # we need to upload the file
1180 reu = RemoteEncryptedUploadable(self._encuploadable,
1181 self._upload_status)
1182 # let it pre-compute the size for progress purposes
1184 d.addCallback(lambda ignored:
1185 upload_helper.callRemote("upload", reu))
1186 # this Deferred will fire with the upload results
1188 self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1189 self._upload_status.set_progress(1, 1.0)
1190 self._upload_status.set_results(upload_results)
1191 return upload_results
1193 def _convert_old_upload_results(self, upload_results):
1194 # pre-1.3.0 helpers return upload results which contain a mapping
1195 # from shnum to a single human-readable string, containing things
1196 # like "Found on [x],[y],[z]" (for healthy files that were already in
1197 # the grid), "Found on [x]" (for files that needed upload but which
1198 # discovered pre-existing shares), and "Placed on [x]" (for newly
1199 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1200 # set of binary serverid strings.
1202 # the old results are too hard to deal with (they don't even contain
1203 # as much information as the new results, since the nodeids are
1204 # abbreviated), so if we detect old results, just clobber them.
1206 sharemap = upload_results.sharemap
1207 if str in [type(v) for v in sharemap.values()]:
1208 upload_results.sharemap = None
1210 def _build_verifycap(self, upload_results):
1211 self.log("upload finished, building readcap", level=log.OPERATIONAL)
1212 self._convert_old_upload_results(upload_results)
1213 self._upload_status.set_status("Building Readcap")
1215 assert r.uri_extension_data["needed_shares"] == self._needed_shares
1216 assert r.uri_extension_data["total_shares"] == self._total_shares
1217 assert r.uri_extension_data["segment_size"] == self._segment_size
1218 assert r.uri_extension_data["size"] == self._size
1219 r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1220 uri_extension_hash=r.uri_extension_hash,
1221 needed_shares=self._needed_shares,
1222 total_shares=self._total_shares, size=self._size
1225 r.file_size = self._size
1226 r.timings["storage_index"] = self._storage_index_elapsed
1227 r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1228 if "total" in r.timings:
1229 r.timings["helper_total"] = r.timings["total"]
1230 r.timings["total"] = now - self._started
1231 self._upload_status.set_status("Finished")
1232 self._upload_status.set_results(r)
1235 def get_upload_status(self):
1236 return self._upload_status
class BaseUploadable:
    """Common encoding-parameter plumbing shared by all IUploadable
    implementations."""
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None  # cached (k, happy, n, segsize) tuple
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install fallback k/happy/n/max_segment_size values from a dict;
        per-instance settings (encoding_param_*) still take precedence."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size)."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        # seek to the end to learn the length, then rewind for the reader
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        # we opened the filehandle ourselves, so we must close it too
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1394 class Uploader(service.MultiService, log.PrefixingLogMixin):
1395 """I am a service that allows file uploading. I am a service-child of the
1398 implements(IUploader)
1400 URI_LIT_SIZE_THRESHOLD = 55
1402 def __init__(self, helper_furl=None, stats_provider=None):
1403 self._helper_furl = helper_furl
1404 self.stats_provider = stats_provider
1406 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1407 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1408 service.MultiService.__init__(self)
1410 def startService(self):
1411 service.MultiService.startService(self)
1412 if self._helper_furl:
1413 self.parent.tub.connectTo(self._helper_furl,
1416 def _got_helper(self, helper):
1417 self.log("got helper connection, getting versions")
1418 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1420 "application-version": "unknown: no get_version()",
1422 d = add_version_to_remote_reference(helper, default)
1423 d.addCallback(self._got_versioned_helper)
1425 def _got_versioned_helper(self, helper):
1426 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1427 if needed not in helper.version:
1428 raise InsufficientVersionError(needed, helper.version)
1429 self._helper = helper
1430 helper.notifyOnDisconnect(self._lost_helper)
1432 def _lost_helper(self):
1435 def get_helper_info(self):
1436 # return a tuple of (helper_furl_or_None, connected_bool)
1437 return (self._helper_furl, bool(self._helper))
1440 def upload(self, uploadable, history=None):
1442 Returns a Deferred that will fire with the UploadResults instance.
1447 uploadable = IUploadable(uploadable)
1448 d = uploadable.get_size()
1449 def _got_size(size):
1450 default_params = self.parent.get_encoding_parameters()
1451 precondition(isinstance(default_params, dict), default_params)
1452 precondition("max_segment_size" in default_params, default_params)
1453 uploadable.set_default_encoding_parameters(default_params)
1455 if self.stats_provider:
1456 self.stats_provider.count('uploader.files_uploaded', 1)
1457 self.stats_provider.count('uploader.bytes_uploaded', size)
1459 if size <= self.URI_LIT_SIZE_THRESHOLD:
1460 uploader = LiteralUploader()
1461 return uploader.start(uploadable)
1463 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1464 d2 = defer.succeed(None)
1466 uploader = AssistedUploader(self._helper)
1467 d2.addCallback(lambda x: eu.get_storage_index())
1468 d2.addCallback(lambda si: uploader.start(eu, si))
1470 storage_broker = self.parent.get_storage_broker()
1471 secret_holder = self.parent._secret_holder
1472 uploader = CHKUploader(storage_broker, secret_holder)
1473 d2.addCallback(lambda x: uploader.start(eu))
1475 self._all_uploads[uploader] = None
1477 history.add_upload(uploader.get_upload_status())
1478 def turn_verifycap_into_read_cap(uploadresults):
1479 # Generate the uri from the verifycap plus the key.
1480 d3 = uploadable.get_encryption_key()
1481 def put_readcap_into_results(key):
1482 v = uri.from_string(uploadresults.verifycapstr)
1483 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1484 uploadresults.uri = r.to_string()
1485 return uploadresults
1486 d3.addCallback(put_readcap_into_results)
1488 d2.addCallback(turn_verifycap_into_read_cap)
1490 d.addCallback(_got_size)