import os, time, weakref, itertools
from cStringIO import StringIO

from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service

from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
from pycryptopp.cipher.aes import AES

from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.immutable import layout
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata.util.happinessutil import servers_of_happiness, \
     shares_by_server, merge_peers, \
     failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError
class HaveAllPeersError(Exception):
    """Control-flow exception used to jump out of the peer-query loop."""
    # we use this to jump out of the loop
# this wants to live in storage, not here
class TooFullError(Exception):
    """Exception indicating a too-full condition (per the comment above,
    this belongs in the storage module rather than here)."""
class UploadResults(Copyable, RemoteCopy):
    """Holds the outcome of an immutable upload: timings, share placement
    maps, and counters. Instances are serialized between the helper and
    its client via foolscap Copyable/RemoteCopy."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.
    def __init__(self):
        # Fix: these per-instance assignments were previously at class
        # scope with no enclosing method, which cannot execute.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.ciphertext_fetched = None # how much the helper fetched
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
68 # our current uri_extension is 846 bytes for small files, a few bytes
69 # more for larger ones (since the filesize is encoded in decimal in a
70 # few places). Ask for a little bit more just in case we need it. If
71 # the extension changes size, we can change EXTENSION_SIZE to
72 # allocate a more accurate amount of space.
74 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
def __init__(self, peerid, storage_server,
             sharesize, blocksize, num_segments, num_share_hashes,
             bucket_renewal_secret, bucket_cancel_secret):
    # Per-peer bookkeeping for one upload: wraps an RIStorageServer
    # reference plus the sizing/secret parameters needed to allocate and
    # write buckets on that peer.
    # NOTE(review): `storage_index` is used below but is not in this
    # signature, and no `self.peerid = peerid` assignment is visible --
    # lines appear to be missing from this copy; reconcile with upstream.
    precondition(isinstance(peerid, str), peerid)
    precondition(len(peerid) == 20, peerid)
    self._storageserver = storage_server # to an RIStorageServer
    self.buckets = {} # k: shareid, v: IRemoteBucketWriter
    self.sharesize = sharesize
    # NOTE(review): this call looks truncated (num_share_hashes argument
    # line appears to be missing).
    wbp = layout.make_write_bucket_proxy(None, sharesize,
                                         blocksize, num_segments,
                                         EXTENSION_SIZE, peerid)
    self.wbp_class = wbp.__class__ # to create more of them
    self.allocated_size = wbp.get_allocated_size()
    self.blocksize = blocksize
    self.num_segments = num_segments
    self.num_share_hashes = num_share_hashes
    self.storage_index = storage_index
    self.renew_secret = bucket_renewal_secret
    self.cancel_secret = bucket_cancel_secret
# NOTE(review): the `def __repr__(self):` header for this body appears to
# be missing from this copy.
return ("<PeerTracker for peer %s and SI %s>"
        % (idlib.shortnodeid_b2a(self.peerid),
           si_b2a(self.storage_index)[:5]))
def query(self, sharenums):
    # Ask the server to allocate buckets for `sharenums`.
    # NOTE(review): the allocate_buckets argument list (storage index,
    # secrets, sharenums, allocated size) and the trailing `return d`
    # appear to be missing from this copy.
    d = self._storageserver.callRemote("allocate_buckets",
                                       canary=Referenceable())
    d.addCallback(self._got_reply)
def ask_about_existing_shares(self):
    """Ask this server which buckets it already holds for our storage
    index; returns the Deferred from the remote get_buckets call.

    Fix: the callRemote invocation was left unterminated (missing the
    storage-index argument and closing paren).
    """
    return self._storageserver.callRemote("get_buckets",
                                          self.storage_index)
def _got_reply(self, (alreadygot, buckets)):
    # Record write-bucket proxies for each bucket the server granted us.
    #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
    # NOTE(review): the `b = {}` initialization and part of the
    # wbp_class(...) argument list appear to be missing from this copy.
    for sharenum, rref in buckets.iteritems():
        bp = self.wbp_class(rref, self.sharesize,
                            self.num_share_hashes,
        self.buckets.update(b)
    return (alreadygot, set(b.keys()))
# NOTE(review): the `def abort(self):` header for this body appears to be
# missing from this copy; the docstring below was also truncated.
"""
I abort the remote bucket writers for the share numbers in
sharenums. This is a good idea to conserve space on the storage
"""
for writer in self.buckets.itervalues(): writer.abort()
class Tahoe2PeerSelector:
    # Implements the "Tahoe 2" peer-selection algorithm: query peers in
    # permuted order, asking each to hold shares, until the upload reaches
    # the configured servers-of-happiness threshold.

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        # NOTE(review): initializations of `self.full_count` and
        # `self.error_count` (both incremented elsewhere in this class)
        # appear to be missing from this copy.
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    # NOTE(review): the `def __repr__(self):` header for the line below
    # appears to be missing from this copy.
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
def get_shareholders(self, storage_broker, secret_holder,
                     storage_index, share_size, block_size,
                     num_segments, total_shares, needed_shares,
                     servers_of_happiness):
    """
    @return: (used_peers, already_peers), where used_peers is a set of
             PeerTracker instances that have agreed to hold some shares
             for us (the shnum is stashed inside the PeerTracker),
             and already_peers is a dict mapping shnum to a set of peers
             which claim to already have the share.

    NOTE(review): several source lines appear to be missing from this copy
    (unterminated calls are flagged inline below); reconcile against the
    canonical source before editing behavior.
    """
    self._status.set_status("Contacting Peers..")
    self.total_shares = total_shares
    self.servers_of_happiness = servers_of_happiness
    self.needed_shares = needed_shares
    self.homeless_shares = range(total_shares)
    self.contacted_peers = [] # peers worth asking again
    self.contacted_peers2 = [] # peers that we have asked again
    self._started_second_pass = False
    self.use_peers = set() # PeerTrackers that have shares assigned to them
    self.preexisting_shares = {} # shareid => set(peerids) holding shareid
    # We don't try to allocate shares to these servers, since they've said
    # that they're incapable of storing shares of the size that we'd want
    # to store. We keep them around because they may have existing shares
    # for this storage index, which we want to know about for accurate
    # servers_of_happiness accounting
    # (this is eventually a list, but it is initialized later)
    self.readonly_peers = None
    # These peers have shares -- any shares -- for our SI. We keep
    # track of these to write an error message with them later.
    self.peers_with_shares = set()
    # this needed_hashes computation should mirror
    # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
    # (instead of a HashTree) because we don't require actual hashing
    # just to count the levels.
    ht = hashtree.IncompleteHashTree(total_shares)
    num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
    # figure out how much space to ask for
    # NOTE(review): the make_write_bucket_proxy call below is unterminated
    # in this copy.
    wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                         num_share_hashes, EXTENSION_SIZE,
    allocated_size = wbp.get_allocated_size()
    all_peers = storage_broker.get_servers_for_index(storage_index)
    # NOTE(review): the `if not all_peers:` guard for the raise below
    # appears to be missing from this copy.
    raise NoServersError("client gave us zero peers")
    # filter the list of peers according to which ones can accommodate
    # this request. This excludes older peers (which used a 4-byte size
    # field) from getting large shares (for files larger than about
    # 12GiB). See #439 for details.
    def _get_maxsize(peer):
        (peerid, conn) = peer
        v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
        return v1["maximum-immutable-share-size"]
    writable_peers = [peer for peer in all_peers
                      if _get_maxsize(peer) >= allocated_size]
    readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
    # decide upon the renewal/cancel secrets, to include them in the
    # allocate_buckets query.
    client_renewal_secret = secret_holder.get_renewal_secret()
    client_cancel_secret = secret_holder.get_cancel_secret()
    # NOTE(review): both secret-hash calls below are unterminated in this
    # copy (the storage_index argument lines appear to be missing).
    file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
    file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
    def _make_trackers(peers):
        # Wrap each (peerid, conn) pair in a PeerTracker carrying the
        # per-bucket secrets.
        return [PeerTracker(peerid, conn,
                            share_size, block_size,
                            num_segments, num_share_hashes,
                            bucket_renewal_secret_hash(file_renewal_secret,
                            bucket_cancel_secret_hash(file_cancel_secret,
                for (peerid, conn) in peers]
    self.uncontacted_peers = _make_trackers(writable_peers)
    self.readonly_peers = _make_trackers(readonly_peers)
    # We now ask peers that can't hold any new shares about existing
    # shares that they might have for our SI. Once this is done, we
    # start placing the shares that we haven't already accounted
    if self._status and self.readonly_peers:
        self._status.set_status("Contacting readonly peers to find "
                                "any existing shares")
    # NOTE(review): the `ds = []` accumulator referenced below appears to
    # be missing from this copy.
    for peer in self.readonly_peers:
        assert isinstance(peer, PeerTracker)
        d = peer.ask_about_existing_shares()
        d.addBoth(self._handle_existing_response, peer.peerid)
        self.num_peers_contacted += 1
        self.query_count += 1
        log.msg("asking peer %s for any existing shares for "
                % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                level=log.NOISY, parent=self._log_parent)
    dl = defer.DeferredList(ds)
    dl.addCallback(lambda ign: self._loop())
def _handle_existing_response(self, res, peer):
    """
    I handle responses to the queries sent by
    Tahoe2PeerSelector._existing_shares.
    """
    # NOTE(review): the success branch header that binds `buckets` from a
    # non-Failure response appears to be missing from this copy, so the
    # structure below does not parse as-is.
    if isinstance(res, failure.Failure):
        log.msg("%s got error during existing shares check: %s"
                % (idlib.shortnodeid_b2a(peer), res),
                level=log.UNUSUAL, parent=self._log_parent)
        self.error_count += 1
        self.bad_query_count += 1
        self.peers_with_shares.add(peer)
        log.msg("response from peer %s: alreadygot=%s"
                % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                level=log.NOISY, parent=self._log_parent)
        for bucket in buckets:
            # Remember who already has each share, and stop treating such
            # shares as homeless.
            self.preexisting_shares.setdefault(bucket, set()).add(peer)
            if self.homeless_shares and bucket in self.homeless_shares:
                self.homeless_shares.remove(bucket)
        self.bad_query_count += 1
def _get_progress_message(self):
    # Build a human-readable summary of share-placement progress for logs
    # and error messages.
    if not self.homeless_shares:
        msg = "placed all %d shares, " % (self.total_shares)
        # NOTE(review): the `else:` branch header for the partial-placement
        # message below appears to be missing from this copy, as does the
        # `self.total_shares,` element of the format tuple.
        msg = ("placed %d shares out of %d total (%d homeless), " %
               (self.total_shares - len(self.homeless_shares),
                len(self.homeless_shares)))
    return (msg + "want to place shares on at least %d servers such that "
            "any %d of them have enough shares to recover the file, "
            "sent %d queries to %d peers, "
            "%d queries placed some shares, %d placed none "
            "(of which %d placed none due to the server being"
            " full and %d placed none due to an error)" %
            (self.servers_of_happiness, self.needed_shares,
             self.query_count, self.num_peers_contacted,
             self.good_query_count, self.bad_query_count,
             self.full_count, self.error_count))
# NOTE(review): the `def _loop(self):` header for the body below appears
# to be missing from this copy; this is the main peer-selection loop.
# Several interior lines also appear to be missing (flagged inline).
if not self.homeless_shares:
    merged = merge_peers(self.preexisting_shares, self.use_peers)
    effective_happiness = servers_of_happiness(merged)
    if self.servers_of_happiness <= effective_happiness:
        # Happiness target met: selection is done.
        msg = ("peer selection successful for %s: %s" % (self,
               self._get_progress_message()))
        log.msg(msg, parent=self._log_parent)
        return (self.use_peers, self.preexisting_shares)
    # We're not okay right now, but maybe we can fix it by
    # redistributing some shares. In cases where one or two
    # servers has, before the upload, all or most of the
    # shares for a given SI, this can work by allowing _loop
    # a chance to spread those out over the other peers,
    delta = self.servers_of_happiness - effective_happiness
    shares = shares_by_server(self.preexisting_shares)
    # Each server in shares maps to a set of shares stored on it.
    # Since we want to keep at least one share on each server
    # that has one (otherwise we'd only be making
    # the situation worse by removing distinct servers),
    # each server has len(its shares) - 1 to spread around.
    # NOTE(review): the `in shares.items()])` tail of this sum appears
    # to be missing from this copy.
    shares_to_spread = sum([len(list(sharelist)) - 1
                            for (server, sharelist)
    if delta <= len(self.uncontacted_peers) and \
       shares_to_spread >= delta:
        items = shares.items()
        while len(self.homeless_shares) < delta:
            # Loop through the allocated shares, removing
            # one from each server that has more than one
            # and putting it back into self.homeless_shares
            # until we've done this delta times.
            server, sharelist = items.pop()
            if len(sharelist) > 1:
                share = sharelist.pop()
                self.homeless_shares.append(share)
                self.preexisting_shares[share].remove(server)
                if not self.preexisting_shares[share]:
                    del self.preexisting_shares[share]
                items.append((server, sharelist))
        # Redistribution won't help us; fail.
        # NOTE(review): the re-entry into the loop and the `else:` that
        # should separate this failure path appear to be missing; the
        # failure_message(...) call below is also unterminated.
        peer_count = len(self.peers_with_shares)
        msg = failure_message(peer_count,
                              self.servers_of_happiness,
        return self._failed("%s (%s)" % (msg, self._get_progress_message()))
if self.uncontacted_peers:
    peer = self.uncontacted_peers.pop(0)
    # TODO: don't pre-convert all peerids to PeerTrackers
    assert isinstance(peer, PeerTracker)
    shares_to_ask = set([self.homeless_shares.pop(0)])
    self.query_count += 1
    self.num_peers_contacted += 1
    # NOTE(review): the `if self._status:` guard and part of the status
    # format string appear to be missing from this copy.
    self._status.set_status("Contacting Peers [%s] (first query),"
                            % (idlib.shortnodeid_b2a(peer.peerid),
                               len(self.homeless_shares)))
    d = peer.query(shares_to_ask)
    d.addBoth(self._got_response, peer, shares_to_ask,
              self.contacted_peers)
elif self.contacted_peers:
    # ask a peer that we've already asked.
    if not self._started_second_pass:
        # NOTE(review): this log.msg call is unterminated in this copy.
        log.msg("starting second pass", parent=self._log_parent,
        self._started_second_pass = True
    num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                   len(self.contacted_peers))
    peer = self.contacted_peers.pop(0)
    shares_to_ask = set(self.homeless_shares[:num_shares])
    self.homeless_shares[:num_shares] = []
    self.query_count += 1
    self._status.set_status("Contacting Peers [%s] (second query),"
                            % (idlib.shortnodeid_b2a(peer.peerid),
                               len(self.homeless_shares)))
    d = peer.query(shares_to_ask)
    d.addBoth(self._got_response, peer, shares_to_ask,
              self.contacted_peers2)
elif self.contacted_peers2:
    # we've finished the second-or-later pass. Move all the remaining
    # peers back into self.contacted_peers for the next pass.
    self.contacted_peers.extend(self.contacted_peers2)
    self.contacted_peers2[:] = []
# no more peers. If we haven't placed enough shares, we fail.
merged = merge_peers(self.preexisting_shares, self.use_peers)
effective_happiness = servers_of_happiness(merged)
if effective_happiness < self.servers_of_happiness:
    # NOTE(review): the failure_message(...) call below is unterminated
    # in this copy.
    msg = failure_message(len(self.peers_with_shares),
                          self.servers_of_happiness,
    msg = ("peer selection failed for %s: %s (%s)" % (self,
           self._get_progress_message()))
    if self.last_failure_msg:
        msg += " (%s)" % (self.last_failure_msg,)
    log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
    return self._failed(msg)
# we placed enough to be happy, so we're done
self._status.set_status("Placed all shares")
return (self.use_peers, self.preexisting_shares)
def _got_response(self, res, peer, shares_to_ask, put_peer_here):
    # Handle an allocate_buckets reply (or Failure) from one peer, update
    # the bookkeeping sets, and decide which queue the peer goes into next.
    # NOTE(review): several lines appear to be missing from this copy
    # (success-branch header, `return self._loop()` calls, and the loop
    # header over `alreadygot` that binds `s`), so the flow below does not
    # parse as-is; reconcile against the canonical source.
    if isinstance(res, failure.Failure):
        # This is unusual, and probably indicates a bug or a network
        log.msg("%s got error during peer selection: %s" % (peer, res),
                level=log.UNUSUAL, parent=self._log_parent)
        self.error_count += 1
        self.bad_query_count += 1
        self.homeless_shares = list(shares_to_ask) + self.homeless_shares
        if (self.uncontacted_peers
            or self.contacted_peers
            or self.contacted_peers2):
            # there is still hope, so just loop
            # No more peers, so this upload might fail (it depends upon
            # whether we've hit servers_of_happiness or not). Log the last
            # failure we got: if a coding error causes all peers to fail
            # in the same way, this allows the common failure to be seen
            # by the uploader and should help with debugging
            msg = ("last failure (from %s) was: %s" % (peer, res))
            self.last_failure_msg = msg
    (alreadygot, allocated) = res
    log.msg("response from peer %s: alreadygot=%s, allocated=%s"
            % (idlib.shortnodeid_b2a(peer.peerid),
               tuple(sorted(alreadygot)), tuple(sorted(allocated))),
            level=log.NOISY, parent=self._log_parent)
    self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
    if s in self.homeless_shares:
        self.homeless_shares.remove(s)
    elif s in shares_to_ask:
        # the PeerTracker will remember which shares were allocated on
        # that peer. We just have to remember to use them.
        self.use_peers.add(peer)
    if allocated or alreadygot:
        self.peers_with_shares.add(peer.peerid)
    not_yet_present = set(shares_to_ask) - set(alreadygot)
    still_homeless = not_yet_present - set(allocated)
    # They accepted at least one of the shares that we asked
    # them to accept, or they had a share that we didn't ask
    # them to accept but that we hadn't placed yet, so this
    # was a productive query
    self.good_query_count += 1
    self.bad_query_count += 1
    # In networks with lots of space, this is very unusual and
    # probably indicates an error. In networks with peers that
    # are full, it is merely unusual. In networks that are very
    # full, it is common, and many uploads will fail. In most
    # cases, this is obviously not fatal, and we'll just use some
    # some shares are still homeless, keep trying to find them a
    # home. The ones that were rejected get first priority.
    self.homeless_shares = (list(still_homeless)
                            + self.homeless_shares)
    # Since they were unable to accept all of our requests, so it
    # is safe to assume that asking them again won't help.
    # if they *were* able to accept everything, they might be
    # willing to accept even more.
    put_peer_here.append(peer)
def _failed(self, msg):
    """
    I am called when peer selection fails. I first abort all of the
    remote buckets that I allocated during my unsuccessful attempt to
    place shares for this file. I then raise an
    UploadUnhappinessError with my msg argument.
    """
    for peer in self.use_peers:
        assert isinstance(peer, PeerTracker)
        # NOTE(review): the statement aborting each peer's buckets appears
        # to be missing from this copy.
    raise UploadUnhappinessError(msg)
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): a CHUNKSIZE class attribute is read by _read_encrypted
    # below but is not defined in this copy -- confirm against upstream.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None          # created lazily by _get_encryptor
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None  # cached (k, happy, n, segsize)
        self._file_size = None            # cached result of get_size()
        self._ciphertext_bytes_read = 0
        # NOTE(review): a `self._status = None` initialization appears to
        # be missing from this copy (set later by set_upload_status).
def set_upload_status(self, upload_status):
    """Adopt *upload_status* (adapted to IUploadStatus) and forward it to
    the wrapped uploadable so both report to the same status object."""
    self.original.set_upload_status(upload_status)
    self._status = IUploadStatus(upload_status)
def log(self, *args, **kwargs):
    """Emit a log message, defaulting facility/parent for this wrapper."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
# NOTE(review): the `def get_size(self):` header, the `def _got_size(size):`
# inner-function header, and the trailing `return` lines appear to be
# missing from this copy; the fragment below caches the original's size.
if self._file_size is not None:
    return defer.succeed(self._file_size)
d = self.original.get_size()
    self._file_size = size
    self._status.set_size(size)
d.addCallback(_got_size)
def get_all_encoding_parameters(self):
    # Return (via Deferred) the (k, happy, n, segment_size) tuple from the
    # wrapped uploadable, caching it for later calls.
    if self._encoding_parameters is not None:
        return defer.succeed(self._encoding_parameters)
    d = self.original.get_all_encoding_parameters()
    def _got(encoding_parameters):
        (k, happy, n, segsize) = encoding_parameters
        self._segment_size = segsize # used by segment hashers
        self._encoding_parameters = encoding_parameters
        # NOTE(review): this self.log call is unterminated, and the
        # `d.addCallback(_got)` / `return d` tail appears to be missing
        # from this copy.
        self.log("my encoding parameters: %s" % (encoding_parameters,),
        return encoding_parameters
def _get_encryptor(self):
    # Lazily create the AES encryptor from the uploadable's encryption key
    # and derive the storage index from that key.
    # NOTE(review): several lines appear to be missing from this copy: the
    # cached-encryptor guard around the early return, the `_got(key)`
    # callback header that constructs the AES instance, and the trailing
    # addCallback/return.
        return defer.succeed(self._encryptor)
    d = self.original.get_encryption_key()
        storage_index = storage_index_hash(key)
        assert isinstance(storage_index, str)
        # There's no point to having the SI be longer than the key, so we
        # specify that it is truncated to the same 128 bits as the AES key.
        assert len(storage_index) == 16 # SHA-256 truncated to 128b
        self._storage_index = storage_index
        self._status.set_storage_index(storage_index)
def get_storage_index(self):
    """Return a Deferred that fires with this file's storage index, which
    becomes available once the encryption key has been computed.

    Fix: the Deferred was built and given its callback but never
    returned, so callers received None instead of the storage index.
    """
    d = self._get_encryptor()
    d.addCallback(lambda res: self._storage_index)
    return d
def _get_segment_hasher(self):
    # Return (hasher, bytes_remaining_in_segment), opening a fresh
    # per-segment hasher if none is currently in progress.
    # NOTE(review): the branch returning the already-open hasher (an `if p:`
    # guard and its `return p, left`) appears truncated in this copy.
    p = self._plaintext_segment_hasher
        left = self._segment_size - self._plaintext_segment_hashed_bytes
    p = plaintext_segment_hasher()
    self._plaintext_segment_hasher = p
    self._plaintext_segment_hashed_bytes = 0
    return p, self._segment_size
def _update_segment_hash(self, chunk):
    # Feed `chunk` into the per-segment plaintext hashers, closing out a
    # hasher (recording its digest) each time a segment boundary is hit.
    # NOTE(review): the `offset = 0` initialization appears to be missing
    # from this copy, and the two self.log calls below are unterminated.
    while offset < len(chunk):
        p, segment_left = self._get_segment_hasher()
        chunk_left = len(chunk) - offset
        this_segment = min(chunk_left, segment_left)
        p.update(chunk[offset:offset+this_segment])
        self._plaintext_segment_hashed_bytes += this_segment
        if self._plaintext_segment_hashed_bytes == self._segment_size:
            # we've filled this segment
            self._plaintext_segment_hashes.append(p.digest())
            self._plaintext_segment_hasher = None
            self.log("closed hash [%d]: %dB" %
                     (len(self._plaintext_segment_hashes)-1,
                      self._plaintext_segment_hashed_bytes),
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
        offset += this_segment
def read_encrypted(self, length, hash_only):
    # Fetch up to `length` bytes of ciphertext; when hash_only is true the
    # plaintext is hashed but the ciphertext is not accumulated.
    # make sure our parameters have been set up first
    d = self.get_all_encoding_parameters()
    d.addCallback(lambda ignored: self.get_size())
    d.addCallback(lambda ignored: self._get_encryptor())
    # then fetch and encrypt the plaintext. The unusual structure here
    # (passing a Deferred *into* a function) is needed to avoid
    # overflowing the stack: Deferreds don't optimize out tail recursion.
    # We also pass in a list, to which _read_encrypted will append
    # NOTE(review): the `ciphertext = []` accumulator referenced below and
    # the final `return d` appear to be missing from this copy.
    d2 = defer.Deferred()
    d.addCallback(lambda ignored:
                  self._read_encrypted(length, ciphertext, hash_only, d2))
    d.addCallback(lambda ignored: d2)
def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
    # Recursive worker for read_encrypted: pull one CHUNKSIZE slice of
    # plaintext per iteration, hash/encrypt it, then recurse until
    # `remaining` is exhausted, finally firing `fire_when_done`.
    # NOTE(review): the `if remaining == 0:` guard for the callback below,
    # the recursion's trailing argument, and the errback wiring appear to
    # be missing from this copy.
        fire_when_done.callback(ciphertext)
    # tolerate large length= values without consuming a lot of RAM by
    # reading just a chunk (say 50kB) at a time. This only really matters
    # when hash_only==True (i.e. resuming an interrupted upload), since
    # that's the case where we will be skipping over a lot of data.
    size = min(remaining, self.CHUNKSIZE)
    remaining = remaining - size
    # read a chunk of plaintext..
    d = defer.maybeDeferred(self.original.read, size)
    # N.B.: if read() is synchronous, then since everything else is
    # actually synchronous too, we'd blow the stack unless we stall for a
    # tick. Once you accept a Deferred from IUploadable.read(), you must
    # be prepared to have it fire immediately too.
    d.addCallback(fireEventually)
    def _good(plaintext):
        # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
        ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
        ciphertext.extend(ct)
        self._read_encrypted(remaining, ciphertext, hash_only,
        fire_when_done.errback(why)
def _hash_and_encrypt_plaintext(self, data, hash_only):
    # Hash every plaintext chunk (whole-file and per-segment hashers) and
    # encrypt it, returning the list of ciphertext strings.
    assert isinstance(data, (tuple, list)), type(data)
    # NOTE(review): initializations (`cryptdata = []`, `bytes_processed`)
    # and the loop header consuming `data` appear to be missing from this
    # copy, as does the hash_only branch structure and the return.
    # we use data.pop(0) instead of 'for chunk in data' to save
    # memory: each chunk is destroyed as soon as we're done with it.
        self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
        bytes_processed += len(chunk)
        self._plaintext_hasher.update(chunk)
        self._update_segment_hash(chunk)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        ciphertext = self._encryptor.process(chunk)
            self.log(" skipping encryption", level=log.NOISY)
        cryptdata.append(ciphertext)
    self._ciphertext_bytes_read += bytes_processed
    progress = float(self._ciphertext_bytes_read) / self._file_size
    self._status.set_progress(1, progress)
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
    # this is currently unused, but will live again when we fix #453
    if len(self._plaintext_segment_hashes) < num_segments:
        # close out the last one
        assert len(self._plaintext_segment_hashes) == num_segments-1
        p, segment_left = self._get_segment_hasher()
        self._plaintext_segment_hashes.append(p.digest())
        del self._plaintext_segment_hasher
        # NOTE(review): both self.log calls below are unterminated in this
        # copy (their trailing level= argument lines appear to be missing).
        self.log("closing plaintext leaf hasher, hashed %d bytes" %
                 self._plaintext_segment_hashed_bytes,
        self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                 segnum=len(self._plaintext_segment_hashes)-1,
                 hash=base32.b2a(p.digest()),
    assert len(self._plaintext_segment_hashes) == num_segments
    return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
    """Return a Deferred firing with the digest of the whole-plaintext
    hasher accumulated so far."""
    return defer.succeed(self._plaintext_hasher.digest())
752 return self.original.close()
# NOTE(review): the `class UploadStatus:` header, the `def __init__(self):`
# header, and the one-line bodies of several accessors appear to be missing
# from this copy; each empty `def` below is truncated. Simple status holder
# with getters/setters over storage_index/status/progress/etc.
implements(IUploadStatus)
statusid_counter = itertools.count(0)  # class-wide counter for get_counter

self.storage_index = None
self.status = "Not started"
self.progress = [0.0, 0.0, 0.0]
self.counter = self.statusid_counter.next()
self.started = time.time()

def get_started(self):
def get_storage_index(self):
    return self.storage_index
def using_helper(self):
def get_status(self):
def get_progress(self):
    return tuple(self.progress)
def get_active(self):
def get_results(self):
def get_counter(self):

def set_storage_index(self, si):
    self.storage_index = si
def set_size(self, size):
def set_helper(self, helper):
def set_status(self, status):
def set_progress(self, which, value):
    # [0]: chk, [1]: ciphertext, [2]: encode+push
    self.progress[which] = value
def set_active(self, value):
def set_results(self, value):
# NOTE(review): the `class CHKUploader:` header for the members below
# appears to be missing from this copy.
peer_selector_class = Tahoe2PeerSelector

def __init__(self, storage_broker, secret_holder):
    # peer_selector needs storage_broker and secret_holder
    self._storage_broker = storage_broker
    self._secret_holder = secret_holder
    self._log_number = self.log("CHKUploader starting", parent=None)
    # NOTE(review): an initialization read by abort() (presumably
    # `self._encoder = None`) appears to be missing from this copy.
    self._results = UploadResults()
    self._storage_index = None
    self._upload_status = UploadStatus()
    self._upload_status.set_helper(False)
    self._upload_status.set_active(True)
    self._upload_status.set_results(self._results)

# locate_all_shareholders() will create the following attribute:
# self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
def log(self, *args, **kwargs):
    """Emit a log message, defaulting parent/facility for this uploader."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.upload")
    return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable):
    """Start uploading the file.

    Returns a Deferred that will fire with the UploadResults instance.
    """
    self._started = time.time()
    eu = IEncryptedUploadable(encrypted_uploadable)
    self.log("starting upload of %s" % eu)
    eu.set_upload_status(self._upload_status)
    d = self.start_encrypted(eu)
    def _done(uploadresults):
        self._upload_status.set_active(False)
        # NOTE(review): the remainder of _done, its addBoth/addCallback
        # wiring, and the final `return d` appear to be missing from
        # this copy.
849 """Call this if the upload must be abandoned before it completes.
850 This will tell the shareholders to delete their partial shares. I
851 return a Deferred that fires when these messages have been acked."""
852 if not self._encoder:
853 # how did you call abort() before calling start() ?
854 return defer.succeed(None)
855 return self._encoder.abort()
def start_encrypted(self, encrypted):
    """ Returns a Deferred that will fire with the UploadResults instance. """
    eu = IEncryptedUploadable(encrypted)
    started = time.time()
    # NOTE(review): the Encoder(...) call below is unterminated in this
    # copy, and the final `return d` appears to be missing.
    self._encoder = e = encode.Encoder(self._log_number,
    d = e.set_encrypted_uploadable(eu)
    d.addCallback(self.locate_all_shareholders, started)
    d.addCallback(self.set_shareholders, e)
    d.addCallback(lambda res: e.start())
    d.addCallback(self._encrypted_done)
def locate_all_shareholders(self, encoder, started):
    # Run peer selection for this upload, recording timing information.
    peer_selection_started = now = time.time()
    self._storage_index_elapsed = now - started
    storage_broker = self._storage_broker
    secret_holder = self._secret_holder
    storage_index = encoder.get_param("storage_index")
    self._storage_index = storage_index
    upload_id = si_b2a(storage_index)[:5]
    self.log("using storage index %s" % upload_id)
    # NOTE(review): the peer_selector_class(...) call below is unterminated
    # in this copy (the upload-status argument line appears to be missing).
    peer_selector = self.peer_selector_class(upload_id, self._log_number,
    share_size = encoder.get_param("share_size")
    block_size = encoder.get_param("block_size")
    num_segments = encoder.get_param("num_segments")
    k,desired,n = encoder.get_param("share_counts")
    self._peer_selection_started = time.time()
    # NOTE(review): get_shareholders appears to be missing its
    # storage_index argument line here, and the callback/return tail of
    # this method appears to be missing from this copy.
    d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                       share_size, block_size,
                                       num_segments, n, k, desired)
    self._peer_selection_elapsed = time.time() - peer_selection_started
def set_shareholders(self, (used_peers, already_peers), encoder):
    """
    @param used_peers: a sequence of PeerTracker objects
    @param already_peers: a dict mapping sharenum to a set of peerids
                          that claim to already have this share
    """
    self.log("_send_shares, used_peers is %s" % (used_peers,))
    # record already-present shares in self._results
    self._results.preexisting_shares = len(already_peers)
    self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
    for peer in used_peers:
        assert isinstance(peer, PeerTracker)
    # NOTE(review): a `buckets = {}` initialization appears to be missing
    # from this copy (buckets is updated and asserted on below).
    servermap = already_peers.copy()
    for peer in used_peers:
        buckets.update(peer.buckets)
        for shnum in peer.buckets:
            self._peer_trackers[shnum] = peer
            servermap.setdefault(shnum, set()).add(peer.peerid)
    assert len(buckets) == sum([len(peer.buckets) for peer in used_peers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in used_peers]), [(p.buckets, p.peerid) for p in used_peers])
    encoder.set_shareholders(buckets, servermap)
def _encrypted_done(self, verifycap):
    """ Returns a Deferred that will fire with the UploadResults instance. """
    # NOTE(review): the `r = self._results` and `now = time.time()`
    # bindings read below, and the final return, appear to be missing
    # from this copy.
    for shnum in self._encoder.get_shares_placed():
        peer_tracker = self._peer_trackers[shnum]
        peerid = peer_tracker.peerid
        r.sharemap.add(shnum, peerid)
        r.servermap.add(peerid, shnum)
    r.pushed_shares = len(self._encoder.get_shares_placed())
    r.file_size = self._encoder.file_size
    r.timings["total"] = now - self._started
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["peer_selection"] = self._peer_selection_elapsed
    r.timings.update(self._encoder.get_times())
    r.uri_extension_data = self._encoder.get_uri_extension_data()
    r.verifycapstr = verifycap.to_string()
def get_upload_status(self):
    """Return the UploadStatus object tracking this upload."""
    return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    # Read exactly `size` bytes from the uploadable (recursing until
    # enough pieces have been gathered), returning prepend_data + pieces.
    # NOTE(review): several lines appear to be missing from this copy (the
    # size==0 guard, the `_got(data)` callback header, the condition for
    # the recursive call, its prepend argument, and the final return).
    # NOTE(review): `prepend_data=[]` is a mutable default argument; it is
    # only read and re-passed here, but confirm before relying on it.
        return defer.succeed([])
    d = uploadable.read(size)
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        remaining = size - bytes
            return read_this_many_bytes(uploadable, remaining,
        return prepend_data + data
class LiteralUploader:
    # Uploads small files as literal (LIT) URIs: the whole file content is
    # embedded in the URI itself, so no shares are pushed to servers.
    # NOTE(review): the `def __init__(self):` header for the assignments
    # below, plus several status/close lines, appear to be missing from
    # this copy.
    self._results = UploadResults()
    self._status = s = UploadStatus()
    s.set_storage_index(None)
    s.set_progress(0, 1.0)
    s.set_results(self._results)

    def start(self, uploadable):
        # Read the entire file and build a LiteralFileURI from it.
        # NOTE(review): the `def _got_size(size):` header and the trailing
        # `return d` appear to be missing from this copy.
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)

    def _build_results(self, uri):
        # Record the finished URI and mark all progress phases complete.
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)

    # NOTE(review): the body of this accessor appears to be missing.
    def get_upload_status(self):
class RemoteEncryptedUploadable(Referenceable):
    """Foolscap-remoteable wrapper around an IEncryptedUploadable.

    The helper calls back into this object over the wire to pull ciphertext
    from the client. NOTE(review): truncated listing; missing original lines
    are flagged inline below.
    """
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # NOTE(review): line 1004 missing -- presumably `self._offset = 0`
        # (it is incremented in _read_encrypted below).
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        # NOTE(review): lines 1009-1011 missing -- presumably
        # `self._size = None` and a `def get_size(self):` header for the
        # lines below.
        if self._size is not None:
            return defer.succeed(self._size)    # cached size
        d = self._eu.get_size()
        def _got_size(size):
            # NOTE(review): lines 1016-1017 missing -- presumably cache the
            # size (`self._size = size`) and return it.
        d.addCallback(_got_size)
        # NOTE(review): lines 1019-1020 missing -- presumably `return d`.

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # Fetch `length` bytes of ciphertext; when hash_only is true the
        # data is consumed for hashing only.
        d = self._eu.read_encrypted(length, hash_only)
        # NOTE(review): lines 1028-1029 missing -- presumably
        # `def _read(strings):` and `if hash_only:`.
            self._offset += length
        # NOTE(review): line 1031 missing -- presumably `else:`.
            size = sum([len(data) for data in strings])
            self._offset += size
        # NOTE(review): line 1034 missing -- presumably `return strings`.
        d.addCallback(_read)
        # NOTE(review): lines 1036-1037 missing -- presumably `return d`.

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        # NOTE(review): line 1043 missing -- the continuation of this
        # log.msg call (probably a level=/parent= keyword).
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): line 1051 missing -- presumably `else:`.
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): lines 1058-1059 missing -- presumably a
        # `def _read(strings):` callback header for the two lines below.
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
        # NOTE(review): line 1062 missing -- presumably `return strings`.
        d.addCallback(_read)
        # NOTE(review): lines 1064-1065 missing -- presumably `return d`.

    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    """Upload a file with the help of a remote Helper service.

    The client hands its encrypted data to the helper, which performs the
    erasure coding and share placement on its behalf. NOTE(review):
    truncated listing; missing original lines are flagged inline below.
    """

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        # NOTE(review): lines 1077-1079 missing -- presumably further status
        # setup (e.g. marking the helper as in use).

    def log(self, *args, **kwargs):
        # Route our log messages under this uploader's parent log entry.
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # NOTE(review): line 1096 missing -- presumably `d = eu.get_size()`.
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        # NOTE(review): line 1102 missing -- presumably a `def _done(res):`
        # header; the next line deactivates the status when finished.
        self._upload_status.set_active(False)
        # NOTE(review): lines 1104-1107 missing -- presumably return the
        # result, attach _done via addBoth, and `return d`.

    def _got_size(self, size):
        # NOTE(review): line 1109 missing -- presumably `self._size = size`
        # (self._size is read later in _build_verifycap).
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        # Ask the helper whether it already has this file (by storage index)
        # or whether we need to push ciphertext to it.
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        # NOTE(review): lines 1127-1128 missing -- presumably `return d`.

    def _contacted_helper(self, (upload_results, upload_helper)):
        # NOTE(review): Python 2 tuple-unpacking parameter (invalid in
        # Python 3). Line 1130 missing -- presumably `now = time.time()`.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        # NOTE(review): line 1133 missing -- presumably `if upload_helper:`
        # guarding the "we must upload" branch below; the trailing lines are
        # the "already uploaded" branch.
        self.log("helper says we need to upload")
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        # NOTE(review): line 1140 missing -- presumably `d = reu.get_size()`.
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        # NOTE(review): line 1144 missing -- presumably `return d`.
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        # NOTE(review): line 1171 missing -- presumably
        # `r = upload_results`.
        # The helper must agree with us about the encoding parameters.
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares, size=self._size
        # NOTE(review): lines 1180-1181 missing -- presumably the closing
        # `).to_string()` of this call plus `now = time.time()` (read below).
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # the helper's own "total" becomes "helper_total"; our
            # end-to-end wall clock becomes the new "total"
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        # NOTE(review): lines 1190-1191 missing -- presumably `return r`.

    def get_upload_status(self):
        return self._upload_status
class BaseUploadable:
    """Common encoding-parameter plumbing shared by the IUploadable classes.

    NOTE(review): truncated listing; missing original lines are flagged
    inline below. `KiB` is presumably defined earlier in this file.
    """
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides; None means "use the class default above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cached (k, happy, n, segsize) tuple, filled by
    # get_all_encoding_parameters()
    _all_encoding_parameters = None
    # NOTE(review): lines 1207-1208 missing -- presumably another class
    # attribute (plausibly `_status = None`) plus a blank line.

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        # Accept a {str: int} dict of defaults; keys other than the four
        # recognized below are ignored.
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        # Compute (k, happy, n, segment_size) once, cache it, and return it
        # via a Deferred.
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n
        # NOTE(review): lines 1234-1235 missing -- presumably
        # `d = self.get_size()` (the `d` used below).

        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        # NOTE(review): lines 1245-1246 missing -- presumably `return d`.
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        # NOTE(review): line 1259 missing -- presumably `self._key = None`
        # (read below by the key-getter methods).
        self.convergence = convergence
        # NOTE(review): lines 1261-1262 missing -- presumably
        # `self._size = None` plus a blank line.

    def _get_encryption_key_convergent(self):
        # Derive the encryption key from a hash of the plaintext plus the
        # convergence secret, so identical files converge on the same key.
        if self._key is not None:
            return defer.succeed(self._key)

        # NOTE(review): lines 1266-1267 missing -- presumably
        # `d = self.get_size()`.
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        # NOTE(review): line 1270 missing -- presumably a
        # `def _got(params):` callback header for the body below.
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            # NOTE(review): lines 1274-1277 missing -- presumably a seek to
            # the start, a BLOCKSIZE constant, a bytes_read counter, and the
            # read-loop header for the lines below.
            data = f.read(BLOCKSIZE)
            # NOTE(review): lines 1279-1280 missing -- presumably the
            # loop-exit test (`if not data: break`).
            enckey_hasher.update(data)
            # TODO: setting progress in a non-yielding loop is kind of
            # pointless, but I'm anticipating (perhaps prematurely) the
            # day when we use a slowjob or twisted's CooperatorService to
            # make this yield time to other jobs.
            bytes_read += len(data)
            # NOTE(review): line 1287 missing -- presumably
            # `if self._status:` guarding the progress update.
            self._status.set_progress(0, float(bytes_read)/self._size)
            # NOTE(review): line 1289 missing -- presumably a rewind of f.
            self._key = enckey_hasher.digest()
            # NOTE(review): line 1291 missing -- presumably `if self._status:`.
            self._status.set_progress(0, 1.0)
            assert len(self._key) == 16    # 128-bit AES key
            # NOTE(review): lines 1294-1297 missing -- presumably return the
            # key, attach the callback, and `return d`.

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        # NOTE(review): line 1306 missing -- presumably `else:`.
        return self._get_encryption_key_random()

    # NOTE(review): lines 1308-1309 missing -- presumably a
    # `def get_size(self):` header for the body below.
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)   # seek to EOF to learn the size
        size = self._filehandle.tell()
        # NOTE(review): line 1314 missing -- presumably `self._size = size`.
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    # NOTE(review): lines 1320-1321 missing -- presumably a
    # `def close(self):` whose body is a no-op, per the comment below.
        # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    # NOTE(review): line 1335 missing -- presumably a `def close(self):`
    # header for the two statements below (since we opened the file
    # ourselves, we are responsible for closing it).
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # Wrap the in-memory bytes in a file-like object and reuse the
        # FileHandle machinery.
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    ... NOTE(review): the rest of this docstring (original lines 1352-1353)
    is missing from this listing.
    """
    implements(IUploader)

    # files at or below this size are embedded directly in a LIT URI
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        # NOTE(review): line 1361 missing -- presumably `self._helper = None`
        # (read by get_helper_info and upload below).
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
            # NOTE(review): lines 1370-1371 missing -- the continuation
            # argument of this call, presumably `self._got_helper)`.

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        # default version blob to assume for helpers that predate
        # get_version()
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    # NOTE(review): line 1375 missing -- presumably `{ },`.
                    "application-version": "unknown: no get_version()",
                    # NOTE(review): line 1377 missing -- presumably the
                    # closing `}` of this dict literal.
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        # Refuse helpers that do not speak at least the v1 protocol.
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        # NOTE(review): body (lines 1389-1390) missing -- presumably clears
        # self._helper so get_helper_info reports disconnected.

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        # NOTE(review): lines 1399-1402 missing (likely assertions that the
        # service is running and attached to a parent).
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                # small enough to inline in a LIT URI; no servers involved
                uploader = LiteralUploader()
                return uploader.start(uploadable)

            eu = EncryptAnUploadable(uploadable, self._parentmsgid)
            d2 = defer.succeed(None)
            # NOTE(review): line 1421 missing -- presumably
            # `if self._helper:` selecting the helper-assisted path below.
            uploader = AssistedUploader(self._helper)
            d2.addCallback(lambda x: eu.get_storage_index())
            d2.addCallback(lambda si: uploader.start(eu, si))
            # NOTE(review): line 1425 missing -- presumably `else:` for the
            # direct CHK upload path below.
            storage_broker = self.parent.get_storage_broker()
            secret_holder = self.parent._secret_holder
            uploader = CHKUploader(storage_broker, secret_holder)
            d2.addCallback(lambda x: uploader.start(eu))

            self._all_uploads[uploader] = None
            # NOTE(review): line 1432 missing -- presumably `if history:`
            # guarding the next line.
            history.add_upload(uploader.get_upload_status())
            def turn_verifycap_into_read_cap(uploadresults):
                # Generate the uri from the verifycap plus the key.
                d3 = uploadable.get_encryption_key()
                def put_readcap_into_results(key):
                    v = uri.from_string(uploadresults.verifycapstr)
                    r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                    uploadresults.uri = r.to_string()
                    return uploadresults
                d3.addCallback(put_readcap_into_results)
                # NOTE(review): line 1443 missing -- presumably `return d3`.
            d2.addCallback(turn_verifycap_into_read_cap)
            # NOTE(review): line 1445 missing -- presumably `return d2`.
        d.addCallback(_got_size)
        # NOTE(review): the listing ends here; the method presumably
        # continues past this view (likely `return d`).