1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_servers, \
19 from allmydata.util.assertutil import precondition, _assert
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24 DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
28 from cStringIO import StringIO
31 # this wants to live in storage, not here
32 class TooFullError(Exception):
35 # HelperUploadResults are what we get from the Helper, and to retain
36 # backwards compatibility with old Helpers we can't change the format. We
37 # convert them into a local UploadResults upon receipt.
38 class HelperUploadResults(Copyable, RemoteCopy):
39 # note: don't change this string, it needs to match the value used on the
40 # helper, and it does *not* need to match the fully-qualified
41 # package/module/class name
42 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
45 # also, think twice about changing the shape of any existing attribute,
46 # because instances of this class are sent from the helper to its client,
47 # so changing this may break compatibility. Consider adding new fields
48 # instead of modifying existing ones.
51 self.timings = {} # dict of name to number of seconds
52 self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
53 self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
55 self.ciphertext_fetched = None # how much the helper fetched
57 self.preexisting_shares = None # count of shares already present
58 self.pushed_shares = None # count of shares we pushed
61 implements(IUploadResults)
63 def __init__(self, file_size,
64 ciphertext_fetched, # how much the helper fetched
65 preexisting_shares, # count of shares already present
66 pushed_shares, # count of shares we pushed
67 sharemap, # {shnum: set(server)}
68 servermap, # {server: set(shnum)}
69 timings, # dict of name to number of seconds
73 self._file_size = file_size
74 self._ciphertext_fetched = ciphertext_fetched
75 self._preexisting_shares = preexisting_shares
76 self._pushed_shares = pushed_shares
77 self._sharemap = sharemap
78 self._servermap = servermap
79 self._timings = timings
80 self._uri_extension_data = uri_extension_data
81 self._uri_extension_hash = uri_extension_hash
82 self._verifycapstr = verifycapstr
84 def set_uri(self, uri):
87 def get_file_size(self):
88 return self._file_size
91 def get_ciphertext_fetched(self):
92 return self._ciphertext_fetched
93 def get_preexisting_shares(self):
94 return self._preexisting_shares
95 def get_pushed_shares(self):
96 return self._pushed_shares
97 def get_sharemap(self):
99 def get_servermap(self):
100 return self._servermap
101 def get_timings(self):
103 def get_uri_extension_data(self):
104 return self._uri_extension_data
105 def get_verifycapstr(self):
106 return self._verifycapstr
108 # our current uri_extension is 846 bytes for small files, a few bytes
109 # more for larger ones (since the filesize is encoded in decimal in a
110 # few places). Ask for a little bit more just in case we need it. If
111 # the extension changes size, we can change EXTENSION_SIZE to
112 # allocate a more accurate amount of space.
113 EXTENSION_SIZE = 1000
114 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: set(serverid)} mapping as a human-readable summary."""
    descriptions = []
    for shnum, serverids in s.iteritems():
        nodes = '+'.join([idlib.shortnodeid_b2a(serverid) for serverid in serverids])
        descriptions.append("sh%s: %s" % (shnum, nodes))
    return ', '.join(descriptions)
121 def __init__(self, server,
122 sharesize, blocksize, num_segments, num_share_hashes,
124 bucket_renewal_secret, bucket_cancel_secret):
125 self._server = server
126 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
127 self.sharesize = sharesize
129 wbp = layout.make_write_bucket_proxy(None, None, sharesize,
130 blocksize, num_segments,
133 self.wbp_class = wbp.__class__ # to create more of them
134 self.allocated_size = wbp.get_allocated_size()
135 self.blocksize = blocksize
136 self.num_segments = num_segments
137 self.num_share_hashes = num_share_hashes
138 self.storage_index = storage_index
140 self.renew_secret = bucket_renewal_secret
141 self.cancel_secret = bucket_cancel_secret
144 return ("<ServerTracker for server %s and SI %s>"
145 % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
147 def get_server(self):
149 def get_serverid(self):
150 return self._server.get_serverid()
152 return self._server.get_name()
154 def query(self, sharenums):
155 rref = self._server.get_rref()
156 d = rref.callRemote("allocate_buckets",
162 canary=Referenceable())
163 d.addCallback(self._got_reply)
166 def ask_about_existing_shares(self):
167 rref = self._server.get_rref()
168 return rref.callRemote("get_buckets", self.storage_index)
170 def _got_reply(self, (alreadygot, buckets)):
171 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
173 for sharenum, rref in buckets.iteritems():
174 bp = self.wbp_class(rref, self._server, self.sharesize,
177 self.num_share_hashes,
180 self.buckets.update(b)
181 return (alreadygot, set(b.keys()))
186 I abort the remote bucket writers for all shares. This is a good idea
187 to conserve space on the storage server.
189 self.abort_some_buckets(self.buckets.keys())
191 def abort_some_buckets(self, sharenums):
193 I abort the remote bucket writers for the share numbers in sharenums.
195 for sharenum in sharenums:
196 if sharenum in self.buckets:
197 self.buckets[sharenum].abort()
198 del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
    """Render one share placement as 'shnum: servername'."""
    servername = bucketwriter.get_servername()
    return "%s: %s" % (shnum, servername)
204 class Tahoe2ServerSelector(log.PrefixingLogMixin):
206 def __init__(self, upload_id, logparent=None, upload_status=None):
207 self.upload_id = upload_id
208 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
209 # Servers that are working normally, but full.
212 self.num_servers_contacted = 0
213 self.last_failure_msg = None
214 self._status = IUploadStatus(upload_status)
215 log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
216 self.log("starting", level=log.OPERATIONAL)
219 return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
221 def get_shareholders(self, storage_broker, secret_holder,
222 storage_index, share_size, block_size,
223 num_segments, total_shares, needed_shares,
224 servers_of_happiness):
226 @return: (upload_trackers, already_serverids), where upload_trackers
227 is a set of ServerTracker instances that have agreed to hold
228 some shares for us (the shareids are stashed inside the
229 ServerTracker), and already_serverids is a dict mapping
230 shnum to a set of serverids for servers which claim to
231 already have the share.
235 self._status.set_status("Contacting Servers..")
237 self.total_shares = total_shares
238 self.servers_of_happiness = servers_of_happiness
239 self.needed_shares = needed_shares
241 self.homeless_shares = set(range(total_shares))
242 self.use_trackers = set() # ServerTrackers that have shares assigned
244 self.preexisting_shares = {} # shareid => set(serverids) holding shareid
246 # These servers have shares -- any shares -- for our SI. We keep
247 # track of these to write an error message with them later.
248 self.serverids_with_shares = set()
250 # this needed_hashes computation should mirror
251 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
252 # (instead of a HashTree) because we don't require actual hashing
253 # just to count the levels.
254 ht = hashtree.IncompleteHashTree(total_shares)
255 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
257 # figure out how much space to ask for
258 wbp = layout.make_write_bucket_proxy(None, None,
259 share_size, 0, num_segments,
260 num_share_hashes, EXTENSION_SIZE)
261 allocated_size = wbp.get_allocated_size()
262 all_servers = storage_broker.get_servers_for_psi(storage_index)
264 raise NoServersError("client gave us zero servers")
        # filter the list of servers according to which ones can accommodate
267 # this request. This excludes older servers (which used a 4-byte size
268 # field) from getting large shares (for files larger than about
269 # 12GiB). See #439 for details.
270 def _get_maxsize(server):
271 v0 = server.get_rref().version
272 v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
273 return v1["maximum-immutable-share-size"]
274 writeable_servers = [server for server in all_servers
275 if _get_maxsize(server) >= allocated_size]
276 readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
278 # decide upon the renewal/cancel secrets, to include them in the
279 # allocate_buckets query.
280 client_renewal_secret = secret_holder.get_renewal_secret()
281 client_cancel_secret = secret_holder.get_cancel_secret()
283 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
285 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
287 def _make_trackers(servers):
290 seed = s.get_lease_seed()
291 renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
292 cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
293 st = ServerTracker(s,
294 share_size, block_size,
295 num_segments, num_share_hashes,
301 # We assign each servers/trackers into one three lists. They all
302 # start in the "first pass" list. During the first pass, as we ask
303 # each one to hold a share, we move their tracker to the "second
304 # pass" list, until the first-pass list is empty. Then during the
305 # second pass, as we ask each to hold more shares, we move their
306 # tracker to the "next pass" list, until the second-pass list is
307 # empty. Then we move everybody from the next-pass list back to the
308 # second-pass list and repeat the "second" pass (really the third,
309 # fourth, etc pass), until all shares are assigned, or we've run out
310 # of potential servers.
311 self.first_pass_trackers = _make_trackers(writeable_servers)
312 self.second_pass_trackers = [] # servers worth asking again
313 self.next_pass_trackers = [] # servers that we have asked again
314 self._started_second_pass = False
316 # We don't try to allocate shares to these servers, since they've
317 # said that they're incapable of storing shares of the size that we'd
318 # want to store. We ask them about existing shares for this storage
319 # index, which we want to know about for accurate
320 # servers_of_happiness accounting, then we forget about them.
321 readonly_trackers = _make_trackers(readonly_servers)
323 # We now ask servers that can't hold any new shares about existing
324 # shares that they might have for our SI. Once this is done, we
325 # start placing the shares that we haven't already accounted
328 if self._status and readonly_trackers:
329 self._status.set_status("Contacting readonly servers to find "
330 "any existing shares")
331 for tracker in readonly_trackers:
332 assert isinstance(tracker, ServerTracker)
333 d = tracker.ask_about_existing_shares()
334 d.addBoth(self._handle_existing_response, tracker)
336 self.num_servers_contacted += 1
337 self.query_count += 1
338 self.log("asking server %s for any existing shares" %
339 (tracker.get_name(),), level=log.NOISY)
340 dl = defer.DeferredList(ds)
341 dl.addCallback(lambda ign: self._loop())
345 def _handle_existing_response(self, res, tracker):
347 I handle responses to the queries sent by
348 Tahoe2ServerSelector._existing_shares.
350 serverid = tracker.get_serverid()
351 if isinstance(res, failure.Failure):
352 self.log("%s got error during existing shares check: %s"
353 % (tracker.get_name(), res), level=log.UNUSUAL)
354 self.error_count += 1
355 self.bad_query_count += 1
359 self.serverids_with_shares.add(serverid)
360 self.log("response to get_buckets() from server %s: alreadygot=%s"
361 % (tracker.get_name(), tuple(sorted(buckets))),
363 for bucket in buckets:
364 self.preexisting_shares.setdefault(bucket, set()).add(serverid)
365 self.homeless_shares.discard(bucket)
367 self.bad_query_count += 1
370 def _get_progress_message(self):
371 if not self.homeless_shares:
372 msg = "placed all %d shares, " % (self.total_shares)
374 msg = ("placed %d shares out of %d total (%d homeless), " %
375 (self.total_shares - len(self.homeless_shares),
377 len(self.homeless_shares)))
378 return (msg + "want to place shares on at least %d servers such that "
379 "any %d of them have enough shares to recover the file, "
380 "sent %d queries to %d servers, "
381 "%d queries placed some shares, %d placed none "
382 "(of which %d placed none due to the server being"
383 " full and %d placed none due to an error)" %
384 (self.servers_of_happiness, self.needed_shares,
385 self.query_count, self.num_servers_contacted,
386 self.good_query_count, self.bad_query_count,
387 self.full_count, self.error_count))
391 if not self.homeless_shares:
392 merged = merge_servers(self.preexisting_shares, self.use_trackers)
393 effective_happiness = servers_of_happiness(merged)
394 if self.servers_of_happiness <= effective_happiness:
395 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
396 "self.use_trackers: %s, self.preexisting_shares: %s") \
397 % (self, self._get_progress_message(),
398 pretty_print_shnum_to_servers(merged),
399 [', '.join([str_shareloc(k,v)
400 for k,v in st.buckets.iteritems()])
401 for st in self.use_trackers],
402 pretty_print_shnum_to_servers(self.preexisting_shares))
403 self.log(msg, level=log.OPERATIONAL)
404 return (self.use_trackers, self.preexisting_shares)
406 # We're not okay right now, but maybe we can fix it by
407 # redistributing some shares. In cases where one or two
408 # servers has, before the upload, all or most of the
409 # shares for a given SI, this can work by allowing _loop
410 # a chance to spread those out over the other servers,
411 delta = self.servers_of_happiness - effective_happiness
412 shares = shares_by_server(self.preexisting_shares)
413 # Each server in shares maps to a set of shares stored on it.
414 # Since we want to keep at least one share on each server
415 # that has one (otherwise we'd only be making
416 # the situation worse by removing distinct servers),
417 # each server has len(its shares) - 1 to spread around.
418 shares_to_spread = sum([len(list(sharelist)) - 1
419 for (server, sharelist)
421 if delta <= len(self.first_pass_trackers) and \
422 shares_to_spread >= delta:
423 items = shares.items()
424 while len(self.homeless_shares) < delta:
425 # Loop through the allocated shares, removing
426 # one from each server that has more than one
427 # and putting it back into self.homeless_shares
428 # until we've done this delta times.
429 server, sharelist = items.pop()
430 if len(sharelist) > 1:
431 share = sharelist.pop()
432 self.homeless_shares.add(share)
433 self.preexisting_shares[share].remove(server)
434 if not self.preexisting_shares[share]:
435 del self.preexisting_shares[share]
436 items.append((server, sharelist))
437 for writer in self.use_trackers:
438 writer.abort_some_buckets(self.homeless_shares)
441 # Redistribution won't help us; fail.
442 server_count = len(self.serverids_with_shares)
443 failmsg = failure_message(server_count,
445 self.servers_of_happiness,
447 servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
448 servmsg = servmsgtempl % (
451 self._get_progress_message(),
452 pretty_print_shnum_to_servers(merged)
454 self.log(servmsg, level=log.INFREQUENT)
455 return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
457 if self.first_pass_trackers:
458 tracker = self.first_pass_trackers.pop(0)
459 # TODO: don't pre-convert all serverids to ServerTrackers
460 assert isinstance(tracker, ServerTracker)
462 shares_to_ask = set(sorted(self.homeless_shares)[:1])
463 self.homeless_shares -= shares_to_ask
464 self.query_count += 1
465 self.num_servers_contacted += 1
467 self._status.set_status("Contacting Servers [%s] (first query),"
469 % (tracker.get_name(),
470 len(self.homeless_shares)))
471 d = tracker.query(shares_to_ask)
472 d.addBoth(self._got_response, tracker, shares_to_ask,
473 self.second_pass_trackers)
475 elif self.second_pass_trackers:
476 # ask a server that we've already asked.
477 if not self._started_second_pass:
478 self.log("starting second pass",
480 self._started_second_pass = True
481 num_shares = mathutil.div_ceil(len(self.homeless_shares),
482 len(self.second_pass_trackers))
483 tracker = self.second_pass_trackers.pop(0)
484 shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
485 self.homeless_shares -= shares_to_ask
486 self.query_count += 1
488 self._status.set_status("Contacting Servers [%s] (second query),"
490 % (tracker.get_name(),
491 len(self.homeless_shares)))
492 d = tracker.query(shares_to_ask)
493 d.addBoth(self._got_response, tracker, shares_to_ask,
494 self.next_pass_trackers)
496 elif self.next_pass_trackers:
497 # we've finished the second-or-later pass. Move all the remaining
498 # servers back into self.second_pass_trackers for the next pass.
499 self.second_pass_trackers.extend(self.next_pass_trackers)
500 self.next_pass_trackers[:] = []
503 # no more servers. If we haven't placed enough shares, we fail.
504 merged = merge_servers(self.preexisting_shares, self.use_trackers)
505 effective_happiness = servers_of_happiness(merged)
506 if effective_happiness < self.servers_of_happiness:
507 msg = failure_message(len(self.serverids_with_shares),
509 self.servers_of_happiness,
511 msg = ("server selection failed for %s: %s (%s)" %
512 (self, msg, self._get_progress_message()))
513 if self.last_failure_msg:
514 msg += " (%s)" % (self.last_failure_msg,)
515 self.log(msg, level=log.UNUSUAL)
516 return self._failed(msg)
518 # we placed enough to be happy, so we're done
520 self._status.set_status("Placed all shares")
521 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
522 self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
523 self.log(msg, level=log.OPERATIONAL)
524 return (self.use_trackers, self.preexisting_shares)
526 def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
527 if isinstance(res, failure.Failure):
528 # This is unusual, and probably indicates a bug or a network
530 self.log("%s got error during server selection: %s" % (tracker, res),
532 self.error_count += 1
533 self.bad_query_count += 1
534 self.homeless_shares |= shares_to_ask
535 if (self.first_pass_trackers
536 or self.second_pass_trackers
537 or self.next_pass_trackers):
538 # there is still hope, so just loop
541 # No more servers, so this upload might fail (it depends upon
542 # whether we've hit servers_of_happiness or not). Log the last
543 # failure we got: if a coding error causes all servers to fail
544 # in the same way, this allows the common failure to be seen
545 # by the uploader and should help with debugging
546 msg = ("last failure (from %s) was: %s" % (tracker, res))
547 self.last_failure_msg = msg
549 (alreadygot, allocated) = res
550 self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
551 % (tracker.get_name(),
552 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
556 self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
557 if s in self.homeless_shares:
558 self.homeless_shares.remove(s)
560 elif s in shares_to_ask:
563 # the ServerTracker will remember which shares were allocated on
564 # that peer. We just have to remember to use them.
566 self.use_trackers.add(tracker)
569 if allocated or alreadygot:
570 self.serverids_with_shares.add(tracker.get_serverid())
572 not_yet_present = set(shares_to_ask) - set(alreadygot)
573 still_homeless = not_yet_present - set(allocated)
576 # They accepted at least one of the shares that we asked
577 # them to accept, or they had a share that we didn't ask
578 # them to accept but that we hadn't placed yet, so this
579 # was a productive query
580 self.good_query_count += 1
582 self.bad_query_count += 1
586 # In networks with lots of space, this is very unusual and
587 # probably indicates an error. In networks with servers that
588 # are full, it is merely unusual. In networks that are very
589 # full, it is common, and many uploads will fail. In most
590 # cases, this is obviously not fatal, and we'll just use some
593 # some shares are still homeless, keep trying to find them a
594 # home. The ones that were rejected get first priority.
595 self.homeless_shares |= still_homeless
            # Since they were unable to accept all of our requests, it
            # is safe to assume that asking them again won't help.
599 # if they *were* able to accept everything, they might be
600 # willing to accept even more.
601 put_tracker_here.append(tracker)
607 def _failed(self, msg):
609 I am called when server selection fails. I first abort all of the
610 remote buckets that I allocated during my unsuccessful attempt to
611 place shares for this file. I then raise an
612 UploadUnhappinessError with my msg argument.
614 for tracker in self.use_trackers:
615 assert isinstance(tracker, ServerTracker)
617 raise UploadUnhappinessError(msg)
620 class EncryptAnUploadable:
621 """This is a wrapper that takes an IUploadable and provides
622 IEncryptedUploadable."""
623 implements(IEncryptedUploadable)
626 def __init__(self, original, log_parent=None):
627 precondition(original.default_params_set,
628 "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
629 self.original = IUploadable(original)
630 self._log_number = log_parent
631 self._encryptor = None
632 self._plaintext_hasher = plaintext_hasher()
633 self._plaintext_segment_hasher = None
634 self._plaintext_segment_hashes = []
635 self._encoding_parameters = None
636 self._file_size = None
637 self._ciphertext_bytes_read = 0
640 def set_upload_status(self, upload_status):
641 self._status = IUploadStatus(upload_status)
642 self.original.set_upload_status(upload_status)
644 def log(self, *args, **kwargs):
645 if "facility" not in kwargs:
646 kwargs["facility"] = "upload.encryption"
647 if "parent" not in kwargs:
648 kwargs["parent"] = self._log_number
649 return log.msg(*args, **kwargs)
652 if self._file_size is not None:
653 return defer.succeed(self._file_size)
654 d = self.original.get_size()
656 self._file_size = size
658 self._status.set_size(size)
660 d.addCallback(_got_size)
663 def get_all_encoding_parameters(self):
664 if self._encoding_parameters is not None:
665 return defer.succeed(self._encoding_parameters)
666 d = self.original.get_all_encoding_parameters()
667 def _got(encoding_parameters):
668 (k, happy, n, segsize) = encoding_parameters
669 self._segment_size = segsize # used by segment hashers
670 self._encoding_parameters = encoding_parameters
671 self.log("my encoding parameters: %s" % (encoding_parameters,),
673 return encoding_parameters
677 def _get_encryptor(self):
679 return defer.succeed(self._encryptor)
681 d = self.original.get_encryption_key()
686 storage_index = storage_index_hash(key)
687 assert isinstance(storage_index, str)
688 # There's no point to having the SI be longer than the key, so we
689 # specify that it is truncated to the same 128 bits as the AES key.
690 assert len(storage_index) == 16 # SHA-256 truncated to 128b
691 self._storage_index = storage_index
693 self._status.set_storage_index(storage_index)
698 def get_storage_index(self):
699 d = self._get_encryptor()
700 d.addCallback(lambda res: self._storage_index)
703 def _get_segment_hasher(self):
704 p = self._plaintext_segment_hasher
706 left = self._segment_size - self._plaintext_segment_hashed_bytes
708 p = plaintext_segment_hasher()
709 self._plaintext_segment_hasher = p
710 self._plaintext_segment_hashed_bytes = 0
711 return p, self._segment_size
713 def _update_segment_hash(self, chunk):
715 while offset < len(chunk):
716 p, segment_left = self._get_segment_hasher()
717 chunk_left = len(chunk) - offset
718 this_segment = min(chunk_left, segment_left)
719 p.update(chunk[offset:offset+this_segment])
720 self._plaintext_segment_hashed_bytes += this_segment
722 if self._plaintext_segment_hashed_bytes == self._segment_size:
723 # we've filled this segment
724 self._plaintext_segment_hashes.append(p.digest())
725 self._plaintext_segment_hasher = None
726 self.log("closed hash [%d]: %dB" %
727 (len(self._plaintext_segment_hashes)-1,
728 self._plaintext_segment_hashed_bytes),
730 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
731 segnum=len(self._plaintext_segment_hashes)-1,
732 hash=base32.b2a(p.digest()),
735 offset += this_segment
738 def read_encrypted(self, length, hash_only):
739 # make sure our parameters have been set up first
740 d = self.get_all_encoding_parameters()
742 d.addCallback(lambda ignored: self.get_size())
743 d.addCallback(lambda ignored: self._get_encryptor())
744 # then fetch and encrypt the plaintext. The unusual structure here
745 # (passing a Deferred *into* a function) is needed to avoid
746 # overflowing the stack: Deferreds don't optimize out tail recursion.
747 # We also pass in a list, to which _read_encrypted will append
750 d2 = defer.Deferred()
751 d.addCallback(lambda ignored:
752 self._read_encrypted(length, ciphertext, hash_only, d2))
753 d.addCallback(lambda ignored: d2)
756 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
758 fire_when_done.callback(ciphertext)
760 # tolerate large length= values without consuming a lot of RAM by
761 # reading just a chunk (say 50kB) at a time. This only really matters
762 # when hash_only==True (i.e. resuming an interrupted upload), since
763 # that's the case where we will be skipping over a lot of data.
764 size = min(remaining, self.CHUNKSIZE)
765 remaining = remaining - size
766 # read a chunk of plaintext..
767 d = defer.maybeDeferred(self.original.read, size)
768 # N.B.: if read() is synchronous, then since everything else is
769 # actually synchronous too, we'd blow the stack unless we stall for a
770 # tick. Once you accept a Deferred from IUploadable.read(), you must
771 # be prepared to have it fire immediately too.
772 d.addCallback(fireEventually)
773 def _good(plaintext):
775 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
776 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
777 ciphertext.extend(ct)
778 self._read_encrypted(remaining, ciphertext, hash_only,
781 fire_when_done.errback(why)
786 def _hash_and_encrypt_plaintext(self, data, hash_only):
787 assert isinstance(data, (tuple, list)), type(data)
790 # we use data.pop(0) instead of 'for chunk in data' to save
791 # memory: each chunk is destroyed as soon as we're done with it.
795 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
797 bytes_processed += len(chunk)
798 self._plaintext_hasher.update(chunk)
799 self._update_segment_hash(chunk)
800 # TODO: we have to encrypt the data (even if hash_only==True)
801 # because pycryptopp's AES-CTR implementation doesn't offer a
802 # way to change the counter value. Once pycryptopp acquires
803 # this ability, change this to simply update the counter
804 # before each call to (hash_only==False) _encryptor.process()
805 ciphertext = self._encryptor.process(chunk)
807 self.log(" skipping encryption", level=log.NOISY)
809 cryptdata.append(ciphertext)
812 self._ciphertext_bytes_read += bytes_processed
814 progress = float(self._ciphertext_bytes_read) / self._file_size
815 self._status.set_progress(1, progress)
819 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
820 # this is currently unused, but will live again when we fix #453
821 if len(self._plaintext_segment_hashes) < num_segments:
822 # close out the last one
823 assert len(self._plaintext_segment_hashes) == num_segments-1
824 p, segment_left = self._get_segment_hasher()
825 self._plaintext_segment_hashes.append(p.digest())
826 del self._plaintext_segment_hasher
827 self.log("closing plaintext leaf hasher, hashed %d bytes" %
828 self._plaintext_segment_hashed_bytes,
830 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
831 segnum=len(self._plaintext_segment_hashes)-1,
832 hash=base32.b2a(p.digest()),
834 assert len(self._plaintext_segment_hashes) == num_segments
835 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
837 def get_plaintext_hash(self):
838 h = self._plaintext_hasher.digest()
839 return defer.succeed(h)
842 return self.original.close()
845 implements(IUploadStatus)
846 statusid_counter = itertools.count(0)
849 self.storage_index = None
852 self.status = "Not started"
853 self.progress = [0.0, 0.0, 0.0]
856 self.counter = self.statusid_counter.next()
857 self.started = time.time()
859 def get_started(self):
861 def get_storage_index(self):
862 return self.storage_index
865 def using_helper(self):
867 def get_status(self):
869 def get_progress(self):
870 return tuple(self.progress)
871 def get_active(self):
873 def get_results(self):
875 def get_counter(self):
878 def set_storage_index(self, si):
879 self.storage_index = si
880 def set_size(self, size):
882 def set_helper(self, helper):
884 def set_status(self, status):
886 def set_progress(self, which, value):
887 # [0]: chk, [1]: ciphertext, [2]: encode+push
888 self.progress[which] = value
889 def set_active(self, value):
891 def set_results(self, value):
895 server_selector_class = Tahoe2ServerSelector
897 def __init__(self, storage_broker, secret_holder):
898 # server_selector needs storage_broker and secret_holder
899 self._storage_broker = storage_broker
900 self._secret_holder = secret_holder
901 self._log_number = self.log("CHKUploader starting", parent=None)
903 self._storage_index = None
904 self._upload_status = UploadStatus()
905 self._upload_status.set_helper(False)
906 self._upload_status.set_active(True)
908 # locate_all_shareholders() will create the following attribute:
909 # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
911 def log(self, *args, **kwargs):
912 if "parent" not in kwargs:
913 kwargs["parent"] = self._log_number
914 if "facility" not in kwargs:
915 kwargs["facility"] = "tahoe.upload"
916 return log.msg(*args, **kwargs)
918 def start(self, encrypted_uploadable):
919 """Start uploading the file.
921 Returns a Deferred that will fire with the UploadResults instance.
924 self._started = time.time()
925 eu = IEncryptedUploadable(encrypted_uploadable)
926 self.log("starting upload of %s" % eu)
928 eu.set_upload_status(self._upload_status)
929 d = self.start_encrypted(eu)
930 def _done(uploadresults):
931 self._upload_status.set_active(False)
937 """Call this if the upload must be abandoned before it completes.
938 This will tell the shareholders to delete their partial shares. I
939 return a Deferred that fires when these messages have been acked."""
940 if not self._encoder:
941 # how did you call abort() before calling start() ?
942 return defer.succeed(None)
943 return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        # NOTE(review): the Encoder(...) call is missing its continuation
        # line(s) (the rest of the argument list) in this excerpt.
        self._encoder = e = encode.Encoder(self._log_number,
        d = e.set_encrypted_uploadable(eu)
        # `started` is threaded through so elapsed times can be recorded
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # NOTE(review): `return d` presumably follows; not visible here.
    def locate_all_shareholders(self, encoder, started):
        """Run server selection for this upload; returns the selector's
        Deferred (which fires with the chosen shareholders).

        NOTE(review): continuation lines of the two multi-line calls below
        are not visible in this excerpt, and the final elapsed-time
        assignment presumably lives inside a callback on `d`.
        """
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        # abbreviated storage index, used only to make log lines readable
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        self._server_selection_elapsed = time.time() - server_selection_started
    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        """
        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
                                  have this share
        """
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._count_preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        # NOTE(review): `buckets` is used below but its initialization
        # (presumably `buckets = {}`) is not visible in this excerpt.
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        # sanity check: no shnum may be claimed by two different trackers
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                # NOTE(review): two argument lines of this format tuple and
                # the closing paren are not visible in this excerpt.
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
        encoder.set_shareholders(buckets, servermap)
    def _encrypted_done(self, verifycap):
        """Returns a Deferred that will fire with the UploadResults instance."""
        # NOTE(review): this excerpt is missing the lines that bind `e`
        # (presumably self._encoder), `now`, and `timings` before use, plus
        # some UploadResults keyword arguments (sharemap/timings) and the
        # final return.
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server = self._server_trackers[shnum].get_server()
            sharemap.add(shnum, server)
            servermap.add(server, shnum)
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           servermap=servermap,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
1054 def get_upload_status(self):
1055 return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # NOTE(review): mutable default argument for prepend_data -- safe only
    # because it is never mutated (only concatenated). Several structural
    # lines are missing from this excerpt: the size==0 guard above the first
    # return, the `def _got(data):` callback header, the `if remaining:`
    # guard before the recursive call, and the trailing addCallback/return.
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes <= size
        remaining = size - bytes
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
class LiteralUploader:
    """Upload small files by packing the bytes directly into a
    LiteralFileURI string -- the data lives in the cap itself."""

    # NOTE(review): the `def __init__(self):` header is not visible in this
    # excerpt; the three lines below belong to it.
    self._status = s = UploadStatus()
    s.set_storage_index(None)
    s.set_progress(0, 1.0)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # NOTE(review): `self._size = size` presumably precedes this
            # line (it is read later by _build_results); not visible here.
            self._status.set_size(size)
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        # the file's bytes ARE the cap: pack them into a LIT URI string
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        # NOTE(review): `return d` presumably follows; not visible here.

    def _build_results(self, uri):
        # NOTE(review): the parameter `uri` shadows the module-level `uri`
        # import. Several UploadResults keyword arguments, the closing
        # paren, and the trailing return are not visible in this excerpt.
        ur = UploadResults(file_size=self._size,
                           ciphertext_fetched=0,
                           preexisting_shares=0,
                           uri_extension_data=None,
                           uri_extension_hash=None,
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        self._status.set_results(ur)

    def get_upload_status(self):
        # NOTE(review): body not visible in this excerpt (presumably
        # `return self._status`).
class RemoteEncryptedUploadable(Referenceable):
    """Adapter: expose a local IEncryptedUploadable over foolscap as an
    RIEncryptedUploadable, so a remote Helper can pull our ciphertext."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        # NOTE(review): initializations of self._size and self._offset (both
        # read below) are not visible in this excerpt.

        # NOTE(review): the `def get_size(self):` header is not visible in
        # this excerpt; the lines below implement it (a cached size lookup).
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            # NOTE(review): body (presumably stashing `size` on self) is not
            # visible in this excerpt.
        d.addCallback(_got_size)
        # NOTE(review): `return d` presumably follows; not visible here.

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        # NOTE(review): the `def _read(strings):` callback header and the
        # hash_only/else branch structure are not visible in this excerpt;
        # the lines below advance the read offset in each branch.
            self._offset += length
            size = sum([len(data) for data in strings])
            self._offset += size
        d.addCallback(_read)
        # NOTE(review): `return d` presumably follows; not visible here.

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        # NOTE(review): the continuation of this log.msg call (its keyword
        # arguments) is not visible in this excerpt.
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): the `else:` header for this branch is not visible.
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): the `def _read(strings):` header is not visible.
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
        d.addCallback(_read)
        # NOTE(review): `return d` presumably follows; not visible here.

    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    """Upload a file by way of a remote Helper service.

    The Helper fetches our ciphertext (via RemoteEncryptedUploadable) and
    does the share placement; we keep the encryption key and build the
    verify-cap locally in _build_verifycap.
    """

    def __init__(self, helper, storage_broker):
        self._helper = helper
        self._storage_broker = storage_broker
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): further status setup (presumably s.set_helper(True)
        # and s.set_active(True)) is not visible in this excerpt.

    def log(self, *args, **kwargs):
        # default to this upload's log entry as the parent
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # NOTE(review): the binding of `d` (presumably eu.get_size()) is not
        # visible in this excerpt.
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        # NOTE(review): the line below belongs to a _done callback whose
        # header (and the trailing addBoth/return d) is not visible here.
            self._upload_status.set_active(False)

    def _got_size(self, size):
        # NOTE(review): `self._size = size` presumably precedes this line
        # (it is read later by _build_verifycap); not visible in this excerpt.
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        # NOTE(review): `return d` presumably follows; not visible here.

    def _contacted_helper(self, (helper_upload_results, upload_helper)):
        # NOTE(review): python-2-only tuple parameter. The `now` binding,
        # the `if upload_helper:`/`else:` branch headers, the `d` binding
        # (presumably reu.get_size()) and the `return d` are not visible in
        # this excerpt.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        self.log("helper says we need to upload", level=log.NOISY)
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        return helper_upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.
        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, helper_upload_results):
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        # the helper must agree with the encoding parameters we negotiated
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size

        # hur.verifycap doesn't exist if already found
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
        # NOTE(review): the closing `size=...)` argument, the `timings`/`now`
        # bindings, parts of the timing-key renaming loop, and the
        # sharemap/servermap dict initializations below are not visible in
        # this excerpt.
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key,val in hur.timings.items():
                key = "helper_total"
        timings["total"] = now - self._started

        gss = self._storage_broker.get_stub_server
        for shnum, serverids in hur.sharemap.items():
            sharemap[shnum] = set([gss(serverid) for serverid in serverids])
        # if the file was already in the grid, hur.servermap is an empty dict
        for serverid, shnums in hur.servermap.items():
            servermap[gss(serverid)] = set(shnums)

        ur = UploadResults(file_size=self._size,
                           # not if already found
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           servermap=servermap,
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
        # NOTE(review): `return ur` presumably follows; not visible here.

    def get_upload_status(self):
        return self._upload_status
class BaseUploadable:
    """Shared encoding-parameter plumbing for IUploadable implementations."""
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_params_set = False

    # per-instance overrides; falsy means "use the default_* value"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # caches the (k, happy, n, segsize) tuple once computed
    _all_encoding_parameters = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install string-keyed, int-valued defaults ("k", "happy", "n",
        "max_segment_size"); must be called before
        get_all_encoding_parameters."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]
        self.default_params_set = True

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size)."""
        _assert(self.default_params_set, "set_default_encoding_parameters not called on %r" % (self,))
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n
        # NOTE(review): `d` is used below but its binding (presumably
        # `d = self.get_size()`) and the final `return d` are not visible in
        # this excerpt.
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self.convergence = convergence
        # NOTE(review): initializations of self._key and self._size
        # (presumably to None; both read below) are not visible here.

    def _get_encryption_key_convergent(self):
        # cached: the convergent key is computed at most once
        if self._key is not None:
            return defer.succeed(self._key)

        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        # NOTE(review): the binding of `d` (presumably self.get_size()), the
        # callback headers, the read loop scaffolding (BLOCKSIZE,
        # bytes_read=0, while/break, f.seek calls) and the trailing returns
        # are not visible in this excerpt; the indented lines below are the
        # surviving interior of that hashing loop.
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
                data = f.read(BLOCKSIZE)
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                    self._status.set_progress(0, float(bytes_read)/self._size)
            self._key = enckey_hasher.digest()
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16

    def _get_encryption_key_random(self):
        # lazily generate (and cache) a random 16-byte AES key
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        # convergent (deterministic) key when a convergence secret was
        # supplied, otherwise a fresh random key
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

        # NOTE(review): the `def get_size(self):` header is not visible in
        # this excerpt; the lines below implement it (seek to end, measure,
        # rewind). `self._size = size` presumably precedes the return.
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0, os.SEEK_END)
        size = self._filehandle.tell()
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

        # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

        # NOTE(review): the `def close(self):` header is not visible in this
        # excerpt; the two lines below belong to FileName.close, which also
        # closes the filehandle because we opened it ourselves.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # wrap the in-memory bytes in a file-like object and reuse FileHandle
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    client node."""
    implements(IUploader)

    # files at or below this size are uploaded via LiteralUploader (the data
    # is embedded in the cap itself); larger files get full CHK encoding
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None, history=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history
        # NOTE(review): `self._helper = None` is presumably also initialized
        # here (it is read by get_helper_info); not visible in this excerpt.
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            # NOTE(review): the connectTo() callback argument (presumably
            # self._got_helper) is on a continuation line not visible here.
            self.parent.tub.connectTo(self._helper_furl,
    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        # fallback version info for helpers that predate get_version()
        # NOTE(review): two continuation lines of this dict literal (the
        # nested value and the closing brace) are not visible here.
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1530 def _got_versioned_helper(self, helper):
1531 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1532 if needed not in helper.version:
1533 raise InsufficientVersionError(needed, helper.version)
1534 self._helper = helper
1535 helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        # NOTE(review): body not visible in this excerpt (presumably clears
        # self._helper so get_helper_info reports disconnected).
1540 def get_helper_info(self):
1541 # return a tuple of (helper_furl_or_None, connected_bool)
1542 return (self._helper_furl, bool(self._helper))
1545 def upload(self, uploadable):
1547 Returns a Deferred that will fire with the UploadResults instance.
1552 uploadable = IUploadable(uploadable)
1553 d = uploadable.get_size()
1554 def _got_size(size):
1555 default_params = self.parent.get_encoding_parameters()
1556 precondition(isinstance(default_params, dict), default_params)
1557 precondition("max_segment_size" in default_params, default_params)
1558 uploadable.set_default_encoding_parameters(default_params)
1560 if self.stats_provider:
1561 self.stats_provider.count('uploader.files_uploaded', 1)
1562 self.stats_provider.count('uploader.bytes_uploaded', size)
1564 if size <= self.URI_LIT_SIZE_THRESHOLD:
1565 uploader = LiteralUploader()
1566 return uploader.start(uploadable)
1568 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1569 d2 = defer.succeed(None)
1570 storage_broker = self.parent.get_storage_broker()
1572 uploader = AssistedUploader(self._helper, storage_broker)
1573 d2.addCallback(lambda x: eu.get_storage_index())
1574 d2.addCallback(lambda si: uploader.start(eu, si))
1576 storage_broker = self.parent.get_storage_broker()
1577 secret_holder = self.parent._secret_holder
1578 uploader = CHKUploader(storage_broker, secret_holder)
1579 d2.addCallback(lambda x: uploader.start(eu))
1581 self._all_uploads[uploader] = None
1583 self._history.add_upload(uploader.get_upload_status())
1584 def turn_verifycap_into_read_cap(uploadresults):
1585 # Generate the uri from the verifycap plus the key.
1586 d3 = uploadable.get_encryption_key()
1587 def put_readcap_into_results(key):
1588 v = uri.from_string(uploadresults.get_verifycapstr())
1589 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1590 uploadresults.set_uri(r.to_string())
1591 return uploadresults
1592 d3.addCallback(put_readcap_into_results)
1594 d2.addCallback(turn_verifycap_into_read_cap)
1596 d.addCallback(_got_size)