1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import add_version_to_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20 NotEnoughSharesError, NoSharesError, NoServersError, \
21 InsufficientVersionError
22 from allmydata.immutable import layout
23 from pycryptopp.cipher.aes import AES
25 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    """Control-flow exception: raised to break out of a peer-querying loop."""
    # we use this to jump out of the loop
    # The class body was empty apart from comments, which is a SyntaxError;
    # 'pass' makes the definition valid.
    pass
# this wants to live in storage, not here
class TooFullError(Exception):
    # NOTE(review): presumably raised when a storage server is too full to
    # accept data -- no raise site is visible in this chunk.
    # The class body was empty apart from comments, which is a SyntaxError;
    # 'pass' makes the definition valid.
    pass
class UploadResults(Copyable, RemoteCopy):
    """Record of what happened during an upload; instances travel over the
    wire from the helper to its client, so the attribute shape is a
    compatibility surface (see comments below)."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    # NOTE(review): the 'def __init__(self):' header for the assignments
    # below is not visible in this chunk of the file.
    self.timings = {} # dict of name to number of seconds
    self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
    self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
    self.ciphertext_fetched = None # how much the helper fetched
    self.preexisting_shares = None # count of shares already present
    self.pushed_shares = None # count of shares we pushed
66 # our current uri_extension is 846 bytes for small files, a few bytes
67 # more for larger ones (since the filesize is encoded in decimal in a
68 # few places). Ask for a little bit more just in case we need it. If
69 # the extension changes size, we can change EXTENSION_SIZE to
70 # allocate a more accurate amount of space.
72 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
# NOTE(review): the 'class PeerTracker' header line is not visible in this
# chunk; the following methods belong to it, and several interior lines
# are missing (flagged inline below).

def __init__(self, peerid, storage_server,
             sharesize, blocksize, num_segments, num_share_hashes,
             bucket_renewal_secret, bucket_cancel_secret):
    # NOTE(review): a 'storage_index' parameter line appears to be missing
    # from this signature -- the body assigns self.storage_index below.
    precondition(isinstance(peerid, str), peerid)  # 20-byte binary nodeid
    precondition(len(peerid) == 20, peerid)
    self._storageserver = storage_server # to an RIStorageServer
    self.buckets = {} # k: shareid, v: IRemoteBucketWriter
    self.sharesize = sharesize
    # build a throwaway write-bucket proxy just to learn the allocated size
    # NOTE(review): an argument line (presumably num_share_hashes) of this
    # call is not visible in this chunk.
    wbp = layout.make_write_bucket_proxy(None, sharesize,
                                         blocksize, num_segments,
                                         EXTENSION_SIZE, peerid)
    self.wbp_class = wbp.__class__ # to create more of them
    self.allocated_size = wbp.get_allocated_size()
    self.blocksize = blocksize
    self.num_segments = num_segments
    self.num_share_hashes = num_share_hashes
    self.storage_index = storage_index
    self.renew_secret = bucket_renewal_secret
    self.cancel_secret = bucket_cancel_secret

# NOTE(review): the 'def __repr__(self):' header line is not visible here.
    return ("<PeerTracker for peer %s and SI %s>"
            % (idlib.shortnodeid_b2a(self.peerid),
               si_b2a(self.storage_index)[:5]))

def query(self, sharenums):
    # ask the remote server to allocate buckets for sharenums
    # NOTE(review): the middle argument lines of this remote call (storage
    # index, secrets, sharenums, sizes, presumably) are not visible here,
    # nor is the trailing 'return d'.
    d = self._storageserver.callRemote("allocate_buckets",
                                       canary=Referenceable())
    d.addCallback(self._got_reply)

def _got_reply(self, (alreadygot, buckets)):
    #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
    # NOTE(review): the 'b = {}' initialization, the closing arguments of
    # the wbp_class() call, and the 'b[sharenum] = bp' line are not
    # visible in this chunk.
    for sharenum, rref in buckets.iteritems():
        bp = self.wbp_class(rref, self.sharesize,
                            self.num_share_hashes,
    self.buckets.update(b)
    return (alreadygot, set(b.keys()))
def servers_with_unique_shares(existing_shares, used_peers=None):
    # Return the list of distinct serverids that hold (or will hold) at
    # least one share, combining peers we plan to use with peers that
    # already have shares.
    # NOTE(review): lines are missing from this chunk -- 'servers' is never
    # initialized here, and used_peers.copy() would raise AttributeError
    # when used_peers is None; the missing lines presumably contained
    # 'servers = []' and an 'if used_peers:' guard.
    peers = list(used_peers.copy())
    # We do this because the preexisting shares list goes by peerid.
    peers = [x.peerid for x in peers]
    servers.extend(peers)
    servers.extend(existing_shares.values())
    return list(set(servers))
def shares_by_server(existing_shares):
    """Invert a {shnum: serverid} mapping.

    @param existing_shares: dict mapping share number -> serverid
    @return: dict mapping serverid -> set of share numbers held there

    NOTE: as visible in this file, the function never initialized or
    returned 'servers' (a NameError at runtime); this version builds and
    returns the mapping.
    """
    servers = {}
    for server in set(existing_shares.values()):
        servers[server] = set([x for x in existing_shares.keys()
                               if existing_shares[x] == server])
    return servers
class Tahoe2PeerSelector:
    # NOTE(review): many lines of this class are missing from this chunk;
    # the gaps that matter are flagged inline below.

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # query counters, reported in the final selection log message
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    # NOTE(review): this 'return' is the body of __repr__; its 'def' line
    # is not visible in this chunk.
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, servers_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """
        self._status.set_status("Contacting Peers..")
        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        # every share starts out homeless
        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = storage_broker.get_servers_for_index(storage_index)
        # NOTE(review): the 'if not peers:' guard for this raise is not
        # visible in this chunk.
        raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        # NOTE(review): the closing argument line of this call is missing.
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        # NOTE(review): the 'if not peers:' guard for this raise is missing.
        raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()
        # NOTE(review): the closing argument lines (presumably
        # storage_index) of the next two calls are not visible here, nor
        # are several argument lines of the PeerTracker constructor below.
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                 bucket_cancel_secret_hash(file_cancel_secret,
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
# NOTE(review): this is the body of Tahoe2PeerSelector._loop; the
# 'def _loop(self):' header and a number of interior lines are not
# visible in this chunk, so several calls below are missing argument or
# closing lines.
if not self.homeless_shares:
    # all shares placed; check whether enough *distinct* servers are used
    effective_happiness = servers_with_unique_shares(
        self.preexisting_shares,
    if self.servers_of_happiness <= len(effective_happiness):
        msg = ("placed all %d shares, "
               "sent %d queries to %d peers, "
               "%d queries placed some shares, %d placed none, "
               self.query_count, self.num_peers_contacted,
               self.good_query_count, self.bad_query_count,
        log.msg("peer selection successful for %s: %s" % (self, msg),
                parent=self._log_parent)
        return (self.use_peers, self.preexisting_shares)
    delta = self.servers_of_happiness - len(effective_happiness)
    shares = shares_by_server(self.preexisting_shares)
    # Each server in shares maps to a set of shares stored on it.
    # Since we want to keep at least one share on each server
    # that has one (otherwise we'd only be making
    # the situation worse by removing distinct servers),
    # each server has len(its shares) - 1 to spread around.
    shares_to_spread = sum([len(list(sharelist)) - 1
                            for (server, sharelist)
    if delta <= len(self.uncontacted_peers) and \
       shares_to_spread >= delta:
        # Loop through the allocated shares, removing
        items = shares.items()
        while len(self.homeless_shares) < delta:
            servernum, sharelist = items.pop()
            if len(sharelist) > 1:
                share = sharelist.pop()
                self.homeless_shares.append(share)
                del(self.preexisting_shares[share])
                items.append((servernum, sharelist))
    # NOTE(review): the 'else:' branch structure around this raise is not
    # visible in this chunk.
    raise NotEnoughSharesError("shares could only be placed on %d "
                               "servers (%d were requested)" %
                               (len(effective_happiness),
                                self.servers_of_happiness))

if self.uncontacted_peers:
    # first pass: ask each peer for a single share
    peer = self.uncontacted_peers.pop(0)
    # TODO: don't pre-convert all peerids to PeerTrackers
    assert isinstance(peer, PeerTracker)
    shares_to_ask = set([self.homeless_shares.pop(0)])
    self.query_count += 1
    self.num_peers_contacted += 1
    self._status.set_status("Contacting Peers [%s] (first query),"
                            % (idlib.shortnodeid_b2a(peer.peerid),
                               len(self.homeless_shares)))
    d = peer.query(shares_to_ask)
    d.addBoth(self._got_response, peer, shares_to_ask,
              self.contacted_peers)
elif self.contacted_peers:
    # ask a peer that we've already asked.
    if not self._started_second_pass:
        log.msg("starting second pass", parent=self._log_parent,
        self._started_second_pass = True
    # spread the remaining homeless shares evenly over the willing peers
    num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                   len(self.contacted_peers))
    peer = self.contacted_peers.pop(0)
    shares_to_ask = set(self.homeless_shares[:num_shares])
    self.homeless_shares[:num_shares] = []
    self.query_count += 1
    self._status.set_status("Contacting Peers [%s] (second query),"
                            % (idlib.shortnodeid_b2a(peer.peerid),
                               len(self.homeless_shares)))
    d = peer.query(shares_to_ask)
    d.addBoth(self._got_response, peer, shares_to_ask,
              self.contacted_peers2)
elif self.contacted_peers2:
    # we've finished the second-or-later pass. Move all the remaining
    # peers back into self.contacted_peers for the next pass.
    self.contacted_peers.extend(self.contacted_peers2)
    self.contacted_peers2[:] = []

# no more peers. If we haven't placed enough shares, we fail.
placed_shares = self.total_shares - len(self.homeless_shares)
effective_happiness = servers_with_unique_shares(
    self.preexisting_shares,
if len(effective_happiness) < self.servers_of_happiness:
    msg = ("placed %d shares out of %d total (%d homeless), "
           "want to place on %d servers, "
           "sent %d queries to %d peers, "
           "%d queries placed some shares, %d placed none, "
           (self.total_shares - len(self.homeless_shares),
            self.total_shares, len(self.homeless_shares),
            self.servers_of_happiness,
            self.query_count, self.num_peers_contacted,
            self.good_query_count, self.bad_query_count,
    msg = "peer selection failed for %s: %s" % (self, msg)
    if self.last_failure_msg:
        msg += " (%s)" % (self.last_failure_msg,)
    log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
    raise NotEnoughSharesError(msg)
    # NOTE(review): the branch that chooses NoSharesError (presumably when
    # zero shares were placed) over NotEnoughSharesError is not visible.
    raise NoSharesError(msg)
# we placed enough to be happy, so we're done
self._status.set_status("Placed all shares")
return self.use_peers
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
    # Handle the (success or failure) result of one allocate_buckets
    # query.
    # NOTE(review): several lines of this method are missing from this
    # chunk (e.g. the 'for s in alreadygot:' loop header, some
    # return/continue lines, and the guards around the good/bad query
    # accounting).
    if isinstance(res, failure.Failure):
        # This is unusual, and probably indicates a bug or a network
        log.msg("%s got error during peer selection: %s" % (peer, res),
                level=log.UNUSUAL, parent=self._log_parent)
        self.error_count += 1
        # put the shares we asked about back on the homeless list so
        # another peer can be tried
        self.homeless_shares = list(shares_to_ask) + self.homeless_shares
        if (self.uncontacted_peers
            or self.contacted_peers
            or self.contacted_peers2):
            # there is still hope, so just loop
        # No more peers, so this upload might fail (it depends upon
        # whether we've hit shares_of_happiness or not). Log the last
        # failure we got: if a coding error causes all peers to fail
        # in the same way, this allows the common failure to be seen
        # by the uploader and should help with debugging
        msg = ("last failure (from %s) was: %s" % (peer, res))
        self.last_failure_msg = msg
    (alreadygot, allocated) = res
    log.msg("response from peer %s: alreadygot=%s, allocated=%s"
            % (idlib.shortnodeid_b2a(peer.peerid),
               tuple(sorted(alreadygot)), tuple(sorted(allocated))),
            level=log.NOISY, parent=self._log_parent)
    # NOTE(review): the enclosing 'for s in alreadygot:' loop header is
    # not visible in this chunk.
    if self.preexisting_shares.has_key(s):
        # only move share s to this peer if doing so increases the number
        # of distinct servers holding shares
        old_size = len(servers_with_unique_shares(self.preexisting_shares))
        new_candidate = self.preexisting_shares.copy()
        new_candidate[s] = peer.peerid
        new_size = len(servers_with_unique_shares(new_candidate))
        if old_size >= new_size: continue
    self.preexisting_shares[s] = peer.peerid
    if s in self.homeless_shares:
        self.homeless_shares.remove(s)

    # the PeerTracker will remember which shares were allocated on
    # that peer. We just have to remember to use them.
    self.use_peers.add(peer)

    not_yet_present = set(shares_to_ask) - set(alreadygot)
    still_homeless = not_yet_present - set(allocated)
    # they accepted or already had at least one share, so
    # progress has been made
    self.good_query_count += 1
    self.bad_query_count += 1

    # In networks with lots of space, this is very unusual and
    # probably indicates an error. In networks with peers that
    # are full, it is merely unusual. In networks that are very
    # full, it is common, and many uploads will fail. In most
    # cases, this is obviously not fatal, and we'll just use some
    # some shares are still homeless, keep trying to find them a
    # home. The ones that were rejected get first priority.
    self.homeless_shares = (list(still_homeless)
                            + self.homeless_shares)
    # Since they were unable to accept all of our requests, so it
    # is safe to assume that asking them again won't help.
    # if they *were* able to accept everything, they might be
    # willing to accept even more.
    put_peer_here.append(peer)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): the CHUNKSIZE class attribute referenced by
    # _read_encrypted is not visible in this chunk.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None   # built lazily via _get_encryptor()
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None   # cached by get_size()
        self._ciphertext_bytes_read = 0
        # NOTE(review): one or two trailing __init__ lines are not visible
        # in this chunk.
def set_upload_status(self, upload_status):
    """Adapt and store the status tracker, then hand it to the wrapped
    uploadable as well."""
    status = IUploadStatus(upload_status)
    self._status = status
    self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
    """Forward to log.msg, supplying default facility and parent."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
# NOTE(review): the 'def get_size(self):' header and the 'def _got_size'
# callback header around the lines below are not visible in this chunk,
# nor is the trailing 'return d'.
if self._file_size is not None:
    # cached from an earlier call
    return defer.succeed(self._file_size)
d = self.original.get_size()
    self._file_size = size
    self._status.set_size(size)
d.addCallback(_got_size)

def get_all_encoding_parameters(self):
    # cache and return the (k, happy, n, segment_size) tuple from the
    # wrapped uploadable
    # NOTE(review): the closing line of the self.log(...) call and the
    # trailing 'd.addCallback(_got); return d' lines are not visible here.
    if self._encoding_parameters is not None:
        return defer.succeed(self._encoding_parameters)
    d = self.original.get_all_encoding_parameters()
    def _got(encoding_parameters):
        (k, happy, n, segsize) = encoding_parameters
        self._segment_size = segsize # used by segment hashers
        self._encoding_parameters = encoding_parameters
        self.log("my encoding parameters: %s" % (encoding_parameters,),
        return encoding_parameters
def _get_encryptor(self):
    # Build (or return the cached) AES encryptor and derive the storage
    # index from the encryption key.
    # NOTE(review): several lines of this method are missing from this
    # chunk (the cached-encryptor test, the key handling, and the AES
    # construction, among others).
        return defer.succeed(self._encryptor)
    d = self.original.get_encryption_key()
        storage_index = storage_index_hash(key)
        assert isinstance(storage_index, str)
        # There's no point to having the SI be longer than the key, so we
        # specify that it is truncated to the same 128 bits as the AES key.
        assert len(storage_index) == 16 # SHA-256 truncated to 128b
        self._storage_index = storage_index
        self._status.set_storage_index(storage_index)

def get_storage_index(self):
    # the storage index is derived from the encryption key, so make sure
    # the encryptor has been created first
    # NOTE(review): the trailing 'return d' is not visible in this chunk.
    d = self._get_encryptor()
    d.addCallback(lambda res: self._storage_index)
def _get_segment_hasher(self):
    # Return (hasher, bytes_remaining_in_current_segment), creating a
    # fresh per-segment hasher when none is active.
    # NOTE(review): the branch that reuses the existing hasher (and
    # returns 'left') is not fully visible in this chunk.
    p = self._plaintext_segment_hasher
        left = self._segment_size - self._plaintext_segment_hashed_bytes
    p = plaintext_segment_hasher()
    self._plaintext_segment_hasher = p
    self._plaintext_segment_hashed_bytes = 0
    return p, self._segment_size

def _update_segment_hash(self, chunk):
    # Feed 'chunk' into the per-segment plaintext hashers, closing out a
    # segment hash each time a segment boundary is crossed.
    # NOTE(review): the 'offset = 0' initialization and the closing lines
    # of the two self.log(...) calls are not visible in this chunk.
    while offset < len(chunk):
        p, segment_left = self._get_segment_hasher()
        chunk_left = len(chunk) - offset
        this_segment = min(chunk_left, segment_left)
        p.update(chunk[offset:offset+this_segment])
        self._plaintext_segment_hashed_bytes += this_segment
        if self._plaintext_segment_hashed_bytes == self._segment_size:
            # we've filled this segment
            self._plaintext_segment_hashes.append(p.digest())
            self._plaintext_segment_hasher = None
            self.log("closed hash [%d]: %dB" %
                     (len(self._plaintext_segment_hashes)-1,
                      self._plaintext_segment_hashed_bytes),
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        offset += this_segment
def read_encrypted(self, length, hash_only):
    # Fetch up to 'length' bytes of plaintext from the wrapped uploadable,
    # hash and encrypt them, and fire with a list of ciphertext chunks.
    # NOTE(review): the 'ciphertext = []' initialization and the final
    # 'return d' are not visible in this chunk.
    # make sure our parameters have been set up first
    d = self.get_all_encoding_parameters()
    d.addCallback(lambda ignored: self.get_size())
    d.addCallback(lambda ignored: self._get_encryptor())
    # then fetch and encrypt the plaintext. The unusual structure here
    # (passing a Deferred *into* a function) is needed to avoid
    # overflowing the stack: Deferreds don't optimize out tail recursion.
    # We also pass in a list, to which _read_encrypted will append
    d2 = defer.Deferred()
    d.addCallback(lambda ignored:
                  self._read_encrypted(length, ciphertext, hash_only, d2))
    d.addCallback(lambda ignored: d2)

def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
    # NOTE(review): the 'if not remaining:' guard before the callback, the
    # recursive-call closing line, and the '_bad' errback wrapper around
    # fire_when_done.errback are not visible in this chunk.
        fire_when_done.callback(ciphertext)
    # tolerate large length= values without consuming a lot of RAM by
    # reading just a chunk (say 50kB) at a time. This only really matters
    # when hash_only==True (i.e. resuming an interrupted upload), since
    # that's the case where we will be skipping over a lot of data.
    size = min(remaining, self.CHUNKSIZE)
    remaining = remaining - size
    # read a chunk of plaintext..
    d = defer.maybeDeferred(self.original.read, size)
    # N.B.: if read() is synchronous, then since everything else is
    # actually synchronous too, we'd blow the stack unless we stall for a
    # tick. Once you accept a Deferred from IUploadable.read(), you must
    # be prepared to have it fire immediately too.
    d.addCallback(fireEventually)
    def _good(plaintext):
        # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
        ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
        ciphertext.extend(ct)
        self._read_encrypted(remaining, ciphertext, hash_only,
        fire_when_done.errback(why)
def _hash_and_encrypt_plaintext(self, data, hash_only):
    # Update the whole-file and per-segment plaintext hashers with each
    # chunk, encrypt it, and collect the ciphertext.
    # NOTE(review): several lines are missing from this chunk, including
    # the 'cryptdata' initialization, the chunk-popping loop header, the
    # hash_only branch structure, and the return.
    assert isinstance(data, (tuple, list)), type(data)
    # we use data.pop(0) instead of 'for chunk in data' to save
    # memory: each chunk is destroyed as soon as we're done with it.
        self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
        bytes_processed += len(chunk)
        self._plaintext_hasher.update(chunk)
        self._update_segment_hash(chunk)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        ciphertext = self._encryptor.process(chunk)
            self.log(" skipping encryption", level=log.NOISY)
            cryptdata.append(ciphertext)
    self._ciphertext_bytes_read += bytes_processed
        # report ciphertext-read progress as a fraction of the file size
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
    # this is currently unused, but will live again when we fix #453
    # NOTE(review): the closing lines of the two self.log(...) calls below
    # are not visible in this chunk.
    if len(self._plaintext_segment_hashes) < num_segments:
        # close out the last one
        assert len(self._plaintext_segment_hashes) == num_segments-1
        p, segment_left = self._get_segment_hasher()
        self._plaintext_segment_hashes.append(p.digest())
        del self._plaintext_segment_hasher
        self.log("closing plaintext leaf hasher, hashed %d bytes" %
                 self._plaintext_segment_hashed_bytes,
        self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                 segnum=len(self._plaintext_segment_hashes)-1,
                 hash=base32.b2a(p.digest()),
    assert len(self._plaintext_segment_hashes) == num_segments
    return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

def get_plaintext_hash(self):
    # flat hash over all plaintext processed so far
    h = self._plaintext_hasher.digest()
    return defer.succeed(h)

# NOTE(review): the 'def close(self):' header line is not visible in this
# chunk; this forwards close() to the wrapped uploadable.
    return self.original.close()
# NOTE(review): this is the body of an UploadStatus class; its 'class'
# header, the '__init__' header for the assignments below, and most of
# the one-line accessor/mutator bodies are not visible in this chunk.
implements(IUploadStatus)
statusid_counter = itertools.count(0)  # hands out a unique id per status

self.storage_index = None
self.status = "Not started"
self.progress = [0.0, 0.0, 0.0]
self.counter = self.statusid_counter.next()
self.started = time.time()

def get_started(self):
def get_storage_index(self):
    return self.storage_index
def using_helper(self):
def get_status(self):
def get_progress(self):
    return tuple(self.progress)
def get_active(self):
def get_results(self):
def get_counter(self):

def set_storage_index(self, si):
    self.storage_index = si
def set_size(self, size):
def set_helper(self, helper):
def set_status(self, status):
def set_progress(self, which, value):
    # [0]: chk, [1]: ciphertext, [2]: encode+push
    self.progress[which] = value
def set_active(self, value):
def set_results(self, value):
# NOTE(review): the 'class CHKUploader' header line is not visible in
# this chunk; the lines below are its class body. One __init__ line is
# also missing (abort() later tests self._encoder, which is presumably
# initialized there).
peer_selector_class = Tahoe2PeerSelector

def __init__(self, storage_broker, secret_holder):
    # peer_selector needs storage_broker and secret_holder
    self._storage_broker = storage_broker
    self._secret_holder = secret_holder
    self._log_number = self.log("CHKUploader starting", parent=None)
    self._results = UploadResults()
    self._storage_index = None
    self._upload_status = UploadStatus()
    self._upload_status.set_helper(False)
    self._upload_status.set_active(True)
    self._upload_status.set_results(self._results)

    # locate_all_shareholders() will create the following attribute:
    # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
def log(self, *args, **kwargs):
    """Forward to log.msg, defaulting parent to this uploader's log entry
    and facility to tahoe.upload."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.upload")
    return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable):
    """Start uploading the file.

    Returns a Deferred that will fire with the UploadResults instance.
    """
    # NOTE(review): a few lines of this method (the docstring close, the
    # '_done' wiring and the return) are not visible in this chunk.
    self._started = time.time()
    eu = IEncryptedUploadable(encrypted_uploadable)
    self.log("starting upload of %s" % eu)
    eu.set_upload_status(self._upload_status)
    d = self.start_encrypted(eu)
    def _done(uploadresults):
        self._upload_status.set_active(False)

# NOTE(review): the 'def abort(self):' header line is not visible here.
    """Call this if the upload must be abandoned before it completes.
    This will tell the shareholders to delete their partial shares. I
    return a Deferred that fires when these messages have been acked."""
    if not self._encoder:
        # how did you call abort() before calling start() ?
        return defer.succeed(None)
    return self._encoder.abort()

def start_encrypted(self, encrypted):
    """ Returns a Deferred that will fire with the UploadResults instance. """
    # NOTE(review): the closing argument line of the Encoder() call and
    # the trailing 'return d' are not visible in this chunk.
    eu = IEncryptedUploadable(encrypted)
    started = time.time()
    self._encoder = e = encode.Encoder(self._log_number,
    d = e.set_encrypted_uploadable(eu)
    d.addCallback(self.locate_all_shareholders, started)
    d.addCallback(self.set_shareholders, e)
    d.addCallback(lambda res: e.start())
    d.addCallback(self._encrypted_done)
def locate_all_shareholders(self, encoder, started):
    # Run peer selection for this upload's storage index.
    # NOTE(review): a few lines of this method are missing from this chunk
    # (the closing argument line of the peer_selector_class() call, the
    # storage_index argument of get_shareholders, the timing callback
    # wiring, and the final return).
    peer_selection_started = now = time.time()
    self._storage_index_elapsed = now - started
    storage_broker = self._storage_broker
    secret_holder = self._secret_holder
    storage_index = encoder.get_param("storage_index")
    self._storage_index = storage_index
    upload_id = si_b2a(storage_index)[:5]
    self.log("using storage index %s" % upload_id)
    peer_selector = self.peer_selector_class(upload_id, self._log_number,
    share_size = encoder.get_param("share_size")
    block_size = encoder.get_param("block_size")
    num_segments = encoder.get_param("num_segments")
    k,desired,n = encoder.get_param("share_counts")

    self._peer_selection_started = time.time()
    d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                       share_size, block_size,
                                       num_segments, n, desired)
    self._peer_selection_elapsed = time.time() - peer_selection_started
def set_shareholders(self, (used_peers, already_peers), encoder):
    """
    @param used_peers: a sequence of PeerTracker objects
    @param already_peers: a dict mapping sharenum to a peerid that
                          claims to already have this share
    """
    # NOTE(review): the 'buckets = {}' initialization is not visible in
    # this chunk.
    self.log("_send_shares, used_peers is %s" % (used_peers,))
    # record already-present shares in self._results
    self._results.preexisting_shares = len(already_peers)

    self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
    for peer in used_peers:
        assert isinstance(peer, PeerTracker)
    servermap = already_peers.copy()
    for peer in used_peers:
        buckets.update(peer.buckets)
        for shnum in peer.buckets:
            self._peer_trackers[shnum] = peer
            servermap[shnum] = peer.peerid
    assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
    encoder.set_shareholders(buckets, servermap)
def _encrypted_done(self, verifycap):
    """ Returns a Deferred that will fire with the UploadResults instance. """
    # NOTE(review): the lines binding 'r' (presumably self._results) and
    # 'now' (presumably time.time()) are not visible in this chunk, nor is
    # the final return.
    for shnum in self._encoder.get_shares_placed():
        peer_tracker = self._peer_trackers[shnum]
        peerid = peer_tracker.peerid
        r.sharemap.add(shnum, peerid)
        r.servermap.add(peerid, shnum)
    r.pushed_shares = len(self._encoder.get_shares_placed())
    r.file_size = self._encoder.file_size
    r.timings["total"] = now - self._started
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["peer_selection"] = self._peer_selection_elapsed
    r.timings.update(self._encoder.get_times())
    r.uri_extension_data = self._encoder.get_uri_extension_data()
    r.verifycapstr = verifycap.to_string()

def get_upload_status(self):
    # accessor for the IUploadStatus tracker
    return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # Read exactly 'size' bytes from 'uploadable' (recursing until enough
    # data has been gathered) and fire with prepend_data + the new chunks.
    # The mutable default for prepend_data is safe here: it is only read,
    # never mutated.
    # NOTE(review): several lines are missing from this chunk (the
    # 'if size == 0:' short-circuit, the '_got' callback header, the
    # 'if remaining:' guard, the recursive-call closing line, and the
    # callback/return wiring).
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        remaining = size - bytes
            return read_this_many_bytes(uploadable, remaining,
        return prepend_data + data
class LiteralUploader:
    # Uploads data small enough to be stored entirely inside the URI as a
    # LiteralFileURI (see start() below).
    # NOTE(review): lines are missing from this chunk, including the
    # 'def __init__(self):' header for the assignments below and the
    # '_got_size' callback header in start().

        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_progress(0, 1.0)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)

    def _build_results(self, uri):
        # literal uploads finish immediately, so mark all phases complete
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)

    def get_upload_status(self):
        # NOTE(review): the body of this accessor is not visible in this
        # chunk.
class RemoteEncryptedUploadable(Referenceable):
    # Foolscap-referenceable wrapper exposing an IEncryptedUploadable to a
    # remote helper (see RIEncryptedUploadable).
    implements(RIEncryptedUploadable)
    # NOTE(review): many lines of this class are missing from this chunk
    # (attribute initialization such as self._size / self._offset /
    # self._bytes_sent, and parts of several methods, flagged inline).

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

    # NOTE(review): the 'def get_size(self):' header and the '_got_size'
    # callback around the lines below are not visible in this chunk.
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        d.addCallback(_got_size)

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # NOTE(review): the callback wiring around these lines is not
        # visible in this chunk.
        d = self._eu.read_encrypted(length, hash_only)
        self._offset += length
        size = sum([len(data) for data in strings])

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        # NOTE(review): several lines of this method are missing (the
        # closing of the first log.msg call, the 'else:' before
        # 'd = defer.succeed(None)', and the final callback/return wiring).
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
            d = defer.succeed(None)
        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        size = sum([len(data) for data in strings])
        self._bytes_sent += size

    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    # Uploads a file with the assistance of a remote helper (contacted via
    # callRemote("upload_chk") in _contact_helper below).

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): the remaining __init__ lines (further setup on
        # 's') are not visible in this chunk.
def log(self, *args, **kwargs):
    """Forward to log.msg, defaulting parent to this uploader's log entry."""
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable, storage_index):
    """Start uploading the file.

    Returns a Deferred that will fire with the UploadResults instance.
    """
    # NOTE(review): a few lines are missing from this chunk, e.g. the line
    # that starts this callback chain (presumably 'd = eu.get_size()'),
    # the '_done' wiring around set_active, and the final return.
    precondition(isinstance(storage_index, str), storage_index)
    self._started = time.time()
    eu = IEncryptedUploadable(encrypted_uploadable)
    eu.set_upload_status(self._upload_status)
    self._encuploadable = eu
    self._storage_index = storage_index
    d.addCallback(self._got_size)
    d.addCallback(lambda res: eu.get_all_encoding_parameters())
    d.addCallback(self._got_all_encoding_parameters)
    d.addCallback(self._contact_helper)
    d.addCallback(self._build_verifycap)
    self._upload_status.set_active(False)
def _got_size(self, size):
    # NOTE(review): a line of this method (presumably recording the size
    # on self) is not visible in this chunk.
    self._upload_status.set_size(size)
1024 def _got_all_encoding_parameters(self, params):
1025 k, happy, n, segment_size = params
1026 # stash these for URI generation later
1027 self._needed_shares = k
1028 self._total_shares = n
1029 self._segment_size = segment_size
    def _contact_helper(self, res):
        """Ask the Helper (via upload_chk) whether it already has this file
        or whether we need to send it the ciphertext."""
        now = self._time_contacting_helper_start = time.time()
        # elapsed time spent computing the storage index, for the timing report
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        # NOTE(review): 'return d' is expected here but not visible in this
        # excerpt — confirm.
    def _contacted_helper(self, (upload_results, upload_helper)):
        # Python 2 tuple-parameter: unpacks the (upload_results,
        # upload_helper) pair returned by the helper's upload_chk().
        # NOTE(review): 'now' is presumably bound by 'now = time.time()'
        # just above; that line is not visible in this excerpt — confirm.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        # NOTE(review): the 'if upload_helper:' / 'else:' branch structure
        # separating the two paths below is missing from this excerpt.
        self.log("helper says we need to upload")
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        # already-uploaded path: report completion without sending anything
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results
1062 def _convert_old_upload_results(self, upload_results):
1063 # pre-1.3.0 helpers return upload results which contain a mapping
1064 # from shnum to a single human-readable string, containing things
1065 # like "Found on [x],[y],[z]" (for healthy files that were already in
1066 # the grid), "Found on [x]" (for files that needed upload but which
1067 # discovered pre-existing shares), and "Placed on [x]" (for newly
1068 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1069 # set of binary serverid strings.
1071 # the old results are too hard to deal with (they don't even contain
1072 # as much information as the new results, since the nodeids are
1073 # abbreviated), so if we detect old results, just clobber them.
1075 sharemap = upload_results.sharemap
1076 if str in [type(v) for v in sharemap.values()]:
1077 upload_results.sharemap = None
    def _build_verifycap(self, upload_results):
        """Turn the helper's UploadResults into a verify-cap string and fill
        in the size/timing fields."""
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        # NOTE(review): 'r' is presumably bound by 'r = upload_results' just
        # above; that line is not visible in this excerpt — confirm.
        # sanity-check the helper's encoding against our own parameters
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        # NOTE(review): the closing ').to_string()' of this constructor call
        # is missing from this excerpt.
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares, size=self._size
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own notion of total time under a new key
            r.timings["helper_total"] = r.timings["total"]
        # NOTE(review): 'now' must be bound nearby (time.time()) — not
        # visible in this excerpt; confirm.
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
    def get_upload_status(self):
        """Return the UploadStatus object tracking this upload's progress."""
        return self._upload_status
class BaseUploadable:
    """Common encoding-parameter plumbing shared by the IUploadable
    implementations below (FileHandle, FileName, Data)."""
    # 'KiB' is a module-level constant defined outside this excerpt.
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10
    # per-upload overrides; None means "use the default above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None
    # cache for get_all_encoding_parameters()
    _all_encoding_parameters = None
    def set_upload_status(self, upload_status):
        # Adapt and store the status object used for progress reporting.
        self._status = IUploadStatus(upload_status)
1124 def set_default_encoding_parameters(self, default_params):
1125 assert isinstance(default_params, dict)
1126 for k,v in default_params.items():
1127 precondition(isinstance(k, str), k, v)
1128 precondition(isinstance(v, int), k, v)
1129 if "k" in default_params:
1130 self.default_encoding_param_k = default_params["k"]
1131 if "happy" in default_params:
1132 self.default_encoding_param_happy = default_params["happy"]
1133 if "n" in default_params:
1134 self.default_encoding_param_n = default_params["n"]
1135 if "max_segment_size" in default_params:
1136 self.default_max_segment_size = default_params["max_segment_size"]
    def get_all_encoding_parameters(self):
        """Return (via Deferred) the (k, happy, n, segment_size) tuple,
        computing and caching it on first use."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)
        # fall back to the class-level defaults for anything unset
        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n
        # NOTE(review): 'd' is presumably created by 'd = self.get_size()'
        # here; that line (and the trailing 'return d') is not visible in
        # this excerpt — confirm.
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        # NOTE(review): initializers for self._key / self._size (read by the
        # methods below) are not visible in this excerpt — confirm.
        self.convergence = convergence
    def _get_encryption_key_convergent(self):
        """Derive (and cache) the AES key by hashing the plaintext together
        with the convergence secret and the encoding parameters."""
        if self._key is not None:
            return defer.succeed(self._key)
        # NOTE(review): the Deferred chain head (presumably
        # 'd = self.get_size()') is missing from this excerpt.
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        # NOTE(review): the enclosing callback definition unpacking 'params'
        # is missing from this excerpt.
        k, happy, n, segsize = params
        f = self._filehandle
        enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
        # NOTE(review): the read loop ('while True: ... if not data: break')
        # around the following lines is missing from this excerpt; BLOCKSIZE
        # is defined outside it.
        data = f.read(BLOCKSIZE)
        enckey_hasher.update(data)
        # TODO: setting progress in a non-yielding loop is kind of
        # pointless, but I'm anticipating (perhaps prematurely) the
        # day when we use a slowjob or twisted's CooperatorService to
        # make this yield time to other jobs.
        bytes_read += len(data)
        self._status.set_progress(0, float(bytes_read)/self._size)
        self._key = enckey_hasher.digest()
        self._status.set_progress(0, 1.0)
        assert len(self._key) == 16
1210 def _get_encryption_key_random(self):
1211 if self._key is None:
1212 self._key = os.urandom(16)
1213 return defer.succeed(self._key)
    def get_encryption_key(self):
        # choose convergent vs. random keying based on self.convergence
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()
        # NOTE(review): the following lines are the body of get_size(); its
        # 'def get_size(self):' line is missing from this excerpt.
        if self._size is not None:
            return defer.succeed(self._size)
        # seek to the end to learn the size, then rewind for reading
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._filehandle.seek(0)
        return defer.succeed(size)
    def read(self, length):
        # IUploadable.read: the Deferred fires with a list of data strings.
        return defer.succeed([self._filehandle.read(length)])
1234 # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
        # NOTE(review): the next two lines are the body of close(); its
        # 'def close(self):' line is missing from this excerpt. Since we
        # opened the file ourselves, we also close the handle here.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    # files at or below this size are uploaded as LIT (literal) URIs
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        # NOTE(review): 'self._helper = None' initialization is not visible
        # in this excerpt — confirm (get_helper_info reads it).
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
    def startService(self):
        """Service hook: if a helper FURL was configured, start connecting
        to the helper via our parent's Tub."""
        service.MultiService.startService(self)
        if self._helper_furl:
            # NOTE(review): the remaining arguments of this connectTo call
            # (presumably self._got_helper) are missing from this excerpt.
            self.parent.tub.connectTo(self._helper_furl,
    def _got_helper(self, helper):
        """Connection callback: query the helper's version, supplying a
        default for pre-get_version() helpers."""
        self.log("got helper connection, getting versions")
        # NOTE(review): interior lines of this dict literal (and its closing
        # braces) are missing from this excerpt.
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1293 def _got_versioned_helper(self, helper):
1294 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1295 if needed not in helper.version:
1296 raise InsufficientVersionError(needed, helper.version)
1297 self._helper = helper
1298 helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        """Disconnect notification from the helper; body is truncated in
        this excerpt (presumably clears self._helper — confirm)."""
    def get_helper_info(self):
        """Return a tuple of (helper_furl_or_None, connected_bool)."""
        return (self._helper_furl, bool(self._helper))
1308 def upload(self, uploadable, history=None):
1310 Returns a Deferred that will fire with the UploadResults instance.
1315 uploadable = IUploadable(uploadable)
1316 d = uploadable.get_size()
1317 def _got_size(size):
1318 default_params = self.parent.get_encoding_parameters()
1319 precondition(isinstance(default_params, dict), default_params)
1320 precondition("max_segment_size" in default_params, default_params)
1321 uploadable.set_default_encoding_parameters(default_params)
1323 if self.stats_provider:
1324 self.stats_provider.count('uploader.files_uploaded', 1)
1325 self.stats_provider.count('uploader.bytes_uploaded', size)
1327 if size <= self.URI_LIT_SIZE_THRESHOLD:
1328 uploader = LiteralUploader()
1329 return uploader.start(uploadable)
1331 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1332 d2 = defer.succeed(None)
1334 uploader = AssistedUploader(self._helper)
1335 d2.addCallback(lambda x: eu.get_storage_index())
1336 d2.addCallback(lambda si: uploader.start(eu, si))
1338 storage_broker = self.parent.get_storage_broker()
1339 secret_holder = self.parent._secret_holder
1340 uploader = CHKUploader(storage_broker, secret_holder)
1341 d2.addCallback(lambda x: uploader.start(eu))
1343 self._all_uploads[uploader] = None
1345 history.add_upload(uploader.get_upload_status())
1346 def turn_verifycap_into_read_cap(uploadresults):
1347 # Generate the uri from the verifycap plus the key.
1348 d3 = uploadable.get_encryption_key()
1349 def put_readcap_into_results(key):
1350 v = uri.from_string(uploadresults.verifycapstr)
1351 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1352 uploadresults.uri = r.to_string()
1353 return uploadresults
1354 d3.addCallback(put_readcap_into_results)
1356 d2.addCallback(turn_verifycap_into_read_cap)
1358 d.addCallback(_got_size)