2 import os, time, weakref, itertools
3 from zope.interface import implements
4 from twisted.python import failure
5 from twisted.internet import defer
6 from twisted.application import service
7 from foolscap import Referenceable, Copyable, RemoteCopy
8 from foolscap import eventual
9 from foolscap.logging import log
11 from allmydata.util.hashutil import file_renewal_secret_hash, \
12 file_cancel_secret_hash, bucket_renewal_secret_hash, \
13 bucket_cancel_secret_hash, plaintext_hasher, \
14 storage_index_hash, plaintext_segment_hasher, convergence_hasher
15 from allmydata import encode, storage, hashtree, uri
16 from allmydata.util import base32, idlib, mathutil
17 from allmydata.util.assertutil import precondition
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
20 from pycryptopp.cipher.aes import AES
22 from cStringIO import StringIO
# Control-flow exception: per the comment below it is raised to break out of a
# peer-query loop.  NOTE(review): this is a numbered listing; the class body
# line (original line 33, presumably `pass`) is not visible here.
31 class HaveAllPeersError(Exception):
32 # we use this to jump out of the loop
35 # this wants to live in storage, not here
# Exception apparently signalling a too-full storage server -- TODO confirm;
# NOTE(review): the class body (original line 37) is not visible in this listing.
36 class TooFullError(Exception):
# Copyable record of one upload's outcome: share placement, per-server maps,
# and timing data.  Sent over foolscap (typeToCopy names the wire type).
# NOTE(review): original lines 42-44 are missing here -- the `self.*`
# assignments below clearly belong to a `def __init__(self):` whose def line
# is not visible; lines 48 and 50 are also missing.
39 class UploadResults(Copyable, RemoteCopy):
40 implements(IUploadResults)
41 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
45 self.timings = {} # dict of name to number of seconds
46 self.sharemap = {} # dict of shnum to placement string
47 self.servermap = {} # dict of peerid to set(shnums)
49 self.ciphertext_fetched = None # how much the helper fetched
51 self.preexisting_shares = None # count of shares already present
52 self.pushed_shares = None # count of shares we pushed
55 # our current uri_extension is 846 bytes for small files, a few bytes
56 # more for larger ones (since the filesize is encoded in decimal in a
57 # few places). Ask for a little bit more just in case we need it. If
58 # the extension changes size, we can change EXTENSION_SIZE to
59 # allocate a more accurate amount of space.
# PeerTracker interior: tracks one storage server during share placement.
# NOTE(review): the `class PeerTracker:` header (original ~line 61) is not
# visible; many body lines are missing (65: the storage_index parameter line,
# 69: presumably `self.peerid = peerid`, since __repr__ reads self.peerid --
# TODO confirm; 74-76, 78, 83).
63 def __init__(self, peerid, storage_server,
64 sharesize, blocksize, num_segments, num_share_hashes,
66 bucket_renewal_secret, bucket_cancel_secret):
67 precondition(isinstance(peerid, str), peerid)
68 precondition(len(peerid) == 20, peerid)
70 self._storageserver = storage_server # to an RIStorageServer
71 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
72 self.sharesize = sharesize
# NOTE(review): `as` is used as an identifier here; `as` is a reserved word
# from Python 2.6 on, so this line cannot compile on modern interpreters.
73 as = storage.allocated_size(sharesize,
77 self.allocated_size = as
79 self.blocksize = blocksize
80 self.num_segments = num_segments
81 self.num_share_hashes = num_share_hashes
82 self.storage_index = storage_index
84 self.renew_secret = bucket_renewal_secret
85 self.cancel_secret = bucket_cancel_secret
# NOTE(review): the `def __repr__(self):` line (original 87) is missing above
# this return.
88 return ("<PeerTracker for peer %s and SI %s>"
89 % (idlib.shortnodeid_b2a(self.peerid),
90 storage.si_b2a(self.storage_index)[:5]))
# query(): asks the server to allocate buckets for `sharenums`.  Lines 94-98
# (the remaining callRemote arguments) and 101 (presumably `return d` -- the
# caller at original line 222 chains on the result -- TODO confirm) are missing.
92 def query(self, sharenums):
93 d = self._storageserver.callRemote("allocate_buckets",
99 canary=Referenceable())
100 d.addCallback(self._got_reply)
# _got_reply(): wraps each returned bucket rref in a WriteBucketProxy and
# records it.  NOTE(review): line 105 (presumably `b = {}`, since `b` is
# populated and read below) and lines 108-109, 111-113 are missing.
103 def _got_reply(self, (alreadygot, buckets)):
104 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
106 for sharenum, rref in buckets.iteritems():
107 bp = storage.WriteBucketProxy(rref, self.sharesize,
110 self.num_share_hashes,
114 self.buckets.update(b)
115 return (alreadygot, set(b.keys()))
# Peer-selection strategy: decides which storage servers get which shares.
117 class Tahoe2PeerSelector:
119 def __init__(self, upload_id, logparent=None, upload_status=None):
120 self.upload_id = upload_id
# query_count/good_query_count/bad_query_count: statistics reported in the
# success/failure messages built in _loop below.
121 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
# NOTE(review): original line 122 is missing; _got_response increments
# self.error_count, so its initialization is presumably on that line.
123 self.num_peers_contacted = 0
124 self.last_failure_msg = None
125 self._status = IUploadStatus(upload_status)
126 self._log_parent = log.msg("%s starting" % self, parent=logparent)
# NOTE(review): the `def __repr__(self):` line (original 128) is missing.
129 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
# Drives peer selection: builds a PeerTracker per permuted peer, then kicks
# off the query loop (_loop) via maybeDeferred.
# NOTE(review): missing lines include 134/140-142 (docstring delimiters),
# 144, 147, 155, 157 (presumably `if not peers:` guarding the raise below --
# TODO confirm), 173, 175, 177-178, 182, 184, 186-187, 190 and 192
# (presumably `return d`).
131 def get_shareholders(self, client,
132 storage_index, share_size, block_size,
133 num_segments, total_shares, shares_of_happiness):
135 @return: (used_peers, already_peers), where used_peers is a set of
136 PeerTracker instances that have agreed to hold some shares
137 for us (the shnum is stashed inside the PeerTracker),
138 and already_peers is a dict mapping shnum to a peer
139 which claims to already have the share.
143 self._status.set_status("Contacting Peers..")
145 self.total_shares = total_shares
146 self.shares_of_happiness = shares_of_happiness
148 self.homeless_shares = range(total_shares)
149 # self.uncontacted_peers = list() # peers we haven't asked yet
150 self.contacted_peers = [] # peers worth asking again
151 self.contacted_peers2 = [] # peers that we have asked again
152 self._started_second_pass = False
153 self.use_peers = set() # PeerTrackers that have shares assigned to them
154 self.preexisting_shares = {} # sharenum -> peerid holding the share
156 peers = client.get_permuted_peers("storage", storage_index)
158 raise encode.NotEnoughPeersError("client gave us zero peers")
160 # figure out how much space to ask for
162 # this needed_hashes computation should mirror
163 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
164 # (instead of a HashTree) because we don't require actual hashing
165 # just to count the levels.
166 ht = hashtree.IncompleteHashTree(total_shares)
167 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
169 # decide upon the renewal/cancel secrets, to include them in the
170 # allocat_buckets query.
171 client_renewal_secret = client.get_renewal_secret()
172 client_cancel_secret = client.get_cancel_secret()
174 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
176 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
179 trackers = [ PeerTracker(peerid, conn,
180 share_size, block_size,
181 num_segments, num_share_hashes,
183 bucket_renewal_secret_hash(file_renewal_secret,
185 bucket_cancel_secret_hash(file_cancel_secret,
188 for (peerid, conn) in peers ]
189 self.uncontacted_peers = trackers
191 d = defer.maybeDeferred(self._loop)
# _loop body (the `def _loop(self):` line, original ~193, is not visible).
# Each pass either declares success (no homeless shares), asks a fresh peer
# (first pass: one share per query), or re-asks an already-contacted peer
# (later passes: homeless shares divided evenly among remaining peers).
195 if not self.homeless_shares:
197 msg = ("placed all %d shares, "
198 "sent %d queries to %d peers, "
199 "%d queries placed some shares, %d placed none, "
202 self.query_count, self.num_peers_contacted,
203 self.good_query_count, self.bad_query_count,
205 log.msg("peer selection successful for %s: %s" % (self, msg),
206 parent=self._log_parent)
207 return (self.use_peers, self.preexisting_shares)
209 if self.uncontacted_peers:
210 peer = self.uncontacted_peers.pop(0)
211 # TODO: don't pre-convert all peerids to PeerTrackers
212 assert isinstance(peer, PeerTracker)
# first pass: ask each new peer for a single homeless share
214 shares_to_ask = set([self.homeless_shares.pop(0)])
215 self.query_count += 1
216 self.num_peers_contacted += 1
218 self._status.set_status("Contacting Peers [%s] (first query),"
220 % (idlib.shortnodeid_b2a(peer.peerid),
221 len(self.homeless_shares)))
222 d = peer.query(shares_to_ask)
223 d.addBoth(self._got_response, peer, shares_to_ask,
224 self.contacted_peers)
226 elif self.contacted_peers:
227 # ask a peer that we've already asked.
228 if not self._started_second_pass:
229 log.msg("starting second pass", parent=self._log_parent,
231 self._started_second_pass = True
# spread the remaining homeless shares evenly over the peers left
232 num_shares = mathutil.div_ceil(len(self.homeless_shares),
233 len(self.contacted_peers))
234 peer = self.contacted_peers.pop(0)
235 shares_to_ask = set(self.homeless_shares[:num_shares])
236 self.homeless_shares[:num_shares] = []
237 self.query_count += 1
239 self._status.set_status("Contacting Peers [%s] (second query),"
241 % (idlib.shortnodeid_b2a(peer.peerid),
242 len(self.homeless_shares)))
243 d = peer.query(shares_to_ask)
244 d.addBoth(self._got_response, peer, shares_to_ask,
245 self.contacted_peers2)
247 elif self.contacted_peers2:
248 # we've finished the second-or-later pass. Move all the remaining
249 # peers back into self.contacted_peers for the next pass.
250 self.contacted_peers.extend(self.contacted_peers2)
251 self.contacted_peers[:] = []
# Final branch of _loop: no peers remain.  Succeed if at least
# shares_of_happiness shares were placed, otherwise raise.
# NOTE(review): missing lines include 260/265 (format-string tail and args),
# 271, 273, and presumably a `self._loop()` re-entry after the pass-rotation
# branch above -- TODO confirm.
254 # no more peers. If we haven't placed enough shares, we fail.
255 placed_shares = self.total_shares - len(self.homeless_shares)
256 if placed_shares < self.shares_of_happiness:
257 msg = ("placed %d shares out of %d total (%d homeless), "
258 "sent %d queries to %d peers, "
259 "%d queries placed some shares, %d placed none, "
261 (self.total_shares - len(self.homeless_shares),
262 self.total_shares, len(self.homeless_shares),
263 self.query_count, self.num_peers_contacted,
264 self.good_query_count, self.bad_query_count,
266 msg = "peer selection failed for %s: %s" % (self, msg)
267 if self.last_failure_msg:
268 msg += " (%s)" % (self.last_failure_msg,)
269 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
270 raise encode.NotEnoughPeersError(msg)
272 # we placed enough to be happy, so we're done
274 self._status.set_status("Placed all shares")
275 return self.use_peers
# Handles one peer's reply (or failure).  On failure the asked-for shares go
# back on the homeless list; on success we record preexisting and allocated
# shares and decide whether this peer is worth asking again.
# NOTE(review): missing lines include 280, 289-290 (the re-loop call in the
# still-hope branch), 298, 304-305 (the loop over `alreadygot` producing `s`
# below), 309-310, 313, 315-316, 319-320, 324, 326-327, 333-334, 341, and
# the trailing re-loop/return (345+).
277 def _got_response(self, res, peer, shares_to_ask, put_peer_here):
278 if isinstance(res, failure.Failure):
279 # This is unusual, and probably indicates a bug or a network
281 log.msg("%s got error during peer selection: %s" % (peer, res),
282 level=log.UNUSUAL, parent=self._log_parent)
283 self.error_count += 1
# put the shares we asked this peer about back at the front of the queue
284 self.homeless_shares = list(shares_to_ask) + self.homeless_shares
285 if (self.uncontacted_peers
286 or self.contacted_peers
287 or self.contacted_peers2):
288 # there is still hope, so just loop
291 # No more peers, so this upload might fail (it depends upon
292 # whether we've hit shares_of_happiness or not). Log the last
293 # failure we got: if a coding error causes all peers to fail
294 # in the same way, this allows the common failure to be seen
295 # by the uploader and should help with debugging
296 msg = ("last failure (from %s) was: %s" % (peer, res))
297 self.last_failure_msg = msg
299 (alreadygot, allocated) = res
300 log.msg("response from peer %s: alreadygot=%s, allocated=%s"
301 % (idlib.shortnodeid_b2a(peer.peerid),
302 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
303 level=log.NOISY, parent=self._log_parent)
# `s` below presumably iterates over `alreadygot` (loop header not visible).
306 self.preexisting_shares[s] = peer.peerid
307 if s in self.homeless_shares:
308 self.homeless_shares.remove(s)
311 # the PeerTracker will remember which shares were allocated on
312 # that peer. We just have to remember to use them.
314 self.use_peers.add(peer)
317 not_yet_present = set(shares_to_ask) - set(alreadygot)
318 still_homeless = not_yet_present - set(allocated)
321 # they accepted or already had at least one share, so
322 # progress has been made
323 self.good_query_count += 1
325 self.bad_query_count += 1
328 # In networks with lots of space, this is very unusual and
329 # probably indicates an error. In networks with peers that
330 # are full, it is merely unusual. In networks that are very
331 # full, it is common, and many uploads will fail. In most
332 # cases, this is obviously not fatal, and we'll just use some
335 # some shares are still homeless, keep trying to find them a
336 # home. The ones that were rejected get first priority.
337 self.homeless_shares = (list(still_homeless)
338 + self.homeless_shares)
339 # Since they were unable to accept all of our requests, so it
340 # is safe to assume that asking them again won't help.
342 # if they *were* able to accept everything, they might be
343 # willing to accept even more.
344 put_peer_here.append(peer)
# Adapter: wraps an IUploadable and presents IEncryptedUploadable, hashing
# plaintext (whole-file and per-segment) as it encrypts.
# NOTE(review): original lines 354-355 are missing -- _read_encrypted later
# references self.CHUNKSIZE, so its class-level definition is presumably
# there; lines 366-367 after __init__ are also missing (self._status is
# consulted elsewhere, so its initialization may be among them -- TODO confirm).
350 class EncryptAnUploadable:
351 """This is a wrapper that takes an IUploadable and provides
352 IEncryptedUploadable."""
353 implements(IEncryptedUploadable)
356 def __init__(self, original, log_parent=None):
357 self.original = IUploadable(original)
358 self._log_number = log_parent
359 self._encryptor = None
360 self._plaintext_hasher = plaintext_hasher()
361 self._plaintext_segment_hasher = None
362 self._plaintext_segment_hashes = []
363 self._encoding_parameters = None
364 self._file_size = None
365 self._ciphertext_bytes_read = 0
def set_upload_status(self, upload_status):
    """Adopt an upload-status object (adapted to IUploadStatus) and
    propagate it down to the wrapped uploadable."""
    status = IUploadStatus(upload_status)
    self._status = status
    self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
    """Forward a message to the foolscap log, supplying this wrapper's
    default facility ("upload.encryption") and log parent when the
    caller did not provide them."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
# get_size body (the def line, original ~378, is not visible): returns a
# Deferred firing with the file size, caching it in self._file_size.
# NOTE(review): missing lines include 383 (presumably `def _got_size(size):`,
# since _got_size is registered below), 385 (possibly an `if self._status:`
# guard -- TODO confirm), 387, and 389-390 (presumably `return d`).
380 if self._file_size is not None:
381 return defer.succeed(self._file_size)
382 d = self.original.get_size()
384 self._file_size = size
386 self._status.set_size(size)
388 d.addCallback(_got_size)
# Returns a Deferred firing with (k, happy, n, segment_size), cached after
# the first call; also stashes segment_size for the segment hashers.
# NOTE(review): line 400 (remaining log kwargs) and lines 402-404 (presumably
# `d.addCallback(_got)` and `return d`) are missing.
391 def get_all_encoding_parameters(self):
392 if self._encoding_parameters is not None:
393 return defer.succeed(self._encoding_parameters)
394 d = self.original.get_all_encoding_parameters()
395 def _got(encoding_parameters):
396 (k, happy, n, segsize) = encoding_parameters
397 self._segment_size = segsize # used by segment hashers
398 self._encoding_parameters = encoding_parameters
399 self.log("my encoding parameters: %s" % (encoding_parameters,),
401 return encoding_parameters
# Lazily builds the AES encryptor from the encryption key and derives the
# storage index from that key.
# NOTE(review): missing lines include 406 (presumably `if self._encryptor:`
# guarding the early return), 408, 410-413 (the callback receiving `key` and
# constructing the AES object -- AES is imported at the top of the file),
# 420 (possibly an `if self._status:` guard), and 422-425 (callback
# registration and `return d`).
405 def _get_encryptor(self):
407 return defer.succeed(self._encryptor)
409 d = self.original.get_encryption_key()
414 storage_index = storage_index_hash(key)
415 assert isinstance(storage_index, str)
416 # There's no point to having the SI be longer than the key, so we
417 # specify that it is truncated to the same 128 bits as the AES key.
418 assert len(storage_index) == 16 # SHA-256 truncated to 128b
419 self._storage_index = storage_index
421 self._status.set_storage_index(storage_index)
def get_storage_index(self):
    """Return a Deferred that fires with this upload's storage index.

    The storage index is derived from the encryption key, so the
    encryptor must be set up first.

    FIX (review): the visible listing built the Deferred chain but fell
    off the end, implicitly returning None; callers (e.g.
    AssistedUploader.start) chain further callbacks on the returned
    Deferred, so the `return d` is restored.
    """
    d = self._get_encryptor()
    d.addCallback(lambda res: self._storage_index)
    return d
# Returns (hasher, bytes_left_in_segment), creating a fresh per-segment
# hasher when none is active.
# NOTE(review): line 433 (presumably `if p:` -- an early `return p, left` for
# an in-progress hasher, which is what the otherwise-unused `left` value on
# line 434/435 suggests -- TODO confirm) and line 435 are missing.
431 def _get_segment_hasher(self):
432 p = self._plaintext_segment_hasher
434 left = self._segment_size - self._plaintext_segment_hashed_bytes
436 p = plaintext_segment_hasher()
437 self._plaintext_segment_hasher = p
438 self._plaintext_segment_hashed_bytes = 0
439 return p, self._segment_size
# Feeds a plaintext chunk into the per-segment hashers, closing out each
# hasher (recording its digest) whenever a segment boundary is crossed.
# NOTE(review): line 442 is missing (presumably `offset = 0`, since the loop
# below reads `offset` before incrementing it); lines 449, 457 and 461-462
# (remaining log kwargs) are also missing.
441 def _update_segment_hash(self, chunk):
443 while offset < len(chunk):
444 p, segment_left = self._get_segment_hasher()
445 chunk_left = len(chunk) - offset
446 this_segment = min(chunk_left, segment_left)
447 p.update(chunk[offset:offset+this_segment])
448 self._plaintext_segment_hashed_bytes += this_segment
450 if self._plaintext_segment_hashed_bytes == self._segment_size:
451 # we've filled this segment
452 self._plaintext_segment_hashes.append(p.digest())
453 self._plaintext_segment_hasher = None
454 self.log("closed hash [%d]: %dB" %
455 (len(self._plaintext_segment_hashes)-1,
456 self._plaintext_segment_hashed_bytes),
458 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
459 segnum=len(self._plaintext_segment_hashes)-1,
460 hash=base32.b2a(p.digest()),
463 offset += this_segment
# Public entry point: ensures parameters/size/encryptor are ready, then
# delegates to the chunked _read_encrypted driver via an explicitly-passed
# Deferred (d2) to avoid unbounded recursion on the Deferred chain.
# NOTE(review): lines 476-477 are missing (presumably `ciphertext = []`,
# the accumulator handed to _read_encrypted below), as are 482-483
# (presumably `return d`).
466 def read_encrypted(self, length, hash_only):
467 # make sure our parameters have been set up first
468 d = self.get_all_encoding_parameters()
470 d.addCallback(lambda ignored: self.get_size())
471 d.addCallback(lambda ignored: self._get_encryptor())
472 # then fetch and encrypt the plaintext. The unusual structure here
473 # (passing a Deferred *into* a function) is needed to avoid
474 # overflowing the stack: Deferreds don't optimize out tail recursion.
475 # We also pass in a list, to which _read_encrypted will append
478 d2 = defer.Deferred()
479 d.addCallback(lambda ignored:
480 self._read_encrypted(length, ciphertext, hash_only, d2))
481 d.addCallback(lambda ignored: d2)
# Recursive chunked driver: reads up to CHUNKSIZE plaintext at a time,
# hashes/encrypts it, and fires `fire_when_done` with the accumulated
# ciphertext once nothing remains.
# NOTE(review): line 485 is missing (presumably `if not remaining:` guarding
# the callback below), as are 487 (return), 502, 507-508 (the tail of the
# recursive call and the `def _err(why):` line implied by the errback body),
# and 510-512 (callback registration / return).
484 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
486 fire_when_done.callback(ciphertext)
488 # tolerate large length= values without consuming a lot of RAM by
489 # reading just a chunk (say 50kB) at a time. This only really matters
490 # when hash_only==True (i.e. resuming an interrupted upload), since
491 # that's the case where we will be skipping over a lot of data.
492 size = min(remaining, self.CHUNKSIZE)
493 remaining = remaining - size
494 # read a chunk of plaintext..
495 d = defer.maybeDeferred(self.original.read, size)
496 # N.B.: if read() is synchronous, then since everything else is
497 # actually synchronous too, we'd blow the stack unless we stall for a
498 # tick. Once you accept a Deferred from IUploadable.read(), you must
499 # be prepared to have it fire immediately too.
500 d.addCallback(eventual.fireEventually)
501 def _good(plaintext):
503 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
504 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
505 ciphertext.extend(ct)
506 self._read_encrypted(remaining, ciphertext, hash_only,
509 fire_when_done.errback(why)
# Hashes every plaintext chunk (whole-file and per-segment hashers), then
# encrypts it; returns the ciphertext chunks and updates progress.
# NOTE(review): missing lines include 516-517 (presumably the `cryptdata`
# and `bytes_processed` initializations read below), 520-522 (the loop
# header consuming `data` via pop(0), per the comment), 534 (the
# `if not hash_only:` / `else:` split implied by the two branches), 536,
# 538-539, 541 (possibly an `if self._status:` guard), and 544-545
# (presumably `return cryptdata`).
514 def _hash_and_encrypt_plaintext(self, data, hash_only):
515 assert isinstance(data, (tuple, list)), type(data)
518 # we use data.pop(0) instead of 'for chunk in data' to save
519 # memory: each chunk is destroyed as soon as we're done with it.
523 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
525 bytes_processed += len(chunk)
526 self._plaintext_hasher.update(chunk)
527 self._update_segment_hash(chunk)
528 # TODO: we have to encrypt the data (even if hash_only==True)
529 # because pycryptopp's AES-CTR implementation doesn't offer a
530 # way to change the counter value. Once pycryptopp acquires
531 # this ability, change this to simply update the counter
532 # before each call to (hash_only==False) _encryptor.process()
533 ciphertext = self._encryptor.process(chunk)
535 self.log(" skipping encryption", level=log.NOISY)
537 cryptdata.append(ciphertext)
540 self._ciphertext_bytes_read += bytes_processed
542 progress = float(self._ciphertext_bytes_read) / self._file_size
543 self._status.set_progress(1, progress)
# Returns a Deferred firing with the requested slice of per-segment
# plaintext hashes, closing out the final (short) segment's hasher first if
# it is still open.
# NOTE(review): lines 556 and 560 (remaining log kwargs) are missing.
547 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
548 if len(self._plaintext_segment_hashes) < num_segments:
549 # close out the last one
550 assert len(self._plaintext_segment_hashes) == num_segments-1
551 p, segment_left = self._get_segment_hasher()
552 self._plaintext_segment_hashes.append(p.digest())
553 del self._plaintext_segment_hasher
554 self.log("closing plaintext leaf hasher, hashed %d bytes" %
555 self._plaintext_segment_hashed_bytes,
557 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
558 segnum=len(self._plaintext_segment_hashes)-1,
559 hash=base32.b2a(p.digest()),
561 assert len(self._plaintext_segment_hashes) == num_segments
562 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
    """Return a Deferred that fires with the digest of the running
    whole-file plaintext hasher."""
    return defer.succeed(self._plaintext_hasher.digest())
# close() body: delegates to the wrapped uploadable.  NOTE(review): the
# `def close(self):` line (original 568) is not visible in this listing.
569 return self.original.close()
# UploadStatus interior: a simple status record with getters/setters.
# NOTE(review): the class header (original 571) is not visible, nor are the
# `__init__` def line and several attribute initializations (574-578, 581-582)
# or the one-line bodies of most accessors (587, 590-591, 593, 595, 599, 601,
# 603, 608, 610, 612, 617, 619).  Code left byte-identical.
572 implements(IUploadStatus)
# statusid_counter is shared across instances; .next() below is the
# Python 2 iterator protocol.
573 statusid_counter = itertools.count(0)
576 self.storage_index = None
579 self.status = "Not started"
580 self.progress = [0.0, 0.0, 0.0]
583 self.counter = self.statusid_counter.next()
584 self.started = time.time()
586 def get_started(self):
588 def get_storage_index(self):
589 return self.storage_index
592 def using_helper(self):
594 def get_status(self):
596 def get_progress(self):
597 return tuple(self.progress)
598 def get_active(self):
600 def get_results(self):
602 def get_counter(self):
605 def set_storage_index(self, si):
606 self.storage_index = si
607 def set_size(self, size):
609 def set_helper(self, helper):
611 def set_status(self, status):
613 def set_progress(self, which, value):
614 # [0]: chk, [1]: ciphertext, [2]: encode+push
615 self.progress[which] = value
616 def set_active(self, value):
618 def set_results(self, value):
# CHKUploader head: uploads a CHK (immutable) file directly to storage
# servers, selecting peers with Tahoe2PeerSelector.
# NOTE(review): the `class CHKUploader:` header (original ~621) is not
# visible; line 627 is missing (abort() below tests self._encoder, so its
# initialization is presumably there -- TODO confirm).
622 peer_selector_class = Tahoe2PeerSelector
624 def __init__(self, client):
625 self._client = client
626 self._log_number = self._client.log("CHKUploader starting")
628 self._results = UploadResults()
629 self._storage_index = None
630 self._upload_status = UploadStatus()
631 self._upload_status.set_helper(False)
632 self._upload_status.set_active(True)
633 self._upload_status.set_results(self._results)
def log(self, *args, **kwargs):
    """Log through the client's logger, defaulting this uploader's log
    parent and the "tahoe.upload" facility when the caller did not
    supply them."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.upload")
    return self._client.log(*args, **kwargs)
# Starts the upload: wraps the uploadable for encryption, uploads the
# ciphertext, then combines the encryption key with the upload results to
# compute the final URI.
# NOTE(review): missing lines include 644/646-647 (docstring continuation
# and closing quotes), 651, 655 (presumably `def _uploaded(res):`, registered
# below), 658 (presumably `return d1`), 660 (the def line of the callback
# containing the set_active call -- TODO confirm), and 662-664 (return).
642 def start(self, uploadable):
643 """Start uploading the file.
645 This method returns a Deferred that will fire with the URI (a
648 self._started = time.time()
649 uploadable = IUploadable(uploadable)
650 self.log("starting upload of %s" % uploadable)
652 eu = EncryptAnUploadable(uploadable, self._log_number)
653 eu.set_upload_status(self._upload_status)
654 d = self.start_encrypted(eu)
656 d1 = uploadable.get_encryption_key()
657 d1.addCallback(lambda key: self._compute_uri(res, key))
659 d.addCallback(_uploaded)
661 self._upload_status.set_active(False)
# abort() body.  NOTE(review): the `def abort(self):` line (original ~666)
# is not visible in this listing.
667 """Call this if the upload must be abandoned before it completes.
668 This will tell the shareholders to delete their partial shares. I
669 return a Deferred that fires when these messages have been acked."""
670 if not self._encoder:
671 # how did you call abort() before calling start() ?
672 return defer.succeed(None)
673 return self._encoder.abort()
# Runs the encode-and-push pipeline for an already-encrypted uploadable:
# locate shareholders, hand them to the Encoder, start encoding.
# NOTE(review): line 680 (the Encoder constructor's remaining arguments) and
# lines 687-688 (presumably `return d`) are missing.
675 def start_encrypted(self, encrypted):
676 eu = IEncryptedUploadable(encrypted)
678 started = time.time()
679 self._encoder = e = encode.Encoder(self._log_number,
681 d = e.set_encrypted_uploadable(eu)
682 d.addCallback(self.locate_all_shareholders, started)
683 d.addCallback(self.set_shareholders, e)
684 d.addCallback(lambda res: e.start())
685 d.addCallback(self._encrypted_done)
686 # this fires with the uri_extension_hash and other data
# Runs peer selection for this upload using parameters pulled from the
# encoder; records elapsed-time statistics along the way.
# NOTE(review): lines 697-698 (the selector constructor's remaining
# arguments), 703, 708 (the callback def receiving the selection result),
# and 710-712 (presumably callback registration and `return d`) are missing.
689 def locate_all_shareholders(self, encoder, started):
690 peer_selection_started = now = time.time()
691 self._storage_index_elapsed = now - started
692 storage_index = encoder.get_param("storage_index")
693 self._storage_index = storage_index
694 upload_id = storage.si_b2a(storage_index)[:5]
695 self.log("using storage index %s" % upload_id)
696 peer_selector = self.peer_selector_class(upload_id, self._log_number,
699 share_size = encoder.get_param("share_size")
700 block_size = encoder.get_param("block_size")
701 num_segments = encoder.get_param("num_segments")
# note: n is passed as total_shares and `desired` as shares_of_happiness
702 k,desired,n = encoder.get_param("share_counts")
704 self._peer_selection_started = time.time()
705 d = peer_selector.get_shareholders(self._client, storage_index,
706 share_size, block_size,
707 num_segments, n, desired)
709 self._peer_selection_elapsed = time.time() - peer_selection_started
# Records already-present shares in the results, builds the shnum->bucket
# map from the accepted PeerTrackers, and hands it to the encoder.
# NOTE(review): lines 715 and 719 (docstring delimiters), 729-730 (presumably
# `buckets = {}`, read below), and 733 (presumably the `self._sharemap`
# initialization, populated below -- TODO confirm) are missing.
714 def set_shareholders(self, (used_peers, already_peers), encoder):
716 @param used_peers: a sequence of PeerTracker objects
717 @param already_peers: a dict mapping sharenum to a peerid that
718 claims to already have this share
720 self.log("_send_shares, used_peers is %s" % (used_peers,))
721 # record already-present shares in self._results
722 for (shnum, peerid) in already_peers.items():
723 peerid_s = idlib.shortnodeid_b2a(peerid)
724 self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
725 if peerid not in self._results.servermap:
726 self._results.servermap[peerid] = set()
727 self._results.servermap[peerid].add(shnum)
728 self._results.preexisting_shares = len(already_peers)
731 for peer in used_peers:
732 assert isinstance(peer, PeerTracker)
734 for peer in used_peers:
735 buckets.update(peer.buckets)
736 for shnum in peer.buckets:
737 self._sharemap[shnum] = peer
738 assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
739 encoder.set_shareholders(buckets)
# Fills in the UploadResults (placement map, counts, timings, uri-extension
# data) once encoding finishes.
# NOTE(review): line 742 is missing (presumably `r = self._results`, since
# `r` is otherwise unbound below), as is line 752 (presumably
# `now = time.time()`, read by the "total" timing) and 759-760 (return).
741 def _encrypted_done(self, res):
743 for shnum in self._encoder.get_shares_placed():
744 peer_tracker = self._sharemap[shnum]
745 peerid = peer_tracker.peerid
746 peerid_s = idlib.shortnodeid_b2a(peerid)
747 r.sharemap[shnum] = "Placed on [%s]" % peerid_s
748 if peerid not in r.servermap:
749 r.servermap[peerid] = set()
750 r.servermap[peerid].add(shnum)
751 r.pushed_shares = len(self._encoder.get_shares_placed())
753 r.file_size = self._encoder.file_size
754 r.timings["total"] = now - self._started
755 r.timings["storage_index"] = self._storage_index_elapsed
756 r.timings["peer_selection"] = self._peer_selection_elapsed
757 r.timings.update(self._encoder.get_times())
758 r.uri_extension_data = self._encoder.get_uri_extension_data()
# Builds the final CHK read-cap from the encoding results plus the
# encryption key.
# NOTE(review): line 763 (the `key):` tail of the parameter list), 768 (the
# size= keyword argument), 769-770 (presumably `r = self._results`, since
# `r` is otherwise unbound), and 772 (return) are missing.
761 def _compute_uri(self, (uri_extension_hash,
762 needed_shares, total_shares, size),
764 u = uri.CHKFileURI(key=key,
765 uri_extension_hash=uri_extension_hash,
766 needed_shares=needed_shares,
767 total_shares=total_shares,
771 r.uri = u.to_string()
# Plain accessor for the status object created in __init__.
774 def get_upload_status(self):
775 return self._upload_status
# Reads exactly `size` bytes from the uploadable (recursing until enough
# data has accumulated); returns a Deferred firing with a list of strings.
# NOTE(review): the mutable default `prepend_data=[]` is a classic pitfall,
# but the visible code only concatenates (never mutates) it, so it appears
# benign here.  Missing lines: 778 (presumably `if size == 0:` guarding the
# empty return), 781 (presumably `def _got(data):`), 784-785, 787, 789
# (the recursion's remaining arguments), and 791-792.
777 def read_this_many_bytes(uploadable, size, prepend_data=[]):
779 return defer.succeed([])
780 d = uploadable.read(size)
782 assert isinstance(data, list)
783 bytes = sum([len(piece) for piece in data])
786 remaining = size - bytes
788 return read_this_many_bytes(uploadable, remaining,
790 return prepend_data + data
# Uploader for files small enough to be stored entirely inside the URI
# (no share placement; progress jumps straight to done).
# NOTE(review): lines 795, 801, 803 and 805 are missing (other uploaders set
# helper/active flags on the status object around here -- TODO confirm).
794 class LiteralUploader:
796 def __init__(self, client):
797 self._client = client
798 self._results = UploadResults()
799 self._status = s = UploadStatus()
800 s.set_storage_index(None)
802 s.set_progress(0, 1.0)
804 s.set_results(self._results)
# Reads the whole (small) file and packs it into a LiteralFileURI string.
# NOTE(review): lines 809-810 are missing (presumably `def _got_size(size):`,
# the callback registered below whose body sets the size and reads the
# bytes), as are 818-819 (presumably `return d`).
806 def start(self, uploadable):
807 uploadable = IUploadable(uploadable)
808 d = uploadable.get_size()
811 self._status.set_size(size)
812 self._results.file_size = size
813 return read_this_many_bytes(uploadable, size)
814 d.addCallback(_got_size)
815 d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
816 d.addCallback(lambda u: u.to_string())
817 d.addCallback(self._build_results)
# Records the computed URI and marks all progress phases complete.
# NOTE(review): the parameter `uri` shadows the module-level `uri` import;
# lines 825-829 after this are missing (presumably the return of the
# results -- TODO confirm).
820 def _build_results(self, uri):
821 self._results.uri = uri
822 self._status.set_status("Done")
823 self._status.set_progress(1, 1.0)
824 self._status.set_progress(2, 1.0)
# Accessor; NOTE(review): its one-line body (original 831) is not visible.
830 def get_upload_status(self):
# Remote-callable wrapper exposing an IEncryptedUploadable over foolscap
# (RIEncryptedUploadable) so a helper can pull ciphertext from us.
# NOTE(review): lines 838-839 are missing (remote_read_encrypted reads
# self._offset and self._bytes_sent is incremented later, so their
# initializations are presumably there -- TODO confirm); 843-845 (the
# `def get_size(self):` line and the `self._size` cache guard), 849-851
# (the `def _got_size(size):` callback body), and 853-854 (return) are
# also missing.
833 class RemoteEncryptedUploadable(Referenceable):
834 implements(RIEncryptedUploadable)
836 def __init__(self, encrypted_uploadable, upload_status):
837 self._eu = IEncryptedUploadable(encrypted_uploadable)
840 self._status = IUploadStatus(upload_status)
841 # we are responsible for updating the status string while we run, and
842 # for setting the ciphertext-fetch progress.
846 if self._size is not None:
847 return defer.succeed(self._size)
848 d = self._eu.get_size()
852 d.addCallback(_got_size)
# Remote-callable delegators: size via the local cacheing get_size(),
# encoding parameters straight from the wrapped uploadable.
855 def remote_get_size(self):
856 return self.get_size()
857 def remote_get_all_encoding_parameters(self):
858 return self._eu.get_all_encoding_parameters()
# Reads `length` ciphertext bytes and advances the internal offset.
# NOTE(review): lines 862-863 are missing (presumably the callback def
# receiving `strings`, read below), as are 865 and 867-871.
860 def _read_encrypted(self, length, hash_only):
861 d = self._eu.read_encrypted(length, hash_only)
864 self._offset += length
866 size = sum([len(data) for data in strings])
# Serves ciphertext to the helper.  Backward seeks are rejected; forward
# skips are satisfied by reading (hash_only=True) through the gap so the
# plaintext hash trees still get built.
# NOTE(review): line 877 (remaining log kwargs), 885 (presumably `else:`
# introducing the succeed(None) branch), 887, 892-893 (the callback def
# receiving `strings`), and 896-899 (return) are missing.
872 def remote_read_encrypted(self, offset, length):
873 # we don't support seek backwards, but we allow skipping forwards
874 precondition(offset >= 0, offset)
875 precondition(length >= 0, length)
876 lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
878 precondition(offset >= self._offset, offset, self._offset)
879 if offset > self._offset:
880 # read the data from disk anyways, to build up the hash tree
881 skip = offset - self._offset
882 log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
883 (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
884 d = self._read_encrypted(skip, hash_only=True)
886 d = defer.succeed(None)
888 def _at_correct_offset(res):
889 assert offset == self._offset, "%d != %d" % (offset, self._offset)
890 return self._read_encrypted(length, hash_only=False)
891 d.addCallback(_at_correct_offset)
894 size = sum([len(data) for data in strings])
895 self._bytes_sent += size
# Remote accessors for plaintext hashes.
# NOTE(review): line 903 (remaining log kwargs) and 905-906 (presumably
# callback registration and `return d`) are missing.
900 def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
901 log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
902 (first, last-1, num_segments),
904 d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
907 def remote_get_plaintext_hash(self):
908 return self._eu.get_plaintext_hash()
# Remote-callable close: delegates to the wrapped encrypted uploadable.
909 def remote_close(self):
910 return self._eu.close()
# Uploader that delegates the actual share placement to a remote "helper"
# service, sending it ciphertext when the helper doesn't already have it.
# NOTE(review): lines 920-922 are missing (CHKUploader sets helper/active
# flags on its status object at the same point -- TODO confirm).
913 class AssistedUploader:
915 def __init__(self, helper):
916 self._helper = helper
917 self._log_number = log.msg("AssistedUploader starting")
918 self._storage_index = None
919 self._upload_status = s = UploadStatus()
def log(self, *args, **kwargs):
    """Emit a foolscap log message, defaulting its parent to this
    uploader's own log entry."""
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
# Drives the assisted upload: gather size/parameters/key/storage-index,
# contact the helper, then build the read-cap from the results.
# NOTE(review): line 934 is missing (presumably `d = eu.get_size()`, since
# `d` is otherwise unbound below -- the first callback is _got_size), as are
# 948 (the def line of the callback containing set_active) and 950-952
# (return).
928 def start(self, uploadable):
929 self._started = time.time()
930 u = IUploadable(uploadable)
931 eu = EncryptAnUploadable(u, self._log_number)
932 eu.set_upload_status(self._upload_status)
933 self._encuploadable = eu
935 d.addCallback(self._got_size)
936 d.addCallback(lambda res: eu.get_all_encoding_parameters())
937 d.addCallback(self._got_all_encoding_parameters)
938 # when we get the encryption key, that will also compute the storage
939 # index, so this only takes one pass.
940 # TODO: I'm not sure it's cool to switch back and forth between
941 # the Uploadable and the IEncryptedUploadable that wraps it.
942 d.addCallback(lambda res: u.get_encryption_key())
943 d.addCallback(self._got_encryption_key)
944 d.addCallback(lambda res: eu.get_storage_index())
945 d.addCallback(self._got_storage_index)
946 d.addCallback(self._contact_helper)
947 d.addCallback(self._build_readcap)
949 self._upload_status.set_active(False)
# Records the file size on the status object.  NOTE(review): line 955 is
# missing -- _build_readcap later reads self._size, so its assignment is
# presumably there (TODO confirm).
954 def _got_size(self, size):
956 self._upload_status.set_size(size)
958 def _got_all_encoding_parameters(self, params):
959 k, happy, n, segment_size = params
960 # stash these for URI generation later
961 self._needed_shares = k
962 self._total_shares = n
963 self._segment_size = segment_size
# NOTE(review): the body (original 966-967) is not visible; _build_readcap
# later reads self._key, so this presumably stores the key (TODO confirm).
965 def _got_encryption_key(self, key):
# Records the storage index derived from the encryption key.
968 def _got_storage_index(self, storage_index):
969 self._storage_index = storage_index
def _contact_helper(self, res):
    """Ask the helper service about our storage index.

    Returns a Deferred that fires (via _contacted_helper) with the
    helper's reply; start() chains _build_readcap on it.

    FIX (review): the visible listing dropped the trailing `return d`,
    which would hand None to the next callback in start()'s chain
    instead of the helper's upload results; the return is restored.
    """
    now = self._time_contacting_helper_start = time.time()
    self._storage_index_elapsed = now - self._started
    self.log(format="contacting helper for SI %(si)s..",
             si=storage.si_b2a(self._storage_index))
    self._upload_status.set_status("Contacting Helper")
    d = self._helper.callRemote("upload_chk", self._storage_index)
    d.addCallback(self._contacted_helper)
    return d
# Handles the helper's reply: either the file is already uploaded (return
# the results immediately) or we must push ciphertext via a
# RemoteEncryptedUploadable.
# NOTE(review): line 983 is missing (presumably `now = time.time()`, since
# `now` is otherwise unbound below), as are 986 (the branch test separating
# the "need to upload" and "already uploaded" paths -- presumably
# `if upload_helper:` -- TODO confirm), 991 (the constructor's remaining
# arguments), 993 (the Deferred the lambda below chains onto), and 996-997.
982 def _contacted_helper(self, (upload_results, upload_helper)):
984 elapsed = now - self._time_contacting_helper_start
985 self._elapsed_time_contacting_helper = elapsed
987 self.log("helper says we need to upload")
988 self._upload_status.set_status("Uploading Ciphertext")
989 # we need to upload the file
990 reu = RemoteEncryptedUploadable(self._encuploadable,
992 # let it pre-compute the size for progress purposes
994 d.addCallback(lambda ignored:
995 upload_helper.callRemote("upload", reu))
996 # this Deferred will fire with the upload results
998 self.log("helper says file is already uploaded")
999 self._upload_status.set_progress(1, 1.0)
1000 self._upload_status.set_results(upload_results)
1001 return upload_results
# Verifies the helper's results match our expected encoding parameters and
# assembles the final CHK read-cap plus timing data.
# NOTE(review): line 1006 is missing (presumably `r = upload_results`, since
# `r` is otherwise unbound below), as are 1015-1016 (the size= keyword of
# the URI constructor), 1018 (presumably `now = time.time()`, read by the
# "total" timing), and 1027-1028 (return).
1003 def _build_readcap(self, upload_results):
1004 self.log("upload finished, building readcap")
1005 self._upload_status.set_status("Building Readcap")
1007 assert r.uri_extension_data["needed_shares"] == self._needed_shares
1008 assert r.uri_extension_data["total_shares"] == self._total_shares
1009 assert r.uri_extension_data["segment_size"] == self._segment_size
1010 assert r.uri_extension_data["size"] == self._size
1011 u = uri.CHKFileURI(key=self._key,
1012 uri_extension_hash=r.uri_extension_hash,
1013 needed_shares=self._needed_shares,
1014 total_shares=self._total_shares,
1017 r.uri = u.to_string()
1019 r.file_size = self._size
1020 r.timings["storage_index"] = self._storage_index_elapsed
1021 r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
# if the helper reported its own "total", preserve it under another key
1022 if "total" in r.timings:
1023 r.timings["helper_total"] = r.timings["total"]
1024 r.timings["total"] = now - self._started
1025 self._upload_status.set_status("Done")
1026 self._upload_status.set_results(r)
# Plain accessor for the status object created in __init__.
1029 def get_upload_status(self):
1030 return self._upload_status
# Base class holding encoding-parameter defaults and per-instance overrides
# (None means "use the default").
# NOTE(review): `KiB` is not defined anywhere in this visible listing --
# presumably a constant (1024) defined earlier in the original file (TODO
# confirm); lines 1037, 1042 and 1044-1045 are missing.
1032 class BaseUploadable:
1033 default_max_segment_size = 128*KiB # overridden by max_segment_size
1034 default_encoding_param_k = 3 # overridden by encoding_parameters
1035 default_encoding_param_happy = 7
1036 default_encoding_param_n = 10
1038 max_segment_size = None
1039 encoding_param_k = None
1040 encoding_param_happy = None
1041 encoding_param_n = None
1043 _all_encoding_parameters = None
# Adopt an upload-status object, adapted to IUploadStatus.
1046 def set_upload_status(self, upload_status):
1047 self._status = IUploadStatus(upload_status)
def set_default_encoding_parameters(self, default_params):
    """Install site-wide default encoding parameters.

    default_params maps parameter names to integers; the recognized
    keys "k", "happy", "n" and "max_segment_size" each override the
    corresponding class-level default.  All entries are validated
    (str keys, int values) even if unrecognized.
    """
    assert isinstance(default_params, dict)
    for name, value in default_params.items():
        precondition(isinstance(name, str), name, value)
        precondition(isinstance(value, int), name, value)
    overrides = [("k", "default_encoding_param_k"),
                 ("happy", "default_encoding_param_happy"),
                 ("n", "default_encoding_param_n"),
                 ("max_segment_size", "default_max_segment_size")]
    for param, attr in overrides:
        if param in default_params:
            setattr(self, attr, default_params[param])
1063 def get_all_encoding_parameters(self):
1064 if self._all_encoding_parameters:
1065 return defer.succeed(self._all_encoding_parameters)
1067 max_segsize = self.max_segment_size or self.default_max_segment_size
1068 k = self.encoding_param_k or self.default_encoding_param_k
1069 happy = self.encoding_param_happy or self.default_encoding_param_happy
1070 n = self.encoding_param_n or self.default_encoding_param_n
1073 def _got_size(file_size):
1074 # for small files, shrink the segment size to avoid wasting space
1075 segsize = min(max_segsize, file_size)
1076 # this must be a multiple of 'required_shares'==k
1077 segsize = mathutil.next_multiple(segsize, k)
1078 encoding_parameters = (k, happy, n, segsize)
1079 self._all_encoding_parameters = encoding_parameters
1080 return encoding_parameters
1081 d.addCallback(_got_size)
1084 class FileHandle(BaseUploadable):
1085 implements(IUploadable)
1087 def __init__(self, filehandle, convergence):
1089 Upload the data from the filehandle. If convergence is None then a
1090 random encryption key will be used, else the plaintext will be hashed,
1091 then the hash will be hashed together with the string in the
1092 "convergence" argument to form the encryption key."
1094 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1095 self._filehandle = filehandle
1097 self.convergence = convergence
1100 def _get_encryption_key_convergent(self):
1101 if self._key is not None:
1102 return defer.succeed(self._key)
1105 # that sets self._size as a side-effect
1106 d.addCallback(lambda size: self.get_all_encoding_parameters())
1108 k, happy, n, segsize = params
1109 f = self._filehandle
1110 enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1115 data = f.read(BLOCKSIZE)
1118 enckey_hasher.update(data)
1119 # TODO: setting progress in a non-yielding loop is kind of
1120 # pointless, but I'm anticipating (perhaps prematurely) the
1121 # day when we use a slowjob or twisted's CooperatorService to
1122 # make this yield time to other jobs.
1123 bytes_read += len(data)
1125 self._status.set_progress(0, float(bytes_read)/self._size)
1127 self._key = enckey_hasher.digest()
1129 self._status.set_progress(0, 1.0)
1130 assert len(self._key) == 16
1135 def _get_encryption_key_random(self):
1136 if self._key is None:
1137 self._key = os.urandom(16)
1138 return defer.succeed(self._key)
1140 def get_encryption_key(self):
1141 if self.convergence is not None:
1142 return self._get_encryption_key_convergent()
1144 return self._get_encryption_key_random()
1147 if self._size is not None:
1148 return defer.succeed(self._size)
1149 self._filehandle.seek(0,2)
1150 size = self._filehandle.tell()
1152 self._filehandle.seek(0)
1153 return defer.succeed(size)
1155 def read(self, length):
1156 return defer.succeed([self._filehandle.read(length)])
1159 # the originator of the filehandle reserves the right to close it
1162 class FileName(FileHandle):
1163 def __init__(self, filename, convergence):
1165 Upload the data from the filename. If convergence is None then a
1166 random encryption key will be used, else the plaintext will be hashed,
1167 then the hash will be hashed together with the string in the
1168 "convergence" argument to form the encryption key."
1170 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1171 FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1173 FileHandle.close(self)
1174 self._filehandle.close()
1176 class Data(FileHandle):
1177 def __init__(self, data, convergence):
1179 Upload the data from the data argument. If convergence is None then a
1180 random encryption key will be used, else the plaintext will be hashed,
1181 then the hash will be hashed together with the string in the
1182 "convergence" argument to form the encryption key."
1184 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1185 FileHandle.__init__(self, StringIO(data), convergence=convergence)
1187 class Uploader(service.MultiService):
1188 """I am a service that allows file uploading. I am a service-child of the
1191 implements(IUploader)
1193 uploader_class = CHKUploader
1194 URI_LIT_SIZE_THRESHOLD = 55
1195 MAX_UPLOAD_STATUSES = 10
1197 def __init__(self, helper_furl=None):
1198 self._helper_furl = helper_furl
1200 self._all_uploads = weakref.WeakKeyDictionary()
1201 self._recent_upload_status = []
1202 service.MultiService.__init__(self)
1204 def startService(self):
1205 service.MultiService.startService(self)
1206 if self._helper_furl:
1207 self.parent.tub.connectTo(self._helper_furl,
1210 def _got_helper(self, helper):
1211 self._helper = helper
1212 helper.notifyOnDisconnect(self._lost_helper)
1213 def _lost_helper(self):
1216 def get_helper_info(self):
1217 # return a tuple of (helper_furl_or_None, connected_bool)
1218 return (self._helper_furl, bool(self._helper))
1220 def upload(self, uploadable):
1221 # this returns the URI
1225 uploadable = IUploadable(uploadable)
1226 d = uploadable.get_size()
1227 def _got_size(size):
1228 default_params = self.parent.get_encoding_parameters()
1229 precondition(isinstance(default_params, dict), default_params)
1230 precondition("max_segment_size" in default_params, default_params)
1231 uploadable.set_default_encoding_parameters(default_params)
1232 if size <= self.URI_LIT_SIZE_THRESHOLD:
1233 uploader = LiteralUploader(self.parent)
1235 uploader = AssistedUploader(self._helper)
1237 uploader = self.uploader_class(self.parent)
1238 self._all_uploads[uploader] = None
1239 self._recent_upload_status.append(uploader.get_upload_status())
1240 while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES:
1241 self._recent_upload_status.pop(0)
1242 return uploader.start(uploadable)
1243 d.addCallback(_got_size)
1250 def list_all_uploads(self):
1251 return self._all_uploads.keys()
1252 def list_active_uploads(self):
1253 return [u.get_upload_status() for u in self._all_uploads.keys()
1254 if u.get_upload_status().get_active()]
1255 def list_recent_uploads(self):
1256 return self._recent_upload_status