1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap import Referenceable, Copyable, RemoteCopy
7 from foolscap import eventual
9 from allmydata.util.hashutil import file_renewal_secret_hash, \
10 file_cancel_secret_hash, bucket_renewal_secret_hash, \
11 bucket_cancel_secret_hash, plaintext_hasher, \
12 storage_index_hash, plaintext_segment_hasher, convergence_hasher
13 from allmydata import storage, hashtree, uri
14 from allmydata.immutable import encode
15 from allmydata.util import base32, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import get_versioned_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20 NotEnoughSharesError, InsufficientVersionError
21 from allmydata.immutable import layout
22 from pycryptopp.cipher.aes import AES
24 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    """Internal control-flow exception.

    # we use this to jump out of the loop
    """
    pass
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised to signal a "too full" condition (see TODO above about moving
    this into the storage module)."""
    pass
41 class UploadResults(Copyable, RemoteCopy):
# Foolscap-copyable record of the outcome of a single upload.
# NOTE(review): this listing is gapped -- the "def __init__(self):" header
# (and possibly one or two more attribute lines) between the typeToCopy
# constant and the assignments below are missing from the visible source.
42 implements(IUploadResults)
43 # note: don't change this string, it needs to match the value used on the
44 # helper, and it does *not* need to match the fully-qualified
45 # package/module/class name
46 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
50 self.timings = {} # dict of name to number of seconds
51 self.sharemap = {} # dict of shnum to placement string
52 self.servermap = {} # dict of peerid to set(shnums)
54 self.ciphertext_fetched = None # how much the helper fetched
56 self.preexisting_shares = None # count of shares already present
57 self.pushed_shares = None # count of shares we pushed
60 # our current uri_extension is 846 bytes for small files, a few bytes
61 # more for larger ones (since the filesize is encoded in decimal in a
62 # few places). Ask for a little bit more just in case we need it. If
63 # the extension changes size, we can change EXTENSION_SIZE to
64 # allocate a more accurate amount of space.
66 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
70 def __init__(self, peerid, storage_server,
71 sharesize, blocksize, num_segments, num_share_hashes,
# NOTE(review): a parameter line between these two (original line 72,
# presumably carrying storage_index, which is stored below) is missing
# from this gapped listing.
73 bucket_renewal_secret, bucket_cancel_secret):
# peerid is a 20-byte binary nodeid (enforced by the preconditions below)
74 precondition(isinstance(peerid, str), peerid)
75 precondition(len(peerid) == 20, peerid)
77 self._storageserver = storage_server # to an RIStorageServer
78 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
79 self.sharesize = sharesize
80 self.allocated_size = layout.allocated_size(sharesize,
# NOTE(review): the remaining allocated_size() argument lines (original
# 81-84) are missing from this gapped listing.
85 self.blocksize = blocksize
86 self.num_segments = num_segments
87 self.num_share_hashes = num_share_hashes
88 self.storage_index = storage_index
90 self.renew_secret = bucket_renewal_secret
91 self.cancel_secret = bucket_cancel_secret
# NOTE(review): the "def __repr__(self):" header for this method body is
# missing from this gapped listing.
94 return ("<PeerTracker for peer %s and SI %s>"
95 % (idlib.shortnodeid_b2a(self.peerid),
96 storage.si_b2a(self.storage_index)[:5]))
98 def query(self, sharenums):
# Ask the storage server to allocate buckets for the given share numbers.
# NOTE(review): several callRemote argument lines (original 100-104) and
# the trailing "return d" are missing from this gapped listing.
99 d = self._storageserver.callRemote("allocate_buckets",
105 canary=Referenceable())
106 d.addCallback(self._got_reply)
109 def _got_reply(self, (alreadygot, buckets)):
# (Python 2 tuple-argument unpacking.) Wrap each returned remote bucket
# reference in a WriteBucketProxy and remember it in self.buckets; report
# (shares the server already had, shares it just allocated).
110 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
112 for sharenum, rref in buckets.iteritems():
113 bp = layout.WriteBucketProxy(rref, self.sharesize,
116 self.num_share_hashes,
# NOTE(review): several WriteBucketProxy argument lines and the local
# bookkeeping that builds the dict "b" (presumably "b[sharenum] = bp")
# are missing from this gapped listing.
120 self.buckets.update(b)
121 return (alreadygot, set(b.keys()))
123 class Tahoe2PeerSelector:
125 def __init__(self, upload_id, logparent=None, upload_status=None):
126 self.upload_id = upload_id
# query counters: total queries sent, queries that placed at least one
# share, and queries that placed none
127 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
129 self.num_peers_contacted = 0
130 self.last_failure_msg = None
131 self._status = IUploadStatus(upload_status)
132 self._log_parent = log.msg("%s starting" % self, parent=logparent)
# NOTE(review): the "def __repr__(self):" header for the line below (and
# an error_count initialization used later in _got_response) is missing
# from this gapped listing.
135 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
137 def get_shareholders(self, client,
138 storage_index, share_size, block_size,
139 num_segments, total_shares, shares_of_happiness):
# NOTE(review): the docstring delimiters around the @return block below
# are missing from this gapped listing.
141 @return: (used_peers, already_peers), where used_peers is a set of
142 PeerTracker instances that have agreed to hold some shares
143 for us (the shnum is stashed inside the PeerTracker),
144 and already_peers is a dict mapping shnum to a peer
145 which claims to already have the share.
149 self._status.set_status("Contacting Peers..")
151 self.total_shares = total_shares
152 self.shares_of_happiness = shares_of_happiness
# every share starts out "homeless"; peer selection tries to place them all
154 self.homeless_shares = range(total_shares)
155 # self.uncontacted_peers = list() # peers we haven't asked yet
156 self.contacted_peers = [] # peers worth asking again
157 self.contacted_peers2 = [] # peers that we have asked again
158 self._started_second_pass = False
159 self.use_peers = set() # PeerTrackers that have shares assigned to them
160 self.preexisting_shares = {} # sharenum -> peerid holding the share
162 peers = client.get_permuted_peers("storage", storage_index)
# NOTE(review): the "if not peers:" guard for the raise below (original
# line 163) is missing from this gapped listing.
164 raise NotEnoughSharesError("client gave us zero peers")
166 # this needed_hashes computation should mirror
167 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
168 # (instead of a HashTree) because we don't require actual hashing
169 # just to count the levels.
170 ht = hashtree.IncompleteHashTree(total_shares)
171 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
173 # figure out how much space to ask for
174 allocated_size = layout.allocated_size(share_size,
# NOTE(review): the remaining allocated_size() argument lines are missing
# from this gapped listing.
178 # filter the list of peers according to which ones can accommodate
179 # this request. This excludes older peers (which used a 4-byte size
180 # field) from getting large shares (for files larger than about
181 # 12GiB). See #439 for details.
182 def _get_maxsize(peer):
183 (peerid, conn) = peer
184 v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
185 return v1["maximum-immutable-share-size"]
186 peers = [peer for peer in peers
187 if _get_maxsize(peer) >= allocated_size]
# NOTE(review): the "if not peers:" guard for the raise below (original
# line 188) is missing from this gapped listing.
189 raise NotEnoughSharesError("no peers could accept an allocated_size of %d" % allocated_size)
191 # decide upon the renewal/cancel secrets, to include them in the
192 # allocate_buckets query.
193 client_renewal_secret = client.get_renewal_secret()
194 client_cancel_secret = client.get_cancel_secret()
196 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
198 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
# NOTE(review): the storage_index arguments to the two hash calls above
# appear to be missing from this gapped listing.
201 trackers = [ PeerTracker(peerid, conn,
202 share_size, block_size,
203 num_segments, num_share_hashes,
205 bucket_renewal_secret_hash(file_renewal_secret,
207 bucket_cancel_secret_hash(file_cancel_secret,
210 for (peerid, conn) in peers ]
211 self.uncontacted_peers = trackers
# kick off the query loop; _loop re-invokes itself until done or failed
213 d = defer.maybeDeferred(self._loop)
# NOTE(review): the trailing "return d" is missing from this gapped
# listing.
# NOTE(review): the "def _loop(self):" header for this body is missing
# from this gapped listing, as are the recursive "return self._loop()"
# statements at the end of the first-query / second-query branches.
217 if not self.homeless_shares:
# every share has a home: success. Log a summary and return the result.
219 msg = ("placed all %d shares, "
220 "sent %d queries to %d peers, "
221 "%d queries placed some shares, %d placed none, "
224 self.query_count, self.num_peers_contacted,
225 self.good_query_count, self.bad_query_count,
227 log.msg("peer selection successful for %s: %s" % (self, msg),
228 parent=self._log_parent)
229 return (self.use_peers, self.preexisting_shares)
231 if self.uncontacted_peers:
# first pass: ask each fresh peer to hold a single homeless share
232 peer = self.uncontacted_peers.pop(0)
233 # TODO: don't pre-convert all peerids to PeerTrackers
234 assert isinstance(peer, PeerTracker)
236 shares_to_ask = set([self.homeless_shares.pop(0)])
237 self.query_count += 1
238 self.num_peers_contacted += 1
240 self._status.set_status("Contacting Peers [%s] (first query),"
242 % (idlib.shortnodeid_b2a(peer.peerid),
243 len(self.homeless_shares)))
244 d = peer.query(shares_to_ask)
245 d.addBoth(self._got_response, peer, shares_to_ask,
246 self.contacted_peers)
248 elif self.contacted_peers:
249 # ask a peer that we've already asked.
250 if not self._started_second_pass:
251 log.msg("starting second pass", parent=self._log_parent,
253 self._started_second_pass = True
# spread the remaining homeless shares evenly over the willing peers
254 num_shares = mathutil.div_ceil(len(self.homeless_shares),
255 len(self.contacted_peers))
256 peer = self.contacted_peers.pop(0)
257 shares_to_ask = set(self.homeless_shares[:num_shares])
258 self.homeless_shares[:num_shares] = []
259 self.query_count += 1
261 self._status.set_status("Contacting Peers [%s] (second query),"
263 % (idlib.shortnodeid_b2a(peer.peerid),
264 len(self.homeless_shares)))
265 d = peer.query(shares_to_ask)
266 d.addBoth(self._got_response, peer, shares_to_ask,
267 self.contacted_peers2)
269 elif self.contacted_peers2:
270 # we've finished the second-or-later pass. Move all the remaining
271 # peers back into self.contacted_peers for the next pass.
272 self.contacted_peers.extend(self.contacted_peers2)
273 self.contacted_peers[:] = []
# NOTE(review): the line above clears the very list we just extended,
# discarding the peers moved back from contacted_peers2; it almost
# certainly should read "self.contacted_peers2[:] = []" -- confirm
# against the upstream source before relying on multi-pass selection.
276 # no more peers. If we haven't placed enough shares, we fail.
277 placed_shares = self.total_shares - len(self.homeless_shares)
278 if placed_shares < self.shares_of_happiness:
279 msg = ("placed %d shares out of %d total (%d homeless), "
280 "sent %d queries to %d peers, "
281 "%d queries placed some shares, %d placed none, "
283 (self.total_shares - len(self.homeless_shares),
284 self.total_shares, len(self.homeless_shares),
285 self.query_count, self.num_peers_contacted,
286 self.good_query_count, self.bad_query_count,
288 msg = "peer selection failed for %s: %s" % (self, msg)
289 if self.last_failure_msg:
290 msg += " (%s)" % (self.last_failure_msg,)
291 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
292 raise NotEnoughSharesError(msg)
294 # we placed enough to be happy, so we're done
296 self._status.set_status("Placed all shares")
297 return self.use_peers
299 def _got_response(self, res, peer, shares_to_ask, put_peer_here):
# addBoth target: res is either a Failure (query error) or the
# (alreadygot, allocated) tuple from PeerTracker._got_reply.
300 if isinstance(res, failure.Failure):
301 # This is unusual, and probably indicates a bug or a network
303 log.msg("%s got error during peer selection: %s" % (peer, res),
304 level=log.UNUSUAL, parent=self._log_parent)
305 self.error_count += 1
# put the shares we asked about back at the head of the homeless list
306 self.homeless_shares = list(shares_to_ask) + self.homeless_shares
307 if (self.uncontacted_peers
308 or self.contacted_peers
309 or self.contacted_peers2):
310 # there is still hope, so just loop
313 # No more peers, so this upload might fail (it depends upon
314 # whether we've hit shares_of_happiness or not). Log the last
315 # failure we got: if a coding error causes all peers to fail
316 # in the same way, this allows the common failure to be seen
317 # by the uploader and should help with debugging
318 msg = ("last failure (from %s) was: %s" % (peer, res))
319 self.last_failure_msg = msg
# NOTE(review): the "else:" introducing the success branch below (and the
# loop header "for s in alreadygot:") are missing from this gapped
# listing.
321 (alreadygot, allocated) = res
322 log.msg("response from peer %s: alreadygot=%s, allocated=%s"
323 % (idlib.shortnodeid_b2a(peer.peerid),
324 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
325 level=log.NOISY, parent=self._log_parent)
328 self.preexisting_shares[s] = peer.peerid
329 if s in self.homeless_shares:
330 self.homeless_shares.remove(s)
333 # the PeerTracker will remember which shares were allocated on
334 # that peer. We just have to remember to use them.
336 self.use_peers.add(peer)
339 not_yet_present = set(shares_to_ask) - set(alreadygot)
340 still_homeless = not_yet_present - set(allocated)
343 # they accepted or already had at least one share, so
344 # progress has been made
345 self.good_query_count += 1
347 self.bad_query_count += 1
350 # In networks with lots of space, this is very unusual and
351 # probably indicates an error. In networks with peers that
352 # are full, it is merely unusual. In networks that are very
353 # full, it is common, and many uploads will fail. In most
354 # cases, this is obviously not fatal, and we'll just use some
357 # some shares are still homeless, keep trying to find them a
358 # home. The ones that were rejected get first priority.
359 self.homeless_shares = (list(still_homeless)
360 + self.homeless_shares)
361 # Since they were unable to accept all of our requests, it
362 # is safe to assume that asking them again won't help.
364 # if they *were* able to accept everything, they might be
365 # willing to accept even more.
366 put_peer_here.append(peer)
# NOTE(review): the if/else branch headers around the counters above and
# the final "return self._loop()" are missing from this gapped listing.
372 class EncryptAnUploadable:
373 """This is a wrapper that takes an IUploadable and provides
374 IEncryptedUploadable."""
375 implements(IEncryptedUploadable)
# NOTE(review): class-attribute lines that follow in the upstream file
# (e.g. the CHUNKSIZE constant referenced by _read_encrypted) are missing
# from this gapped listing.
378 def __init__(self, original, log_parent=None):
379 self.original = IUploadable(original)
380 self._log_number = log_parent
381 self._encryptor = None # created lazily by _get_encryptor
382 self._plaintext_hasher = plaintext_hasher() # hashes all plaintext read
383 self._plaintext_segment_hasher = None # per-segment hasher, see _get_segment_hasher
384 self._plaintext_segment_hashes = [] # completed segment digests
385 self._encoding_parameters = None # cached (k, happy, n, segsize)
386 self._file_size = None # cached by get_size
387 self._ciphertext_bytes_read = 0 # for progress reporting
# NOTE(review): the upstream file likely initializes one or two more
# attributes here (e.g. self._status, assigned by set_upload_status);
# those lines are missing from this gapped listing.
def set_upload_status(self, upload_status):
    """Adapt upload_status to IUploadStatus and propagate it to the wrapped
    original uploadable, so both layers report into the same status object."""
    adapted = IUploadStatus(upload_status)
    self._status = adapted
    self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
    """Emit a message via log.msg, defaulting the facility to
    "upload.encryption" and the parent to this wrapper's log entry when the
    caller did not supply them."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
# NOTE(review): the "def get_size(self):" header for this body is missing
# from this gapped listing.
402 if self._file_size is not None:
403 return defer.succeed(self._file_size)
404 d = self.original.get_size()
# the callback caches the size and reports it to the status object
406 self._file_size = size
408 self._status.set_size(size)
410 d.addCallback(_got_size)
# NOTE(review): the "def _got_size(size):" header, its "return size", and
# the final "return d" appear to be missing as well.
413 def get_all_encoding_parameters(self):
# Returns a Deferred firing with the (k, happy, n, segsize) tuple, asking
# the wrapped uploadable on first use and caching the answer.
414 if self._encoding_parameters is not None:
415 return defer.succeed(self._encoding_parameters)
416 d = self.original.get_all_encoding_parameters()
417 def _got(encoding_parameters):
418 (k, happy, n, segsize) = encoding_parameters
419 self._segment_size = segsize # used by segment hashers
420 self._encoding_parameters = encoding_parameters
421 self.log("my encoding parameters: %s" % (encoding_parameters,),
423 return encoding_parameters
# NOTE(review): the log() keyword arguments, "d.addCallback(_got)", and
# "return d" are missing from this gapped listing.
427 def _get_encryptor(self):
# Lazily set up the encryptor and derive the storage index from the
# encryption key; both are cached on self.
429 return defer.succeed(self._encryptor)
431 d = self.original.get_encryption_key()
436 storage_index = storage_index_hash(key)
437 assert isinstance(storage_index, str)
438 # There's no point to having the SI be longer than the key, so we
439 # specify that it is truncated to the same 128 bits as the AES key.
440 assert len(storage_index) == 16 # SHA-256 truncated to 128b
441 self._storage_index = storage_index
443 self._status.set_storage_index(storage_index)
# NOTE(review): the "if self._encryptor:" guard, the "_got(key)" callback
# header, the encryptor construction (presumably AES(key)), and the final
# "return d" are missing from this gapped listing.
448 def get_storage_index(self):
# Returns a Deferred firing with the storage index, ensuring the
# encryptor (which derives it) has been created first.
449 d = self._get_encryptor()
450 d.addCallback(lambda res: self._storage_index)
# NOTE(review): the final "return d" is missing from this gapped listing.
453 def _get_segment_hasher(self):
# Return (hasher, bytes_remaining_in_current_segment), creating a fresh
# per-segment hasher when none is active.
454 p = self._plaintext_segment_hasher
456 left = self._segment_size - self._plaintext_segment_hashed_bytes
458 p = plaintext_segment_hasher()
459 self._plaintext_segment_hasher = p
460 self._plaintext_segment_hashed_bytes = 0
461 return p, self._segment_size
# NOTE(review): the "if p:" branch returning (p, left) for an already
# active hasher is missing from this gapped listing.
463 def _update_segment_hash(self, chunk):
# Feed chunk into the per-segment plaintext hashers, closing out each
# hasher (appending its digest) as segment boundaries are crossed.
465 while offset < len(chunk):
466 p, segment_left = self._get_segment_hasher()
467 chunk_left = len(chunk) - offset
468 this_segment = min(chunk_left, segment_left)
469 p.update(chunk[offset:offset+this_segment])
470 self._plaintext_segment_hashed_bytes += this_segment
472 if self._plaintext_segment_hashed_bytes == self._segment_size:
473 # we've filled this segment
474 self._plaintext_segment_hashes.append(p.digest())
475 self._plaintext_segment_hasher = None
476 self.log("closed hash [%d]: %dB" %
477 (len(self._plaintext_segment_hashes)-1,
478 self._plaintext_segment_hashed_bytes),
480 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
481 segnum=len(self._plaintext_segment_hashes)-1,
482 hash=base32.b2a(p.digest()),
485 offset += this_segment
# NOTE(review): the "offset = 0" initialization and some log() keyword
# arguments are missing from this gapped listing.
488 def read_encrypted(self, length, hash_only):
489 # make sure our parameters have been set up first
490 d = self.get_all_encoding_parameters()
492 d.addCallback(lambda ignored: self.get_size())
493 d.addCallback(lambda ignored: self._get_encryptor())
494 # then fetch and encrypt the plaintext. The unusual structure here
495 # (passing a Deferred *into* a function) is needed to avoid
496 # overflowing the stack: Deferreds don't optimize out tail recursion.
497 # We also pass in a list, to which _read_encrypted will append
500 d2 = defer.Deferred()
501 d.addCallback(lambda ignored:
502 self._read_encrypted(length, ciphertext, hash_only, d2))
503 d.addCallback(lambda ignored: d2)
# NOTE(review): the "ciphertext = []" accumulator initialization and the
# final "return d" are missing from this gapped listing.
506 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
# Deferred-driven chunked read loop: accumulates into the shared
# ciphertext list, then fires fire_when_done with it.
508 fire_when_done.callback(ciphertext)
510 # tolerate large length= values without consuming a lot of RAM by
511 # reading just a chunk (say 50kB) at a time. This only really matters
512 # when hash_only==True (i.e. resuming an interrupted upload), since
513 # that's the case where we will be skipping over a lot of data.
514 size = min(remaining, self.CHUNKSIZE)
515 remaining = remaining - size
516 # read a chunk of plaintext..
517 d = defer.maybeDeferred(self.original.read, size)
518 # N.B.: if read() is synchronous, then since everything else is
519 # actually synchronous too, we'd blow the stack unless we stall for a
520 # tick. Once you accept a Deferred from IUploadable.read(), you must
521 # be prepared to have it fire immediately too.
522 d.addCallback(eventual.fireEventually)
523 def _good(plaintext):
525 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
526 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
527 ciphertext.extend(ct)
528 self._read_encrypted(remaining, ciphertext, hash_only,
531 fire_when_done.errback(why)
# NOTE(review): the "if not remaining:" guard around the early callback,
# the "_bad(why)" errback header, and the d.addCallbacks(_good, _bad)
# wiring are missing from this gapped listing.
536 def _hash_and_encrypt_plaintext(self, data, hash_only):
# Hash every plaintext chunk (flat and per-segment hashers), encrypt it,
# and collect ciphertext; updates progress as a side effect.
537 assert isinstance(data, (tuple, list)), type(data)
540 # we use data.pop(0) instead of 'for chunk in data' to save
541 # memory: each chunk is destroyed as soon as we're done with it.
545 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
547 bytes_processed += len(chunk)
548 self._plaintext_hasher.update(chunk)
549 self._update_segment_hash(chunk)
550 # TODO: we have to encrypt the data (even if hash_only==True)
551 # because pycryptopp's AES-CTR implementation doesn't offer a
552 # way to change the counter value. Once pycryptopp acquires
553 # this ability, change this to simply update the counter
554 # before each call to (hash_only==False) _encryptor.process()
555 ciphertext = self._encryptor.process(chunk)
557 self.log(" skipping encryption", level=log.NOISY)
559 cryptdata.append(ciphertext)
562 self._ciphertext_bytes_read += bytes_processed
564 progress = float(self._ciphertext_bytes_read) / self._file_size
565 self._status.set_progress(1, progress)
# NOTE(review): the cryptdata/bytes_processed initializations, the
# "while data:" loop header, the hash_only branch structure, and the
# final "return cryptdata" are missing from this gapped listing.
569 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
# Return a Deferred firing with the tuple of plaintext segment-hash
# leaves [first:last], closing out the final (possibly short) segment
# hasher first if it is still open.
570 if len(self._plaintext_segment_hashes) < num_segments:
571 # close out the last one
572 assert len(self._plaintext_segment_hashes) == num_segments-1
573 p, segment_left = self._get_segment_hasher()
574 self._plaintext_segment_hashes.append(p.digest())
575 del self._plaintext_segment_hasher
576 self.log("closing plaintext leaf hasher, hashed %d bytes" %
577 self._plaintext_segment_hashed_bytes,
579 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
580 segnum=len(self._plaintext_segment_hashes)-1,
581 hash=base32.b2a(p.digest()),
# NOTE(review): the log() level keyword arguments (original 578, 582) are
# missing from this gapped listing.
583 assert len(self._plaintext_segment_hashes) == num_segments
584 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
    """Return a Deferred that fires with the digest of all plaintext fed
    through the flat plaintext hasher so far."""
    return defer.succeed(self._plaintext_hasher.digest())
# NOTE(review): the "def close(self):" header is missing from this gapped
# listing; the line below delegates close() to the wrapped uploadable.
591 return self.original.close()
# NOTE(review): the class header (UploadStatus, judging by its use in
# CHKUploader below), the "def __init__(self):" header, and the bodies of
# several one-line accessors are missing from this gapped listing; the
# surviving lines are reproduced as-is.
594 implements(IUploadStatus)
595 statusid_counter = itertools.count(0) # shared counter for unique status ids
598 self.storage_index = None
601 self.status = "Not started"
602 self.progress = [0.0, 0.0, 0.0]
605 self.counter = self.statusid_counter.next()
606 self.started = time.time()
608 def get_started(self):
610 def get_storage_index(self):
611 return self.storage_index
614 def using_helper(self):
616 def get_status(self):
618 def get_progress(self):
619 return tuple(self.progress)
620 def get_active(self):
622 def get_results(self):
624 def get_counter(self):
627 def set_storage_index(self, si):
628 self.storage_index = si
629 def set_size(self, size):
631 def set_helper(self, helper):
633 def set_status(self, status):
635 def set_progress(self, which, value):
636 # [0]: chk, [1]: ciphertext, [2]: encode+push
637 self.progress[which] = value
638 def set_active(self, value):
640 def set_results(self, value):
# NOTE(review): the class header (CHKUploader, per the log message below)
# is missing from this gapped listing.
644 peer_selector_class = Tahoe2PeerSelector
646 def __init__(self, client):
647 self._client = client
648 self._log_number = self._client.log("CHKUploader starting")
# NOTE(review): one line (original 649, likely "self._encoder = None",
# since abort() checks self._encoder) is missing from this gapped listing.
650 self._results = UploadResults()
651 self._storage_index = None
652 self._upload_status = UploadStatus()
653 self._upload_status.set_helper(False)
654 self._upload_status.set_active(True)
655 self._upload_status.set_results(self._results)
def log(self, *args, **kwargs):
    """Log through our client, defaulting this upload's log entry as the
    parent and "tahoe.upload" as the facility."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.upload")
    return self._client.log(*args, **kwargs)
664 def start(self, encrypted_uploadable):
665 """Start uploading the file.
667 Returns a Deferred that will fire with the UploadResults instance.
670 self._started = time.time()
671 eu = IEncryptedUploadable(encrypted_uploadable)
672 self.log("starting upload of %s" % eu)
674 eu.set_upload_status(self._upload_status)
675 d = self.start_encrypted(eu)
676 def _done(uploadresults):
677 self._upload_status.set_active(False)
# NOTE(review): the docstring terminator, _done's return, the
# d.addBoth(_done) wiring, and the final "return d" are missing from this
# gapped listing.
# NOTE(review): the "def abort(self):" header for this body is missing
# from this gapped listing.
683 """Call this if the upload must be abandoned before it completes.
684 This will tell the shareholders to delete their partial shares. I
685 return a Deferred that fires when these messages have been acked."""
686 if not self._encoder:
687 # how did you call abort() before calling start() ?
688 return defer.succeed(None)
689 return self._encoder.abort()
691 def start_encrypted(self, encrypted):
692 """ Returns a Deferred that will fire with the UploadResults instance. """
693 eu = IEncryptedUploadable(encrypted)
695 started = time.time()
696 self._encoder = e = encode.Encoder(self._log_number,
# NOTE(review): the Encoder's remaining constructor argument line(s) are
# missing from this gapped listing.
698 d = e.set_encrypted_uploadable(eu)
# pipeline: find shareholders -> hand them to the encoder -> encode/push
699 d.addCallback(self.locate_all_shareholders, started)
700 d.addCallback(self.set_shareholders, e)
701 d.addCallback(lambda res: e.start())
702 d.addCallback(self._encrypted_done)
# NOTE(review): the final "return d" is missing from this gapped listing.
705 def locate_all_shareholders(self, encoder, started):
# Run peer selection for this upload via the configured peer selector,
# recording storage-index and peer-selection timings along the way.
706 peer_selection_started = now = time.time()
707 self._storage_index_elapsed = now - started
708 storage_index = encoder.get_param("storage_index")
709 self._storage_index = storage_index
710 upload_id = storage.si_b2a(storage_index)[:5]
711 self.log("using storage index %s" % upload_id)
712 peer_selector = self.peer_selector_class(upload_id, self._log_number,
# NOTE(review): the selector's upload_status argument line(s) are missing
# from this gapped listing.
715 share_size = encoder.get_param("share_size")
716 block_size = encoder.get_param("block_size")
717 num_segments = encoder.get_param("num_segments")
718 k,desired,n = encoder.get_param("share_counts")
720 self._peer_selection_started = time.time()
721 d = peer_selector.get_shareholders(self._client, storage_index,
722 share_size, block_size,
723 num_segments, n, desired)
725 self._peer_selection_elapsed = time.time() - peer_selection_started
# NOTE(review): the callback header enclosing the elapsed-time line above,
# its return, the d.addCallback wiring, and the final "return d" are
# missing from this gapped listing.
730 def set_shareholders(self, (used_peers, already_peers), encoder):
# NOTE(review): the docstring delimiters around the @param block below are
# missing from this gapped listing.
732 @param used_peers: a sequence of PeerTracker objects
733 @param already_peers: a dict mapping sharenum to a peerid that
734 claims to already have this share
736 self.log("_send_shares, used_peers is %s" % (used_peers,))
737 # record already-present shares in self._results
738 for (shnum, peerid) in already_peers.items():
739 peerid_s = idlib.shortnodeid_b2a(peerid)
740 self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
741 if peerid not in self._results.servermap:
742 self._results.servermap[peerid] = set()
743 self._results.servermap[peerid].add(shnum)
744 self._results.preexisting_shares = len(already_peers)
747 for peer in used_peers:
748 assert isinstance(peer, PeerTracker)
750 for peer in used_peers:
751 buckets.update(peer.buckets)
752 for shnum in peer.buckets:
753 self._sharemap[shnum] = peer
754 assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
755 encoder.set_shareholders(buckets)
# NOTE(review): the "buckets = {}" and "self._sharemap = {}"
# initializations appear to be missing from this gapped listing.
757 def _encrypted_done(self, verifycap):
758 """ Returns a Deferred that will fire with the UploadResults instance. """
# record per-share placement, push counts, timings, and the verify cap
# into the results object
760 for shnum in self._encoder.get_shares_placed():
761 peer_tracker = self._sharemap[shnum]
762 peerid = peer_tracker.peerid
763 peerid_s = idlib.shortnodeid_b2a(peerid)
764 r.sharemap[shnum] = "Placed on [%s]" % peerid_s
765 if peerid not in r.servermap:
766 r.servermap[peerid] = set()
767 r.servermap[peerid].add(shnum)
768 r.pushed_shares = len(self._encoder.get_shares_placed())
770 r.file_size = self._encoder.file_size
771 r.timings["total"] = now - self._started
772 r.timings["storage_index"] = self._storage_index_elapsed
773 r.timings["peer_selection"] = self._peer_selection_elapsed
774 r.timings.update(self._encoder.get_times())
775 r.uri_extension_data = self._encoder.get_uri_extension_data()
776 r.verifycapstr = verifycap.to_string()
# NOTE(review): the bindings for "r" (presumably self._results) and "now"
# (presumably time.time()) and the final "return r" are missing from this
# gapped listing.
def get_upload_status(self):
    """Return the UploadStatus object tracking this upload."""
    status = self._upload_status
    return status
782 def read_this_many_bytes(uploadable, size, prepend_data=[]):
# Read `size` bytes from the uploadable (as a list of strings), recursing
# until enough data has been gathered.
# NOTE(review): mutable default argument; the visible code only
# concatenates prepend_data (never mutates it), so this looks benign, but
# a None default would be safer.
784 return defer.succeed([])
785 d = uploadable.read(size)
787 assert isinstance(data, list)
788 bytes = sum([len(piece) for piece in data])
791 remaining = size - bytes
793 return read_this_many_bytes(uploadable, remaining,
795 return prepend_data + data
# NOTE(review): the "if size == 0:" guard, the "_got(data)" callback
# header, the recursion's prepend argument, and the callback wiring /
# "return d" are missing from this gapped listing.
799 class LiteralUploader:
801 def __init__(self, client):
802 self._client = client
803 self._results = UploadResults()
804 self._status = s = UploadStatus()
805 s.set_storage_index(None)
807 s.set_progress(0, 1.0)
809 s.set_results(self._results)
# NOTE(review): a few status-setup lines (original 806, 808, 810 -- e.g.
# set_helper/set_active calls) are missing from this gapped listing.
811 def start(self, uploadable):
# "Upload" by embedding the entire file contents in a literal URI (the
# data is joined into the URI string; no shares are pushed).
812 uploadable = IUploadable(uploadable)
813 d = uploadable.get_size()
816 self._status.set_size(size)
817 self._results.file_size = size
818 return read_this_many_bytes(uploadable, size)
819 d.addCallback(_got_size)
820 d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
821 d.addCallback(lambda u: u.to_string())
822 d.addCallback(self._build_results)
# NOTE(review): the "def _got_size(size):" header and the final
# "return d" are missing from this gapped listing.
825 def _build_results(self, uri):
# Finalize the results/status objects once the literal URI is built.
826 self._results.uri = uri
827 self._status.set_status("Done")
828 self._status.set_progress(1, 1.0)
829 self._status.set_progress(2, 1.0)
# NOTE(review): the return of self._results (and any neighboring helper
# methods, original 830-834), plus the body of get_upload_status below,
# are missing from this gapped listing.
835 def get_upload_status(self):
838 class RemoteEncryptedUploadable(Referenceable):
839 implements(RIEncryptedUploadable)
841 def __init__(self, encrypted_uploadable, upload_status):
842 self._eu = IEncryptedUploadable(encrypted_uploadable)
845 self._status = IUploadStatus(upload_status)
846 # we are responsible for updating the status string while we run, and
847 # for setting the ciphertext-fetch progress.
# NOTE(review): initializations for attributes read by the methods below
# (self._offset, self._bytes_sent, self._size -- original 843-844 and
# 848-850) are missing from this gapped listing.
# NOTE(review): the "def get_size(self):" header for this body is missing
# from this gapped listing.
851 if self._size is not None:
852 return defer.succeed(self._size)
853 d = self._eu.get_size()
857 d.addCallback(_got_size)
# NOTE(review): the "_got_size" callback body (which caches the size) and
# the final "return d" are missing as well.
def remote_get_size(self):
    """Remote-callable wrapper: delegate to our local get_size()."""
    return self.get_size()
def remote_get_all_encoding_parameters(self):
    """Remote-callable wrapper: forward to the wrapped IEncryptedUploadable."""
    eu = self._eu
    return eu.get_all_encoding_parameters()
865 def _read_encrypted(self, length, hash_only):
# Read ciphertext from the wrapped uploadable, advancing our offset.
866 d = self._eu.read_encrypted(length, hash_only)
869 self._offset += length
871 size = sum([len(data) for data in strings])
# NOTE(review): the "_read(strings)" callback header/return, the
# progress-reporting lines, and the final "return d" are missing from
# this gapped listing.
877 def remote_read_encrypted(self, offset, length):
878 # we don't support seek backwards, but we allow skipping forwards
879 precondition(offset >= 0, offset)
880 precondition(length >= 0, length)
881 lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
# NOTE(review): the log() level/parent keyword arguments (original 882)
# are missing from this gapped listing.
883 precondition(offset >= self._offset, offset, self._offset)
884 if offset > self._offset:
885 # read the data from disk anyways, to build up the hash tree
886 skip = offset - self._offset
887 log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
888 (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
889 d = self._read_encrypted(skip, hash_only=True)
# NOTE(review): the "else:" introducing the no-skip branch below is
# missing from this gapped listing.
891 d = defer.succeed(None)
893 def _at_correct_offset(res):
894 assert offset == self._offset, "%d != %d" % (offset, self._offset)
895 return self._read_encrypted(length, hash_only=False)
896 d.addCallback(_at_correct_offset)
899 size = sum([len(data) for data in strings])
900 self._bytes_sent += size
# NOTE(review): the "_sent(strings)" callback header/return around the
# byte accounting above, its wiring, and the final "return d" are missing
# from this gapped listing.
905 def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
906 log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
907 (first, last-1, num_segments),
909 d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
# NOTE(review): the log() keyword arguments, the callback that converts
# the result for the wire, and the final "return d" are missing from this
# gapped listing.
def remote_get_plaintext_hash(self):
    """Remote-callable wrapper: forward to the wrapped uploadable."""
    eu = self._eu
    return eu.get_plaintext_hash()
def remote_close(self):
    """Remote-callable wrapper: close the wrapped uploadable."""
    return self._eu.close()
918 class AssistedUploader:
920 def __init__(self, helper):
921 self._helper = helper
922 self._log_number = log.msg("AssistedUploader starting")
923 self._storage_index = None
924 self._upload_status = s = UploadStatus()
# NOTE(review): a few more status-setup lines (original 925-927, e.g.
# set_helper/set_active) are missing from this gapped listing.
def log(self, *args, **kwargs):
    """Log via log.msg, defaulting this uploader's log entry as parent."""
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
933 def start(self, encrypted_uploadable, storage_index):
934 """Start uploading the file.
936 Returns a Deferred that will fire with the UploadResults instance.
938 precondition(isinstance(storage_index, str), storage_index)
939 self._started = time.time()
940 eu = IEncryptedUploadable(encrypted_uploadable)
941 eu.set_upload_status(self._upload_status)
942 self._encuploadable = eu
943 self._storage_index = storage_index
# NOTE(review): the docstring terminator and the chain head feeding
# self._got_size (presumably "d = eu.get_size()") are missing from this
# gapped listing.
945 d.addCallback(self._got_size)
946 d.addCallback(lambda res: eu.get_all_encoding_parameters())
947 d.addCallback(self._got_all_encoding_parameters)
948 d.addCallback(self._contact_helper)
949 d.addCallback(self._build_verifycap)
951 self._upload_status.set_active(False)
# NOTE(review): the "_done" callback header/return enclosing the line
# above, its wiring, and the final "return d" are missing from this
# gapped listing.
956 def _got_size(self, size):
# NOTE(review): the "self._size = size" line (original 957) appears to be
# missing from this gapped listing -- self._size is read later by
# _build_verifycap.
958 self._upload_status.set_size(size)
960 def _got_all_encoding_parameters(self, params):
961 k, happy, n, segment_size = params
962 # stash these for URI generation later
963 self._needed_shares = k
964 self._total_shares = n
965 self._segment_size = segment_size
967 def _contact_helper(self, res):
# Ask the helper (the "upload_chk" remote call) about this storage index,
# recording how long the storage-index phase took.
968 now = self._time_contacting_helper_start = time.time()
969 self._storage_index_elapsed = now - self._started
970 self.log(format="contacting helper for SI %(si)s..",
971 si=storage.si_b2a(self._storage_index))
972 self._upload_status.set_status("Contacting Helper")
973 d = self._helper.callRemote("upload_chk", self._storage_index)
974 d.addCallback(self._contacted_helper)
# NOTE(review): the final "return d" is missing from this gapped listing.
977 def _contacted_helper(self, (upload_results, upload_helper)):
# (Python 2 tuple-argument unpacking.) If the helper handed back an
# upload_helper we must push ciphertext to it; otherwise the helper
# already has the file and upload_results is complete.
979 elapsed = now - self._time_contacting_helper_start
980 self._elapsed_time_contacting_helper = elapsed
982 self.log("helper says we need to upload")
983 self._upload_status.set_status("Uploading Ciphertext")
984 # we need to upload the file
985 reu = RemoteEncryptedUploadable(self._encuploadable,
987 # let it pre-compute the size for progress purposes
989 d.addCallback(lambda ignored:
990 upload_helper.callRemote("upload", reu))
991 # this Deferred will fire with the upload results
993 self.log("helper says file is already uploaded")
994 self._upload_status.set_progress(1, 1.0)
995 self._upload_status.set_results(upload_results)
996 return upload_results
# NOTE(review): "now = time.time()", the "if upload_helper:" / "else:"
# branch headers, the reu.get_size() chain head, a "return d", and the
# second RemoteEncryptedUploadable constructor argument are missing from
# this gapped listing.
998 def _build_verifycap(self, upload_results):
# Cross-check the helper's results against our own encoding parameters,
# then attach the verify cap, file size, and timing data.
999 self.log("upload finished, building readcap")
1000 self._upload_status.set_status("Building Readcap")
1002 assert r.uri_extension_data["needed_shares"] == self._needed_shares
1003 assert r.uri_extension_data["total_shares"] == self._total_shares
1004 assert r.uri_extension_data["segment_size"] == self._segment_size
1005 assert r.uri_extension_data["size"] == self._size
1006 r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1007 uri_extension_hash=r.uri_extension_hash,
1008 needed_shares=self._needed_shares,
1009 total_shares=self._total_shares, size=self._size
1012 r.file_size = self._size
1013 r.timings["storage_index"] = self._storage_index_elapsed
1014 r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1015 if "total" in r.timings:
1016 r.timings["helper_total"] = r.timings["total"]
1017 r.timings["total"] = now - self._started
1018 self._upload_status.set_status("Done")
1019 self._upload_status.set_results(r)
# NOTE(review): the "r = upload_results" binding, "now = time.time()", the
# closing of the CHKFileVerifierURI call (presumably ").to_string()"), and
# the final "return r" are missing from this gapped listing.
def get_upload_status(self):
    """Return the UploadStatus object tracking this assisted upload."""
    status = self._upload_status
    return status
1025 class BaseUploadable:
1026 default_max_segment_size = 128*KiB # overridden by max_segment_size
# NOTE(review): the KiB constant referenced above is not defined in this
# gapped listing; it presumably lives near the top of the upstream file.
1027 default_encoding_param_k = 3 # overridden by encoding_parameters
1028 default_encoding_param_happy = 7
1029 default_encoding_param_n = 10
# per-instance overrides; when None, the class defaults above are used
# (see get_all_encoding_parameters)
1031 max_segment_size = None
1032 encoding_param_k = None
1033 encoding_param_happy = None
1034 encoding_param_n = None
1036 _all_encoding_parameters = None # cached (k, happy, n, segsize) tuple
def set_upload_status(self, upload_status):
    """Adapt *upload_status* to IUploadStatus and remember it for progress reporting."""
    status = IUploadStatus(upload_status)
    self._status = status
def set_default_encoding_parameters(self, default_params):
    """Install fallback encoding parameters from *default_params*.

    Recognized keys are "k", "happy", "n", and "max_segment_size"; each key
    that is present overrides the corresponding class-level default. Explicit
    per-instance settings (encoding_param_* / max_segment_size) still take
    precedence when the effective parameters are computed.
    """
    assert isinstance(default_params, dict)
    # validate every entry before applying any of them
    for key, value in default_params.items():
        precondition(isinstance(key, str), key, value)
        precondition(isinstance(value, int), key, value)
    # map each recognized key onto the default_* attribute it overrides
    overrides = [("k", "default_encoding_param_k"),
                 ("happy", "default_encoding_param_happy"),
                 ("n", "default_encoding_param_n"),
                 ("max_segment_size", "default_max_segment_size")]
    for key, attrname in overrides:
        if key in default_params:
            setattr(self, attrname, default_params[key])
# Return a Deferred that fires with the effective (k, happy, n, segment_size)
# tuple, computed once and then cached in _all_encoding_parameters.
# NOTE(review): gappy listing -- the line creating `d` (presumably
# d = self.get_size()) and the trailing `return d` are missing here.
1056 def get_all_encoding_parameters(self):
# cached result from a previous call
1057 if self._all_encoding_parameters:
1058 return defer.succeed(self._all_encoding_parameters)
# per-instance attributes win over the defaults installed by
# set_default_encoding_parameters()
1060 max_segsize = self.max_segment_size or self.default_max_segment_size
1061 k = self.encoding_param_k or self.default_encoding_param_k
1062 happy = self.encoding_param_happy or self.default_encoding_param_happy
1063 n = self.encoding_param_n or self.default_encoding_param_n
1066 def _got_size(file_size):
1067 # for small files, shrink the segment size to avoid wasting space
1068 segsize = min(max_segsize, file_size)
1069 # this must be a multiple of 'required_shares'==k
1070 segsize = mathutil.next_multiple(segsize, k)
1071 encoding_parameters = (k, happy, n, segsize)
1072 self._all_encoding_parameters = encoding_parameters
1073 return encoding_parameters
1074 d.addCallback(_got_size)
# IUploadable that serves data from an already-open file-like object. The
# caller retains responsibility for closing the filehandle.
1077 class FileHandle(BaseUploadable):
1078 implements(IUploadable)
# NOTE(review): gappy listing -- the docstring delimiters and (presumably)
# the `self._key = None` / size initializers are missing from __init__.
1080 def __init__(self, filehandle, convergence):
1082 Upload the data from the filehandle. If convergence is None then a
1083 random encryption key will be used, else the plaintext will be hashed,
1084 then the hash will be hashed together with the string in the
1085 "convergence" argument to form the encryption key.
1087 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1088 self._filehandle = filehandle
1090 self.convergence = convergence
# Derive the convergent encryption key by hashing the whole plaintext
# together with the convergence secret and the encoding parameters, so that
# identical files (same secret, same parameters) produce the same key. The
# result is cached in self._key; a Deferred fires with the 16-byte key.
# NOTE(review): gappy listing -- the get_size() call, the BLOCKSIZE
# constant, the read-loop header and EOF break, the file rewind, and the
# final return are missing here.
1093 def _get_encryption_key_convergent(self):
1094 if self._key is not None:
1095 return defer.succeed(self._key)
1098 # that sets self._size as a side-effect
1099 d.addCallback(lambda size: self.get_all_encoding_parameters())
1101 k, happy, n, segsize = params
1102 f = self._filehandle
# the key depends on k, n, segsize and the convergence secret as well as
# the plaintext, so changing encoding parameters changes the key
1103 enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1108 data = f.read(BLOCKSIZE)
1111 enckey_hasher.update(data)
1112 # TODO: setting progress in a non-yielding loop is kind of
1113 # pointless, but I'm anticipating (perhaps prematurely) the
1114 # day when we use a slowjob or twisted's CooperatorService to
1115 # make this yield time to other jobs.
1116 bytes_read += len(data)
1118 self._status.set_progress(0, float(bytes_read)/self._size)
1120 self._key = enckey_hasher.digest()
1122 self._status.set_progress(0, 1.0)
1123 assert len(self._key) == 16
def _get_encryption_key_random(self):
    """Lazily generate (at most once) a random 16-byte key; fires a Deferred with it."""
    key = self._key
    if key is None:
        key = os.urandom(16)
        self._key = key
    return defer.succeed(key)
# Return a Deferred firing with the 16-byte AES key: convergent when a
# convergence secret was supplied at construction time, random otherwise.
# NOTE(review): the line between 1135 and 1137 (presumably `else:`) is
# missing from this listing.
1133 def get_encryption_key(self):
1134 if self.convergence is not None:
1135 return self._get_encryption_key_convergent()
1137 return self._get_encryption_key_random()
# body of get_size() -- its `def` line is missing from this listing, as is
# (presumably) a `self._size = size` cache line between 1143 and 1145.
# Measures the file by seeking to the end, then rewinds to the start.
1140 if self._size is not None:
1141 return defer.succeed(self._size)
1142 self._filehandle.seek(0,2)
1143 size = self._filehandle.tell()
1145 self._filehandle.seek(0)
1146 return defer.succeed(size)
def read(self, length):
    """Read up to *length* bytes; fires a Deferred with a one-element list of data."""
    chunk = self._filehandle.read(length)
    return defer.succeed([chunk])
1152 # the originator of the filehandle reserves the right to close it
# IUploadable that opens a named file in binary mode and uploads its
# contents. Because we opened the filehandle ourselves, close() closes it.
# NOTE(review): gappy listing -- the __init__ docstring delimiters and the
# `def close(self):` line are missing here.
1155 class FileName(FileHandle):
1156 def __init__(self, filename, convergence):
1158 Upload the data from the filename. If convergence is None then a
1159 random encryption key will be used, else the plaintext will be hashed,
1160 then the hash will be hashed together with the string in the
1161 "convergence" argument to form the encryption key.
1163 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1164 FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
# body of close(): run the base-class close, then close the filehandle we
# opened in __init__ (FileHandle leaves that to its caller)
1166 FileHandle.close(self)
1167 self._filehandle.close()
# IUploadable that serves an in-memory byte string by wrapping it in a
# StringIO and reusing the FileHandle machinery.
# NOTE(review): gappy listing -- the __init__ docstring delimiters are
# missing here.
1169 class Data(FileHandle):
1170 def __init__(self, data, convergence):
1172 Upload the data from the data argument. If convergence is None then a
1173 random encryption key will be used, else the plaintext will be hashed,
1174 then the hash will be hashed together with the string in the
1175 "convergence" argument to form the encryption key.
1177 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1178 FileHandle.__init__(self, StringIO(data), convergence=convergence)
# NOTE(review): the rest of the class docstring (and its closing quotes)
# is missing from this listing.
1180 class Uploader(service.MultiService, log.PrefixingLogMixin):
1181 """I am a service that allows file uploading. I am a service-child of the
1184 implements(IUploader)
# files at or below this size are uploaded as literal (LIT) URIs via
# LiteralUploader instead of being CHK-encoded (see upload() below)
1186 URI_LIT_SIZE_THRESHOLD = 55
# bound on the _recent_upload_statuses history list kept by _add_upload()
1187 MAX_UPLOAD_STATUSES = 10
# If helper_furl is provided, startService() will try to connect to the
# upload helper at that FURL; otherwise uploads go directly to storage.
# NOTE(review): the line presumably initializing self._helper (read by
# get_helper_info/upload) is missing from this listing.
1189 def __init__(self, helper_furl=None, stats_provider=None):
1190 self._helper_furl = helper_furl
1191 self.stats_provider = stats_provider
# weak maps let finished uploads be garbage-collected; the bounded
# _recent_upload_statuses list keeps the latest few statuses alive
1193 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1194 self._all_upload_statuses = weakref.WeakKeyDictionary()
1195 self._recent_upload_statuses = []
1196 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1197 service.MultiService.__init__(self)
# On service start, begin connecting to the helper if one was configured.
# NOTE(review): the continuation line of the connectTo(...) call
# (presumably passing self._got_helper as the callback) is missing here.
1199 def startService(self):
1200 service.MultiService.startService(self)
1201 if self._helper_furl:
1202 self.parent.tub.connectTo(self._helper_furl,
# Connected to the helper: ask for its version, substituting a default
# version dict for old helpers that lack get_version().
# NOTE(review): gappy listing -- the middle and closing lines of the
# `default` dict literal are missing here.
1205 def _got_helper(self, helper):
1206 self.log("got helper connection, getting versions")
1207 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1209 "application-version": "unknown: no get_version()",
1211 d = get_versioned_remote_reference(helper, default)
1212 d.addCallback(self._got_versioned_helper)
def _got_versioned_helper(self, helper):
    """Record the helper if it speaks protocol v1; otherwise raise InsufficientVersionError."""
    required = "http://allmydata.org/tahoe/protocols/helper/v1"
    if required in helper.version:
        self._helper = helper
        # drop the reference again if the connection goes away
        helper.notifyOnDisconnect(self._lost_helper)
    else:
        raise InsufficientVersionError(required, helper.version)
# Called by foolscap when the helper connection is lost.
# NOTE(review): the method body (presumably `self._helper = None`) is
# missing from this listing -- only the signature is shown.
1221 def _lost_helper(self):
def get_helper_info(self):
    """Return a (helper_furl_or_None, connected_bool) tuple describing helper state."""
    connected = bool(self._helper)
    return (self._helper_furl, connected)
# Upload an IUploadable. Chooses LiteralUploader for tiny files,
# AssistedUploader when a helper connection exists, and CHKUploader for
# direct uploads; then converts the resulting verify-cap plus the
# encryption key into a full read-cap.
# NOTE(review): gappy listing -- the docstring delimiters, the
# `if self._helper:` / `else:` branch lines around 1254/1258, and the
# `return d3` / `return d2` / `return d` lines are missing here.
1229 def upload(self, uploadable):
1231 Returns a Deferred that will fire with the UploadResults instance.
1236 uploadable = IUploadable(uploadable)
1237 d = uploadable.get_size()
1238 def _got_size(size):
# push the node-wide default encoding parameters into the uploadable
1239 default_params = self.parent.get_encoding_parameters()
1240 precondition(isinstance(default_params, dict), default_params)
1241 precondition("max_segment_size" in default_params, default_params)
1242 uploadable.set_default_encoding_parameters(default_params)
1244 if self.stats_provider:
1245 self.stats_provider.count('uploader.files_uploaded', 1)
1246 self.stats_provider.count('uploader.bytes_uploaded', size)
# small files become LIT URIs: no encoding, no storage servers involved
1248 if size <= self.URI_LIT_SIZE_THRESHOLD:
1249 uploader = LiteralUploader(self.parent)
1250 return uploader.start(uploadable)
1252 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1253 d2 = defer.succeed(None)
# helper path: the helper needs the storage index before starting
1255 uploader = AssistedUploader(self._helper)
1256 d2.addCallback(lambda x: eu.get_storage_index())
1257 d2.addCallback(lambda si: uploader.start(eu, si))
# direct path: talk to the storage servers ourselves
1259 uploader = CHKUploader(self.parent)
1260 d2.addCallback(lambda x: uploader.start(eu))
1262 self._add_upload(uploader)
1263 def turn_verifycap_into_read_cap(uploadresults):
1264 # Generate the uri from the verifycap plus the key.
1265 d3 = uploadable.get_encryption_key()
1266 def put_readcap_into_results(key):
1267 v = uri.from_string(uploadresults.verifycapstr)
1268 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1269 uploadresults.uri = r.to_string()
1270 return uploadresults
1271 d3.addCallback(put_readcap_into_results)
1273 d2.addCallback(turn_verifycap_into_read_cap)
1275 d.addCallback(_got_size)
def _add_upload(self, uploader):
    """Register *uploader* in the debugging maps and the bounded recent-status list."""
    status = uploader.get_upload_status()
    # weak entries: do not keep finished uploads alive
    self._all_uploads[uploader] = None
    self._all_upload_statuses[status] = None
    # keep only the newest MAX_UPLOAD_STATUSES entries, oldest dropped first
    recent = self._recent_upload_statuses
    recent.append(status)
    excess = len(recent) - self.MAX_UPLOAD_STATUSES
    if excess > 0:
        del recent[:excess]
1290 def list_all_upload_statuses(self):
1291 for us in self._all_upload_statuses: