1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17 shares_by_server, merge_peers, \
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23 NoServersError, InsufficientVersionError, UploadUnhappinessError
24 from allmydata.immutable import layout
25 from pycryptopp.cipher.aes import AES
27 from cStringIO import StringIO
36 class HaveAllPeersError(Exception):
37 # we use this to jump out of the loop
40 # this wants to live in storage, not here
41 class TooFullError(Exception):
44 class UploadResults(Copyable, RemoteCopy):
45 implements(IUploadResults)
46 # note: don't change this string, it needs to match the value used on the
47 # helper, and it does *not* need to match the fully-qualified
48 # package/module/class name
49 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
52 # also, think twice about changing the shape of any existing attribute,
53 # because instances of this class are sent from the helper to its client,
54 # so changing this may break compatibility. Consider adding new fields
55 # instead of modifying existing ones.
58 self.timings = {} # dict of name to number of seconds
59 self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
60 self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
62 self.ciphertext_fetched = None # how much the helper fetched
64 self.preexisting_shares = None # count of shares already present
65 self.pushed_shares = None # count of shares we pushed
68 # our current uri_extension is 846 bytes for small files, a few bytes
69 # more for larger ones (since the filesize is encoded in decimal in a
70 # few places). Ask for a little bit more just in case we need it. If
71 # the extension changes size, we can change EXTENSION_SIZE to
72 # allocate a more accurate amount of space.
74 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
78 def __init__(self, peerid, storage_server,
79 sharesize, blocksize, num_segments, num_share_hashes,
81 bucket_renewal_secret, bucket_cancel_secret):
82 precondition(isinstance(peerid, str), peerid)
83 precondition(len(peerid) == 20, peerid)
85 self._storageserver = storage_server # to an RIStorageServer
86 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
87 self.sharesize = sharesize
89 wbp = layout.make_write_bucket_proxy(None, sharesize,
90 blocksize, num_segments,
92 EXTENSION_SIZE, peerid)
93 self.wbp_class = wbp.__class__ # to create more of them
94 self.allocated_size = wbp.get_allocated_size()
95 self.blocksize = blocksize
96 self.num_segments = num_segments
97 self.num_share_hashes = num_share_hashes
98 self.storage_index = storage_index
100 self.renew_secret = bucket_renewal_secret
101 self.cancel_secret = bucket_cancel_secret
104 return ("<PeerTracker for peer %s and SI %s>"
105 % (idlib.shortnodeid_b2a(self.peerid),
106 si_b2a(self.storage_index)[:5]))
108 def query(self, sharenums):
109 d = self._storageserver.callRemote("allocate_buckets",
115 canary=Referenceable())
116 d.addCallback(self._got_reply)
119 def ask_about_existing_shares(self):
120 return self._storageserver.callRemote("get_buckets",
123 def _got_reply(self, (alreadygot, buckets)):
124 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
126 for sharenum, rref in buckets.iteritems():
127 bp = self.wbp_class(rref, self.sharesize,
130 self.num_share_hashes,
134 self.buckets.update(b)
135 return (alreadygot, set(b.keys()))
140 I abort the remote bucket writers for all shares. This is a good idea
141 to conserve space on the storage server.
143 self.abort_some_buckets(self.buckets.keys())
def abort_some_buckets(self, sharenums):
    """
    I abort the remote bucket writers for the share numbers in sharenums.
    Share numbers we are not currently tracking are ignored.
    """
    for shnum in sharenums:
        writer = self.buckets.get(shnum)
        if writer is None:
            continue
        writer.abort()
        del self.buckets[shnum]
155 class Tahoe2PeerSelector:
157 def __init__(self, upload_id, logparent=None, upload_status=None):
158 self.upload_id = upload_id
159 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
160 # Peers that are working normally, but full.
163 self.num_peers_contacted = 0
164 self.last_failure_msg = None
165 self._status = IUploadStatus(upload_status)
166 self._log_parent = log.msg("%s starting" % self, parent=logparent)
169 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
171 def get_shareholders(self, storage_broker, secret_holder,
172 storage_index, share_size, block_size,
173 num_segments, total_shares, needed_shares,
174 servers_of_happiness):
176 @return: (upload_servers, already_peers), where upload_servers is a set of
177 PeerTracker instances that have agreed to hold some shares
178 for us (the shareids are stashed inside the PeerTracker),
179 and already_peers is a dict mapping shnum to a set of peers
180 which claim to already have the share.
184 self._status.set_status("Contacting Peers..")
186 self.total_shares = total_shares
187 self.servers_of_happiness = servers_of_happiness
188 self.needed_shares = needed_shares
190 self.homeless_shares = range(total_shares)
191 self.contacted_peers = [] # peers worth asking again
192 self.contacted_peers2 = [] # peers that we have asked again
193 self._started_second_pass = False
194 self.use_peers = set() # PeerTrackers that have shares assigned to them
195 self.preexisting_shares = {} # shareid => set(peerids) holding shareid
196 # We don't try to allocate shares to these servers, since they've said
197 # that they're incapable of storing shares of the size that we'd want
198 # to store. We keep them around because they may have existing shares
199 # for this storage index, which we want to know about for accurate
200 # servers_of_happiness accounting
201 # (this is eventually a list, but it is initialized later)
202 self.readonly_peers = None
203 # These peers have shares -- any shares -- for our SI. We keep
204 # track of these to write an error message with them later.
205 self.peers_with_shares = set()
207 # this needed_hashes computation should mirror
208 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
209 # (instead of a HashTree) because we don't require actual hashing
210 # just to count the levels.
211 ht = hashtree.IncompleteHashTree(total_shares)
212 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
214 # figure out how much space to ask for
215 wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
216 num_share_hashes, EXTENSION_SIZE,
218 allocated_size = wbp.get_allocated_size()
219 all_peers = storage_broker.get_servers_for_index(storage_index)
221 raise NoServersError("client gave us zero peers")
223 # filter the list of peers according to which ones can accomodate
224 # this request. This excludes older peers (which used a 4-byte size
225 # field) from getting large shares (for files larger than about
226 # 12GiB). See #439 for details.
227 def _get_maxsize(peer):
228 (peerid, conn) = peer
229 v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
230 return v1["maximum-immutable-share-size"]
231 writable_peers = [peer for peer in all_peers
232 if _get_maxsize(peer) >= allocated_size]
233 readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
235 # decide upon the renewal/cancel secrets, to include them in the
236 # allocate_buckets query.
237 client_renewal_secret = secret_holder.get_renewal_secret()
238 client_cancel_secret = secret_holder.get_cancel_secret()
240 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
242 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
244 def _make_trackers(peers):
245 return [PeerTracker(peerid, conn,
246 share_size, block_size,
247 num_segments, num_share_hashes,
249 bucket_renewal_secret_hash(file_renewal_secret,
251 bucket_cancel_secret_hash(file_cancel_secret,
253 for (peerid, conn) in peers]
254 self.uncontacted_peers = _make_trackers(writable_peers)
255 self.readonly_peers = _make_trackers(readonly_peers)
256 # We now ask peers that can't hold any new shares about existing
257 # shares that they might have for our SI. Once this is done, we
258 # start placing the shares that we haven't already accounted
261 if self._status and self.readonly_peers:
262 self._status.set_status("Contacting readonly peers to find "
263 "any existing shares")
264 for peer in self.readonly_peers:
265 assert isinstance(peer, PeerTracker)
266 d = peer.ask_about_existing_shares()
267 d.addBoth(self._handle_existing_response, peer.peerid)
269 self.num_peers_contacted += 1
270 self.query_count += 1
271 log.msg("asking peer %s for any existing shares for "
273 % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
274 level=log.NOISY, parent=self._log_parent)
275 dl = defer.DeferredList(ds)
276 dl.addCallback(lambda ign: self._loop())
280 def _handle_existing_response(self, res, peer):
282 I handle responses to the queries sent by
283 Tahoe2PeerSelector._existing_shares.
285 if isinstance(res, failure.Failure):
286 log.msg("%s got error during existing shares check: %s"
287 % (idlib.shortnodeid_b2a(peer), res),
288 level=log.UNUSUAL, parent=self._log_parent)
289 self.error_count += 1
290 self.bad_query_count += 1
294 self.peers_with_shares.add(peer)
295 log.msg("response from peer %s: alreadygot=%s"
296 % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
297 level=log.NOISY, parent=self._log_parent)
298 for bucket in buckets:
299 self.preexisting_shares.setdefault(bucket, set()).add(peer)
300 if self.homeless_shares and bucket in self.homeless_shares:
301 self.homeless_shares.remove(bucket)
303 self.bad_query_count += 1
306 def _get_progress_message(self):
307 if not self.homeless_shares:
308 msg = "placed all %d shares, " % (self.total_shares)
310 msg = ("placed %d shares out of %d total (%d homeless), " %
311 (self.total_shares - len(self.homeless_shares),
313 len(self.homeless_shares)))
314 return (msg + "want to place shares on at least %d servers such that "
315 "any %d of them have enough shares to recover the file, "
316 "sent %d queries to %d peers, "
317 "%d queries placed some shares, %d placed none "
318 "(of which %d placed none due to the server being"
319 " full and %d placed none due to an error)" %
320 (self.servers_of_happiness, self.needed_shares,
321 self.query_count, self.num_peers_contacted,
322 self.good_query_count, self.bad_query_count,
323 self.full_count, self.error_count))
327 if not self.homeless_shares:
328 merged = merge_peers(self.preexisting_shares, self.use_peers)
329 effective_happiness = servers_of_happiness(merged)
330 if self.servers_of_happiness <= effective_happiness:
331 msg = ("peer selection successful for %s: %s" % (self,
332 self._get_progress_message()))
333 log.msg(msg, parent=self._log_parent)
334 return (self.use_peers, self.preexisting_shares)
336 # We're not okay right now, but maybe we can fix it by
337 # redistributing some shares. In cases where one or two
338 # servers has, before the upload, all or most of the
339 # shares for a given SI, this can work by allowing _loop
340 # a chance to spread those out over the other peers,
341 delta = self.servers_of_happiness - effective_happiness
342 shares = shares_by_server(self.preexisting_shares)
343 # Each server in shares maps to a set of shares stored on it.
344 # Since we want to keep at least one share on each server
345 # that has one (otherwise we'd only be making
346 # the situation worse by removing distinct servers),
347 # each server has len(its shares) - 1 to spread around.
348 shares_to_spread = sum([len(list(sharelist)) - 1
349 for (server, sharelist)
351 if delta <= len(self.uncontacted_peers) and \
352 shares_to_spread >= delta:
353 items = shares.items()
354 while len(self.homeless_shares) < delta:
355 # Loop through the allocated shares, removing
356 # one from each server that has more than one
357 # and putting it back into self.homeless_shares
358 # until we've done this delta times.
359 server, sharelist = items.pop()
360 if len(sharelist) > 1:
361 share = sharelist.pop()
362 self.homeless_shares.append(share)
363 self.preexisting_shares[share].remove(server)
364 if not self.preexisting_shares[share]:
365 del self.preexisting_shares[share]
366 items.append((server, sharelist))
367 for writer in self.use_peers:
368 writer.abort_some_buckets(self.homeless_shares)
371 # Redistribution won't help us; fail.
372 peer_count = len(self.peers_with_shares)
373 msg = failure_message(peer_count,
375 self.servers_of_happiness,
377 log.msg("server selection unsuccessful for %r: %s (%s), merged=%r"
378 % (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT)
379 return self._failed("%s (%s)" % (msg, self._get_progress_message()))
381 if self.uncontacted_peers:
382 peer = self.uncontacted_peers.pop(0)
383 # TODO: don't pre-convert all peerids to PeerTrackers
384 assert isinstance(peer, PeerTracker)
386 shares_to_ask = set([self.homeless_shares.pop(0)])
387 self.query_count += 1
388 self.num_peers_contacted += 1
390 self._status.set_status("Contacting Peers [%s] (first query),"
392 % (idlib.shortnodeid_b2a(peer.peerid),
393 len(self.homeless_shares)))
394 d = peer.query(shares_to_ask)
395 d.addBoth(self._got_response, peer, shares_to_ask,
396 self.contacted_peers)
398 elif self.contacted_peers:
399 # ask a peer that we've already asked.
400 if not self._started_second_pass:
401 log.msg("starting second pass", parent=self._log_parent,
403 self._started_second_pass = True
404 num_shares = mathutil.div_ceil(len(self.homeless_shares),
405 len(self.contacted_peers))
406 peer = self.contacted_peers.pop(0)
407 shares_to_ask = set(self.homeless_shares[:num_shares])
408 self.homeless_shares[:num_shares] = []
409 self.query_count += 1
411 self._status.set_status("Contacting Peers [%s] (second query),"
413 % (idlib.shortnodeid_b2a(peer.peerid),
414 len(self.homeless_shares)))
415 d = peer.query(shares_to_ask)
416 d.addBoth(self._got_response, peer, shares_to_ask,
417 self.contacted_peers2)
419 elif self.contacted_peers2:
420 # we've finished the second-or-later pass. Move all the remaining
421 # peers back into self.contacted_peers for the next pass.
422 self.contacted_peers.extend(self.contacted_peers2)
423 self.contacted_peers2[:] = []
426 # no more peers. If we haven't placed enough shares, we fail.
427 merged = merge_peers(self.preexisting_shares, self.use_peers)
428 effective_happiness = servers_of_happiness(merged)
429 if effective_happiness < self.servers_of_happiness:
430 msg = failure_message(len(self.peers_with_shares),
432 self.servers_of_happiness,
434 msg = ("peer selection failed for %s: %s (%s)" % (self,
436 self._get_progress_message()))
437 if self.last_failure_msg:
438 msg += " (%s)" % (self.last_failure_msg,)
439 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
440 return self._failed(msg)
442 # we placed enough to be happy, so we're done
444 self._status.set_status("Placed all shares")
445 return (self.use_peers, self.preexisting_shares)
447 def _got_response(self, res, peer, shares_to_ask, put_peer_here):
448 if isinstance(res, failure.Failure):
449 # This is unusual, and probably indicates a bug or a network
451 log.msg("%s got error during peer selection: %s" % (peer, res),
452 level=log.UNUSUAL, parent=self._log_parent)
453 self.error_count += 1
454 self.bad_query_count += 1
455 self.homeless_shares = list(shares_to_ask) + self.homeless_shares
456 if (self.uncontacted_peers
457 or self.contacted_peers
458 or self.contacted_peers2):
459 # there is still hope, so just loop
462 # No more peers, so this upload might fail (it depends upon
463 # whether we've hit servers_of_happiness or not). Log the last
464 # failure we got: if a coding error causes all peers to fail
465 # in the same way, this allows the common failure to be seen
466 # by the uploader and should help with debugging
467 msg = ("last failure (from %s) was: %s" % (peer, res))
468 self.last_failure_msg = msg
470 (alreadygot, allocated) = res
471 log.msg("response from peer %s: alreadygot=%s, allocated=%s"
472 % (idlib.shortnodeid_b2a(peer.peerid),
473 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
474 level=log.NOISY, parent=self._log_parent)
477 self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
478 if s in self.homeless_shares:
479 self.homeless_shares.remove(s)
481 elif s in shares_to_ask:
484 # the PeerTracker will remember which shares were allocated on
485 # that peer. We just have to remember to use them.
487 self.use_peers.add(peer)
490 if allocated or alreadygot:
491 self.peers_with_shares.add(peer.peerid)
493 not_yet_present = set(shares_to_ask) - set(alreadygot)
494 still_homeless = not_yet_present - set(allocated)
497 # They accepted at least one of the shares that we asked
498 # them to accept, or they had a share that we didn't ask
499 # them to accept but that we hadn't placed yet, so this
500 # was a productive query
501 self.good_query_count += 1
503 self.bad_query_count += 1
507 # In networks with lots of space, this is very unusual and
508 # probably indicates an error. In networks with peers that
509 # are full, it is merely unusual. In networks that are very
510 # full, it is common, and many uploads will fail. In most
511 # cases, this is obviously not fatal, and we'll just use some
514 # some shares are still homeless, keep trying to find them a
515 # home. The ones that were rejected get first priority.
516 self.homeless_shares = (list(still_homeless)
517 + self.homeless_shares)
518 # Since they were unable to accept all of our requests, so it
519 # is safe to assume that asking them again won't help.
521 # if they *were* able to accept everything, they might be
522 # willing to accept even more.
523 put_peer_here.append(peer)
529 def _failed(self, msg):
531 I am called when peer selection fails. I first abort all of the
532 remote buckets that I allocated during my unsuccessful attempt to
533 place shares for this file. I then raise an
534 UploadUnhappinessError with my msg argument.
536 for peer in self.use_peers:
537 assert isinstance(peer, PeerTracker)
541 raise UploadUnhappinessError(msg)
544 class EncryptAnUploadable:
545 """This is a wrapper that takes an IUploadable and provides
546 IEncryptedUploadable."""
547 implements(IEncryptedUploadable)
550 def __init__(self, original, log_parent=None):
551 self.original = IUploadable(original)
552 self._log_number = log_parent
553 self._encryptor = None
554 self._plaintext_hasher = plaintext_hasher()
555 self._plaintext_segment_hasher = None
556 self._plaintext_segment_hashes = []
557 self._encoding_parameters = None
558 self._file_size = None
559 self._ciphertext_bytes_read = 0
def set_upload_status(self, upload_status):
    """Adapt the given status object to IUploadStatus, keep it, and
    propagate it to the wrapped uploadable."""
    self._status = IUploadStatus(upload_status)
    self.original.set_upload_status(upload_status)
def log(self, *args, **kwargs):
    """Emit a log message under the 'upload.encryption' facility,
    parented to this uploadable's log entry unless overridden."""
    kwargs.setdefault("facility", "upload.encryption")
    kwargs.setdefault("parent", self._log_number)
    return log.msg(*args, **kwargs)
574 if self._file_size is not None:
575 return defer.succeed(self._file_size)
576 d = self.original.get_size()
578 self._file_size = size
580 self._status.set_size(size)
582 d.addCallback(_got_size)
585 def get_all_encoding_parameters(self):
586 if self._encoding_parameters is not None:
587 return defer.succeed(self._encoding_parameters)
588 d = self.original.get_all_encoding_parameters()
589 def _got(encoding_parameters):
590 (k, happy, n, segsize) = encoding_parameters
591 self._segment_size = segsize # used by segment hashers
592 self._encoding_parameters = encoding_parameters
593 self.log("my encoding parameters: %s" % (encoding_parameters,),
595 return encoding_parameters
599 def _get_encryptor(self):
601 return defer.succeed(self._encryptor)
603 d = self.original.get_encryption_key()
608 storage_index = storage_index_hash(key)
609 assert isinstance(storage_index, str)
610 # There's no point to having the SI be longer than the key, so we
611 # specify that it is truncated to the same 128 bits as the AES key.
612 assert len(storage_index) == 16 # SHA-256 truncated to 128b
613 self._storage_index = storage_index
615 self._status.set_storage_index(storage_index)
620 def get_storage_index(self):
621 d = self._get_encryptor()
622 d.addCallback(lambda res: self._storage_index)
625 def _get_segment_hasher(self):
626 p = self._plaintext_segment_hasher
628 left = self._segment_size - self._plaintext_segment_hashed_bytes
630 p = plaintext_segment_hasher()
631 self._plaintext_segment_hasher = p
632 self._plaintext_segment_hashed_bytes = 0
633 return p, self._segment_size
635 def _update_segment_hash(self, chunk):
637 while offset < len(chunk):
638 p, segment_left = self._get_segment_hasher()
639 chunk_left = len(chunk) - offset
640 this_segment = min(chunk_left, segment_left)
641 p.update(chunk[offset:offset+this_segment])
642 self._plaintext_segment_hashed_bytes += this_segment
644 if self._plaintext_segment_hashed_bytes == self._segment_size:
645 # we've filled this segment
646 self._plaintext_segment_hashes.append(p.digest())
647 self._plaintext_segment_hasher = None
648 self.log("closed hash [%d]: %dB" %
649 (len(self._plaintext_segment_hashes)-1,
650 self._plaintext_segment_hashed_bytes),
652 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
653 segnum=len(self._plaintext_segment_hashes)-1,
654 hash=base32.b2a(p.digest()),
657 offset += this_segment
660 def read_encrypted(self, length, hash_only):
661 # make sure our parameters have been set up first
662 d = self.get_all_encoding_parameters()
664 d.addCallback(lambda ignored: self.get_size())
665 d.addCallback(lambda ignored: self._get_encryptor())
666 # then fetch and encrypt the plaintext. The unusual structure here
667 # (passing a Deferred *into* a function) is needed to avoid
668 # overflowing the stack: Deferreds don't optimize out tail recursion.
669 # We also pass in a list, to which _read_encrypted will append
672 d2 = defer.Deferred()
673 d.addCallback(lambda ignored:
674 self._read_encrypted(length, ciphertext, hash_only, d2))
675 d.addCallback(lambda ignored: d2)
678 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
680 fire_when_done.callback(ciphertext)
682 # tolerate large length= values without consuming a lot of RAM by
683 # reading just a chunk (say 50kB) at a time. This only really matters
684 # when hash_only==True (i.e. resuming an interrupted upload), since
685 # that's the case where we will be skipping over a lot of data.
686 size = min(remaining, self.CHUNKSIZE)
687 remaining = remaining - size
688 # read a chunk of plaintext..
689 d = defer.maybeDeferred(self.original.read, size)
690 # N.B.: if read() is synchronous, then since everything else is
691 # actually synchronous too, we'd blow the stack unless we stall for a
692 # tick. Once you accept a Deferred from IUploadable.read(), you must
693 # be prepared to have it fire immediately too.
694 d.addCallback(fireEventually)
695 def _good(plaintext):
697 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
698 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
699 ciphertext.extend(ct)
700 self._read_encrypted(remaining, ciphertext, hash_only,
703 fire_when_done.errback(why)
708 def _hash_and_encrypt_plaintext(self, data, hash_only):
709 assert isinstance(data, (tuple, list)), type(data)
712 # we use data.pop(0) instead of 'for chunk in data' to save
713 # memory: each chunk is destroyed as soon as we're done with it.
717 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
719 bytes_processed += len(chunk)
720 self._plaintext_hasher.update(chunk)
721 self._update_segment_hash(chunk)
722 # TODO: we have to encrypt the data (even if hash_only==True)
723 # because pycryptopp's AES-CTR implementation doesn't offer a
724 # way to change the counter value. Once pycryptopp acquires
725 # this ability, change this to simply update the counter
726 # before each call to (hash_only==False) _encryptor.process()
727 ciphertext = self._encryptor.process(chunk)
729 self.log(" skipping encryption", level=log.NOISY)
731 cryptdata.append(ciphertext)
734 self._ciphertext_bytes_read += bytes_processed
736 progress = float(self._ciphertext_bytes_read) / self._file_size
737 self._status.set_progress(1, progress)
741 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
742 # this is currently unused, but will live again when we fix #453
743 if len(self._plaintext_segment_hashes) < num_segments:
744 # close out the last one
745 assert len(self._plaintext_segment_hashes) == num_segments-1
746 p, segment_left = self._get_segment_hasher()
747 self._plaintext_segment_hashes.append(p.digest())
748 del self._plaintext_segment_hasher
749 self.log("closing plaintext leaf hasher, hashed %d bytes" %
750 self._plaintext_segment_hashed_bytes,
752 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
753 segnum=len(self._plaintext_segment_hashes)-1,
754 hash=base32.b2a(p.digest()),
756 assert len(self._plaintext_segment_hashes) == num_segments
757 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
def get_plaintext_hash(self):
    """Return a Deferred that fires with the digest of the whole-file
    plaintext hasher (covering all plaintext processed so far)."""
    return defer.succeed(self._plaintext_hasher.digest())
764 return self.original.close()
767 implements(IUploadStatus)
768 statusid_counter = itertools.count(0)
771 self.storage_index = None
774 self.status = "Not started"
775 self.progress = [0.0, 0.0, 0.0]
778 self.counter = self.statusid_counter.next()
779 self.started = time.time()
781 def get_started(self):
783 def get_storage_index(self):
784 return self.storage_index
787 def using_helper(self):
789 def get_status(self):
791 def get_progress(self):
792 return tuple(self.progress)
793 def get_active(self):
795 def get_results(self):
797 def get_counter(self):
800 def set_storage_index(self, si):
801 self.storage_index = si
802 def set_size(self, size):
804 def set_helper(self, helper):
806 def set_status(self, status):
808 def set_progress(self, which, value):
809 # [0]: chk, [1]: ciphertext, [2]: encode+push
810 self.progress[which] = value
811 def set_active(self, value):
813 def set_results(self, value):
817 peer_selector_class = Tahoe2PeerSelector
819 def __init__(self, storage_broker, secret_holder):
820 # peer_selector needs storage_broker and secret_holder
821 self._storage_broker = storage_broker
822 self._secret_holder = secret_holder
823 self._log_number = self.log("CHKUploader starting", parent=None)
825 self._results = UploadResults()
826 self._storage_index = None
827 self._upload_status = UploadStatus()
828 self._upload_status.set_helper(False)
829 self._upload_status.set_active(True)
830 self._upload_status.set_results(self._results)
832 # locate_all_shareholders() will create the following attribute:
833 # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
def log(self, *args, **kwargs):
    """Emit a log message under the 'tahoe.upload' facility, parented
    to this uploader's log entry unless the caller overrides either."""
    kwargs.setdefault("parent", self._log_number)
    kwargs.setdefault("facility", "tahoe.upload")
    return log.msg(*args, **kwargs)
842 def start(self, encrypted_uploadable):
843 """Start uploading the file.
845 Returns a Deferred that will fire with the UploadResults instance.
848 self._started = time.time()
849 eu = IEncryptedUploadable(encrypted_uploadable)
850 self.log("starting upload of %s" % eu)
852 eu.set_upload_status(self._upload_status)
853 d = self.start_encrypted(eu)
854 def _done(uploadresults):
855 self._upload_status.set_active(False)
861 """Call this if the upload must be abandoned before it completes.
862 This will tell the shareholders to delete their partial shares. I
863 return a Deferred that fires when these messages have been acked."""
864 if not self._encoder:
865 # how did you call abort() before calling start() ?
866 return defer.succeed(None)
867 return self._encoder.abort()
869 def start_encrypted(self, encrypted):
870 """ Returns a Deferred that will fire with the UploadResults instance. """
871 eu = IEncryptedUploadable(encrypted)
873 started = time.time()
874 self._encoder = e = encode.Encoder(self._log_number,
876 d = e.set_encrypted_uploadable(eu)
877 d.addCallback(self.locate_all_shareholders, started)
878 d.addCallback(self.set_shareholders, e)
879 d.addCallback(lambda res: e.start())
880 d.addCallback(self._encrypted_done)
883 def locate_all_shareholders(self, encoder, started):
884 peer_selection_started = now = time.time()
885 self._storage_index_elapsed = now - started
886 storage_broker = self._storage_broker
887 secret_holder = self._secret_holder
888 storage_index = encoder.get_param("storage_index")
889 self._storage_index = storage_index
890 upload_id = si_b2a(storage_index)[:5]
891 self.log("using storage index %s" % upload_id)
892 peer_selector = self.peer_selector_class(upload_id, self._log_number,
895 share_size = encoder.get_param("share_size")
896 block_size = encoder.get_param("block_size")
897 num_segments = encoder.get_param("num_segments")
898 k,desired,n = encoder.get_param("share_counts")
900 self._peer_selection_started = time.time()
901 d = peer_selector.get_shareholders(storage_broker, secret_holder,
903 share_size, block_size,
904 num_segments, n, k, desired)
906 self._peer_selection_elapsed = time.time() - peer_selection_started
911 def set_shareholders(self, (upload_servers, already_peers), encoder):
913 @param upload_servers: a sequence of PeerTracker objects that have agreed to hold some shares for us (the shareids are stashed inside the PeerTracker)
914 @paran already_peers: a dict mapping sharenum to a set of peerids
915 that claim to already have this share
917 self.log("_send_shares, upload_servers is %s" % (upload_servers,))
918 # record already-present shares in self._results
919 self._results.preexisting_shares = len(already_peers)
921 self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
922 for peer in upload_servers:
923 assert isinstance(peer, PeerTracker)
925 servermap = already_peers.copy()
926 for peer in upload_servers:
927 buckets.update(peer.buckets)
928 for shnum in peer.buckets:
929 self._peer_trackers[shnum] = peer
930 servermap.setdefault(shnum, set()).add(peer.peerid)
931 assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])
932 encoder.set_shareholders(buckets, servermap)
def _encrypted_done(self, verifycap):
    """ Returns a Deferred that will fire with the UploadResults instance. """
    # NOTE(review): 'r' and 'now' are used below but their bindings are not
    # visible in this view — presumably r is the UploadResults instance and
    # now = time.time(); confirm.
    for shnum in self._encoder.get_shares_placed():
        peer_tracker = self._peer_trackers[shnum]
        peerid = peer_tracker.peerid
        # record placement in both directions: share->servers and
        # server->shares
        r.sharemap.add(shnum, peerid)
        r.servermap.add(peerid, shnum)
    r.pushed_shares = len(self._encoder.get_shares_placed())
    r.file_size = self._encoder.file_size
    # timing breakdown for the status/results display
    r.timings["total"] = now - self._started
    r.timings["storage_index"] = self._storage_index_elapsed
    r.timings["peer_selection"] = self._peer_selection_elapsed
    r.timings.update(self._encoder.get_times())
    r.uri_extension_data = self._encoder.get_uri_extension_data()
    r.verifycapstr = verifycap.to_string()
def get_upload_status(self):
    """Return the UploadStatus object tracking this upload's progress."""
    status = self._upload_status
    return status
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from an IUploadable.

    Returns a Deferred that fires with a list of data strings whose total
    length (beyond 'prepend_data') is 'size'. IUploadable.read() may return
    less than requested, so we recurse, accumulating what we have so far in
    'prepend_data', until the full amount has been collected.

    @param uploadable: the IUploadable to read from
    @param size: number of bytes still to read (must be >= 0)
    @param prepend_data: list of strings already collected (internal use)
    """
    # avoid the shared-mutable-default pitfall: build a fresh list per call
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        # note: a zero-size initial request yields [], ignoring prepend_data;
        # the recursive path never reaches here with remaining == 0
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            # short read: ask again for the rest, carrying what we have
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
class LiteralUploader:
    """Upload a small file as a LIT (literal) URI: the whole plaintext is
    packed into the URI itself, so nothing is pushed to storage servers."""

    # NOTE(review): the 'def __init__(self):' header for the following
    # initialization block is not visible in this view — confirm.
    self._results = UploadResults()
    self._status = s = UploadStatus()
    s.set_storage_index(None)
    s.set_progress(0, 1.0)
    s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        # NOTE(review): the 'def _got_size(size):' header for the next three
        # lines is not visible in this view — they read like its body; confirm.
        self._status.set_size(size)
        self._results.file_size = size
        return read_this_many_bytes(uploadable, size)
        # callback chain: size -> read all bytes -> pack into a LIT URI ->
        # stringify -> record in the results object
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)

    def _build_results(self, uri):
        # record the finished URI and mark every progress phase complete
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def get_upload_status(self):
        # NOTE(review): body not visible in this view; presumably returns
        # self._status — confirm.
class RemoteEncryptedUploadable(Referenceable):
    """Wrap a local IEncryptedUploadable so a remote Helper can pull
    ciphertext from it over foolscap (RIEncryptedUploadable)."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        # count of ciphertext bytes actually sent to the helper
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.

        # NOTE(review): the 'def get_size(self):' header for the following
        # body is not visible in this view — confirm.
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
        d.addCallback(_got_size)

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        # hash_only=True reads (to feed the hashers) without returning data
        d = self._eu.read_encrypted(length, hash_only)
        # NOTE(review): the 'def _read(strings):' header and the hash_only
        # branch structure are not visible in this view — confirm which of
        # the two offset updates below belongs to which branch.
        self._offset += length
        size = sum([len(data) for data in strings])
        self._offset += size
        d.addCallback(_read)

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        # NOTE(review): the log.msg call's remaining keyword arguments are
        # not visible in this view.
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        # NOTE(review): an 'else:' presumably separates the two 'd =' bindings
        # above/below — not visible in this view; confirm.
        d = defer.succeed(None)
        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)
        # NOTE(review): the 'def _read(strings):' header for the next two
        # lines is not visible in this view — confirm.
        size = sum([len(data) for data in strings])
        self._bytes_sent += size
        d.addCallback(_read)

    def remote_close(self):
        return self._eu.close()
class AssistedUploader:
    """Upload a file with the help of a remote Helper: we hand the Helper our
    ciphertext and it performs the erasure coding and share placement."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()

    def log(self, *args, **kwargs):
        # route all our log messages under the AssistedUploader parent entry
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # NOTE(review): the creation of 'd' (presumably d = eu.get_size())
        # is not visible in this view — confirm.
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        # NOTE(review): the enclosing callback/errback for the next line is
        # not visible in this view.
        self._upload_status.set_active(False)

    def _got_size(self, size):
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)

    def _contacted_helper(self, (upload_results, upload_helper)):
        # NOTE(review): the binding of 'now' (presumably time.time()) and the
        # 'if upload_helper:' branch structure are not visible in this view.
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        self.log("helper says we need to upload")
        self._upload_status.set_status("Uploading Ciphertext")
        # we need to upload the file
        reu = RemoteEncryptedUploadable(self._encuploadable,
                                        self._upload_status)
        # let it pre-compute the size for progress purposes
        d.addCallback(lambda ignored:
                      upload_helper.callRemote("upload", reu))
        # this Deferred will fire with the upload results
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.
        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        # NOTE(review): the binding of 'r' (presumably r = upload_results)
        # and of 'now' are not visible in this view — confirm.
        # sanity-check that the helper used our encoding parameters
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares, size=self._size
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own total under a distinct key
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)

    def get_upload_status(self):
        return self._upload_status
class BaseUploadable:
    """Base class providing encoding-parameter bookkeeping for IUploadable
    implementations."""
    # class-wide defaults; may be replaced via
    # set_default_encoding_parameters()
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    # per-instance overrides: when falsy, the defaults above are used
    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    # lazily-computed cache of the (k, happy, n, segsize) tuple
    _all_encoding_parameters = None

    def set_upload_status(self, upload_status):
        # adapt and stash the status object for progress reporting
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install defaults from a {str: int} dict; the recognized keys are
        'k', 'happy', 'n', and 'max_segment_size'."""
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segsize), computing
        and caching the tuple on first use."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        # NOTE(review): the creation of 'd' (presumably d = self.get_size())
        # and the final 'return d' are not visible in this view — confirm.
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
class FileHandle(BaseUploadable):
    """An IUploadable that reads its plaintext from an open filehandle."""
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self.convergence = convergence

    def _get_encryption_key_convergent(self):
        # derive the AES key from a hash of the plaintext plus the
        # convergence secret, so identical files encrypt identically
        if self._key is not None:
            return defer.succeed(self._key)

        # NOTE(review): the creation of 'd' (presumably d = self.get_size())
        # is not visible in this view — confirm.
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        # NOTE(review): the 'def _got(params):' header, the read loop header,
        # and the BLOCKSIZE/bytes_read initialization for the body below are
        # not visible in this view — confirm.
        k, happy, n, segsize = params
        f = self._filehandle
        enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
        data = f.read(BLOCKSIZE)
        enckey_hasher.update(data)
        # TODO: setting progress in a non-yielding loop is kind of
        # pointless, but I'm anticipating (perhaps prematurely) the
        # day when we use a slowjob or twisted's CooperatorService to
        # make this yield time to other jobs.
        bytes_read += len(data)
        self._status.set_progress(0, float(bytes_read)/self._size)
        self._key = enckey_hasher.digest()
        self._status.set_progress(0, 1.0)
        assert len(self._key) == 16

    def _get_encryption_key_random(self):
        # non-convergent mode: a fresh random 128-bit AES key per upload
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

    # NOTE(review): the 'def get_size(self):' header for the following body
    # is not visible in this view — confirm.
    if self._size is not None:
        return defer.succeed(self._size)
    # measure the file by seeking to its end, then rewind for reading
    self._filehandle.seek(0,2)
    size = self._filehandle.tell()
    self._filehandle.seek(0)
    return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    """An IUploadable that reads its plaintext from a named file on disk."""
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        # fix: these two calls were dangling inside __init__, which would
        # have closed the file immediately after opening it. Unlike
        # FileHandle (whose caller owns the handle), we opened the handle
        # ourselves, so we must close it here.
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    # Wraps an in-memory byte string in a StringIO so it can be uploaded
    # through the FileHandle machinery.
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client node. Uploads may go directly to storage servers, or through a
    Helper if one has been configured."""
    implements(IUploader)

    # files at or below this size are uploaded as LIT (literal) URIs
    URI_LIT_SIZE_THRESHOLD = 55
def __init__(self, helper_furl=None, stats_provider=None):
    self._helper_furl = helper_furl
    self.stats_provider = stats_provider
    # NOTE(review): self._helper is read elsewhere (get_helper_info) but its
    # initialization (presumably to None) is not visible in this view.
    self._all_uploads = weakref.WeakKeyDictionary() # for debugging
    log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
    service.MultiService.__init__(self)
def startService(self):
    service.MultiService.startService(self)
    if self._helper_furl:
        # initiate (and let foolscap maintain) a connection to the helper;
        # the call's remaining arguments are not visible in this view.
        self.parent.tub.connectTo(self._helper_furl,
def _got_helper(self, helper):
    self.log("got helper connection, getting versions")
    # fallback version info, used when the remote lacks get_version();
    # NOTE(review): the dict literal's interior/closing lines are not fully
    # visible in this view.
    default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                "application-version": "unknown: no get_version()",
    d = add_version_to_remote_reference(helper, default)
    d.addCallback(self._got_versioned_helper)
def _got_versioned_helper(self, helper):
    """Record the helper once it proves it speaks a compatible protocol."""
    required = "http://allmydata.org/tahoe/protocols/helper/v1"
    if required not in helper.version:
        raise InsufficientVersionError(required, helper.version)
    self._helper = helper
    helper.notifyOnDisconnect(self._lost_helper)
def _lost_helper(self):
    # NOTE(review): body not visible in this view; presumably clears
    # self._helper (get_helper_info treats a falsy self._helper as
    # "not connected") — confirm.
def get_helper_info(self):
    """Return a tuple of (helper_furl_or_None, connected_bool)."""
    furl = self._helper_furl
    connected = bool(self._helper)
    return (furl, connected)
1408 def upload(self, uploadable, history=None):
1410 Returns a Deferred that will fire with the UploadResults instance.
1415 uploadable = IUploadable(uploadable)
1416 d = uploadable.get_size()
1417 def _got_size(size):
1418 default_params = self.parent.get_encoding_parameters()
1419 precondition(isinstance(default_params, dict), default_params)
1420 precondition("max_segment_size" in default_params, default_params)
1421 uploadable.set_default_encoding_parameters(default_params)
1423 if self.stats_provider:
1424 self.stats_provider.count('uploader.files_uploaded', 1)
1425 self.stats_provider.count('uploader.bytes_uploaded', size)
1427 if size <= self.URI_LIT_SIZE_THRESHOLD:
1428 uploader = LiteralUploader()
1429 return uploader.start(uploadable)
1431 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1432 d2 = defer.succeed(None)
1434 uploader = AssistedUploader(self._helper)
1435 d2.addCallback(lambda x: eu.get_storage_index())
1436 d2.addCallback(lambda si: uploader.start(eu, si))
1438 storage_broker = self.parent.get_storage_broker()
1439 secret_holder = self.parent._secret_holder
1440 uploader = CHKUploader(storage_broker, secret_holder)
1441 d2.addCallback(lambda x: uploader.start(eu))
1443 self._all_uploads[uploader] = None
1445 history.add_upload(uploader.get_upload_status())
1446 def turn_verifycap_into_read_cap(uploadresults):
1447 # Generate the uri from the verifycap plus the key.
1448 d3 = uploadable.get_encryption_key()
1449 def put_readcap_into_results(key):
1450 v = uri.from_string(uploadresults.verifycapstr)
1451 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1452 uploadresults.uri = r.to_string()
1453 return uploadresults
1454 d3.addCallback(put_readcap_into_results)
1456 d2.addCallback(turn_verifycap_into_read_cap)
1458 d.addCallback(_got_size)