import os, time, weakref, itertools
from cStringIO import StringIO

from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
from pycryptopp.cipher.aes import AES

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.immutable import layout
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
     shares_by_server, merge_peers, \
     failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError
class HaveAllPeersError(Exception):
    """Internal control-flow exception: raised to break out of the
    peer-selection loop once all peers have been contacted."""
    # we use this to jump out of the loop
# this wants to live in storage, not here
class TooFullError(Exception):
    """Raised when a storage server reports it is too full to accept
    the requested share allocation."""
class UploadResults(Copyable, RemoteCopy):
    """Statistics about a single immutable-file upload, sent from the
    helper back to its client over foolscap (hence Copyable/RemoteCopy)."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    # NOTE(review): the copytype assignment was elided from this chunk and
    # has been restored; RemoteCopy subclasses register both names.
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        # NOTE(review): the __init__ header and the file_size/uri fields
        # were elided from this chunk; both fields are assigned elsewhere
        # in this file (CHKUploader._encrypted_done, LiteralUploader).
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
68 # our current uri_extension is 846 bytes for small files, a few bytes
69 # more for larger ones (since the filesize is encoded in decimal in a
70 # few places). Ask for a little bit more just in case we need it. If
71 # the extension changes size, we can change EXTENSION_SIZE to
72 # allocate a more accurate amount of space.
74 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
78 def __init__(self, peerid, storage_server,
79 sharesize, blocksize, num_segments, num_share_hashes,
81 bucket_renewal_secret, bucket_cancel_secret):
82 precondition(isinstance(peerid, str), peerid)
83 precondition(len(peerid) == 20, peerid)
85 self._storageserver = storage_server # to an RIStorageServer
86 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
87 self.sharesize = sharesize
89 wbp = layout.make_write_bucket_proxy(None, sharesize,
90 blocksize, num_segments,
92 EXTENSION_SIZE, peerid)
93 self.wbp_class = wbp.__class__ # to create more of them
94 self.allocated_size = wbp.get_allocated_size()
95 self.blocksize = blocksize
96 self.num_segments = num_segments
97 self.num_share_hashes = num_share_hashes
98 self.storage_index = storage_index
100 self.renew_secret = bucket_renewal_secret
101 self.cancel_secret = bucket_cancel_secret
104 return ("<PeerTracker for peer %s and SI %s>"
105 % (idlib.shortnodeid_b2a(self.peerid),
106 si_b2a(self.storage_index)[:5]))
108 def query(self, sharenums):
109 d = self._storageserver.callRemote("allocate_buckets",
115 canary=Referenceable())
116 d.addCallback(self._got_reply)
119 def ask_about_existing_shares(self):
120 return self._storageserver.callRemote("get_buckets",
123 def _got_reply(self, (alreadygot, buckets)):
124 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
126 for sharenum, rref in buckets.iteritems():
127 bp = self.wbp_class(rref, self.sharesize,
130 self.num_share_hashes,
134 self.buckets.update(b)
135 return (alreadygot, set(b.keys()))
class Tahoe2PeerSelector:
    """Implements the Tahoe2 share-placement algorithm: ask each writable
    peer in permuted order to hold shares until servers-of-happiness is
    satisfied."""

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        # NOTE(review): the full_count/error_count initializers were elided
        # from this chunk; both counters are incremented and reported by
        # other methods of this class, so they have been restored.
        self.full_count = 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)
152 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        Contact storage servers and begin assigning shares to them.

        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a set of peers
                 which claim to already have the share.

        NOTE(review): a number of original lines are elided from this
        chunk; several statements below are missing guards, call
        arguments, or closing parentheses.  Annotations only -- the code
        is unchanged from the chunk.
        """
        # presumably guarded by `if self._status:` (elided) -- TODO confirm
        self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        # every share starts homeless; peer responses whittle this down
        self.homeless_shares = range(total_shares)
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # shareid => set(peerids) holding shareid
        # We don't try to allocate shares to these servers, since they've said
        # that they're incapable of storing shares of the size that we'd want
        # to store. We keep them around because they may have existing shares
        # for this storage index, which we want to know about for accurate
        # servers_of_happiness accounting
        # (this is eventually a list, but it is initialized later)
        self.readonly_peers = None
        # These peers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.peers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        # NOTE(review): the final argument/closing paren of this call is
        # elided from the chunk
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
        allocated_size = wbp.get_allocated_size()
        all_peers = storage_broker.get_servers_for_index(storage_index)
        # NOTE(review): the guard for this raise (presumably
        # `if not all_peers:`) is elided
        raise NoServersError("client gave us zero peers")

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_peers = [peer for peer in all_peers
                          if _get_maxsize(peer) >= allocated_size]
        readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()
        # NOTE(review): the closing storage_index arguments of the two
        # hash calls below are elided
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,

        def _make_trackers(peers):
            # NOTE(review): the storage_index and the closing per-peer
            # arguments of the two bucket-secret hash calls are elided
            return [PeerTracker(peerid, conn,
                                share_size, block_size,
                                num_segments, num_share_hashes,
                                bucket_renewal_secret_hash(file_renewal_secret,
                                bucket_cancel_secret_hash(file_cancel_secret,
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(writable_peers)
        self.readonly_peers = _make_trackers(readonly_peers)
        # We now ask peers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # NOTE(review): the deferred accumulator (presumably `ds = []`,
        # consumed by DeferredList below) is elided here
        if self._status and self.readonly_peers:
            self._status.set_status("Contacting readonly peers to find "
                                    "any existing shares")
        for peer in self.readonly_peers:
            assert isinstance(peer, PeerTracker)
            d = peer.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, peer.peerid)
            # NOTE(review): `ds.append(d)` appears elided here
            self.num_peers_contacted += 1
            self.query_count += 1
            log.msg("asking peer %s for any existing shares for "
                    # NOTE(review): the tail of this format string is elided
                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                    level=log.NOISY, parent=self._log_parent)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
    def _handle_existing_response(self, res, peer):
        """
        I handle responses to the queries sent by
        Tahoe2PeerSelector._existing_shares.
        """
        # NOTE(review): several lines are elided from this chunk (the
        # success-branch header binding `buckets = res`, and the context
        # of the final counter increment); annotations only.
        if isinstance(res, failure.Failure):
            log.msg("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
        # (successful get_buckets response -- the `else:` header and the
        #  binding of `buckets` are elided)
            self.peers_with_shares.add(peer)
            log.msg("response from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY, parent=self._log_parent)
            for bucket in buckets:
                # remember who already has each share, and stop treating
                # those shares as homeless
                self.preexisting_shares.setdefault(bucket, set()).add(peer)
                if self.homeless_shares and bucket in self.homeless_shares:
                    self.homeless_shares.remove(bucket)
            # NOTE(review): the context of this increment is elided --
            # probably counts an unproductive (empty) response
            self.bad_query_count += 1
289 def _get_progress_message(self):
290 if not self.homeless_shares:
291 msg = "placed all %d shares, " % (self.total_shares)
293 msg = ("placed %d shares out of %d total (%d homeless), " %
294 (self.total_shares - len(self.homeless_shares),
296 len(self.homeless_shares)))
297 return (msg + "want to place shares on at least %d servers such that "
298 "any %d of them have enough shares to recover the file, "
299 "sent %d queries to %d peers, "
300 "%d queries placed some shares, %d placed none "
301 "(of which %d placed none due to the server being"
302 " full and %d placed none due to an error)" %
303 (self.servers_of_happiness, self.needed_shares,
304 self.query_count, self.num_peers_contacted,
305 self.good_query_count, self.bad_query_count,
306 self.full_count, self.error_count))
        # NOTE(review): the `def _loop(self):` method header is elided
        # from this chunk, as are several branch headers, recursive
        # `return self._loop()` calls, and call arguments below.
        # Annotations only -- code unchanged from the chunk.

        # Happy path: every share has a tentative home.
        if not self.homeless_shares:
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("peer selection successful for %s: %s" % (self,
                       self._get_progress_message()))
                log.msg(msg, parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            # (happiness not yet met -- `else:` header elided)
            # We're not okay right now, but maybe we can fix it by
            # redistributing some shares. In cases where one or two
            # servers has, before the upload, all or most of the
            # shares for a given SI, this can work by allowing _loop
            # a chance to spread those out over the other peers,
            delta = self.servers_of_happiness - effective_happiness
            shares = shares_by_server(self.preexisting_shares)
            # Each server in shares maps to a set of shares stored on it.
            # Since we want to keep at least one share on each server
            # that has one (otherwise we'd only be making
            # the situation worse by removing distinct servers),
            # each server has len(its shares) - 1 to spread around.
            shares_to_spread = sum([len(list(sharelist)) - 1
                                    for (server, sharelist)
                                    # NOTE(review): the comprehension tail
                                    # (`in shares.items()])`) is elided
            if delta <= len(self.uncontacted_peers) and \
               shares_to_spread >= delta:
                items = shares.items()
                while len(self.homeless_shares) < delta:
                    # Loop through the allocated shares, removing
                    # one from each server that has more than one
                    # and putting it back into self.homeless_shares
                    # until we've done this delta times.
                    server, sharelist = items.pop()
                    if len(sharelist) > 1:
                        share = sharelist.pop()
                        self.homeless_shares.append(share)
                        self.preexisting_shares[share].remove(server)
                        if not self.preexisting_shares[share]:
                            del self.preexisting_shares[share]
                    items.append((server, sharelist))
                # NOTE(review): a recursive `return self._loop()` and an
                # `else:` header appear elided here
                # Redistribution won't help us; fail.
                peer_count = len(self.peers_with_shares)
                # If peer_count < needed_shares, then the second error
                # message is nonsensical, so we use this one.
                # NOTE(review): two arguments of failure_message are elided
                msg = failure_message(peer_count,
                                      self.servers_of_happiness,
                raise UploadUnhappinessError("%s (%s)" % (msg,
                                             self._get_progress_message()))

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            # presumably guarded by `if self._status:` (elided)
            self._status.set_status("Contacting Peers [%s] (first query),"
                                    # NOTE(review): part of the format is elided
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                # NOTE(review): the log level argument/closer is elided
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            # presumably guarded by `if self._status:` (elided)
            self._status.set_status("Contacting Peers [%s] (second query),"
                                    # NOTE(review): part of the format is elided
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            # NOTE(review): a recursive `return self._loop()` appears elided
        # (final branch -- its `else:` header is elided)
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            merged = merge_peers(self.preexisting_shares, self.use_peers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                # NOTE(review): two arguments of failure_message and the
                # `msg,` argument of the format below are elided
                msg = failure_message(len(self.peers_with_shares),
                                      self.servers_of_happiness,
                msg = ("peer selection failed for %s: %s (%s)" % (self,
                       self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise UploadUnhappinessError(msg)
            # we placed enough to be happy, so we're done
            # presumably guarded by `if self._status:` (elided)
            self._status.set_status("Placed all shares")
            return (self.use_peers, self.preexisting_shares)
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """Handle one allocate_buckets response (or failure) from a peer.

        NOTE(review): several original lines are elided from this chunk
        (branch headers, a loop header, a guard); annotations only --
        code unchanged from the chunk.
        """
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.  (comment continuation elided)
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
            # the shares we asked this peer to hold are homeless again
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                # NOTE(review): this branch body and the following `else:`
                # header are elided
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        # (successful response -- the `else:` header is elided)
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            # record shares the peer already held
            # NOTE(review): the `for s in alreadygot:` loop header is elided
            self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
            if s in self.homeless_shares:
                self.homeless_shares.remove(s)
            # NOTE(review): the surrounding branch context of this elif is
            # elided
            elif s in shares_to_ask:
            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            # NOTE(review): an `if allocated:` guard appears elided here
            self.use_peers.add(peer)
            if allocated or alreadygot:
                self.peers_with_shares.add(peer.peerid)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)
            # NOTE(review): the branch headers separating the good/bad
            # query accounting below are elided
            # They accepted at least one of the shares that we asked
            # them to accept, or they had a share that we didn't ask
            # them to accept but that we hadn't placed yet, so this
            # was a productive query
            self.good_query_count += 1
            self.bad_query_count += 1
            # In networks with lots of space, this is very unusual and
            # probably indicates an error. In networks with peers that
            # are full, it is merely unusual. In networks that are very
            # full, it is common, and many uploads will fail. In most
            # cases, this is obviously not fatal, and we'll just use some
            # (comment continuation elided)
            # some shares are still homeless, keep trying to find them a
            # home. The ones that were rejected get first priority.
            self.homeless_shares = (list(still_homeless)
                                    + self.homeless_shares)
            # Since they were unable to accept all of our requests, so it
            # is safe to assume that asking them again won't help.
            # if they *were* able to accept everything, they might be
            # willing to accept even more.
            put_peer_here.append(peer)
            # NOTE(review): the tail of this method (looping again via
            # self._loop()) is elided from this chunk.
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): CHUNKSIZE was elided from this chunk; restored because
    # _read_encrypted reads plaintext in self.CHUNKSIZE pieces (the
    # comment there mentions "say 50kB").
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        # NOTE(review): restored -- read (under `if self._status:` guards)
        # before set_upload_status has necessarily run.
        self._status = None
530 def set_upload_status(self, upload_status):
531 self._status = IUploadStatus(upload_status)
532 self.original.set_upload_status(upload_status)
534 def log(self, *args, **kwargs):
535 if "facility" not in kwargs:
536 kwargs["facility"] = "upload.encryption"
537 if "parent" not in kwargs:
538 kwargs["parent"] = self._log_number
539 return log.msg(*args, **kwargs)
542 if self._file_size is not None:
543 return defer.succeed(self._file_size)
544 d = self.original.get_size()
546 self._file_size = size
548 self._status.set_size(size)
550 d.addCallback(_got_size)
553 def get_all_encoding_parameters(self):
554 if self._encoding_parameters is not None:
555 return defer.succeed(self._encoding_parameters)
556 d = self.original.get_all_encoding_parameters()
557 def _got(encoding_parameters):
558 (k, happy, n, segsize) = encoding_parameters
559 self._segment_size = segsize # used by segment hashers
560 self._encoding_parameters = encoding_parameters
561 self.log("my encoding parameters: %s" % (encoding_parameters,),
563 return encoding_parameters
567 def _get_encryptor(self):
569 return defer.succeed(self._encryptor)
571 d = self.original.get_encryption_key()
576 storage_index = storage_index_hash(key)
577 assert isinstance(storage_index, str)
578 # There's no point to having the SI be longer than the key, so we
579 # specify that it is truncated to the same 128 bits as the AES key.
580 assert len(storage_index) == 16 # SHA-256 truncated to 128b
581 self._storage_index = storage_index
583 self._status.set_storage_index(storage_index)
588 def get_storage_index(self):
589 d = self._get_encryptor()
590 d.addCallback(lambda res: self._storage_index)
593 def _get_segment_hasher(self):
594 p = self._plaintext_segment_hasher
596 left = self._segment_size - self._plaintext_segment_hashed_bytes
598 p = plaintext_segment_hasher()
599 self._plaintext_segment_hasher = p
600 self._plaintext_segment_hashed_bytes = 0
601 return p, self._segment_size
603 def _update_segment_hash(self, chunk):
605 while offset < len(chunk):
606 p, segment_left = self._get_segment_hasher()
607 chunk_left = len(chunk) - offset
608 this_segment = min(chunk_left, segment_left)
609 p.update(chunk[offset:offset+this_segment])
610 self._plaintext_segment_hashed_bytes += this_segment
612 if self._plaintext_segment_hashed_bytes == self._segment_size:
613 # we've filled this segment
614 self._plaintext_segment_hashes.append(p.digest())
615 self._plaintext_segment_hasher = None
616 self.log("closed hash [%d]: %dB" %
617 (len(self._plaintext_segment_hashes)-1,
618 self._plaintext_segment_hashed_bytes),
620 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
621 segnum=len(self._plaintext_segment_hashes)-1,
622 hash=base32.b2a(p.digest()),
625 offset += this_segment
628 def read_encrypted(self, length, hash_only):
629 # make sure our parameters have been set up first
630 d = self.get_all_encoding_parameters()
632 d.addCallback(lambda ignored: self.get_size())
633 d.addCallback(lambda ignored: self._get_encryptor())
634 # then fetch and encrypt the plaintext. The unusual structure here
635 # (passing a Deferred *into* a function) is needed to avoid
636 # overflowing the stack: Deferreds don't optimize out tail recursion.
637 # We also pass in a list, to which _read_encrypted will append
640 d2 = defer.Deferred()
641 d.addCallback(lambda ignored:
642 self._read_encrypted(length, ciphertext, hash_only, d2))
643 d.addCallback(lambda ignored: d2)
646 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
648 fire_when_done.callback(ciphertext)
650 # tolerate large length= values without consuming a lot of RAM by
651 # reading just a chunk (say 50kB) at a time. This only really matters
652 # when hash_only==True (i.e. resuming an interrupted upload), since
653 # that's the case where we will be skipping over a lot of data.
654 size = min(remaining, self.CHUNKSIZE)
655 remaining = remaining - size
656 # read a chunk of plaintext..
657 d = defer.maybeDeferred(self.original.read, size)
658 # N.B.: if read() is synchronous, then since everything else is
659 # actually synchronous too, we'd blow the stack unless we stall for a
660 # tick. Once you accept a Deferred from IUploadable.read(), you must
661 # be prepared to have it fire immediately too.
662 d.addCallback(fireEventually)
663 def _good(plaintext):
665 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
666 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
667 ciphertext.extend(ct)
668 self._read_encrypted(remaining, ciphertext, hash_only,
671 fire_when_done.errback(why)
676 def _hash_and_encrypt_plaintext(self, data, hash_only):
677 assert isinstance(data, (tuple, list)), type(data)
680 # we use data.pop(0) instead of 'for chunk in data' to save
681 # memory: each chunk is destroyed as soon as we're done with it.
685 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
687 bytes_processed += len(chunk)
688 self._plaintext_hasher.update(chunk)
689 self._update_segment_hash(chunk)
690 # TODO: we have to encrypt the data (even if hash_only==True)
691 # because pycryptopp's AES-CTR implementation doesn't offer a
692 # way to change the counter value. Once pycryptopp acquires
693 # this ability, change this to simply update the counter
694 # before each call to (hash_only==False) _encryptor.process()
695 ciphertext = self._encryptor.process(chunk)
697 self.log(" skipping encryption", level=log.NOISY)
699 cryptdata.append(ciphertext)
702 self._ciphertext_bytes_read += bytes_processed
704 progress = float(self._ciphertext_bytes_read) / self._file_size
705 self._status.set_progress(1, progress)
709 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
710 # this is currently unused, but will live again when we fix #453
711 if len(self._plaintext_segment_hashes) < num_segments:
712 # close out the last one
713 assert len(self._plaintext_segment_hashes) == num_segments-1
714 p, segment_left = self._get_segment_hasher()
715 self._plaintext_segment_hashes.append(p.digest())
716 del self._plaintext_segment_hasher
717 self.log("closing plaintext leaf hasher, hashed %d bytes" %
718 self._plaintext_segment_hashed_bytes,
720 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
721 segnum=len(self._plaintext_segment_hashes)-1,
722 hash=base32.b2a(p.digest()),
724 assert len(self._plaintext_segment_hashes) == num_segments
725 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
727 def get_plaintext_hash(self):
728 h = self._plaintext_hasher.digest()
729 return defer.succeed(h)
732 return self.original.close()
class UploadStatus:
    """Mutable status record for one upload, exposed through the
    IUploadStatus accessors below.

    NOTE(review): the class header, the __init__ header, several field
    initializers, and the bodies of the one-line accessors were elided
    from this chunk; they have been restored from the visible get/set
    symmetry (each set_X stores what get_X returns).
    """
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value
class CHKUploader:
    """Uploader for immutable CHK files: encrypts, selects peers, and
    drives the Encoder."""
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        # NOTE(review): restored -- abort() tests self._encoder to detect
        # an abort before start().
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
803 def log(self, *args, **kwargs):
804 if "parent" not in kwargs:
805 kwargs["parent"] = self._log_number
806 if "facility" not in kwargs:
807 kwargs["facility"] = "tahoe.upload"
808 return log.msg(*args, **kwargs)
810 def start(self, encrypted_uploadable):
811 """Start uploading the file.
813 Returns a Deferred that will fire with the UploadResults instance.
816 self._started = time.time()
817 eu = IEncryptedUploadable(encrypted_uploadable)
818 self.log("starting upload of %s" % eu)
820 eu.set_upload_status(self._upload_status)
821 d = self.start_encrypted(eu)
822 def _done(uploadresults):
823 self._upload_status.set_active(False)
829 """Call this if the upload must be abandoned before it completes.
830 This will tell the shareholders to delete their partial shares. I
831 return a Deferred that fires when these messages have been acked."""
832 if not self._encoder:
833 # how did you call abort() before calling start() ?
834 return defer.succeed(None)
835 return self._encoder.abort()
837 def start_encrypted(self, encrypted):
838 """ Returns a Deferred that will fire with the UploadResults instance. """
839 eu = IEncryptedUploadable(encrypted)
841 started = time.time()
842 self._encoder = e = encode.Encoder(self._log_number,
844 d = e.set_encrypted_uploadable(eu)
845 d.addCallback(self.locate_all_shareholders, started)
846 d.addCallback(self.set_shareholders, e)
847 d.addCallback(lambda res: e.start())
848 d.addCallback(self._encrypted_done)
851 def locate_all_shareholders(self, encoder, started):
852 peer_selection_started = now = time.time()
853 self._storage_index_elapsed = now - started
854 storage_broker = self._storage_broker
855 secret_holder = self._secret_holder
856 storage_index = encoder.get_param("storage_index")
857 self._storage_index = storage_index
858 upload_id = si_b2a(storage_index)[:5]
859 self.log("using storage index %s" % upload_id)
860 peer_selector = self.peer_selector_class(upload_id, self._log_number,
863 share_size = encoder.get_param("share_size")
864 block_size = encoder.get_param("block_size")
865 num_segments = encoder.get_param("num_segments")
866 k,desired,n = encoder.get_param("share_counts")
868 self._peer_selection_started = time.time()
869 d = peer_selector.get_shareholders(storage_broker, secret_holder,
871 share_size, block_size,
872 num_segments, n, k, desired)
874 self._peer_selection_elapsed = time.time() - peer_selection_started
879 def set_shareholders(self, (used_peers, already_peers), encoder):
881 @param used_peers: a sequence of PeerTracker objects
882 @paran already_peers: a dict mapping sharenum to a set of peerids
883 that claim to already have this share
885 self.log("_send_shares, used_peers is %s" % (used_peers,))
886 # record already-present shares in self._results
887 self._results.preexisting_shares = len(already_peers)
889 self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
890 for peer in used_peers:
891 assert isinstance(peer, PeerTracker)
893 servermap = already_peers.copy()
894 for peer in used_peers:
895 buckets.update(peer.buckets)
896 for shnum in peer.buckets:
897 self._peer_trackers[shnum] = peer
898 servermap.setdefault(shnum, set()).add(peer.peerid)
899 assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
900 encoder.set_shareholders(buckets, servermap)
902 def _encrypted_done(self, verifycap):
903 """ Returns a Deferred that will fire with the UploadResults instance. """
905 for shnum in self._encoder.get_shares_placed():
906 peer_tracker = self._peer_trackers[shnum]
907 peerid = peer_tracker.peerid
908 r.sharemap.add(shnum, peerid)
909 r.servermap.add(peerid, shnum)
910 r.pushed_shares = len(self._encoder.get_shares_placed())
912 r.file_size = self._encoder.file_size
913 r.timings["total"] = now - self._started
914 r.timings["storage_index"] = self._storage_index_elapsed
915 r.timings["peer_selection"] = self._peer_selection_elapsed
916 r.timings.update(self._encoder.get_times())
917 r.uri_extension_data = self._encoder.get_uri_extension_data()
918 r.verifycapstr = verifycap.to_string()
    def get_upload_status(self):
        """Return the UploadStatus object tracking this upload."""
        return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    """Read exactly *size* bytes from *uploadable* (issuing as many
    read() calls as needed) and return a Deferred firing with the
    accumulated list of data pieces.

    NOTE(review): the zero-size guard, the _got closure header, its
    recursion guard, and the callback wiring were elided from this chunk
    and have been restored.  The mutable default prepend_data=[] is safe
    here because it is only ever concatenated, never mutated.
    """
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
class LiteralUploader:
    """Upload a small file as a LIT (literal) URI: the contents are
    embedded directly in the URI string, so no servers are contacted."""

    def __init__(self):
        # NOTE(review): the __init__ header and two status initializers
        # were elided from this chunk and have been restored.
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        """Read the whole file and build a LIT URI; returns a Deferred
        firing with the UploadResults."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        # a literal upload holds no resources
        pass

    def get_upload_status(self):
        return self._status
class RemoteEncryptedUploadable(Referenceable):
    # Foolscap wrapper: exposes a local IEncryptedUploadable over the wire
    # so a remote helper can pull ciphertext from us on demand.
    # NOTE(review): some 'def' headers (e.g. get_size, the inner _read and
    # _got_size helpers, the 'else:' branches) appear elided here.
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

        # cache the size after the first lookup
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        d.addCallback(_got_size)

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # read (or, if hash_only, merely hash) the next chunk of ciphertext,
        # advancing self._offset to track the read position
        d = self._eu.read_encrypted(length, hash_only)
                self._offset += length
                size = sum([len(data) for data in strings])
                self._offset += size
        d.addCallback(_read)

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

            # track how many ciphertext bytes we have actually shipped
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
        d.addCallback(_read)

    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    # Uploads a file via a remote "helper" service: we push ciphertext to
    # the helper, which performs erasure-coding and share placement for us.
    # NOTE(review): several statements (e.g. 'r = upload_results',
    # 'now = time.time()', 'd = eu.get_size()', the 'if upload_helper:'
    # header) appear elided from this excerpt.

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()

    def log(self, *args, **kwargs):
        # attach our messages to this uploader's parent log entry
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.
        Returns a Deferred that will fire with the UploadResults instance.
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
            self._upload_status.set_active(False)

    def _got_size(self, size):
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)

    def _contacted_helper(self, (upload_results, upload_helper)):
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.
        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.
        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        # sanity-check that the helper used the parameters we asked for
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares, size=self._size
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        # if the helper reported its own total, keep it under another key
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)

    def get_upload_status(self):
        return self._upload_status
class BaseUploadable:
    # Encoding-parameter plumbing shared by the concrete uploadables below.
    # NOTE(review): in get_all_encoding_parameters, 'd' is not assigned in
    # the visible lines (presumably 'd = self.get_size()' is elided), and
    # the trailing 'return d' is also not visible.

    # class-level defaults, overridable via set_default_encoding_parameters()
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides; None means "fall back to the default above"
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # cache for the computed (k, happy, n, segsize) tuple
    _all_encoding_parameters = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        # accept a dict with optional int-valued "k"/"happy"/"n"/
        # "max_segment_size" entries and install them as our defaults
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        # returns a Deferred that fires with (k, happy, n, segment_size)
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    implements(IUploadable)
    # NOTE(review): several structural lines (docstring quote delimiters,
    # some 'def'/'while'/'if'/'else' headers, and a few assignments such as
    # 'self._key = None') appear elided from this excerpt.

    def __init__(self, filehandle, convergence):
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self.convergence = convergence

    def _get_encryption_key_convergent(self):
        # convergent encryption: the key is derived from the encoding
        # parameters, the convergence secret, and a hash of the plaintext
        if self._key is not None:
            return defer.succeed(self._key)

        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
                data = f.read(BLOCKSIZE)
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                    self._status.set_progress(0, float(bytes_read)/self._size)
            self._key = enckey_hasher.digest()
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16

    def _get_encryption_key_random(self):
        # non-convergent mode: just pick a fresh random AES-128 key once
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
            return self._get_encryption_key_random()

        if self._size is not None:
            return defer.succeed(self._size)
        # seek to EOF to learn the size, then rewind for the actual upload
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        # IUploadable.read returns a list of strings, not a single string
        return defer.succeed([self._filehandle.read(length)])

        # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    # Like FileHandle, but opens (and owns) the named file itself.
    # NOTE(review): the docstring quote delimiters and the 'def close'
    # header appear elided from this excerpt.
    def __init__(self, filename, convergence):
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
        # since we opened the file, we are responsible for closing it
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    # Convenience wrapper: upload an in-memory byte string by wrapping it
    # in a StringIO and reusing the FileHandle machinery.
    # NOTE(review): the docstring quote delimiters appear elided here.
    def __init__(self, data, convergence):
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1330 class Uploader(service.MultiService, log.PrefixingLogMixin):
1331 """I am a service that allows file uploading. I am a service-child of the
1334 implements(IUploader)
1336 URI_LIT_SIZE_THRESHOLD = 55
    def __init__(self, helper_furl=None, stats_provider=None):
        # NOTE(review): a few lines (e.g. 'self._helper = None', the
        # connectTo() continuation, parts of the 'default' dict) appear
        # elided from this excerpt.
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        # if a helper furl was configured, start (re)connecting to it
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,

    def _got_helper(self, helper):
        # connectTo callback: learn the helper's protocol versions before use
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    "application-version": "unknown: no get_version()",
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)
1361 def _got_versioned_helper(self, helper):
1362 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1363 if needed not in helper.version:
1364 raise InsufficientVersionError(needed, helper.version)
1365 self._helper = helper
1366 helper.notifyOnDisconnect(self._lost_helper)
    # notifyOnDisconnect callback: the helper connection was lost.
    # NOTE(review): the method body is not visible in this excerpt.
    def _lost_helper(self):
1371 def get_helper_info(self):
1372 # return a tuple of (helper_furl_or_None, connected_bool)
1373 return (self._helper_furl, bool(self._helper))
1376 def upload(self, uploadable, history=None):
1378 Returns a Deferred that will fire with the UploadResults instance.
1383 uploadable = IUploadable(uploadable)
1384 d = uploadable.get_size()
1385 def _got_size(size):
1386 default_params = self.parent.get_encoding_parameters()
1387 precondition(isinstance(default_params, dict), default_params)
1388 precondition("max_segment_size" in default_params, default_params)
1389 uploadable.set_default_encoding_parameters(default_params)
1391 if self.stats_provider:
1392 self.stats_provider.count('uploader.files_uploaded', 1)
1393 self.stats_provider.count('uploader.bytes_uploaded', size)
1395 if size <= self.URI_LIT_SIZE_THRESHOLD:
1396 uploader = LiteralUploader()
1397 return uploader.start(uploadable)
1399 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1400 d2 = defer.succeed(None)
1402 uploader = AssistedUploader(self._helper)
1403 d2.addCallback(lambda x: eu.get_storage_index())
1404 d2.addCallback(lambda si: uploader.start(eu, si))
1406 storage_broker = self.parent.get_storage_broker()
1407 secret_holder = self.parent._secret_holder
1408 uploader = CHKUploader(storage_broker, secret_holder)
1409 d2.addCallback(lambda x: uploader.start(eu))
1411 self._all_uploads[uploader] = None
1413 history.add_upload(uploader.get_upload_status())
1414 def turn_verifycap_into_read_cap(uploadresults):
1415 # Generate the uri from the verifycap plus the key.
1416 d3 = uploadable.get_encryption_key()
1417 def put_readcap_into_results(key):
1418 v = uri.from_string(uploadresults.verifycapstr)
1419 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1420 uploadresults.uri = r.to_string()
1421 return uploadresults
1422 d3.addCallback(put_readcap_into_results)
1424 d2.addCallback(turn_verifycap_into_read_cap)
1426 d.addCallback(_got_size)