1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_peers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24 DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
28 from cStringIO import StringIO
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised when a storage server cannot accept an allocation request."""
    # NOTE(review): the class body was dropped from this extract (listing
    # numbers jump 32 -> 35); an empty exception subclass is the minimal
    # valid reconstruction.
    pass
class UploadResults(Copyable, RemoteCopy):
    # Results of one upload, serialized over foolscap between helper and
    # client (hence the Copyable/RemoteCopy bases).
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    # NOTE(review): lines are missing from this extract here (listing jumps
    # 40 -> 43) -- likely the `copytype = typeToCopy` assignment; confirm
    # against the original file.

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    # NOTE(review): the `def __init__(self):` header is missing from this
    # extract (listing jumps 46 -> 49); the assignments below are its body.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        # NOTE(review): one or more attribute lines are missing between
        # these assignments (listing numbers jump).
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
#
# NOTE(review): the constant's definition line was dropped from this
# extract even though EXTENSION_SIZE is referenced below (in ServerTracker
# and Tahoe2ServerSelector); restored here -- confirm the value against the
# original file.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: serverids} map as 'sh0: abc+def, sh1: ghi' for logs."""
    parts = []
    for shnum, serverids in s.iteritems():
        names = '+'.join([idlib.shortnodeid_b2a(sid) for sid in serverids])
        parts.append("sh%s: %s" % (shnum, names))
    return ', '.join(parts)
# NOTE(review): the enclosing `class ServerTracker(...):` header is missing
# from this extract; the methods below belong to it.
def __init__(self, serverid, storage_server,
             sharesize, blocksize, num_segments, num_share_hashes,
             # NOTE(review): a signature line is missing here -- the body
             # assigns a `storage_index` parameter not declared above.
             bucket_renewal_secret, bucket_cancel_secret):
    precondition(isinstance(serverid, str), serverid)
    precondition(len(serverid) == 20, serverid)
    self.serverid = serverid
    self._storageserver = storage_server # to an RIStorageServer
    self.buckets = {} # k: shareid, v: IRemoteBucketWriter
    self.sharesize = sharesize

    # build a throwaway write-bucket proxy just to learn the allocated size
    wbp = layout.make_write_bucket_proxy(None, sharesize,
                                         blocksize, num_segments,
                                         # NOTE(review): an argument line
                                         # (likely num_share_hashes) is
                                         # missing here.
                                         EXTENSION_SIZE, serverid)
    self.wbp_class = wbp.__class__ # to create more of them
    self.allocated_size = wbp.get_allocated_size()
    self.blocksize = blocksize
    self.num_segments = num_segments
    self.num_share_hashes = num_share_hashes
    self.storage_index = storage_index
    self.renew_secret = bucket_renewal_secret
    self.cancel_secret = bucket_cancel_secret

# NOTE(review): the `def __repr__(self):` header is missing from this
# extract; the return below is its body.
    return ("<ServerTracker for server %s and SI %s>"
            % (idlib.shortnodeid_b2a(self.serverid),
               si_b2a(self.storage_index)[:5]))
def query(self, sharenums):
    # Ask the remote server to allocate buckets for `sharenums`.
    d = self._storageserver.callRemote("allocate_buckets",
                                       # NOTE(review): argument lines are
                                       # missing here (storage index,
                                       # secrets, sharenums, size).
                                       canary=Referenceable())
    d.addCallback(self._got_reply)
    # NOTE(review): a `return d` line appears to be missing.

def ask_about_existing_shares(self):
    # Ask the server which buckets it already holds for our storage index.
    return self._storageserver.callRemote("get_buckets",
    # NOTE(review): the argument and closing-paren lines are missing here.

def _got_reply(self, (alreadygot, buckets)):
    #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
    # NOTE(review): a `b = {}` accumulator line appears to be missing here.
    for sharenum, rref in buckets.iteritems():
        bp = self.wbp_class(rref, self.sharesize,
                            # NOTE(review): argument lines (blocksize,
                            # num_segments) are missing here.
                            self.num_share_hashes,
                            # NOTE(review): trailing argument lines and the
                            # `b[sharenum] = bp` assignment are missing.
    self.buckets.update(b)
    return (alreadygot, set(b.keys()))
# NOTE(review): the `def abort(self):` header and the docstring's opening
# quotes are missing from this extract.
    I abort the remote bucket writers for all shares. This is a good idea
    to conserve space on the storage server.
    self.abort_some_buckets(self.buckets.keys())

def abort_some_buckets(self, sharenums):
    # NOTE(review): the docstring quote lines are missing from this extract.
    I abort the remote bucket writers for the share numbers in sharenums.
    for sharenum in sharenums:
        if sharenum in self.buckets:
            self.buckets[sharenum].abort()
            del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
    """Return 'shnum: <short nodeid>' describing where a share was written."""
    nodeid = idlib.shortnodeid_b2a(bucketwriter._nodeid)
    return "%s: %s" % (shnum, nodeid)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
    # Chooses which storage servers will hold each share of one upload.

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        # NOTE(review): counter-initialization lines are missing here
        # (likely `self.full_count = 0` and `self.error_count = 0`, both
        # used by _get_progress_message below).
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    # NOTE(review): the `def __repr__(self):` header is missing here.
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
def get_shareholders(self, storage_broker, secret_holder,
                     storage_index, share_size, block_size,
                     num_segments, total_shares, needed_shares,
                     servers_of_happiness):
    # NOTE(review): the docstring's opening/closing quote lines are missing
    # from this extract.
    @return: (upload_servers, already_servers), where upload_servers is
             a set of ServerTracker instances that have agreed to hold
             some shares for us (the shareids are stashed inside the
             ServerTracker), and already_servers is a dict mapping shnum
             to a set of servers which claim to already have the share.

    self._status.set_status("Contacting Servers..")

    self.total_shares = total_shares
    self.servers_of_happiness = servers_of_happiness
    self.needed_shares = needed_shares

    self.homeless_shares = set(range(total_shares))
    self.contacted_trackers = [] # servers worth asking again
    self.contacted_trackers2 = [] # servers that we have asked again
    self._started_second_pass = False
    self.use_trackers = set() # ServerTrackers that have shares assigned
    self.preexisting_shares = {} # shareid => set(serverids) holding shareid

    # These servers have shares -- any shares -- for our SI. We keep
    # track of these to write an error message with them later.
    self.servers_with_shares = set()

    # this needed_hashes computation should mirror
    # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
    # (instead of a HashTree) because we don't require actual hashing
    # just to count the levels.
    ht = hashtree.IncompleteHashTree(total_shares)
    num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

    # figure out how much space to ask for
    wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                         num_share_hashes, EXTENSION_SIZE,
                                         # NOTE(review): the closing argument
                                         # line is missing here.
    allocated_size = wbp.get_allocated_size()
    all_servers = [(s.get_serverid(), s.get_rref())
                   for s in storage_broker.get_servers_for_psi(storage_index)]
    # NOTE(review): a guard line (likely `if not all_servers:`) is missing.
        raise NoServersError("client gave us zero servers")

    # filter the list of servers according to which ones can accomodate
    # this request. This excludes older servers (which used a 4-byte size
    # field) from getting large shares (for files larger than about
    # 12GiB). See #439 for details.
    def _get_maxsize(server):
        (serverid, conn) = server
        v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
        return v1["maximum-immutable-share-size"]
    writable_servers = [server for server in all_servers
                        if _get_maxsize(server) >= allocated_size]
    readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)

    # decide upon the renewal/cancel secrets, to include them in the
    # allocate_buckets query.
    client_renewal_secret = secret_holder.get_renewal_secret()
    client_cancel_secret = secret_holder.get_cancel_secret()

    file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
    file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
    # NOTE(review): the second-argument lines (likely `storage_index)`) of
    # the two calls above are missing from this extract.
    def _make_trackers(servers):
        return [ServerTracker(serverid, conn,
                              share_size, block_size,
                              num_segments, num_share_hashes,
                              # NOTE(review): argument lines are missing here
                              # and after each secret hash call (likely
                              # storage_index and the serverid arguments).
                              bucket_renewal_secret_hash(file_renewal_secret,
                              bucket_cancel_secret_hash(file_cancel_secret,
                for (serverid, conn) in servers]
    self.uncontacted_trackers = _make_trackers(writable_servers)

    # We don't try to allocate shares to these servers, since they've
    # said that they're incapable of storing shares of the size that we'd
    # want to store. We ask them about existing shares for this storage
    # index, which we want to know about for accurate
    # servers_of_happiness accounting, then we forget about them.
    readonly_trackers = _make_trackers(readonly_servers)

    # We now ask servers that can't hold any new shares about existing
    # shares that they might have for our SI. Once this is done, we
    # start placing the shares that we haven't already accounted
    # NOTE(review): lines are missing here (likely the end of this comment
    # and a `ds = []` accumulator, used by DeferredList below).
    if self._status and readonly_trackers:
        self._status.set_status("Contacting readonly servers to find "
                                "any existing shares")
    for tracker in readonly_trackers:
        assert isinstance(tracker, ServerTracker)
        d = tracker.ask_about_existing_shares()
        d.addBoth(self._handle_existing_response, tracker.serverid)
        # NOTE(review): a `ds.append(d)` line appears to be missing here.
        self.num_servers_contacted += 1
        self.query_count += 1
        self.log("asking server %s for any existing shares" %
                 (idlib.shortnodeid_b2a(tracker.serverid),),
                 # NOTE(review): the log-level argument line is missing here.
    dl = defer.DeferredList(ds)
    dl.addCallback(lambda ign: self._loop())
    # NOTE(review): trailing lines (likely `return dl`) are missing.
def _handle_existing_response(self, res, server):
    # NOTE(review): the docstring quote lines are missing from this extract.
    I handle responses to the queries sent by
    Tahoe2ServerSelector._existing_shares.
    if isinstance(res, failure.Failure):
        self.log("%s got error during existing shares check: %s"
                 % (idlib.shortnodeid_b2a(server), res),
                 # NOTE(review): the log-level argument line is missing here.
        self.error_count += 1
        self.bad_query_count += 1
    # NOTE(review): lines are missing here (likely `else:` and a
    # `buckets = res` binding for the success branch below).
        self.servers_with_shares.add(server)
        self.log("response to get_buckets() from server %s: alreadygot=%s"
                 % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))),
                 # NOTE(review): the log-level argument line is missing here.
        for bucket in buckets:
            self.preexisting_shares.setdefault(bucket, set()).add(server)
            self.homeless_shares.discard(bucket)
        # NOTE(review): a line is missing before this counter bump; its
        # placement relative to the branch above is uncertain.
        self.bad_query_count += 1
def _get_progress_message(self):
    # Build a human-readable summary of placement progress for log/error
    # messages.
    if not self.homeless_shares:
        msg = "placed all %d shares, " % (self.total_shares)
    # NOTE(review): an `else:` line is missing here.
        msg = ("placed %d shares out of %d total (%d homeless), " %
               (self.total_shares - len(self.homeless_shares),
                # NOTE(review): a tuple-element line (likely
                # `self.total_shares,`) is missing here.
                len(self.homeless_shares)))
    return (msg + "want to place shares on at least %d servers such that "
            "any %d of them have enough shares to recover the file, "
            "sent %d queries to %d servers, "
            "%d queries placed some shares, %d placed none "
            "(of which %d placed none due to the server being"
            " full and %d placed none due to an error)" %
            (self.servers_of_happiness, self.needed_shares,
             self.query_count, self.num_servers_contacted,
             self.good_query_count, self.bad_query_count,
             self.full_count, self.error_count))
# NOTE(review): the `def _loop(self):` header is missing from this extract;
# the following is the body of Tahoe2ServerSelector._loop, the state machine
# that drives server selection.
if not self.homeless_shares:
    merged = merge_peers(self.preexisting_shares, self.use_trackers)
    effective_happiness = servers_of_happiness(merged)
    if self.servers_of_happiness <= effective_happiness:
        # happy: every share has a home and happiness is satisfied
        msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
               "self.use_trackers: %s, self.preexisting_shares: %s") \
               % (self, self._get_progress_message(),
                  pretty_print_shnum_to_servers(merged),
                  [', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in self.use_trackers],
                  pretty_print_shnum_to_servers(self.preexisting_shares))
        self.log(msg, level=log.OPERATIONAL)
        return (self.use_trackers, self.preexisting_shares)
    # NOTE(review): an `else:` line is missing here.
        # We're not okay right now, but maybe we can fix it by
        # redistributing some shares. In cases where one or two
        # servers has, before the upload, all or most of the
        # shares for a given SI, this can work by allowing _loop
        # a chance to spread those out over the other servers,
        delta = self.servers_of_happiness - effective_happiness
        shares = shares_by_server(self.preexisting_shares)
        # Each server in shares maps to a set of shares stored on it.
        # Since we want to keep at least one share on each server
        # that has one (otherwise we'd only be making
        # the situation worse by removing distinct servers),
        # each server has len(its shares) - 1 to spread around.
        shares_to_spread = sum([len(list(sharelist)) - 1
                                for (server, sharelist)
                                # NOTE(review): the generator's `in ...`
                                # line is missing here.
        if delta <= len(self.uncontacted_trackers) and \
           shares_to_spread >= delta:
            items = shares.items()
            while len(self.homeless_shares) < delta:
                # Loop through the allocated shares, removing
                # one from each server that has more than one
                # and putting it back into self.homeless_shares
                # until we've done this delta times.
                server, sharelist = items.pop()
                if len(sharelist) > 1:
                    share = sharelist.pop()
                    self.homeless_shares.add(share)
                    self.preexisting_shares[share].remove(server)
                    if not self.preexisting_shares[share]:
                        del self.preexisting_shares[share]
                items.append((server, sharelist))
            for writer in self.use_trackers:
                writer.abort_some_buckets(self.homeless_shares)
        # NOTE(review): lines are missing here (likely a recursive
        # `return self._loop()` and an `else:` introducing the failure
        # branch below).
            # Redistribution won't help us; fail.
            server_count = len(self.servers_with_shares)
            failmsg = failure_message(server_count,
                                      # NOTE(review): argument lines are
                                      # missing around here.
                                      self.servers_of_happiness,
            servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
            servmsg = servmsgtempl % (
                # NOTE(review): leading tuple-element lines are missing here.
                self._get_progress_message(),
                pretty_print_shnum_to_servers(merged)
                # NOTE(review): the closing-paren line is missing here.
            self.log(servmsg, level=log.INFREQUENT)
            return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

if self.uncontacted_trackers:
    tracker = self.uncontacted_trackers.pop(0)
    # TODO: don't pre-convert all serverids to ServerTrackers
    assert isinstance(tracker, ServerTracker)
    shares_to_ask = set(sorted(self.homeless_shares)[:1])
    self.homeless_shares -= shares_to_ask
    self.query_count += 1
    self.num_servers_contacted += 1
    self._status.set_status("Contacting Servers [%s] (first query),"
                            # NOTE(review): part of this format string's
                            # continuation is missing (listing 402 -> 404).
                            % (idlib.shortnodeid_b2a(tracker.serverid),
                               len(self.homeless_shares)))
    d = tracker.query(shares_to_ask)
    d.addBoth(self._got_response, tracker, shares_to_ask,
              self.contacted_trackers)
elif self.contacted_trackers:
    # ask a server that we've already asked.
    if not self._started_second_pass:
        self.log("starting second pass",
                 # NOTE(review): the log-level argument line is missing.
        self._started_second_pass = True
    num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                   len(self.contacted_trackers))
    tracker = self.contacted_trackers.pop(0)
    shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
    self.homeless_shares -= shares_to_ask
    self.query_count += 1
    self._status.set_status("Contacting Servers [%s] (second query),"
                            # NOTE(review): part of this format string's
                            # continuation is missing (listing 423 -> 425).
                            % (idlib.shortnodeid_b2a(tracker.serverid),
                               len(self.homeless_shares)))
    d = tracker.query(shares_to_ask)
    d.addBoth(self._got_response, tracker, shares_to_ask,
              self.contacted_trackers2)
elif self.contacted_trackers2:
    # we've finished the second-or-later pass. Move all the remaining
    # servers back into self.contacted_trackers for the next pass.
    self.contacted_trackers.extend(self.contacted_trackers2)
    self.contacted_trackers2[:] = []
# NOTE(review): lines are missing here (likely a recursive `return
# self._loop()` and a final `else:` introducing the no-more-servers branch).
    # no more servers. If we haven't placed enough shares, we fail.
    merged = merge_peers(self.preexisting_shares, self.use_trackers)
    effective_happiness = servers_of_happiness(merged)
    if effective_happiness < self.servers_of_happiness:
        msg = failure_message(len(self.servers_with_shares),
                              # NOTE(review): argument lines are missing
                              # around here.
                              self.servers_of_happiness,
        msg = ("server selection failed for %s: %s (%s)" %
               (self, msg, self._get_progress_message()))
        if self.last_failure_msg:
            msg += " (%s)" % (self.last_failure_msg,)
        self.log(msg, level=log.UNUSUAL)
        return self._failed(msg)
    # NOTE(review): lines are missing here (likely `else:` and an
    # `if self._status:` guard for the call below).
        # we placed enough to be happy, so we're done
        self._status.set_status("Placed all shares")
        msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
               self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
        self.log(msg, level=log.OPERATIONAL)
        return (self.use_trackers, self.preexisting_shares)
def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
    # Handle the reply (or Failure) from one allocate_buckets query.
    if isinstance(res, failure.Failure):
        # This is unusual, and probably indicates a bug or a network
        # problem.
        self.log("%s got error during server selection: %s" % (tracker, res),
                 # NOTE(review): the log-level argument line is missing here.
        self.error_count += 1
        self.bad_query_count += 1
        self.homeless_shares |= shares_to_ask
        if (self.uncontacted_trackers
            or self.contacted_trackers
            or self.contacted_trackers2):
            # there is still hope, so just loop
        # NOTE(review): lines are missing here (likely `self._loop()` and
        # `else:`).
            # No more servers, so this upload might fail (it depends upon
            # whether we've hit servers_of_happiness or not). Log the last
            # failure we got: if a coding error causes all servers to fail
            # in the same way, this allows the common failure to be seen
            # by the uploader and should help with debugging
            msg = ("last failure (from %s) was: %s" % (tracker, res))
            self.last_failure_msg = msg
    # NOTE(review): an `else:` line is missing here; the block below handles
    # a successful allocate_buckets response.
        (alreadygot, allocated) = res
        self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                 % (idlib.shortnodeid_b2a(tracker.serverid),
                    tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                 # NOTE(review): the log-level argument line is missing here.
        # NOTE(review): lines are missing here (likely `progress = False`
        # and `for s in alreadygot:`).
            self.preexisting_shares.setdefault(s, set()).add(tracker.serverid)
            if s in self.homeless_shares:
                self.homeless_shares.remove(s)
                # NOTE(review): a progress-flag line is missing here.
            elif s in shares_to_ask:
                # NOTE(review): the body of this branch is missing.

        # the ServerTracker will remember which shares were allocated on
        # that peer. We just have to remember to use them.
        # NOTE(review): an `if allocated:` guard line appears to be missing.
            self.use_trackers.add(tracker)
            # NOTE(review): a progress-flag line is missing here.

        if allocated or alreadygot:
            self.servers_with_shares.add(tracker.serverid)

        not_yet_present = set(shares_to_ask) - set(alreadygot)
        still_homeless = not_yet_present - set(allocated)

        # NOTE(review): an `if progress:` line appears to be missing here.
            # They accepted at least one of the shares that we asked
            # them to accept, or they had a share that we didn't ask
            # them to accept but that we hadn't placed yet, so this
            # was a productive query
            self.good_query_count += 1
        # NOTE(review): lines are missing here (likely `else:` and
        # full_count accounting).
            self.bad_query_count += 1

        # NOTE(review): an `if still_homeless:` line appears to be missing.
            # In networks with lots of space, this is very unusual and
            # probably indicates an error. In networks with servers that
            # are full, it is merely unusual. In networks that are very
            # full, it is common, and many uploads will fail. In most
            # cases, this is obviously not fatal, and we'll just use some
            # NOTE(review): comment/code lines are missing between the text
            # above and below.
            # some shares are still homeless, keep trying to find them a
            # home. The ones that were rejected get first priority.
            self.homeless_shares |= still_homeless
            # Since they were unable to accept all of our requests, so it
            # is safe to assume that asking them again won't help.
        # NOTE(review): an `else:` line is missing here.
            # if they *were* able to accept everything, they might be
            # willing to accept even more.
            put_tracker_here.append(tracker)
    # NOTE(review): trailing lines (likely a call to self._loop()) are
    # missing from this extract.
def _failed(self, msg):
    # NOTE(review): the docstring's opening quote line is missing.
    I am called when server selection fails. I first abort all of the
    remote buckets that I allocated during my unsuccessful attempt to
    place shares for this file. I then raise an
    UploadUnhappinessError with my msg argument.
    for tracker in self.use_trackers:
        assert isinstance(tracker, ServerTracker)
        # NOTE(review): the loop body's `tracker.abort()` line appears to be
        # missing from this extract -- as written the loop does nothing.
    raise UploadUnhappinessError(msg)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): class-attribute lines are missing here -- a CHUNKSIZE
    # constant is referenced by _read_encrypted below but not defined in
    # this extract.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None                    # created lazily by _get_encryptor
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        # NOTE(review): trailing lines are missing here (likely
        # `self._status = None`; other methods guard on self._status).
def set_upload_status(self, upload_status):
    """Adapt and record the upload-status tracker, and pass it through to
    the wrapped uploadable."""
    status = IUploadStatus(upload_status)
    self._status = status
    self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
    """Emit a log message under the upload.encryption facility, parented
    to this upload's log entry unless the caller overrides either."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
# NOTE(review): the `def get_size(self):` header is missing from this
# extract; the lines below are its body (cached size lookup).
    if self._file_size is not None:
        return defer.succeed(self._file_size)
    d = self.original.get_size()
    # NOTE(review): the `def _got_size(size):` header is missing here.
        self._file_size = size
        # NOTE(review): an `if self._status:` guard line appears missing.
        self._status.set_size(size)
        # NOTE(review): a `return size` line appears to be missing.
    d.addCallback(_got_size)
    # NOTE(review): a `return d` line appears to be missing.

def get_all_encoding_parameters(self):
    # Return (k, happy, n, segsize), cached after the first lookup.
    if self._encoding_parameters is not None:
        return defer.succeed(self._encoding_parameters)
    d = self.original.get_all_encoding_parameters()
    def _got(encoding_parameters):
        (k, happy, n, segsize) = encoding_parameters
        self._segment_size = segsize # used by segment hashers
        self._encoding_parameters = encoding_parameters
        self.log("my encoding parameters: %s" % (encoding_parameters,),
                 # NOTE(review): the log-level argument line is missing here.
        return encoding_parameters
    # NOTE(review): trailing wiring lines (d.addCallback(_got); return d)
    # are missing from this extract.
def _get_encryptor(self):
    # Return a Deferred firing with the AES encryptor, creating it (and
    # deriving the storage index) on first use.
    # NOTE(review): a guard line (likely `if self._encryptor:`) is missing.
        return defer.succeed(self._encryptor)

    d = self.original.get_encryption_key()
    # NOTE(review): lines are missing here (likely `def _got(key):` and the
    # construction of the AES encryptor from the key).
        storage_index = storage_index_hash(key)
        assert isinstance(storage_index, str)
        # There's no point to having the SI be longer than the key, so we
        # specify that it is truncated to the same 128 bits as the AES key.
        assert len(storage_index) == 16 # SHA-256 truncated to 128b
        self._storage_index = storage_index
        # NOTE(review): an `if self._status:` guard line appears missing.
        self._status.set_storage_index(storage_index)
        # NOTE(review): trailing lines (return of the encryptor,
        # d.addCallback wiring, return d) are missing from this extract.

def get_storage_index(self):
    # The storage index is derived from the encryption key, so force the
    # encryptor to exist first.
    d = self._get_encryptor()
    d.addCallback(lambda res: self._storage_index)
    # NOTE(review): a `return d` line appears to be missing.
def _get_segment_hasher(self):
    # Return (hasher, bytes remaining in the current segment), starting a
    # fresh segment hasher when none is active.
    p = self._plaintext_segment_hasher
    # NOTE(review): a guard line (likely `if p:`) is missing here.
        left = self._segment_size - self._plaintext_segment_hashed_bytes
        # NOTE(review): a `return p, left` line appears to be missing.
    p = plaintext_segment_hasher()
    self._plaintext_segment_hasher = p
    self._plaintext_segment_hashed_bytes = 0
    return p, self._segment_size

def _update_segment_hash(self, chunk):
    # Feed `chunk` into the per-segment plaintext hashers, closing out each
    # segment's hash as it fills.
    # NOTE(review): an `offset = 0` initialization line is missing here.
    while offset < len(chunk):
        p, segment_left = self._get_segment_hasher()
        chunk_left = len(chunk) - offset
        this_segment = min(chunk_left, segment_left)
        p.update(chunk[offset:offset+this_segment])
        self._plaintext_segment_hashed_bytes += this_segment

        if self._plaintext_segment_hashed_bytes == self._segment_size:
            # we've filled this segment
            self._plaintext_segment_hashes.append(p.digest())
            self._plaintext_segment_hasher = None
            self.log("closed hash [%d]: %dB" %
                     (len(self._plaintext_segment_hashes)-1,
                      self._plaintext_segment_hashed_bytes),
                     # NOTE(review): the log-level argument line is missing.
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     # NOTE(review): trailing argument lines are missing.

        offset += this_segment
def read_encrypted(self, length, hash_only):
    # Read up to `length` bytes of ciphertext; when hash_only is true the
    # data is hashed but the ciphertext is not accumulated.
    # make sure our parameters have been set up first
    d = self.get_all_encoding_parameters()
    d.addCallback(lambda ignored: self.get_size())
    d.addCallback(lambda ignored: self._get_encryptor())
    # then fetch and encrypt the plaintext. The unusual structure here
    # (passing a Deferred *into* a function) is needed to avoid
    # overflowing the stack: Deferreds don't optimize out tail recursion.
    # We also pass in a list, to which _read_encrypted will append
    # NOTE(review): lines are missing here (the end of this comment and a
    # `ciphertext = []` accumulator used below).
    d2 = defer.Deferred()
    d.addCallback(lambda ignored:
                  self._read_encrypted(length, ciphertext, hash_only, d2))
    d.addCallback(lambda ignored: d2)
    # NOTE(review): a `return d` line appears to be missing.

def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
    # Tail-recursive worker for read_encrypted; fires `fire_when_done`
    # with the accumulated ciphertext list when `remaining` is exhausted.
    # NOTE(review): a guard line (likely `if not remaining:`) is missing.
        fire_when_done.callback(ciphertext)
        # NOTE(review): a `return None` line appears to be missing.
    # tolerate large length= values without consuming a lot of RAM by
    # reading just a chunk (say 50kB) at a time. This only really matters
    # when hash_only==True (i.e. resuming an interrupted upload), since
    # that's the case where we will be skipping over a lot of data.
    size = min(remaining, self.CHUNKSIZE)
    remaining = remaining - size
    # read a chunk of plaintext..
    d = defer.maybeDeferred(self.original.read, size)
    # N.B.: if read() is synchronous, then since everything else is
    # actually synchronous too, we'd blow the stack unless we stall for a
    # tick. Once you accept a Deferred from IUploadable.read(), you must
    # be prepared to have it fire immediately too.
    d.addCallback(fireEventually)
    def _good(plaintext):
        # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
        ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
        ciphertext.extend(ct)
        self._read_encrypted(remaining, ciphertext, hash_only,
        # NOTE(review): lines are missing here -- the recursive call's
        # closing argument and a `def _bad(why):` header for the line below.
        fire_when_done.errback(why)
    # NOTE(review): trailing wiring lines (d.addCallbacks(_good, _bad),
    # return None) are missing from this extract.
def _hash_and_encrypt_plaintext(self, data, hash_only):
    # Hash (and, when not hash_only, encrypt) a list of plaintext chunks,
    # returning the accumulated ciphertext chunks.
    assert isinstance(data, (tuple, list)), type(data)
    # NOTE(review): lines are missing here (likely `data = list(data)`,
    # `cryptdata = []`, `bytes_processed = 0`, and a `while data:` /
    # `chunk = data.pop(0)` loop header).
    # we use data.pop(0) instead of 'for chunk in data' to save
    # memory: each chunk is destroyed as soon as we're done with it.
        self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                 # NOTE(review): the log-level argument line is missing.
        bytes_processed += len(chunk)
        self._plaintext_hasher.update(chunk)
        self._update_segment_hash(chunk)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        ciphertext = self._encryptor.process(chunk)
        # NOTE(review): an `if hash_only:` line appears to be missing here.
            self.log(" skipping encryption", level=log.NOISY)
        # NOTE(review): an `else:` line appears to be missing here.
            cryptdata.append(ciphertext)
        # NOTE(review): cleanup lines (del ciphertext/chunk) may be missing.
    self._ciphertext_bytes_read += bytes_processed
    # NOTE(review): an `if self._status:` guard appears to be missing here.
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
    # NOTE(review): a `return cryptdata` line appears to be missing.
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
    # Return a Deferred firing with the requested slice of per-segment
    # plaintext hashes, closing out the final partial segment if needed.
    # this is currently unused, but will live again when we fix #453
    if len(self._plaintext_segment_hashes) < num_segments:
        # close out the last one
        assert len(self._plaintext_segment_hashes) == num_segments-1
        p, segment_left = self._get_segment_hasher()
        self._plaintext_segment_hashes.append(p.digest())
        del self._plaintext_segment_hasher
        self.log("closing plaintext leaf hasher, hashed %d bytes" %
                 self._plaintext_segment_hashed_bytes,
                 # NOTE(review): the log-level argument line is missing.
        self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                 segnum=len(self._plaintext_segment_hashes)-1,
                 hash=base32.b2a(p.digest()),
                 # NOTE(review): trailing argument lines are missing.
    assert len(self._plaintext_segment_hashes) == num_segments
    return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
    """Return a Deferred firing with the digest of all plaintext hashed
    so far by the flat plaintext hasher."""
    return defer.succeed(self._plaintext_hasher.digest())
# NOTE(review): the `def close(self):` header is missing from this extract;
# the line below delegates close() to the wrapped uploadable.
    return self.original.close()
# NOTE(review): the enclosing `class UploadStatus:` header is missing from
# this extract, and many one-line accessor bodies were dropped; each missing
# body is flagged inline.
implements(IUploadStatus)
statusid_counter = itertools.count(0)  # class-level counter for unique ids

# NOTE(review): the `def __init__(self):` header is missing here.
    self.storage_index = None
    # NOTE(review): attribute-initialization lines are missing here.
    self.status = "Not started"
    self.progress = [0.0, 0.0, 0.0]
    # NOTE(review): attribute-initialization lines are missing here.
    self.counter = self.statusid_counter.next()
    self.started = time.time()

def get_started(self):
    # NOTE(review): body line is missing (likely `return self.started`).
def get_storage_index(self):
    return self.storage_index
def using_helper(self):
    # NOTE(review): body line is missing from this extract.
def get_status(self):
    # NOTE(review): body line is missing from this extract.
def get_progress(self):
    return tuple(self.progress)
def get_active(self):
    # NOTE(review): body line is missing from this extract.
def get_results(self):
    # NOTE(review): body line is missing from this extract.
def get_counter(self):
    # NOTE(review): body line is missing from this extract.

def set_storage_index(self, si):
    self.storage_index = si
def set_size(self, size):
    # NOTE(review): body line is missing from this extract.
def set_helper(self, helper):
    # NOTE(review): body line is missing from this extract.
def set_status(self, status):
    # NOTE(review): body line is missing from this extract.
def set_progress(self, which, value):
    # [0]: chk, [1]: ciphertext, [2]: encode+push
    self.progress[which] = value
def set_active(self, value):
    # NOTE(review): body line is missing from this extract.
def set_results(self, value):
    # NOTE(review): body line is missing from this extract.
# NOTE(review): the enclosing `class CHKUploader:` header is missing from
# this extract.
server_selector_class = Tahoe2ServerSelector  # overridable for testing

def __init__(self, storage_broker, secret_holder):
    # server_selector needs storage_broker and secret_holder
    self._storage_broker = storage_broker
    self._secret_holder = secret_holder
    self._log_number = self.log("CHKUploader starting", parent=None)
    # NOTE(review): a line is missing here (likely `self._encoder = None`,
    # which abort() below depends on).
    self._results = UploadResults()
    self._storage_index = None
    self._upload_status = UploadStatus()
    self._upload_status.set_helper(False)
    self._upload_status.set_active(True)
    self._upload_status.set_results(self._results)

    # locate_all_shareholders() will create the following attribute:
    # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
def log(self, *args, **kwargs):
    """Emit a log message under the tahoe.upload facility, parented to
    this uploader's log entry unless the caller overrides either."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.upload")
    return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable):
    """Start uploading the file.
    # NOTE(review): docstring lines are missing around here.
    Returns a Deferred that will fire with the UploadResults instance.
    self._started = time.time()
    eu = IEncryptedUploadable(encrypted_uploadable)
    self.log("starting upload of %s" % eu)

    eu.set_upload_status(self._upload_status)
    d = self.start_encrypted(eu)
    def _done(uploadresults):
        self._upload_status.set_active(False)
        # NOTE(review): lines are missing here (likely `return
        # uploadresults`, the d.addBoth(_done)/return d wiring, and the
        # `def abort(self):` header for the docstring below).
    """Call this if the upload must be abandoned before it completes.
    This will tell the shareholders to delete their partial shares. I
    return a Deferred that fires when these messages have been acked."""
    if not self._encoder:
        # how did you call abort() before calling start() ?
        return defer.succeed(None)
    return self._encoder.abort()

def start_encrypted(self, encrypted):
    """ Returns a Deferred that will fire with the UploadResults instance. """
    eu = IEncryptedUploadable(encrypted)

    started = time.time()
    self._encoder = e = encode.Encoder(self._log_number,
                                       # NOTE(review): the second argument
                                       # line (likely self._upload_status)
                                       # is missing here.
    d = e.set_encrypted_uploadable(eu)
    d.addCallback(self.locate_all_shareholders, started)
    d.addCallback(self.set_shareholders, e)
    d.addCallback(lambda res: e.start())
    d.addCallback(self._encrypted_done)
    # NOTE(review): a `return d` line appears to be missing.
def locate_all_shareholders(self, encoder, started):
    # Run server selection for this upload, recording elapsed-time stats.
    server_selection_started = now = time.time()
    self._storage_index_elapsed = now - started
    storage_broker = self._storage_broker
    secret_holder = self._secret_holder
    storage_index = encoder.get_param("storage_index")
    self._storage_index = storage_index
    upload_id = si_b2a(storage_index)[:5]
    self.log("using storage index %s" % upload_id)
    server_selector = self.server_selector_class(upload_id,
                                                 # NOTE(review): constructor
                                                 # argument lines are
                                                 # missing here.
    share_size = encoder.get_param("share_size")
    block_size = encoder.get_param("block_size")
    num_segments = encoder.get_param("num_segments")
    k,desired,n = encoder.get_param("share_counts")

    self._server_selection_started = time.time()
    d = server_selector.get_shareholders(storage_broker, secret_holder,
                                         # NOTE(review): the storage_index
                                         # argument line is missing here.
                                         share_size, block_size,
                                         num_segments, n, k, desired)
    # NOTE(review): a `def _done(res):` header is missing here.
        self._server_selection_elapsed = time.time() - server_selection_started
        # NOTE(review): trailing lines (return res; d.addCallback(_done);
        # return d) are missing from this extract.
def set_shareholders(self, (upload_servers, already_servers), encoder):
    # NOTE(review): the docstring's opening quote line is missing.
    @param upload_servers: a sequence of ServerTracker objects that
                           have agreed to hold some shares for us (the
                           shareids are stashed inside the ServerTracker)
    @param already_servers: a dict mapping sharenum to a set of serverids
                            that claim to already have this share
    msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s"
    values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()])
               for s in upload_servers], already_servers)
    self.log(msgtempl % values, level=log.OPERATIONAL)
    # record already-present shares in self._results
    self._results.preexisting_shares = len(already_servers)

    self._server_trackers = {} # k: shnum, v: instance of ServerTracker
    for server in upload_servers:
        assert isinstance(server, ServerTracker)
    # NOTE(review): a `buckets = {}` accumulator line appears to be missing
    # here (buckets is used below without a visible definition).
    servermap = already_servers.copy()
    for server in upload_servers:
        buckets.update(server.buckets)
        for shnum in server.buckets:
            self._server_trackers[shnum] = server
            servermap.setdefault(shnum, set()).add(server.serverid)
    assert len(buckets) == sum([len(server.buckets)
                                for server in upload_servers]), \
           "%s (%s) != %s (%s)" % (
                # NOTE(review): leading tuple-element lines are missing here.
                sum([len(server.buckets) for server in upload_servers]),
                [(s.buckets, s.serverid) for s in upload_servers]
                # NOTE(review): the closing-paren line is missing here.
    encoder.set_shareholders(buckets, servermap)
def _encrypted_done(self, verifycap):
    """ Returns a Deferred that will fire with the UploadResults instance. """
    # NOTE(review): this excerpt elides the binding of 'r' (presumably
    # r = self._results), 'now = time.time()', and the return statement.
    for shnum in self._encoder.get_shares_placed():
        server_tracker = self._server_trackers[shnum]
        serverid = server_tracker.serverid
        # record placement in both directions: share->server and server->share
        r.sharemap.add(shnum, serverid)
        r.servermap.add(serverid, shnum)
    r.pushed_shares = len(self._encoder.get_shares_placed())
    r.file_size = self._encoder.file_size
    # timing statistics, in seconds
    r.timings["total"] = now - self._started
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["peer_selection"] = self._server_selection_elapsed
    r.timings.update(self._encoder.get_times())
    r.uri_extension_data = self._encoder.get_uri_extension_data()
    r.verifycapstr = verifycap.to_string()
def get_upload_status(self):
    """Return the UploadStatus object that tracks this upload's progress."""
    status = self._upload_status
    return status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    """Read exactly 'size' bytes from the uploadable, recursing until enough
    data has accumulated; fires with prepend_data + the pieces read.

    NOTE(review): 'prepend_data=[]' is a mutable default argument; it is only
    concatenated (never mutated) in the visible code, so it appears harmless,
    but a None sentinel would be safer — confirm against the full file.
    """
    # NOTE(review): the size==0 guard line is elided before this early return
    return defer.succeed([])
    d = uploadable.read(size)
    # NOTE(review): the lines below belong to an elided callback
    # (presumably 'def _got(data):') chained onto d.
    assert isinstance(data, list)
    bytes = sum([len(piece) for piece in data])
    # not done yet: recurse for the remainder, carrying what we have
    remaining = size - bytes
    return read_this_many_bytes(uploadable, remaining,
    return prepend_data + data
class LiteralUploader:
    """Upload a small file by embedding its contents in a LIT URI — no
    servers are contacted."""
    # NOTE(review): 'implements(IUploader)' and the __init__ header are
    # elided from this excerpt; the statements below belong to __init__.
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)  # literals have no storage index
        s.set_progress(0, 1.0)
        s.set_results(self._results)

    def start(self, uploadable):
        """Read the whole (small) file and build a LiteralFileURI from it."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # NOTE(review): one line is elided here — confirm against full file
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        # NOTE(review): 'return d' appears to be elided in this excerpt
1022 def _build_results(self, uri):
1023 self._results.uri = uri
1024 self._status.set_status("Finished")
1025 self._status.set_progress(1, 1.0)
1026 self._status.set_progress(2, 1.0)
1027 return self._results
def get_upload_status(self):  # NOTE(review): body elided in this excerpt — presumably returns self._status; confirm
class RemoteEncryptedUploadable(Referenceable):
    """Expose an IEncryptedUploadable over foolscap so a remote helper can
    pull ciphertext from us."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # NOTE(review): a line is elided here (likely offset/size init)
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
    # NOTE(review): the 'def get_size(self):' header is elided; the
    # statements below are get_size's body (size is cached).
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            # NOTE(review): callback body elided (likely caches and returns size)
        d.addCallback(_got_size)
        # NOTE(review): 'return d' elided

    def remote_get_size(self):
        # foolscap entry point: delegate to the local (caching) get_size
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        # foolscap entry point: delegate to the wrapped uploadable
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read (or hash-only skip) 'length' ciphertext bytes, advancing _offset."""
        d = self._eu.read_encrypted(length, hash_only)
        # NOTE(review): the branch structure here is elided; the two
        # _offset updates below appear to belong to different branches.
        self._offset += length
        size = sum([len(data) for data in strings])
        self._offset += size
        d.addCallback(_read)
        # NOTE(review): 'return d' elided

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): this line appears to be the elided 'else:' branch
        d = defer.succeed(None)
        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): the lines below belong to an elided 'def _read(strings):'
        size = sum([len(data) for data in strings])
        self._bytes_sent += size
        d.addCallback(_read)
        # NOTE(review): 'return d' elided

    def remote_close(self):
        # foolscap entry point: let the wrapped uploadable release resources
        return self._eu.close()
class AssistedUploader:
    """Upload a file with the (helpful) assistance of a Helper process: the
    helper performs the server selection and share pushing for us."""
    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): further status initialization (e.g. set_helper /
        # set_active) appears to be elided from this excerpt.
def log(self, *args, **kwargs):
    """Forward to log.msg, defaulting 'parent' to this uploader's log number."""
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable, storage_index):
    """Start uploading the file.

    Returns a Deferred that will fire with the UploadResults instance.
    """
    precondition(isinstance(storage_index, str), storage_index)
    self._started = time.time()
    eu = IEncryptedUploadable(encrypted_uploadable)
    eu.set_upload_status(self._upload_status)
    self._encuploadable = eu
    self._storage_index = storage_index
    # NOTE(review): the binding of 'd' (likely d = eu.get_size()) is elided
    d.addCallback(self._got_size)
    d.addCallback(lambda res: eu.get_all_encoding_parameters())
    d.addCallback(self._got_all_encoding_parameters)
    d.addCallback(self._contact_helper)
    d.addCallback(self._build_verifycap)
    # NOTE(review): surrounding callback definition and 'return d' elided
    self._upload_status.set_active(False)

def _got_size(self, size):
    # NOTE(review): one line elided here (likely self._size = size)
    self._upload_status.set_size(size)
1148 def _got_all_encoding_parameters(self, params):
1149 k, happy, n, segment_size = params
1150 # stash these for URI generation later
1151 self._needed_shares = k
1152 self._total_shares = n
1153 self._segment_size = segment_size
def _contact_helper(self, res):
    """Ask the helper whether it already has this file, timing the exchange."""
    now = self._time_contacting_helper_start = time.time()
    self._storage_index_elapsed = now - self._started
    self.log(format="contacting helper for SI %(si)s..",
             si=si_b2a(self._storage_index), level=log.NOISY)
    self._upload_status.set_status("Contacting Helper")
    d = self._helper.callRemote("upload_chk", self._storage_index)
    d.addCallback(self._contacted_helper)
    # NOTE(review): 'return d' appears to be elided in this excerpt

def _contacted_helper(self, (upload_results, upload_helper)):
    """Either push ciphertext to the helper, or accept its existing results."""
    # NOTE(review): 'now = time.time()' appears to be elided here
    elapsed = now - self._time_contacting_helper_start
    self._elapsed_time_contacting_helper = elapsed
    # NOTE(review): the 'if upload_helper:' guard appears to be elided here
    self.log("helper says we need to upload", level=log.NOISY)
    self._upload_status.set_status("Uploading Ciphertext")
    # we need to upload the file
    reu = RemoteEncryptedUploadable(self._encuploadable,
                                    self._upload_status)
    # let it pre-compute the size for progress purposes
    # NOTE(review): the binding of 'd' (likely d = reu.get_size()) is elided
    d.addCallback(lambda ignored:
                  upload_helper.callRemote("upload", reu))
    # this Deferred will fire with the upload results
    # NOTE(review): 'return d' elided; the lines below are the
    # already-uploaded branch of the elided conditional.
    self.log("helper says file is already uploaded", level=log.OPERATIONAL)
    self._upload_status.set_progress(1, 1.0)
    self._upload_status.set_results(upload_results)
    return upload_results
1186 def _convert_old_upload_results(self, upload_results):
1187 # pre-1.3.0 helpers return upload results which contain a mapping
1188 # from shnum to a single human-readable string, containing things
1189 # like "Found on [x],[y],[z]" (for healthy files that were already in
1190 # the grid), "Found on [x]" (for files that needed upload but which
1191 # discovered pre-existing shares), and "Placed on [x]" (for newly
1192 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1193 # set of binary serverid strings.
1195 # the old results are too hard to deal with (they don't even contain
1196 # as much information as the new results, since the nodeids are
1197 # abbreviated), so if we detect old results, just clobber them.
1199 sharemap = upload_results.sharemap
1200 if str in [type(v) for v in sharemap.values()]:
1201 upload_results.sharemap = None
def _build_verifycap(self, upload_results):
    """Attach the verify-cap string and timing stats to the upload results."""
    self.log("upload finished, building readcap", level=log.OPERATIONAL)
    self._convert_old_upload_results(upload_results)
    self._upload_status.set_status("Building Readcap")
    # NOTE(review): the binding of 'r' (likely r = upload_results) is elided
    # sanity-check that the helper used the parameters we asked for
    assert r.uri_extension_data["needed_shares"] == self._needed_shares
    assert r.uri_extension_data["total_shares"] == self._total_shares
    assert r.uri_extension_data["segment_size"] == self._segment_size
    assert r.uri_extension_data["size"] == self._size
    r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                            uri_extension_hash=r.uri_extension_hash,
                                            needed_shares=self._needed_shares,
                                            total_shares=self._total_shares, size=self._size
    # NOTE(review): the call close (likely ').to_string()') and
    # 'now = time.time()' appear to be elided here
    r.file_size = self._size
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
    if "total" in r.timings:
        # preserve the helper's own total under a different key; ours replaces it
        r.timings["helper_total"] = r.timings["total"]
    r.timings["total"] = now - self._started
    self._upload_status.set_status("Finished")
    self._upload_status.set_results(r)
    # NOTE(review): 'return r' appears to be elided in this excerpt
def get_upload_status(self):
    """Accessor for the UploadStatus tracking this assisted upload."""
    current = self._upload_status
    return current
class BaseUploadable:
    """Common encoding-parameter plumbing shared by the IUploadable classes."""
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-upload overrides; when left as None, the defaults above are used
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cache for get_all_encoding_parameters(); computed once per upload
    _all_encoding_parameters = None
    # NOTE(review): lines are elided after this point in the excerpt
    # (likely '_status = None')
def set_upload_status(self, upload_status):
    """Adapt and remember the IUploadStatus provider used for progress reporting."""
    status = IUploadStatus(upload_status)
    self._status = status
def set_default_encoding_parameters(self, default_params):
    """Install caller-supplied defaults for k/happy/n/max_segment_size.

    Only the keys present in default_params are overridden. Every key must
    be a str and every value an int.
    """
    assert isinstance(default_params, dict)
    for key, value in default_params.items():
        precondition(isinstance(key, str), key, value)
        precondition(isinstance(value, int), key, value)
    # map each recognized dict key to the instance attribute it overrides
    for key, attr in (("k", "default_encoding_param_k"),
                      ("happy", "default_encoding_param_happy"),
                      ("n", "default_encoding_param_n"),
                      ("max_segment_size", "default_max_segment_size")):
        if key in default_params:
            setattr(self, attr, default_params[key])
def get_all_encoding_parameters(self):
    """Return a Deferred firing with (k, happy, n, segment_size); the tuple
    is computed once and cached in self._all_encoding_parameters."""
    if self._all_encoding_parameters:
        return defer.succeed(self._all_encoding_parameters)

    # fall back to the class-level defaults for any unset override
    max_segsize = self.max_segment_size or self.default_max_segment_size
    k = self.encoding_param_k or self.default_encoding_param_k
    happy = self.encoding_param_happy or self.default_encoding_param_happy
    n = self.encoding_param_n or self.default_encoding_param_n

    # NOTE(review): the binding of 'd' (likely d = self.get_size()) is elided
    def _got_size(file_size):
        # for small files, shrink the segment size to avoid wasting space
        segsize = min(max_segsize, file_size)
        # this must be a multiple of 'required_shares'==k
        segsize = mathutil.next_multiple(segsize, k)
        encoding_parameters = (k, happy, n, segsize)
        self._all_encoding_parameters = encoding_parameters
        return encoding_parameters
    d.addCallback(_got_size)
    # NOTE(review): 'return d' appears to be elided in this excerpt
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        # NOTE(review): a line is elided here (likely self._key = None)
        self.convergence = convergence
        # NOTE(review): further init lines elided (likely self._size = None)
def _get_encryption_key_convergent(self):
    """Compute (once) the convergent AES key by hashing the entire plaintext
    together with the convergence secret; caches the result in self._key."""
    if self._key is not None:
        return defer.succeed(self._key)

    # NOTE(review): the binding of 'd' (likely d = self.get_size()) is elided
    # that sets self._size as a side-effect
    d.addCallback(lambda size: self.get_all_encoding_parameters())
    # NOTE(review): the callback header (likely 'def _got(params):') is elided
    k, happy, n, segsize = params
    f = self._filehandle
    enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
    # NOTE(review): loop setup elided (likely f.seek(0), BLOCKSIZE,
    # bytes_read = 0, and a 'while True:' header)
    data = f.read(BLOCKSIZE)
    # NOTE(review): the 'if not data: break' guard appears to be elided
    enckey_hasher.update(data)
    # TODO: setting progress in a non-yielding loop is kind of
    # pointless, but I'm anticipating (perhaps prematurely) the
    # day when we use a slowjob or twisted's CooperatorService to
    # make this yield time to other jobs.
    bytes_read += len(data)
    # NOTE(review): a guard line is elided here (likely 'if self._status:')
    self._status.set_progress(0, float(bytes_read)/self._size)
    # NOTE(review): cleanup lines elided (likely f.seek(0))
    self._key = enckey_hasher.digest()
    # NOTE(review): a guard line is elided here (likely 'if self._status:')
    self._status.set_progress(0, 1.0)
    # convergence_hasher is expected to produce a 16-byte AES key
    assert len(self._key) == 16
    # NOTE(review): callback tail and 'return d' elided
def _get_encryption_key_random(self):
    """Generate (once) and return, via Deferred, a random 16-byte AES key."""
    key = self._key
    if key is None:
        key = os.urandom(16)
        self._key = key
    return defer.succeed(key)
def get_encryption_key(self):
    """Return a Deferred firing with the AES key: convergent when a
    convergence secret was supplied, random otherwise."""
    if self.convergence is None:
        return self._get_encryption_key_random()
    return self._get_encryption_key_convergent()
    # NOTE(review): the 'def get_size(self):' header is elided from this
    # excerpt; the statements below are get_size's body (size is cached).
    if self._size is not None:
        return defer.succeed(self._size)
    # measure the file by seeking to its end
    self._filehandle.seek(0,2)
    size = self._filehandle.tell()
    # NOTE(review): a caching line is elided here (likely self._size = size)
    self._filehandle.seek(0)
    return defer.succeed(size)

def read(self, length):
    """Return a Deferred firing with a one-element list holding the next
    'length' bytes from the filehandle."""
    return defer.succeed([self._filehandle.read(length)])

# NOTE(review): the 'def close(self):' header is elided; close() is a no-op:
    # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    # NOTE(review): the 'def close(self):' header is elided from this excerpt
        FileHandle.close(self)
        # unlike FileHandle, we opened the file ourselves, so we must close it
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # wrap the in-memory string in a file-like object and reuse FileHandle
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1387 class Uploader(service.MultiService, log.PrefixingLogMixin):
1388 """I am a service that allows file uploading. I am a service-child of the
1391 implements(IUploader)
1393 URI_LIT_SIZE_THRESHOLD = 55
1395 def __init__(self, helper_furl=None, stats_provider=None):
1396 self._helper_furl = helper_furl
1397 self.stats_provider = stats_provider
1399 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1400 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1401 service.MultiService.__init__(self)
1403 def startService(self):
1404 service.MultiService.startService(self)
1405 if self._helper_furl:
1406 self.parent.tub.connectTo(self._helper_furl,
1409 def _got_helper(self, helper):
1410 self.log("got helper connection, getting versions")
1411 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1413 "application-version": "unknown: no get_version()",
1415 d = add_version_to_remote_reference(helper, default)
1416 d.addCallback(self._got_versioned_helper)
1418 def _got_versioned_helper(self, helper):
1419 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1420 if needed not in helper.version:
1421 raise InsufficientVersionError(needed, helper.version)
1422 self._helper = helper
1423 helper.notifyOnDisconnect(self._lost_helper)
1425 def _lost_helper(self):
1428 def get_helper_info(self):
1429 # return a tuple of (helper_furl_or_None, connected_bool)
1430 return (self._helper_furl, bool(self._helper))
1433 def upload(self, uploadable, history=None):
1435 Returns a Deferred that will fire with the UploadResults instance.
1440 uploadable = IUploadable(uploadable)
1441 d = uploadable.get_size()
1442 def _got_size(size):
1443 default_params = self.parent.get_encoding_parameters()
1444 precondition(isinstance(default_params, dict), default_params)
1445 precondition("max_segment_size" in default_params, default_params)
1446 uploadable.set_default_encoding_parameters(default_params)
1448 if self.stats_provider:
1449 self.stats_provider.count('uploader.files_uploaded', 1)
1450 self.stats_provider.count('uploader.bytes_uploaded', size)
1452 if size <= self.URI_LIT_SIZE_THRESHOLD:
1453 uploader = LiteralUploader()
1454 return uploader.start(uploadable)
1456 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1457 d2 = defer.succeed(None)
1459 uploader = AssistedUploader(self._helper)
1460 d2.addCallback(lambda x: eu.get_storage_index())
1461 d2.addCallback(lambda si: uploader.start(eu, si))
1463 storage_broker = self.parent.get_storage_broker()
1464 secret_holder = self.parent._secret_holder
1465 uploader = CHKUploader(storage_broker, secret_holder)
1466 d2.addCallback(lambda x: uploader.start(eu))
1468 self._all_uploads[uploader] = None
1470 history.add_upload(uploader.get_upload_status())
1471 def turn_verifycap_into_read_cap(uploadresults):
1472 # Generate the uri from the verifycap plus the key.
1473 d3 = uploadable.get_encryption_key()
1474 def put_readcap_into_results(key):
1475 v = uri.from_string(uploadresults.verifycapstr)
1476 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1477 uploadresults.uri = r.to_string()
1478 return uploadresults
1479 d3.addCallback(put_readcap_into_results)
1481 d2.addCallback(turn_verifycap_into_read_cap)
1483 d.addCallback(_got_size)