1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap import Referenceable, Copyable, RemoteCopy
7 from foolscap import eventual
9 from allmydata.util.hashutil import file_renewal_secret_hash, \
10 file_cancel_secret_hash, bucket_renewal_secret_hash, \
11 bucket_cancel_secret_hash, plaintext_hasher, \
12 storage_index_hash, plaintext_segment_hasher, convergence_hasher
13 from allmydata import hashtree, uri
14 from allmydata.storage.server import si_b2a
15 from allmydata.immutable import encode
16 from allmydata.util import base32, dictutil, idlib, log, mathutil
17 from allmydata.util.assertutil import precondition
18 from allmydata.util.rrefutil import get_versioned_remote_reference
19 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
20 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
21 NotEnoughSharesError, InsufficientVersionError
22 from allmydata.immutable import layout
23 from pycryptopp.cipher.aes import AES
25 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    """Internal control-flow exception: we use this to jump out of the
    peer-query loop once every peer has been handled.

    Fix: the class body previously contained only a comment, which is not a
    statement; a docstring (or ``pass``) is required for valid syntax.
    """
# this wants to live in storage, not here
class TooFullError(Exception):
    """Presumably raised when a storage server is too full to accept a
    request (per the comment above, this likely belongs in the storage
    module -- TODO confirm).

    Fix: the class body previously had no statement; a docstring (or
    ``pass``) is required for valid syntax.
    """
class UploadResults(Copyable, RemoteCopy):
    """Bundle of facts about an upload; instances are serialized over the
    wire between the helper and its client, so the attribute shape is part
    of the protocol."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    # NOTE(review): the listing appears to elide a line here (likely
    # ``copytype = typeToCopy``) -- verify against the full source.
    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.
    # NOTE(review): the ``def __init__(self):`` line appears to be elided
    # from this listing; the assignments below are its body.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
66 # our current uri_extension is 846 bytes for small files, a few bytes
67 # more for larger ones (since the filesize is encoded in decimal in a
68 # few places). Ask for a little bit more just in case we need it. If
69 # the extension changes size, we can change EXTENSION_SIZE to
70 # allocate a more accurate amount of space.
72 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
    # NOTE(review): the ``class PeerTracker:`` header is not visible in this
    # listing; the methods below form its body. Indentation reconstructed.
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 bucket_renewal_secret, bucket_cancel_secret):
        # Tracks a single storage server during peer selection: remembers
        # the write-bucket proxies allocated there plus the lease secrets.
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        # build one throwaway proxy just to learn the allocated size and the
        # concrete proxy class to instantiate later.
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        # NOTE(review): ``storage_index`` is not among the visible parameters
        # and ``self.peerid`` (read by __repr__ below) is never assigned in
        # view -- the listing appears to elide lines here; verify upstream.
        self.storage_index = storage_index
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    # NOTE(review): ``def __repr__(self):`` appears to be elided here.
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        # Ask the server to allocate buckets for ``sharenums``.
        # NOTE(review): several callRemote arguments (storage index, lease
        # secrets, sharenums, allocated size) and a trailing ``return d``
        # appear to be elided from this listing.
        d = self._storageserver.callRemote("allocate_buckets",
                                           canary=Referenceable())
        d.addCallback(self._got_reply)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        # Wrap each newly-allocated remote bucket in a write-bucket proxy.
        # NOTE(review): a ``b = {}`` initialization and several proxy
        # constructor arguments appear to be elided from this listing.
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.num_share_hashes,
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
class Tahoe2PeerSelector:
    """Peer-selection algorithm: on the first pass, ask each fresh peer to
    hold a single share; then spread any still-homeless shares across the
    peers that said yes (see _loop below)."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # NOTE(review): an ``self.error_count = 0`` initialization appears to
        # be elided here (it is incremented in _got_response below).
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)
    # NOTE(review): ``def __repr__(self):`` appears to be elided above.
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """
        self._status.set_status("Contacting Peers..")
        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness
        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share
        peers = client.get_permuted_peers("storage", storage_index)
        # NOTE(review): an ``if not peers:`` guard appears to be elided here.
        raise NotEnoughSharesError("client gave us zero peers")
        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
        allocated_size = wbp.get_allocated_size()
        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        # NOTE(review): an ``if not peers:`` guard appears to be elided here.
        raise NotEnoughSharesError("no peers could accept an allocated_size of %d" % allocated_size)
        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()
        # NOTE(review): the second argument (the storage index) of the two
        # hash calls below appears to be elided from this listing.
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                 bucket_cancel_secret_hash(file_cancel_secret,
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers
        # kick off the query loop; NOTE(review): ``return d`` appears elided.
        d = defer.maybeDeferred(self._loop)
    # NOTE(review): ``def _loop(self):`` appears to be elided above; the
    # statements below are the body of the recursive selection loop.
        if not self.homeless_shares:
            # every share found a home: report success.
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   self.query_count, self.num_peers_contacted,
                   self.good_query_count, self.bad_query_count,
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)
        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)
            # first pass: ask each fresh peer to hold a single share.
            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            self._status.set_status("Contacting Peers [%s] (first query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                self._started_second_pass = True
            # spread the remaining shares evenly over the willing peers.
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            self._status.set_status("Contacting Peers [%s] (second query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
278 elif self.contacted_peers2:
279 # we've finished the second-or-later pass. Move all the remaining
280 # peers back into self.contacted_peers for the next pass.
281 self.contacted_peers.extend(self.contacted_peers2)
282 self.contacted_peers[:] = []
    # NOTE(review): this is the final (``else:`` -- header elided) branch of
    # _loop, reached when no peers remain to ask.
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg)
            # we placed enough to be happy, so we're done
            self._status.set_status("Placed all shares")
            # NOTE(review): the success exit at the top of _loop returns a
            # (use_peers, preexisting_shares) 2-tuple, while this exit
            # returns only use_peers -- verify callers handle both shapes.
            return self.use_peers
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        # Runs on both success and failure (attached via addBoth). On
        # failure, put the shares back on the homeless list and keep
        # looping; on success, record which shares landed where.
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
            # No more peers, so this upload might fail (it depends upon
            # whether we've hit shares_of_happiness or not). Log the last
            # failure we got: if a coding error causes all peers to fail
            # in the same way, this allows the common failure to be seen
            # by the uploader and should help with debugging
            msg = ("last failure (from %s) was: %s" % (peer, res))
            self.last_failure_msg = msg
        # NOTE(review): the ``else:`` introducing the success path appears
        # to be elided from this listing.
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            # NOTE(review): a ``for s in alreadygot:`` loop header appears
            # to be elided here.
            self.preexisting_shares[s] = peer.peerid
            if s in self.homeless_shares:
                self.homeless_shares.remove(s)
            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            # NOTE(review): an ``if allocated:`` guard appears to be elided.
            self.use_peers.add(peer)
            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)
            # NOTE(review): the conditionals separating the good/bad query
            # counters appear to be elided from this listing.
            # they accepted or already had at least one share, so
            # progress has been made
            self.good_query_count += 1
            self.bad_query_count += 1
            # In networks with lots of space, this is very unusual and
            # probably indicates an error. In networks with peers that
            # are full, it is merely unusual. In networks that are very
            # full, it is common, and many uploads will fail. In most
            # cases, this is obviously not fatal, and we'll just use some
            # some shares are still homeless, keep trying to find them a
            # home. The ones that were rejected get first priority.
            self.homeless_shares = (list(still_homeless)
                                    + self.homeless_shares)
            # Since they were unable to accept all of our requests, it
            # is safe to assume that asking them again won't help.
            # if they *were* able to accept everything, they might be
            # willing to accept even more.
            put_peer_here.append(peer)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): a CHUNKSIZE class attribute (used by _read_encrypted
    # below) appears to be elided from this listing.
387 def __init__(self, original, log_parent=None):
388 self.original = IUploadable(original)
389 self._log_number = log_parent
390 self._encryptor = None
391 self._plaintext_hasher = plaintext_hasher()
392 self._plaintext_segment_hasher = None
393 self._plaintext_segment_hashes = []
394 self._encoding_parameters = None
395 self._file_size = None
396 self._ciphertext_bytes_read = 0
399 def set_upload_status(self, upload_status):
400 self._status = IUploadStatus(upload_status)
401 self.original.set_upload_status(upload_status)
403 def log(self, *args, **kwargs):
404 if "facility" not in kwargs:
405 kwargs["facility"] = "upload.encryption"
406 if "parent" not in kwargs:
407 kwargs["parent"] = self._log_number
408 return log.msg(*args, **kwargs)
    # NOTE(review): ``def get_size(self):``, the ``_got_size`` callback
    # header, an ``if self._status:`` guard, and ``return d`` appear to be
    # elided from this listing.
        if self._file_size is not None:
            # cached from a previous call
            return defer.succeed(self._file_size)
        d = self.original.get_size()
            self._file_size = size
                self._status.set_size(size)
        d.addCallback(_got_size)

    def get_all_encoding_parameters(self):
        # Cache the (k, happy, n, segment_size) tuple after the first fetch.
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            return encoding_parameters
        # NOTE(review): ``d.addCallback(_got)``/``return d`` appear elided.

    def _get_encryptor(self):
        # Create the AES encryptor (and derive the storage index from the
        # key) on first use; cached thereafter.
        # NOTE(review): the cached-case guard and the key callback header
        # appear to be elided from this listing.
            return defer.succeed(self._encryptor)
        d = self.original.get_encryption_key()
            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16 # SHA-256 truncated to 128b
            self._storage_index = storage_index
                self._status.set_storage_index(storage_index)

    def get_storage_index(self):
        # The storage index is derived from the encryption key, so make sure
        # the encryptor has been created first.
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        # NOTE(review): ``return d`` appears to be elided here.
    def _get_segment_hasher(self):
        # Return (hasher, bytes_remaining_in_segment), creating a fresh
        # per-segment plaintext hasher when the previous one was closed out.
        p = self._plaintext_segment_hasher
        # NOTE(review): an ``if p:`` branch returning the live hasher and
        # its remaining byte count appears to be elided here.
        left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        # Feed ``chunk`` into the per-segment plaintext hashers, closing out
        # each hasher as its segment boundary is crossed.
        # NOTE(review): an ``offset = 0`` initialization appears elided.
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment
            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
            offset += this_segment
    def read_encrypted(self, length, hash_only):
        # Fetch up to ``length`` bytes of ciphertext; when hash_only is
        # true the plaintext is hashed but the ciphertext is discarded.
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # NOTE(review): the ``ciphertext = []`` initialization and trailing
        # ``return d`` appear to be elided from this listing.
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # Chunked recursive reader: processes CHUNKSIZE at a time and
        # re-arms itself until ``remaining`` is exhausted.
        # NOTE(review): the ``if not remaining:`` guard around the callback
        # below (and its ``return``) appears to be elided.
            fire_when_done.callback(ciphertext)
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
        # NOTE(review): the ``_bad`` errback definition and the callback
        # attachments appear to be elided from this listing.
            fire_when_done.errback(why)
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        # Hash (and, unless hash_only, encrypt) a list of plaintext chunks;
        # appends the resulting ciphertext chunks to ``cryptdata``.
        assert isinstance(data, (tuple, list)), type(data)
        # NOTE(review): initializations (``cryptdata = []``, a byte counter)
        # the while-loop header, and the hash_only conditional appear to be
        # elided from this listing.
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
                self.log(" skipping encryption", level=log.NOISY)
                cryptdata.append(ciphertext)
        self._ciphertext_bytes_read += bytes_processed
        # update ciphertext-progress (slot 1) as a fraction of the file size
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # Return (as a Deferred) the plaintext segment hashes [first:last],
        # closing out the final partially-filled hasher if necessary.
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
595 def get_plaintext_hash(self):
596 h = self._plaintext_hasher.digest()
597 return defer.succeed(h)
    # NOTE(review): ``def close(self):`` appears to be elided above.
        return self.original.close()
# NOTE(review): the ``class UploadStatus:`` header appears to be elided
# above; this listing also elides many of the one-line accessor bodies
# below -- verify against the full source.
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    # NOTE(review): ``def __init__(self):`` appears to be elided here.
        self.storage_index = None
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
    def get_storage_index(self):
        return self.storage_index
    def using_helper(self):
    def get_status(self):
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
    def get_results(self):
    def get_counter(self):

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
    def set_helper(self, helper):
    def set_status(self, status):
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
    def set_results(self, value):
# NOTE(review): the ``class CHKUploader:`` header appears to be elided
# above this line.
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        # NOTE(review): an ``self._encoder = None`` initialization appears
        # to be elided here (``abort`` below tests ``self._encoder``).
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)
        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
669 def log(self, *args, **kwargs):
670 if "parent" not in kwargs:
671 kwargs["parent"] = self._log_number
672 if "facility" not in kwargs:
673 kwargs["facility"] = "tahoe.upload"
674 return self._client.log(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
        # NOTE(review): the rest of ``_done`` and the method's return appear
        # to be elided from this listing.

    # NOTE(review): ``def abort(self):`` appears to be elided above.
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()
    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)
        started = time.time()
        # NOTE(review): further Encoder constructor arguments and the
        # trailing ``return d`` appear to be elided from this listing.
        self._encoder = e = encode.Encoder(self._log_number,
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)

    def locate_all_shareholders(self, encoder, started):
        # Run peer selection for this storage index; the Deferred fires
        # with the (used_peers, already_peers) tuple from the selector.
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")
        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        # NOTE(review): the elapsed-time callback wrapper and the method's
        # return appear to be elided from this listing.
            self._peer_selection_elapsed = time.time() - peer_selection_started
    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)
        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        # NOTE(review): a ``buckets = {}`` initialization appears to be
        # elided here.
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        # every share number should come from exactly one peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)
    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # NOTE(review): bindings such as ``r = self._results`` and
        # ``now = time.time()`` plus the final return appear to be elided
        # from this listing.
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()

    def get_upload_status(self):
        return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # Accumulate exactly ``size`` bytes from ``uploadable`` (recursing until
    # enough has arrived); fires with a list of strings. Note the mutable
    # default argument: safe only as long as callers never mutate
    # ``prepend_data`` in place.
    # NOTE(review): the zero-size guard and the ``_got`` callback
    # scaffolding appear to be elided from this listing.
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        remaining = size - bytes
            return read_this_many_bytes(uploadable, remaining,
        return prepend_data + data
class LiteralUploader:
    """Upload a small file by embedding its entire contents in a
    LiteralFileURI (no shares are sent anywhere)."""

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        # NOTE(review): the ``_got_size`` callback header and the method's
        # return appear to be elided from this listing.
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)

    def _build_results(self, uri):
        # note: the parameter shadows the module-level ``uri`` import here.
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)

    def get_upload_status(self):
    # NOTE(review): the body (returning the status object) appears elided.
class RemoteEncryptedUploadable(Referenceable):
    """Exposes an IEncryptedUploadable over foolscap so a remote helper can
    drive it (RIEncryptedUploadable)."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # NOTE(review): initializations such as ``self._offset = 0`` and
        # ``self._bytes_sent = 0`` appear to be elided (both used below).
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

    # NOTE(review): ``def get_size(self):`` appears to be elided above, and
    # the ``_got_size`` callback body plus ``return d`` below.
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        d.addCallback(_got_size)

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # Read ciphertext while advancing our logical offset.
        d = self._eu.read_encrypted(length, hash_only)
        # NOTE(review): the callback wrapper is partially elided here; the
        # visible lines suggest the offset advances by ``length`` in the
        # hash_only case and by the actual byte count otherwise.
            self._offset += length
            size = sum([len(data) for data in strings])

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): the ``else:`` branch header appears to be elided.
            d = defer.succeed(None)
        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): a ``_read`` accounting callback is partially elided
        # below, as is the method's return.
            size = sum([len(data) for data in strings])
            self._bytes_sent += size

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        # NOTE(review): a serializing callback and ``return d`` appear
        # to be elided from this listing.

    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    """Upload a file with the aid of a helper service: contact the helper
    for this storage index and, if needed, feed it our ciphertext."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): further status initialization (helper/active flags)
        # appears to be elided from this listing.
932 def log(self, *args, **kwargs):
933 if "parent" not in kwargs:
934 kwargs["parent"] = self._log_number
935 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # NOTE(review): the creation of ``d`` (likely ``eu.get_size()``) and
        # a final ``_done`` callback wrapper appear to be elided here.
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
            self._upload_status.set_active(False)

    def _got_size(self, size):
        # NOTE(review): an ``self._size = size`` assignment appears to be
        # elided here (``self._size`` is read in _build_verifycap below).
        self._upload_status.set_size(size)
964 def _got_all_encoding_parameters(self, params):
965 k, happy, n, segment_size = params
966 # stash these for URI generation later
967 self._needed_shares = k
968 self._total_shares = n
969 self._segment_size = segment_size
    def _contact_helper(self, res):
        # Ask the helper about this storage index; if the helper hands back
        # an upload reference, push our ciphertext to it.
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        # NOTE(review): ``return d`` appears to be elided here.

    def _contacted_helper(self, (upload_results, upload_helper)):
        # NOTE(review): ``now = time.time()`` and the ``if upload_helper:``
        # guard separating the two cases below appear to be elided.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        self.log("helper says we need to upload")
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        # -- already-uploaded case: --
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results
1002 def _convert_old_upload_results(self, upload_results):
1003 # pre-1.3.0 helpers return upload results which contain a mapping
1004 # from shnum to a single human-readable string, containing things
1005 # like "Found on [x],[y],[z]" (for healthy files that were already in
1006 # the grid), "Found on [x]" (for files that needed upload but which
1007 # discovered pre-existing shares), and "Placed on [x]" (for newly
1008 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1009 # set of binary serverid strings.
1011 # the old results are too hard to deal with (they don't even contain
1012 # as much information as the new results, since the nodeids are
1013 # abbreviated), so if we detect old results, just clobber them.
1015 sharemap = upload_results.sharemap
1016 if str in [type(v) for v in sharemap.values()]:
1017 upload_results.sharemap = None
def _build_verifycap(self, upload_results):
    # Fired when the helper's upload completes: check the returned UEB
    # parameters against what we asked for, attach a verify-cap string and
    # timing data to the results object.
    # NOTE(review): some lines appear to be missing here -- 'r' and 'now'
    # are never assigned (presumably 'r = upload_results' and
    # 'now = time.time()'), and the CHKFileVerifierURI(...) call below is
    # unterminated (presumably ').to_string()'). Confirm against upstream
    # before relying on this text.
    self.log("upload finished, building readcap")
    self._convert_old_upload_results(upload_results)
    self._upload_status.set_status("Building Readcap")
    # sanity-check that the helper encoded with the parameters we expected
    assert r.uri_extension_data["needed_shares"] == self._needed_shares
    assert r.uri_extension_data["total_shares"] == self._total_shares
    assert r.uri_extension_data["segment_size"] == self._segment_size
    assert r.uri_extension_data["size"] == self._size
    r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                            uri_extension_hash=r.uri_extension_hash,
                                            needed_shares=self._needed_shares,
                                            total_shares=self._total_shares, size=self._size
    r.file_size = self._size
    # timing breakdown, reported on the upload-status page
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
    if "total" in r.timings:
        # the helper reported its own "total"; preserve it under a new key
        r.timings["helper_total"] = r.timings["total"]
    r.timings["total"] = now - self._started
    self._upload_status.set_status("Done")
    self._upload_status.set_results(r)
def get_upload_status(self):
    """Return the IUploadStatus object that tracks this upload's progress."""
    status = self._upload_status
    return status
class BaseUploadable:
    """Shared behavior for IUploadable implementations: encoding-parameter
    defaults and overrides, plus upload-status tracking."""
    # class-level defaults; per-instance values below, when set, win
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides (None means "use the default above")
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None  # cached (k, happy, n, segsize) tuple

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install default parameters: a dict with optional int-valued
        "k", "happy", "n", and "max_segment_size" entries. Explicit
        per-instance settings still take precedence over these."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred that fires with (k, happy, n, segment_size).
        The result is computed once and cached."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        # BUGFIX: 'd' was referenced without ever being assigned, and the
        # Deferred was never returned (the method yielded None on the
        # uncached path while the cached path returned a Deferred). The
        # segment size depends on the file size, so fetch it first.
        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
class FileHandle(BaseUploadable):
    """IUploadable wrapper around an open, seekable filehandle.

    NOTE(review): several lines of this class appear to have been lost in
    extraction (e.g. the initializations of self._key/self._size, a
    'def get_size(self):' header, and the read-loop scaffolding in
    _get_encryption_key_convergent). The surviving lines are annotated
    below; confirm against upstream before relying on this text.
    """
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self.convergence = convergence

    def _get_encryption_key_convergent(self):
        # the key is computed at most once and cached
        if self._key is not None:
            return defer.succeed(self._key)

        # NOTE(review): 'd' and 'params' are used below but never assigned
        # in the surviving lines -- presumably 'd = self.get_size()' and a
        # callback delivering the encoding parameters preceded this.
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        k, happy, n, segsize = params
        f = self._filehandle
        # the convergent key is a hash of the plaintext, keyed by the
        # encoding parameters and the caller-supplied convergence secret
        enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
        # NOTE(review): 'BLOCKSIZE' and 'bytes_read' are undefined in the
        # surviving lines; this was presumably a read-and-hash loop.
        data = f.read(BLOCKSIZE)
        enckey_hasher.update(data)
        # TODO: setting progress in a non-yielding loop is kind of
        # pointless, but I'm anticipating (perhaps prematurely) the
        # day when we use a slowjob or twisted's CooperatorService to
        # make this yield time to other jobs.
        bytes_read += len(data)
        self._status.set_progress(0, float(bytes_read)/self._size)
        self._key = enckey_hasher.digest()
        self._status.set_progress(0, 1.0)
        # AES-128 key: must be exactly 16 bytes
        assert len(self._key) == 16

    def _get_encryption_key_random(self):
        # lazily generate (and cache) a random 16-byte AES key
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        # convergent (deterministic) key when a convergence secret was
        # supplied, otherwise a random key
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

        # NOTE(review): the following lines look like the body of a missing
        # 'def get_size(self):' -- measure the file by seeking to EOF,
        # cache the answer, then rewind for the subsequent read()s.
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        # IUploadable.read: a Deferred firing with a list of data strings
        return defer.succeed([self._filehandle.read(length)])

    # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        # BUGFIX: the docstring delimiters had been lost, leaving bare text
        # that does not parse.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        # BUGFIX: the 'def close(self):' header had been lost, which left
        # these two statements inside __init__ (closing the file as soon as
        # it was opened). Unlike a caller-supplied filehandle, we opened
        # this file ourselves, so we are responsible for closing it.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        # BUGFIX: the docstring delimiters had been lost, leaving bare text
        # that does not parse. Wrap the in-memory data in a StringIO so the
        # FileHandle machinery (seek/tell/read) works unchanged.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1202 class Uploader(service.MultiService, log.PrefixingLogMixin):
1203 """I am a service that allows file uploading. I am a service-child of the
1206 implements(IUploader)
1208 URI_LIT_SIZE_THRESHOLD = 55
def __init__(self, helper_furl=None, stats_provider=None):
    """Create the Uploader service.

    helper_furl, if given, is the FURL of an upload helper to connect to
    at startService time; stats_provider, if given, receives per-upload
    counters.
    """
    self._helper_furl = helper_furl
    self.stats_provider = stats_provider
    # BUGFIX: _helper must start out None -- get_helper_info() (and the
    # helper-vs-local decision in upload()) read it before any helper
    # connection has been established; it is filled in by
    # _got_versioned_helper and cleared again by _lost_helper.
    self._helper = None
    self._all_uploads = weakref.WeakKeyDictionary() # for debugging
    log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
    service.MultiService.__init__(self)
def startService(self):
    # start the MultiService machinery, then (if we were configured with a
    # helper FURL) begin a persistent foolscap connection attempt to it
    service.MultiService.startService(self)
    if self._helper_furl:
        # NOTE(review): this connectTo() call is unterminated -- its
        # callback argument (presumably self._got_helper) and closing
        # paren appear to have been lost. Confirm against upstream.
        self.parent.tub.connectTo(self._helper_furl,
def _got_helper(self, helper):
    # connectTo callback: we hold a live RemoteReference to the helper.
    # Ask for its version, providing a v1 default for old helpers that
    # predate get_version().
    self.log("got helper connection, getting versions")
    # NOTE(review): this dict literal is unterminated -- the value for the
    # v1-protocol key and the closing braces appear to have been lost.
    # Confirm against upstream.
    default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                "application-version": "unknown: no get_version()",
    d = get_versioned_remote_reference(helper, default)
    d.addCallback(self._got_versioned_helper)
1233 def _got_versioned_helper(self, helper):
1234 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1235 if needed not in helper.version:
1236 raise InsufficientVersionError(needed, helper.version)
1237 self._helper = helper
1238 helper.notifyOnDisconnect(self._lost_helper)
1240 def _lost_helper(self):
def get_helper_info(self):
    """Return a (helper_furl_or_None, connected_bool) tuple describing
    the configured helper and whether we currently hold a connection."""
    furl = self._helper_furl
    connected = bool(self._helper)
    return (furl, connected)
1248 def upload(self, uploadable, history=None):
1250 Returns a Deferred that will fire with the UploadResults instance.
1255 uploadable = IUploadable(uploadable)
1256 d = uploadable.get_size()
1257 def _got_size(size):
1258 default_params = self.parent.get_encoding_parameters()
1259 precondition(isinstance(default_params, dict), default_params)
1260 precondition("max_segment_size" in default_params, default_params)
1261 uploadable.set_default_encoding_parameters(default_params)
1263 if self.stats_provider:
1264 self.stats_provider.count('uploader.files_uploaded', 1)
1265 self.stats_provider.count('uploader.bytes_uploaded', size)
1267 if size <= self.URI_LIT_SIZE_THRESHOLD:
1268 uploader = LiteralUploader(self.parent)
1269 return uploader.start(uploadable)
1271 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1272 d2 = defer.succeed(None)
1274 uploader = AssistedUploader(self._helper)
1275 d2.addCallback(lambda x: eu.get_storage_index())
1276 d2.addCallback(lambda si: uploader.start(eu, si))
1278 uploader = CHKUploader(self.parent)
1279 d2.addCallback(lambda x: uploader.start(eu))
1281 self._all_uploads[uploader] = None
1283 history.add_upload(uploader.get_upload_status())
1284 def turn_verifycap_into_read_cap(uploadresults):
1285 # Generate the uri from the verifycap plus the key.
1286 d3 = uploadable.get_encryption_key()
1287 def put_readcap_into_results(key):
1288 v = uri.from_string(uploadresults.verifycapstr)
1289 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1290 uploadresults.uri = r.to_string()
1291 return uploadresults
1292 d3.addCallback(put_readcap_into_results)
1294 d2.addCallback(turn_verifycap_into_read_cap)
1296 d.addCallback(_got_size)