1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_servers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24 DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
28 from cStringIO import StringIO
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised when a storage server is too full to accept more shares.

    NOTE(review): the class body is not visible in this excerpt — confirm
    upstream that it carries no extra attributes.
    """
# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.
    # NOTE(review): the __init__ header (and possibly other attribute
    # initializers) are in lines missing from this excerpt; the
    # assignments below presumably belong to __init__ — confirm upstream.
    self.timings = {} # dict of name to number of seconds
    self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
    self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
    self.ciphertext_fetched = None # how much the helper fetched
    self.preexisting_shares = None # count of shares already present
    self.pushed_shares = None # count of shares we pushed
    # NOTE(review): the enclosing class header and __init__ header are in
    # lines missing from this excerpt; these attributes mirror
    # HelperUploadResults' shape for the local (non-helper) results object.
    implements(IUploadResults)
    self.timings = {} # dict of name to number of seconds
    self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
    self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
    self.ciphertext_fetched = None # how much the helper fetched
    self.preexisting_shares = None # count of shares already present
    self.pushed_shares = None # count of shares we pushed
74 # our current uri_extension is 846 bytes for small files, a few bytes
75 # more for larger ones (since the filesize is encoded in decimal in a
76 # few places). Ask for a little bit more just in case we need it. If
77 # the extension changes size, we can change EXTENSION_SIZE to
78 # allocate a more accurate amount of space.
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# the allocated extension size.
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: serverids} mapping as a short human-readable string."""
    descriptions = []
    for shnum, serverids in s.iteritems():
        nodes = '+'.join([idlib.shortnodeid_b2a(serverid)
                          for serverid in serverids])
        descriptions.append("sh%s: %s" % (shnum, nodes))
    return ', '.join(descriptions)
def __init__(self, server,
             sharesize, blocksize, num_segments, num_share_hashes,
             bucket_renewal_secret, bucket_cancel_secret):
    # NOTE(review): several lines of this method are missing from this
    # excerpt (e.g. a storage_index parameter, the self._server assignment
    # used by other methods, and the tail of the make_write_bucket_proxy
    # call) — confirm against the full source.
    self.buckets = {} # k: shareid, v: IRemoteBucketWriter
    self.sharesize = sharesize
    # build a throwaway proxy only to learn the allocated size and the
    # concrete proxy class for this share layout
    wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                         blocksize, num_segments,
    self.wbp_class = wbp.__class__ # to create more of them
    self.allocated_size = wbp.get_allocated_size()
    self.blocksize = blocksize
    self.num_segments = num_segments
    self.num_share_hashes = num_share_hashes
    self.storage_index = storage_index
    self.renew_secret = bucket_renewal_secret
    self.cancel_secret = bucket_cancel_secret
    # NOTE(review): the def headers for __repr__ and get_name, and several
    # argument lines of the allocate_buckets call below, are missing from
    # this excerpt.
        return ("<ServerTracker for server %s and SI %s>"
                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

    def get_serverid(self):
        return self._server.get_serverid()

        return self._server.get_name()

    def query(self, sharenums):
        # ask the server to allocate buckets for the given share numbers
        rref = self._server.get_rref()
        d = rref.callRemote("allocate_buckets",
                            canary=Referenceable())
        d.addCallback(self._got_reply)
def ask_about_existing_shares(self):
    """Ask the remote server which buckets it already holds for our SI."""
    return self._server.get_rref().callRemote("get_buckets",
                                              self.storage_index)
def _got_reply(self, (alreadygot, buckets)):
    #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
    # NOTE(review): the construction of the local dict 'b' (and the tail
    # of the wbp_class() call) are in lines missing from this excerpt;
    # each allocated bucket rref is wrapped in a write-bucket proxy.
    for sharenum, rref in buckets.iteritems():
        bp = self.wbp_class(rref, self._server, self.sharesize,
                            self.num_share_hashes,
    self.buckets.update(b)
    return (alreadygot, set(b.keys()))
    # NOTE(review): the 'def abort(self):' header is in a line missing
    # from this excerpt.
    """
    I abort the remote bucket writers for all shares. This is a good idea
    to conserve space on the storage server.
    """
    self.abort_some_buckets(self.buckets.keys())
def abort_some_buckets(self, sharenums):
    """
    I abort the remote bucket writers for the share numbers in sharenums.
    """
    for sharenum in sharenums:
        if sharenum in self.buckets:
            self.buckets[sharenum].abort()
            # drop the writer so a later abort() won't abort it twice
            del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
    """Return 'shnum: servername' describing where one share was placed."""
    servername = bucketwriter.get_servername()
    return "{0}: {1}".format(shnum, servername)
class Tahoe2ServerSelector(log.PrefixingLogMixin):

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query bookkeeping used by _get_progress_message()
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        # NOTE(review): the full_count/error_count initializers referenced
        # elsewhere in this class are in lines missing from this excerpt.
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    # NOTE(review): the 'def __repr__(self):' header is in a line missing
    # from this excerpt.
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
def get_shareholders(self, storage_broker, secret_holder,
                     storage_index, share_size, block_size,
                     num_segments, total_shares, needed_shares,
                     servers_of_happiness):
    """
    @return: (upload_trackers, already_serverids), where upload_trackers
             is a set of ServerTracker instances that have agreed to hold
             some shares for us (the shareids are stashed inside the
             ServerTracker), and already_serverids is a dict mapping
             shnum to a set of serverids for servers which claim to
             already have the share.
    """
    # NOTE(review): a number of lines of this method are missing from this
    # excerpt (the 'if not all_servers:' guard, the tails of the
    # file_*_secret_hash and ServerTracker calls, the _make_trackers loop
    # body, and the 'ds' accumulator / 'return dl') — confirm upstream.
    self._status.set_status("Contacting Servers..")
    self.total_shares = total_shares
    self.servers_of_happiness = servers_of_happiness
    self.needed_shares = needed_shares
    self.homeless_shares = set(range(total_shares))
    self.use_trackers = set() # ServerTrackers that have shares assigned
    self.preexisting_shares = {} # shareid => set(serverids) holding shareid

    # These servers have shares -- any shares -- for our SI. We keep
    # track of these to write an error message with them later.
    self.serverids_with_shares = set()

    # this needed_hashes computation should mirror
    # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
    # (instead of a HashTree) because we don't require actual hashing
    # just to count the levels.
    ht = hashtree.IncompleteHashTree(total_shares)
    num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

    # figure out how much space to ask for
    wbp = layout.make_write_bucket_proxy(None, None,
                                         share_size, 0, num_segments,
                                         num_share_hashes, EXTENSION_SIZE)
    allocated_size = wbp.get_allocated_size()
    all_servers = storage_broker.get_servers_for_psi(storage_index)
        raise NoServersError("client gave us zero servers")

    # filter the list of servers according to which ones can accomodate
    # this request. This excludes older servers (which used a 4-byte size
    # field) from getting large shares (for files larger than about
    # 12GiB). See #439 for details.
    def _get_maxsize(server):
        v0 = server.get_rref().version
        v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
        return v1["maximum-immutable-share-size"]
    writeable_servers = [server for server in all_servers
                         if _get_maxsize(server) >= allocated_size]
    readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)

    # decide upon the renewal/cancel secrets, to include them in the
    # allocate_buckets query.
    client_renewal_secret = secret_holder.get_renewal_secret()
    client_cancel_secret = secret_holder.get_cancel_secret()
    file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
    file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
    def _make_trackers(servers):
        # per-server lease secrets are derived from the per-file secrets
        seed = s.get_lease_seed()
        renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
        cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
        st = ServerTracker(s,
                           share_size, block_size,
                           num_segments, num_share_hashes,

    # We assign each servers/trackers into one three lists. They all
    # start in the "first pass" list. During the first pass, as we ask
    # each one to hold a share, we move their tracker to the "second
    # pass" list, until the first-pass list is empty. Then during the
    # second pass, as we ask each to hold more shares, we move their
    # tracker to the "next pass" list, until the second-pass list is
    # empty. Then we move everybody from the next-pass list back to the
    # second-pass list and repeat the "second" pass (really the third,
    # fourth, etc pass), until all shares are assigned, or we've run out
    # of potential servers.
    self.first_pass_trackers = _make_trackers(writeable_servers)
    self.second_pass_trackers = [] # servers worth asking again
    self.next_pass_trackers = [] # servers that we have asked again
    self._started_second_pass = False

    # We don't try to allocate shares to these servers, since they've
    # said that they're incapable of storing shares of the size that we'd
    # want to store. We ask them about existing shares for this storage
    # index, which we want to know about for accurate
    # servers_of_happiness accounting, then we forget about them.
    readonly_trackers = _make_trackers(readonly_servers)

    # We now ask servers that can't hold any new shares about existing
    # shares that they might have for our SI. Once this is done, we
    # start placing the shares that we haven't already accounted
    if self._status and readonly_trackers:
        self._status.set_status("Contacting readonly servers to find "
                                "any existing shares")
    for tracker in readonly_trackers:
        assert isinstance(tracker, ServerTracker)
        d = tracker.ask_about_existing_shares()
        d.addBoth(self._handle_existing_response, tracker)
        self.num_servers_contacted += 1
        self.query_count += 1
        self.log("asking server %s for any existing shares" %
                 (tracker.get_name(),), level=log.NOISY)
    dl = defer.DeferredList(ds)
    dl.addCallback(lambda ign: self._loop())
def _handle_existing_response(self, res, tracker):
    """
    I handle responses to the queries sent by
    Tahoe2ServerSelector._existing_shares.
    """
    # NOTE(review): the 'else:' branch header (binding 'buckets = res')
    # and some surrounding lines are missing from this excerpt.
    serverid = tracker.get_serverid()
    if isinstance(res, failure.Failure):
        self.log("%s got error during existing shares check: %s"
                 % (tracker.get_name(), res), level=log.UNUSUAL)
        self.error_count += 1
        self.bad_query_count += 1
        self.serverids_with_shares.add(serverid)
        self.log("response to get_buckets() from server %s: alreadygot=%s"
                 % (tracker.get_name(), tuple(sorted(buckets))),
        # record each pre-existing share and stop treating it as homeless
        for bucket in buckets:
            self.preexisting_shares.setdefault(bucket, set()).add(serverid)
            self.homeless_shares.discard(bucket)
        self.bad_query_count += 1
def _get_progress_message(self):
    """Return a human-readable summary of share-placement progress.

    Used in both success and failure log messages. Reads only counters
    that __init__ and the query callbacks maintain.
    """
    if not self.homeless_shares:
        msg = "placed all %d shares, " % (self.total_shares)
    else:
        # bugfix: restore the 'else:' so a partial placement does not
        # clobber the "placed all" message, and supply the middle
        # '%d total' argument that the three-field format requires.
        msg = ("placed %d shares out of %d total (%d homeless), " %
               (self.total_shares - len(self.homeless_shares),
                self.total_shares,
                len(self.homeless_shares)))
    return (msg + "want to place shares on at least %d servers such that "
            "any %d of them have enough shares to recover the file, "
            "sent %d queries to %d servers, "
            "%d queries placed some shares, %d placed none "
            "(of which %d placed none due to the server being"
            " full and %d placed none due to an error)" %
            (self.servers_of_happiness, self.needed_shares,
             self.query_count, self.num_servers_contacted,
             self.good_query_count, self.bad_query_count,
             self.full_count, self.error_count))
    # NOTE(review): the '_loop' method header and a number of interior
    # lines (several 'else:'/'return' lines, argument tails of
    # failure_message(), and 'if self._status:' guards) are missing from
    # this excerpt — confirm the control flow against the full source.
    if not self.homeless_shares:
        merged = merge_servers(self.preexisting_shares, self.use_trackers)
        effective_happiness = servers_of_happiness(merged)
        if self.servers_of_happiness <= effective_happiness:
            msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                   "self.use_trackers: %s, self.preexisting_shares: %s") \
                   % (self, self._get_progress_message(),
                      pretty_print_shnum_to_servers(merged),
                      [', '.join([str_shareloc(k,v)
                                  for k,v in st.buckets.iteritems()])
                       for st in self.use_trackers],
                      pretty_print_shnum_to_servers(self.preexisting_shares))
            self.log(msg, level=log.OPERATIONAL)
            return (self.use_trackers, self.preexisting_shares)
            # We're not okay right now, but maybe we can fix it by
            # redistributing some shares. In cases where one or two
            # servers has, before the upload, all or most of the
            # shares for a given SI, this can work by allowing _loop
            # a chance to spread those out over the other servers,
            delta = self.servers_of_happiness - effective_happiness
            shares = shares_by_server(self.preexisting_shares)
            # Each server in shares maps to a set of shares stored on it.
            # Since we want to keep at least one share on each server
            # that has one (otherwise we'd only be making
            # the situation worse by removing distinct servers),
            # each server has len(its shares) - 1 to spread around.
            shares_to_spread = sum([len(list(sharelist)) - 1
                                    for (server, sharelist)
            if delta <= len(self.first_pass_trackers) and \
               shares_to_spread >= delta:
                items = shares.items()
                while len(self.homeless_shares) < delta:
                    # Loop through the allocated shares, removing
                    # one from each server that has more than one
                    # and putting it back into self.homeless_shares
                    # until we've done this delta times.
                    server, sharelist = items.pop()
                    if len(sharelist) > 1:
                        share = sharelist.pop()
                        self.homeless_shares.add(share)
                        self.preexisting_shares[share].remove(server)
                        if not self.preexisting_shares[share]:
                            del self.preexisting_shares[share]
                    items.append((server, sharelist))
                for writer in self.use_trackers:
                    writer.abort_some_buckets(self.homeless_shares)
                # Redistribution won't help us; fail.
                server_count = len(self.serverids_with_shares)
                failmsg = failure_message(server_count,
                                          self.servers_of_happiness,
                servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                servmsg = servmsgtempl % (
                    self._get_progress_message(),
                    pretty_print_shnum_to_servers(merged)
                self.log(servmsg, level=log.INFREQUENT)
                return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

    if self.first_pass_trackers:
        tracker = self.first_pass_trackers.pop(0)
        # TODO: don't pre-convert all serverids to ServerTrackers
        assert isinstance(tracker, ServerTracker)
        # first pass: ask each server for a single share
        shares_to_ask = set(sorted(self.homeless_shares)[:1])
        self.homeless_shares -= shares_to_ask
        self.query_count += 1
        self.num_servers_contacted += 1
            self._status.set_status("Contacting Servers [%s] (first query),"
                                    % (tracker.get_name(),
                                       len(self.homeless_shares)))
        d = tracker.query(shares_to_ask)
        d.addBoth(self._got_response, tracker, shares_to_ask,
                  self.second_pass_trackers)
    elif self.second_pass_trackers:
        # ask a server that we've already asked.
        if not self._started_second_pass:
            self.log("starting second pass",
            self._started_second_pass = True
        # spread the remaining shares evenly over the remaining servers
        num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                       len(self.second_pass_trackers))
        tracker = self.second_pass_trackers.pop(0)
        shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
        self.homeless_shares -= shares_to_ask
        self.query_count += 1
            self._status.set_status("Contacting Servers [%s] (second query),"
                                    % (tracker.get_name(),
                                       len(self.homeless_shares)))
        d = tracker.query(shares_to_ask)
        d.addBoth(self._got_response, tracker, shares_to_ask,
                  self.next_pass_trackers)
    elif self.next_pass_trackers:
        # we've finished the second-or-later pass. Move all the remaining
        # servers back into self.second_pass_trackers for the next pass.
        self.second_pass_trackers.extend(self.next_pass_trackers)
        self.next_pass_trackers[:] = []
        # no more servers. If we haven't placed enough shares, we fail.
        merged = merge_servers(self.preexisting_shares, self.use_trackers)
        effective_happiness = servers_of_happiness(merged)
        if effective_happiness < self.servers_of_happiness:
            msg = failure_message(len(self.serverids_with_shares),
                                  self.servers_of_happiness,
            msg = ("server selection failed for %s: %s (%s)" %
                   (self, msg, self._get_progress_message()))
            if self.last_failure_msg:
                msg += " (%s)" % (self.last_failure_msg,)
            self.log(msg, level=log.UNUSUAL)
            return self._failed(msg)
            # we placed enough to be happy, so we're done
                self._status.set_status("Placed all shares")
            msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                   self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
            self.log(msg, level=log.OPERATIONAL)
            return (self.use_trackers, self.preexisting_shares)
def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
    # NOTE(review): several lines are missing from this excerpt (the
    # 'else:' separating the failure and success arms, the
    # 'for s in alreadygot:' loop header, 'progress' bookkeeping, and
    # some log level arguments) — confirm the structure upstream.
    if isinstance(res, failure.Failure):
        # This is unusual, and probably indicates a bug or a network
        self.log("%s got error during server selection: %s" % (tracker, res),
        self.error_count += 1
        self.bad_query_count += 1
        # the shares we asked about go back into the homeless pool
        self.homeless_shares |= shares_to_ask
        if (self.first_pass_trackers
            or self.second_pass_trackers
            or self.next_pass_trackers):
            # there is still hope, so just loop
            # No more servers, so this upload might fail (it depends upon
            # whether we've hit servers_of_happiness or not). Log the last
            # failure we got: if a coding error causes all servers to fail
            # in the same way, this allows the common failure to be seen
            # by the uploader and should help with debugging
            msg = ("last failure (from %s) was: %s" % (tracker, res))
            self.last_failure_msg = msg
        (alreadygot, allocated) = res
        self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                 % (tracker.get_name(),
                    tuple(sorted(alreadygot)), tuple(sorted(allocated))),
            self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
            if s in self.homeless_shares:
                self.homeless_shares.remove(s)
            elif s in shares_to_ask:

        # the ServerTracker will remember which shares were allocated on
        # that peer. We just have to remember to use them.
            self.use_trackers.add(tracker)

        if allocated or alreadygot:
            self.serverids_with_shares.add(tracker.get_serverid())

        not_yet_present = set(shares_to_ask) - set(alreadygot)
        still_homeless = not_yet_present - set(allocated)

            # They accepted at least one of the shares that we asked
            # them to accept, or they had a share that we didn't ask
            # them to accept but that we hadn't placed yet, so this
            # was a productive query
            self.good_query_count += 1
            self.bad_query_count += 1

            # In networks with lots of space, this is very unusual and
            # probably indicates an error. In networks with servers that
            # are full, it is merely unusual. In networks that are very
            # full, it is common, and many uploads will fail. In most
            # cases, this is obviously not fatal, and we'll just use some
            # some shares are still homeless, keep trying to find them a
            # home. The ones that were rejected get first priority.
            self.homeless_shares |= still_homeless
            # Since they were unable to accept all of our requests, so it
            # is safe to assume that asking them again won't help.
            # if they *were* able to accept everything, they might be
            # willing to accept even more.
            put_tracker_here.append(tracker)
def _failed(self, msg):
    """
    I am called when server selection fails. I first abort all of the
    remote buckets that I allocated during my unsuccessful attempt to
    place shares for this file. I then raise an
    UploadUnhappinessError with my msg argument.
    """
    for tracker in self.use_trackers:
        assert isinstance(tracker, ServerTracker)
        # NOTE(review): the tracker.abort() call described by the
        # docstring is in a line missing from this excerpt.
    raise UploadUnhappinessError(msg)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): the CHUNKSIZE class attribute referenced by
    # _read_encrypted is defined in a line missing from this excerpt.
def __init__(self, original, log_parent=None):
    self.original = IUploadable(original)
    self._log_number = log_parent
    self._encryptor = None            # created lazily by _get_encryptor()
    self._plaintext_hasher = plaintext_hasher()
    self._plaintext_segment_hasher = None
    self._plaintext_segment_hashes = []
    self._encoding_parameters = None
    self._file_size = None
    self._ciphertext_bytes_read = 0
    # NOTE(review): other methods read self._status; its initializer is
    # in a line missing from this excerpt — confirm upstream.
def set_upload_status(self, upload_status):
    """Adapt and record the upload status, and propagate it to the wrapped uploadable."""
    status = IUploadStatus(upload_status)
    self.original.set_upload_status(upload_status)
    self._status = status
def log(self, *args, **kwargs):
    """Forward to log.msg, supplying default facility and parent."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
    # NOTE(review): the 'def get_size(self):' header, the '_got_size'
    # inner-def header, an 'if self._status:' guard, and the trailing
    # 'return d' are in lines missing from this excerpt.
    if self._file_size is not None:
        # cached from a previous call
        return defer.succeed(self._file_size)
    d = self.original.get_size()
        self._file_size = size
            self._status.set_size(size)
    d.addCallback(_got_size)
def get_all_encoding_parameters(self):
    # NOTE(review): the tail of this method (log level argument,
    # d.addCallback(_got) and 'return d') is in lines missing from this
    # excerpt.
    if self._encoding_parameters is not None:
        # cached from a previous call
        return defer.succeed(self._encoding_parameters)
    d = self.original.get_all_encoding_parameters()
    def _got(encoding_parameters):
        (k, happy, n, segsize) = encoding_parameters
        self._segment_size = segsize # used by segment hashers
        self._encoding_parameters = encoding_parameters
        self.log("my encoding parameters: %s" % (encoding_parameters,),
        return encoding_parameters
def _get_encryptor(self):
    # NOTE(review): several lines are missing from this excerpt (the
    # 'if self._encryptor:' guard, the inner callback header that binds
    # 'key' and constructs the AES encryptor, an 'if self._status:'
    # guard, and the method's tail) — confirm upstream.
        return defer.succeed(self._encryptor)
    d = self.original.get_encryption_key()
        storage_index = storage_index_hash(key)
        assert isinstance(storage_index, str)
        # There's no point to having the SI be longer than the key, so we
        # specify that it is truncated to the same 128 bits as the AES key.
        assert len(storage_index) == 16 # SHA-256 truncated to 128b
        self._storage_index = storage_index
            self._status.set_storage_index(storage_index)
def get_storage_index(self):
    """Return a Deferred that fires with this file's storage index.

    The storage index is derived from the encryption key, so we must let
    _get_encryptor() run first; its callback chain then yields the
    cached self._storage_index.
    """
    d = self._get_encryptor()
    d.addCallback(lambda res: self._storage_index)
    # bugfix: the Deferred was constructed but never returned, so
    # callers received None instead of the storage index.
    return d
def _get_segment_hasher(self):
    # NOTE(review): the if/else distinguishing "reuse the in-progress
    # hasher" from "start a fresh one" is in lines missing from this
    # excerpt — confirm the branch structure upstream.
    p = self._plaintext_segment_hasher
        left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
    return p, self._segment_size
def _update_segment_hash(self, chunk):
    # NOTE(review): the 'offset = 0' initializer and the log level
    # arguments are in lines missing from this excerpt.
    # Feed 'chunk' into the per-segment plaintext hashers, closing out a
    # hasher each time a full segment's worth of bytes has been seen.
    while offset < len(chunk):
        p, segment_left = self._get_segment_hasher()
        chunk_left = len(chunk) - offset
        this_segment = min(chunk_left, segment_left)
        p.update(chunk[offset:offset+this_segment])
        self._plaintext_segment_hashed_bytes += this_segment
        if self._plaintext_segment_hashed_bytes == self._segment_size:
            # we've filled this segment
            self._plaintext_segment_hashes.append(p.digest())
            self._plaintext_segment_hasher = None
            self.log("closed hash [%d]: %dB" %
                     (len(self._plaintext_segment_hashes)-1,
                      self._plaintext_segment_hashed_bytes),
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        offset += this_segment
def read_encrypted(self, length, hash_only):
    # NOTE(review): the 'ciphertext = []' accumulator and the trailing
    # 'return d' are in lines missing from this excerpt.
    # make sure our parameters have been set up first
    d = self.get_all_encoding_parameters()
    d.addCallback(lambda ignored: self.get_size())
    d.addCallback(lambda ignored: self._get_encryptor())
    # then fetch and encrypt the plaintext. The unusual structure here
    # (passing a Deferred *into* a function) is needed to avoid
    # overflowing the stack: Deferreds don't optimize out tail recursion.
    # We also pass in a list, to which _read_encrypted will append
    d2 = defer.Deferred()
    d.addCallback(lambda ignored:
                  self._read_encrypted(length, ciphertext, hash_only, d2))
    d.addCallback(lambda ignored: d2)
def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
    # NOTE(review): the 'if not remaining:' guard around the callback,
    # the '_err' errback header, and the addCallbacks/return tail are in
    # lines missing from this excerpt.
        fire_when_done.callback(ciphertext)
    # tolerate large length= values without consuming a lot of RAM by
    # reading just a chunk (say 50kB) at a time. This only really matters
    # when hash_only==True (i.e. resuming an interrupted upload), since
    # that's the case where we will be skipping over a lot of data.
    size = min(remaining, self.CHUNKSIZE)
    remaining = remaining - size
    # read a chunk of plaintext..
    d = defer.maybeDeferred(self.original.read, size)
    # N.B.: if read() is synchronous, then since everything else is
    # actually synchronous too, we'd blow the stack unless we stall for a
    # tick. Once you accept a Deferred from IUploadable.read(), you must
    # be prepared to have it fire immediately too.
    d.addCallback(fireEventually)
    def _good(plaintext):
        # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
        ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
        ciphertext.extend(ct)
        # recurse until 'remaining' is exhausted
        self._read_encrypted(remaining, ciphertext, hash_only,
        fire_when_done.errback(why)
def _hash_and_encrypt_plaintext(self, data, hash_only):
    # NOTE(review): this excerpt is missing the 'cryptdata'/'bytes_processed'
    # initializers, the 'while data:'/'chunk = data.pop(0)' loop header,
    # the 'if hash_only:'/'else:' branch lines, an 'if self._status:'
    # guard, and the 'return cryptdata' tail — confirm upstream.
    assert isinstance(data, (tuple, list)), type(data)
    # we use data.pop(0) instead of 'for chunk in data' to save
    # memory: each chunk is destroyed as soon as we're done with it.
        self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
        bytes_processed += len(chunk)
        self._plaintext_hasher.update(chunk)
        self._update_segment_hash(chunk)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        ciphertext = self._encryptor.process(chunk)
            self.log(" skipping encryption", level=log.NOISY)
            cryptdata.append(ciphertext)
    self._ciphertext_bytes_read += bytes_processed
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
    # this is currently unused, but will live again when we fix #453
    # NOTE(review): the log level arguments are in lines missing from
    # this excerpt.
    if len(self._plaintext_segment_hashes) < num_segments:
        # close out the last one
        assert len(self._plaintext_segment_hashes) == num_segments-1
        p, segment_left = self._get_segment_hasher()
        self._plaintext_segment_hashes.append(p.digest())
        del self._plaintext_segment_hasher
        self.log("closing plaintext leaf hasher, hashed %d bytes" %
                 self._plaintext_segment_hashed_bytes,
        self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                 segnum=len(self._plaintext_segment_hashes)-1,
                 hash=base32.b2a(p.digest()),
    assert len(self._plaintext_segment_hashes) == num_segments
    return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
    """Return a Deferred firing with the digest of all plaintext read so far."""
    h = self._plaintext_hasher.digest()
    return defer.succeed(h)

# NOTE(review): the 'def close(self):' header is in a line missing from
# this excerpt; the statement below delegates close to the wrapped
# uploadable.
    return self.original.close()
    # NOTE(review): the enclosing class header, the __init__ header, and
    # the one-line bodies of many of the trivial getters/setters below
    # are in lines missing from this excerpt.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)   # gives each status a unique id

        self.storage_index = None
        self.status = "Not started"
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
    # NOTE(review): the enclosing 'class CHKUploader' header is in a line
    # missing from this excerpt.
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

    # locate_all_shareholders() will create the following attribute:
    # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
def log(self, *args, **kwargs):
    """Forward to log.msg under the tahoe.upload facility, parented to this upload."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.upload")
    return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable):
    """Start uploading the file.

    Returns a Deferred that will fire with the UploadResults instance.
    """
    # NOTE(review): the tail of this method (returning the result from
    # _done and adding it to the Deferred chain) is in lines missing
    # from this excerpt.
    self._started = time.time()
    eu = IEncryptedUploadable(encrypted_uploadable)
    self.log("starting upload of %s" % eu)
    eu.set_upload_status(self._upload_status)
    d = self.start_encrypted(eu)
    def _done(uploadresults):
        self._upload_status.set_active(False)
# NOTE(review): the 'def abort(self):' header is in a line missing from
# this excerpt.
    """Call this if the upload must be abandoned before it completes.
    This will tell the shareholders to delete their partial shares. I
    return a Deferred that fires when these messages have been acked."""
    if not self._encoder:
        # how did you call abort() before calling start() ?
        return defer.succeed(None)
    return self._encoder.abort()
def start_encrypted(self, encrypted):
    """ Returns a Deferred that will fire with the UploadResults instance. """
    # NOTE(review): the tail of the Encoder() constructor call and the
    # trailing 'return d' are in lines missing from this excerpt.
    eu = IEncryptedUploadable(encrypted)
    started = time.time()
    self._encoder = e = encode.Encoder(self._log_number,
    d = e.set_encrypted_uploadable(eu)
    # pipeline: find servers, hand them to the encoder, encode/push
    d.addCallback(self.locate_all_shareholders, started)
    d.addCallback(self.set_shareholders, e)
    d.addCallback(lambda res: e.start())
    d.addCallback(self._encrypted_done)
def locate_all_shareholders(self, encoder, started):
    # NOTE(review): the tails of the server_selector_class() and
    # get_shareholders() calls, and the '_done' callback wiring around
    # the elapsed-time measurement, are in lines missing from this
    # excerpt.
    server_selection_started = now = time.time()
    self._storage_index_elapsed = now - started
    storage_broker = self._storage_broker
    secret_holder = self._secret_holder
    storage_index = encoder.get_param("storage_index")
    self._storage_index = storage_index
    upload_id = si_b2a(storage_index)[:5]
    self.log("using storage index %s" % upload_id)
    server_selector = self.server_selector_class(upload_id,
    share_size = encoder.get_param("share_size")
    block_size = encoder.get_param("block_size")
    num_segments = encoder.get_param("num_segments")
    k,desired,n = encoder.get_param("share_counts")

    self._server_selection_started = time.time()
    d = server_selector.get_shareholders(storage_broker, secret_holder,
                                         share_size, block_size,
                                         num_segments, n, k, desired)
        self._server_selection_elapsed = time.time() - server_selection_started
def set_shareholders(self, trackers_and_serverids, encoder):
    """
    Tell the encoder which bucket-writers will hold which shares.

    @param trackers_and_serverids: a tuple of (upload_trackers,
                        already_serverids). upload_trackers is a sequence
                        of ServerTracker objects that have agreed to hold
                        some shares for us (the shareids are stashed
                        inside the ServerTracker); already_serverids is a
                        dict mapping sharenum to a set of serverids for
                        servers that claim to already have this share.
    @param encoder: the Encoder that will push the shares
    """
    # NOTE(review): reconstructed from a mangled numbered listing; the
    # `buckets = {}` initializer and the first two assert-message
    # arguments were missing and restored by inference -- confirm against
    # pristine history. The py2-only tuple parameter was replaced by an
    # explicit unpack (callers invoke this positionally via addCallback,
    # so this is a drop-in replacement). Also fixed the "@paran" typo.
    (upload_trackers, already_serverids) = trackers_and_serverids
    msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
    values = ([', '.join([str_shareloc(k,v)
                          for k,v in st.buckets.iteritems()])
               for st in upload_trackers], already_serverids)
    self.log(msgtempl % values, level=log.OPERATIONAL)
    # record already-present shares in self._results
    self._count_preexisting_shares = len(already_serverids)

    self._server_trackers = {} # k: shnum, v: instance of ServerTracker
    for tracker in upload_trackers:
        assert isinstance(tracker, ServerTracker)
    buckets = {}
    servermap = already_serverids.copy()
    for tracker in upload_trackers:
        buckets.update(tracker.buckets)
        for shnum in tracker.buckets:
            self._server_trackers[shnum] = tracker
            servermap.setdefault(shnum, set()).add(tracker.get_serverid())
    # sanity check: no two trackers may claim the same share number
    assert len(buckets) == sum([len(tracker.buckets)
                                for tracker in upload_trackers]), \
        "%s (%s) != %s (%s)" % (len(buckets), buckets,
                                sum([len(tracker.buckets) for tracker in upload_trackers]),
                                [(t.buckets, t.get_serverid()) for t in upload_trackers])
    encoder.set_shareholders(buckets, servermap)
# _encrypted_done: after the encoder has pushed all shares, fill in the
# UploadResults object (share placement maps, counts, sizes, timings, and
# the verify-cap string) and record it on the upload status.
# NOTE(review): mangled listing fragment -- line 990 (which binds `r`,
# evidently an UploadResults-like object; construction not visible here),
# line 998 (presumably `now = time.time()`, since `now` is read on line
# 1000), and lines 1007+ (presumably `return r`) are missing. Do not
# guess these in; recover the pristine source.
988 def _encrypted_done(self, verifycap):
989 """ Returns a Deferred that will fire with the UploadResults instance. """
991 for shnum in self._encoder.get_shares_placed():
992 server_tracker = self._server_trackers[shnum]
993 serverid = server_tracker.get_serverid()
# record each placed share in both directions: share -> servers and
# server -> shares
994 r.sharemap.add(shnum, serverid)
995 r.servermap.add(serverid, shnum)
996 r.preexisting_shares = self._count_preexisting_shares
997 r.pushed_shares = len(self._encoder.get_shares_placed())
999 r.file_size = self._encoder.file_size
1000 r.timings["total"] = now - self._started
1001 r.timings["storage_index"] = self._storage_index_elapsed
1002 r.timings["peer_selection"] = self._server_selection_elapsed
1003 r.timings.update(self._encoder.get_times())
1004 r.uri_extension_data = self._encoder.get_uri_extension_data()
1005 r.verifycapstr = verifycap.to_string()
1006 self._upload_status.set_results(r)
def get_upload_status(self):
    """Return the UploadStatus object tracking this upload's progress."""
    return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    """Read exactly `size` bytes from `uploadable`, issuing as many read()
    calls as necessary. Returns a Deferred that fires with the accumulated
    list of data strings (prepend_data followed by the newly read pieces).

    NOTE(review): several lines were lost in the mangled numbered listing;
    the size==0 guard, the callback header, and the trailing
    addCallback/return were reconstructed from the surviving structure --
    confirm against pristine history.
    """
    # the mutable default is harmless here: prepend_data is never mutated,
    # only concatenated into fresh lists
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        # renamed from `bytes` to avoid shadowing the builtin
        num_bytes = sum([len(piece) for piece in data])
        assert num_bytes > 0
        assert num_bytes <= size
        remaining = size - num_bytes
        if remaining:
            # short read: recurse until the whole request is satisfied
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
# LiteralUploader: "uploads" data small enough to be stored entirely inside
# a literal (LIT) URI -- no servers are contacted; the bytes are embedded in
# the cap itself via uri.LiteralFileURI.
# NOTE(review): mangled numbered listing -- missing lines include 1030-1031
# (presumably implements(IUploader) and the `def __init__(self):` header
# for lines 1032+), 1044 (a statement inside _got_size), 1052-1053
# (presumably `return d`), and 1060-1063 (one or more small methods plus
# the body of get_upload_status). Recover the pristine source before
# editing behavior.
1029 class LiteralUploader:
# lines 1032-1038 are clearly an __init__ body (header lost): set up the
# results object and an UploadStatus with no storage index and progress
# pre-marked complete for phase 0.
1032 self._results = UploadResults()
1033 self._status = s = UploadStatus()
1034 s.set_storage_index(None)
1036 s.set_progress(0, 1.0)
1038 s.set_results(self._results)
# start(): read the whole (small) uploadable and build a LIT URI from it.
1040 def start(self, uploadable):
1041 uploadable = IUploadable(uploadable)
1042 d = uploadable.get_size()
1043 def _got_size(size):
1045 self._status.set_size(size)
1046 self._results.file_size = size
1047 return read_this_many_bytes(uploadable, size)
1048 d.addCallback(_got_size)
1049 d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
1050 d.addCallback(lambda u: u.to_string())
1051 d.addCallback(self._build_results)
# _build_results: record the finished URI and mark all progress phases done.
1054 def _build_results(self, uri):
1055 self._results.uri = uri
1056 self._status.set_status("Finished")
1057 self._status.set_progress(1, 1.0)
1058 self._status.set_progress(2, 1.0)
1059 return self._results
1064 def get_upload_status(self):
# RemoteEncryptedUploadable: a Foolscap Referenceable wrapper that exposes a
# local IEncryptedUploadable to a remote Helper over RIEncryptedUploadable.
# Reads are strictly forward-only; skipped-over ranges are still read with
# hash_only=True so the plaintext hash tree gets built.
# NOTE(review): mangled numbered listing -- missing lines include 1072
# (presumably `self._offset = 0`, which later lines increment), 1079 (the
# `def get_size(self):` header for lines 1080+), 1084-1088 (body/return of
# _got_size and the method's return), 1096-1097/1099/1102/1104-1105 and
# 1126-1133 (the `def _read(strings):` headers, hash_only branches, and
# returns inside _read_encrypted / remote_read_encrypted), and 1111 (the
# log.msg continuation). Recover the pristine source before editing.
1067 class RemoteEncryptedUploadable(Referenceable):
1068 implements(RIEncryptedUploadable)
1070 def __init__(self, encrypted_uploadable, upload_status):
1071 self._eu = IEncryptedUploadable(encrypted_uploadable)
1073 self._bytes_sent = 0
1074 self._status = IUploadStatus(upload_status)
1075 # we are responsible for updating the status string while we run, and
1076 # for setting the ciphertext-fetch progress.
# lines 1080-1086: body of get_size (header lost): cache and return the size.
1080 if self._size is not None:
1081 return defer.succeed(self._size)
1082 d = self._eu.get_size()
1083 def _got_size(size):
1086 d.addCallback(_got_size)
1089 def remote_get_size(self):
1090 return self.get_size()
1091 def remote_get_all_encoding_parameters(self):
1092 return self._eu.get_all_encoding_parameters()
# _read_encrypted: read (or hash-only scan) `length` bytes, advancing
# self._offset either immediately (hash_only) or by the actual bytes read.
1094 def _read_encrypted(self, length, hash_only):
1095 d = self._eu.read_encrypted(length, hash_only)
1098 self._offset += length
1100 size = sum([len(data) for data in strings])
1101 self._offset += size
1103 d.addCallback(_read)
1106 def remote_read_encrypted(self, offset, length):
1107 # we don't support seek backwards, but we allow skipping forwards
1108 precondition(offset >= 0, offset)
1109 precondition(length >= 0, length)
1110 lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1112 precondition(offset >= self._offset, offset, self._offset)
1113 if offset > self._offset:
1114 # read the data from disk anyways, to build up the hash tree
1115 skip = offset - self._offset
1116 log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1117 (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1118 d = self._read_encrypted(skip, hash_only=True)
1120 d = defer.succeed(None)
1122 def _at_correct_offset(res):
1123 assert offset == self._offset, "%d != %d" % (offset, self._offset)
1124 return self._read_encrypted(length, hash_only=False)
1125 d.addCallback(_at_correct_offset)
# lines 1128-1129: inside a lost `def _read(strings):` -- account for bytes
# actually shipped to the helper.
1128 size = sum([len(data) for data in strings])
1129 self._bytes_sent += size
1131 d.addCallback(_read)
1134 def remote_close(self):
1135 return self._eu.close()
# AssistedUploader: drives an upload through a remote Helper. It contacts
# the helper with the storage index; the helper either reports the file is
# already present (returning results directly) or hands back an
# upload_helper to which we push ciphertext via RemoteEncryptedUploadable.
# Finally _build_verifycap converts the helper's HelperUploadResults into a
# local UploadResults with a verify-cap string.
# NOTE(review): mangled numbered listing -- missing lines include 1145-1147
# (tail of __init__), 1155/1157 (docstring delimiters of start), 1164
# (presumably `d = eu.get_size()`), 1170-1175 (the _done wrapper and
# `return d` of start), 1177 (presumably `self._size = size`), 1198
# (presumably `now = time.time()`), 1201/1208/1212 (the
# `if upload_helper:` branch structure, `d = reu.get_size()` and the
# `else:` line), 1249 (the `size=...` argument closing the
# CHKFileVerifierURI call), 1259 (presumably `now = time.time()`), and
# 1268-1269 (presumably `return ur`). Recover pristine source before
# changing behavior; py2-only tuple parameter on line 1197.
1138 class AssistedUploader:
1140 def __init__(self, helper):
1141 self._helper = helper
1142 self._log_number = log.msg("AssistedUploader starting")
1143 self._storage_index = None
1144 self._upload_status = s = UploadStatus()
# log(): route messages under this uploader's parent log entry by default.
1148 def log(self, *args, **kwargs):
1149 if "parent" not in kwargs:
1150 kwargs["parent"] = self._log_number
1151 return log.msg(*args, **kwargs)
1153 def start(self, encrypted_uploadable, storage_index):
1154 """Start uploading the file.
1156 Returns a Deferred that will fire with the UploadResults instance.
1158 precondition(isinstance(storage_index, str), storage_index)
1159 self._started = time.time()
1160 eu = IEncryptedUploadable(encrypted_uploadable)
1161 eu.set_upload_status(self._upload_status)
1162 self._encuploadable = eu
1163 self._storage_index = storage_index
1165 d.addCallback(self._got_size)
1166 d.addCallback(lambda res: eu.get_all_encoding_parameters())
1167 d.addCallback(self._got_all_encoding_parameters)
1168 d.addCallback(self._contact_helper)
1169 d.addCallback(self._build_verifycap)
1171 self._upload_status.set_active(False)
1176 def _got_size(self, size):
1178 self._upload_status.set_size(size)
1180 def _got_all_encoding_parameters(self, params):
1181 k, happy, n, segment_size = params
1182 # stash these for URI generation later
1183 self._needed_shares = k
1184 self._total_shares = n
1185 self._segment_size = segment_size
# _contact_helper: ask the helper about our storage index, timing the
# round-trip.
1187 def _contact_helper(self, res):
1188 now = self._time_contacting_helper_start = time.time()
1189 self._storage_index_elapsed = now - self._started
1190 self.log(format="contacting helper for SI %(si)s..",
1191 si=si_b2a(self._storage_index), level=log.NOISY)
1192 self._upload_status.set_status("Contacting Helper")
1193 d = self._helper.callRemote("upload_chk", self._storage_index)
1194 d.addCallback(self._contacted_helper)
1197 def _contacted_helper(self, (helper_upload_results, upload_helper)):
1199 elapsed = now - self._time_contacting_helper_start
1200 self._elapsed_time_contacting_helper = elapsed
# lines 1202-1211 run when the helper handed back an upload_helper (i.e.
# the file still needs uploading); lines 1213-1215 are the already-present
# branch (its `else:` line was lost).
1202 self.log("helper says we need to upload", level=log.NOISY)
1203 self._upload_status.set_status("Uploading Ciphertext")
1204 # we need to upload the file
1205 reu = RemoteEncryptedUploadable(self._encuploadable,
1206 self._upload_status)
1207 # let it pre-compute the size for progress purposes
1209 d.addCallback(lambda ignored:
1210 upload_helper.callRemote("upload", reu))
1211 # this Deferred will fire with the upload results
1213 self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1214 self._upload_status.set_progress(1, 1.0)
1215 return helper_upload_results
1217 def _convert_old_upload_results(self, upload_results):
1218 # pre-1.3.0 helpers return upload results which contain a mapping
1219 # from shnum to a single human-readable string, containing things
1220 # like "Found on [x],[y],[z]" (for healthy files that were already in
1221 # the grid), "Found on [x]" (for files that needed upload but which
1222 # discovered pre-existing shares), and "Placed on [x]" (for newly
1223 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1224 # set of binary serverid strings.
1226 # the old results are too hard to deal with (they don't even contain
1227 # as much information as the new results, since the nodeids are
1228 # abbreviated), so if we detect old results, just clobber them.
1230 sharemap = upload_results.sharemap
1231 if str in [type(v) for v in sharemap.values()]:
1232 upload_results.sharemap = None
# _build_verifycap: cross-check the helper's encoding parameters against
# ours, then assemble the local UploadResults and verify-cap.
1234 def _build_verifycap(self, helper_upload_results):
1235 self.log("upload finished, building readcap", level=log.OPERATIONAL)
1236 self._convert_old_upload_results(helper_upload_results)
1237 self._upload_status.set_status("Building Readcap")
1238 hur = helper_upload_results
1239 assert hur.uri_extension_data["needed_shares"] == self._needed_shares
1240 assert hur.uri_extension_data["total_shares"] == self._total_shares
1241 assert hur.uri_extension_data["segment_size"] == self._segment_size
1242 assert hur.uri_extension_data["size"] == self._size
1243 ur = UploadResults()
1244 # hur.verifycap doesn't exist if already found
1245 v = uri.CHKFileVerifierURI(self._storage_index,
1246 uri_extension_hash=hur.uri_extension_hash,
1247 needed_shares=self._needed_shares,
1248 total_shares=self._total_shares,
1250 ur.verifycapstr = v.to_string()
1251 ur.timings = hur.timings
1252 ur.uri_extension_data = hur.uri_extension_data
1253 ur.uri_extension_hash = hur.uri_extension_hash
1254 ur.preexisting_shares = hur.preexisting_shares
1255 ur.pushed_shares = hur.pushed_shares
1256 ur.sharemap = hur.sharemap
1257 ur.servermap = hur.servermap # not if already found
1258 ur.ciphertext_fetched = hur.ciphertext_fetched # not if already found
1260 ur.file_size = self._size
1261 ur.timings["storage_index"] = self._storage_index_elapsed
1262 ur.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1263 if "total" in ur.timings:
# the helper's own end-to-end time becomes "helper_total"; "total" is
# recomputed from this client's perspective.
1264 ur.timings["helper_total"] = ur.timings["total"]
1265 ur.timings["total"] = now - self._started
1266 self._upload_status.set_status("Finished")
1267 self._upload_status.set_results(ur)
1270 def get_upload_status(self):
1271 return self._upload_status
class BaseUploadable:
    """Base class for uploadable things: holds the erasure-coding
    parameters (k, happy, n, max segment size), their defaults, and the
    logic that turns them into concrete per-file encoding parameters.

    NOTE(review): reconstructed from a mangled numbered listing. The
    `_status = None` class default and the `d = self.get_size()` /
    `return d` lines in get_all_encoding_parameters() were missing and
    were restored by inference -- confirm against pristine history.
    """
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides; None means "use the default above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cached (k, happy, n, segsize) tuple once computed
    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        # adapt and stash the status object used for progress reporting
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install defaults from a {str: int} dict (recognized keys: "k",
        "happy", "n", "max_segment_size"); explicit per-instance values
        still take precedence in get_all_encoding_parameters()."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size),
        caching the answer for subsequent calls."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
# FileHandle: an IUploadable backed by an open file-like object. The
# encryption key is either random (convergence is None) or derived by
# hashing the whole plaintext together with the convergence secret.
# NOTE(review): mangled numbered listing -- missing lines include 1330/1335
# (docstring delimiters of __init__), 1338/1340 (presumably
# `self._key = None` / `self._size = None`), 1345-1346 (presumably
# `d = self.get_size()`, per the comment on line 1347), 1349 (the callback
# header, e.g. `def _got(params):`), 1353-1356 and 1358-1359 (the
# `f.seek(0)`, BLOCKSIZE/bytes_read setup, `while True:` and
# `if not data: break` loop control implied by lines 1357-1365),
# 1366/1370 (presumably `if self._status:` guards), 1373-1376
# (`return self._key`, addCallback, `return d`), 1388 (the
# `def get_size(self):` header), 1393 (presumably `self._size = size`),
# and 1399-1402 (the close() method around the comment on line 1401).
# Recover the pristine source before changing behavior.
1326 class FileHandle(BaseUploadable):
1327 implements(IUploadable)
1329 def __init__(self, filehandle, convergence):
1331 Upload the data from the filehandle. If convergence is None then a
1332 random encryption key will be used, else the plaintext will be hashed,
1333 then the hash will be hashed together with the string in the
1334 "convergence" argument to form the encryption key.
1336 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1337 self._filehandle = filehandle
1339 self.convergence = convergence
# derive the convergent key by hashing the entire file through
# convergence_hasher; cached in self._key after the first pass.
1342 def _get_encryption_key_convergent(self):
1343 if self._key is not None:
1344 return defer.succeed(self._key)
1347 # that sets self._size as a side-effect
1348 d.addCallback(lambda size: self.get_all_encoding_parameters())
1350 k, happy, n, segsize = params
1351 f = self._filehandle
1352 enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1357 data = f.read(BLOCKSIZE)
1360 enckey_hasher.update(data)
1361 # TODO: setting progress in a non-yielding loop is kind of
1362 # pointless, but I'm anticipating (perhaps prematurely) the
1363 # day when we use a slowjob or twisted's CooperatorService to
1364 # make this yield time to other jobs.
1365 bytes_read += len(data)
1367 self._status.set_progress(0, float(bytes_read)/self._size)
1369 self._key = enckey_hasher.digest()
1371 self._status.set_progress(0, 1.0)
1372 assert len(self._key) == 16
1377 def _get_encryption_key_random(self):
1378 if self._key is None:
1379 self._key = os.urandom(16)
1380 return defer.succeed(self._key)
1382 def get_encryption_key(self):
1383 if self.convergence is not None:
1384 return self._get_encryption_key_convergent()
1386 return self._get_encryption_key_random()
# lines 1389-1395: body of get_size (header lost): seek to the end to
# measure the file, cache it, and rewind.
1389 if self._size is not None:
1390 return defer.succeed(self._size)
1391 self._filehandle.seek(0, os.SEEK_END)
1392 size = self._filehandle.tell()
1394 self._filehandle.seek(0)
1395 return defer.succeed(size)
1397 def read(self, length):
1398 return defer.succeed([self._filehandle.read(length)])
1401 # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        # NOTE(review): the docstring delimiters and the `def close(self):`
        # header were lost in the mangled listing and have been restored.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        # we opened the file ourselves, so unlike FileHandle we must also
        # close it
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        # NOTE(review): only the docstring delimiters were lost in the
        # mangled listing; restored here. The in-memory string is wrapped
        # in a StringIO so FileHandle can treat it as a file.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
# Uploader: the service-child that owns file uploading. It may be backed by
# a remote Helper (helper_furl) and counts upload statistics via
# stats_provider.
# NOTE(review): mangled numbered listing -- missing lines include 1431-1432
# (tail/closing of the class docstring), 1434/1436 (presumably
# implements(...) spacing and blank lines), and 1441 (presumably
# `self._helper = None`, which get_helper_info() reads). Recover the
# pristine source before changing behavior.
1429 class Uploader(service.MultiService, log.PrefixingLogMixin):
1430 """I am a service that allows file uploading. I am a service-child of the
1431 implements(IUploader)
# files at or below this size are stored as literal (LIT) URIs instead of
# being erasure-coded and pushed to servers
1435 URI_LIT_SIZE_THRESHOLD = 55
1437 def __init__(self, helper_furl=None, stats_provider=None, history=None):
1438 self._helper_furl = helper_furl
1439 self.stats_provider = stats_provider
1440 self._history = history
1442 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1443 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1444 service.MultiService.__init__(self)
# startService: begin connecting to the configured helper (if any) as soon
# as the service starts.
# NOTE(review): lines 1450-1451 (the continuation arguments of
# tub.connectTo, presumably the _got_helper callback) are missing from
# this mangled listing.
1446 def startService(self):
1447 service.MultiService.startService(self)
1448 if self._helper_furl:
1449 self.parent.tub.connectTo(self._helper_furl,
# _got_helper: once the Foolscap connection to the helper is up, probe its
# version, substituting a default version dict for pre-get_version()
# helpers, then continue in _got_versioned_helper.
# NOTE(review): lines 1455 and 1457 (interior/closing of the `default`
# dict literal) are missing from this mangled listing.
1452 def _got_helper(self, helper):
1453 self.log("got helper connection, getting versions")
1454 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1456 "application-version": "unknown: no get_version()",
1458 d = add_version_to_remote_reference(helper, default)
1459 d.addCallback(self._got_versioned_helper)
1461 def _got_versioned_helper(self, helper):
1462 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1463 if needed not in helper.version:
1464 raise InsufficientVersionError(needed, helper.version)
1465 self._helper = helper
1466 helper.notifyOnDisconnect(self._lost_helper)
# _lost_helper: disconnect notification from Foolscap.
# NOTE(review): its body (lines 1469-1470, presumably `self._helper = None`)
# is missing from this mangled listing.
1468 def _lost_helper(self):
1471 def get_helper_info(self):
1472 # return a tuple of (helper_furl_or_None, connected_bool)
1473 return (self._helper_furl, bool(self._helper))
1476 def upload(self, uploadable):
1478 Returns a Deferred that will fire with the UploadResults instance.
1483 uploadable = IUploadable(uploadable)
1484 d = uploadable.get_size()
1485 def _got_size(size):
1486 default_params = self.parent.get_encoding_parameters()
1487 precondition(isinstance(default_params, dict), default_params)
1488 precondition("max_segment_size" in default_params, default_params)
1489 uploadable.set_default_encoding_parameters(default_params)
1491 if self.stats_provider:
1492 self.stats_provider.count('uploader.files_uploaded', 1)
1493 self.stats_provider.count('uploader.bytes_uploaded', size)
1495 if size <= self.URI_LIT_SIZE_THRESHOLD:
1496 uploader = LiteralUploader()
1497 return uploader.start(uploadable)
1499 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1500 d2 = defer.succeed(None)
1502 uploader = AssistedUploader(self._helper)
1503 d2.addCallback(lambda x: eu.get_storage_index())
1504 d2.addCallback(lambda si: uploader.start(eu, si))
1506 storage_broker = self.parent.get_storage_broker()
1507 secret_holder = self.parent._secret_holder
1508 uploader = CHKUploader(storage_broker, secret_holder)
1509 d2.addCallback(lambda x: uploader.start(eu))
1511 self._all_uploads[uploader] = None
1513 self._history.add_upload(uploader.get_upload_status())
1514 def turn_verifycap_into_read_cap(uploadresults):
1515 # Generate the uri from the verifycap plus the key.
1516 d3 = uploadable.get_encryption_key()
1517 def put_readcap_into_results(key):
1518 v = uri.from_string(uploadresults.verifycapstr)
1519 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1520 uploadresults.uri = r.to_string()
1521 return uploadresults
1522 d3.addCallback(put_readcap_into_results)
1524 d2.addCallback(turn_verifycap_into_read_cap)
1526 d.addCallback(_got_size)