1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_servers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24 DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
28 from cStringIO import StringIO
# this wants to live in storage, not here
class TooFullError(Exception):
    """A storage server had no room for the requested allocation.

    NOTE(review): the class body is not visible in this listing; this
    docstring also serves as the (empty) body.
    """
# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    # NOTE(review): the method header that should precede these assignments
    # (presumably 'def __init__(self):') is missing from this listing —
    # confirm against the upstream file before relying on this block.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
    # zope.interface declaration; the enclosing class statement (an
    # UploadResults-like class) is not visible in this listing.
    implements(IUploadResults)

    def __init__(self, file_size,
                 ciphertext_fetched, # how much the helper fetched
                 preexisting_shares, # count of shares already present
                 pushed_shares, # count of shares we pushed
                 sharemap, # {shnum: set(server)}
                 servermap, # {server: set(shnum)}
                 timings, # dict of name to number of seconds
                 # NOTE(review): the remaining parameter declarations
                 # (uri_extension_data, uri_extension_hash, verifycapstr and
                 # the closing '):') are missing from this listing, though
                 # the body below clearly uses them — confirm upstream.
        self._file_size = file_size
        self._ciphertext_fetched = ciphertext_fetched
        self._preexisting_shares = preexisting_shares
        self._pushed_shares = pushed_shares
        self._sharemap = sharemap
        self._servermap = servermap
        self._timings = timings
        self._uri_extension_data = uri_extension_data
        self._uri_extension_hash = uri_extension_hash
        self._verifycapstr = verifycapstr
    # Simple accessors for the values captured in __init__.
    def set_uri(self, uri):
        # NOTE(review): body missing from this listing — confirm upstream.
    def get_file_size(self):
        return self._file_size
    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched
    def get_preexisting_shares(self):
        return self._preexisting_shares
    def get_pushed_shares(self):
        return self._pushed_shares
    def get_sharemap(self):
        # NOTE(review): body missing from this listing — confirm upstream.
    def get_servermap(self):
        return self._servermap
    def get_timings(self):
        # NOTE(review): body missing from this listing — confirm upstream.
    def get_uri_extension_data(self):
        return self._uri_extension_data
    def get_verifycapstr(self):
        return self._verifycapstr
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.  NOTE(review): the continuation of this TODO comment is missing
# from this listing.
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: serverids} mapping as a human-readable string,
    e.g. 'sh0: abc12+def34, sh1: abc12'."""
    descriptions = []
    for shnum, serverids in s.items():
        names = '+'.join([idlib.shortnodeid_b2a(serverid)
                          for serverid in serverids])
        descriptions.append("sh%s: %s" % (shnum, names))
    return ', '.join(descriptions)
    # ServerTracker constructor: remembers sizing parameters and per-bucket
    # lease secrets for one storage server.
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 # NOTE(review): a 'storage_index' parameter line appears to
                 # be missing here (self.storage_index is set below).
                 bucket_renewal_secret, bucket_cancel_secret):
        self._server = server
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                             blocksize, num_segments,
                                             # NOTE(review): trailing
                                             # arguments missing from this
                                             # listing — confirm upstream.
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret
        # NOTE(review): the 'def __repr__(self):' header for the next two
        # lines is missing from this listing.
        return ("<ServerTracker for server %s and SI %s>"
                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

    def get_server(self):
        # NOTE(review): body missing from this listing — confirm upstream.
    def get_serverid(self):
        return self._server.get_serverid()
        # NOTE(review): the following return likely belongs to a missing
        # 'def get_name(self):' header — confirm upstream.
        return self._server.get_name()
    # Ask the server to allocate buckets for the given share numbers; fires
    # through _got_reply with (alreadygot, allocated-writers).
    def query(self, sharenums):
        rref = self._server.get_rref()
        d = rref.callRemote("allocate_buckets",
                            # NOTE(review): the storage-index, secrets,
                            # sharenums and allocated-size arguments are
                            # missing from this listing — confirm upstream.
                            canary=Referenceable())
        d.addCallback(self._got_reply)
        # NOTE(review): a trailing 'return d' appears to be missing here.
166 def ask_about_existing_shares(self):
167 rref = self._server.get_rref()
168 return rref.callRemote("get_buckets", self.storage_index)
    # Callback for query(): wraps each returned remote bucket ref in a
    # write-bucket proxy and records it.  Uses Python-2-only tuple-parameter
    # unpacking in the signature.
    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # NOTE(review): the initialization of the local dict 'b' (used
        # below) is missing from this listing — confirm upstream.
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self._server, self.sharesize,
                                # NOTE(review): intermediate constructor
                                # arguments missing from this listing.
                                self.num_share_hashes,
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
        # NOTE(review): the 'def abort(self):' header and the docstring
        # delimiters for the next lines are missing from this listing.
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(self.buckets.keys())
    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]
def str_shareloc(shnum, bucketwriter):
    """Describe one share placement as 'shnum: servername'."""
    servername = bucketwriter.get_servername()
    return "%s: %s" % (shnum, servername)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
    # Chooses which storage servers will hold the shares of one upload.

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        # NOTE(review): the counter initializations this comment refers to
        # (and an error counter used later) are missing from this listing.
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)
219 return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """
        self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE)
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        # NOTE(review): the guard (presumably 'if not all_servers:') for the
        # following raise is missing from this listing.
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accomodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writeable_servers = [server for server in all_servers
                             if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        # NOTE(review): the trailing arguments (presumably storage_index) of
        # the next two calls are missing from this listing.
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,

        def _make_trackers(servers):
            # NOTE(review): the list accumulator and 'for s in servers:'
            # loop header are missing from this listing.
            seed = s.get_lease_seed()
            renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
            cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
            st = ServerTracker(s,
                               share_size, block_size,
                               num_segments, num_share_hashes,
                               # NOTE(review): remaining constructor args,
                               # the append, and the return are missing
                               # from this listing.

        # We assign each servers/trackers into one three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
        self.first_pass_trackers = _make_trackers(writeable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        # NOTE(review): initialization of the Deferred list 'ds' (used for
        # the DeferredList below) is missing from this listing.
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            # NOTE(review): the 'ds.append(d)' line appears to be missing.
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        # NOTE(review): the tail of this method (returning the selection
        # Deferred) is missing from this listing.
    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the queries sent by
        Tahoe2ServerSelector._existing_shares.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                     % (tracker.get_name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            # NOTE(review): the 'else:' branch header and the binding of
            # 'buckets' from res are missing from this listing.
            self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                     % (tracker.get_name(), tuple(sorted(buckets))),
                     # NOTE(review): the log-level argument is missing here.
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            # NOTE(review): the guard preceding this final counter bump is
            # missing from this listing.
            self.bad_query_count += 1
    # Build a one-line human-readable summary of placement progress, used in
    # success and failure log messages.
    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        # NOTE(review): the 'else:' line and the first tuple element of the
        # following format appear to be missing from this listing.
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                "any %d of them have enough shares to recover the file, "
                "sent %d queries to %d servers, "
                "%d queries placed some shares, %d placed none "
                "(of which %d placed none due to the server being"
                " full and %d placed none due to an error)" %
                (self.servers_of_happiness, self.needed_shares,
                 self.query_count, self.num_servers_contacted,
                 self.good_query_count, self.bad_query_count,
                 self.full_count, self.error_count))
        # NOTE(review): this is the body of the selection loop (presumably
        # 'def _loop(self):'); its header is missing from this listing.
        # It either declares success, tries to redistribute shares, asks the
        # next candidate server, or fails.
        if not self.homeless_shares:
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            # NOTE(review): the 'else:' header for this branch is missing
            # from this listing.
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        # NOTE(review): the 'in shares.items()]'
                                        # tail of this sum is missing here.
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                        items.append((server, sharelist))
                    for writer in self.use_trackers:
                        writer.abort_some_buckets(self.homeless_shares)
                    # NOTE(review): the re-entry into the loop after
                    # redistribution is missing from this listing.
                # NOTE(review): 'else:' for the failure branch is missing.
                    # Redistribution won't help us; fail.
                    server_count = len(self.serverids_with_shares)
                    failmsg = failure_message(server_count,
                                              self.servers_of_happiness,
                                              # NOTE(review): remaining
                                              # arguments missing here.
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        # NOTE(review): the first tuple element is missing
                        # from this listing.
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        # NOTE(review): closing paren line missing here.
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            # NOTE(review): the 'if self._status:' guard appears to be
            # missing before the next statement.
                self._status.set_status("Contacting Servers [%s] (first query),"
                                        # NOTE(review): part of the format
                                        # string is missing here.
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
            # NOTE(review): 'return d' appears to be missing here.
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                         # NOTE(review): log-level argument missing here.
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            # NOTE(review): the 'if self._status:' guard appears to be
            # missing before the next statement.
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        # NOTE(review): part of the format
                                        # string is missing here.
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
            # NOTE(review): 'return d' appears to be missing here.
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            # NOTE(review): the recursive re-entry into the loop is missing
            # from this listing; 'else:' for the final branch is also
            # missing.
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                                      self.servers_of_happiness,
                                      # NOTE(review): remaining arguments
                                      # missing from this listing.
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            # NOTE(review): 'else:' header missing here.
                # we placed enough to be happy, so we're done
                # NOTE(review): the 'if self._status:' guard appears to be
                # missing before the next statement.
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                    self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
    # Handle a single allocate_buckets() answer (or failure) from one
    # server, update the share/placement bookkeeping, and decide where the
    # tracker goes next.
    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                     # NOTE(review): log-level argument missing here.
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                # NOTE(review): the actual loop statement is missing from
                # this listing; 'else:' for the next branch is also missing.
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        # NOTE(review): 'else:' header missing here — the rest handles a
        # successful (alreadygot, allocated) response.
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                     % (tracker.get_name(),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                     # NOTE(review): log-level argument and the loop header
                     # over 'alreadygot' shares are missing here.
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    # NOTE(review): intervening lines missing here.
                elif s in shares_to_ask:
                    # NOTE(review): branch body missing from this listing.

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            # NOTE(review): the guard (presumably 'if allocated:') for the
            # next statement is missing here.
                self.use_trackers.add(tracker)

            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            # NOTE(review): the condition guarding the next counter bump is
            # missing from this listing.
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            # NOTE(review): 'else:' header missing here.
                self.bad_query_count += 1
                # NOTE(review): additional counter updates missing here.

            # NOTE(review): the 'if still_homeless:' guard appears to be
            # missing before the following comment block.
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            # NOTE(review): 'else:' header missing here.
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)
            # NOTE(review): the tail of this method (re-entering the loop)
            # is missing from this listing.
    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            # NOTE(review): the per-tracker abort call that this loop exists
            # for appears to be missing from this listing — confirm upstream.
        raise UploadUnhappinessError(msg)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): a CHUNKSIZE class attribute is referenced by
    # _read_encrypted() below but its definition is not visible in this
    # listing — confirm upstream.
    def __init__(self, original, log_parent=None):
        # the IUploadable we are wrapping
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None              # created lazily by _get_encryptor()
        self._plaintext_hasher = plaintext_hasher()  # hashes the whole plaintext
        self._plaintext_segment_hasher = None        # per-segment hasher, lazy
        self._plaintext_segment_hashes = []          # completed segment digests
        self._encoding_parameters = None    # cached (k, happy, n, segsize)
        self._file_size = None              # cached size from get_size()
        self._ciphertext_bytes_read = 0     # for progress reporting
638 def set_upload_status(self, upload_status):
639 self._status = IUploadStatus(upload_status)
640 self.original.set_upload_status(upload_status)
642 def log(self, *args, **kwargs):
643 if "facility" not in kwargs:
644 kwargs["facility"] = "upload.encryption"
645 if "parent" not in kwargs:
646 kwargs["parent"] = self._log_number
647 return log.msg(*args, **kwargs)
        # NOTE(review): the 'def get_size(self):' header for this body is
        # missing from this listing.  Returns a Deferred firing with the
        # file size, caching it on first use.
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        # NOTE(review): the 'def _got_size(size):' header is missing here.
            self._file_size = size
            # NOTE(review): an 'if self._status:' guard appears to be
            # missing before the next line.
            self._status.set_size(size)
        d.addCallback(_got_size)
        # NOTE(review): the trailing 'return d' is missing from this listing.
    # Return (and cache) the (k, happy, n, segsize) tuple from the wrapped
    # uploadable; also remembers segsize for the segment hashers.
    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     # NOTE(review): log-level argument missing here.
            return encoding_parameters
        # NOTE(review): the 'd.addCallback(_got)' and 'return d' lines are
        # missing from this listing.
    # Lazily build the AES encryptor from the uploadable's encryption key,
    # deriving and recording the storage index as a side effect.
    def _get_encryptor(self):
        # NOTE(review): the cached-encryptor guard ('if self._encryptor:')
        # is missing before the next line.
            return defer.succeed(self._encryptor)
        d = self.original.get_encryption_key()
        # NOTE(review): the '_got(key)' callback header and the creation of
        # the AES encryptor are missing from this listing.
            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
            # NOTE(review): an 'if self._status:' guard appears to be
            # missing before the next line.
                self._status.set_storage_index(storage_index)
            # NOTE(review): the callback's return and the method's
            # 'd.addCallback(...)' / 'return d' tail are missing here.

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        # NOTE(review): the trailing 'return d' is missing from this listing.
    # Return (hasher, bytes-remaining-in-segment), creating a fresh segment
    # hasher when none is active.
    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        # NOTE(review): the 'if p:' guard and its 'return p, left' are
        # partially missing from this listing.
            left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size
    # Feed a chunk of plaintext into the per-segment hashers, closing out
    # each segment's digest as its boundary is crossed.
    def _update_segment_hash(self, chunk):
        # NOTE(review): the 'offset = 0' initialization is missing from this
        # listing.
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         # NOTE(review): log-level argument missing here.
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         # NOTE(review): closing arguments missing here.

            offset += this_segment
    # Read up to 'length' bytes of ciphertext; with hash_only=True the data
    # is hashed but the returned ciphertext is skipped.
    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        # NOTE(review): the 'ciphertext = []' accumulator initialization is
        # missing from this listing.
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        # NOTE(review): the trailing 'return d' is missing from this listing.
    # Chunked, Deferred-driven read loop: hashes/encrypts CHUNKSIZE pieces
    # and fires fire_when_done with the accumulated ciphertext list.
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # NOTE(review): the termination guard (presumably
        # 'if not remaining:') and its return are missing around this call.
            fire_when_done.callback(ciphertext)
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 # NOTE(review): the fire_when_done argument
                                 # and the error-callback definition are
                                 # missing from this listing.
            fire_when_done.errback(why)
        # NOTE(review): the d.addCallbacks(...) wiring and return are
        # missing from this listing.
    # Hash every plaintext chunk (whole-file and per-segment hashers) and
    # encrypt it; returns the list of ciphertext chunks.
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        # NOTE(review): the accumulator initializations (ciphertext list,
        # bytes counter) and the 'while data:' / chunk-pop loop header are
        # missing from this listing.
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     # NOTE(review): log-level argument missing here.
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            # NOTE(review): the 'if hash_only:' / 'else:' branch structure
            # around the next two statements is missing from this listing.
                self.log(" skipping encryption", level=log.NOISY)
                cryptdata.append(ciphertext)
            # NOTE(review): intervening lines missing here.
            self._ciphertext_bytes_read += bytes_processed
        # NOTE(review): an 'if self._status:' guard appears to be missing
        # before the progress update.
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        # NOTE(review): the trailing 'return cryptdata' is missing from this
        # listing.
    # Return the per-segment plaintext digests for segments [first, last),
    # closing out the final in-progress segment hasher if necessary.
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     # NOTE(review): log-level argument missing here.
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     # NOTE(review): closing arguments missing here.
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
835 def get_plaintext_hash(self):
836 h = self._plaintext_hasher.digest()
837 return defer.succeed(h)
840 return self.original.close()
    # NOTE(review): the enclosing class statement (an UploadStatus-like
    # class) and its __init__ header are missing from this listing.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)  # shared counter for unique ids

        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]  # per-phase progress fractions
        self.counter = self.statusid_counter.next()  # py2 iterator API
        self.started = time.time()
    # Accessors/mutators for the status fields.  NOTE(review): several
    # method bodies below are missing from this listing — confirm upstream.
    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
    # NOTE(review): the enclosing class statement (a CHKUploader-like class)
    # is missing from this listing.
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        # NOTE(review): an '_encoder' initialization appears to be missing
        # here (abort() below tests self._encoder) — confirm upstream.
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
909 def log(self, *args, **kwargs):
910 if "parent" not in kwargs:
911 kwargs["parent"] = self._log_number
912 if "facility" not in kwargs:
913 kwargs["facility"] = "tahoe.upload"
914 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            # NOTE(review): the callback's return and the method's
            # 'd.addBoth(_done)' / 'return d' tail are missing from this
            # listing.
        # NOTE(review): the 'def abort(self):' header for this body is
        # missing from this listing.
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           # NOTE(review): remaining
                                           # constructor arguments missing
                                           # from this listing.
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # NOTE(review): the trailing 'return d' is missing from this listing.
    def locate_all_shareholders(self, encoder, started):
        """Run server selection for this upload, recording timing info."""
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        # abbreviated storage index, used as a per-upload id in log messages
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        # NOTE(review): the remaining constructor arguments (and closing
        # paren) of this call are missing from this listing.
        server_selector = self.server_selector_class(upload_id,

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        # NOTE(review): the callback wrapper around the line below, and
        # this method's return value, are apparently missing from this
        # listing.
        self._server_selection_elapsed = time.time() - server_selection_started
    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        """
        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
        """
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._count_preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        # NOTE(review): `buckets = {}` (updated and asserted against below)
        # is apparently missing from this listing.
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        # sanity check: no two trackers should hold the same sharenum
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
            # NOTE(review): the first format arguments and the closing
            # paren of this tuple are missing from this listing.
            sum([len(tracker.buckets) for tracker in upload_trackers]),
            [(t.buckets, t.get_serverid()) for t in upload_trackers]
        encoder.set_shareholders(buckets, servermap)
    def _encrypted_done(self, verifycap):
        """Returns a Deferred that will fire with the UploadResults instance."""
        # NOTE(review): `e = self._encoder` is apparently missing from this
        # listing; `e` is used throughout below.
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server = self._server_trackers[shnum].get_server()
            sharemap.add(shnum, server)
            servermap.add(server, shnum)
        # NOTE(review): `now = time.time()` and the `timings = {}`
        # initialization are apparently missing from this listing.
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        # NOTE(review): the sharemap= and timings= keyword arguments of
        # this call are apparently missing from this listing.
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           servermap=servermap,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
        # NOTE(review): `return ur` apparently missing from this listing.
1052 def get_upload_status(self):
1053 return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # NOTE(review): mutable default argument; apparently safe here because
    # it is only ever concatenated (never mutated), but a None sentinel
    # would be the safer idiom.
    # NOTE(review): an `if size == 0:` guard apparently precedes the early
    # return below; it is missing from this listing.
        return defer.succeed([])
    d = uploadable.read(size)
    # NOTE(review): the callback definition (e.g. `def _got(data):`) is
    # apparently missing from this listing.
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes <= size
        remaining = size - bytes
        # NOTE(review): an `if remaining:` guard apparently precedes the
        # recursive call below.
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    # NOTE(review): attaching the callback and `return d` are apparently
    # missing from this listing.
class LiteralUploader:
    # NOTE(review): several lines are missing from this listing throughout
    # the class; gaps are marked inline.

    # NOTE(review): the `def __init__(self):` line is apparently missing.
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # NOTE(review): `self._size = size` is apparently missing here
            # (it is read back in _build_results).
            self._status.set_size(size)
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        # NOTE(review): `return d` apparently missing from this listing.

    def _build_results(self, uri):
        # NOTE(review): several keyword arguments and the closing paren of
        # this call are missing from this listing.
        ur = UploadResults(file_size=self._size,
                           ciphertext_fetched=0,
                           preexisting_shares=0,
                           uri_extension_data=None,
                           uri_extension_hash=None,
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        self._status.set_results(ur)

    # NOTE(review): intervening methods are truncated in this listing.
    def get_upload_status(self):
        """Return the UploadStatus (body missing from this listing)."""
class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # NOTE(review): `self._offset = 0` is apparently missing here (it is
        # read and advanced by _read_encrypted / remote_read_encrypted).
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        # NOTE(review): `self._size = None` is apparently missing here
        # (checked in get_size below).

    # NOTE(review): the `def get_size(self):` line is apparently missing.
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            # NOTE(review): body lines missing — presumably cache the size
            # and update the status object.
        d.addCallback(_got_size)
        # NOTE(review): `return d` apparently missing.

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        # NOTE(review): the callback definition(s) are missing; the two
        # offset updates below apparently belong to the hash_only and
        # data-returning branches respectively.
            self._offset += length
            size = sum([len(data) for data in strings])
            self._offset += size
        d.addCallback(_read)
        # NOTE(review): `return d` apparently missing.

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        # NOTE(review): the continuation/closing of this log.msg call is
        # missing from this listing.
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): an `else:` apparently precedes the line below.
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): a `def _read(strings):` apparently precedes the
        # lines below; `return d` at the end is also missing.
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
        d.addCallback(_read)

    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    """Upload a file with the assistance of a remote Helper process
    (contacted via self._helper.callRemote in _contact_helper)."""

    def __init__(self, helper, storage_broker):
        self._helper = helper
        self._storage_broker = storage_broker
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): the remaining status-initialization lines are
        # apparently missing from this listing.
1200 def log(self, *args, **kwargs):
1201 if "parent" not in kwargs:
1202 kwargs["parent"] = self._log_number
1203 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # NOTE(review): creation of the Deferred `d` (presumably a
        # size-fetching call on `eu`) is missing from this listing.
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        # NOTE(review): the wrapper around the deactivation line below, and
        # `return d`, are apparently missing from this listing.
            self._upload_status.set_active(False)
1228 def _got_size(self, size):
1230 self._upload_status.set_size(size)
1232 def _got_all_encoding_parameters(self, params):
1233 k, happy, n, segment_size = params
1234 # stash these for URI generation later
1235 self._needed_shares = k
1236 self._total_shares = n
1237 self._segment_size = segment_size
    def _contact_helper(self, res):
        """Ask the Helper (via 'upload_chk') about this storage index."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        # NOTE(review): `return d` apparently missing from this listing.
    def _contacted_helper(self, (helper_upload_results, upload_helper)):
        # NOTE(review): `now = time.time()` is apparently missing from this
        # listing.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        # NOTE(review): an `if upload_helper:` guard apparently precedes the
        # branch below (run when the helper needs the ciphertext from us).
        self.log("helper says we need to upload", level=log.NOISY)
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        # NOTE(review): creation of the Deferred `d` (presumably a
        # reu.get_size() call) is missing from this listing.
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        # NOTE(review): `return d` and the `else:` introducing the
        # already-uploaded branch below are apparently missing.
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        return helper_upload_results
1269 def _convert_old_upload_results(self, upload_results):
1270 # pre-1.3.0 helpers return upload results which contain a mapping
1271 # from shnum to a single human-readable string, containing things
1272 # like "Found on [x],[y],[z]" (for healthy files that were already in
1273 # the grid), "Found on [x]" (for files that needed upload but which
1274 # discovered pre-existing shares), and "Placed on [x]" (for newly
1275 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1276 # set of binary serverid strings.
1278 # the old results are too hard to deal with (they don't even contain
1279 # as much information as the new results, since the nodeids are
1280 # abbreviated), so if we detect old results, just clobber them.
1282 sharemap = upload_results.sharemap
1283 if str in [type(v) for v in sharemap.values()]:
1284 upload_results.sharemap = None
    def _build_verifycap(self, helper_upload_results):
        """Build a verify-cap and local UploadResults from the helper's
        (possibly already-converted) results."""
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        # the helper must agree with our own encoding parameters
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size

        # hur.verifycap doesn't exist if already found
        # NOTE(review): the final argument(s) and closing paren of this
        # call, plus the `timings = {}` / `now = time.time()` lines, are
        # missing from this listing.
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key,val in hur.timings.items():
            # NOTE(review): a guard (apparently `if key == "total":`)
            # precedes the rename below, and the `timings[key] = val`
            # store is missing from this listing.
            key = "helper_total"
        timings["total"] = now - self._started

        gss = self._storage_broker.get_stub_server
        # NOTE(review): initialization of the `sharemap`/`servermap`
        # containers is apparently missing from this listing.
        for shnum, serverids in hur.sharemap.items():
            sharemap[shnum] = set([gss(serverid) for serverid in serverids])
        # if the file was already in the grid, hur.servermap is an empty dict
        for serverid, shnums in hur.servermap.items():
            servermap[gss(serverid)] = set(shnums)

        # NOTE(review): the sharemap= and timings= keyword arguments of
        # this call are apparently missing from this listing.
        ur = UploadResults(file_size=self._size,
                           # not if already found
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           servermap=servermap,
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
        # NOTE(review): `return ur` apparently missing from this listing.
1337 def get_upload_status(self):
1338 return self._upload_status
class BaseUploadable:
    """Encoding-parameter bookkeeping shared by the uploadable
    implementations below (FileHandle and its subclasses)."""
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides; None means "fall back to the default above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cache populated by get_all_encoding_parameters()
    _all_encoding_parameters = None
1355 def set_upload_status(self, upload_status):
1356 self._status = IUploadStatus(upload_status)
1358 def set_default_encoding_parameters(self, default_params):
1359 assert isinstance(default_params, dict)
1360 for k,v in default_params.items():
1361 precondition(isinstance(k, str), k, v)
1362 precondition(isinstance(v, int), k, v)
1363 if "k" in default_params:
1364 self.default_encoding_param_k = default_params["k"]
1365 if "happy" in default_params:
1366 self.default_encoding_param_happy = default_params["happy"]
1367 if "n" in default_params:
1368 self.default_encoding_param_n = default_params["n"]
1369 if "max_segment_size" in default_params:
1370 self.default_max_segment_size = default_params["max_segment_size"]
    def get_all_encoding_parameters(self):
        """Return (via Deferred) the (k, happy, n, segment_size) tuple,
        computing and caching it on first use."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        # NOTE(review): creation of the Deferred `d` (presumably a
        # self.get_size() call) is missing from this listing.
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        # NOTE(review): `return d` apparently missing from this listing.
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        # restored: later methods test `self._key is not None` and
        # `self._size is not None`, so both caches must start out as None
        self._key = None
        self.convergence = convergence
        self._size = None
    def _get_encryption_key_convergent(self):
        """Derive the AES key by hashing the plaintext together with the
        convergence secret, caching the result in self._key."""
        if self._key is not None:
            return defer.succeed(self._key)

        # NOTE(review): creation of the Deferred `d` (a size-fetching call)
        # is missing from this listing; the comment below refers to it.
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        # NOTE(review): the callback definition receiving `params` is
        # apparently missing from this listing.
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            # NOTE(review): the seek-to-start, BLOCKSIZE constant,
            # bytes_read initialization, and the read-loop header are
            # missing from this listing.
                data = f.read(BLOCKSIZE)
                # NOTE(review): the loop-exit test on an empty read is
                # apparently missing here.
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                # NOTE(review): an `if self._status:` guard apparently
                # precedes the progress update below.
                    self._status.set_progress(0, float(bytes_read)/self._size)
            self._key = enckey_hasher.digest()
            # NOTE(review): another `if self._status:` guard apparently
            # precedes this progress update.
            self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            # NOTE(review): returning the key, attaching the callback, and
            # `return d` are apparently missing from this listing.
1444 def _get_encryption_key_random(self):
1445 if self._key is None:
1446 self._key = os.urandom(16)
1447 return defer.succeed(self._key)
1449 def get_encryption_key(self):
1450 if self.convergence is not None:
1451 return self._get_encryption_key_convergent()
1453 return self._get_encryption_key_random()
    # NOTE(review): the enclosing `def get_size(self):` line is apparently
    # missing from this listing.
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0, os.SEEK_END)
        size = self._filehandle.tell()
        # NOTE(review): `self._size = size` is apparently missing here —
        # the cache checked above is otherwise never populated.
        self._filehandle.seek(0)
        return defer.succeed(size)
1464 def read(self, length):
1465 return defer.succeed([self._filehandle.read(length)])
1468 # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    # NOTE(review): the `def close(self):` line is apparently missing from
    # this listing; unlike FileHandle, this class owns the filehandle it
    # opened above, so it closes it here.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    """
    # NOTE(review): the rest of this docstring is truncated in this listing.
    implements(IUploader)

    # uploads of at most this many bytes are stored as LIT URIs instead of
    # being sent to storage servers (see upload() below)
    URI_LIT_SIZE_THRESHOLD = 55
1504 def __init__(self, helper_furl=None, stats_provider=None, history=None):
1505 self._helper_furl = helper_furl
1506 self.stats_provider = stats_provider
1507 self._history = history
1509 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1510 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1511 service.MultiService.__init__(self)
    def startService(self):
        service.MultiService.startService(self)
        # only attempt a helper connection if a helper FURL was configured
        if self._helper_furl:
            # NOTE(review): the remaining arguments of connectTo (presumably
            # the _got_helper callback) are missing from this listing.
            self.parent.tub.connectTo(self._helper_furl,
    def _got_helper(self, helper):
        """Connection callback: fetch the helper's version info."""
        self.log("got helper connection, getting versions")
        # NOTE(review): the interior and closing braces of this default
        # version dict are missing from this listing.
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1528 def _got_versioned_helper(self, helper):
1529 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1530 if needed not in helper.version:
1531 raise InsufficientVersionError(needed, helper.version)
1532 self._helper = helper
1533 helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        """Helper-disconnect notification.

        NOTE(review): the body is missing from this listing — presumably it
        clears self._helper (which get_helper_info() and upload() test).
        """
1538 def get_helper_info(self):
1539 # return a tuple of (helper_furl_or_None, connected_bool)
1540 return (self._helper_furl, bool(self._helper))
    def upload(self, uploadable):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        # NOTE(review): several lines at the top of this method are missing
        # from this listing.
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            # tiny files are stored entirely inside a LIT URI
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            # NOTE(review): an `else:` branch header apparently precedes
            # the code below.
            eu = EncryptAnUploadable(uploadable, self._parentmsgid)
            d2 = defer.succeed(None)
            storage_broker = self.parent.get_storage_broker()
            # NOTE(review): an `if self._helper:` guard apparently precedes
            # the helper-assisted path below.
            uploader = AssistedUploader(self._helper, storage_broker)
            d2.addCallback(lambda x: eu.get_storage_index())
            d2.addCallback(lambda si: uploader.start(eu, si))
            # NOTE(review): the `else:` introducing the direct (no-helper)
            # path below is apparently missing.
            storage_broker = self.parent.get_storage_broker()
            secret_holder = self.parent._secret_holder
            uploader = CHKUploader(storage_broker, secret_holder)
            d2.addCallback(lambda x: uploader.start(eu))

            self._all_uploads[uploader] = None
            # NOTE(review): an `if self._history:` guard apparently
            # precedes the line below.
            self._history.add_upload(uploader.get_upload_status())
            def turn_verifycap_into_read_cap(uploadresults):
                # Generate the uri from the verifycap plus the key.
                d3 = uploadable.get_encryption_key()
                def put_readcap_into_results(key):
                    v = uri.from_string(uploadresults.get_verifycapstr())
                    r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                    uploadresults.set_uri(r.to_string())
                    return uploadresults
                d3.addCallback(put_readcap_into_results)
                # NOTE(review): `return d3` apparently missing from this
                # listing.
            d2.addCallback(turn_verifycap_into_read_cap)
            # NOTE(review): `return d2` apparently missing.
        d.addCallback(_got_size)
        # NOTE(review): `return d` apparently missing from this listing.