1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_servers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24 DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
28 from cStringIO import StringIO
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised when a storage server cannot accept another share.

    Defined with a docstring body: in this listing the class statement had
    no body at all, which is a syntax error.
    """
class UploadResults(Copyable, RemoteCopy):
    """Outcome of an upload; sent over Foolscap between helper and client,
    so its attribute layout is compatibility-sensitive."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    # NOTE(review): this listing elides source lines 41-42 and 47-48 here,
    # including (presumably) the copytype attribute and the
    # "def __init__(self):" header for the assignments below.

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
59 # our current uri_extension is 846 bytes for small files, a few bytes
60 # more for larger ones (since the filesize is encoded in decimal in a
61 # few places). Ask for a little bit more just in case we need it. If
62 # the extension changes size, we can change EXTENSION_SIZE to
63 # allocate a more accurate amount of space.
65 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: [nodeid..]} mapping as 'shK: id1+id2, ...' for logs."""
    descriptions = []
    for shnum, nodeids in s.iteritems():
        joined_ids = '+'.join([idlib.shortnodeid_b2a(nodeid)
                               for nodeid in nodeids])
        descriptions.append("sh%s: %s" % (shnum, joined_ids))
    return ', '.join(descriptions)
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 bucket_renewal_secret, bucket_cancel_secret):
        # NOTE(review): this listing elides source lines 74, 76, 79, 82 and
        # 90 here, including (presumably) the storage_index parameter and
        # the self._server assignment that later methods rely on.
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # Build one write-bucket proxy up front to learn the concrete proxy
        # class and the allocated size for shares of this shape.
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             EXTENSION_SIZE, server.get_serverid())
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret
95 return ("<ServerTracker for server %s and SI %s>"
96 % (self._server.name(), si_b2a(self.storage_index)[:5]))
98 def get_serverid(self):
99 return self._server.get_serverid()
101 return self._server.name()
    def query(self, sharenums):
        """Ask the remote server to allocate buckets for sharenums; the
        reply is handled by _got_reply."""
        # NOTE(review): this listing elides source lines 106-110 (the
        # storage-index/secret/size arguments to allocate_buckets) and
        # 113-114 (presumably "return d"), so the call below is
        # syntactically incomplete.
        rref = self._server.get_rref()
        d = rref.callRemote("allocate_buckets",
                            canary=Referenceable())
        d.addCallback(self._got_reply)
115 def ask_about_existing_shares(self):
116 rref = self._server.get_rref()
117 return rref.callRemote("get_buckets", self.storage_index)
    def _got_reply(self, (alreadygot, buckets)):
        """Record an allocate_buckets() response: wrap each new remote
        bucket in a write-bucket proxy and remember it in self.buckets."""
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # NOTE(review): this listing elides source lines 121, 124-125, 127
        # and 129, including (presumably) the "b = {}" initializer and the
        # "b[sharenum] = bp" assignment that the tail below relies on, and
        # some wbp_class constructor arguments.
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.num_share_hashes,
                                self._server.get_serverid())
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
    # NOTE(review): the "def abort(self):" header and the docstring
    # delimiters (source lines 132-135 and 138) are elided from this
    # listing; the two bare text lines below are the stranded docstring.
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        self.abort_some_buckets(self.buckets.keys())
    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                # abort the remote writer, then forget it locally
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
    """Render one share placement as 'SHNUM: SHORT-NODEID' for logging."""
    nodeid_s = idlib.shortnodeid_b2a(bucketwriter._nodeid)
    return "%s: %s" % (shnum, nodeid_s)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
    """Share-placement ("peer selection") engine for an immutable upload."""
    # NOTE(review): source line 155 is elided from this listing.

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query bookkeeping, reported by _get_progress_message()
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        # NOTE(review): source lines 160-161 are elided from this listing
        # (presumably the full_count/error_count initializers used later).
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    # NOTE(review): the "def __repr__(self):" header (source line 168) is
    # elided from this listing.
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """
        # NOTE(review): this listing elides a number of source lines in
        # this method (183-184, 206, 210, 213, 215, 227, 232, 234, 236,
        # 238-239, 246-250, 265, 272, 276-277, 285, 292-294), so several
        # statements below are syntactically incomplete.
        self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        # (the guard line for the empty-server-list case is elided)
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accomodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_servers = [server for server in all_servers
                            if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,

        def _make_trackers(servers):
            # (the list-building setup lines are elided)
                seed = s.get_lease_seed()
                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
                st = ServerTracker(s,
                                   share_size, block_size,
                                   num_segments, num_share_hashes,

        # We assign each servers/trackers into one three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
        self.first_pass_trackers = _make_trackers(writable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # (the "ds = []" initializer used below is elided)
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            # (the "ds.append(d)" line is elided)
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.name(),), level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the queries sent by
        Tahoe2ServerSelector._existing_shares.
        """
        # NOTE(review): source lines 306-308, 312 and 316 are elided from
        # this listing (the success-branch header that binds `buckets`, a
        # log level argument, and the "else:" before the final increment).
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                     % (tracker.name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                     % (tracker.name(), tuple(sorted(buckets))),
            for bucket in buckets:
                # shares that already exist count against homelessness
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            self.bad_query_count += 1
    def _get_progress_message(self):
        """Build a human-readable summary of placement progress for log and
        error messages."""
        # NOTE(review): the "else:" line (323) and one argument line (326)
        # are elided from this listing.
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d servers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                (self.servers_of_happiness, self.needed_shares,
                 self.query_count, self.num_servers_contacted,
                 self.good_query_count, self.bad_query_count,
                 self.full_count, self.error_count))
    # NOTE(review): the "def _loop(self):" header (source lines 339-340) is
    # elided from this listing, along with several interior lines noted
    # below. This is the main placement loop, re-entered after each batch
    # of allocate_buckets responses: succeed when happiness is reached, try
    # to redistribute preexisting shares when it is not, otherwise work
    # through the first/second/next pass tracker lists.
        if not self.homeless_shares:
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            # (the "else:" arm, source line 355, is elided)
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                # (the "in shares.items()])" line, 370, is elided)
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                        items.append((server, sharelist))
                    for writer in self.use_trackers:
                        writer.abort_some_buckets(self.homeless_shares)
                # (source lines 389-390 are elided here)
                # Redistribution won't help us; fail.
                server_count = len(self.serverids_with_shares)
                failmsg = failure_message(server_count,
                                          self.servers_of_happiness,
                # (argument lines 394 and 396 are elided)
                servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                servmsg = servmsgtempl % (
                    self._get_progress_message(),
                    pretty_print_shnum_to_servers(merged)
                # (lines 399-400 and 403 are elided)
                self.log(servmsg, level=log.INFREQUENT)
                return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)
            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            # (a status guard line, 416, is elided)
            self._status.set_status("Contacting Servers [%s] (first query),"
                                    # (argument lines 418-419 are elided)
                                    len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
            # ("return d", line 424, is elided)
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                # (the log level argument, line 429, is elided)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            # (line 437 is elided)
            self._status.set_status("Contacting Servers [%s] (second query),"
                                    # (argument lines 439-440 are elided)
                                    len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
            # ("return d", line 445, is elided)
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            # (lines 451-452 are elided)
        # (the final "else:" line is elided)
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                                      self.servers_of_happiness,
                # (argument lines 458 and 460 are elided)
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            # (the "else:" line, 467, is elided)
                # we placed enough to be happy, so we're done
                # (a status guard line, 469, is elided)
                self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                       self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        """Handle one allocate_buckets() result (success or Failure),
        update the placement bookkeeping, and requeue the tracker."""
        # NOTE(review): this listing elides many source lines in this
        # method (479, 481, 489-490, 498, 501, 503-505, 509, 511-512, 515,
        # 517-518, 521, 524-525, 531, 533-535, 541-542, 548, 552-556),
        # including the "else:" arms and a `progress` flag, so several
        # statements below are incomplete.
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            self.log("%s got error during server selection: %s" % (tracker, res),
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                     tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                elif s in shares_to_ask:
            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
                self.use_trackers.add(tracker)
            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
                self.bad_query_count += 1
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)
    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            # NOTE(review): the "tracker.abort()" call (source line 566)
            # is elided from this listing.
        raise UploadUnhappinessError(msg)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): source lines 574-575 are elided from this listing;
    # presumably they define the CHUNKSIZE class attribute that
    # _read_encrypted uses.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None                        # built lazily by _get_encryptor
        self._plaintext_hasher = plaintext_hasher()   # flat whole-file plaintext hash
        self._plaintext_segment_hasher = None         # current per-segment hasher
        self._plaintext_segment_hashes = []           # digests of completed segments
        self._encoding_parameters = None              # cached by get_all_encoding_parameters
        self._file_size = None                        # cached by get_size
        self._ciphertext_bytes_read = 0
        # NOTE(review): source lines 586-587 are elided from this listing.
588 def set_upload_status(self, upload_status):
589 self._status = IUploadStatus(upload_status)
590 self.original.set_upload_status(upload_status)
592 def log(self, *args, **kwargs):
593 if "facility" not in kwargs:
594 kwargs["facility"] = "upload.encryption"
595 if "parent" not in kwargs:
596 kwargs["parent"] = self._log_number
597 return log.msg(*args, **kwargs)
    # NOTE(review): the "def get_size(self):" header (source line 599), the
    # nested "_got_size" callback header, a status guard, and the returns
    # (lines 603, 605, 607, 609-610) are elided from this listing. This is
    # the size-caching accessor for the wrapped uploadable.
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
            self._file_size = size
            self._status.set_size(size)
        d.addCallback(_got_size)
    def get_all_encoding_parameters(self):
        """Return (via Deferred) the (k, happy, n, segment_size) tuple,
        caching it and remembering segment_size for the segment hashers."""
        # NOTE(review): the log level argument (source line 620) and the
        # addCallback/return tail (lines 622-624) are elided from this
        # listing.
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            return encoding_parameters
    def _get_encryptor(self):
        # NOTE(review): this listing elides source lines 626, 628, 630-633,
        # 640 and 642-645, including the memoization guard, the callback
        # header that binds `key`, the AES encryptor construction, and the
        # return. The visible body derives the storage index from the
        # encryption key.
            return defer.succeed(self._encryptor)
        d = self.original.get_encryption_key()
            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
            self._status.set_storage_index(storage_index)
    def get_storage_index(self):
        """Return (via Deferred) the storage index derived from the
        encryption key, making sure the encryptor exists first."""
        # NOTE(review): the trailing "return d" (source line 649) is
        # elided from this listing.
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
    def _get_segment_hasher(self):
        """Return (hasher, bytes_left_in_segment), creating a fresh
        per-segment hasher when needed."""
        # NOTE(review): source lines 653 and 655 are elided from this
        # listing — presumably the "if p:" test and the early return of
        # the existing hasher with its remaining byte count.
        p = self._plaintext_segment_hasher
            left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size
    def _update_segment_hash(self, chunk):
        """Feed chunk into the per-segment plaintext hashers, closing out a
        hasher each time a full segment's worth of bytes has been seen."""
        # NOTE(review): this listing elides source lines 662 (presumably
        # "offset = 0"), 669, 677 and 681-682 (log level arguments and
        # call closers), so some statements below are incomplete.
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),

            offset += this_segment
    def read_encrypted(self, length, hash_only):
        """Return (via Deferred) up to `length` bytes of ciphertext; when
        hash_only is true the data is still hashed but actual encryption
        output is skipped (see _hash_and_encrypt_plaintext)."""
        # NOTE(review): this listing elides source lines 689, 696-697 and
        # 702-703, including (presumably) the `ciphertext` list
        # initializer and the final return.
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Recursively (via Deferred) read/encrypt chunks of plaintext,
        appending ciphertext to the shared list and firing fire_when_done
        when `remaining` is exhausted."""
        # NOTE(review): this listing elides source lines 705, 707, 722,
        # 727-728 and 730-733, including the "if not remaining" base-case
        # guard and the errback/return wiring.
            fire_when_done.callback(ciphertext)
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
            fire_when_done.errback(why)
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        """Feed each plaintext chunk into the flat and per-segment hashers,
        encrypt it, and collect the ciphertext chunks."""
        # NOTE(review): this listing elides source lines 736-737, 740-742,
        # 744, 754, 756, 758-759, 761 and 764-766, including (presumably)
        # the `cryptdata`/`bytes_processed` initializers, the chunk loop
        # header, the hash_only branch structure, and the return.
        assert isinstance(data, (tuple, list)), type(data)
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
                self.log(" skipping encryption", level=log.NOISY)
            cryptdata.append(ciphertext)
        self._ciphertext_bytes_read += bytes_processed
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return (via Deferred) the plaintext segment-hash leaves in
        [first:last], closing out the final segment hasher if needed."""
        # this is currently unused, but will live again when we fix #453
        # NOTE(review): two log-argument lines (source 777 and 781) are
        # elided from this listing, leaving the log calls unbalanced.
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
785 def get_plaintext_hash(self):
786 h = self._plaintext_hasher.digest()
787 return defer.succeed(h)
790 return self.original.close()
    # NOTE(review): the "class UploadStatus" header and its
    # "def __init__(self):" line (source ~791-796), plus lines 798-799 and
    # 802-803, are elided from this listing.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)   # class-wide source of unique counters

        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.counter = self.statusid_counter.next()   # py2 iterator .next()
        self.started = time.time()
    # Simple read accessors. NOTE(review): most one-line bodies here
    # (source lines 808, 811-812, 814, 816, 820, 822, 824-825) are elided
    # from this listing.
    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):
    # Simple write accessors. NOTE(review): several one-line bodies here
    # (source lines 829, 831, 833, 838, 840) are elided from this listing.
    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
    # NOTE(review): the "class CHKUploader" header (source ~842) is elided
    # from this listing; the following lines are its class body.
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        # NOTE(review): source lines 850, 857 and 860 are elided from this
        # listing (presumably including the self._encoder initializer that
        # abort() checks).
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
861 def log(self, *args, **kwargs):
862 if "parent" not in kwargs:
863 kwargs["parent"] = self._log_number
864 if "facility" not in kwargs:
865 kwargs["facility"] = "tahoe.upload"
866 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        # NOTE(review): source lines 870, 872-873, 877 and 882-885 are
        # elided from this listing, including the rest of the _done
        # callback and the return wiring.
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
887 """Call this if the upload must be abandoned before it completes.
888 This will tell the shareholders to delete their partial shares. I
889 return a Deferred that fires when these messages have been acked."""
890 if not self._encoder:
891 # how did you call abort() before calling start() ?
892 return defer.succeed(None)
893 return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # NOTE(review): source lines 898, 901 and 907-908 are elided from
        # this listing, including the Encoder constructor's remaining
        # argument(s) and (presumably) the trailing "return d".
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
        d = e.set_encrypted_uploadable(eu)
        # pipeline: locate servers, hand them to the encoder, run it
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
    def locate_all_shareholders(self, encoder, started):
        """Run server selection for this upload, recording elapsed times
        for the results' timing report."""
        # NOTE(review): source lines 919-921, 926, 929, 932 and 934-937
        # are elided from this listing, including the selector
        # constructor's remaining arguments, a get_shareholders argument
        # line, and the timing/return tail.
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        self._server_selection_elapsed = time.time() - server_selection_started
    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        """
        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
                                  have the share.
        """
        # NOTE(review): source lines 943, 946-947, 955, 959 (presumably
        # "buckets = {}"), 969-970 and 973 are elided from this listing,
        # so the assert below is syntactically incomplete.
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
        encoder.set_shareholders(buckets, servermap)
    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # NOTE(review): the bindings of 'r' (the UploadResults being filled
        # in) and 'now' are elided from this excerpt -- confirm upstream.
        for shnum in self._encoder.get_shares_placed():
            server_tracker = self._server_trackers[shnum]
            serverid = server_tracker.get_serverid()
            # record both directions: share -> servers and server -> shares
            r.sharemap.add(shnum, serverid)
            r.servermap.add(serverid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        # copy size and timing data out of the encoder into the results
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._server_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
995 def get_upload_status(self):
996 return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # Read 'size' bytes from 'uploadable' (an object with a Deferred-returning
    # .read(length) method), recursing until the full amount has been
    # accumulated; the result is prepend_data plus the newly-read pieces.
    # NOTE(review): the 'size == 0' guard, the '_got' callback wrapper, and
    # the trailing addCallback/return are elided from this excerpt. Also note
    # the mutable default for prepend_data: it is never mutated here (only
    # concatenated), but it is a risky idiom -- confirm upstream.
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        # the uploadable must never hand back more than was requested
        assert bytes <= size
        remaining = size - bytes
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
class LiteralUploader:
    """Upload a small file as a LIT (literal) URI: the file's whole contents
    are packed directly into the URI via uri.LiteralFileURI."""

    # NOTE(review): the __init__ signature line is elided from this excerpt.
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0)
        s.set_results(self._results)

    def start(self, uploadable):
        # read the whole (small) file, then pack it into a LiteralFileURI
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)

    def _build_results(self, uri):
        # LIT uploads are complete once the URI is built; mark every
        # progress phase as finished
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def get_upload_status(self):
class RemoteEncryptedUploadable(Referenceable):
    """Wrap an IEncryptedUploadable so a remote helper can pull ciphertext
    from us over foolscap (the RIEncryptedUploadable interface)."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

    # NOTE(review): the get_size() def line is elided from this excerpt; the
    # following body caches the uploadable's size in self._size.
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
        d.addCallback(_got_size)

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # read (or, with hash_only, merely hash-and-discard) 'length' bytes
        # of ciphertext, advancing self._offset by what was consumed
        d = self._eu.read_encrypted(length, hash_only)
            self._offset += length
            size = sum([len(data) for data in strings])
            self._offset += size
        d.addCallback(_read)

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
            d = defer.succeed(None)
        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
            # track how many ciphertext bytes we have actually shipped
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
        d.addCallback(_read)

    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    """Upload a file with the aid of a remote Helper: we feed the helper our
    ciphertext (via RemoteEncryptedUploadable) and it does the pushing."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()

    def log(self, *args, **kwargs):
        # route all of our log entries under the AssistedUploader parent msg
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # NOTE(review): the creation of 'd' is elided from this excerpt.
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        self._upload_status.set_active(False)

    def _got_size(self, size):
        # record for progress display and later readcap construction
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        # ask the helper whether it already has this file or needs ciphertext
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)

    def _contacted_helper(self, (upload_results, upload_helper)):
        # NOTE(review): several lines (including the branch on whether
        # upload_helper was returned) are elided from this excerpt.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        self.log("helper says we need to upload", level=log.NOISY)
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.
        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """Assemble the CHK verify-cap string from the helper's results."""
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        # NOTE(review): the bindings of 'r' and 'now' are elided from this
        # excerpt -- confirm against the full source.
        # sanity-check that the helper used the parameters we asked for
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                  uri_extension_hash=r.uri_extension_hash,
                                  needed_shares=self._needed_shares,
                                  total_shares=self._total_shares, size=self._size
        # fill in the timing/size fields that only we (not the helper) know
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # the helper's own "total" covers only its portion; preserve it
            # under another key, then overwrite "total" with end-to-end time
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)

    def get_upload_status(self):
        return self._upload_status
class BaseUploadable:
    """Shared plumbing for IUploadable implementations: holds the encoding
    parameters (k / happy / n / segment size) and computes the effective
    values on demand."""
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides; None means "fall back to the defaults above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cache for get_all_encoding_parameters()
    _all_encoding_parameters = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        # accepts a dict with optional keys "k", "happy", "n",
        # "max_segment_size"; every entry is type-checked, recognized keys
        # replace the class-level defaults
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size)."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        # NOTE(review): the creation of 'd' (presumably from self.get_size())
        # and the trailing return are elided from this excerpt.
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self.convergence = convergence

    def _get_encryption_key_convergent(self):
        # derive the AES key from a hash of the plaintext plus the
        # convergence secret, so identical files encrypt identically
        if self._key is not None:
            return defer.succeed(self._key)

        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
                data = f.read(BLOCKSIZE)
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                self._status.set_progress(0, float(bytes_read)/self._size)
            self._key = enckey_hasher.digest()
            self._status.set_progress(0, 1.0)
            assert len(self._key) == 16

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        # convergent (deterministic) key when a convergence secret was
        # supplied, otherwise a freshly random one
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

    # NOTE(review): the get_size() def line is elided from this excerpt; the
    # body measures the file by seeking to its end, then rewinds.
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    # NOTE(review): the close() def line is elided from this excerpt; since
    # we opened the filehandle ourselves, we must also close it.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client node.
    """
    implements(IUploader)

    # files at or below this size are uploaded as LIT URIs (see upload())
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
    def startService(self):
        service.MultiService.startService(self)
        # if a helper furl was configured, begin (re)connecting to it.
        # NOTE(review): the connectTo() callback argument is elided from
        # this excerpt -- presumably self._got_helper; confirm upstream.
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
    def _got_helper(self, helper):
        # interrogate the helper for its version; helpers that predate
        # get_version() are assigned this placeholder default
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1436 def _got_versioned_helper(self, helper):
1437 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1438 if needed not in helper.version:
1439 raise InsufficientVersionError(needed, helper.version)
1440 self._helper = helper
1441 helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        # NOTE(review): body elided from this excerpt; registered via
        # notifyOnDisconnect, so presumably clears self._helper -- confirm.
1446 def get_helper_info(self):
1447 # return a tuple of (helper_furl_or_None, connected_bool)
1448 return (self._helper_furl, bool(self._helper))
    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # pull the node-wide default encoding parameters from our parent
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            # tiny files become LIT URIs: contents embedded in the cap itself
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)

            eu = EncryptAnUploadable(uploadable, self._parentmsgid)
            d2 = defer.succeed(None)
            # NOTE(review): the helper-vs-direct branch lines are elided from
            # this excerpt: with a connected helper we go through
            # AssistedUploader, otherwise straight to CHKUploader.
            uploader = AssistedUploader(self._helper)
            d2.addCallback(lambda x: eu.get_storage_index())
            d2.addCallback(lambda si: uploader.start(eu, si))
            storage_broker = self.parent.get_storage_broker()
            secret_holder = self.parent._secret_holder
            uploader = CHKUploader(storage_broker, secret_holder)
            d2.addCallback(lambda x: uploader.start(eu))

            self._all_uploads[uploader] = None
            history.add_upload(uploader.get_upload_status())
            def turn_verifycap_into_read_cap(uploadresults):
                # Generate the uri from the verifycap plus the key.
                d3 = uploadable.get_encryption_key()
                def put_readcap_into_results(key):
                    v = uri.from_string(uploadresults.verifycapstr)
                    r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                    uploadresults.uri = r.to_string()
                    return uploadresults
                d3.addCallback(put_readcap_into_results)
            d2.addCallback(turn_verifycap_into_read_cap)
        d.addCallback(_got_size)