1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import add_version_to_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20 NotEnoughSharesError, NoSharesError, NoServersError, \
21 InsufficientVersionError
22 from allmydata.immutable import layout
23 from pycryptopp.cipher.aes import AES
25 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    """Control-flow exception: raised to jump out of the peer-query loop
    once every peer has been contacted."""
    # we use this to jump out of the loop
# this wants to live in storage, not here
class TooFullError(Exception):
    """Presumably raised when a storage server is too full to accept more
    shares — TODO(review): confirm against the storage-server code."""
class UploadResults(Copyable, RemoteCopy):
    """Outcome record for an upload. Instances are serialized over foolscap
    from the helper to its client, so attribute shapes are
    compatibility-sensitive (see the notes below)."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.
    # NOTE(review): the `def __init__(self):` header is elided from this view
    # of the file; the assignments below are its body.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
    # our current uri_extension is 846 bytes for small files, a few bytes
    # more for larger ones (since the filesize is encoded in decimal in a
    # few places). Ask for a little bit more just in case we need it. If
    # the extension changes size, we can change EXTENSION_SIZE to
    # allocate a more accurate amount of space.
    # TODO: actual extensions are closer to 419 bytes, so we can probably lower
    # NOTE(review): this view of the file elides some lines here (apparently
    # the EXTENSION_SIZE constant itself, a `storage_index` parameter in the
    # signature below, the `self.peerid = peerid` assignment, and one
    # argument line of the make_write_bucket_proxy() call).
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 bucket_renewal_secret, bucket_cancel_secret):
        # peerid must be the 20-byte binary nodeid
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        # build a throwaway write-bucket proxy just to learn how much space
        # this share will consume on the server
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret
    # NOTE(review): the `def __repr__(self):` header is elided from this view.
    # Shows the short peer nodeid and the first 5 chars of the storage index.
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))
    def query(self, sharenums):
        """Send an allocate_buckets request to this peer's storage server.

        NOTE(review): several argument lines of the callRemote call (the
        storage index, secrets, sharenums, allocated size) and the trailing
        `return d` are elided from this view of the file.
        """
        d = self._storageserver.callRemote("allocate_buckets",
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
    def _got_reply(self, (alreadygot, buckets)):
        # Python-2 tuple parameter: the reply is (set of shnums the server
        # already holds, dict of shnum -> remote bucket writer reference).
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # NOTE(review): lines initializing the local `b` dict and completing
        # the wbp_class(...) argument list are elided from this view; the
        # loop wraps each remote ref in a write-bucket proxy and records it.
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.num_share_hashes,
            self.buckets.update(b)
            return (alreadygot, set(b.keys()))
class Tahoe2PeerSelector:
    """Tahoe2 peer-selection algorithm: walk the permuted peer list asking
    each peer to hold shares, first one share per peer, then spreading the
    remaining homeless shares over peers that responded, until every share
    is placed or no peers remain.

    NOTE(review): this view of the file elides a number of lines — several
    `if` guards, method headers such as `def _loop(self):` and
    `def __repr__(self):`, and parts of some multi-line calls. The
    indentation below reflects the structure where it is inferable; inline
    notes mark the visible gaps.
    """

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # counters reported in the final success/failure log message
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    # (def __repr__ header elided from this view)
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """
        self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = storage_broker.get_servers_for_index(storage_index)
        # (an `if not peers:` guard appears to be elided here)
        raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        # (an `if not peers:` guard appears to be elided here)
        raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()
        # (continuation argument lines of the two calls below are elided)
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
        # build one PeerTracker per surviving peer, deriving per-bucket
        # renewal/cancel secrets from the file-wide secrets
        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                 bucket_cancel_secret_hash(file_cancel_secret,
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)

    # (def _loop(self): header elided from this view; the body below is the
    # selection state machine: first pass over uncontacted peers, then
    # repeated passes over previously-contacted peers, until all shares are
    # placed or no peers remain)
        if not self.homeless_shares:
            # every share found a home: log a summary and finish
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   self.query_count, self.num_peers_contacted,
                   self.good_query_count, self.bad_query_count,
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)
            # first pass: ask this peer to hold exactly one share
            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            self._status.set_status("Contacting Peers [%s] (first query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            # addBoth: _got_response handles both success and Failure
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                self._started_second_pass = True
            # spread the remaining homeless shares evenly over the
            # still-willing peers
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            self._status.set_status("Contacting Peers [%s] (second query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
        # no more peers. If we haven't placed enough shares, we fail.
        placed_shares = self.total_shares - len(self.homeless_shares)
        if placed_shares < self.shares_of_happiness:
            msg = ("placed %d shares out of %d total (%d homeless), "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares, len(self.homeless_shares),
                    self.shares_of_happiness,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
            msg = "peer selection failed for %s: %s" % (self, msg)
            if self.last_failure_msg:
                msg += " (%s)" % (self.last_failure_msg,)
            log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
            # "placed some but not enough" vs "placed none" raise different
            # exceptions (an `if placed_shares:`/`else:` pair appears to be
            # elided around these two raises)
            raise NotEnoughSharesError(msg)
            raise NoSharesError(msg)
        # we placed enough to be happy, so we're done
        self._status.set_status("Placed all shares")
        return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        # res is either a Failure or the (alreadygot, allocated) tuple from
        # PeerTracker._got_reply; wired via addBoth so both paths land here
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            # put the shares we asked about back at the head of the queue
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
            # No more peers, so this upload might fail (it depends upon
            # whether we've hit shares_of_happiness or not). Log the last
            # failure we got: if a coding error causes all peers to fail
            # in the same way, this allows the common failure to be seen
            # by the uploader and should help with debugging
            msg = ("last failure (from %s) was: %s" % (peer, res))
            self.last_failure_msg = msg
        # (an `else:` introducing the success path appears to be elided here)
        (alreadygot, allocated) = res
        log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                % (idlib.shortnodeid_b2a(peer.peerid),
                   tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                level=log.NOISY, parent=self._log_parent)
        # record shares the peer already holds (a `for s in alreadygot:`
        # loop header appears to be elided here)
        self.preexisting_shares[s] = peer.peerid
        if s in self.homeless_shares:
            self.homeless_shares.remove(s)
        # the PeerTracker will remember which shares were allocated on
        # that peer. We just have to remember to use them.
        self.use_peers.add(peer)
        not_yet_present = set(shares_to_ask) - set(alreadygot)
        still_homeless = not_yet_present - set(allocated)
        # they accepted or already had at least one share, so
        # progress has been made
        self.good_query_count += 1
        self.bad_query_count += 1
        # In networks with lots of space, this is very unusual and
        # probably indicates an error. In networks with peers that
        # are full, it is merely unusual. In networks that are very
        # full, it is common, and many uploads will fail. In most
        # cases, this is obviously not fatal, and we'll just use some
        # some shares are still homeless, keep trying to find them a
        # home. The ones that were rejected get first priority.
        self.homeless_shares = (list(still_homeless)
                                + self.homeless_shares)
        # Since they were unable to accept all of our requests, so it
        # is safe to assume that asking them again won't help.
        # if they *were* able to accept everything, they might be
        # willing to accept even more.
        put_peer_here.append(peer)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable.

    NOTE(review): this view of the file elides several lines (apparently
    the CHUNKSIZE class attribute, some method headers, callback headers,
    and guard lines); inline notes below mark the visible gaps.
    """
    implements(IEncryptedUploadable)

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None # created lazily by _get_encryptor()
        self._plaintext_hasher = plaintext_hasher() # whole-file hash
        self._plaintext_segment_hasher = None # per-segment hash in progress
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None # cached (k, happy, n, segsize)
        self._file_size = None # cached size
        self._ciphertext_bytes_read = 0 # for progress reporting

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        # fill in default facility/parent so callers don't have to
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    # (def get_size(self): header elided from this view)
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        # (def _got_size(size): callback header elided)
            self._file_size = size
            self._status.set_size(size)
        d.addCallback(_got_size)

    def get_all_encoding_parameters(self):
        # cache the (k, happy, n, segsize) tuple from the wrapped uploadable
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            return encoding_parameters

    def _get_encryptor(self):
        # (an `if self._encryptor:` fast-path guard appears to be elided)
        return defer.succeed(self._encryptor)
        d = self.original.get_encryption_key()
        # (callback header and AES-encryptor construction elided)
        storage_index = storage_index_hash(key)
        assert isinstance(storage_index, str)
        # There's no point to having the SI be longer than the key, so we
        # specify that it is truncated to the same 128 bits as the AES key.
        assert len(storage_index) == 16  # SHA-256 truncated to 128b
        self._storage_index = storage_index
        self._status.set_storage_index(storage_index)

    def get_storage_index(self):
        # the storage index is derived from the encryption key, so make
        # sure the encryptor has been created first
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)

    def _get_segment_hasher(self):
        # return (hasher, bytes remaining in the current segment), creating
        # a fresh per-segment hasher when none is in progress
        p = self._plaintext_segment_hasher
        # (an `if p:` early-return branch appears to be elided here)
        left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        # feed `chunk` into the per-segment plaintext hashers, closing out
        # each segment's hash as that segment fills up
        # (an `offset = 0` initialization appears to be elided here)
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),

            offset += this_segment

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # (an `if not remaining:` termination guard appears to be elided)
        fire_when_done.callback(ciphertext)
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            # recurse (via the Deferred trampoline) on the remainder
            self._read_encrypted(remaining, ciphertext, hash_only,
            fire_when_done.errback(why)

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        # (loop header and accumulator initializations appear to be elided)
        self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
        bytes_processed += len(chunk)
        self._plaintext_hasher.update(chunk)
        self._update_segment_hash(chunk)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        ciphertext = self._encryptor.process(chunk)
        self.log(" skipping encryption", level=log.NOISY)
        cryptdata.append(ciphertext)
        self._ciphertext_bytes_read += bytes_processed
        # report ciphertext progress as a fraction of the full file
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        # digest of the whole-file plaintext hasher
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    # (def close(self): header elided from this view)
        return self.original.close()
    # NOTE(review): the `class UploadStatus:` header, the `def __init__`
    # header, and the one-line bodies of several accessors below are elided
    # from this view of the file.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0) # shared across instances

        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0] # [chk, ciphertext, encode+push]
        self.counter = self.statusid_counter.next() # unique status id
        self.started = time.time()

    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
    # NOTE(review): the `class CHKUploader:` header is elided from this view.
    # overridable hook so tests/subclasses can substitute a selector
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
677 def log(self, *args, **kwargs):
678 if "parent" not in kwargs:
679 kwargs["parent"] = self._log_number
680 if "facility" not in kwargs:
681 kwargs["facility"] = "tahoe.upload"
682 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)

    # (def abort(self): header elided from this view)
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        # (continuation argument lines of the Encoder() call are elided)
        self._encoder = e = encode.Encoder(self._log_number,
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)

    def locate_all_shareholders(self, encoder, started):
        # run peer selection for this upload; fires with the
        # (used_peers, already_peers) pair from the selector
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        # (continuation argument line of this constructor call is elided)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           share_size, block_size,
                                           num_segments, n, desired)
        # (a callback wrapper recording elapsed time appears to be elided)
        self._peer_selection_elapsed = time.time() - peer_selection_started

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        # (a `buckets = {}` initialization appears to be elided here)
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # (bindings such as `r = self._results` and `now = time.time()`
        # appear to be elided here)
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()

    def get_upload_status(self):
        return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    """Read exactly `size` bytes from `uploadable` (issuing repeated read()
    calls as needed), returning a Deferred that fires with the accumulated
    list of data pieces.

    The mutable default for prepend_data is never mutated here (only
    `prepend_data + data` concatenation), so it is harmless.

    NOTE(review): several lines (a size==0 guard, the `_got` callback
    header, asserts, and the Deferred plumbing) are elided from this view.
    """
    return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        remaining = size - bytes
            return read_this_many_bytes(uploadable, remaining,
        return prepend_data + data
class LiteralUploader:
    """Upload a small file as a LIT (literal) URI: the file's contents are
    embedded directly in the URI string, so no shares are pushed to any
    server.

    NOTE(review): this view elides the __init__ header, a callback header,
    and the bodies of some trailing methods.
    """

        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        # (def _got_size(size): callback header elided)
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        # join the pieces into the literal URI, then stringify it
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)

    def get_upload_status(self):
class RemoteEncryptedUploadable(Referenceable):
    """Wrap an IEncryptedUploadable so a remote helper can pull ciphertext
    from it over foolscap (the RIEncryptedUploadable interface).

    NOTE(review): this view elides several lines (apparently the _offset /
    _bytes_sent initializations, a `def get_size` header, callback headers,
    and parts of the read plumbing).
    """
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

    # (def get_size(self): header elided from this view)
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        d.addCallback(_got_size)

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        # (callback headers elided; on success the read offset advances and
        # the returned string lengths are summed for progress)
        self._offset += length
        size = sum([len(data) for data in strings])

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # (an `else:` line appears to be elided here)
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # (a callback counting bytes actually sent follows; header elided)
        size = sum([len(data) for data in strings])
        self._bytes_sent += size

    def remote_close(self):
        return self._eu.close()
923 class AssistedUploader:
925 def __init__(self, helper):
926 self._helper = helper
927 self._log_number = log.msg("AssistedUploader starting")
928 self._storage_index = None
929 self._upload_status = s = UploadStatus()
933 def log(self, *args, **kwargs):
934 if "parent" not in kwargs:
935 kwargs["parent"] = self._log_number
936 return log.msg(*args, **kwargs)
938 def start(self, encrypted_uploadable, storage_index):
939 """Start uploading the file.
941 Returns a Deferred that will fire with the UploadResults instance.
943 precondition(isinstance(storage_index, str), storage_index)
944 self._started = time.time()
945 eu = IEncryptedUploadable(encrypted_uploadable)
946 eu.set_upload_status(self._upload_status)
947 self._encuploadable = eu
948 self._storage_index = storage_index
950 d.addCallback(self._got_size)
951 d.addCallback(lambda res: eu.get_all_encoding_parameters())
952 d.addCallback(self._got_all_encoding_parameters)
953 d.addCallback(self._contact_helper)
954 d.addCallback(self._build_verifycap)
956 self._upload_status.set_active(False)
961 def _got_size(self, size):
963 self._upload_status.set_size(size)
965 def _got_all_encoding_parameters(self, params):
966 k, happy, n, segment_size = params
967 # stash these for URI generation later
968 self._needed_shares = k
969 self._total_shares = n
970 self._segment_size = segment_size
972 def _contact_helper(self, res):
973 now = self._time_contacting_helper_start = time.time()
974 self._storage_index_elapsed = now - self._started
975 self.log(format="contacting helper for SI %(si)s..",
976 si=si_b2a(self._storage_index))
977 self._upload_status.set_status("Contacting Helper")
978 d = self._helper.callRemote("upload_chk", self._storage_index)
979 d.addCallback(self._contacted_helper)
982 def _contacted_helper(self, (upload_results, upload_helper)):
984 elapsed = now - self._time_contacting_helper_start
985 self._elapsed_time_contacting_helper = elapsed
987 self.log("helper says we need to upload")
988 self._upload_status.set_status("Uploading Ciphertext")
989 # we need to upload the file
990 reu = RemoteEncryptedUploadable(self._encuploadable,
992 # let it pre-compute the size for progress purposes
994 d.addCallback(lambda ignored:
995 upload_helper.callRemote("upload", reu))
996 # this Deferred will fire with the upload results
998 self.log("helper says file is already uploaded")
999 self._upload_status.set_progress(1, 1.0)
1000 self._upload_status.set_results(upload_results)
1001 return upload_results
1003 def _convert_old_upload_results(self, upload_results):
1004 # pre-1.3.0 helpers return upload results which contain a mapping
1005 # from shnum to a single human-readable string, containing things
1006 # like "Found on [x],[y],[z]" (for healthy files that were already in
1007 # the grid), "Found on [x]" (for files that needed upload but which
1008 # discovered pre-existing shares), and "Placed on [x]" (for newly
1009 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1010 # set of binary serverid strings.
1012 # the old results are too hard to deal with (they don't even contain
1013 # as much information as the new results, since the nodeids are
1014 # abbreviated), so if we detect old results, just clobber them.
1016 sharemap = upload_results.sharemap
1017 if str in [type(v) for v in sharemap.values()]:
1018 upload_results.sharemap = None
    def _build_verifycap(self, upload_results):
        # Upload finished: sanity-check the returned uri_extension_data
        # against the encoding parameters we expected, then attach the
        # verify-cap string, file size, and timing data to the results.
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        # NOTE(review): the bindings of 'r' (presumably r = upload_results)
        # and 'now' (presumably time.time()) are on lines elided from this
        # view -- confirm against the full file.
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares, size=self._size
        r.file_size = self._size
        # timing breakdown, in seconds
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # the helper reported its own "total"; preserve it under
            # "helper_total" before overwriting "total" with the
            # client-side elapsed time
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
1045 def get_upload_status(self):
1046 return self._upload_status
class BaseUploadable:
    # Shared encoding-parameter plumbing for the concrete uploadables
    # (FileHandle / FileName / Data). Class-level defaults below may be
    # shadowed per-instance by set_default_encoding_parameters().
    # KiB is defined earlier in the file (not visible in this view).
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides; when None, the defaults above are used
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cache for the (k, happy, n, segsize) tuple computed by
    # get_all_encoding_parameters()
    _all_encoding_parameters = None

    def set_upload_status(self, upload_status):
        # adapt and remember the status object used for progress reporting
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        # Accept a {str: int} dict and copy any recognized keys onto the
        # instance, shadowing the class-level defaults.
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        # Returns (via Deferred) the (k, happy, n, segsize) tuple, computed
        # once and cached in _all_encoding_parameters.
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        # NOTE(review): the binding of 'd' (presumably the Deferred from
        # self.get_size()) and the trailing 'return d' appear to be on
        # lines elided from this view -- confirm against the full file.
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    # IUploadable built around an already-open file-like object. The caller
    # retains ownership of the filehandle (see note at the bottom).
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        # NOTE(review): an initialization of self._key (checked below in
        # the _get_encryption_key_* methods) appears to be elided here.
        self.convergence = convergence

    def _get_encryption_key_convergent(self):
        # Derive the key by hashing the whole plaintext together with the
        # convergence secret; cached in self._key after the first call.
        if self._key is not None:
            return defer.succeed(self._key)

        # NOTE(review): several lines of this method (the get_size() call,
        # the read loop, BLOCKSIZE, and the Deferred plumbing) appear to be
        # elided from this view -- confirm against the full file.
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        k, happy, n, segsize = params
        f = self._filehandle
        enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
        data = f.read(BLOCKSIZE)
        enckey_hasher.update(data)
        # TODO: setting progress in a non-yielding loop is kind of
        # pointless, but I'm anticipating (perhaps prematurely) the
        # day when we use a slowjob or twisted's CooperatorService to
        # make this yield time to other jobs.
        bytes_read += len(data)
        self._status.set_progress(0, float(bytes_read)/self._size)
        self._key = enckey_hasher.digest()
        self._status.set_progress(0, 1.0)
        # convergence_hasher digests are 16 bytes, matching an AES-128 key
        assert len(self._key) == 16

    def _get_encryption_key_random(self):
        # non-convergent mode: pick a fresh random 16-byte key, once
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        # dispatch on whether a convergence secret was supplied
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

    # NOTE(review): a 'def get_size(self):' line appears to be elided here;
    # the following lines are its body (size is measured by seeking to the
    # end of the filehandle, then rewinding) -- confirm against full file.
    if self._size is not None:
        return defer.succeed(self._size)
    self._filehandle.seek(0,2)
    size = self._filehandle.tell()
    self._filehandle.seek(0)
    return defer.succeed(size)

    def read(self, length):
        # IUploadable.read: returns a Deferred firing with a list of strings
        return defer.succeed([self._filehandle.read(length)])

    # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    # FileHandle variant that opens the named file itself, and therefore
    # also closes it.
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # unlike FileHandle's caller-supplied handle, we own this one
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    # NOTE(review): a 'def close(self):' line appears to be elided here;
    # the following two lines are its body -- confirm against full file.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    # In-memory variant of FileHandle: wraps the supplied byte string in a
    # StringIO and delegates everything else to the base class.
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        stream = StringIO(data)
        FileHandle.__init__(self, stream, convergence=convergence)
1203 class Uploader(service.MultiService, log.PrefixingLogMixin):
1204 """I am a service that allows file uploading. I am a service-child of the
1207 implements(IUploader)
1209 URI_LIT_SIZE_THRESHOLD = 55
    def __init__(self, helper_furl=None, stats_provider=None):
        # helper_furl: furl of an upload-helper service, or None to upload
        #   directly to storage servers
        # stats_provider: optional stats object; count() is called on it
        #   during uploads when present
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        # NOTE(review): an initialization of self._helper (read by
        # get_helper_info and set by _got_versioned_helper) appears to be
        # elided from this view -- confirm against the full file.
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
    def startService(self):
        service.MultiService.startService(self)
        # if we were configured with a helper furl, start (re)connecting to
        # the helper via the node's Tub
        if self._helper_furl:
            # NOTE(review): the continuation of this call (presumably the
            # self._got_helper connection callback) is elided from this view.
            self.parent.tub.connectTo(self._helper_furl,
    def _got_helper(self, helper):
        # Connection callback: we hold a live reference to the helper. Ask
        # it for its version dict (supplying a default for old helpers that
        # predate get_version()) before accepting it.
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    # NOTE(review): the interior and closing braces of this
                    # dict literal are elided from this view -- confirm.
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1234 def _got_versioned_helper(self, helper):
1235 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1236 if needed not in helper.version:
1237 raise InsufficientVersionError(needed, helper.version)
1238 self._helper = helper
1239 helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        # notifyOnDisconnect callback, registered in _got_versioned_helper
        # NOTE(review): the body of this method is elided from this view
        # (it presumably clears self._helper) -- confirm against full file.
1244 def get_helper_info(self):
1245 # return a tuple of (helper_furl_or_None, connected_bool)
1246 return (self._helper_furl, bool(self._helper))
1249 def upload(self, uploadable, history=None):
1251 Returns a Deferred that will fire with the UploadResults instance.
1256 uploadable = IUploadable(uploadable)
1257 d = uploadable.get_size()
1258 def _got_size(size):
1259 default_params = self.parent.get_encoding_parameters()
1260 precondition(isinstance(default_params, dict), default_params)
1261 precondition("max_segment_size" in default_params, default_params)
1262 uploadable.set_default_encoding_parameters(default_params)
1264 if self.stats_provider:
1265 self.stats_provider.count('uploader.files_uploaded', 1)
1266 self.stats_provider.count('uploader.bytes_uploaded', size)
1268 if size <= self.URI_LIT_SIZE_THRESHOLD:
1269 uploader = LiteralUploader()
1270 return uploader.start(uploadable)
1272 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1273 d2 = defer.succeed(None)
1275 uploader = AssistedUploader(self._helper)
1276 d2.addCallback(lambda x: eu.get_storage_index())
1277 d2.addCallback(lambda si: uploader.start(eu, si))
1279 storage_broker = self.parent.get_storage_broker()
1280 secret_holder = self.parent._secret_holder
1281 uploader = CHKUploader(storage_broker, secret_holder)
1282 d2.addCallback(lambda x: uploader.start(eu))
1284 self._all_uploads[uploader] = None
1286 history.add_upload(uploader.get_upload_status())
1287 def turn_verifycap_into_read_cap(uploadresults):
1288 # Generate the uri from the verifycap plus the key.
1289 d3 = uploadable.get_encryption_key()
1290 def put_readcap_into_results(key):
1291 v = uri.from_string(uploadresults.verifycapstr)
1292 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1293 uploadresults.uri = r.to_string()
1294 return uploadresults
1295 d3.addCallback(put_readcap_into_results)
1297 d2.addCallback(turn_verifycap_into_read_cap)
1299 d.addCallback(_got_size)