1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap import Referenceable, Copyable, RemoteCopy
7 from foolscap import eventual
9 from allmydata.util.hashutil import file_renewal_secret_hash, \
10 file_cancel_secret_hash, bucket_renewal_secret_hash, \
11 bucket_cancel_secret_hash, plaintext_hasher, \
12 storage_index_hash, plaintext_segment_hasher, convergence_hasher
13 from allmydata import storage, hashtree, uri
14 from allmydata.immutable import encode
15 from allmydata.util import base32, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import get_versioned_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20 NotEnoughSharesError, InsufficientVersionError
21 from allmydata.immutable import layout
22 from pycryptopp.cipher.aes import AES
24 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    """Internal control-flow exception: used to jump out of the
    peer-selection loop once every peer has been considered."""
    # we use this to jump out of the loop
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised when a storage server has no room to accept more shares."""
class UploadResults(Copyable, RemoteCopy):
    """Statistics describing one finished upload: timings, share placement
    maps, and share counts. Sent over the wire by the helper, hence
    Copyable/RemoteCopy."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"

    # NOTE(review): the "def __init__(self):" header (and possibly a few
    # more attribute initializers) were on lines missing from this listing;
    # the header is reconstructed here so the visible assignments are valid.
    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = {} # k: shnum, v: set(serverid)
        self.servermap = {} # k: serverid, v: set(shnum)
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
60 # our current uri_extension is 846 bytes for small files, a few bytes
61 # more for larger ones (since the filesize is encoded in decimal in a
62 # few places). Ask for a little bit more just in case we need it. If
63 # the extension changes size, we can change EXTENSION_SIZE to
64 # allocate a more accurate amount of space.
66 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
    # NOTE(review): the enclosing "class PeerTracker" header and several
    # interior lines are missing from this listing; fragments kept as-is.
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 bucket_renewal_secret, bucket_cancel_secret):
        # NOTE(review): a storage_index parameter (assigned below) appears
        # to be on a missing signature line -- confirm against upstream.
        # peerid must be the 20-byte binary nodeid of the storage server
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        # NOTE(review): the remaining allocated_size() arguments are on
        # lines missing from this listing.
        self.allocated_size = layout.allocated_size(sharesize,
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    # NOTE(review): the "def __repr__(self):" header is on a missing line.
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))
    def query(self, sharenums):
        # Ask the storage server to allocate buckets for the given share
        # numbers; fires with the (alreadygot, buckets) reply.
        # NOTE(review): most callRemote arguments (storage index, secrets,
        # sharenums, allocated_size) are on lines missing from this listing.
        d = self._storageserver.callRemote("allocate_buckets",
                                           canary=Referenceable())
        d.addCallback(self._got_reply)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # Wrap each remote bucket reference in a WriteBucketProxy.
        # NOTE(review): the dict 'b' is built on lines missing from this
        # listing.
        for sharenum, rref in buckets.iteritems():
            bp = layout.WriteBucketProxy(rref, self.sharesize,
                                         self.num_share_hashes,
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
class Tahoe2PeerSelector:
    # Chooses which storage servers will hold each share for one upload.

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # counters reported in the final peer-selection log message
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # NOTE(review): an error_count initializer (incremented in
        # _got_response) appears to be on a missing line.
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    # NOTE(review): the "def __repr__(self):" header is on a missing line.
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """
        # NOTE(review): this listing has gaps (missing source lines);
        # fragments below are preserved as-is.
        self._status.set_status("Contacting Peers..")
        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness
        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share
        peers = client.get_permuted_peers("storage", storage_index)
        # NOTE(review): the guarding "if not peers:" is on a missing line.
        raise NotEnoughSharesError("client gave us zero peers")
        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
        # figure out how much space to ask for
        # NOTE(review): the remaining allocated_size() arguments are missing.
        allocated_size = layout.allocated_size(share_size,
        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        # NOTE(review): the guarding "if not peers:" is on a missing line.
        raise NotEnoughSharesError("no peers could accept an allocated_size of %d" % allocated_size)
        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()
        # NOTE(review): continuation arguments of the two hash calls and of
        # the PeerTracker constructor are on missing lines.
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                 bucket_cancel_secret_hash(file_cancel_secret,
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        # NOTE(review): the "def _loop(self):" header and several lines are
        # missing from this listing; fragments preserved as-is.
        if not self.homeless_shares:
            # every share has a home: report success
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   self.query_count, self.num_peers_contacted,
                   self.good_query_count, self.bad_query_count,
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)
        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)
            # first pass: one share per query
            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            self._status.set_status("Contacting Peers [%s] (first query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                self._started_second_pass = True
            # spread the remaining shares evenly over the willing peers
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            self._status.set_status("Contacting Peers [%s] (second query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            # NOTE(review): BUG? this clears contacted_peers, discarding the
            # peers just moved out of contacted_peers2. Upstream clears
            # contacted_peers2 here ("self.contacted_peers2[:] = []") --
            # confirm against the canonical source before relying on this.
            self.contacted_peers[:] = []
        # NOTE(review): the "else:" introducing the no-more-peers case is on
        # a missing line.
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg)
            # we placed enough to be happy, so we're done
            self._status.set_status("Placed all shares")
            return self.use_peers
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        # Callback/errback for one allocate_buckets query; requeues homeless
        # shares and decides whether the peer is worth asking again.
        # NOTE(review): several lines are missing from this listing;
        # fragments preserved as-is.
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
            # No more peers, so this upload might fail (it depends upon
            # whether we've hit shares_of_happiness or not). Log the last
            # failure we got: if a coding error causes all peers to fail
            # in the same way, this allows the common failure to be seen
            # by the uploader and should help with debugging
            msg = ("last failure (from %s) was: %s" % (peer, res))
            self.last_failure_msg = msg
        # NOTE(review): the "else:" for the success path is on a missing line.
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            # record shares the peer claims to hold already
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                # the PeerTracker will remember which shares were allocated on
                # that peer. We just have to remember to use them.
                self.use_peers.add(peer)
            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
                self.bad_query_count += 1
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): class attributes (e.g. the CHUNKSIZE used by
    # _read_encrypted) are on lines missing from this listing.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)       # the wrapped uploadable
        self._log_number = log_parent               # default log parent
        self._encryptor = None                      # built lazily by _get_encryptor
        self._plaintext_hasher = plaintext_hasher() # whole-file plaintext hash
        self._plaintext_segment_hasher = None       # currently-open segment hasher
        self._plaintext_segment_hashes = []         # digests of closed segments
        self._encoding_parameters = None            # cached (k, happy, n, segsize)
        self._file_size = None                      # cached plaintext size
        self._ciphertext_bytes_read = 0             # for progress reporting
390 def set_upload_status(self, upload_status):
391 self._status = IUploadStatus(upload_status)
392 self.original.set_upload_status(upload_status)
394 def log(self, *args, **kwargs):
395 if "facility" not in kwargs:
396 kwargs["facility"] = "upload.encryption"
397 if "parent" not in kwargs:
398 kwargs["parent"] = self._log_number
399 return log.msg(*args, **kwargs)
    # NOTE(review): the "def get_size(self):" header, the "_got_size"
    # callback header, and the final return are on lines missing from this
    # listing; fragments preserved as-is.
        if self._file_size is not None:
            return defer.succeed(self._file_size)   # cached from a prior call
        d = self.original.get_size()
            self._file_size = size                  # cache for next time
            self._status.set_size(size)
        d.addCallback(_got_size)
    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size),
        cached after the first call."""
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            # NOTE(review): the log() kwargs, addCallback, and return lines
            # are missing from this listing.
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            return encoding_parameters
    def _get_encryptor(self):
        # Lazily build the encryptor from the encryption key, and derive and
        # cache the storage index from that key.
        # NOTE(review): several lines (cached-encryptor check, key callback
        # header, AES construction, return) are missing from this listing.
            return defer.succeed(self._encryptor)
        d = self.original.get_encryption_key()
            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
            self._status.set_storage_index(storage_index)

    def get_storage_index(self):
        # ensure the encryptor (and thus _storage_index) exists first
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        # NOTE(review): the "return d" line is missing from this listing.
    def _get_segment_hasher(self):
        # Return (hasher, bytes_left_in_current_segment), opening a fresh
        # segment hasher when none is in progress.
        p = self._plaintext_segment_hasher
        # NOTE(review): the branch header (reusing an open hasher) and its
        # return are on lines missing from this listing.
            left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size
    def _update_segment_hash(self, chunk):
        # Feed 'chunk' into the per-segment plaintext hashers, closing out
        # each segment's digest as it fills.
        # NOTE(review): the offset initializer and the log() kwarg lines are
        # missing from this listing.
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment
            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
            offset += this_segment
    def read_encrypted(self, length, hash_only):
        """Return a Deferred firing with a list of ciphertext strings for
        the next 'length' plaintext bytes (when hash_only, the data is only
        hashed, not returned)."""
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        # NOTE(review): the line creating the 'ciphertext' list and the
        # final return are missing from this listing.
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # Chunked, Deferred-driven read loop: appends ciphertext pieces to
        # 'ciphertext' and fires 'fire_when_done' with it when finished.
        # NOTE(review): the "if not remaining:" base-case header, the _error
        # callback header, and the errback plumbing are on missing lines.
        fire_when_done.callback(ciphertext)
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            # recurse (via the Deferred chain) for the rest of the data
            self._read_encrypted(remaining, ciphertext, hash_only,
            fire_when_done.errback(why)
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        # Update the plaintext hashers with every chunk and (unless
        # hash_only) collect the encrypted chunks for return.
        # NOTE(review): the loop header, the cryptdata/bytes_processed
        # initializers, the hash_only branch headers, and the return are on
        # lines missing from this listing.
        assert isinstance(data, (tuple, list)), type(data)
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
                self.log(" skipping encryption", level=log.NOISY)
            cryptdata.append(ciphertext)
        self._ciphertext_bytes_read += bytes_processed
        # report ciphertext progress as a fraction of the whole file
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred firing with the tuple of plaintext segment
        hashes [first:last], closing out the final segment hasher if it is
        still open."""
        # NOTE(review): the log() kwarg lines are missing from this listing.
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
586 def get_plaintext_hash(self):
587 h = self._plaintext_hasher.digest()
588 return defer.succeed(h)
591 return self.original.close()
# NOTE(review): the "class UploadStatus:" header, the __init__ header, and
# most one-line accessor bodies are on lines missing from this listing;
# fragments preserved as-is.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)   # class-wide unique-id generator

        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
# NOTE(review): the "class CHKUploader:" header is on a missing line.
    peer_selector_class = Tahoe2PeerSelector  # overridable for tests/subclasses

    def __init__(self, client):
        self._client = client
        # all our logging goes through the client node
        self._log_number = self._client.log("CHKUploader starting")
        # NOTE(review): an initializer for self._encoder (checked by abort())
        # appears to be on a missing line.
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
660 def log(self, *args, **kwargs):
661 if "parent" not in kwargs:
662 kwargs["parent"] = self._log_number
663 if "facility" not in kwargs:
664 kwargs["facility"] = "tahoe.upload"
665 return self._client.log(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # the upload is over, one way or another
            self._upload_status.set_active(False)

    # NOTE(review): the rest of _done and the "def abort(self):" header are
    # on lines missing from this listing.
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)
        started = time.time()
        # NOTE(review): the remaining Encoder() arguments and the final
        # "return d" are on lines missing from this listing.
        self._encoder = e = encode.Encoder(self._log_number,
        # pipeline: attach uploadable -> pick peers -> wire up shareholders
        # -> encode/push -> build results
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
    def locate_all_shareholders(self, encoder, started):
        """Run peer selection; fires with (used_peers, already_peers) as
        described by Tahoe2PeerSelector.get_shareholders."""
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        # NOTE(review): the trailing peer_selector_class arguments are on
        # missing lines.
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")
        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        # NOTE(review): a callback header wrapping the timing line below is
        # on a missing line.
            self._peer_selection_elapsed = time.time() - peer_selection_started
    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        # NOTE(review): the "buckets = {}" initializer appears to be on a
        # missing line.
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        # every bucket must belong to exactly one peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)
    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # NOTE(review): the lines binding r (the UploadResults), now, and the
        # final return are missing from this listing.
        # fill in the share-placement maps from what the encoder pushed
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap.setdefault(shnum, set()).add(peerid)
            r.servermap.setdefault(peerid, set()).add(shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
774 def get_upload_status(self):
775 return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # Read exactly 'size' bytes from 'uploadable', recursing until enough
    # pieces have accumulated; fires with prepend_data + the pieces read.
    # NOTE(review): the size==0 guard, the callback header, and other lines
    # are missing from this listing.
    # NOTE(review): mutable default argument; in the visible code it is only
    # concatenated (prepend_data + data), never mutated -- confirm upstream.
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        remaining = size - bytes
            # short read: recurse for the rest, carrying what we have
            return read_this_many_bytes(uploadable, remaining,
        return prepend_data + data
class LiteralUploader:
    # Uploads small files as literal (LIT) URIs: the data is embedded in the
    # URI itself, so nothing is pushed to storage servers.
    # NOTE(review): some interior lines of __init__ are missing from this
    # listing.

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)   # literal files have no storage index
        s.set_progress(0, 1.0)
        s.set_results(self._results)
    def start(self, uploadable):
        """Read the whole uploadable and build a LiteralFileURI string from
        its bytes."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        # NOTE(review): the "_got_size" callback header is on a missing line.
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
    def _build_results(self, uri):
        # record the final URI and mark every progress phase complete
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)

    # NOTE(review): the body of get_upload_status is on a missing line.
    def get_upload_status(self):
class RemoteEncryptedUploadable(Referenceable):
    """Wraps an IEncryptedUploadable so a remote helper can read ciphertext
    from it via the RIEncryptedUploadable interface."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # NOTE(review): initializers for _offset, _bytes_sent and _size
        # (used by the methods below) appear to be on missing lines.
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

    # NOTE(review): the "def get_size(self):" header and the _got_size
    # callback are on lines missing from this listing.
        if self._size is not None:
            return defer.succeed(self._size)   # cached
        d = self._eu.get_size()
        d.addCallback(_got_size)
855 def remote_get_size(self):
856 return self.get_size()
857 def remote_get_all_encoding_parameters(self):
858 return self._eu.get_all_encoding_parameters()
    def _read_encrypted(self, length, hash_only):
        # Read (or, when hash_only, merely hash) the next 'length' ciphertext
        # bytes and advance self._offset.
        # NOTE(review): the callback wrapping and the return are on lines
        # missing from this listing.
        d = self._eu.read_encrypted(length, hash_only)
            self._offset += length
            size = sum([len(data) for data in strings])
    def remote_read_encrypted(self, offset, length):
        """RIEncryptedUploadable: return the ciphertext at
        [offset:offset+length]. Backwards seeks are rejected; forward skips
        are read and hashed but not sent."""
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        # NOTE(review): log kwargs and some callback plumbing are on lines
        # missing from this listing.
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): the "else:" is on a missing line.
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): the callback header for the accounting below is on a
        # missing line.
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """RIEncryptedUploadable: forward to the wrapped uploadable."""
        # NOTE(review): log kwargs and the return are on missing lines.
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
907 def remote_get_plaintext_hash(self):
908 return self._eu.get_plaintext_hash()
909 def remote_close(self):
910 return self._eu.close()
class AssistedUploader:
    """Uploads a file by handing the ciphertext to a remote helper
    (reached via self._helper.callRemote)."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): further status-setup lines are missing from this
        # listing.
923 def log(self, *args, **kwargs):
924 if "parent" not in kwargs:
925 kwargs["parent"] = self._log_number
926 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # NOTE(review): the line creating 'd' (presumably from eu.get_size())
        # is missing from this listing.
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        # NOTE(review): the _done callback header is on a missing line.
            self._upload_status.set_active(False)

    def _got_size(self, size):
        # NOTE(review): the line caching self._size (later read by
        # _build_verifycap) appears to be missing here.
        self._upload_status.set_size(size)
955 def _got_all_encoding_parameters(self, params):
956 k, happy, n, segment_size = params
957 # stash these for URI generation later
958 self._needed_shares = k
959 self._total_shares = n
960 self._segment_size = segment_size
    def _contact_helper(self, res):
        # Ask the helper ("upload_chk") about this storage index; fires with
        # the (upload_results, upload_helper) pair handled below.
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        # NOTE(review): the "return d" line is missing from this listing.
    def _contacted_helper(self, (upload_results, upload_helper)):
        # NOTE(review): the 'now' assignment, branch headers, and some
        # callback plumbing are on lines missing from this listing.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        # branch 1 (apparently when upload_helper is provided): we must
        # actually push the ciphertext to the helper
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
            # let it pre-compute the size for progress purposes
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
        # branch 2: the helper already has the file
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results
    def _build_verifycap(self, upload_results):
        """Attach the verify-cap string and timing data to the results."""
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        # NOTE(review): the lines binding r (= the results object), 'now',
        # the CHKFileVerifierURI call's closing/to_string(), and the return
        # are missing from this listing.
        # sanity-check that the helper used the parameters we expected
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares, size=self._size
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own "total" under a different key
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
1017 def get_upload_status(self):
1018 return self._upload_status
class BaseUploadable:
    """Shared encoding-parameter plumbing for concrete uploadables."""
    # defaults used when the per-instance attributes below are unset
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10
    # NOTE(review): KiB is referenced above but its definition is not
    # visible in this listing.

    # per-instance overrides; None/unset means "use the default above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None   # cache for get_all_encoding_parameters

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
1037 def set_default_encoding_parameters(self, default_params):
1038 assert isinstance(default_params, dict)
1039 for k,v in default_params.items():
1040 precondition(isinstance(k, str), k, v)
1041 precondition(isinstance(v, int), k, v)
1042 if "k" in default_params:
1043 self.default_encoding_param_k = default_params["k"]
1044 if "happy" in default_params:
1045 self.default_encoding_param_happy = default_params["happy"]
1046 if "n" in default_params:
1047 self.default_encoding_param_n = default_params["n"]
1048 if "max_segment_size" in default_params:
1049 self.default_max_segment_size = default_params["max_segment_size"]
1051 def get_all_encoding_parameters(self):
1052 if self._all_encoding_parameters:
1053 return defer.succeed(self._all_encoding_parameters)
1055 max_segsize = self.max_segment_size or self.default_max_segment_size
1056 k = self.encoding_param_k or self.default_encoding_param_k
1057 happy = self.encoding_param_happy or self.default_encoding_param_happy
1058 n = self.encoding_param_n or self.default_encoding_param_n
1061 def _got_size(file_size):
1062 # for small files, shrink the segment size to avoid wasting space
1063 segsize = min(max_segsize, file_size)
1064 # this must be a multiple of 'required_shares'==k
1065 segsize = mathutil.next_multiple(segsize, k)
1066 encoding_parameters = (k, happy, n, segsize)
1067 self._all_encoding_parameters = encoding_parameters
1068 return encoding_parameters
1069 d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None       # cached 16-byte AES key, computed lazily
        self.convergence = convergence
        self._size = None      # cached size, filled in by get_size()

    def _get_encryption_key_convergent(self):
        """Derive the key by hashing the plaintext with the convergence secret."""
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got_parameters(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)  # rewind so the upload itself can re-read the plaintext
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got_parameters)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        """Return a Deferred firing with the 16-byte AES key."""
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        """Return a Deferred firing with the file size, caching it."""
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)   # seek to EOF to measure
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        # IUploadable.read returns a Deferred list-of-strings
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        # unlike FileHandle, we opened the file ourselves, so we close it
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # wrap the in-memory string in a file-like object and reuse FileHandle
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1175 class Uploader(service.MultiService, log.PrefixingLogMixin):
1176 """I am a service that allows file uploading. I am a service-child of the
1179 implements(IUploader)
1181 URI_LIT_SIZE_THRESHOLD = 55
1182 MAX_UPLOAD_STATUSES = 10
def __init__(self, helper_furl=None, stats_provider=None):
    """Create the Uploader service.

    helper_furl: FURL of an upload helper, or None to upload directly.
    stats_provider: optional stats gatherer for upload counters.
    """
    self._helper_furl = helper_furl
    self.stats_provider = stats_provider
    # restored: _helper starts as None (set by _got_versioned_helper,
    # read unconditionally by get_helper_info and upload)
    self._helper = None
    self._all_uploads = weakref.WeakKeyDictionary() # for debugging
    self._all_upload_statuses = weakref.WeakKeyDictionary()
    self._recent_upload_statuses = []
    log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
    service.MultiService.__init__(self)
def startService(self):
    """Start the service; begin (re)connecting to the helper if one is configured."""
    service.MultiService.startService(self)
    if self._helper_furl:
        # restored the dropped continuation: _got_helper is the
        # connectTo() callback, invoked on every (re)connection
        self.parent.tub.connectTo(self._helper_furl,
                                  self._got_helper)
def _got_helper(self, helper):
    """Connection callback: fetch the helper's version info before using it."""
    self.log("got helper connection, getting versions")
    # default version data used when the remote lacks get_version();
    # restored the dropped nested-dict value and closing brace
    default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                { },
                "application-version": "unknown: no get_version()",
                }
    d = get_versioned_remote_reference(helper, default)
    d.addCallback(self._got_versioned_helper)
def _got_versioned_helper(self, helper):
    """Accept the helper once it has proven it speaks the v1 helper protocol."""
    required_protocol = "http://allmydata.org/tahoe/protocols/helper/v1"
    if required_protocol not in helper.version:
        raise InsufficientVersionError(required_protocol, helper.version)
    self._helper = helper
    # drop our reference when the connection dies, so a reconnect can replace it
    helper.notifyOnDisconnect(self._lost_helper)
def _lost_helper(self):
    # restored dropped body: forget the connection so get_helper_info()
    # reports disconnected; the Tub reconnector set up in startService
    # will run _got_helper again when the helper comes back
    self._helper = None
def get_helper_info(self):
    """Return a (helper_furl_or_None, connected_bool) tuple."""
    furl = self._helper_furl
    connected = bool(self._helper)
    return (furl, connected)
def upload(self, uploadable):
    """Upload the given IUploadable.

    Returns a Deferred that will fire with the UploadResults instance.
    Files at or below URI_LIT_SIZE_THRESHOLD become literal URIs; larger
    files are CHK-uploaded, through the helper when one is connected.
    """
    uploadable = IUploadable(uploadable)
    d = uploadable.get_size()
    def _got_size(size):
        default_params = self.parent.get_encoding_parameters()
        precondition(isinstance(default_params, dict), default_params)
        precondition("max_segment_size" in default_params, default_params)
        uploadable.set_default_encoding_parameters(default_params)

        if self.stats_provider:
            self.stats_provider.count('uploader.files_uploaded', 1)
            self.stats_provider.count('uploader.bytes_uploaded', size)

        if size <= self.URI_LIT_SIZE_THRESHOLD:
            # small enough to pack the plaintext into the URI itself
            uploader = LiteralUploader(self.parent)
            return uploader.start(uploadable)

        eu = EncryptAnUploadable(uploadable, self._parentmsgid)
        d2 = defer.succeed(None)
        # restored dropped branch keywords: helper-assisted vs direct upload
        if self._helper:
            uploader = AssistedUploader(self._helper)
            d2.addCallback(lambda x: eu.get_storage_index())
            d2.addCallback(lambda si: uploader.start(eu, si))
        else:
            uploader = CHKUploader(self.parent)
            d2.addCallback(lambda x: uploader.start(eu))

        self._add_upload(uploader)
        def turn_verifycap_into_read_cap(uploadresults):
            # Generate the uri from the verifycap plus the key.
            d3 = uploadable.get_encryption_key()
            def put_readcap_into_results(key):
                v = uri.from_string(uploadresults.verifycapstr)
                r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                uploadresults.uri = r.to_string()
                return uploadresults
            d3.addCallback(put_readcap_into_results)
            return d3  # restored: keep the outer chain waiting on d3
        d2.addCallback(turn_verifycap_into_read_cap)
        return d2  # restored
    d.addCallback(_got_size)
    return d  # restored: callers need the Deferred
def _add_upload(self, uploader):
    """Register an uploader for status reporting.

    Both weak dictionaries let finished uploads be garbage-collected;
    the recent-status list is kept to at most MAX_UPLOAD_STATUSES entries.
    """
    status = uploader.get_upload_status()
    self._all_uploads[uploader] = None
    self._all_upload_statuses[status] = None
    self._recent_upload_statuses.append(status)
    # discard the oldest entries beyond the cap (equivalent to repeated pop(0))
    excess = len(self._recent_upload_statuses) - self.MAX_UPLOAD_STATUSES
    if excess > 0:
        del self._recent_upload_statuses[:excess]
1285 def list_all_upload_statuses(self):
1286 for us in self._all_upload_statuses: