1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_servers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24 DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
28 from cStringIO import StringIO
31 # this wants to live in storage, not here
class TooFullError(Exception):
    """A storage server did not have room for the requested allocation."""
# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    # NOTE(review): the "def __init__(self):" header for the following
    # attribute initializers appears to be missing from this chunk of the
    # file -- confirm against the full source before editing.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
    # NOTE(review): the "class UploadResults:" header for this section
    # appears to be missing from this chunk; the following lines are its
    # body.
    implements(IUploadResults)

    def __init__(self, file_size,
                 ciphertext_fetched, # how much the helper fetched
                 preexisting_shares, # count of shares already present
                 pushed_shares, # count of shares we pushed
                 sharemap, # {shnum: set(serverid)}
                 servermap, # {serverid: set(shnum)}
                 timings, # dict of name to number of seconds
                 # NOTE(review): the remaining parameters (apparently
                 # uri_extension_data, uri_extension_hash, verifycapstr,
                 # judging by the body below) and the closing "):" appear
                 # to be missing from this chunk.
        self.file_size = file_size
        self.ciphertext_fetched = ciphertext_fetched
        self.preexisting_shares = preexisting_shares
        self.pushed_shares = pushed_shares
        self.sharemap = sharemap
        self.servermap = servermap
        self.timings = timings
        self.uri_extension_data = uri_extension_data
        self.uri_extension_hash = uri_extension_hash
        self.verifycapstr = verifycapstr

    def set_uri(self, uri):
        # NOTE(review): the body of set_uri appears to be missing from this
        # chunk of the file.

# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.

# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# NOTE(review): the EXTENSION_SIZE assignment this comment describes
# appears to be missing from this chunk.
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: serverids} map as 'sh0: abc+def, sh1: ghi' for logs."""
    descriptions = []
    for shnum, serverids in s.iteritems():
        nodes = '+'.join([idlib.shortnodeid_b2a(serverid)
                          for serverid in serverids])
        descriptions.append("sh%s: %s" % (shnum, nodes))
    return ', '.join(descriptions)
    # NOTE(review): the "class ServerTracker:" header for this section
    # appears to be missing from this chunk; the following methods are its
    # body.
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 # NOTE(review): a parameter line (the storage index,
                 # judging by the body below) appears to be missing here.
                 bucket_renewal_secret, bucket_cancel_secret):
        self._server = server
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # build a throwaway write-bucket proxy just to learn how much
        # space one share of this size will consume on the server
        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                             blocksize, num_segments,
                                             # NOTE(review): the trailing
                                             # arguments and ")" of this
                                             # call appear to be missing
                                             # from this chunk.
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    # NOTE(review): "def __repr__(self):" appears to be missing before the
    # following return.
        return ("<ServerTracker for server %s and SI %s>"
                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

    def get_serverid(self):
        return self._server.get_serverid()

    # NOTE(review): "def get_name(self):" appears to be missing before the
    # following return.
        return self._server.get_name()

    def query(self, sharenums):
        # ask the server to allocate remote buckets for the given shares
        rref = self._server.get_rref()
        d = rref.callRemote("allocate_buckets",
                            # NOTE(review): the remote-call arguments other
                            # than the canary appear to be missing from
                            # this chunk.
                            canary=Referenceable())
        d.addCallback(self._got_reply)
        # NOTE(review): the trailing "return d" appears to be missing here.
145 def ask_about_existing_shares(self):
146 rref = self._server.get_rref()
147 return rref.callRemote("get_buckets", self.storage_index)
    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # wrap each newly-allocated remote bucket in a write-bucket proxy
        # NOTE(review): the accumulator initialization ("b = {}", judging
        # by the uses below) appears to be missing from this chunk.
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self._server, self.sharesize,
                                # NOTE(review): one or more constructor
                                # argument lines appear to be missing here.
                                self.num_share_hashes,
        # NOTE(review): the closing arguments of the proxy constructor and
        # the "b[sharenum] = ..." assignment appear to be missing here.
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

    # NOTE(review): "def abort(self):" and its docstring delimiters appear
    # to be missing around the following lines.
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        self.abort_some_buckets(self.buckets.keys())
170 def abort_some_buckets(self, sharenums):
172 I abort the remote bucket writers for the share numbers in sharenums.
174 for sharenum in sharenums:
175 if sharenum in self.buckets:
176 self.buckets[sharenum].abort()
177 del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
    """Format a (share number, bucket writer) pair as 'SHNUM: SERVERNAME'."""
    servername = bucketwriter.get_servername()
    return "%s: %s" % (shnum, servername)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
    """Chooses which storage servers will hold the shares of one upload."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query counters, folded into the progress/diagnostic message
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        # NOTE(review): the counter initializations that belong with the
        # comment above (full_count/error_count, judging by uses elsewhere
        # in this class) appear to be missing from this chunk.
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    # NOTE(review): "def __repr__(self):" appears to be missing before the
    # following return.
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """
        # NOTE(review): an "if self._status:" guard appears to be missing
        # before the following call.
        self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        # every share starts out homeless; shares leave this set as
        # servers accept them
        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE)
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        # NOTE(review): the "if not all_servers:" guard for the following
        # raise appears to be missing from this chunk.
        raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accomodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writeable_servers = [server for server in all_servers
                             if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
        # NOTE(review): the trailing argument and closing paren of each of
        # the two calls above appear to be missing from this chunk.

        def _make_trackers(servers):
            # NOTE(review): the accumulator setup and loop header of this
            # helper appear to be missing from this chunk.
            seed = s.get_lease_seed()
            renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
            cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
            st = ServerTracker(s,
                               share_size, block_size,
                               num_segments, num_share_hashes,
            # NOTE(review): the remaining ServerTracker arguments, the
            # append, and the helper's return appear to be missing here.

        # We assign each servers/trackers into one three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
        self.first_pass_trackers = _make_trackers(writeable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # NOTE(review): the tail of the comment above and the "ds = []"
        # accumulator used below appear to be missing from this chunk.
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            # NOTE(review): an "ds.append(d)"-style line appears to be
            # missing here, judging by the DeferredList below.
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        # NOTE(review): the trailing "return dl" appears to be missing
        # from this chunk.
    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the queries sent by
        Tahoe2ServerSelector._existing_shares.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                    % (tracker.get_name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        # NOTE(review): the "else:" branch header and its first lines
        # (where "buckets" gets bound from res) appear to be missing from
        # this chunk.
            self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                     % (tracker.get_name(), tuple(sorted(buckets))),
            # NOTE(review): the log level argument and closing paren appear
            # to be missing here.
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            # NOTE(review): one or more lines appear to be missing before
            # the following counter update.
            self.bad_query_count += 1
349 def _get_progress_message(self):
350 if not self.homeless_shares:
351 msg = "placed all %d shares, " % (self.total_shares)
353 msg = ("placed %d shares out of %d total (%d homeless), " %
354 (self.total_shares - len(self.homeless_shares),
356 len(self.homeless_shares)))
357 return (msg + "want to place shares on at least %d servers such that "
358 "any %d of them have enough shares to recover the file, "
359 "sent %d queries to %d servers, "
360 "%d queries placed some shares, %d placed none "
361 "(of which %d placed none due to the server being"
362 " full and %d placed none due to an error)" %
363 (self.servers_of_happiness, self.needed_shares,
364 self.query_count, self.num_servers_contacted,
365 self.good_query_count, self.bad_query_count,
366 self.full_count, self.error_count))
        # NOTE(review): the "def _loop(self):" header for the following
        # body appears to be missing from this chunk of the file.
        if not self.homeless_shares:
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                # happiness target met: report success and hand back the
                # trackers plus the shares that were already in place
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            # NOTE(review): an "else:" introducing the redistribution
            # branch appears to be missing here.
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                # NOTE(review): the "in ...])" tail of this comprehension
                # appears to be missing here.
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                        items.append((server, sharelist))
                    for writer in self.use_trackers:
                        writer.abort_some_buckets(self.homeless_shares)
                    # NOTE(review): a recursive-loop call and the "else:"
                    # that begins the failure branch appear to be missing
                    # here.
                    # Redistribution won't help us; fail.
                    server_count = len(self.serverids_with_shares)
                    failmsg = failure_message(server_count,
                    # NOTE(review): one or more failure_message() argument
                    # lines appear to be missing here.
                                              self.servers_of_happiness,
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                    # NOTE(review): some of the template arguments appear
                    # to be missing here.
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                    # NOTE(review): the closing paren of the template
                    # arguments appears to be missing here.
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            # NOTE(review): an "if self._status:" guard appears to be
            # missing before the following call.
                self._status.set_status("Contacting Servers [%s] (first query),"
                # NOTE(review): part of the status format string appears to
                # be missing here.
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                # NOTE(review): the log level argument and closing paren
                # appear to be missing here.
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            # NOTE(review): an "if self._status:" guard appears to be
            # missing before the following call.
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            # NOTE(review): a recursive-loop call and the final "else:"
            # appear to be missing here.
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                # NOTE(review): one or more failure_message() argument
                # lines appear to be missing here.
                                      self.servers_of_happiness,
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            # we placed enough to be happy, so we're done
            # NOTE(review): an "if self._status:" guard appears to be
            # missing before the following call.
                self._status.set_status("Placed all shares")
            msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
            self.log(msg, level=log.OPERATIONAL)
            return (self.use_trackers, self.preexisting_shares)
    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            self.log("%s got error during server selection: %s" % (tracker, res),
            # NOTE(review): the log level argument and closing paren appear
            # to be missing here.
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                # NOTE(review): the loop call and the "else:" branch header
                # appear to be missing here.
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        # NOTE(review): the "else:" for the success branch appears to be
        # missing here.
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                     % (tracker.get_name(),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
            # NOTE(review): the log level argument and the loop header over
            # the already-held shares appear to be missing here.
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    # NOTE(review): one or more lines appear to be missing
                    # here.
                elif s in shares_to_ask:
            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            # NOTE(review): one or more lines (apparently a guard on
            # "allocated") appear to be missing here.
                self.use_trackers.add(tracker)
            # NOTE(review): one or more lines appear to be missing here.
            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            # NOTE(review): the guard that selects the "productive query"
            # branch appears to be missing here.
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            # NOTE(review): the "else:" appears to be missing here.
                self.bad_query_count += 1
                # NOTE(review): one or more lines (including a guard on
                # still_homeless, judging by the comments below) appear to
                # be missing here.
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            # NOTE(review): an "else:" appears to be missing here.
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)

        # NOTE(review): the trailing loop call appears to be missing from
        # this chunk.
586 def _failed(self, msg):
588 I am called when server selection fails. I first abort all of the
589 remote buckets that I allocated during my unsuccessful attempt to
590 place shares for this file. I then raise an
591 UploadUnhappinessError with my msg argument.
593 for tracker in self.use_trackers:
594 assert isinstance(tracker, ServerTracker)
596 raise UploadUnhappinessError(msg)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): the CHUNKSIZE class attribute referenced by
    # _read_encrypted below appears to be missing from this chunk.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        # default facility/parent so callers can just self.log(...)
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    # NOTE(review): a "def get_size(self):"-style header appears to be
    # missing before the following body.
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        # NOTE(review): the callback-function header appears to be missing
        # before the following lines.
            self._file_size = size
            # NOTE(review): an "if self._status:" guard appears to be
            # missing here.
            self._status.set_size(size)
        d.addCallback(_got_size)
        # NOTE(review): the trailing "return d" appears to be missing here.

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            # NOTE(review): the log level argument and closing paren appear
            # to be missing here.
            return encoding_parameters
        # NOTE(review): the callback wiring and "return d" appear to be
        # missing here.

    def _get_encryptor(self):
        # NOTE(review): the fast-path guard appears to be missing before
        # the following return.
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        # NOTE(review): the key-callback header and the encryptor creation
        # appear to be missing before the following lines.
            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
            # NOTE(review): an "if self._status:" guard appears to be
            # missing here.
            self._status.set_storage_index(storage_index)
        # NOTE(review): the remainder of _get_encryptor appears to be
        # missing from this chunk.

    def get_storage_index(self):
        # the storage index is derived from the encryption key, so the
        # encryptor must be created first
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        # NOTE(review): the trailing "return d" appears to be missing here.

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        # NOTE(review): the guard for an existing hasher appears to be
        # missing before the following line.
            left = self._segment_size - self._plaintext_segment_hashed_bytes
        # NOTE(review): the existing-hasher return appears to be missing
        # here; the following lines create a fresh hasher.
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        # NOTE(review): the offset initialization appears to be missing
        # before the following loop.
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                # NOTE(review): the log level argument and closing paren
                # appear to be missing here.
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                # NOTE(review): the log level argument and closing paren
                # appear to be missing here.

            offset += this_segment

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # NOTE(review): one or more lines appear to be missing here.
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # NOTE(review): the "ciphertext" list initialization used below
        # appears to be missing from this chunk.
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        # NOTE(review): the trailing "return d" appears to be missing here.

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # NOTE(review): the base-case guard on "remaining" appears to be
        # missing before the following completion call.
            fire_when_done.callback(ciphertext)
        # NOTE(review): the base-case return appears to be missing here.
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # NOTE(review): one or more lines appear to be missing here.
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
            # NOTE(review): the tail of this call and the errback-function
            # header appear to be missing here.
                fire_when_done.errback(why)
        # NOTE(review): the callback wiring for the helpers above appears
        # to be missing from this chunk.

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        # NOTE(review): the accumulator initializations used below appear
        # to be missing here.
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        # NOTE(review): the loop header that binds "chunk" appears to be
        # missing here.
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
            # NOTE(review): the log level argument and closing paren appear
            # to be missing here.
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            # NOTE(review): a branch header on hash_only appears to be
            # missing before the following line.
                self.log(" skipping encryption", level=log.NOISY)
            # NOTE(review): an "else:" appears to be missing here.
                cryptdata.append(ciphertext)
            # NOTE(review): one or more lines appear to be missing here.
        self._ciphertext_bytes_read += bytes_processed
        # NOTE(review): an "if self._status:" guard appears to be missing
        # before the following progress update.
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        # NOTE(review): the trailing return of the accumulated ciphertext
        # appears to be missing here.

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            # NOTE(review): the log level argument and closing paren appear
            # to be missing here.
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
            # NOTE(review): the log level argument and closing paren appear
            # to be missing here.
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    # NOTE(review): a "def close(self):"-style header appears to be
    # missing before the following return.
        return self.original.close()
    # NOTE(review): the "class UploadStatus:" header for this section
    # appears to be missing from this chunk; the following lines are its
    # body.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    # NOTE(review): the "def __init__(self):" header and several attribute
    # initializations appear to be missing around the following lines.
        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    # NOTE(review): several of the one-line accessors below are truncated
    # in this chunk (their return statements or headers are missing).
    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
    # NOTE(review): the "class CHKUploader:" header for this section
    # appears to be missing from this chunk; the following lines are its
    # body.
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        # NOTE(review): one or more initializations (the abort() method
        # below reads self._encoder) appear to be missing here.
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
888 def log(self, *args, **kwargs):
889 if "parent" not in kwargs:
890 kwargs["parent"] = self._log_number
891 if "facility" not in kwargs:
892 kwargs["facility"] = "tahoe.upload"
893 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        # NOTE(review): parts of this method appear to be missing from
        # this chunk.
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
        # NOTE(review): the rest of _done and the callback wiring appear to
        # be missing here.

    # NOTE(review): a "def abort(self):" header appears to be missing
    # before the following docstring.
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)
        # NOTE(review): one or more lines appear to be missing here.
        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
        # NOTE(review): the remaining Encoder() arguments appear to be
        # missing here.
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # NOTE(review): the trailing "return d" appears to be missing here.
    def locate_all_shareholders(self, encoder, started):
        # run server selection to decide where each share should go
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,
        # NOTE(review): the remaining constructor arguments appear to be
        # missing here.
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
        # NOTE(review): one argument line appears to be missing here.
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        # NOTE(review): a callback-function header appears to be missing
        # before the following line.
            self._server_selection_elapsed = time.time() - server_selection_started
        # NOTE(review): the callback wiring and trailing return appear to
        # be missing from this chunk.
    # Record which ServerTracker holds which share number and hand the
    # resulting buckets/servermap to the encoder.
    # NOTE(review): truncated excerpt — the docstring delimiters, the
    # 'buckets = {}' initialization, and part of the assertion message are
    # not visible here. Python-2 tuple parameter syntax is used.
    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)
        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._count_preexisting_shares = len(already_serverids)
        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        # sanity check: no two trackers may claim the same share number
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
                "%s (%s) != %s (%s)" % (
                 sum([len(tracker.buckets) for tracker in upload_trackers]),
                 [(t.buckets, t.get_serverid()) for t in upload_trackers]
        encoder.set_shareholders(buckets, servermap)
    # Build the final UploadResults (share placement maps plus timing data)
    # once the encoder has finished pushing shares.
    # NOTE(review): truncated excerpt — the lines defining the locals 'e'
    # (presumably self._encoder), 'now', and 'timings', the 'sharemap='
    # keyword argument, and the trailing 'return' are not visible here.
    def _encrypted_done(self, verifycap):
        """Returns a Deferred that will fire with the UploadResults instance."""
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server_tracker = self._server_trackers[shnum]
            serverid = server_tracker.get_serverid()
            sharemap.add(shnum, serverid)
            servermap.add(serverid, shnum)
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           servermap=servermap,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
    def get_upload_status(self):
        """Return the UploadStatus object tracking this upload's progress."""
        return self._upload_status
# Read exactly 'size' bytes from 'uploadable' (whose read() fires with a list
# of strings), recursing until the full amount is gathered, and fire with the
# accumulated list of pieces.
# NOTE(review): 'prepend_data=[]' is a mutable default argument; it appears
# safe only because the visible code never mutates it (it builds new lists
# with '+'), but a 'prepend_data=None' guard would be more robust — confirm
# against the full file before changing.
# NOTE(review): truncated excerpt — the 'size == 0' guard header, the
# 'def _got(data):' header, the 'if remaining:' test, and the trailing
# 'd.addCallback(_got); return d' are not visible here.
def read_this_many_bytes(uploadable, size, prepend_data=[]):
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes <= size
        remaining = size - bytes
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
# Uploader for files small enough to be stored entirely inside a LIT URI
# (the data lives in the cap itself; no storage servers are contacted).
# NOTE(review): truncated excerpt — the 'def __init__(self):' header, the
# presumable 'self._size = size' stash in _got_size, several UploadResults
# keyword arguments, the 'return d' lines, and the get_upload_status body
# are not visible here.
class LiteralUploader:
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0)
    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._status.set_size(size)
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        # join the pieces into a single string and wrap it in a LIT cap
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
    def _build_results(self, uri):
        ur = UploadResults(file_size=self._size,
                           ciphertext_fetched=0,
                           preexisting_shares=0,
                           uri_extension_data=None,
                           uri_extension_hash=None,
        self._status.set_status("Finished")
        # both ciphertext-fetch and encode/push phases are trivially complete
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        self._status.set_results(ur)
    def get_upload_status(self):
# Foolscap-facing wrapper that lets a remote Helper pull ciphertext from a
# local IEncryptedUploadable over RIEncryptedUploadable.
# NOTE(review): truncated excerpt — initialization of self._size and
# self._offset, the bodies of _got_size and the two '_read' callbacks'
# headers, the 'else:' before 'd = defer.succeed(None)', and the 'return d'
# lines are not visible here.
class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)
    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
        d.addCallback(_got_size)
    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()
    # internal read helper: advances self._offset whether the data is
    # actually fetched or only hashed (hash_only=True)
    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
            self._offset += length
            size = sum([len(data) for data in strings])
            self._offset += size
        d.addCallback(_read)
    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
            d = defer.succeed(None)
        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
        d.addCallback(_read)
    def remote_close(self):
        return self._eu.close()
# Uploader that delegates ciphertext encoding/pushing to a remote Helper.
# NOTE(review): truncated excerpt — the remainder of __init__ (presumably
# marking the UploadStatus as helper-assisted/active) is not visible here.
class AssistedUploader:
    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
1179 def log(self, *args, **kwargs):
1180 if "parent" not in kwargs:
1181 kwargs["parent"] = self._log_number
1182 return log.msg(*args, **kwargs)
    # NOTE(review): truncated excerpt — the docstring terminator, the
    # 'd = eu.get_size()' (or similar) that starts the chain before the
    # first addCallback, the '_done' wrapper around the last line, and the
    # final 'return d' are not visible here.
    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.
        Returns a Deferred that will fire with the UploadResults instance.
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
            self._upload_status.set_active(False)
    # NOTE(review): _build_verifycap later reads self._size, but the line
    # stashing it (presumably 'self._size = size') is not visible in this
    # excerpt — confirm against the full file.
    def _got_size(self, size):
        self._upload_status.set_size(size)
1211 def _got_all_encoding_parameters(self, params):
1212 k, happy, n, segment_size = params
1213 # stash these for URI generation later
1214 self._needed_shares = k
1215 self._total_shares = n
1216 self._segment_size = segment_size
    # Ask the Helper whether it already has this storage index; fires with
    # (helper_upload_results, upload_helper) via _contacted_helper.
    # NOTE(review): truncated excerpt — the final 'return d' is not visible.
    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
    # Two outcomes: the helper hands back an upload_helper (we must push
    # ciphertext through a RemoteEncryptedUploadable), or it reports the file
    # as already uploaded (we pass the helper's results straight through).
    # NOTE(review): truncated excerpt — 'now = time.time()', the
    # 'if upload_helper:' / 'else:' branch headers, the size pre-computation
    # Deferred, and the 'return d' for the upload branch are not visible
    # here; the two code paths below appear flattened together.
    def _contacted_helper(self, (helper_upload_results, upload_helper)):
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        self.log("helper says we need to upload", level=log.NOISY)
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        return helper_upload_results
1248 def _convert_old_upload_results(self, upload_results):
1249 # pre-1.3.0 helpers return upload results which contain a mapping
1250 # from shnum to a single human-readable string, containing things
1251 # like "Found on [x],[y],[z]" (for healthy files that were already in
1252 # the grid), "Found on [x]" (for files that needed upload but which
1253 # discovered pre-existing shares), and "Placed on [x]" (for newly
1254 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1255 # set of binary serverid strings.
1257 # the old results are too hard to deal with (they don't even contain
1258 # as much information as the new results, since the nodeids are
1259 # abbreviated), so if we detect old results, just clobber them.
1261 sharemap = upload_results.sharemap
1262 if str in [type(v) for v in sharemap.values()]:
1263 upload_results.sharemap = None
    # Cross-check the helper's results against our own encoding parameters,
    # build the verify-cap, and fold the helper's timing data into a local
    # UploadResults.
    # NOTE(review): truncated excerpt — the CHKFileVerifierURI 'size='
    # argument, 'timings = {}', 'now = time.time()', the 'if key == "total":'
    # guard around the helper_total rename, and the trailing 'return ur' are
    # not visible here.
    def _build_verifycap(self, helper_upload_results):
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size
        # hur.verifycap doesn't exist if already found
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key,val in hur.timings.items():
                key = "helper_total"
        timings["total"] = now - self._started
        ur = UploadResults(file_size=self._size,
                           # not if already found
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           sharemap=hur.sharemap,
                           servermap=hur.servermap, # not if already found
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
    def get_upload_status(self):
        """Return the UploadStatus object tracking this upload's progress."""
        return self._upload_status
# Common base for IUploadable implementations: holds the encoding-parameter
# defaults/overrides and computes the final (k, happy, n, segsize) tuple.
# NOTE(review): truncated excerpt — the 'd = self.get_size()' line that
# starts the Deferred chain in get_all_encoding_parameters and its trailing
# 'return d' are not visible here.
class BaseUploadable:
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10
    # per-instance overrides; None means "use the default_* value above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None
    # memoized (k, happy, n, segsize) tuple, filled in on first computation
    _all_encoding_parameters = None
    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]
    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)
        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
# IUploadable backed by an open file-like object.
# NOTE(review): truncated excerpt — the docstring delimiters in __init__,
# the presumable 'self._key = None' initialization, most of the Deferred
# chain and loop headers inside _get_encryption_key_convergent, the
# 'def get_size(self):' header before the size-probing lines, and the
# 'def close(self):' header before the final comment are not visible here.
class FileHandle(BaseUploadable):
    implements(IUploadable)
    def __init__(self, filehandle, convergence):
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self.convergence = convergence
    # Derive the AES key deterministically from the plaintext hash plus the
    # convergence secret (memoized in self._key).
    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
                data = f.read(BLOCKSIZE)
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                self._status.set_progress(0, float(bytes_read)/self._size)
            self._key = enckey_hasher.digest()
            self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)
    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()
        if self._size is not None:
            return defer.succeed(self._size)
        # probe the file's length by seeking to its end, then rewind
        self._filehandle.seek(0, os.SEEK_END)
        size = self._filehandle.tell()
        self._filehandle.seek(0)
        return defer.succeed(size)
    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])
        # the originator of the filehandle reserves the right to close it
# IUploadable backed by a filename; opens the file itself and therefore
# closes it itself (unlike FileHandle, whose caller owns the handle).
# NOTE(review): truncated excerpt — the docstring delimiters in __init__
# and the 'def close(self):' header before the last two lines are not
# visible here.
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
        FileHandle.close(self)
        self._filehandle.close()
# IUploadable backed by an in-memory string, wrapped in a StringIO.
# NOTE(review): truncated excerpt — the docstring delimiters in __init__
# are not visible here.
class Data(FileHandle):
    def __init__(self, data, convergence):
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
# NOTE(review): truncated excerpt — the class docstring terminator and the
# presumable 'self._helper = None' initialization (get_helper_info and
# upload both read self._helper) are not visible here.
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    implements(IUploader)
    # files at or below this size are uploaded as LIT caps (data stored in
    # the cap itself) rather than CHK uploads to storage servers
    URI_LIT_SIZE_THRESHOLD = 55
    def __init__(self, helper_furl=None, stats_provider=None, history=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
    # NOTE(review): truncated excerpt — the connectTo() call's remaining
    # argument (presumably self._got_helper) is not visible here.
    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
    # Fill in version info for helpers that predate get_version(), then
    # validate the versioned reference.
    # NOTE(review): truncated excerpt — part of the 'default' dict literal
    # (its nested value and closing brace) is not visible here.
    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1498 def _got_versioned_helper(self, helper):
1499 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1500 if needed not in helper.version:
1501 raise InsufficientVersionError(needed, helper.version)
1502 self._helper = helper
1503 helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        # NOTE(review): body not visible in this excerpt; registered via
        # notifyOnDisconnect, so presumably clears self._helper — confirm.
1508 def get_helper_info(self):
1509 # return a tuple of (helper_furl_or_None, connected_bool)
1510 return (self._helper_furl, bool(self._helper))
    # Entry point for all immutable uploads: choose LiteralUploader for tiny
    # files, AssistedUploader when a helper is connected, CHKUploader
    # otherwise, then convert the resulting verifycap into a full read-cap.
    # NOTE(review): truncated excerpt — the docstring delimiters, the
    # 'if self._helper:' / 'else:' headers that select between
    # AssistedUploader and CHKUploader (both assignments appear flattened
    # below), the 'if self._history:' guard, and the 'return d3'/'return d2'/
    # 'return d' lines are not visible here; the method also continues past
    # the end of this excerpt.
    def upload(self, uploadable):
        Returns a Deferred that will fire with the UploadResults instance.
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)
            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            eu = EncryptAnUploadable(uploadable, self._parentmsgid)
            d2 = defer.succeed(None)
                uploader = AssistedUploader(self._helper)
                d2.addCallback(lambda x: eu.get_storage_index())
                d2.addCallback(lambda si: uploader.start(eu, si))
                storage_broker = self.parent.get_storage_broker()
                secret_holder = self.parent._secret_holder
                uploader = CHKUploader(storage_broker, secret_holder)
                d2.addCallback(lambda x: uploader.start(eu))
            self._all_uploads[uploader] = None
                self._history.add_upload(uploader.get_upload_status())
            def turn_verifycap_into_read_cap(uploadresults):
                # Generate the uri from the verifycap plus the key.
                d3 = uploadable.get_encryption_key()
                def put_readcap_into_results(key):
                    v = uri.from_string(uploadresults.verifycapstr)
                    r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                    uploadresults.set_uri(r.to_string())
                    return uploadresults
                d3.addCallback(put_readcap_into_results)
            d2.addCallback(turn_verifycap_into_read_cap)
        d.addCallback(_got_size)