import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_servers, \
                                         failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
     DEFAULT_MAX_SEGMENT_SIZE
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.
    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed

class UploadResults:
    implements(IUploadResults)

    def __init__(self, file_size,
                 ciphertext_fetched, # how much the helper fetched
                 preexisting_shares, # count of shares already present
                 pushed_shares, # count of shares we pushed
                 sharemap, # {shnum: set(serverid)}
                 servermap, # {serverid: set(shnum)}
                 timings, # dict of name to number of seconds
                 uri_extension_data,
                 uri_extension_hash,
                 verifycapstr):
        self._file_size = file_size
        self._ciphertext_fetched = ciphertext_fetched
        self._preexisting_shares = preexisting_shares
        self._pushed_shares = pushed_shares
        self._sharemap = sharemap
        self._servermap = servermap
        self._timings = timings
        self._uri_extension_data = uri_extension_data
        self._uri_extension_hash = uri_extension_hash
        self._verifycapstr = verifycapstr

    def set_uri(self, uri):
        self._uri = uri

    def get_file_size(self):
        return self._file_size
    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched
    def get_preexisting_shares(self):
        return self._preexisting_shares
    def get_pushed_shares(self):
        return self._pushed_shares
    def get_sharemap(self):
        return self._sharemap
    def get_servermap(self):
        return self._servermap
    def get_timings(self):
        return self._timings
    def get_uri_extension_data(self):
        return self._uri_extension_data
    def get_verifycapstr(self):
        return self._verifycapstr

# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

def pretty_print_shnum_to_servers(s):
    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
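# For illustration (hypothetical values): given a map like
#   {0: set([serverid_A]), 1: set([serverid_A, serverid_B])}
# this renders something like "sh0: xgru5, sh1: xgru5+e3x7k", where the
# short strings are abbreviated base32 server nodeids.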

class ServerTracker:
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        self._server = server
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret
143 return ("<ServerTracker for server %s and SI %s>"
144 % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
146 def get_serverid(self):
147 return self._server.get_serverid()
149 return self._server.get_name()
151 def query(self, sharenums):
152 rref = self._server.get_rref()
153 d = rref.callRemote("allocate_buckets",
159 canary=Referenceable())
160 d.addCallback(self._got_reply)
163 def ask_about_existing_shares(self):
164 rref = self._server.get_rref()
165 return rref.callRemote("get_buckets", self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self._server, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(self.buckets.keys())

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]

def str_shareloc(shnum, bucketwriter):
    return "%s: %s" % (shnum, bucketwriter.get_servername(),)

class Tahoe2ServerSelector(log.PrefixingLogMixin):

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """
        if self._status:
            self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
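        # Rough illustration: IncompleteHashTree pads its leaf count up to a
        # power of two, so with e.g. total_shares=10 the tree has 16 leaves,
        # and validating one leaf needs that leaf plus one sibling hash per
        # level; only the resulting count matters here, not the hash values.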

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE)
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        if not all_servers:
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accommodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writeable_servers = [server for server in all_servers
                             if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
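        # For orientation: the version dict consulted above is shaped roughly
        # like {"http://allmydata.org/tahoe/protocols/storage/v1":
        #          {"maximum-immutable-share-size": ...}, ...},
        # so _get_maxsize() simply reads the advertised per-share size limit.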

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(servers):
            trackers = []
            for s in servers:
                seed = s.get_lease_seed()
                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
                st = ServerTracker(s,
                                   share_size, block_size,
                                   num_segments, num_share_hashes,
                                   storage_index,
                                   renew, cancel)
                trackers.append(st)
            return trackers

        # We assign each server/tracker to one of three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
        self.first_pass_trackers = _make_trackers(writeable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False
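        # A small worked example of the pass structure: with 3 writeable
        # servers and 10 shares, the first pass asks each server for one
        # share (3 placed), then later passes split the remaining 7 shares
        # across whichever of those servers accepted, until every share is
        # placed or every server has refused.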

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            ds.append(d)
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl

    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the queries sent by
        Tahoe2ServerSelector._existing_shares.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                     % (tracker.get_name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                     % (tracker.get_name(), tuple(sorted(buckets))),
                     level=log.NOISY)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            if not buckets:
                self.bad_query_count += 1

    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d servers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                      (self.servers_of_happiness, self.needed_shares,
                       self.query_count, self.num_servers_contacted,
                       self.good_query_count, self.bad_query_count,
                       self.full_count, self.error_count))

    def _loop(self):
        if not self.homeless_shares:
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers have, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers.
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                        items.append((server, sharelist))
                    for writer in self.use_trackers:
                        writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
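                    # Sketch of the intent (hypothetical numbers): if server A
                    # already held shares 0,1,2 and nothing else was placed,
                    # happiness is 1; returning shares 1 and 2 to
                    # homeless_shares lets the next _loop() pass offer them to
                    # other servers, which can raise the effective happiness.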
                else:
                    # Redistribution won't help us; fail.
                    server_count = len(self.serverids_with_shares)
                    failmsg = failure_message(server_count,
                                              self.needed_shares,
                                              self.servers_of_happiness,
                                              effective_happiness)
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        self,
                        failmsg,
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        )
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (first query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
            return d
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                         level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
            return d
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            return self._loop()
        else:
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)

            # we placed enough to be happy, so we're done
            if self._status:
                self._status.set_status("Placed all shares")
            msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                   self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
            self.log(msg, level=log.OPERATIONAL)
            return (self.use_trackers, self.preexisting_shares)

    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                     level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                pass
            else:
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                     % (tracker.get_name(),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                     level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_trackers.add(tracker)
                progress = True

            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # They were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)

        # now loop
        return self._loop()

    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            tracker.abort()
        raise UploadUnhappinessError(msg)


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024
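
    # Usage sketch (illustrative only, not invoked here): an uploader wraps
    # a plaintext IUploadable and then treats the wrapper as the ciphertext
    # source, e.g.
    #   eu = EncryptAnUploadable(FileName("/some/path", convergence=None))
    #   d = eu.read_encrypted(1024, hash_only=False)
    # where the Deferred fires with a list of ciphertext strings.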

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log(" skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()


class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,
                                                     self._log_number,
                                                     self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             storage_index,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        def _done(res):
            self._server_selection_elapsed = time.time() - server_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        """
        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
                                  have this share
        """
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._count_preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        buckets = {}
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
                )
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Returns a Deferred that will fire with the UploadResults instance."""
        e = self._encoder
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server_tracker = self._server_trackers[shnum]
            serverid = server_tracker.get_serverid()
            sharemap.add(shnum, serverid)
            servermap.add(serverid, shnum)
        now = time.time()
        timings = {}
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
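# For example (illustrative only): read_this_many_bytes(u, 100) keeps calling
# u.read() until it has accumulated exactly 100 bytes, then fires with the
# collected list of strings; note that it relies on each read() returning at
# least one byte.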

class LiteralUploader:

    def __init__(self):
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        ur = UploadResults(file_size=self._size,
                           ciphertext_fetched=0,
                           preexisting_shares=0,
                           pushed_shares=0,
                           sharemap={},
                           servermap={},
                           timings={},
                           uri_extension_data=None,
                           uri_extension_hash=None,
                           verifycapstr=None)
        ur.set_uri(uri)
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        self._status.set_results(ur)
        return ur

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()

class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (helper_upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload", level=log.NOISY)
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        return helper_upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None
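
    # To illustrate the two shapes (values here are made up): an old-style
    # sharemap looks like {0: "Found on [xxxxx]"}, while a new-style one
    # looks like {0: set(['...20-byte binary serverid...'])}; the presence
    # of any string value is what triggers the clobbering above.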

    def _build_verifycap(self, helper_upload_results):
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size

        # hur.verifycap doesn't exist if already found
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
                                   size=self._size)
        timings = {}
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key,val in hur.timings.items():
            if key == "total":
                key = "helper_total"
            timings[key] = val
        now = time.time()
        timings["total"] = now - self._started

        ur = UploadResults(file_size=self._size,
                           # not if already found
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           sharemap=hur.sharemap,
                           servermap=hur.servermap, # not if already found
                           timings=timings,
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0, os.SEEK_END)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
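
# Illustrative note on convergence (values hypothetical): uploading
# Data("hello", convergence="") twice derives the same AES key both times,
# so both uploads map to the same storage index and the same shares; with
# convergence=None each upload gets a fresh random key and a distinct SI.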

class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55
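    # Files at or below this size (in bytes) are small enough to be packed
    # into a LIT URI, where the file body is embedded directly in the URI
    # string itself and no shares are stored on any server.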

    def __init__(self, helper_furl=None, stats_provider=None, history=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if self._history:
                    self._history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.get_verifycapstr())
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.set_uri(r.to_string())
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
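                # For reference (illustrative): the resulting read-cap has
                # the shape URI:CHK:<key>:<UEB hash>:<k>:<N>:<size>, i.e. the
                # verify-cap fields plus the AES key that turns it into a
                # read capability.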
        d.addCallback(_got_size)
        return d