1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap import Referenceable, Copyable, RemoteCopy
7 from foolscap import eventual
9 from allmydata.util.hashutil import file_renewal_secret_hash, \
10 file_cancel_secret_hash, bucket_renewal_secret_hash, \
11 bucket_cancel_secret_hash, plaintext_hasher, \
12 storage_index_hash, plaintext_segment_hasher, convergence_hasher
13 from allmydata import hashtree, uri
14 from allmydata.storage.server import si_b2a
15 from allmydata.immutable import encode
16 from allmydata.util import base32, dictutil, idlib, log, mathutil
17 from allmydata.util.assertutil import precondition
18 from allmydata.util.rrefutil import get_versioned_remote_reference
19 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
20 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
21 NotEnoughSharesError, InsufficientVersionError, NoServersError
22 from allmydata.immutable import layout
23 from pycryptopp.cipher.aes import AES
25 from cStringIO import StringIO
# binary size multipliers, used for default encoding/segment sizes below
KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
TiB = 1024 * GiB
PiB = 1024 * TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass
class UploadResults(Copyable, RemoteCopy):
    """Bundle of facts about one completed upload: timings, share placement
    maps, and the resulting URI data. Instances travel over the wire from
    the helper to its client, so the attribute shapes are compatibility-
    sensitive."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
class PeerTracker:
    """Tracks one storage server during peer selection: remembers the
    shares assigned to it and knows how to send it allocate_buckets
    queries."""

    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        # build a throwaway proxy just to compute the per-share allocated
        # size before we have any remote bucket references
        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        """Ask this server to allocate buckets for the given set of share
        numbers. Returns a Deferred firing with (alreadygot, allocated)."""
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, res):
        # res is (alreadygot, buckets): shares the server already holds,
        # and the write-bucket references it allocated for us
        (alreadygot, buckets) = res
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.items():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
class Tahoe2PeerSelector:
    """Second-generation peer-selection algorithm: offer one share to each
    peer in permuted order, then make additional passes spreading the
    still-homeless shares across the peers that accepted earlier offers."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = list(range(total_shares))
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocat_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        # one step of the selection state machine: either issue another
        # query or decide that we have finished (happily or not)
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            # on the first pass, ask each peer for a single share
            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            # BUGFIX: clear contacted_peers2, not the list we just extended.
            # Emptying contacted_peers here discarded every remaining peer
            # and left contacted_peers2 non-empty, recursing forever.
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg, placed_shares,
                                           self.shares_of_happiness)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                # BUGFIX: return the same (used_peers, already_peers) shape
                # as the fully-placed branch above; callers always unpack a
                # two-tuple, so a bare set would break them.
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # They were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024  # read/encrypt the plaintext in 50kB pieces

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        # cached after the first successful fetch
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        # cached after the first successful fetch
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        # build (and cache) the AES encryptor, deriving the storage index
        # from the encryption key as a side effect
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        # return (hasher, bytes_remaining_in_this_segment), creating a new
        # per-segment hasher if the previous one was closed out
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        # feed 'chunk' into the per-segment hashers, closing out each
        # segment hash as its segment fills up
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # hash and encrypt this chunk, then recurse for the rest
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log(" skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()
class UploadStatus:
    """Mutable record of one upload's progress, for status displays."""
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = next(self.statusid_counter)
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value
class CHKUploader:
    """Upload an immutable CHK file directly from this node, selecting
    peers, encoding, and pushing the shares ourselves."""
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Run peer selection; fires with (used_peers, already_peers)."""
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, used_and_already, encoder):
        """
        @param used_and_already: a (used_peers, already_peers) tuple;
            used_peers is a sequence of PeerTracker objects, and
            already_peers is a dict mapping sharenum to a peerid which
            claims to already have this share
        """
        (used_peers, already_peers) = used_and_already
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    """Read exactly 'size' bytes from 'uploadable' (recursing until the
    reads add up), and fire with prepend_data plus the list of string
    pieces read. Note: prepend_data is never mutated, only concatenated,
    so the mutable default is safe here."""
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        # a zero-length read would recurse forever; the uploadable must
        # make progress and must not over-deliver
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
class LiteralUploader:
    """Upload a file so small that its entire contents fit inside the URI
    itself (a LIT cap): no shares are pushed to any server."""

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri_string):
        # 'uri_string' (not 'uri') avoids shadowing the allmydata.uri module
        self._results.uri = uri_string
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status
class RemoteEncryptedUploadable(Referenceable):
    """Publishes an IEncryptedUploadable over foolscap so a remote helper
    can pull ciphertext (and plaintext hash-tree leaves) from us."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            # hash_only reads return no data, so advance by the requested
            # length; real reads advance by the bytes actually delivered
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        # the leaves come back as a tuple; ship them as a list
        d.addCallback(list)
        return d

    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()
923 class AssistedUploader:
925 def __init__(self, helper):
926 self._helper = helper
927 self._log_number = log.msg("AssistedUploader starting")
928 self._storage_index = None
929 self._upload_status = s = UploadStatus()
933 def log(self, *args, **kwargs):
934 if "parent" not in kwargs:
935 kwargs["parent"] = self._log_number
936 return log.msg(*args, **kwargs)
938 def start(self, encrypted_uploadable, storage_index):
939 """Start uploading the file.
941 Returns a Deferred that will fire with the UploadResults instance.
943 precondition(isinstance(storage_index, str), storage_index)
944 self._started = time.time()
945 eu = IEncryptedUploadable(encrypted_uploadable)
946 eu.set_upload_status(self._upload_status)
947 self._encuploadable = eu
948 self._storage_index = storage_index
950 d.addCallback(self._got_size)
951 d.addCallback(lambda res: eu.get_all_encoding_parameters())
952 d.addCallback(self._got_all_encoding_parameters)
953 d.addCallback(self._contact_helper)
954 d.addCallback(self._build_verifycap)
956 self._upload_status.set_active(False)
961 def _got_size(self, size):
963 self._upload_status.set_size(size)
965 def _got_all_encoding_parameters(self, params):
966 k, happy, n, segment_size = params
967 # stash these for URI generation later
968 self._needed_shares = k
969 self._total_shares = n
970 self._segment_size = segment_size
972 def _contact_helper(self, res):
973 now = self._time_contacting_helper_start = time.time()
974 self._storage_index_elapsed = now - self._started
975 self.log(format="contacting helper for SI %(si)s..",
976 si=si_b2a(self._storage_index))
977 self._upload_status.set_status("Contacting Helper")
978 d = self._helper.callRemote("upload_chk", self._storage_index)
979 d.addCallback(self._contacted_helper)
982 def _contacted_helper(self, (upload_results, upload_helper)):
984 elapsed = now - self._time_contacting_helper_start
985 self._elapsed_time_contacting_helper = elapsed
987 self.log("helper says we need to upload")
988 self._upload_status.set_status("Uploading Ciphertext")
989 # we need to upload the file
990 reu = RemoteEncryptedUploadable(self._encuploadable,
992 # let it pre-compute the size for progress purposes
994 d.addCallback(lambda ignored:
995 upload_helper.callRemote("upload", reu))
996 # this Deferred will fire with the upload results
998 self.log("helper says file is already uploaded")
999 self._upload_status.set_progress(1, 1.0)
1000 self._upload_status.set_results(upload_results)
1001 return upload_results
1003 def _convert_old_upload_results(self, upload_results):
1004 # pre-1.3.0 helpers return upload results which contain a mapping
1005 # from shnum to a single human-readable string, containing things
1006 # like "Found on [x],[y],[z]" (for healthy files that were already in
1007 # the grid), "Found on [x]" (for files that needed upload but which
1008 # discovered pre-existing shares), and "Placed on [x]" (for newly
1009 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1010 # set of binary serverid strings.
1012 # the old results are too hard to deal with (they don't even contain
1013 # as much information as the new results, since the nodeids are
1014 # abbreviated), so if we detect old results, just clobber them.
1016 sharemap = upload_results.sharemap
1017 if str in [type(v) for v in sharemap.values()]:
1018 upload_results.sharemap = None
def _build_verifycap(self, upload_results):
    # Final stage of an assisted upload: sanity-check the helper's
    # results against our own encoding parameters, attach a verify-cap
    # string, and fill in size/timing fields on the results object.
    self.log("upload finished, building readcap")
    self._convert_old_upload_results(upload_results)
    self._upload_status.set_status("Building Readcap")
    # NOTE(review): this excerpt appears to be missing the binding of 'r'
    # (presumably r = upload_results), the close of the
    # CHKFileVerifierURI(...) call (and its .to_string()), the binding of
    # 'now', and the method's return -- confirm against the full file.
    assert r.uri_extension_data["needed_shares"] == self._needed_shares
    assert r.uri_extension_data["total_shares"] == self._total_shares
    assert r.uri_extension_data["segment_size"] == self._segment_size
    assert r.uri_extension_data["size"] == self._size
    r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                            uri_extension_hash=r.uri_extension_hash,
                                            needed_shares=self._needed_shares,
                                            total_shares=self._total_shares, size=self._size
    r.file_size = self._size
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
    # if the helper reported its own total, preserve it under a new key
    # before overwriting "total" with the client-side elapsed time
    if "total" in r.timings:
        r.timings["helper_total"] = r.timings["total"]
    r.timings["total"] = now - self._started
    self._upload_status.set_status("Done")
    self._upload_status.set_results(r)
def get_upload_status(self):
    """Return the UploadStatus object tracking this upload's progress."""
    status = self._upload_status
    return status
class BaseUploadable:
    """Common plumbing for IUploadable implementations.

    Holds the encoding parameters (k, happy, n, max segment size), lets a
    client install defaults, and computes the final (k, happy, n,
    segsize) tuple from the file's size. Subclasses must provide
    get_size() returning a Deferred that fires with the size in bytes.
    """
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-upload overrides; when None, the defaults above are used
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None # cached (k, happy, n, segsize) tuple
    # fix: give _status a class-level default so methods that read
    # self._status before set_upload_status() is called do not blow up
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install client-supplied default parameters. Explicit per-upload
        settings (encoding_param_*) still take precedence when
        get_all_encoding_parameters() runs."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size),
        computing and caching the tuple on first use."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        # fix: 'd' was used below but never bound; the segment size
        # depends on the file size, so ask the uploadable for it
        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        # fix: return the Deferred so callers can wait for the parameters
        return d
1100 class FileHandle(BaseUploadable):
1101 implements(IUploadable)
def __init__(self, filehandle, convergence):
    """
    Upload the data from the filehandle. If convergence is None then a
    random encryption key will be used, else the plaintext will be hashed,
    then the hash will be hashed together with the string in the
    "convergence" argument to form the encryption key.
    """
    assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
    self._filehandle = filehandle
    # fix: initialize the caches that _get_encryption_key_*() and
    # get_size() test with "is not None" before computing a value
    self._key = None
    self.convergence = convergence
    self._size = None
def _get_encryption_key_convergent(self):
    # Derive the AES key by hashing the plaintext together with the
    # convergence secret, caching the result in self._key.
    if self._key is not None:
        return defer.succeed(self._key)

    # NOTE(review): this excerpt appears to be missing several lines --
    # the get_size() call that binds 'd', the '_got_params' callback
    # header, the BLOCKSIZE constant and the read loop around f.read,
    # the guard on self._status, and the final callback wiring/return --
    # confirm against the full file.
    # that sets self._size as a side-effect
    d.addCallback(lambda size: self.get_all_encoding_parameters())
    k, happy, n, segsize = params
    f = self._filehandle
    enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
    data = f.read(BLOCKSIZE)
    enckey_hasher.update(data)
    # TODO: setting progress in a non-yielding loop is kind of
    # pointless, but I'm anticipating (perhaps prematurely) the
    # day when we use a slowjob or twisted's CooperatorService to
    # make this yield time to other jobs.
    bytes_read += len(data)
    self._status.set_progress(0, float(bytes_read)/self._size)
    self._key = enckey_hasher.digest()
    self._status.set_progress(0, 1.0)
    assert len(self._key) == 16
def _get_encryption_key_random(self):
    """Return (via Deferred) a random 16-byte AES key, generated once
    and cached on self._key for subsequent calls."""
    key = self._key
    if key is None:
        # no convergence secret was supplied, so any fresh random key
        # will do; remember it so repeated calls agree
        key = os.urandom(16)
        self._key = key
    return defer.succeed(key)
def get_encryption_key(self):
    """Return (via Deferred) the AES key for this upload: convergent if a
    convergence secret was supplied at construction time, random
    otherwise."""
    if self.convergence is None:
        return self._get_encryption_key_random()
    return self._get_encryption_key_convergent()
def get_size(self):
    """Return a Deferred firing with the file's size in bytes.

    The size is cached on self._size (other code relies on that
    side-effect -- see the comment in _get_encryption_key_convergent) and
    the filehandle is left rewound to the start, ready for read().
    """
    if self._size is not None:
        return defer.succeed(self._size)
    # measure the size by seeking to the end of the file
    self._filehandle.seek(0, 2)
    size = self._filehandle.tell()
    # fix: cache the size; _get_encryption_key_convergent divides by
    # self._size when reporting progress
    self._size = size
    # rewind so subsequent read() calls start at the beginning
    self._filehandle.seek(0)
    return defer.succeed(size)
def read(self, length):
    """Return a Deferred firing with a one-element list that holds up to
    'length' bytes of plaintext read from the underlying filehandle."""
    chunk = self._filehandle.read(length)
    return defer.succeed([chunk])
1175 # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    """An IUploadable that reads from a named file.

    Unlike plain FileHandle (whose caller retains ownership of the
    handle), we open the file ourselves, so close() must close it.
    """
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    # fix: the two statements below lacked an enclosing method definition
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    # An IUploadable that wraps an in-memory string.
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1203 class Uploader(service.MultiService, log.PrefixingLogMixin):
1204 """I am a service that allows file uploading. I am a service-child of the
1207 implements(IUploader)
1209 URI_LIT_SIZE_THRESHOLD = 55
def __init__(self, helper_furl=None, stats_provider=None):
    """Create the Uploader service.

    helper_furl, if given, names a remote upload helper to connect to
    when the service starts; stats_provider, if given, receives the
    uploader.* counters.
    """
    self._helper_furl = helper_furl
    self.stats_provider = stats_provider
    # fix: get_helper_info() and upload() read self._helper before any
    # helper connection is established, so it needs a default
    self._helper = None
    self._all_uploads = weakref.WeakKeyDictionary() # for debugging
    log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
    service.MultiService.__init__(self)
def startService(self):
    # Begin service operation; if a helper FURL was configured, start
    # (re)connecting to it through the node's Tub.
    service.MultiService.startService(self)
    if self._helper_furl:
        # NOTE(review): this connectTo() call looks truncated in the
        # excerpt (its connection callback, presumably self._got_helper,
        # is missing) -- confirm against the full file.
        self.parent.tub.connectTo(self._helper_furl,
def _got_helper(self, helper):
    """Connection-established callback: ask the helper for its version
    info, then proceed to _got_versioned_helper."""
    self.log("got helper connection, getting versions")
    # version info to assume when the remote has no get_version()
    default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                { }, # fix: the dict literal was left unclosed
                "application-version": "unknown: no get_version()",
                }
    d = get_versioned_remote_reference(helper, default)
    d.addCallback(self._got_versioned_helper)
1234 def _got_versioned_helper(self, helper):
1235 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1236 if needed not in helper.version:
1237 raise InsufficientVersionError(needed, helper.version)
1238 self._helper = helper
1239 helper.notifyOnDisconnect(self._lost_helper)
1241 def _lost_helper(self):
def get_helper_info(self):
    """Return a (helper_furl_or_None, connected_bool) tuple describing
    the configured helper and whether we currently hold a connection."""
    connected = bool(self._helper)
    return (self._helper_furl, connected)
1249 def upload(self, uploadable, history=None):
1251 Returns a Deferred that will fire with the UploadResults instance.
1256 uploadable = IUploadable(uploadable)
1257 d = uploadable.get_size()
1258 def _got_size(size):
1259 default_params = self.parent.get_encoding_parameters()
1260 precondition(isinstance(default_params, dict), default_params)
1261 precondition("max_segment_size" in default_params, default_params)
1262 uploadable.set_default_encoding_parameters(default_params)
1264 if self.stats_provider:
1265 self.stats_provider.count('uploader.files_uploaded', 1)
1266 self.stats_provider.count('uploader.bytes_uploaded', size)
1268 if size <= self.URI_LIT_SIZE_THRESHOLD:
1269 uploader = LiteralUploader(self.parent)
1270 return uploader.start(uploadable)
1272 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1273 d2 = defer.succeed(None)
1275 uploader = AssistedUploader(self._helper)
1276 d2.addCallback(lambda x: eu.get_storage_index())
1277 d2.addCallback(lambda si: uploader.start(eu, si))
1279 uploader = CHKUploader(self.parent)
1280 d2.addCallback(lambda x: uploader.start(eu))
1282 self._all_uploads[uploader] = None
1284 history.add_upload(uploader.get_upload_status())
1285 def turn_verifycap_into_read_cap(uploadresults):
1286 # Generate the uri from the verifycap plus the key.
1287 d3 = uploadable.get_encryption_key()
1288 def put_readcap_into_results(key):
1289 v = uri.from_string(uploadresults.verifycapstr)
1290 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1291 uploadresults.uri = r.to_string()
1292 return uploadresults
1293 d3.addCallback(put_readcap_into_results)
1295 d2.addCallback(turn_verifycap_into_read_cap)
1297 d.addCallback(_got_size)