1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_peers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
27 from cStringIO import StringIO
# Internal control-flow exception: raised to escape a peer-query loop once
# enough peers have been located.
# NOTE(review): the class body is elided from this listing — presumably just
# `pass`; confirm against the full file.
36 class HaveAllPeersError(Exception):
37 # we use this to jump out of the loop
# Raised when a storage server reports it cannot accept any more shares.
# NOTE(review): body elided from this listing — presumably `pass`.
40 # this wants to live in storage, not here
41 class TooFullError(Exception):
# Wire-serializable record of the outcome of an upload. As a Copyable /
# RemoteCopy pair it is sent over foolscap from the helper to its client,
# so its attribute shapes are part of a compatibility contract.
44 class UploadResults(Copyable, RemoteCopy):
45 implements(IUploadResults)
46 # note: don't change this string, it needs to match the value used on the
47 # helper, and it does *not* need to match the fully-qualified
48 # package/module/class name
49 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
52 # also, think twice about changing the shape of any existing attribute,
53 # because instances of this class are sent from the helper to its client,
54 # so changing this may break compatibility. Consider adding new fields
55 # instead of modifying existing ones.
# NOTE(review): the `def __init__(self):` line (and possibly other
# attribute initializers) are elided from this listing.
58 self.timings = {} # dict of name to number of seconds
59 self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
60 self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
62 self.ciphertext_fetched = None # how much the helper fetched
64 self.preexisting_shares = None # count of shares already present
65 self.pushed_shares = None # count of shares we pushed
68 # our current uri_extension is 846 bytes for small files, a few bytes
69 # more for larger ones (since the filesize is encoded in decimal in a
70 # few places). Ask for a little bit more just in case we need it. If
71 # the extension changes size, we can change EXTENSION_SIZE to
72 # allocate a more accurate amount of space.
74 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
# PeerTracker methods (the `class PeerTracker:` header is elided from this
# listing). A PeerTracker wraps one remote storage server for the duration
# of a single upload: it issues allocate_buckets/get_buckets queries and
# remembers which write-buckets that server granted us.
# NOTE(review): several parameter and argument lines below are elided
# (e.g. the `storage_index` parameter is assigned at line 98 but does not
# appear in the visible signature) — confirm against the full file.
78 def __init__(self, peerid, storage_server,
79 sharesize, blocksize, num_segments, num_share_hashes,
81 bucket_renewal_secret, bucket_cancel_secret):
82 precondition(isinstance(peerid, str), peerid)
83 precondition(len(peerid) == 20, peerid)
85 self._storageserver = storage_server # to an RIStorageServer
86 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
87 self.sharesize = sharesize
# a throwaway write-bucket proxy is built only to learn the class to
# instantiate later and the per-share allocated size
89 wbp = layout.make_write_bucket_proxy(None, sharesize,
90 blocksize, num_segments,
92 EXTENSION_SIZE, peerid)
93 self.wbp_class = wbp.__class__ # to create more of them
94 self.allocated_size = wbp.get_allocated_size()
95 self.blocksize = blocksize
96 self.num_segments = num_segments
97 self.num_share_hashes = num_share_hashes
98 self.storage_index = storage_index
100 self.renew_secret = bucket_renewal_secret
101 self.cancel_secret = bucket_cancel_secret
# __repr__ body (its `def __repr__(self):` line is elided above)
104 return ("<PeerTracker for peer %s and SI %s>"
105 % (idlib.shortnodeid_b2a(self.peerid),
106 si_b2a(self.storage_index)[:5]))
# Ask the server to allocate buckets for `sharenums`; fires with the
# (alreadygot, allocated-set) pair produced by _got_reply.
# NOTE(review): the callRemote argument lines and the trailing `return d`
# are elided from this listing.
108 def query(self, sharenums):
109 d = self._storageserver.callRemote("allocate_buckets",
115 canary=Referenceable())
116 d.addCallback(self._got_reply)
# Ask a (possibly read-only) server which shares it already holds for
# this storage index. NOTE(review): remaining argument line(s) elided.
119 def ask_about_existing_shares(self):
120 return self._storageserver.callRemote("get_buckets",
# Python-2 tuple-parameter unpacking: res is (alreadygot, buckets).
# Wraps each granted remote bucket ref in a write-bucket proxy and
# records it in self.buckets.
123 def _got_reply(self, (alreadygot, buckets)):
124 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
126 for sharenum, rref in buckets.iteritems():
127 bp = self.wbp_class(rref, self.sharesize,
130 self.num_share_hashes,
134 self.buckets.update(b)
135 return (alreadygot, set(b.keys()))
# Implements the "Tahoe 2" peer-selection algorithm: decide which storage
# servers will hold which shares for one immutable upload.
138 class Tahoe2PeerSelector:
140 def __init__(self, upload_id, logparent=None, upload_status=None):
141 self.upload_id = upload_id
# query_count: total queries sent; good/bad split by whether any share
# was placed
142 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
143 # Peers that are working normally, but full.
# NOTE(review): the counter initializations for full_count/error_count
# (used later by _get_progress_message) are elided from this listing.
146 self.num_peers_contacted = 0
147 self.last_failure_msg = None
148 self._status = IUploadStatus(upload_status)
149 self._log_parent = log.msg("%s starting" % self, parent=logparent)
# __repr__ body (its def line is elided above)
152 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
154 def get_shareholders(self, storage_broker, secret_holder,
155 storage_index, share_size, block_size,
156 num_segments, total_shares, needed_shares,
157 servers_of_happiness):
# Entry point of peer selection. Builds PeerTrackers for writable and
# read-only peers, queries read-only peers about existing shares, then
# kicks off the _loop() placement state machine.
159 @return: (used_peers, already_peers), where used_peers is a set of
160 PeerTracker instances that have agreed to hold some shares
161 for us (the shnum is stashed inside the PeerTracker),
162 and already_peers is a dict mapping shnum to a set of peers
163 which claim to already have the share.
167 self._status.set_status("Contacting Peers..")
169 self.total_shares = total_shares
170 self.servers_of_happiness = servers_of_happiness
171 self.needed_shares = needed_shares
# shares not yet assigned to any server; treated as a FIFO list
173 self.homeless_shares = range(total_shares)
174 self.contacted_peers = [] # peers worth asking again
175 self.contacted_peers2 = [] # peers that we have asked again
176 self._started_second_pass = False
177 self.use_peers = set() # PeerTrackers that have shares assigned to them
178 self.preexisting_shares = {} # shareid => set(peerids) holding shareid
179 # We don't try to allocate shares to these servers, since they've said
180 # that they're incapable of storing shares of the size that we'd want
181 # to store. We keep them around because they may have existing shares
182 # for this storage index, which we want to know about for accurate
183 # servers_of_happiness accounting
184 # (this is eventually a list, but it is initialized later)
185 self.readonly_peers = None
186 # These peers have shares -- any shares -- for our SI. We keep
187 # track of these to write an error message with them later.
188 self.peers_with_shares = set()
190 # this needed_hashes computation should mirror
191 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
192 # (instead of a HashTree) because we don't require actual hashing
193 # just to count the levels.
194 ht = hashtree.IncompleteHashTree(total_shares)
195 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
197 # figure out how much space to ask for
198 wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
199 num_share_hashes, EXTENSION_SIZE,
201 allocated_size = wbp.get_allocated_size()
202 all_peers = storage_broker.get_servers_for_index(storage_index)
# NOTE(review): the `if not all_peers:` guard line is elided here.
204 raise NoServersError("client gave us zero peers")
206 # filter the list of peers according to which ones can accommodate
207 # this request. This excludes older peers (which used a 4-byte size
208 # field) from getting large shares (for files larger than about
209 # 12GiB). See #439 for details.
210 def _get_maxsize(peer):
211 (peerid, conn) = peer
212 v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
213 return v1["maximum-immutable-share-size"]
214 writable_peers = [peer for peer in all_peers
215 if _get_maxsize(peer) >= allocated_size]
# only consider the first 2*N peers as read-only candidates
216 readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
218 # decide upon the renewal/cancel secrets, to include them in the
219 # allocate_buckets query.
220 client_renewal_secret = secret_holder.get_renewal_secret()
221 client_cancel_secret = secret_holder.get_cancel_secret()
# NOTE(review): the storage_index argument lines of these two hash
# calls are elided from this listing.
223 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
225 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
227 def _make_trackers(peers):
228 return [PeerTracker(peerid, conn,
229 share_size, block_size,
230 num_segments, num_share_hashes,
232 bucket_renewal_secret_hash(file_renewal_secret,
234 bucket_cancel_secret_hash(file_cancel_secret,
236 for (peerid, conn) in peers]
237 self.uncontacted_peers = _make_trackers(writable_peers)
238 self.readonly_peers = _make_trackers(readonly_peers)
239 # We now ask peers that can't hold any new shares about existing
240 # shares that they might have for our SI. Once this is done, we
241 # start placing the shares that we haven't already accounted
244 if self._status and self.readonly_peers:
245 self._status.set_status("Contacting readonly peers to find "
246 "any existing shares")
# NOTE(review): the initialization of the `ds` list used below is
# elided from this listing.
247 for peer in self.readonly_peers:
248 assert isinstance(peer, PeerTracker)
249 d = peer.ask_about_existing_shares()
250 d.addBoth(self._handle_existing_response, peer.peerid)
252 self.num_peers_contacted += 1
253 self.query_count += 1
254 log.msg("asking peer %s for any existing shares for "
256 % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
257 level=log.NOISY, parent=self._log_parent)
# wait for all existing-share queries, then start the placement loop
258 dl = defer.DeferredList(ds)
259 dl.addCallback(lambda ign: self._loop())
263 def _handle_existing_response(self, res, peer):
# Record the result of a get_buckets query to a read-only peer:
# failures bump the error counters; successes register any shares the
# peer already holds and remove them from homeless_shares.
265 I handle responses to the queries sent by
266 Tahoe2PeerSelector._existing_shares.
268 if isinstance(res, failure.Failure):
269 log.msg("%s got error during existing shares check: %s"
270 % (idlib.shortnodeid_b2a(peer), res),
271 level=log.UNUSUAL, parent=self._log_parent)
272 self.error_count += 1
273 self.bad_query_count += 1
# NOTE(review): the `else:` branch header and the `buckets = res` /
# `if buckets:` lines are elided from this listing.
277 self.peers_with_shares.add(peer)
278 log.msg("response from peer %s: alreadygot=%s"
279 % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
280 level=log.NOISY, parent=self._log_parent)
281 for bucket in buckets:
282 self.preexisting_shares.setdefault(bucket, set()).add(peer)
283 if self.homeless_shares and bucket in self.homeless_shares:
284 self.homeless_shares.remove(bucket)
286 self.bad_query_count += 1
289 def _get_progress_message(self):
# Build the human-readable summary string used in success/failure log
# messages and in UploadUnhappinessError text.
290 if not self.homeless_shares:
291 msg = "placed all %d shares, " % (self.total_shares)
# NOTE(review): the `else:` line and the `self.total_shares,` argument
# line of the tuple below are elided from this listing.
293 msg = ("placed %d shares out of %d total (%d homeless), " %
294 (self.total_shares - len(self.homeless_shares),
296 len(self.homeless_shares)))
297 return (msg + "want to place shares on at least %d servers such that "
298 "any %d of them have enough shares to recover the file, "
299 "sent %d queries to %d peers, "
300 "%d queries placed some shares, %d placed none "
301 "(of which %d placed none due to the server being"
302 " full and %d placed none due to an error)" %
303 (self.servers_of_happiness, self.needed_shares,
304 self.query_count, self.num_peers_contacted,
305 self.good_query_count, self.bad_query_count,
306 self.full_count, self.error_count))
# Body of the peer-selection state machine (its `def _loop(self):` line is
# elided from this listing). Each pass either declares success, attempts a
# redistribution of preexisting shares, queries the next peer, or fails
# with UploadUnhappinessError.
310 if not self.homeless_shares:
# every share has a tentative home; check whether the placement
# actually satisfies the happiness requirement
311 merged = merge_peers(self.preexisting_shares, self.use_peers)
312 effective_happiness = servers_of_happiness(merged)
313 if self.servers_of_happiness <= effective_happiness:
314 msg = ("peer selection successful for %s: %s" % (self,
315 self._get_progress_message()))
316 log.msg(msg, parent=self._log_parent)
317 return (self.use_peers, self.preexisting_shares)
319 # We're not okay right now, but maybe we can fix it by
320 # redistributing some shares. In cases where one or two
321 # servers has, before the upload, all or most of the
322 # shares for a given SI, this can work by allowing _loop
323 # a chance to spread those out over the other peers,
324 delta = self.servers_of_happiness - effective_happiness
325 shares = shares_by_server(self.preexisting_shares)
326 # Each server in shares maps to a set of shares stored on it.
327 # Since we want to keep at least one share on each server
328 # that has one (otherwise we'd only be making
329 # the situation worse by removing distinct servers),
330 # each server has len(its shares) - 1 to spread around.
331 shares_to_spread = sum([len(list(sharelist)) - 1
332 for (server, sharelist)
# NOTE(review): the `in shares.items()]` line is elided here.
334 if delta <= len(self.uncontacted_peers) and \
335 shares_to_spread >= delta:
336 items = shares.items()
337 while len(self.homeless_shares) < delta:
338 # Loop through the allocated shares, removing
339 # one from each server that has more than one
340 # and putting it back into self.homeless_shares
341 # until we've done this delta times.
342 server, sharelist = items.pop()
343 if len(sharelist) > 1:
344 share = sharelist.pop()
345 self.homeless_shares.append(share)
346 self.preexisting_shares[share].remove(server)
347 if not self.preexisting_shares[share]:
348 del self.preexisting_shares[share]
# re-queue the server so it can donate again later
349 items.append((server, sharelist))
# NOTE(review): the recursive `return self._loop()` and `else:` lines
# are elided from this listing.
352 # Redistribution won't help us; fail.
353 peer_count = len(self.peers_with_shares)
354 msg = failure_message(peer_count,
356 self.servers_of_happiness,
358 raise UploadUnhappinessError("%s (%s)" % (msg,
359 self._get_progress_message()))
361 if self.uncontacted_peers:
# first pass: ask each fresh peer to hold exactly one share
362 peer = self.uncontacted_peers.pop(0)
363 # TODO: don't pre-convert all peerids to PeerTrackers
364 assert isinstance(peer, PeerTracker)
366 shares_to_ask = set([self.homeless_shares.pop(0)])
367 self.query_count += 1
368 self.num_peers_contacted += 1
370 self._status.set_status("Contacting Peers [%s] (first query),"
372 % (idlib.shortnodeid_b2a(peer.peerid),
373 len(self.homeless_shares)))
374 d = peer.query(shares_to_ask)
375 d.addBoth(self._got_response, peer, shares_to_ask,
376 self.contacted_peers)
378 elif self.contacted_peers:
379 # ask a peer that we've already asked.
380 if not self._started_second_pass:
381 log.msg("starting second pass", parent=self._log_parent,
383 self._started_second_pass = True
# spread the remaining shares evenly over the peers left
384 num_shares = mathutil.div_ceil(len(self.homeless_shares),
385 len(self.contacted_peers))
386 peer = self.contacted_peers.pop(0)
387 shares_to_ask = set(self.homeless_shares[:num_shares])
388 self.homeless_shares[:num_shares] = []
389 self.query_count += 1
391 self._status.set_status("Contacting Peers [%s] (second query),"
393 % (idlib.shortnodeid_b2a(peer.peerid),
394 len(self.homeless_shares)))
395 d = peer.query(shares_to_ask)
396 d.addBoth(self._got_response, peer, shares_to_ask,
397 self.contacted_peers2)
399 elif self.contacted_peers2:
400 # we've finished the second-or-later pass. Move all the remaining
401 # peers back into self.contacted_peers for the next pass.
402 self.contacted_peers.extend(self.contacted_peers2)
403 self.contacted_peers2[:] = []
406 # no more peers. If we haven't placed enough shares, we fail.
407 placed_shares = self.total_shares - len(self.homeless_shares)
408 merged = merge_peers(self.preexisting_shares, self.use_peers)
409 effective_happiness = servers_of_happiness(merged)
410 if effective_happiness < self.servers_of_happiness:
411 msg = failure_message(len(self.peers_with_shares),
413 self.servers_of_happiness,
415 msg = ("peer selection failed for %s: %s (%s)" % (self,
417 self._get_progress_message()))
418 if self.last_failure_msg:
419 msg += " (%s)" % (self.last_failure_msg,)
420 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
421 raise UploadUnhappinessError(msg)
423 # we placed enough to be happy, so we're done
425 self._status.set_status("Placed all shares")
426 return (self.use_peers, self.preexisting_shares)
428 def _got_response(self, res, peer, shares_to_ask, put_peer_here):
# Process one allocate_buckets reply (or failure) from `peer`, update
# the bookkeeping sets/counters, and re-enter _loop. `put_peer_here`
# is the list (contacted_peers or contacted_peers2) that the peer is
# appended to if it may accept more shares later.
429 if isinstance(res, failure.Failure):
430 # This is unusual, and probably indicates a bug or a network
432 log.msg("%s got error during peer selection: %s" % (peer, res),
433 level=log.UNUSUAL, parent=self._log_parent)
434 self.error_count += 1
435 self.bad_query_count += 1
# return the un-placed shares to the front of the homeless queue
436 self.homeless_shares = list(shares_to_ask) + self.homeless_shares
437 if (self.uncontacted_peers
438 or self.contacted_peers
439 or self.contacted_peers2):
440 # there is still hope, so just loop
443 # No more peers, so this upload might fail (it depends upon
444 # whether we've hit servers_of_happiness or not). Log the last
445 # failure we got: if a coding error causes all peers to fail
446 # in the same way, this allows the common failure to be seen
447 # by the uploader and should help with debugging
448 msg = ("last failure (from %s) was: %s" % (peer, res))
449 self.last_failure_msg = msg
# NOTE(review): the `else:` branch header for the success path is
# elided from this listing.
451 (alreadygot, allocated) = res
452 log.msg("response from peer %s: alreadygot=%s, allocated=%s"
453 % (idlib.shortnodeid_b2a(peer.peerid),
454 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
455 level=log.NOISY, parent=self._log_parent)
# NOTE(review): the `for s in alreadygot:` loop header is elided here.
458 self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
459 if s in self.homeless_shares:
460 self.homeless_shares.remove(s)
462 elif s in shares_to_ask:
465 # the PeerTracker will remember which shares were allocated on
466 # that peer. We just have to remember to use them.
468 self.use_peers.add(peer)
471 if allocated or alreadygot:
472 self.peers_with_shares.add(peer.peerid)
474 not_yet_present = set(shares_to_ask) - set(alreadygot)
475 still_homeless = not_yet_present - set(allocated)
# NOTE(review): the conditional guarding the good/bad query counters
# is elided from this listing.
478 # They accepted at least one of the shares that we asked
479 # them to accept, or they had a share that we didn't ask
480 # them to accept but that we hadn't placed yet, so this
481 # was a productive query
482 self.good_query_count += 1
484 self.bad_query_count += 1
488 # In networks with lots of space, this is very unusual and
489 # probably indicates an error. In networks with peers that
490 # are full, it is merely unusual. In networks that are very
491 # full, it is common, and many uploads will fail. In most
492 # cases, this is obviously not fatal, and we'll just use some
495 # some shares are still homeless, keep trying to find them a
496 # home. The ones that were rejected get first priority.
497 self.homeless_shares = (list(still_homeless)
498 + self.homeless_shares)
499 # Since they were unable to accept all of our requests, so it
500 # is safe to assume that asking them again won't help.
502 # if they *were* able to accept everything, they might be
503 # willing to accept even more.
504 put_peer_here.append(peer)
510 class EncryptAnUploadable:
511 """This is a wrapper that takes an IUploadable and provides
512 IEncryptedUploadable."""
513 implements(IEncryptedUploadable)
516 def __init__(self, original, log_parent=None):
517 self.original = IUploadable(original)
518 self._log_number = log_parent
# AES-CTR encryptor; created lazily by _get_encryptor()
519 self._encryptor = None
# running hash of the entire plaintext
520 self._plaintext_hasher = plaintext_hasher()
# per-segment hasher; created per segment, None between segments
521 self._plaintext_segment_hasher = None
522 self._plaintext_segment_hashes = []
523 self._encoding_parameters = None
524 self._file_size = None
525 self._ciphertext_bytes_read = 0
528 def set_upload_status(self, upload_status):
# Adapt and store the status object, and forward it to the wrapped
# uploadable so both layers report progress to the same place.
529 self._status = IUploadStatus(upload_status)
530 self.original.set_upload_status(upload_status)
# Log helper: defaults facility/parent before delegating to log.msg.
532 def log(self, *args, **kwargs):
533 if "facility" not in kwargs:
534 kwargs["facility"] = "upload.encryption"
535 if "parent" not in kwargs:
536 kwargs["parent"] = self._log_number
537 return log.msg(*args, **kwargs)
# get_size body — its `def get_size(self):` line is elided from this
# listing. Returns a Deferred firing with the file size, cached after
# the first call.
540 if self._file_size is not None:
541 return defer.succeed(self._file_size)
542 d = self.original.get_size()
# NOTE(review): the `def _got_size(size):` line is elided here.
544 self._file_size = size
546 self._status.set_size(size)
548 d.addCallback(_got_size)
551 def get_all_encoding_parameters(self):
# Returns a Deferred firing with (k, happy, n, segment_size), cached
# after the first call; also records the segment size for the
# per-segment plaintext hashers.
552 if self._encoding_parameters is not None:
553 return defer.succeed(self._encoding_parameters)
554 d = self.original.get_all_encoding_parameters()
555 def _got(encoding_parameters):
556 (k, happy, n, segsize) = encoding_parameters
557 self._segment_size = segsize # used by segment hashers
558 self._encoding_parameters = encoding_parameters
559 self.log("my encoding parameters: %s" % (encoding_parameters,),
561 return encoding_parameters
# NOTE(review): the addCallback/return lines of this method are elided.
565 def _get_encryptor(self):
# Lazily create the AES encryptor from the uploadable's encryption
# key, and derive the storage index from the same key.
567 return defer.succeed(self._encryptor)
569 d = self.original.get_encryption_key()
# NOTE(review): the `def _got(key):` line and the AES construction are
# elided from this listing.
574 storage_index = storage_index_hash(key)
575 assert isinstance(storage_index, str)
576 # There's no point to having the SI be longer than the key, so we
577 # specify that it is truncated to the same 128 bits as the AES key.
578 assert len(storage_index) == 16 # SHA-256 truncated to 128b
579 self._storage_index = storage_index
581 self._status.set_storage_index(storage_index)
586 def get_storage_index(self):
# Returns a Deferred firing with the storage index (derived as a side
# effect of creating the encryptor).
587 d = self._get_encryptor()
588 d.addCallback(lambda res: self._storage_index)
# _get_segment_hasher body — its def line is elided from this listing.
# Returns (hasher, bytes-left-in-current-segment), creating a fresh
# per-segment hasher when none is active.
591 def _get_segment_hasher(self):
592 p = self._plaintext_segment_hasher
594 left = self._segment_size - self._plaintext_segment_hashed_bytes
596 p = plaintext_segment_hasher()
597 self._plaintext_segment_hasher = p
598 self._plaintext_segment_hashed_bytes = 0
599 return p, self._segment_size
601 def _update_segment_hash(self, chunk):
# Feed `chunk` into the per-segment plaintext hashers, closing out and
# recording a digest each time a segment boundary is crossed.
# NOTE(review): the `offset = 0` initialization line is elided.
603 while offset < len(chunk):
604 p, segment_left = self._get_segment_hasher()
605 chunk_left = len(chunk) - offset
606 this_segment = min(chunk_left, segment_left)
607 p.update(chunk[offset:offset+this_segment])
608 self._plaintext_segment_hashed_bytes += this_segment
610 if self._plaintext_segment_hashed_bytes == self._segment_size:
611 # we've filled this segment
612 self._plaintext_segment_hashes.append(p.digest())
613 self._plaintext_segment_hasher = None
614 self.log("closed hash [%d]: %dB" %
615 (len(self._plaintext_segment_hashes)-1,
616 self._plaintext_segment_hashed_bytes),
618 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
619 segnum=len(self._plaintext_segment_hashes)-1,
620 hash=base32.b2a(p.digest()),
623 offset += this_segment
626 def read_encrypted(self, length, hash_only):
# IEncryptedUploadable entry point: returns a Deferred firing with a
# list of ciphertext strings (or, when hash_only, just hashes the
# plaintext without returning ciphertext).
627 # make sure our parameters have been set up first
628 d = self.get_all_encoding_parameters()
630 d.addCallback(lambda ignored: self.get_size())
631 d.addCallback(lambda ignored: self._get_encryptor())
632 # then fetch and encrypt the plaintext. The unusual structure here
633 # (passing a Deferred *into* a function) is needed to avoid
634 # overflowing the stack: Deferreds don't optimize out tail recursion.
635 # We also pass in a list, to which _read_encrypted will append
# NOTE(review): the `ciphertext = []` initialization and the trailing
# `return d` are elided from this listing.
638 d2 = defer.Deferred()
639 d.addCallback(lambda ignored:
640 self._read_encrypted(length, ciphertext, hash_only, d2))
641 d.addCallback(lambda ignored: d2)
644 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
# Pseudo-recursive chunked reader: reads up to CHUNKSIZE of plaintext
# per iteration, hashes/encrypts it, appends to `ciphertext`, and
# fires `fire_when_done` once `remaining` reaches zero.
# NOTE(review): the `if remaining == 0:` guard line is elided here.
646 fire_when_done.callback(ciphertext)
648 # tolerate large length= values without consuming a lot of RAM by
649 # reading just a chunk (say 50kB) at a time. This only really matters
650 # when hash_only==True (i.e. resuming an interrupted upload), since
651 # that's the case where we will be skipping over a lot of data.
652 size = min(remaining, self.CHUNKSIZE)
653 remaining = remaining - size
654 # read a chunk of plaintext..
655 d = defer.maybeDeferred(self.original.read, size)
656 # N.B.: if read() is synchronous, then since everything else is
657 # actually synchronous too, we'd blow the stack unless we stall for a
658 # tick. Once you accept a Deferred from IUploadable.read(), you must
659 # be prepared to have it fire immediately too.
660 d.addCallback(fireEventually)
661 def _good(plaintext):
663 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
664 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
665 ciphertext.extend(ct)
666 self._read_encrypted(remaining, ciphertext, hash_only,
# NOTE(review): the `def _err(why):` line is elided before this
# errback body.
669 fire_when_done.errback(why)
674 def _hash_and_encrypt_plaintext(self, data, hash_only):
# Hash every chunk of `data` (whole-file and per-segment hashers),
# encrypt it with AES-CTR, and return the list of ciphertext chunks.
# When hash_only is true the ciphertext is still computed (see TODO
# below) but discarded.
675 assert isinstance(data, (tuple, list)), type(data)
678 # we use data.pop(0) instead of 'for chunk in data' to save
679 # memory: each chunk is destroyed as soon as we're done with it.
# NOTE(review): the cryptdata/bytes_processed initializations and the
# `while data:` / `chunk = data.pop(0)` lines are elided here.
683 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
685 bytes_processed += len(chunk)
686 self._plaintext_hasher.update(chunk)
687 self._update_segment_hash(chunk)
688 # TODO: we have to encrypt the data (even if hash_only==True)
689 # because pycryptopp's AES-CTR implementation doesn't offer a
690 # way to change the counter value. Once pycryptopp acquires
691 # this ability, change this to simply update the counter
692 # before each call to (hash_only==False) _encryptor.process()
693 ciphertext = self._encryptor.process(chunk)
695 self.log(" skipping encryption", level=log.NOISY)
697 cryptdata.append(ciphertext)
700 self._ciphertext_bytes_read += bytes_processed
# update the ciphertext-fetch progress fraction on the status object
702 progress = float(self._ciphertext_bytes_read) / self._file_size
703 self._status.set_progress(1, progress)
707 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
# Return (as a Deferred) the plaintext segment-hash leaves in
# [first:last], closing out the final partial segment hasher if
# necessary.
708 # this is currently unused, but will live again when we fix #453
709 if len(self._plaintext_segment_hashes) < num_segments:
710 # close out the last one
711 assert len(self._plaintext_segment_hashes) == num_segments-1
712 p, segment_left = self._get_segment_hasher()
713 self._plaintext_segment_hashes.append(p.digest())
714 del self._plaintext_segment_hasher
715 self.log("closing plaintext leaf hasher, hashed %d bytes" %
716 self._plaintext_segment_hashed_bytes,
718 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
719 segnum=len(self._plaintext_segment_hashes)-1,
720 hash=base32.b2a(p.digest()),
722 assert len(self._plaintext_segment_hashes) == num_segments
723 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
# Return (as a Deferred) the digest of the entire plaintext.
725 def get_plaintext_hash(self):
726 h = self._plaintext_hasher.digest()
727 return defer.succeed(h)
# close() body — its `def close(self):` line is elided; delegates to
# the wrapped uploadable.
730 return self.original.close()
# UploadStatus body — the `class UploadStatus:` header line is elided from
# this listing. A simple mutable record of one upload's progress, exposed
# through the IUploadStatus getters/setters below. Many one-line method
# bodies are also elided (only their def lines survive).
733 implements(IUploadStatus)
# class-level counter so every status instance gets a unique id
734 statusid_counter = itertools.count(0)
737 self.storage_index = None
740 self.status = "Not started"
741 # [0]: chk, [1]: ciphertext, [2]: encode+push (see set_progress)
741 self.progress = [0.0, 0.0, 0.0]
744 self.counter = self.statusid_counter.next()
745 self.started = time.time()
747 def get_started(self):
749 def get_storage_index(self):
750 return self.storage_index
753 def using_helper(self):
755 def get_status(self):
757 def get_progress(self):
758 return tuple(self.progress)
759 def get_active(self):
761 def get_results(self):
763 def get_counter(self):
766 def set_storage_index(self, si):
767 self.storage_index = si
768 def set_size(self, size):
770 def set_helper(self, helper):
772 def set_status(self, status):
774 def set_progress(self, which, value):
775 # [0]: chk, [1]: ciphertext, [2]: encode+push
776 self.progress[which] = value
777 def set_active(self, value):
779 def set_results(self, value):
# CHKUploader attributes/methods — the `class CHKUploader:` header line is
# elided from this listing. Drives a full immutable (CHK) upload: peer
# selection, encoding, and result collection.
783 peer_selector_class = Tahoe2PeerSelector
785 def __init__(self, storage_broker, secret_holder):
786 # peer_selector needs storage_broker and secret_holder
787 self._storage_broker = storage_broker
788 self._secret_holder = secret_holder
789 self._log_number = self.log("CHKUploader starting", parent=None)
# NOTE(review): the `self._encoder = None` initialization (relied on
# by abort()) appears to be elided from this listing.
791 self._results = UploadResults()
792 self._storage_index = None
793 self._upload_status = UploadStatus()
794 self._upload_status.set_helper(False)
795 self._upload_status.set_active(True)
796 self._upload_status.set_results(self._results)
798 # locate_all_shareholders() will create the following attribute:
799 # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
# Log helper: defaults parent/facility before delegating to log.msg.
801 def log(self, *args, **kwargs):
802 if "parent" not in kwargs:
803 kwargs["parent"] = self._log_number
804 if "facility" not in kwargs:
805 kwargs["facility"] = "tahoe.upload"
806 return log.msg(*args, **kwargs)
808 def start(self, encrypted_uploadable):
809 """Start uploading the file.
811 Returns a Deferred that will fire with the UploadResults instance.
# NOTE(review): the closing docstring quotes and the addCallback/return
# lines of this method are elided from this listing.
814 self._started = time.time()
815 eu = IEncryptedUploadable(encrypted_uploadable)
816 self.log("starting upload of %s" % eu)
818 eu.set_upload_status(self._upload_status)
819 d = self.start_encrypted(eu)
820 def _done(uploadresults):
821 self._upload_status.set_active(False)
# abort() body — its `def abort(self):` line is elided above.
827 """Call this if the upload must be abandoned before it completes.
828 This will tell the shareholders to delete their partial shares. I
829 return a Deferred that fires when these messages have been acked."""
830 if not self._encoder:
831 # how did you call abort() before calling start() ?
832 return defer.succeed(None)
833 return self._encoder.abort()
835 def start_encrypted(self, encrypted):
836 """ Returns a Deferred that will fire with the UploadResults instance. """
837 eu = IEncryptedUploadable(encrypted)
839 started = time.time()
# NOTE(review): the second Encoder constructor argument line and the
# trailing `return d` are elided from this listing.
840 self._encoder = e = encode.Encoder(self._log_number,
# pipeline: attach uploadable -> pick shareholders -> hand buckets to
# the encoder -> run the encoder -> build the results object
842 d = e.set_encrypted_uploadable(eu)
843 d.addCallback(self.locate_all_shareholders, started)
844 d.addCallback(self.set_shareholders, e)
845 d.addCallback(lambda res: e.start())
846 d.addCallback(self._encrypted_done)
849 def locate_all_shareholders(self, encoder, started):
# Run peer selection for this upload; fires with the
# (used_peers, already_peers) pair from Tahoe2PeerSelector. Also
# records the storage-index and peer-selection timings.
850 peer_selection_started = now = time.time()
851 self._storage_index_elapsed = now - started
852 storage_broker = self._storage_broker
853 secret_holder = self._secret_holder
854 storage_index = encoder.get_param("storage_index")
855 self._storage_index = storage_index
856 upload_id = si_b2a(storage_index)[:5]
857 self.log("using storage index %s" % upload_id)
858 peer_selector = self.peer_selector_class(upload_id, self._log_number,
861 share_size = encoder.get_param("share_size")
862 block_size = encoder.get_param("block_size")
863 num_segments = encoder.get_param("num_segments")
864 k,desired,n = encoder.get_param("share_counts")
866 self._peer_selection_started = time.time()
867 d = peer_selector.get_shareholders(storage_broker, secret_holder,
869 share_size, block_size,
870 num_segments, n, k, desired)
# NOTE(review): the `def _done(res):` wrapper and `return d` lines are
# elided around this timing statement.
872 self._peer_selection_elapsed = time.time() - peer_selection_started
# Python-2 tuple-parameter unpacking in the signature.
877 def set_shareholders(self, (used_peers, already_peers), encoder):
879 @param used_peers: a sequence of PeerTracker objects
880 @param already_peers: a dict mapping sharenum to a set of peerids
881 that claim to already have this share
883 self.log("_send_shares, used_peers is %s" % (used_peers,))
884 # record already-present shares in self._results
885 self._results.preexisting_shares = len(already_peers)
887 self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
888 for peer in used_peers:
889 assert isinstance(peer, PeerTracker)
# NOTE(review): the `buckets = {}` initialization is elided here.
891 servermap = already_peers.copy()
892 for peer in used_peers:
893 buckets.update(peer.buckets)
894 for shnum in peer.buckets:
895 self._peer_trackers[shnum] = peer
896 servermap.setdefault(shnum, set()).add(peer.peerid)
# sanity check: no share number appears on two trackers
897 assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
898 encoder.set_shareholders(buckets, servermap)
900 def _encrypted_done(self, verifycap):
901 """ Returns a Deferred that will fire with the UploadResults instance. """
# NOTE(review): the `r = self._results` binding and the `now = ...`
# timestamp line appear to be elided from this listing.
903 for shnum in self._encoder.get_shares_placed():
904 peer_tracker = self._peer_trackers[shnum]
905 peerid = peer_tracker.peerid
906 r.sharemap.add(shnum, peerid)
907 r.servermap.add(peerid, shnum)
908 r.pushed_shares = len(self._encoder.get_shares_placed())
910 r.file_size = self._encoder.file_size
911 r.timings["total"] = now - self._started
912 r.timings["storage_index"] = self._storage_index_elapsed
913 r.timings["peer_selection"] = self._peer_selection_elapsed
914 r.timings.update(self._encoder.get_times())
915 r.uri_extension_data = self._encoder.get_uri_extension_data()
916 r.verifycapstr = verifycap.to_string()
# Accessor for the live status record of this upload.
919 def get_upload_status(self):
920 return self._upload_status
# Recursively read exactly `size` bytes from `uploadable`, accumulating
# the pieces read so far in `prepend_data`; fires with the full list.
# NOTE(review): mutable default argument `prepend_data=[]` — safe only
# because the list is never mutated in place (a new list is built via
# concatenation below), but worth confirming against the full file.
922 def read_this_many_bytes(uploadable, size, prepend_data=[]):
# NOTE(review): the `if size == 0:` guard and the `def _got(data):` /
# conditional lines are elided from this listing.
924 return defer.succeed([])
925 d = uploadable.read(size)
927 assert isinstance(data, list)
928 bytes = sum([len(piece) for piece in data])
931 remaining = size - bytes
933 return read_this_many_bytes(uploadable, remaining,
935 return prepend_data + data
# Uploader for files small enough to be stored entirely inside a literal
# (LIT) URI — no servers are contacted; the data becomes the cap itself.
939 class LiteralUploader:
# NOTE(review): the `def __init__(self):` line is elided below.
942 self._results = UploadResults()
943 self._status = s = UploadStatus()
944 s.set_storage_index(None)
946 s.set_progress(0, 1.0)
948 s.set_results(self._results)
950 def start(self, uploadable):
# read the whole file and pack it into a LiteralFileURI string
951 uploadable = IUploadable(uploadable)
952 d = uploadable.get_size()
# NOTE(review): the `def _got_size(size):` line is elided here.
955 self._status.set_size(size)
956 self._results.file_size = size
957 return read_this_many_bytes(uploadable, size)
958 d.addCallback(_got_size)
959 d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
960 d.addCallback(lambda u: u.to_string())
961 d.addCallback(self._build_results)
964 def _build_results(self, uri):
# mark the upload complete; all three progress phases are trivially done
965 self._results.uri = uri
966 self._status.set_status("Finished")
967 self._status.set_progress(1, 1.0)
968 self._status.set_progress(2, 1.0)
# NOTE(review): the `return self._results` line and close() method
# appear to be elided from this listing.
974 def get_upload_status(self):
class RemoteEncryptedUploadable(Referenceable):
    """Wrap an IEncryptedUploadable so an upload helper can pull ciphertext
    from us over foolscap. We track our read offset and the number of
    bytes actually sent."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        """Return a Deferred firing with the ciphertext size, caching it."""
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read (or, when hash_only, merely hash) 'length' bytes of
        ciphertext, advancing self._offset accordingly."""
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()
1048 class AssistedUploader:
1050 def __init__(self, helper):
1051 self._helper = helper
1052 self._log_number = log.msg("AssistedUploader starting")
1053 self._storage_index = None
1054 self._upload_status = s = UploadStatus()
1058 def log(self, *args, **kwargs):
1059 if "parent" not in kwargs:
1060 kwargs["parent"] = self._log_number
1061 return log.msg(*args, **kwargs)
1063 def start(self, encrypted_uploadable, storage_index):
1064 """Start uploading the file.
1066 Returns a Deferred that will fire with the UploadResults instance.
1068 precondition(isinstance(storage_index, str), storage_index)
1069 self._started = time.time()
1070 eu = IEncryptedUploadable(encrypted_uploadable)
1071 eu.set_upload_status(self._upload_status)
1072 self._encuploadable = eu
1073 self._storage_index = storage_index
1075 d.addCallback(self._got_size)
1076 d.addCallback(lambda res: eu.get_all_encoding_parameters())
1077 d.addCallback(self._got_all_encoding_parameters)
1078 d.addCallback(self._contact_helper)
1079 d.addCallback(self._build_verifycap)
1081 self._upload_status.set_active(False)
1086 def _got_size(self, size):
1088 self._upload_status.set_size(size)
1090 def _got_all_encoding_parameters(self, params):
1091 k, happy, n, segment_size = params
1092 # stash these for URI generation later
1093 self._needed_shares = k
1094 self._total_shares = n
1095 self._segment_size = segment_size
1097 def _contact_helper(self, res):
1098 now = self._time_contacting_helper_start = time.time()
1099 self._storage_index_elapsed = now - self._started
1100 self.log(format="contacting helper for SI %(si)s..",
1101 si=si_b2a(self._storage_index))
1102 self._upload_status.set_status("Contacting Helper")
1103 d = self._helper.callRemote("upload_chk", self._storage_index)
1104 d.addCallback(self._contacted_helper)
1107 def _contacted_helper(self, (upload_results, upload_helper)):
1109 elapsed = now - self._time_contacting_helper_start
1110 self._elapsed_time_contacting_helper = elapsed
1112 self.log("helper says we need to upload")
1113 self._upload_status.set_status("Uploading Ciphertext")
1114 # we need to upload the file
1115 reu = RemoteEncryptedUploadable(self._encuploadable,
1116 self._upload_status)
1117 # let it pre-compute the size for progress purposes
1119 d.addCallback(lambda ignored:
1120 upload_helper.callRemote("upload", reu))
1121 # this Deferred will fire with the upload results
1123 self.log("helper says file is already uploaded")
1124 self._upload_status.set_progress(1, 1.0)
1125 self._upload_status.set_results(upload_results)
1126 return upload_results
1128 def _convert_old_upload_results(self, upload_results):
1129 # pre-1.3.0 helpers return upload results which contain a mapping
1130 # from shnum to a single human-readable string, containing things
1131 # like "Found on [x],[y],[z]" (for healthy files that were already in
1132 # the grid), "Found on [x]" (for files that needed upload but which
1133 # discovered pre-existing shares), and "Placed on [x]" (for newly
1134 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1135 # set of binary serverid strings.
1137 # the old results are too hard to deal with (they don't even contain
1138 # as much information as the new results, since the nodeids are
1139 # abbreviated), so if we detect old results, just clobber them.
1141 sharemap = upload_results.sharemap
1142 if str in [type(v) for v in sharemap.values()]:
1143 upload_results.sharemap = None
1145 def _build_verifycap(self, upload_results):
1146 self.log("upload finished, building readcap")
1147 self._convert_old_upload_results(upload_results)
1148 self._upload_status.set_status("Building Readcap")
1150 assert r.uri_extension_data["needed_shares"] == self._needed_shares
1151 assert r.uri_extension_data["total_shares"] == self._total_shares
1152 assert r.uri_extension_data["segment_size"] == self._segment_size
1153 assert r.uri_extension_data["size"] == self._size
1154 r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1155 uri_extension_hash=r.uri_extension_hash,
1156 needed_shares=self._needed_shares,
1157 total_shares=self._total_shares, size=self._size
1160 r.file_size = self._size
1161 r.timings["storage_index"] = self._storage_index_elapsed
1162 r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1163 if "total" in r.timings:
1164 r.timings["helper_total"] = r.timings["total"]
1165 r.timings["total"] = now - self._started
1166 self._upload_status.set_status("Finished")
1167 self._upload_status.set_results(r)
1170 def get_upload_status(self):
1171 return self._upload_status
class BaseUploadable:
    """Base class for IUploadable implementations: holds the encoding
    parameters (k/happy/n/segment size) and computes the final parameter
    tuple once the file size is known."""
    # class-level defaults, replaceable per-instance or via
    # set_default_encoding_parameters()
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install fallback k/happy/n/max_segment_size values, typically
        taken from the client's configuration."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size),
        caching the tuple after the first computation."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size() # provided by the subclass
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        """Derive (and cache) the convergent encryption key by hashing the
        whole plaintext together with the encoding parameters and the
        convergence secret. Returns a Deferred firing with the 16-byte key."""
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        # non-convergent mode: a fresh random AES key, cached for reuse
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

    def get_size(self):
        """Return a Deferred firing with the file size, measured by seeking
        to the end; the handle is rewound afterwards and the size cached."""
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        pass
        # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        # we opened the file ourselves, so we are responsible for closing it
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1328 class Uploader(service.MultiService, log.PrefixingLogMixin):
1329 """I am a service that allows file uploading. I am a service-child of the
1332 implements(IUploader)
1334 URI_LIT_SIZE_THRESHOLD = 55
1336 def __init__(self, helper_furl=None, stats_provider=None):
1337 self._helper_furl = helper_furl
1338 self.stats_provider = stats_provider
1340 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1341 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1342 service.MultiService.__init__(self)
1344 def startService(self):
1345 service.MultiService.startService(self)
1346 if self._helper_furl:
1347 self.parent.tub.connectTo(self._helper_furl,
1350 def _got_helper(self, helper):
1351 self.log("got helper connection, getting versions")
1352 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1354 "application-version": "unknown: no get_version()",
1356 d = add_version_to_remote_reference(helper, default)
1357 d.addCallback(self._got_versioned_helper)
1359 def _got_versioned_helper(self, helper):
1360 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1361 if needed not in helper.version:
1362 raise InsufficientVersionError(needed, helper.version)
1363 self._helper = helper
1364 helper.notifyOnDisconnect(self._lost_helper)
1366 def _lost_helper(self):
1369 def get_helper_info(self):
1370 # return a tuple of (helper_furl_or_None, connected_bool)
1371 return (self._helper_furl, bool(self._helper))
1374 def upload(self, uploadable, history=None):
1376 Returns a Deferred that will fire with the UploadResults instance.
1381 uploadable = IUploadable(uploadable)
1382 d = uploadable.get_size()
1383 def _got_size(size):
1384 default_params = self.parent.get_encoding_parameters()
1385 precondition(isinstance(default_params, dict), default_params)
1386 precondition("max_segment_size" in default_params, default_params)
1387 uploadable.set_default_encoding_parameters(default_params)
1389 if self.stats_provider:
1390 self.stats_provider.count('uploader.files_uploaded', 1)
1391 self.stats_provider.count('uploader.bytes_uploaded', size)
1393 if size <= self.URI_LIT_SIZE_THRESHOLD:
1394 uploader = LiteralUploader()
1395 return uploader.start(uploadable)
1397 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1398 d2 = defer.succeed(None)
1400 uploader = AssistedUploader(self._helper)
1401 d2.addCallback(lambda x: eu.get_storage_index())
1402 d2.addCallback(lambda si: uploader.start(eu, si))
1404 storage_broker = self.parent.get_storage_broker()
1405 secret_holder = self.parent._secret_holder
1406 uploader = CHKUploader(storage_broker, secret_holder)
1407 d2.addCallback(lambda x: uploader.start(eu))
1409 self._all_uploads[uploader] = None
1411 history.add_upload(uploader.get_upload_status())
1412 def turn_verifycap_into_read_cap(uploadresults):
1413 # Generate the uri from the verifycap plus the key.
1414 d3 = uploadable.get_encryption_key()
1415 def put_readcap_into_results(key):
1416 v = uri.from_string(uploadresults.verifycapstr)
1417 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1418 uploadresults.uri = r.to_string()
1419 return uploadresults
1420 d3.addCallback(put_readcap_into_results)
1422 d2.addCallback(turn_verifycap_into_read_cap)
1424 d.addCallback(_got_size)