2 import os, time, weakref, itertools
3 from zope.interface import implements
4 from twisted.python import failure
5 from twisted.internet import defer
6 from twisted.application import service
7 from foolscap import Referenceable, Copyable, RemoteCopy
8 from foolscap import eventual
9 from foolscap.logging import log
11 from allmydata.util.hashutil import file_renewal_secret_hash, \
12 file_cancel_secret_hash, bucket_renewal_secret_hash, \
13 bucket_cancel_secret_hash, plaintext_hasher, \
14 storage_index_hash, plaintext_segment_hasher, content_hash_key_hasher
15 from allmydata import encode, storage, hashtree, uri
16 from allmydata.util import base32, idlib, mathutil
17 from allmydata.util.assertutil import precondition
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
20 from pycryptopp.cipher.aes import AES
22 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    # NOTE(review): the class body (presumably just 'pass') is not visible
    # in this view.

# this wants to live in storage, not here
class TooFullError(Exception):
    # NOTE(review): the class body is not visible in this view.
class UploadResults(Copyable, RemoteCopy):
    """Statistics about one upload; Copyable/RemoteCopy so it can travel
    over foolscap."""
    implements(IUploadResults)
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    # NOTE(review): the 'copytype' attribute and the 'def __init__(self):'
    # line are not visible in this view; the assignments below belong to
    # __init__.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = {} # dict of shnum to placement string
        self.servermap = {} # dict of peerid to set(shnums)
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.

# NOTE(review): the 'class PeerTracker' statement (and its EXTENSION_SIZE
# attribute) are not visible in this view; the following methods are its
# body.
def __init__(self, peerid, storage_server,
             sharesize, blocksize, num_segments, num_share_hashes,
             # NOTE(review): a 'storage_index' parameter appears to be
             # missing from the visible signature (it is read below), and
             # 'self.peerid = peerid' is not visible either (__repr__ and
             # callers read self.peerid) -- confirm against the full file.
             bucket_renewal_secret, bucket_cancel_secret):
    precondition(isinstance(peerid, str), peerid)
    precondition(len(peerid) == 20, peerid)
    self._storageserver = storage_server # to an RIStorageServer
    self.buckets = {} # k: shareid, v: IRemoteBucketWriter
    self.sharesize = sharesize
    # NOTE(review): 'as' has been a reserved word since Python 2.6, so this
    # assignment cannot compile on modern interpreters; the remaining
    # allocated_size() arguments are also not visible here.
    as = storage.allocated_size(sharesize,
    self.allocated_size = as
    self.blocksize = blocksize
    self.num_segments = num_segments
    self.num_share_hashes = num_share_hashes
    self.storage_index = storage_index
    self.renew_secret = bucket_renewal_secret
    self.cancel_secret = bucket_cancel_secret

# NOTE(review): the 'def __repr__(self):' line is not visible; this is its
# body.
    return ("<PeerTracker for peer %s and SI %s>"
            % (idlib.shortnodeid_b2a(self.peerid),
               storage.si_b2a(self.storage_index)[:5]))

def query(self, sharenums):
    # Ask the remote storage server to allocate buckets for the given
    # share numbers.
    # NOTE(review): the other allocate_buckets arguments and the trailing
    # 'return d' are not visible in this view.
    d = self._storageserver.callRemote("allocate_buckets",
                                       canary=Referenceable())
    d.addCallback(self._got_reply)

def _got_reply(self, (alreadygot, buckets)):
    #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
    # Wrap each remote bucket reference in a WriteBucketProxy and remember
    # it. NOTE(review): the 'b = {}' initialization and several
    # WriteBucketProxy arguments are not visible in this view.
    for sharenum, rref in buckets.iteritems():
        bp = storage.WriteBucketProxy(rref, self.sharesize,
                                      self.num_share_hashes,
    self.buckets.update(b)
    return (alreadygot, set(b.keys()))
class Tahoe2PeerSelector:
    # Chooses which storage peers will hold which shares for one upload.
    # NOTE(review): many lines of this class are not visible in this view
    # (the original numbering jumps); gaps are flagged inline.

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # NOTE(review): 'self.error_count' is incremented in _got_response
        # but its initialization is not visible here -- confirm.
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    # NOTE(review): the 'def __repr__(self):' line is not visible; this is
    # its body.
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        # NOTE(review): the docstring delimiters around the @return text
        # below are not visible.
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        self._status.set_status("Contacting Peers..")
        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        # NOTE(review): the guarding 'if not peers:' line is not visible.
        raise encode.NotEnoughPeersError("client gave us zero peers")

        # figure out how much space to ask for

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # decide upon the renewal/cancel secrets, to include them in the
        # allocat_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        # NOTE(review): the continuation arguments of the four secret-hash
        # calls below are not visible in this view.
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                 bucket_cancel_secret_hash(file_cancel_secret,
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        # NOTE(review): the trailing 'return d' is not visible.

    # NOTE(review): the 'def _loop(self):' line is not visible; the
    # following is its body.
        if not self.homeless_shares:
            # all shares placed: report success
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   # NOTE(review): the rest of the format string and the
                   # start of the argument tuple are not visible.
                   self.query_count, self.num_peers_contacted,
                   self.good_query_count, self.bad_query_count,
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            self._status.set_status("Contacting Peers [%s] (first query),"
                                    # NOTE(review): part of the format
                                    # string is not visible here.
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                # NOTE(review): the log call's closing arguments are not
                # visible.
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            self._status.set_status("Contacting Peers [%s] (second query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            # NOTE(review): BUG? this clears the list we just extended,
            # discarding every peer; the intent is almost certainly to
            # clear self.contacted_peers2 instead -- confirm against the
            # upstream source before changing.
            self.contacted_peers[:] = []
        # NOTE(review): the 'else:' introducing this final branch is not
        # visible.
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       # NOTE(review): the rest of the format string is not
                       # visible.
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise encode.NotEnoughPeersError(msg)

            # we placed enough to be happy, so we're done
            # NOTE(review): an 'if self._status:' guard may be missing here.
            self._status.set_status("Placed all shares")
            return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        # Handle one peer's answer (or failure) to an allocate_buckets
        # query, then continue the selection loop.
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # NOTE(review): the continuation of this comment is not visible.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                # NOTE(review): the 'else:' and intervening statements are
                # not visible here.
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        # NOTE(review): the 'else:' introducing the success path is not
        # visible.
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            # NOTE(review): the 'for s in alreadygot:' loop header is not
            # visible.
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            # NOTE(review): the 'if allocated:' guard is not visible.
                self.use_peers.add(peer)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            # NOTE(review): the if/else headers around these two counters
            # are not visible.
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
                self.bad_query_count += 1

            # NOTE(review): the 'if still_homeless:' branch header is not
            # visible.
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
                # NOTE(review): the 'else:' line is not visible here.
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): a CHUNKSIZE class attribute (read by _read_encrypted)
    # is not visible in this view.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        # NOTE(review): a '_status' initialization (read elsewhere in this
        # class) is not visible here -- confirm against the full file.
def set_upload_status(self, upload_status):
    """Adapt the given status object to IUploadStatus, remember it, and
    propagate it to the wrapped uploadable."""
    adapted = IUploadStatus(upload_status)
    self._status = adapted
    self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
    """Emit a foolscap log message, supplying the 'upload.encryption'
    facility and this wrapper's log parent when the caller omits them."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
# NOTE(review): the 'def get_size(self):' line and several lines inside it
# are not visible in this view.
    if self._file_size is not None:
        return defer.succeed(self._file_size)
    d = self.original.get_size()
    # NOTE(review): the 'def _got_size(size):' line is not visible.
        self._file_size = size
        # NOTE(review): an 'if self._status:' guard and 'return size' are
        # not visible.
            self._status.set_size(size)
    d.addCallback(_got_size)
    # NOTE(review): 'return d' is not visible.

def get_all_encoding_parameters(self):
    # Fetch, cache and return (k, happy, n, segment_size) from the
    # wrapped uploadable.
    if self._encoding_parameters is not None:
        return defer.succeed(self._encoding_parameters)
    d = self.original.get_all_encoding_parameters()
    def _got(encoding_parameters):
        (k, happy, n, segsize) = encoding_parameters
        self._segment_size = segsize # used by segment hashers
        self._encoding_parameters = encoding_parameters
        self.log("my encoding parameters: %s" % (encoding_parameters,),
        # NOTE(review): the log call's closing arguments, the
        # 'd.addCallback(_got)' wiring and 'return d' are not visible.
        return encoding_parameters

def _get_encryptor(self):
    # Lazily build the AES encryptor (and storage index) from the
    # encryption key.
    # NOTE(review): the early-return guard and the '_got' callback that
    # actually constructs the encryptor are not visible in this view.
        return defer.succeed(self._encryptor)

    d = self.original.get_encryption_key()
        storage_index = storage_index_hash(key)
        assert isinstance(storage_index, str)
        # There's no point to having the SI be longer than the key, so we
        # specify that it is truncated to the same 128 bits as the AES key.
        assert len(storage_index) == 16 # SHA-256 truncated to 128b
        self._storage_index = storage_index
            self._status.set_storage_index(storage_index)

def get_storage_index(self):
    d = self._get_encryptor()
    d.addCallback(lambda res: self._storage_index)
    # NOTE(review): 'return d' is not visible.

def _get_segment_hasher(self):
    # Return (hasher, bytes-remaining-in-segment), creating a new segment
    # hasher when none is active.
    p = self._plaintext_segment_hasher
    # NOTE(review): the 'if p:' guard and its return are not visible.
        left = self._segment_size - self._plaintext_segment_hashed_bytes
    p = plaintext_segment_hasher()
    self._plaintext_segment_hasher = p
    self._plaintext_segment_hashed_bytes = 0
    return p, self._segment_size

def _update_segment_hash(self, chunk):
    # Feed 'chunk' into the per-segment plaintext hashers, closing out a
    # hasher each time a full segment's worth of bytes has been seen.
    # NOTE(review): the 'offset = 0' initialization is not visible.
    while offset < len(chunk):
        p, segment_left = self._get_segment_hasher()
        chunk_left = len(chunk) - offset
        this_segment = min(chunk_left, segment_left)
        p.update(chunk[offset:offset+this_segment])
        self._plaintext_segment_hashed_bytes += this_segment

        if self._plaintext_segment_hashed_bytes == self._segment_size:
            # we've filled this segment
            self._plaintext_segment_hashes.append(p.digest())
            self._plaintext_segment_hasher = None
            self.log("closed hash [%d]: %dB" %
                     (len(self._plaintext_segment_hashes)-1,
                      self._plaintext_segment_hashed_bytes),
            # NOTE(review): the log calls' closing arguments are not
            # visible in this view.
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),

        offset += this_segment
def read_encrypted(self, length, hash_only):
    # make sure our parameters have been set up first
    d = self.get_all_encoding_parameters()
    d.addCallback(lambda ignored: self.get_size())
    d.addCallback(lambda ignored: self._get_encryptor())
    # then fetch and encrypt the plaintext. The unusual structure here
    # (passing a Deferred *into* a function) is needed to avoid
    # overflowing the stack: Deferreds don't optimize out tail recursion.
    # We also pass in a list, to which _read_encrypted will append
    # NOTE(review): the 'ciphertext = []' initialization is not visible in
    # this view.
    d2 = defer.Deferred()
    d.addCallback(lambda ignored:
                  self._read_encrypted(length, ciphertext, hash_only, d2))
    d.addCallback(lambda ignored: d2)
    # NOTE(review): 'return d' is not visible.

def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
    # NOTE(review): the 'if not remaining:' guard and its 'return' are not
    # visible here.
        fire_when_done.callback(ciphertext)
    # tolerate large length= values without consuming a lot of RAM by
    # reading just a chunk (say 50kB) at a time. This only really matters
    # when hash_only==True (i.e. resuming an interrupted upload), since
    # that's the case where we will be skipping over a lot of data.
    size = min(remaining, self.CHUNKSIZE)
    remaining = remaining - size
    # read a chunk of plaintext..
    d = defer.maybeDeferred(self.original.read, size)
    # N.B.: if read() is synchronous, then since everything else is
    # actually synchronous too, we'd blow the stack unless we stall for a
    # tick. Once you accept a Deferred from IUploadable.read(), you must
    # be prepared to have it fire immediately too.
    d.addCallback(eventual.fireEventually)
    def _good(plaintext):
        # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
        ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
        ciphertext.extend(ct)
        self._read_encrypted(remaining, ciphertext, hash_only,
        # NOTE(review): the recursive call's closing argument, the '_err'
        # errback definition, and the callback wiring are not visible.
        fire_when_done.errback(why)
def _hash_and_encrypt_plaintext(self, data, hash_only):
    # Hash (and, unless skipped, encrypt) a list of plaintext chunks,
    # accumulating the ciphertext chunks.
    assert isinstance(data, (tuple, list)), type(data)
    # we use data.pop(0) instead of 'for chunk in data' to save
    # memory: each chunk is destroyed as soon as we're done with it.
    # NOTE(review): the 'cryptdata'/'bytes_processed' initialization and
    # the loop header that binds 'chunk' are not visible in this view.
        self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
        # NOTE(review): the log call's closing arguments are not visible.
        bytes_processed += len(chunk)
        self._plaintext_hasher.update(chunk)
        self._update_segment_hash(chunk)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        ciphertext = self._encryptor.process(chunk)
        # NOTE(review): the 'if hash_only:' / 'else:' lines around the two
        # statements below are not visible.
            self.log(" skipping encryption", level=log.NOISY)
            cryptdata.append(ciphertext)
    self._ciphertext_bytes_read += bytes_processed
    # NOTE(review): an 'if self._status:' guard and the return of the
    # accumulated ciphertext are not visible in this view.
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
    # Return (a Deferred for) the plaintext segment hashes [first:last],
    # closing out the final partial-segment hasher if necessary.
    if len(self._plaintext_segment_hashes) < num_segments:
        # close out the last one
        assert len(self._plaintext_segment_hashes) == num_segments-1
        p, segment_left = self._get_segment_hasher()
        self._plaintext_segment_hashes.append(p.digest())
        del self._plaintext_segment_hasher
        self.log("closing plaintext leaf hasher, hashed %d bytes" %
                 self._plaintext_segment_hashed_bytes,
        # NOTE(review): the two log calls' closing 'level=' arguments are
        # not visible in this view.
        self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                 segnum=len(self._plaintext_segment_hashes)-1,
                 hash=base32.b2a(p.digest()),
    assert len(self._plaintext_segment_hashes) == num_segments
    return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
    """Return a Deferred that has already fired with the digest of all
    plaintext hashed so far."""
    return defer.succeed(self._plaintext_hasher.digest())
569 return self.original.close()
# NOTE(review): the 'class UploadStatus:' statement, the '__init__' def
# line, and the one-line bodies of several accessors below are not visible
# in this view; the defs with no body are fragments, not intentional.
implements(IUploadStatus)
statusid_counter = itertools.count(0)

    # __init__ body fragment: per-upload bookkeeping defaults.
    self.storage_index = None
    self.status = "Not started"
    self.progress = [0.0, 0.0, 0.0]
    self.counter = self.statusid_counter.next()
    self.started = time.time()

def get_started(self):
def get_storage_index(self):
    return self.storage_index
def using_helper(self):
def get_status(self):
def get_progress(self):
    return tuple(self.progress)
def get_active(self):
def get_results(self):
def get_counter(self):

def set_storage_index(self, si):
    self.storage_index = si
def set_size(self, size):
def set_helper(self, helper):
def set_status(self, status):
def set_progress(self, which, value):
    # [0]: chk, [1]: ciphertext, [2]: encode+push
    self.progress[which] = value
def set_active(self, value):
def set_results(self, value):
# NOTE(review): the 'class CHKUploader' statement is not visible; the
# following is its body.
peer_selector_class = Tahoe2PeerSelector

def __init__(self, client):
    self._client = client
    self._log_number = self._client.log("CHKUploader starting")
    # NOTE(review): one line is not visible here; abort() reads
    # self._encoder, which is presumably initialized to None in it.
    self._results = UploadResults()
    self._storage_index = None
    self._upload_status = UploadStatus()
    self._upload_status.set_helper(False)
    self._upload_status.set_active(True)
    self._upload_status.set_results(self._results)
def log(self, *args, **kwargs):
    """Log through our client, defaulting this upload's log parent and
    the 'tahoe.upload' facility when the caller supplies neither."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.upload")
    return self._client.log(*args, **kwargs)
def start(self, uploadable):
    """Start uploading the file.
    This method returns a Deferred that will fire with the URI (a
    # NOTE(review): the remainder of this docstring and its closing quotes
    # are not visible in this view.
    self._started = time.time()
    uploadable = IUploadable(uploadable)
    self.log("starting upload of %s" % uploadable)

    eu = EncryptAnUploadable(uploadable, self._log_number)
    eu.set_upload_status(self._upload_status)
    d = self.start_encrypted(eu)
    # NOTE(review): the '_uploaded(res)' callback definition line is not
    # visible.
        d1 = uploadable.get_encryption_key()
        d1.addCallback(lambda key: self._compute_uri(res, key))
        # NOTE(review): 'return d1' is not visible.
    d.addCallback(_uploaded)
    # NOTE(review): the '_done' wrapper around the status deactivation and
    # the final 'return d' are not visible.
        self._upload_status.set_active(False)

# NOTE(review): the 'def abort(self):' line is not visible; this is its
# body. Also note the docstring typo: 'Call this is' presumably means
# 'Call this if' -- left unchanged here.
    """Call this is the upload must be abandoned before it completes.
    This will tell the shareholders to delete their partial shares. I
    return a Deferred that fires when these messages have been acked."""
    if not self._encoder:
        # how did you call abort() before calling start() ?
        return defer.succeed(None)
    return self._encoder.abort()

def start_encrypted(self, encrypted):
    # Encode and push an IEncryptedUploadable via an Encoder.
    eu = IEncryptedUploadable(encrypted)

    started = time.time()
    # NOTE(review): the Encoder constructor's remaining arguments are not
    # visible in this view.
    self._encoder = e = encode.Encoder(self._log_number,
    d = e.set_encrypted_uploadable(eu)
    d.addCallback(self.locate_all_shareholders, started)
    d.addCallback(self.set_shareholders, e)
    d.addCallback(lambda res: e.start())
    d.addCallback(self._encrypted_done)
    # this fires with the uri_extension_hash and other data
    # NOTE(review): 'return d' is not visible.

def locate_all_shareholders(self, encoder, started):
    # Run peer selection for this upload, recording elapsed-time stats.
    peer_selection_started = now = time.time()
    self._storage_index_elapsed = now - started
    storage_index = encoder.get_param("storage_index")
    self._storage_index = storage_index
    upload_id = storage.si_b2a(storage_index)[:5]
    self.log("using storage index %s" % upload_id)
    peer_selector = self.peer_selector_class(upload_id, self._log_number,
    # NOTE(review): the constructor's closing arguments are not visible.

    share_size = encoder.get_param("share_size")
    block_size = encoder.get_param("block_size")
    num_segments = encoder.get_param("num_segments")
    k,desired,n = encoder.get_param("share_counts")

    self._peer_selection_started = time.time()
    d = peer_selector.get_shareholders(self._client, storage_index,
                                       share_size, block_size,
                                       num_segments, n, desired)
    # NOTE(review): the '_done' callback wrapping this timing update, and
    # the trailing 'return d', are not visible.
        self._peer_selection_elapsed = time.time() - peer_selection_started

def set_shareholders(self, (used_peers, already_peers), encoder):
    # NOTE(review): the docstring delimiters are not visible; also note
    # the '@paran' typo (should be '@param') -- left unchanged here.
    @param used_peers: a sequence of PeerTracker objects
    @paran already_peers: a dict mapping sharenum to a peerid that
                          claims to already have this share
    self.log("_send_shares, used_peers is %s" % (used_peers,))
    # record already-present shares in self._results
    for (shnum, peerid) in already_peers.items():
        peerid_s = idlib.shortnodeid_b2a(peerid)
        self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
        if peerid not in self._results.servermap:
            self._results.servermap[peerid] = set()
        self._results.servermap[peerid].add(shnum)
    self._results.preexisting_shares = len(already_peers)

    for peer in used_peers:
        assert isinstance(peer, PeerTracker)
    # NOTE(review): initialization of 'buckets' (and apparently of
    # self._sharemap) is not visible in this view.
    for peer in used_peers:
        buckets.update(peer.buckets)
        for shnum in peer.buckets:
            self._sharemap[shnum] = peer
    assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
    encoder.set_shareholders(buckets)

def _encrypted_done(self, res):
    # Fill in the UploadResults with placement and timing information.
    # NOTE(review): the 'r = self._results' binding is not visible but is
    # required by the statements below.
    for shnum in self._encoder.get_shares_placed():
        peer_tracker = self._sharemap[shnum]
        peerid = peer_tracker.peerid
        peerid_s = idlib.shortnodeid_b2a(peerid)
        r.sharemap[shnum] = "Placed on [%s]" % peerid_s
        if peerid not in r.servermap:
            r.servermap[peerid] = set()
        r.servermap[peerid].add(shnum)
    r.pushed_shares = len(self._encoder.get_shares_placed())
    # NOTE(review): 'now = time.time()' is not visible but 'now' is read
    # below.
    r.file_size = self._encoder.file_size
    r.timings["total"] = now - self._started
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["peer_selection"] = self._peer_selection_elapsed
    r.timings.update(self._encoder.get_times())
    r.uri_extension_data = self._encoder.get_uri_extension_data()

def _compute_uri(self, (uri_extension_hash,
                        needed_shares, total_shares, size),
    # NOTE(review): the 'key):' parameter line closing the signature is not
    # visible.
    u = uri.CHKFileURI(key=key,
                       uri_extension_hash=uri_extension_hash,
                       needed_shares=needed_shares,
                       total_shares=total_shares,
    # NOTE(review): the 'size=size)' argument, the binding of 'r', and the
    # trailing return are not visible.
    r.uri = u.to_string()

def get_upload_status(self):
    return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # Read exactly 'size' bytes (as a list of strings) from 'uploadable',
    # recursing until enough data has been accumulated.
    # NOTE(review): the 'if size == 0:' guard, the '_got' callback
    # definition, and several wiring lines are not visible in this view.
    # Also note the mutable default 'prepend_data=[]' -- it appears never
    # to be mutated in place, only concatenated, so it is likely benign;
    # confirm against the full file.
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        remaining = size - bytes
            return read_this_many_bytes(uploadable, remaining,
        return prepend_data + data
class LiteralUploader:
    # Uploads small files by packing the data directly into a LIT URI.
    # NOTE(review): several lines of this class are not visible in this
    # view; gaps are flagged inline.

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        # NOTE(review): the '_got_size(size)' callback definition line is
        # not visible.
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        # NOTE(review): 'return d' is not visible.

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        # NOTE(review): the trailing lines (presumably returning the
        # results) are not visible.

    def get_upload_status(self):
        # NOTE(review): the one-line body ('return self._status') is not
        # visible in this view.
class RemoteEncryptedUploadable(Referenceable):
    # Exposes a local IEncryptedUploadable over foolscap so a helper can
    # pull ciphertext from us.
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # NOTE(review): initialization of _offset, _bytes_sent, and _size
        # (all read below) is not visible in this view.
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

    # NOTE(review): the 'def get_size(self):' line is not visible; this is
    # its body.
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        # NOTE(review): the '_got_size' callback body and 'return d' are
        # not visible.
        d.addCallback(_got_size)

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        # NOTE(review): the '_read' callback definition and the rest of the
        # offset/progress bookkeeping are not visible in this view.
            self._offset += length
            size = sum([len(data) for data in strings])

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        # NOTE(review): the log call's closing arguments are not visible.
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): the 'else:' line is not visible.
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): the accounting callback wrapping these two lines,
        # and the trailing 'return d', are not visible.
            size = sum([len(data) for data in strings])
            self._bytes_sent += size

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
        # NOTE(review): the log call's closing arguments and the callback
        # wiring / 'return d' are not visible.
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)

    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    # Uploads a file by handing its ciphertext to a remote helper service
    # (see _contact_helper / _contacted_helper below).

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): the following status-setup lines (presumably
        # set_helper(True)/set_active(True)) are not visible in this view.
def log(self, *args, **kwargs):
    """Emit a foolscap log message, defaulting 'parent' to this
    uploader's own log entry when the caller omits it."""
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
def start(self, uploadable):
    # Drive the whole assisted upload: size, parameters, key, storage
    # index, helper contact, then readcap construction.
    self._started = time.time()
    u = IUploadable(uploadable)
    eu = EncryptAnUploadable(u, self._log_number)
    eu.set_upload_status(self._upload_status)
    self._encuploadable = eu
    # NOTE(review): the line binding 'd' (presumably 'd = eu.get_size()')
    # is not visible in this view.
    d.addCallback(self._got_size)
    d.addCallback(lambda res: eu.get_all_encoding_parameters())
    d.addCallback(self._got_all_encoding_parameters)
    # when we get the encryption key, that will also compute the storage
    # index, so this only takes one pass.
    # TODO: I'm not sure it's cool to switch back and forth between
    # the Uploadable and the IEncryptedUploadable that wraps it.
    d.addCallback(lambda res: u.get_encryption_key())
    d.addCallback(self._got_encryption_key)
    d.addCallback(lambda res: eu.get_storage_index())
    d.addCallback(self._got_storage_index)
    d.addCallback(self._contact_helper)
    d.addCallback(self._build_readcap)
    # NOTE(review): the '_done' wrapper around the deactivation below and
    # the final 'return d' are not visible.
        self._upload_status.set_active(False)

def _got_size(self, size):
    # NOTE(review): 'self._size = size' is not visible here, yet
    # _build_readcap reads self._size -- confirm against the full file.
    self._upload_status.set_size(size)
958 def _got_all_encoding_parameters(self, params):
959 k, happy, n, segment_size = params
960 # stash these for URI generation later
961 self._needed_shares = k
962 self._total_shares = n
963 self._segment_size = segment_size
def _got_encryption_key(self, key):
    # NOTE(review): the body (presumably 'self._key = key', read later by
    # _build_readcap) is not visible in this view.

def _got_storage_index(self, storage_index):
    self._storage_index = storage_index
def _contact_helper(self, res):
    # Ask the helper about this storage index; _contacted_helper decides
    # whether we must actually upload ciphertext.
    now = self._time_contacting_helper_start = time.time()
    self._storage_index_elapsed = now - self._started
    self.log(format="contacting helper for SI %(si)s..",
             si=storage.si_b2a(self._storage_index))
    self._upload_status.set_status("Contacting Helper")
    d = self._helper.callRemote("upload_chk", self._storage_index)
    d.addCallback(self._contacted_helper)
    # NOTE(review): 'return d' is not visible.

def _contacted_helper(self, (upload_results, upload_helper)):
    # NOTE(review): the 'now = time.time()' line is not visible.
    elapsed = now - self._time_contacting_helper_start
    self._elapsed_time_contacting_helper = elapsed
    # NOTE(review): the 'if upload_helper:' guard is not visible.
        self.log("helper says we need to upload")
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
        # NOTE(review): the constructor's closing argument and the
        # 'd = reu.get_size()' line are not visible.
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        # NOTE(review): 'return d' and the 'else:' line are not visible.
    self.log("helper says file is already uploaded")
    self._upload_status.set_progress(1, 1.0)
    self._upload_status.set_results(upload_results)
    return upload_results
def _build_readcap(self, upload_results):
    # Combine the upload results with our stashed key/parameters into the
    # final CHK readcap, and record overall timings.
    self.log("upload finished, building readcap")
    self._upload_status.set_status("Building Readcap")
    # NOTE(review): the binding of 'r' (presumably 'r = upload_results')
    # is not visible but is required below.
    assert r.uri_extension_data["needed_shares"] == self._needed_shares
    assert r.uri_extension_data["total_shares"] == self._total_shares
    assert r.uri_extension_data["segment_size"] == self._segment_size
    assert r.uri_extension_data["size"] == self._size
    u = uri.CHKFileURI(key=self._key,
                       uri_extension_hash=r.uri_extension_hash,
                       needed_shares=self._needed_shares,
                       total_shares=self._total_shares,
    # NOTE(review): the constructor's 'size=' argument is not visible.
    r.uri = u.to_string()
    # NOTE(review): the 'now = time.time()' line is not visible but 'now'
    # is read below.
    r.file_size = self._size
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
    if "total" in r.timings:
        r.timings["helper_total"] = r.timings["total"]
    r.timings["total"] = now - self._started
    self._upload_status.set_status("Done")
    self._upload_status.set_results(r)
    # NOTE(review): the trailing 'return r' is not visible.

def get_upload_status(self):
    return self._upload_status
class BaseUploadable:
    # Shared encoding-parameter plumbing for IUploadable implementations.
    # NOTE(review): 'KiB' is defined elsewhere in the file (not visible in
    # this view).
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    # NOTE(review): a '_status = None' class default presumably sits nearby
    # but is not visible in this view.

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
def set_default_encoding_parameters(self, default_params):
    """Override the class-level encoding defaults (k, happy, n,
    max_segment_size) with any values present in *default_params*, after
    validating that keys are strings and values are ints."""
    assert isinstance(default_params, dict)
    for key, value in default_params.items():
        precondition(isinstance(key, str), key, value)
        precondition(isinstance(value, int), key, value)
    # Map each recognized parameter name to the instance attribute it
    # overrides, then copy over only those that were supplied.
    overrides = (("k", "default_encoding_param_k"),
                 ("happy", "default_encoding_param_happy"),
                 ("n", "default_encoding_param_n"),
                 ("max_segment_size", "default_max_segment_size"))
    for key, attr in overrides:
        if key in default_params:
            setattr(self, attr, default_params[key])
1063 def get_all_encoding_parameters(self):
1064 if self._all_encoding_parameters:
1065 return defer.succeed(self._all_encoding_parameters)
1067 max_segsize = self.max_segment_size or self.default_max_segment_size
1068 k = self.encoding_param_k or self.default_encoding_param_k
1069 happy = self.encoding_param_happy or self.default_encoding_param_happy
1070 n = self.encoding_param_n or self.default_encoding_param_n
1073 def _got_size(file_size):
1074 # for small files, shrink the segment size to avoid wasting space
1075 segsize = min(max_segsize, file_size)
1076 # this must be a multiple of 'required_shares'==k
1077 segsize = mathutil.next_multiple(segsize, k)
1078 encoding_parameters = (k, happy, n, segsize)
1079 self._all_encoding_parameters = encoding_parameters
1080 return encoding_parameters
1081 d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    """IUploadable that reads from an open, seekable filehandle.

    The caller retains ownership of the filehandle: close() here is a
    no-op (see FileName for the owning variant).
    """
    implements(IUploadable)

    def __init__(self, filehandle, contenthashkey=True):
        self._filehandle = filehandle
        self._key = None # encryption key, computed lazily
        self._contenthashkey = contenthashkey
        self._size = None # cached by get_size()

    def _get_encryption_key_content_hash(self):
        """Derive the AES key by hashing the full plaintext (convergent
        encryption): identical files yield identical keys/caps."""
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got_parameters(params):
            k, happy, n, segsize = params
            f = self._filehandle
            # the key hash covers the encoding parameters too, so the same
            # data encoded differently gets a different key
            enckey_hasher = content_hash_key_hasher(k, n, segsize)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got_parameters)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self._contenthashkey:
            return self._get_encryption_key_content_hash()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        """Return a Deferred firing with the file size; caches the result
        and leaves the filehandle positioned at the start."""
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        # returned as a one-element list per the IUploadable contract
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
class FileName(FileHandle):
    """IUploadable for a named file on disk.

    Unlike FileHandle, we opened the filehandle ourselves, so close()
    really closes it.
    """
    def __init__(self, filename, contenthashkey=True):
        FileHandle.__init__(self, open(filename, "rb"),
                            contenthashkey=contenthashkey)
    def close(self):
        FileHandle.close(self)
        # we own this filehandle, so release the OS resource
        self._filehandle.close()
class Data(FileHandle):
    """IUploadable for an in-memory byte string: wraps the data in a
    StringIO and delegates everything to FileHandle."""
    def __init__(self, data, contenthashkey=True):
        wrapped = StringIO(data)
        FileHandle.__init__(self, wrapped,
                            contenthashkey=contenthashkey)
class Uploader(service.MultiService):
    """I am a service that allows file uploading. I am a service-child of the
    client node: I pick a LiteralUploader for tiny files, an AssistedUploader
    when a helper is connected, and a CHKUploader otherwise.
    """
    implements(IUploader)
    uploader_class = CHKUploader
    URI_LIT_SIZE_THRESHOLD = 55 # files at or below this size become LIT URIs
    MAX_UPLOAD_STATUSES = 10 # bound on the recent-status history

    def __init__(self, helper_furl=None):
        self._helper_furl = helper_furl
        self._helper = None # set by _got_helper, cleared on disconnect
        self._all_uploads = weakref.WeakKeyDictionary()
        self._recent_upload_status = []
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable):
        """Upload the given IUploadable; returns a Deferred that fires
        with the upload results (including the URI)."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)
            # choose the cheapest uploader that can handle this file
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
            elif self._helper:
                uploader = AssistedUploader(self._helper)
            else:
                uploader = self.uploader_class(self.parent)
            self._all_uploads[uploader] = None
            self._recent_upload_status.append(uploader.get_upload_status())
            while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES:
                self._recent_upload_status.pop(0)
            return uploader.start(uploadable)
        d.addCallback(_got_size)
        def _done(res):
            # success or failure, let the uploadable release its resources
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

    def list_all_uploads(self):
        return self._all_uploads.keys()
    def list_active_uploads(self):
        return [u.get_upload_status() for u in self._all_uploads.keys()
                if u.get_upload_status().get_active()]
    def list_recent_uploads(self):
        return self._recent_upload_status