1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap import Referenceable, Copyable, RemoteCopy
7 from foolscap import eventual
8 from foolscap.logging import log
10 from allmydata.util.hashutil import file_renewal_secret_hash, \
11 file_cancel_secret_hash, bucket_renewal_secret_hash, \
12 bucket_cancel_secret_hash, plaintext_hasher, \
13 storage_index_hash, plaintext_segment_hasher, convergence_hasher
14 from allmydata import storage, hashtree, uri
15 from allmydata.immutable import encode
16 from allmydata.util import base32, idlib, mathutil
17 from allmydata.util.assertutil import precondition
18 from allmydata.util.rrefutil import get_versioned_remote_reference
19 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
20 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
21 NotEnoughSharesError, InsufficientVersionError
22 from allmydata.immutable import layout
23 from pycryptopp.cipher.aes import AES
25 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    # (body restored: a class suite needs at least one statement; a lone
    # comment is a SyntaxError)
    pass
# this wants to live in storage, not here
class TooFullError(Exception):
    # (body restored: a class suite needs at least one statement)
    pass
class UploadResults(Copyable, RemoteCopy):
    """Statistics and results for one finished upload.

    Copyable/RemoteCopy so a helper can ship an instance back over foolscap.
    """
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
        # per-upload result fields, populated incrementally as the upload
        # proceeds (these assignments belong to UploadResults.__init__)
        self.timings = {} # dict of name to number of seconds
        self.sharemap = {} # dict of shnum to placement string
        self.servermap = {} # dict of peerid to set(shnums)
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
# our current uri_extension is 846 bytes for small files, and a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more than that, just in case we need
# it. If the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this estimate.
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 bucket_renewal_secret, bucket_cancel_secret):
        """Track the buckets allocated for one file on one storage peer."""
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid) # peerids are 20-byte node ids
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        # how many bytes we will ask the server to allocate per share
        self.allocated_size = layout.allocated_size(sharesize,
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        # per-(file, peer) lease secrets
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

        # body of __repr__: short human-readable identification for logs
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))
    def query(self, sharenums):
        # ask the storage server to allocate buckets for these share numbers
        d = self._storageserver.callRemote("allocate_buckets",
                                           canary=Referenceable())
        d.addCallback(self._got_reply)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # wrap each remote bucket reference in a WriteBucketProxy
        for sharenum, rref in buckets.iteritems():
            bp = layout.WriteBucketProxy(rref, self.sharesize,
                                         self.num_share_hashes,
        self.buckets.update(b)
        # report both the shares the server already had and our new buckets
        return (alreadygot, set(b.keys()))
class Tahoe2PeerSelector:
    """Chooses which storage peers will hold each share of an upload."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query_count: total queries sent; good/bad: queries that did /
        # did not place at least one share
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

        # body of __repr__
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        # NOTE(review): presumably guarded by an 'if not peers:' test — confirm
        raise NotEnoughSharesError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        allocated_size = layout.allocated_size(share_size,

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        # NOTE(review): presumably guarded by an 'if not peers:' test — confirm
        raise NotEnoughSharesError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,

        # one PeerTracker per candidate peer, each with its own derived
        # per-bucket lease secrets
        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                 bucket_cancel_secret_hash(file_cancel_secret,
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        # --- body of _loop: the peer-selection state machine. Each call
        # either finishes (all shares placed / failure) or sends one query
        # and re-arms itself via _got_response. ---
        if not self.homeless_shares:
            # every share has a home: success
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   self.query_count, self.num_peers_contacted,
                   self.good_query_count, self.bad_query_count,
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)
            # first pass: offer each fresh peer a single share
            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            self._status.set_status("Contacting Peers [%s] (first query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                self._started_second_pass = True
            # spread the remaining homeless shares evenly over willing peers
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            self._status.set_status("Contacting Peers [%s] (second query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            # NOTE(review): the next line empties the list we just extended,
            # discarding every remaining peer; upstream clears contacted_peers2
            # here (i.e. 'self.contacted_peers2[:] = []') — looks like a
            # dropped '2'. Verify against upstream before relying on this.
            self.contacted_peers[:] = []
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg)

            # we placed enough to be happy, so we're done
            self._status.set_status("Placed all shares")
            return self.use_peers
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """Process one peer's reply (or failure) and bookkeep share placement."""
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            # put the shares we asked about back at the front of the queue
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
            # No more peers, so this upload might fail (it depends upon
            # whether we've hit shares_of_happiness or not). Log the last
            # failure we got: if a coding error causes all peers to fail
            # in the same way, this allows the common failure to be seen
            # by the uploader and should help with debugging
            msg = ("last failure (from %s) was: %s" % (peer, res))
            self.last_failure_msg = msg
        # success path: res is the (alreadygot, allocated) pair from query()
        (alreadygot, allocated) = res
        log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                % (idlib.shortnodeid_b2a(peer.peerid),
                   tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                level=log.NOISY, parent=self._log_parent)
        self.preexisting_shares[s] = peer.peerid
        if s in self.homeless_shares:
            self.homeless_shares.remove(s)

        # the PeerTracker will remember which shares were allocated on
        # that peer. We just have to remember to use them.
        self.use_peers.add(peer)

        not_yet_present = set(shares_to_ask) - set(alreadygot)
        still_homeless = not_yet_present - set(allocated)

        # they accepted or already had at least one share, so
        # progress has been made
        self.good_query_count += 1
        self.bad_query_count += 1

        # In networks with lots of space, this is very unusual and
        # probably indicates an error. In networks with peers that
        # are full, it is merely unusual. In networks that are very
        # full, it is common, and many uploads will fail. In most
        # cases, this is obviously not fatal, and we'll just use some

        # some shares are still homeless, keep trying to find them a
        # home. The ones that were rejected get first priority.
        self.homeless_shares = (list(still_homeless)
                                + self.homeless_shares)
        # Since they were unable to accept all of our requests, it
        # is safe to assume that asking them again won't help.

        # if they *were* able to accept everything, they might be
        # willing to accept even more.
        put_peer_here.append(peer)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None                 # created lazily in _get_encryptor
        self._plaintext_hasher = plaintext_hasher() # hashes all plaintext seen
        self._plaintext_segment_hasher = None  # hasher for the current segment
        self._plaintext_segment_hashes = []    # closed-out per-segment digests
        self._encoding_parameters = None       # cached (k, happy, n, segsize)
        self._file_size = None                 # cached result of get_size()
        self._ciphertext_bytes_read = 0
391 def set_upload_status(self, upload_status):
392 self._status = IUploadStatus(upload_status)
393 self.original.set_upload_status(upload_status)
395 def log(self, *args, **kwargs):
396 if "facility" not in kwargs:
397 kwargs["facility"] = "upload.encryption"
398 if "parent" not in kwargs:
399 kwargs["parent"] = self._log_number
400 return log.msg(*args, **kwargs)
        # body of get_size: return the cached size, or fetch and cache it
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
            self._file_size = size
            self._status.set_size(size)
        d.addCallback(_got_size)

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segsize); cached."""
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            return encoding_parameters
    def _get_encryptor(self):
        # create (and cache) the AES encryptor plus the derived storage index
        # NOTE(review): an 'if self._encryptor:' guard presumably preceded
        # this early return — confirm against upstream
        return defer.succeed(self._encryptor)
        d = self.original.get_encryption_key()
            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
            self._status.set_storage_index(storage_index)

    def get_storage_index(self):
        # the storage index is derived from the encryption key
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)

    def _get_segment_hasher(self):
        # resume the in-progress segment hasher, or start a fresh one
        p = self._plaintext_segment_hasher
        left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size
    def _update_segment_hash(self, chunk):
        # feed 'chunk' into the per-segment plaintext hashers, closing out
        # each segment's digest as we cross a segment boundary
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),

            offset += this_segment
    def read_encrypted(self, length, hash_only):
        """Return a Deferred firing with a list of ciphertext chunks covering
        the next 'length' bytes (hashing only, if hash_only is true)."""
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # when nothing remains, deliver the accumulated ciphertext chunks
        fire_when_done.callback(ciphertext)
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            # recurse (via the Deferred trampoline) for the rest of the bytes
            self._read_encrypted(remaining, ciphertext, hash_only,
            fire_when_done.errback(why)
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        """Hash every chunk in 'data'; encrypt too unless hash_only."""
        assert isinstance(data, (tuple, list)), type(data)
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
                self.log(" skipping encryption", level=log.NOISY)
                cryptdata.append(ciphertext)
        self._ciphertext_bytes_read += bytes_processed
        # report ciphertext progress as a fraction of the total file size
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred firing with the [first:last] slice of the
        per-segment plaintext hashes, closing the final hasher if needed."""
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
587 def get_plaintext_hash(self):
588 h = self._plaintext_hasher.digest()
589 return defer.succeed(h)
592 return self.original.close()
    # (interior of the UploadStatus class; its 'class' line is not visible here)
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)   # gives each status a unique id

        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    # simple accessors
    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    # simple setters used by the uploaders to publish progress
    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
    # (interior of the CHKUploader class)
    # which peer-selection strategy to use; a class attribute so tests or
    # subclasses can substitute another selector
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)
658 def log(self, *args, **kwargs):
659 if "parent" not in kwargs:
660 kwargs["parent"] = self._log_number
661 if "facility" not in kwargs:
662 kwargs["facility"] = "tahoe.upload"
663 return self._client.log(*args, **kwargs)
    def start(self, uploadable):
        """Start uploading the file.

        This method returns a Deferred that will fire with the URI (a
        """
        self._started = time.time()
        uploadable = IUploadable(uploadable)
        self.log("starting upload of %s" % uploadable)

        # wrap the plaintext uploadable so the encoder only ever sees ciphertext
        eu = EncryptAnUploadable(uploadable, self._log_number)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
            d1 = uploadable.get_encryption_key()
            d1.addCallback(lambda key: self._compute_uri(res, key))
        d.addCallback(_uploaded)
            self._upload_status.set_active(False)

        # body of abort()
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """Encode an IEncryptedUploadable and push its shares to peers."""
        eu = IEncryptedUploadable(encrypted)
        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # this fires with the uri_extension_hash and other data
    def locate_all_shareholders(self, encoder, started):
        """Run peer selection for this upload; fires with what
        Tahoe2PeerSelector.get_shareholders returns."""
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
            self._peer_selection_elapsed = time.time() - peer_selection_started
    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        for (shnum, peerid) in already_peers.items():
            peerid_s = idlib.shortnodeid_b2a(peerid)
            self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
            if peerid not in self._results.servermap:
                self._results.servermap[peerid] = set()
            self._results.servermap[peerid].add(shnum)
        self._results.preexisting_shares = len(already_peers)

        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        # collect every allocated bucket, remembering which tracker holds
        # each share number
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._sharemap[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)
    def _encrypted_done(self, res):
        # record per-share placement and timing data into the results object
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._sharemap[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
            if peerid not in r.servermap:
                r.servermap[peerid] = set()
            r.servermap[peerid].add(shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
    def _compute_uri(self, (uri_extension_hash,
                            needed_shares, total_shares, size),
        # build the CHK read-cap from the verification data plus the AES key
        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=needed_shares,
                           total_shares=total_shares,
        r.uri = u.to_string()

    def get_upload_status(self):
        return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # Read exactly 'size' bytes from the uploadable (re-issuing short reads),
    # firing with prepend_data plus the accumulated chunks.
    # NOTE(review): mutable default argument; it is only concatenated below
    # (never mutated), so it is benign here, but a None default would be safer.
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        remaining = size - bytes
            # short read: recurse for the rest
            return read_this_many_bytes(uploadable, remaining,
        return prepend_data + data
class LiteralUploader:
    """Upload a file small enough to be packed whole into a literal URI."""

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)   # literal files have no storage index
        s.set_progress(0, 1.0)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        # the entire file body is embedded in the URI itself
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)

    def get_upload_status(self):
class RemoteEncryptedUploadable(Referenceable):
    """Publish an IEncryptedUploadable over foolscap so a remote helper can
    pull ciphertext from us."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

        # body of get_size: return the cached size, or fetch and cache it
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        d.addCallback(_got_size)

    # thin remote-callable forwarders
    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()
    def _read_encrypted(self, length, hash_only):
        # fetch (or just hash, if hash_only) 'length' bytes of ciphertext,
        # advancing our current offset
        d = self._eu.read_encrypted(length, hash_only)
            self._offset += length
            size = sum([len(data) for data in strings])

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
            d = defer.succeed(None)
        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
            # track how much ciphertext we have delivered to the helper
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # forward the hash-tree-leaf request to the wrapped uploadable
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)

    # thin forwarders to the wrapped IEncryptedUploadable
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    """Upload a file with the assistance of a remote upload helper."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
946 def log(self, *args, **kwargs):
947 if "parent" not in kwargs:
948 kwargs["parent"] = self._log_number
949 return log.msg(*args, **kwargs)
    def start(self, uploadable):
        """Drive the helper-assisted upload pipeline for one uploadable."""
        self._started = time.time()
        u = IUploadable(uploadable)
        eu = EncryptAnUploadable(u, self._log_number)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        # pipeline: size -> encoding params -> key -> storage index ->
        # contact helper -> build readcap
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        # when we get the encryption key, that will also compute the storage
        # index, so this only takes one pass.
        # TODO: I'm not sure it's cool to switch back and forth between
        # the Uploadable and the IEncryptedUploadable that wraps it.
        d.addCallback(lambda res: u.get_encryption_key())
        d.addCallback(self._got_encryption_key)
        d.addCallback(lambda res: eu.get_storage_index())
        d.addCallback(self._got_storage_index)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_readcap)
            self._upload_status.set_active(False)
    # small callbacks that stash each intermediate pipeline result on self
    def _got_size(self, size):
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _got_encryption_key(self, key):

    def _got_storage_index(self, storage_index):
        self._storage_index = storage_index
    def _contact_helper(self, res):
        # ask the helper about this storage index; its reply tells us whether
        # we must upload the ciphertext or the file is already there
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
    def _contacted_helper(self, (upload_results, upload_helper)):
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        # upload-needed branch (presumably guarded by a test on upload_helper
        # in the lines not visible here — confirm against upstream)
        self.log("helper says we need to upload")
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results
    def _build_readcap(self, upload_results):
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        # sanity-check that the helper used our encoding parameters
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        u = uri.CHKFileURI(key=self._key,
                           uri_extension_hash=r.uri_extension_hash,
                           needed_shares=self._needed_shares,
                           total_shares=self._total_shares,
        r.uri = u.to_string()

        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # keep the helper's own total under a distinct name
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
    def get_upload_status(self):
        """Return the status object tracking this upload's progress."""
        return self._upload_status
class BaseUploadable:
    """Common machinery for IUploadable implementations: holds the encoding
    parameters (k/happy/n/segment-size) with overridable defaults, and
    computes the final parameter tuple once the file size is known.
    """
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    # class-level default so methods can test self._status before
    # set_upload_status() has been called (was missing from the original)
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install client-supplied defaults ('k', 'happy', 'n',
        'max_segment_size'); explicit per-instance settings still win."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size).

        The tuple is computed once (using the file size from get_size) and
        cached. The original text dropped the 'd = self.get_size()' and
        'return d' lines, leaving 'd' unbound and the result discarded.
        """
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        # Hash the whole plaintext (with the convergence secret mixed in)
        # to derive a deterministic 16-byte AES key; cached in self._key.
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got_parameters(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got_parameters)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        # seek to the end to learn the size, then rewind for reading
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        # we opened the file, so we are responsible for closing it
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)

    uploader_class = CHKUploader
    URI_LIT_SIZE_THRESHOLD = 55 # files at or below this size use LIT URIs
    MAX_UPLOAD_STATUSES = 10    # how many recent statuses to retain
1220 def __init__(self, helper_furl=None, stats_provider=None):
1221 self._helper_furl = helper_furl
1222 self.stats_provider = stats_provider
1224 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1225 self._all_upload_statuses = weakref.WeakKeyDictionary()
1226 self._recent_upload_statuses = []
1227 service.MultiService.__init__(self)
1229 def startService(self):
1230 service.MultiService.startService(self)
1231 if self._helper_furl:
1232 self.parent.tub.connectTo(self._helper_furl,
1235 def _got_helper(self, helper):
1236 log.msg("got helper connection, getting versions")
1237 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1239 "application-version": "unknown: no get_version()",
1241 d = get_versioned_remote_reference(helper, default)
1242 d.addCallback(self._got_versioned_helper)
1244 def _got_versioned_helper(self, helper):
1245 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1246 if needed not in helper.version:
1247 raise InsufficientVersionError(needed, helper.version)
1248 self._helper = helper
1249 helper.notifyOnDisconnect(self._lost_helper)
1251 def _lost_helper(self):
1254 def get_helper_info(self):
1255 # return a tuple of (helper_furl_or_None, connected_bool)
1256 return (self._helper_furl, bool(self._helper))
1259 def upload(self, uploadable):
1260 # this returns the URI
1264 uploadable = IUploadable(uploadable)
1265 d = uploadable.get_size()
1266 def _got_size(size):
1267 default_params = self.parent.get_encoding_parameters()
1268 precondition(isinstance(default_params, dict), default_params)
1269 precondition("max_segment_size" in default_params, default_params)
1270 uploadable.set_default_encoding_parameters(default_params)
1272 if self.stats_provider:
1273 self.stats_provider.count('uploader.files_uploaded', 1)
1274 self.stats_provider.count('uploader.bytes_uploaded', size)
1276 if size <= self.URI_LIT_SIZE_THRESHOLD:
1277 uploader = LiteralUploader(self.parent)
1279 uploader = AssistedUploader(self._helper)
1281 uploader = self.uploader_class(self.parent)
1282 self._add_upload(uploader)
1283 return uploader.start(uploadable)
1284 d.addCallback(_got_size)
1291 def _add_upload(self, uploader):
1292 s = uploader.get_upload_status()
1293 self._all_uploads[uploader] = None
1294 self._all_upload_statuses[s] = None
1295 self._recent_upload_statuses.append(s)
1296 while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
1297 self._recent_upload_statuses.pop(0)
1299 def list_all_upload_statuses(self):
1300 for us in self._all_upload_statuses: