1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import add_version_to_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20 NotEnoughSharesError, InsufficientVersionError, NoServersError
21 from allmydata.immutable import layout
22 from pycryptopp.cipher.aes import AES
24 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass
# this wants to live in storage, not here
class TooFullError(Exception):
    pass
class UploadResults(Copyable, RemoteCopy):
    # Outcome record for a single immutable upload; instances are serialized
    # over foolscap between the helper and its client.
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    # NOTE(review): the __init__ header (and possibly other attribute
    # initializers) are elided in this view; the assignments below set up
    # per-upload bookkeeping fields.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
65 # our current uri_extension is 846 bytes for small files, a few bytes
66 # more for larger ones (since the filesize is encoded in decimal in a
67 # few places). Ask for a little bit more just in case we need it. If
68 # the extension changes size, we can change EXTENSION_SIZE to
69 # allocate a more accurate amount of space.
71 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
75 def __init__(self, peerid, storage_server,
76 sharesize, blocksize, num_segments, num_share_hashes,
78 bucket_renewal_secret, bucket_cancel_secret):
79 precondition(isinstance(peerid, str), peerid)
80 precondition(len(peerid) == 20, peerid)
82 self._storageserver = storage_server # to an RIStorageServer
83 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
84 self.sharesize = sharesize
86 wbp = layout.make_write_bucket_proxy(None, sharesize,
87 blocksize, num_segments,
89 EXTENSION_SIZE, peerid)
90 self.wbp_class = wbp.__class__ # to create more of them
91 self.allocated_size = wbp.get_allocated_size()
92 self.blocksize = blocksize
93 self.num_segments = num_segments
94 self.num_share_hashes = num_share_hashes
95 self.storage_index = storage_index
97 self.renew_secret = bucket_renewal_secret
98 self.cancel_secret = bucket_cancel_secret
101 return ("<PeerTracker for peer %s and SI %s>"
102 % (idlib.shortnodeid_b2a(self.peerid),
103 si_b2a(self.storage_index)[:5]))
105 def query(self, sharenums):
106 d = self._storageserver.callRemote("allocate_buckets",
112 canary=Referenceable())
113 d.addCallback(self._got_reply)
116 def _got_reply(self, (alreadygot, buckets)):
117 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
119 for sharenum, rref in buckets.iteritems():
120 bp = self.wbp_class(rref, self.sharesize,
123 self.num_share_hashes,
127 self.buckets.update(b)
128 return (alreadygot, set(b.keys()))
class Tahoe2PeerSelector:
    # chooses which storage servers hold which shares, using two passes:
    # one share per uncontacted peer first, then spreading leftovers across
    # peers that previously accepted.

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # error_count is incremented by _got_response() on query failures;
        # its initializer was missing from this fragment, so it is restored
        # here to avoid an AttributeError on the first failure
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        # NOTE(review): the docstring delimiters and several lines of this
        # method are elided in this view.
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.

        self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        # every share starts out homeless; the selection loop tries to place
        # each one on some peer
        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        sb = client.get_storage_broker()
        peers = list(sb.get_servers(storage_index))
        # NOTE(review): the guard condition for this raise is elided here
        raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        # NOTE(review): the guard condition for this raise is elided here
        raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()
        # (continuation lines of these two hash calls are elided)
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
        # one PeerTracker per surviving peer, each with per-bucket secrets
        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                 bucket_cancel_secret_hash(file_cancel_secret,
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        # kick off the selection loop; _loop re-schedules itself until all
        # shares are placed or we run out of peers
        d = defer.maybeDeferred(self._loop)
226 if not self.homeless_shares:
228 msg = ("placed all %d shares, "
229 "sent %d queries to %d peers, "
230 "%d queries placed some shares, %d placed none, "
233 self.query_count, self.num_peers_contacted,
234 self.good_query_count, self.bad_query_count,
236 log.msg("peer selection successful for %s: %s" % (self, msg),
237 parent=self._log_parent)
238 return (self.use_peers, self.preexisting_shares)
240 if self.uncontacted_peers:
241 peer = self.uncontacted_peers.pop(0)
242 # TODO: don't pre-convert all peerids to PeerTrackers
243 assert isinstance(peer, PeerTracker)
245 shares_to_ask = set([self.homeless_shares.pop(0)])
246 self.query_count += 1
247 self.num_peers_contacted += 1
249 self._status.set_status("Contacting Peers [%s] (first query),"
251 % (idlib.shortnodeid_b2a(peer.peerid),
252 len(self.homeless_shares)))
253 d = peer.query(shares_to_ask)
254 d.addBoth(self._got_response, peer, shares_to_ask,
255 self.contacted_peers)
257 elif self.contacted_peers:
258 # ask a peer that we've already asked.
259 if not self._started_second_pass:
260 log.msg("starting second pass", parent=self._log_parent,
262 self._started_second_pass = True
263 num_shares = mathutil.div_ceil(len(self.homeless_shares),
264 len(self.contacted_peers))
265 peer = self.contacted_peers.pop(0)
266 shares_to_ask = set(self.homeless_shares[:num_shares])
267 self.homeless_shares[:num_shares] = []
268 self.query_count += 1
270 self._status.set_status("Contacting Peers [%s] (second query),"
272 % (idlib.shortnodeid_b2a(peer.peerid),
273 len(self.homeless_shares)))
274 d = peer.query(shares_to_ask)
275 d.addBoth(self._got_response, peer, shares_to_ask,
276 self.contacted_peers2)
278 elif self.contacted_peers2:
279 # we've finished the second-or-later pass. Move all the remaining
280 # peers back into self.contacted_peers for the next pass.
281 self.contacted_peers.extend(self.contacted_peers2)
282 self.contacted_peers[:] = []
285 # no more peers. If we haven't placed enough shares, we fail.
286 placed_shares = self.total_shares - len(self.homeless_shares)
287 if placed_shares < self.shares_of_happiness:
288 msg = ("placed %d shares out of %d total (%d homeless), "
289 "sent %d queries to %d peers, "
290 "%d queries placed some shares, %d placed none, "
292 (self.total_shares - len(self.homeless_shares),
293 self.total_shares, len(self.homeless_shares),
294 self.query_count, self.num_peers_contacted,
295 self.good_query_count, self.bad_query_count,
297 msg = "peer selection failed for %s: %s" % (self, msg)
298 if self.last_failure_msg:
299 msg += " (%s)" % (self.last_failure_msg,)
300 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
301 raise NotEnoughSharesError(msg, placed_shares,
302 self.shares_of_happiness)
304 # we placed enough to be happy, so we're done
306 self._status.set_status("Placed all shares")
307 return self.use_peers
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        # addBoth target for PeerTracker.query(): `res` is either a Failure
        # or the (alreadygot, allocated) tuple. On a useful response the
        # peer is appended to `put_peer_here` so a later pass can ask again.
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            # put the shares we asked about back at the front of the queue
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                # NOTE(review): the else-branch header is elided here; the
                # lines below belong to the "no more peers" case.
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        # NOTE(review): the success-branch header is elided here
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            # record shares this peer already holds and take them off the
            # homeless list. NOTE(review): the loop header is elided here.
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            self.use_peers.add(peer)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            # NOTE(review): branch headers around the two counters are
            # elided here.
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
                self.bad_query_count += 1

            # In networks with lots of space, this is very unusual and
            # probably indicates an error. In networks with peers that
            # are full, it is merely unusual. In networks that are very
            # full, it is common, and many uploads will fail. In most
            # cases, this is obviously not fatal, and we'll just use some
            # some shares are still homeless, keep trying to find them a
            # home. The ones that were rejected get first priority.
            self.homeless_shares = (list(still_homeless)
                                    + self.homeless_shares)
            # Since they were unable to accept all of our requests, so it
            # is safe to assume that asking them again won't help.
            # if they *were* able to accept everything, they might be
            # willing to accept even more.
            put_peer_here.append(peer)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # upper bound on plaintext read per _read_encrypted iteration; the
    # definition was elided in this view and is restored to the 50kB the
    # comment in _read_encrypted refers to ("say 50kB")
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        # set later by set_upload_status(); initialized so attribute reads
        # before that call do not blow up
        self._status = None
400 def set_upload_status(self, upload_status):
401 self._status = IUploadStatus(upload_status)
402 self.original.set_upload_status(upload_status)
404 def log(self, *args, **kwargs):
405 if "facility" not in kwargs:
406 kwargs["facility"] = "upload.encryption"
407 if "parent" not in kwargs:
408 kwargs["parent"] = self._log_number
409 return log.msg(*args, **kwargs)
    # NOTE(review): the "def get_size(self):" header is elided in this
    # view; get_size caches the original uploadable's size in
    # self._file_size and reports it to the status object.
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        # (inner _got_size callback header elided)
            self._file_size = size
            self._status.set_size(size)
        d.addCallback(_got_size)

    def get_all_encoding_parameters(self):
        # returns a Deferred firing with (k, happy, n, segment_size);
        # cached after the first call
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            return encoding_parameters
    def _get_encryptor(self):
        # returns a Deferred firing with the AES encryptor, creating it
        # (and deriving the storage index from the encryption key) on first
        # use. NOTE(review): several lines are elided in this view,
        # including the cached-encryptor guard and the key callback header.
            return defer.succeed(self._encryptor)
        d = self.original.get_encryption_key()
            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
            self._status.set_storage_index(storage_index)

    def get_storage_index(self):
        # the storage index is derived inside _get_encryptor()
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)

    def _get_segment_hasher(self):
        # returns (hasher, bytes_left_in_current_segment), starting a fresh
        # segment hasher when the previous one was closed out.
        # NOTE(review): the reuse branch header and its return are elided.
        p = self._plaintext_segment_hasher
            left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size
    def _update_segment_hash(self, chunk):
        # feed `chunk` into the per-segment plaintext hashers, closing out
        # (and recording) a segment digest each time a segment boundary is
        # crossed. NOTE(review): the "offset = 0" initializer and some log
        # continuation lines are elided in this view.
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment
            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
            offset += this_segment
    def read_encrypted(self, length, hash_only):
        # Return a Deferred firing with a list of ciphertext strings; with
        # hash_only=True the plaintext is hashed but no ciphertext is
        # accumulated. NOTE(review): the accumulator-list creation and the
        # trailing return are elided in this view.
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # Recursive-via-Deferred worker for read_encrypted(): appends
        # ciphertext chunks to `ciphertext` and fires `fire_when_done` once
        # `remaining` reaches zero. NOTE(review): the base-case guard and
        # the errback wiring are elided in this view.
        fire_when_done.callback(ciphertext)
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            # recurse until nothing remains
            self._read_encrypted(remaining, ciphertext, hash_only,
        # (errback helper header elided)
            fire_when_done.errback(why)
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        # consume a list of plaintext chunks: update the flat and
        # per-segment plaintext hashers, and (unless hash_only) collect the
        # AES-CTR ciphertext. NOTE(review): the loop header, the
        # accumulator setup, and the hash_only branch headers are elided in
        # this view.
        assert isinstance(data, (tuple, list)), type(data)
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
                self.log(" skipping encryption", level=log.NOISY)
            cryptdata.append(ciphertext)
        self._ciphertext_bytes_read += bytes_processed
        # report ciphertext-hashing progress as a fraction of the file size
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # Return a Deferred firing with the tuple of plaintext segment
        # hashes in [first:last], closing out the final segment hasher if
        # needed. NOTE(review): the log-level keyword lines are elided.
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
597 def get_plaintext_hash(self):
598 h = self._plaintext_hasher.digest()
599 return defer.succeed(h)
602 return self.original.close()
    # NOTE(review): the "class UploadStatus:" header, the __init__ header,
    # and the one-line bodies of several accessors below are elided in this
    # view. This object tracks one upload's progress for status displays.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0) # gives each upload a unique id

        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0] # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    # simple read accessors used by the status-display code
    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    # simple setters called by the uploader as it progresses
    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
655 peer_selector_class = Tahoe2PeerSelector
657 def __init__(self, client):
658 self._client = client
659 self._log_number = self._client.log("CHKUploader starting")
661 self._results = UploadResults()
662 self._storage_index = None
663 self._upload_status = UploadStatus()
664 self._upload_status.set_helper(False)
665 self._upload_status.set_active(True)
666 self._upload_status.set_results(self._results)
668 # locate_all_shareholders() will create the following attribute:
669 # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
671 def log(self, *args, **kwargs):
672 if "parent" not in kwargs:
673 kwargs["parent"] = self._log_number
674 if "facility" not in kwargs:
675 kwargs["facility"] = "tahoe.upload"
676 return self._client.log(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.
        Returns a Deferred that will fire with the UploadResults instance.
        # NOTE(review): docstring delimiters and the tail of this method
        # (including the deferred-chain return) are elided in this view.
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # mark inactive whether we succeeded or failed
            self._upload_status.set_active(False)
697 """Call this if the upload must be abandoned before it completes.
698 This will tell the shareholders to delete their partial shares. I
699 return a Deferred that fires when these messages have been acked."""
700 if not self._encoder:
701 # how did you call abort() before calling start() ?
702 return defer.succeed(None)
703 return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # NOTE(review): the Encoder constructor continuation and the
        # trailing return are elided in this view.
        eu = IEncryptedUploadable(encrypted)
        started = time.time()
        # the Encoder does the erasure coding and pushes blocks to the
        # shareholders chosen by locate_all_shareholders()
        self._encoder = e = encode.Encoder(self._log_number,
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
    def locate_all_shareholders(self, encoder, started):
        # Run peer selection; fires with (used_peers, already_peers).
        # NOTE(review): the peer_selector constructor continuation and the
        # timing-capture callback wrapper are elided in this view.
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        self._peer_selection_elapsed = time.time() - peer_selection_started
    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        # NOTE(review): the "buckets = {}" initializer is elided in this view
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        # every share should appear in exactly one tracker's bucket map
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)
    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # NOTE(review): the "r = self._results" and "now = time.time()"
        # lines (and the trailing return) appear to be elided in this view.
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            # record the placement in both directions
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())

        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()

    def get_upload_status(self):
        # status object shared with the web/status display
        return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    """Read exactly `size` bytes from `uploadable` (issuing as many read()
    calls as necessary) and fire with prepend_data + the list of pieces.

    The mutable default for prepend_data is safe here: it is only ever
    concatenated, never mutated. Elided guard/callback lines restored."""
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            # short read: recurse for the rest, carrying what we have
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
class LiteralUploader:
    # uploads very small files by embedding the data directly in a LIT URI;
    # no shares are pushed to any server.
    # NOTE(review): a couple of status-initialization lines are elided in
    # this view of __init__.

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0) # no CHK computation happens for LIT files
        s.set_results(self._results)
    def start(self, uploadable):
        # read the whole (small) file and build a LIT URI string from its
        # bytes, finishing via _build_results. NOTE(review): the inner
        # _got_size callback header and the trailing return are elided.
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
831 def _build_results(self, uri):
832 self._results.uri = uri
833 self._status.set_status("Done")
834 self._status.set_progress(1, 1.0)
835 self._status.set_progress(2, 1.0)
841 def get_upload_status(self):
class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # restored: remote_read_encrypted() compares offsets against
        # self._offset and accumulates self._bytes_sent, so both must start
        # at zero (their initializers were missing from this fragment)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        """Return a Deferred with the ciphertext size, caching it after the
        first fetch."""
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d
    def remote_get_size(self):
        # foolscap-facing alias for get_size()
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # read ciphertext from the wrapped uploadable while tracking
        # self._offset. NOTE(review): the inner callback structure (the
        # hash_only branch and the strings-size accounting) is elided in
        # this view.
        d = self._eu.read_encrypted(length, hash_only)
            self._offset += length
            size = sum([len(data) for data in strings])
    def remote_read_encrypted(self, offset, length):
        # Serve the helper's ciphertext reads. NOTE(review): the else-branch
        # header, the inner _read callback header, and the trailing return
        # lines are elided in this view.
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # (else branch: already at the right offset)
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # (inner _read callback header elided; accounts bytes actually sent)
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
    def remote_close(self):
        # the helper is done pulling ciphertext; close the wrapped uploadable
        return self._eu.close()
class AssistedUploader:
    # uploads a file by handing the ciphertext to a helper node, which
    # performs the encoding and pushes the shares on our behalf.

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): further status-initialization lines are elided in
        # this view.
925 def log(self, *args, **kwargs):
926 if "parent" not in kwargs:
927 kwargs["parent"] = self._log_number
928 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.
        Returns a Deferred that will fire with the UploadResults instance.
        # NOTE(review): docstring delimiters and a few lines (including the
        # initial "d = eu.get_size()" and the completion callback header)
        # are elided in this view.
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
            # mark inactive whether we succeeded or failed
            self._upload_status.set_active(False)
953 def _got_size(self, size):
955 self._upload_status.set_size(size)
957 def _got_all_encoding_parameters(self, params):
958 k, happy, n, segment_size = params
959 # stash these for URI generation later
960 self._needed_shares = k
961 self._total_shares = n
962 self._segment_size = segment_size
    def _contact_helper(self, res):
        # Ask the helper about this storage index; fires with the helper's
        # reply via _contacted_helper. NOTE(review): the trailing return is
        # elided in this view.
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
    def _contacted_helper(self, (upload_results, upload_helper)):
        # upload_helper is the remote reference to stream ciphertext to when
        # the helper needs the file; when it is absent the file is already
        # in the grid. NOTE(review): the "now = time.time()" line, the
        # branch header, and several wiring lines are elided in this view.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        # (branch: the helper wants us to stream the ciphertext)
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
            # let it pre-compute the size for progress purposes
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results
995 def _convert_old_upload_results(self, upload_results):
996 # pre-1.3.0 helpers return upload results which contain a mapping
997 # from shnum to a single human-readable string, containing things
998 # like "Found on [x],[y],[z]" (for healthy files that were already in
999 # the grid), "Found on [x]" (for files that needed upload but which
1000 # discovered pre-existing shares), and "Placed on [x]" (for newly
1001 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1002 # set of binary serverid strings.
1004 # the old results are too hard to deal with (they don't even contain
1005 # as much information as the new results, since the nodeids are
1006 # abbreviated), so if we detect old results, just clobber them.
1008 sharemap = upload_results.sharemap
1009 if str in [type(v) for v in sharemap.values()]:
1010 upload_results.sharemap = None
    def _build_verifycap(self, upload_results):
        # Sanity-check the helper's results against our own encoding
        # parameters, build the verify-cap, and fill in timing/size fields.
        # NOTE(review): the "r = upload_results" and "now = time.time()"
        # lines, the to_string() call tail, and the trailing return appear
        # to be elided in this view.
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares, size=self._size
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # the helper's own "total" becomes "helper_total"; our "total"
            # is measured from our start() call
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
1037 def get_upload_status(self):
1038 return self._upload_status
class BaseUploadable:
    # Common base for IUploadable implementations: stores segment-size and
    # erasure-coding (k/happy/n) parameters, merging client-wide defaults
    # (set via set_default_encoding_parameters) with per-upload overrides.
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-upload overrides: when left at None, the defaults above are used
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cached (k, happy, n, segsize) tuple, filled in by the first call to
    # get_all_encoding_parameters()
    _all_encoding_parameters = None

    def set_upload_status(self, upload_status):
        # adapt and remember the status object used for progress reporting
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        # Install client-wide defaults; each key is optional, and only the
        # default_* class attributes are touched (explicit per-upload
        # values still win in get_all_encoding_parameters).
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        # Return a Deferred firing with (k, happy, n, segment_size),
        # computing and caching the tuple on first use.
        # NOTE(review): this fragment is missing lines -- presumably
        # 'd = self.get_size()' before _got_size is attached, and a
        # trailing 'return d' -- confirm against the full file.
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)
        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    # IUploadable built around an already-open filehandle; subclasses
    # (FileName, Data) only vary how that filehandle is obtained.
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        # NOTE(review): the triple-quote docstring delimiters are missing
        # from this fragment; the indented text below is the docstring body.
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self.convergence = convergence

    def _get_encryption_key_convergent(self):
        # Derive the AES key by hashing the entire plaintext together with
        # the convergence secret and the encoding parameters, caching the
        # result in self._key.
        # NOTE(review): several lines are missing from this fragment (the
        # get-size Deferred, the read-loop scaffolding around f.read, and
        # the final return), so the statements below appear flattened;
        # confirm against the full file.
        if self._key is not None:
            return defer.succeed(self._key)
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        k, happy, n, segsize = params
        f = self._filehandle
        enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
        data = f.read(BLOCKSIZE)
        enckey_hasher.update(data)
        # TODO: setting progress in a non-yielding loop is kind of
        # pointless, but I'm anticipating (perhaps prematurely) the
        # day when we use a slowjob or twisted's CooperatorService to
        # make this yield time to other jobs.
        bytes_read += len(data)
        self._status.set_progress(0, float(bytes_read)/self._size)
        self._key = enckey_hasher.digest()
        self._status.set_progress(0, 1.0)
        assert len(self._key) == 16

    def _get_encryption_key_random(self):
        # non-convergent mode: one random 16-byte AES key, cached per upload
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        # Return a Deferred firing with the 16-byte AES key: convergent if
        # a convergence secret was supplied, random otherwise.
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

    # NOTE(review): the 'def get_size(self):' line is missing from this
    # fragment; the lines below are its body (seek to the end to measure,
    # then rewind). A line between tell() and seek(0) is also missing --
    # presumably 'self._size = size' -- confirm against the full file.
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        # IUploadable.read: Deferred firing with a list of data strings
        return defer.succeed([self._filehandle.read(length)])

    # the originator of the filehandle reserves the right to close it
    # NOTE(review): the comment above belongs to a close() method whose
    # 'def close(self):' line is missing from this fragment.
class FileName(FileHandle):
    # FileHandle variant that opens the named file itself, and therefore
    # owns (and really closes) the underlying filehandle.
    def __init__(self, filename, convergence):
        # NOTE(review): the triple-quote docstring delimiters are missing
        # from this fragment; the indented text below is the docstring body.
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    # NOTE(review): the 'def close(self):' line is missing from this
    # fragment; since we opened the file ourselves, we close it for real
    # after calling up to FileHandle.close.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    # FileHandle variant that wraps an in-memory string via StringIO.
    def __init__(self, data, convergence):
        # NOTE(review): the triple-quote docstring delimiters are missing
        # from this fragment; the indented text below is the docstring body.
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
# Service that performs immutable-file uploads, choosing among a literal
# (LIT) upload for tiny files, a helper-assisted upload when a helper is
# connected, and a direct CHK upload otherwise (see upload()).
# NOTE(review): the class docstring below is cut off in this fragment
# (its continuation and closing quotes are missing).
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    implements(IUploader)

    # files at or below this size are uploaded as LIT URIs (see upload())
    URI_LIT_SIZE_THRESHOLD = 55
    def __init__(self, helper_furl=None, stats_provider=None):
        # NOTE(review): a line is missing from this fragment between
        # stats_provider and _all_uploads -- presumably 'self._helper =
        # None', which _got_versioned_helper/get_helper_info rely on.
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
    def startService(self):
        service.MultiService.startService(self)
        # if we were configured with a helper FURL, start (re)connecting
        # to it. NOTE(review): the connectTo() call is cut off in this
        # fragment -- presumably it passes self._got_helper as the
        # connection callback; confirm against the full file.
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
    def _got_helper(self, helper):
        # New helper connection: ask it for its version, substituting a
        # minimal default entry for helpers that predate get_version(),
        # then continue in _got_versioned_helper.
        self.log("got helper connection, getting versions")
        # NOTE(review): the dict literal below is missing lines in this
        # fragment (its nested value dict and the closing braces).
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1226 def _got_versioned_helper(self, helper):
1227 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1228 if needed not in helper.version:
1229 raise InsufficientVersionError(needed, helper.version)
1230 self._helper = helper
1231 helper.notifyOnDisconnect(self._lost_helper)
    # notifyOnDisconnect callback; its body is missing from this fragment
    # (presumably it clears self._helper -- confirm against the full file).
    def _lost_helper(self):
1236 def get_helper_info(self):
1237 # return a tuple of (helper_furl_or_None, connected_bool)
1238 return (self._helper_furl, bool(self._helper))
    def upload(self, uploadable, history=None):
        # NOTE(review): the docstring delimiters and several control-flow
        # lines are missing from this fragment (the helper-vs-CHK if/else,
        # an 'if history:' guard, and the trailing 'return d3/d2/d'
        # statements), so parts of the body below appear flattened;
        # confirm against the full file.
        Returns a Deferred that will fire with the UploadResults instance.
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # merge the client's default encoding parameters into the
            # uploadable before choosing an upload strategy
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)
            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)
            # tiny files become LIT URIs: no encryption or erasure coding
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
                return uploader.start(uploadable)
            eu = EncryptAnUploadable(uploadable, self._parentmsgid)
            d2 = defer.succeed(None)
            # helper-assisted path: hand the storage index to the helper
            uploader = AssistedUploader(self._helper)
            d2.addCallback(lambda x: eu.get_storage_index())
            d2.addCallback(lambda si: uploader.start(eu, si))
            # direct CHK path -- presumably the 'else' arm of an
            # 'if self._helper:' test that is missing from this fragment
            uploader = CHKUploader(self.parent)
            d2.addCallback(lambda x: uploader.start(eu))
            self._all_uploads[uploader] = None
            history.add_upload(uploader.get_upload_status())
            def turn_verifycap_into_read_cap(uploadresults):
                # Generate the uri from the verifycap plus the key.
                d3 = uploadable.get_encryption_key()
                def put_readcap_into_results(key):
                    v = uri.from_string(uploadresults.verifycapstr)
                    r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                    uploadresults.uri = r.to_string()
                    return uploadresults
                d3.addCallback(put_readcap_into_results)
            d2.addCallback(turn_verifycap_into_read_cap)
        d.addCallback(_got_size)