1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import add_version_to_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20 NotEnoughSharesError, NoSharesError, NoServersError, \
21 InsufficientVersionError
22 from allmydata.immutable import layout
23 from pycryptopp.cipher.aes import AES
25 from cStringIO import StringIO
34 class HaveAllPeersError(Exception):
35 # we use this to jump out of the loop
38 # this wants to live in storage, not here
39 class TooFullError(Exception):
42 class UploadResults(Copyable, RemoteCopy):
# UploadResults is serialized over foolscap (Copyable/RemoteCopy) between the
# upload helper and its client, so its wire shape is compatibility-sensitive.
# NOTE(review): this fragment is missing lines (e.g. the `copytype` assignment
# and the `def __init__` line before the attribute assignments below) —
# confirm against the upstream file before editing.
43 implements(IUploadResults)
44 # note: don't change this string, it needs to match the value used on the
45 # helper, and it does *not* need to match the fully-qualified
46 # package/module/class name
47 typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
50 # also, think twice about changing the shape of any existing attribute,
51 # because instances of this class are sent from the helper to its client,
52 # so changing this may break compatibility. Consider adding new fields
53 # instead of modifying existing ones.
56 self.timings = {} # dict of name to number of seconds
57 self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
58 self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
60 self.ciphertext_fetched = None # how much the helper fetched
62 self.preexisting_shares = None # count of shares already present
63 self.pushed_shares = None # count of shares we pushed
66 # our current uri_extension is 846 bytes for small files, a few bytes
67 # more for larger ones (since the filesize is encoded in decimal in a
68 # few places). Ask for a little bit more just in case we need it. If
69 # the extension changes size, we can change EXTENSION_SIZE to
70 # allocate a more accurate amount of space.
72 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
76 def __init__(self, peerid, storage_server,
77 sharesize, blocksize, num_segments, num_share_hashes,
# NOTE(review): a parameter line is missing here (presumably `storage_index,`,
# since self.storage_index is assigned below) — confirm against upstream.
79 bucket_renewal_secret, bucket_cancel_secret):
# peerid is a 20-byte binary nodeid (Python 2 str).
80 precondition(isinstance(peerid, str), peerid)
81 precondition(len(peerid) == 20, peerid)
# NOTE(review): the `self.peerid = peerid` assignment appears to be missing
# from this fragment (self.peerid is read by __repr__ and callers).
83 self._storageserver = storage_server # to an RIStorageServer
84 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
85 self.sharesize = sharesize
# Build a throwaway write-bucket proxy only to learn the allocated size and
# the concrete proxy class; real proxies are created later per-bucket.
87 wbp = layout.make_write_bucket_proxy(None, sharesize,
88 blocksize, num_segments,
90 EXTENSION_SIZE, peerid)
91 self.wbp_class = wbp.__class__ # to create more of them
92 self.allocated_size = wbp.get_allocated_size()
93 self.blocksize = blocksize
94 self.num_segments = num_segments
95 self.num_share_hashes = num_share_hashes
96 self.storage_index = storage_index
98 self.renew_secret = bucket_renewal_secret
99 self.cancel_secret = bucket_cancel_secret
102 return ("<PeerTracker for peer %s and SI %s>"
103 % (idlib.shortnodeid_b2a(self.peerid),
104 si_b2a(self.storage_index)[:5]))
106 def query(self, sharenums):
107 d = self._storageserver.callRemote("allocate_buckets",
113 canary=Referenceable())
114 d.addCallback(self._got_reply)
117 def query_allocated(self):
118 d = self._storageserver.callRemote("get_buckets",
120 d.addCallback(self._got_allocate_reply)
123 def _got_allocate_reply(self, buckets):
124 return (self.peerid, buckets)
126 def _got_reply(self, (alreadygot, buckets)):
127 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
129 for sharenum, rref in buckets.iteritems():
130 bp = self.wbp_class(rref, self.sharesize,
133 self.num_share_hashes,
137 self.buckets.update(b)
138 return (alreadygot, set(b.keys()))
140 def servers_with_unique_shares(existing_shares, used_peers=None):
# Returns the deduplicated list of servers that hold (or will hold) at least
# one share: peerids from used_peers plus the values of existing_shares.
# NOTE(review): this fragment is missing the initialization of `servers`
# (it is extended below before any visible assignment) and likely an
# `if used_peers:` guard — confirm against upstream.
143 peers = list(used_peers.copy())
144 # We do this because the preexisting shares list goes by peerid.
145 peers = [x.peerid for x in peers]
146 servers.extend(peers)
147 servers.extend(existing_shares.values())
148 return list(set(servers))
150 def shares_by_server(existing_shares):
# Inverts a {sharenum: serverid} map into {serverid: set(sharenums)}.
# NOTE(review): the `servers = {}` initialization and the final
# `return servers` appear to be missing from this fragment — confirm.
152 for server in set(existing_shares.values()):
153 servers[server] = set([x for x in existing_shares.keys()
154 if existing_shares[x] == server])
157 class Tahoe2PeerSelector:
159 def __init__(self, upload_id, logparent=None, upload_status=None):
# upload_id: short printable id for log messages (derived from the SI).
160 self.upload_id = upload_id
# Counters for the final success/failure report produced by _loop().
161 self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
# NOTE(review): a line is missing here — `self.error_count` is incremented
# in _got_response but its initialization is not visible in this fragment.
163 self.num_peers_contacted = 0
164 self.last_failure_msg = None
165 self._status = IUploadStatus(upload_status)
166 self._log_parent = log.msg("%s starting" % self, parent=logparent)
169 return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
171 def get_shareholders(self, storage_broker, secret_holder,
172 storage_index, share_size, block_size,
173 num_segments, total_shares, servers_of_happiness):
175 @return: (used_peers, already_peers), where used_peers is a set of
176 PeerTracker instances that have agreed to hold some shares
177 for us (the shnum is stashed inside the PeerTracker),
178 and already_peers is a dict mapping shnum to a peer
179 which claims to already have the share.
183 self._status.set_status("Contacting Peers..")
185 self.total_shares = total_shares
186 self.servers_of_happiness = servers_of_happiness
188 self.homeless_shares = range(total_shares)
189 # self.uncontacted_peers = list() # peers we haven't asked yet
190 self.contacted_peers = [] # peers worth asking again
191 self.contacted_peers2 = [] # peers that we have asked again
192 self._started_second_pass = False
193 self.use_peers = set() # PeerTrackers that have shares assigned to them
194 self.preexisting_shares = {} # sharenum -> peerid holding the share
195 # We don't try to allocate shares to these servers, since they've
196 # said that they're incapable of storing shares of the size that
197 # we'd want to store. We keep them around because they may have
198 # existing shares for this storage index, which we want to know
199 # about for accurate servers_of_happiness accounting
200 self.readonly_peers = []
202 peers = storage_broker.get_servers_for_index(storage_index)
204 raise NoServersError("client gave us zero peers")
206 # this needed_hashes computation should mirror
207 # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
208 # (instead of a HashTree) because we don't require actual hashing
209 # just to count the levels.
210 ht = hashtree.IncompleteHashTree(total_shares)
211 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
213 # figure out how much space to ask for
214 wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
215 num_share_hashes, EXTENSION_SIZE,
217 allocated_size = wbp.get_allocated_size()
219 # filter the list of peers according to which ones can accomodate
220 # this request. This excludes older peers (which used a 4-byte size
221 # field) from getting large shares (for files larger than about
222 # 12GiB). See #439 for details.
223 def _get_maxsize(peer):
224 (peerid, conn) = peer
225 v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
226 return v1["maximum-immutable-share-size"]
227 new_peers = [peer for peer in peers
228 if _get_maxsize(peer) >= allocated_size]
229 old_peers = list(set(peers).difference(set(new_peers)))
232 # decide upon the renewal/cancel secrets, to include them in the
233 # allocate_buckets query.
234 client_renewal_secret = secret_holder.get_renewal_secret()
235 client_cancel_secret = secret_holder.get_cancel_secret()
237 file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
239 file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
241 def _make_trackers(peers):
242 return [ PeerTracker(peerid, conn,
243 share_size, block_size,
244 num_segments, num_share_hashes,
246 bucket_renewal_secret_hash(file_renewal_secret,
248 bucket_cancel_secret_hash(file_cancel_secret,
250 for (peerid, conn) in peers]
251 self.uncontacted_peers = _make_trackers(peers)
252 self.readonly_peers = _make_trackers(old_peers)
253 # Talk to the readonly servers to get an idea of what servers
254 # have what shares (if any) for this storage index
255 d = defer.maybeDeferred(self._existing_shares)
256 d.addCallback(lambda ign: self._loop())
259 def _existing_shares(self):
# Polls readonly servers one at a time (via recursion from
# _handle_allocate_response) to learn which shares they already hold.
260 if self.readonly_peers:
261 peer = self.readonly_peers.pop()
262 assert isinstance(peer, PeerTracker)
263 d = peer.query_allocated()
264 d.addCallback(self._handle_allocate_response)
# NOTE(review): the terminal `return d` (and any else-branch) is missing
# from this fragment — confirm against upstream.
267 def _handle_allocate_response(self, (peer, buckets)):
268 for bucket in buckets:
269 self.preexisting_shares[bucket] = peer
270 if self.homeless_shares:
271 self.homeless_shares.remove(bucket)
272 return self._existing_shares()
275 if not self.homeless_shares:
276 effective_happiness = servers_with_unique_shares(
277 self.preexisting_shares,
279 if self.servers_of_happiness <= len(effective_happiness):
280 msg = ("placed all %d shares, "
281 "sent %d queries to %d peers, "
282 "%d queries placed some shares, %d placed none, "
285 self.query_count, self.num_peers_contacted,
286 self.good_query_count, self.bad_query_count,
288 log.msg("peer selection successful for %s: %s" % (self, msg),
289 parent=self._log_parent)
290 return (self.use_peers, self.preexisting_shares)
292 delta = self.servers_of_happiness - len(effective_happiness)
293 shares = shares_by_server(self.preexisting_shares)
294 # Each server in shares maps to a set of shares stored on it.
295 # Since we want to keep at least one share on each server
296 # that has one (otherwise we'd only be making
297 # the situation worse by removing distinct servers),
298 # each server has len(its shares) - 1 to spread around.
299 shares_to_spread = sum([len(list(sharelist)) - 1
300 for (server, sharelist)
302 if delta <= len(self.uncontacted_peers) and \
303 shares_to_spread >= delta:
304 # Loop through the allocated shares, removing
305 items = shares.items()
306 while len(self.homeless_shares) < delta:
307 servernum, sharelist = items.pop()
308 if len(sharelist) > 1:
309 share = sharelist.pop()
310 self.homeless_shares.append(share)
311 del(self.preexisting_shares[share])
312 items.append((servernum, sharelist))
315 raise NotEnoughSharesError("shares could only be placed on %d "
316 "servers (%d were requested)" %
317 (len(effective_happiness),
318 self.servers_of_happiness))
320 if self.uncontacted_peers:
321 peer = self.uncontacted_peers.pop(0)
322 # TODO: don't pre-convert all peerids to PeerTrackers
323 assert isinstance(peer, PeerTracker)
325 shares_to_ask = set([self.homeless_shares.pop(0)])
326 self.query_count += 1
327 self.num_peers_contacted += 1
329 self._status.set_status("Contacting Peers [%s] (first query),"
331 % (idlib.shortnodeid_b2a(peer.peerid),
332 len(self.homeless_shares)))
333 d = peer.query(shares_to_ask)
334 d.addBoth(self._got_response, peer, shares_to_ask,
335 self.contacted_peers)
337 elif self.contacted_peers:
338 # ask a peer that we've already asked.
339 if not self._started_second_pass:
340 log.msg("starting second pass", parent=self._log_parent,
342 self._started_second_pass = True
343 num_shares = mathutil.div_ceil(len(self.homeless_shares),
344 len(self.contacted_peers))
345 peer = self.contacted_peers.pop(0)
346 shares_to_ask = set(self.homeless_shares[:num_shares])
347 self.homeless_shares[:num_shares] = []
348 self.query_count += 1
350 self._status.set_status("Contacting Peers [%s] (second query),"
352 % (idlib.shortnodeid_b2a(peer.peerid),
353 len(self.homeless_shares)))
354 d = peer.query(shares_to_ask)
355 d.addBoth(self._got_response, peer, shares_to_ask,
356 self.contacted_peers2)
358 elif self.contacted_peers2:
359 # we've finished the second-or-later pass. Move all the remaining
360 # peers back into self.contacted_peers for the next pass.
361 self.contacted_peers.extend(self.contacted_peers2)
362 self.contacted_peers2[:] = []
365 # no more peers. If we haven't placed enough shares, we fail.
366 placed_shares = self.total_shares - len(self.homeless_shares)
367 effective_happiness = servers_with_unique_shares(
368 self.preexisting_shares,
370 if len(effective_happiness) < self.servers_of_happiness:
371 msg = ("placed %d shares out of %d total (%d homeless), "
372 "want to place on %d servers, "
373 "sent %d queries to %d peers, "
374 "%d queries placed some shares, %d placed none, "
376 (self.total_shares - len(self.homeless_shares),
377 self.total_shares, len(self.homeless_shares),
378 self.servers_of_happiness,
379 self.query_count, self.num_peers_contacted,
380 self.good_query_count, self.bad_query_count,
382 msg = "peer selection failed for %s: %s" % (self, msg)
383 if self.last_failure_msg:
384 msg += " (%s)" % (self.last_failure_msg,)
385 log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
387 raise NotEnoughSharesError(msg)
389 raise NoSharesError(msg)
391 # we placed enough to be happy, so we're done
393 self._status.set_status("Placed all shares")
394 return self.use_peers
396 def _got_response(self, res, peer, shares_to_ask, put_peer_here):
397 if isinstance(res, failure.Failure):
398 # This is unusual, and probably indicates a bug or a network
400 log.msg("%s got error during peer selection: %s" % (peer, res),
401 level=log.UNUSUAL, parent=self._log_parent)
402 self.error_count += 1
403 self.homeless_shares = list(shares_to_ask) + self.homeless_shares
404 if (self.uncontacted_peers
405 or self.contacted_peers
406 or self.contacted_peers2):
407 # there is still hope, so just loop
410 # No more peers, so this upload might fail (it depends upon
411 # whether we've hit shares_of_happiness or not). Log the last
412 # failure we got: if a coding error causes all peers to fail
413 # in the same way, this allows the common failure to be seen
414 # by the uploader and should help with debugging
415 msg = ("last failure (from %s) was: %s" % (peer, res))
416 self.last_failure_msg = msg
418 (alreadygot, allocated) = res
419 log.msg("response from peer %s: alreadygot=%s, allocated=%s"
420 % (idlib.shortnodeid_b2a(peer.peerid),
421 tuple(sorted(alreadygot)), tuple(sorted(allocated))),
422 level=log.NOISY, parent=self._log_parent)
425 if self.preexisting_shares.has_key(s):
426 old_size = len(servers_with_unique_shares(self.preexisting_shares))
427 new_candidate = self.preexisting_shares.copy()
428 new_candidate[s] = peer.peerid
429 new_size = len(servers_with_unique_shares(new_candidate))
430 if old_size >= new_size: continue
431 self.preexisting_shares[s] = peer.peerid
432 if s in self.homeless_shares:
433 self.homeless_shares.remove(s)
436 # the PeerTracker will remember which shares were allocated on
437 # that peer. We just have to remember to use them.
439 self.use_peers.add(peer)
442 not_yet_present = set(shares_to_ask) - set(alreadygot)
443 still_homeless = not_yet_present - set(allocated)
446 # they accepted or already had at least one share, so
447 # progress has been made
448 self.good_query_count += 1
450 self.bad_query_count += 1
453 # In networks with lots of space, this is very unusual and
454 # probably indicates an error. In networks with peers that
455 # are full, it is merely unusual. In networks that are very
456 # full, it is common, and many uploads will fail. In most
457 # cases, this is obviously not fatal, and we'll just use some
460 # some shares are still homeless, keep trying to find them a
461 # home. The ones that were rejected get first priority.
462 self.homeless_shares = (list(still_homeless)
463 + self.homeless_shares)
464 # Since they were unable to accept all of our requests, so it
465 # is safe to assume that asking them again won't help.
467 # if they *were* able to accept everything, they might be
468 # willing to accept even more.
469 put_peer_here.append(peer)
475 class EncryptAnUploadable:
476 """This is a wrapper that takes an IUploadable and provides
477 IEncryptedUploadable."""
478 implements(IEncryptedUploadable)
# NOTE(review): the CHUNKSIZE class attribute (used by _read_encrypted) is
# missing from this fragment — confirm against upstream.
481 def __init__(self, original, log_parent=None):
482 self.original = IUploadable(original)
483 self._log_number = log_parent
# Encryptor and segment hasher are created lazily on first use.
484 self._encryptor = None
485 self._plaintext_hasher = plaintext_hasher()
486 self._plaintext_segment_hasher = None
487 self._plaintext_segment_hashes = []
488 self._encoding_parameters = None
489 self._file_size = None
490 self._ciphertext_bytes_read = 0
# NOTE(review): the trailing `self._status = None` assignment appears to be
# missing here (self._status is tested elsewhere) — confirm.
493 def set_upload_status(self, upload_status):
494 self._status = IUploadStatus(upload_status)
495 self.original.set_upload_status(upload_status)
497 def log(self, *args, **kwargs):
498 if "facility" not in kwargs:
499 kwargs["facility"] = "upload.encryption"
500 if "parent" not in kwargs:
501 kwargs["parent"] = self._log_number
502 return log.msg(*args, **kwargs)
505 if self._file_size is not None:
506 return defer.succeed(self._file_size)
507 d = self.original.get_size()
509 self._file_size = size
511 self._status.set_size(size)
513 d.addCallback(_got_size)
516 def get_all_encoding_parameters(self):
517 if self._encoding_parameters is not None:
518 return defer.succeed(self._encoding_parameters)
519 d = self.original.get_all_encoding_parameters()
520 def _got(encoding_parameters):
521 (k, happy, n, segsize) = encoding_parameters
522 self._segment_size = segsize # used by segment hashers
523 self._encoding_parameters = encoding_parameters
524 self.log("my encoding parameters: %s" % (encoding_parameters,),
526 return encoding_parameters
530 def _get_encryptor(self):
532 return defer.succeed(self._encryptor)
534 d = self.original.get_encryption_key()
539 storage_index = storage_index_hash(key)
540 assert isinstance(storage_index, str)
541 # There's no point to having the SI be longer than the key, so we
542 # specify that it is truncated to the same 128 bits as the AES key.
543 assert len(storage_index) == 16 # SHA-256 truncated to 128b
544 self._storage_index = storage_index
546 self._status.set_storage_index(storage_index)
551 def get_storage_index(self):
552 d = self._get_encryptor()
553 d.addCallback(lambda res: self._storage_index)
556 def _get_segment_hasher(self):
557 p = self._plaintext_segment_hasher
559 left = self._segment_size - self._plaintext_segment_hashed_bytes
561 p = plaintext_segment_hasher()
562 self._plaintext_segment_hasher = p
563 self._plaintext_segment_hashed_bytes = 0
564 return p, self._segment_size
566 def _update_segment_hash(self, chunk):
568 while offset < len(chunk):
569 p, segment_left = self._get_segment_hasher()
570 chunk_left = len(chunk) - offset
571 this_segment = min(chunk_left, segment_left)
572 p.update(chunk[offset:offset+this_segment])
573 self._plaintext_segment_hashed_bytes += this_segment
575 if self._plaintext_segment_hashed_bytes == self._segment_size:
576 # we've filled this segment
577 self._plaintext_segment_hashes.append(p.digest())
578 self._plaintext_segment_hasher = None
579 self.log("closed hash [%d]: %dB" %
580 (len(self._plaintext_segment_hashes)-1,
581 self._plaintext_segment_hashed_bytes),
583 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
584 segnum=len(self._plaintext_segment_hashes)-1,
585 hash=base32.b2a(p.digest()),
588 offset += this_segment
591 def read_encrypted(self, length, hash_only):
592 # make sure our parameters have been set up first
593 d = self.get_all_encoding_parameters()
595 d.addCallback(lambda ignored: self.get_size())
596 d.addCallback(lambda ignored: self._get_encryptor())
597 # then fetch and encrypt the plaintext. The unusual structure here
598 # (passing a Deferred *into* a function) is needed to avoid
599 # overflowing the stack: Deferreds don't optimize out tail recursion.
600 # We also pass in a list, to which _read_encrypted will append
603 d2 = defer.Deferred()
604 d.addCallback(lambda ignored:
605 self._read_encrypted(length, ciphertext, hash_only, d2))
606 d.addCallback(lambda ignored: d2)
609 def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
611 fire_when_done.callback(ciphertext)
613 # tolerate large length= values without consuming a lot of RAM by
614 # reading just a chunk (say 50kB) at a time. This only really matters
615 # when hash_only==True (i.e. resuming an interrupted upload), since
616 # that's the case where we will be skipping over a lot of data.
617 size = min(remaining, self.CHUNKSIZE)
618 remaining = remaining - size
619 # read a chunk of plaintext..
620 d = defer.maybeDeferred(self.original.read, size)
621 # N.B.: if read() is synchronous, then since everything else is
622 # actually synchronous too, we'd blow the stack unless we stall for a
623 # tick. Once you accept a Deferred from IUploadable.read(), you must
624 # be prepared to have it fire immediately too.
625 d.addCallback(fireEventually)
626 def _good(plaintext):
628 # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
629 ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
630 ciphertext.extend(ct)
631 self._read_encrypted(remaining, ciphertext, hash_only,
634 fire_when_done.errback(why)
639 def _hash_and_encrypt_plaintext(self, data, hash_only):
640 assert isinstance(data, (tuple, list)), type(data)
643 # we use data.pop(0) instead of 'for chunk in data' to save
644 # memory: each chunk is destroyed as soon as we're done with it.
648 self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
650 bytes_processed += len(chunk)
651 self._plaintext_hasher.update(chunk)
652 self._update_segment_hash(chunk)
653 # TODO: we have to encrypt the data (even if hash_only==True)
654 # because pycryptopp's AES-CTR implementation doesn't offer a
655 # way to change the counter value. Once pycryptopp acquires
656 # this ability, change this to simply update the counter
657 # before each call to (hash_only==False) _encryptor.process()
658 ciphertext = self._encryptor.process(chunk)
660 self.log(" skipping encryption", level=log.NOISY)
662 cryptdata.append(ciphertext)
665 self._ciphertext_bytes_read += bytes_processed
667 progress = float(self._ciphertext_bytes_read) / self._file_size
668 self._status.set_progress(1, progress)
672 def get_plaintext_hashtree_leaves(self, first, last, num_segments):
673 # this is currently unused, but will live again when we fix #453
674 if len(self._plaintext_segment_hashes) < num_segments:
675 # close out the last one
676 assert len(self._plaintext_segment_hashes) == num_segments-1
677 p, segment_left = self._get_segment_hasher()
678 self._plaintext_segment_hashes.append(p.digest())
679 del self._plaintext_segment_hasher
680 self.log("closing plaintext leaf hasher, hashed %d bytes" %
681 self._plaintext_segment_hashed_bytes,
683 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
684 segnum=len(self._plaintext_segment_hashes)-1,
685 hash=base32.b2a(p.digest()),
687 assert len(self._plaintext_segment_hashes) == num_segments
688 return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
690 def get_plaintext_hash(self):
691 h = self._plaintext_hasher.digest()
692 return defer.succeed(h)
695 return self.original.close()
# Interior of the UploadStatus class (class header is outside this view).
# Tracks one upload's progress for status displays: simple getters/setters
# over plain attributes.
# NOTE(review): many method bodies are missing from this fragment (e.g.
# get_started, using_helper, get_status, set_size) — confirm upstream.
698 implements(IUploadStatus)
# Class-level counter: each instance gets a unique, monotonically
# increasing id via .next() (Python 2 iterator protocol).
699 statusid_counter = itertools.count(0)
702 self.storage_index = None
705 self.status = "Not started"
# Three progress slots; see set_progress below for their meaning.
706 self.progress = [0.0, 0.0, 0.0]
709 self.counter = self.statusid_counter.next()
710 self.started = time.time()
712 def get_started(self):
714 def get_storage_index(self):
715 return self.storage_index
718 def using_helper(self):
720 def get_status(self):
722 def get_progress(self):
# Returned as an immutable snapshot of the three progress values.
723 return tuple(self.progress)
724 def get_active(self):
726 def get_results(self):
728 def get_counter(self):
731 def set_storage_index(self, si):
732 self.storage_index = si
733 def set_size(self, size):
735 def set_helper(self, helper):
737 def set_status(self, status):
739 def set_progress(self, which, value):
740 # [0]: chk, [1]: ciphertext, [2]: encode+push
741 self.progress[which] = value
742 def set_active(self, value):
744 def set_results(self, value):
748 peer_selector_class = Tahoe2PeerSelector
750 def __init__(self, storage_broker, secret_holder):
751 # peer_selector needs storage_broker and secret_holder
752 self._storage_broker = storage_broker
753 self._secret_holder = secret_holder
754 self._log_number = self.log("CHKUploader starting", parent=None)
756 self._results = UploadResults()
757 self._storage_index = None
758 self._upload_status = UploadStatus()
759 self._upload_status.set_helper(False)
760 self._upload_status.set_active(True)
761 self._upload_status.set_results(self._results)
763 # locate_all_shareholders() will create the following attribute:
764 # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
766 def log(self, *args, **kwargs):
767 if "parent" not in kwargs:
768 kwargs["parent"] = self._log_number
769 if "facility" not in kwargs:
770 kwargs["facility"] = "tahoe.upload"
771 return log.msg(*args, **kwargs)
773 def start(self, encrypted_uploadable):
774 """Start uploading the file.
776 Returns a Deferred that will fire with the UploadResults instance.
779 self._started = time.time()
780 eu = IEncryptedUploadable(encrypted_uploadable)
781 self.log("starting upload of %s" % eu)
783 eu.set_upload_status(self._upload_status)
784 d = self.start_encrypted(eu)
785 def _done(uploadresults):
786 self._upload_status.set_active(False)
792 """Call this if the upload must be abandoned before it completes.
793 This will tell the shareholders to delete their partial shares. I
794 return a Deferred that fires when these messages have been acked."""
795 if not self._encoder:
796 # how did you call abort() before calling start() ?
797 return defer.succeed(None)
798 return self._encoder.abort()
800 def start_encrypted(self, encrypted):
801 """ Returns a Deferred that will fire with the UploadResults instance. """
802 eu = IEncryptedUploadable(encrypted)
804 started = time.time()
805 self._encoder = e = encode.Encoder(self._log_number,
807 d = e.set_encrypted_uploadable(eu)
808 d.addCallback(self.locate_all_shareholders, started)
809 d.addCallback(self.set_shareholders, e)
810 d.addCallback(lambda res: e.start())
811 d.addCallback(self._encrypted_done)
814 def locate_all_shareholders(self, encoder, started):
815 peer_selection_started = now = time.time()
816 self._storage_index_elapsed = now - started
817 storage_broker = self._storage_broker
818 secret_holder = self._secret_holder
819 storage_index = encoder.get_param("storage_index")
820 self._storage_index = storage_index
821 upload_id = si_b2a(storage_index)[:5]
822 self.log("using storage index %s" % upload_id)
823 peer_selector = self.peer_selector_class(upload_id, self._log_number,
826 share_size = encoder.get_param("share_size")
827 block_size = encoder.get_param("block_size")
828 num_segments = encoder.get_param("num_segments")
829 k,desired,n = encoder.get_param("share_counts")
831 self._peer_selection_started = time.time()
832 d = peer_selector.get_shareholders(storage_broker, secret_holder,
834 share_size, block_size,
835 num_segments, n, desired)
837 self._peer_selection_elapsed = time.time() - peer_selection_started
842 def set_shareholders(self, (used_peers, already_peers), encoder):
844 @param used_peers: a sequence of PeerTracker objects
845 @paran already_peers: a dict mapping sharenum to a peerid that
846 claims to already have this share
848 self.log("_send_shares, used_peers is %s" % (used_peers,))
849 # record already-present shares in self._results
850 self._results.preexisting_shares = len(already_peers)
852 self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
853 for peer in used_peers:
854 assert isinstance(peer, PeerTracker)
856 servermap = already_peers.copy()
857 for peer in used_peers:
858 buckets.update(peer.buckets)
859 for shnum in peer.buckets:
860 self._peer_trackers[shnum] = peer
861 servermap[shnum] = peer.peerid
862 assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
863 encoder.set_shareholders(buckets, servermap)
865 def _encrypted_done(self, verifycap):
866 """ Returns a Deferred that will fire with the UploadResults instance. """
868 for shnum in self._encoder.get_shares_placed():
869 peer_tracker = self._peer_trackers[shnum]
870 peerid = peer_tracker.peerid
871 r.sharemap.add(shnum, peerid)
872 r.servermap.add(peerid, shnum)
873 r.pushed_shares = len(self._encoder.get_shares_placed())
875 r.file_size = self._encoder.file_size
876 r.timings["total"] = now - self._started
877 r.timings["storage_index"] = self._storage_index_elapsed
878 r.timings["peer_selection"] = self._peer_selection_elapsed
879 r.timings.update(self._encoder.get_times())
880 r.uri_extension_data = self._encoder.get_uri_extension_data()
881 r.verifycapstr = verifycap.to_string()
884 def get_upload_status(self):
# Accessor for the UploadStatus instance created in __init__.
885 return self._upload_status
887 def read_this_many_bytes(uploadable, size, prepend_data=[]):
889 return defer.succeed([])
890 d = uploadable.read(size)
892 assert isinstance(data, list)
893 bytes = sum([len(piece) for piece in data])
896 remaining = size - bytes
898 return read_this_many_bytes(uploadable, remaining,
900 return prepend_data + data
904 class LiteralUploader:
907 self._results = UploadResults()
908 self._status = s = UploadStatus()
909 s.set_storage_index(None)
911 s.set_progress(0, 1.0)
913 s.set_results(self._results)
915 def start(self, uploadable):
916 uploadable = IUploadable(uploadable)
917 d = uploadable.get_size()
920 self._status.set_size(size)
921 self._results.file_size = size
922 return read_this_many_bytes(uploadable, size)
923 d.addCallback(_got_size)
924 d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
925 d.addCallback(lambda u: u.to_string())
926 d.addCallback(self._build_results)
929 def _build_results(self, uri):
930 self._results.uri = uri
931 self._status.set_status("Finished")
932 self._status.set_progress(1, 1.0)
933 self._status.set_progress(2, 1.0)
939 def get_upload_status(self):
942 class RemoteEncryptedUploadable(Referenceable):
# Foolscap-remotable wrapper exposing an IEncryptedUploadable to the helper.
943 implements(RIEncryptedUploadable)
945 def __init__(self, encrypted_uploadable, upload_status):
946 self._eu = IEncryptedUploadable(encrypted_uploadable)
# NOTE(review): lines are missing here (presumably self._offset and
# self._bytes_sent initialization, both used later) — confirm upstream.
949 self._status = IUploadStatus(upload_status)
950 # we are responsible for updating the status string while we run, and
951 # for setting the ciphertext-fetch progress.
# NOTE(review): the `def get_size` line and its _got_size callback body are
# missing from this fragment; self._size is cached on first call.
955 if self._size is not None:
956 return defer.succeed(self._size)
957 d = self._eu.get_size()
961 d.addCallback(_got_size)
964 def remote_get_size(self):
# Remote entry point: delegates to the (caching) local get_size.
965 return self.get_size()
966 def remote_get_all_encoding_parameters(self):
967 return self._eu.get_all_encoding_parameters()
969 def _read_encrypted(self, length, hash_only):
970 d = self._eu.read_encrypted(length, hash_only)
973 self._offset += length
975 size = sum([len(data) for data in strings])
981 def remote_read_encrypted(self, offset, length):
982 # we don't support seek backwards, but we allow skipping forwards
983 precondition(offset >= 0, offset)
984 precondition(length >= 0, length)
985 lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
987 precondition(offset >= self._offset, offset, self._offset)
988 if offset > self._offset:
989 # read the data from disk anyways, to build up the hash tree
990 skip = offset - self._offset
991 log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
992 (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
993 d = self._read_encrypted(skip, hash_only=True)
995 d = defer.succeed(None)
997 def _at_correct_offset(res):
998 assert offset == self._offset, "%d != %d" % (offset, self._offset)
999 return self._read_encrypted(length, hash_only=False)
1000 d.addCallback(_at_correct_offset)
1003 size = sum([len(data) for data in strings])
1004 self._bytes_sent += size
1006 d.addCallback(_read)
1009 def remote_close(self):
1010 return self._eu.close()
# Uploader that delegates encoding and share placement to a remote
# helper service, uploading only ciphertext to it.
1013 class AssistedUploader:
1015 def __init__(self, helper):
1016 self._helper = helper
1017 self._log_number = log.msg("AssistedUploader starting")
1018 self._storage_index = None
1019 self._upload_status = s = UploadStatus()
# NOTE(review): lines 1020-1022 elided -- presumably further setup of
# the UploadStatus bound to 's'; confirm against the full file.
# Log with this uploader's startup message as the default parent event.
1023 def log(self, *args, **kwargs):
1024 if "parent" not in kwargs:
1025 kwargs["parent"] = self._log_number
1026 return log.msg(*args, **kwargs)
1028 def start(self, encrypted_uploadable, storage_index):
1029 """Start uploading the file.
1031 Returns a Deferred that will fire with the UploadResults instance.
# NOTE(review): the docstring close (line 1032) is elided.
1033 precondition(isinstance(storage_index, str), storage_index)
1034 self._started = time.time()
1035 eu = IEncryptedUploadable(encrypted_uploadable)
1036 eu.set_upload_status(self._upload_status)
1037 self._encuploadable = eu
1038 self._storage_index = storage_index
# NOTE(review): line 1039 elided -- presumably 'd = eu.get_size()',
# since the first callback below is self._got_size.
1040 d.addCallback(self._got_size)
1041 d.addCallback(lambda res: eu.get_all_encoding_parameters())
1042 d.addCallback(self._got_all_encoding_parameters)
1043 d.addCallback(self._contact_helper)
1044 d.addCallback(self._build_verifycap)
# Deactivate the status display whether the chain succeeds or fails.
1046 self._upload_status.set_active(False)
# NOTE(review): lines 1047-1050 elided (likely the errback/return of d).
1051 def _got_size(self, size):
# NOTE(review): line 1052 elided -- presumably 'self._size = size',
# which _build_verifycap later reads.
1053 self._upload_status.set_size(size)
# Stash the negotiated encoding parameters for URI generation later.
1055 def _got_all_encoding_parameters(self, params):
1056 k, happy, n, segment_size = params
1057 # stash these for URI generation later
1058 self._needed_shares = k
1059 self._total_shares = n
1060 self._segment_size = segment_size
# Ask the helper whether it already has this storage index, timing the
# round-trip for the results' "contacting_helper" entry.
1062 def _contact_helper(self, res):
1063 now = self._time_contacting_helper_start = time.time()
1064 self._storage_index_elapsed = now - self._started
1065 self.log(format="contacting helper for SI %(si)s..",
1066 si=si_b2a(self._storage_index))
1067 self._upload_status.set_status("Contacting Helper")
1068 d = self._helper.callRemote("upload_chk", self._storage_index)
1069 d.addCallback(self._contacted_helper)
# Handle the helper's reply: either upload ciphertext via a
# RemoteEncryptedUploadable, or accept the already-complete results.
# NOTE: the tuple-unpacking parameter is Python 2-only syntax.
1072 def _contacted_helper(self, (upload_results, upload_helper)):
# NOTE(review): line 1073 elided -- presumably 'now = time.time()'.
1074 elapsed = now - self._time_contacting_helper_start
1075 self._elapsed_time_contacting_helper = elapsed
# NOTE(review): line 1076 elided -- presumably 'if upload_helper:',
# guarding the "we must upload" branch below.
1077 self.log("helper says we need to upload")
1078 self._upload_status.set_status("Uploading Ciphertext")
1079 # we need to upload the file
1080 reu = RemoteEncryptedUploadable(self._encuploadable,
1081 self._upload_status)
1082 # let it pre-compute the size for progress purposes
# NOTE(review): line 1083 elided -- presumably 'd = reu.get_size()'.
1084 d.addCallback(lambda ignored:
1085 upload_helper.callRemote("upload", reu))
1086 # this Deferred will fire with the upload results
# NOTE(review): line 1087 ('return d') elided; the lines below are the
# already-uploaded branch.
1088 self.log("helper says file is already uploaded")
1089 self._upload_status.set_progress(1, 1.0)
1090 self._upload_status.set_results(upload_results)
1091 return upload_results
1093 def _convert_old_upload_results(self, upload_results):
1094 # pre-1.3.0 helpers return upload results which contain a mapping
1095 # from shnum to a single human-readable string, containing things
1096 # like "Found on [x],[y],[z]" (for healthy files that were already in
1097 # the grid), "Found on [x]" (for files that needed upload but which
1098 # discovered pre-existing shares), and "Placed on [x]" (for newly
1099 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1100 # set of binary serverid strings.
1102 # the old results are too hard to deal with (they don't even contain
1103 # as much information as the new results, since the nodeids are
1104 # abbreviated), so if we detect old results, just clobber them.
1106 sharemap = upload_results.sharemap
1107 if str in [type(v) for v in sharemap.values()]:
1108 upload_results.sharemap = None
# Attach the verify-cap string and client-side timing data to the
# helper's results.
1110 def _build_verifycap(self, upload_results):
1111 self.log("upload finished, building readcap")
1112 self._convert_old_upload_results(upload_results)
1113 self._upload_status.set_status("Building Readcap")
# NOTE(review): line 1114 elided -- presumably 'r = upload_results'.
# Sanity-check that the helper encoded with the parameters we expect.
1115 assert r.uri_extension_data["needed_shares"] == self._needed_shares
1116 assert r.uri_extension_data["total_shares"] == self._total_shares
1117 assert r.uri_extension_data["segment_size"] == self._segment_size
1118 assert r.uri_extension_data["size"] == self._size
1119 r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1120 uri_extension_hash=r.uri_extension_hash,
1121 needed_shares=self._needed_shares,
1122 total_shares=self._total_shares, size=self._size
# NOTE(review): lines 1123-1124 elided -- likely the call close (e.g.
# ').to_string()') and 'now = time.time()' used below.
1125 r.file_size = self._size
1126 r.timings["storage_index"] = self._storage_index_elapsed
1127 r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1128 if "total" in r.timings:
# Preserve the helper's own total under a new key before overwriting
# "total" with the client-side elapsed time.
1129 r.timings["helper_total"] = r.timings["total"]
1130 r.timings["total"] = now - self._started
1131 self._upload_status.set_status("Finished")
1132 self._upload_status.set_results(r)
1135 def get_upload_status(self):
1136 return self._upload_status
# Common base for uploadables: encoding-parameter bookkeeping shared by
# FileHandle/FileName/Data.
1138 class BaseUploadable:
# Class-level defaults; the per-instance attributes below override them
# when set (see get_all_encoding_parameters).
1139 default_max_segment_size = 128*KiB # overridden by max_segment_size
1140 default_encoding_param_k = 3 # overridden by encoding_parameters
1141 default_encoding_param_happy = 7
1142 default_encoding_param_n = 10
# Per-instance overrides; None means "fall back to the default_* value".
1144 max_segment_size = None
1145 encoding_param_k = None
1146 encoding_param_happy = None
1147 encoding_param_n = None
# Cache filled in by get_all_encoding_parameters().
1149 _all_encoding_parameters = None
1152 def set_upload_status(self, upload_status):
1153 self._status = IUploadStatus(upload_status)
def set_default_encoding_parameters(self, default_params):
    """Install fallback encoding parameters from a {str: int} dict.

    Recognized keys: "k", "happy", "n", and "max_segment_size".  Each
    key that is present overrides the corresponding class-level
    default_* attribute on this instance; unrecognized keys are
    ignored.  Explicitly-set encoding_param_* / max_segment_size
    attributes still take precedence when the parameters are resolved.
    """
    # use precondition() for the dict check too, matching the per-item
    # checks below and the rest of the file; a bare assert disappears
    # under 'python -O'
    precondition(isinstance(default_params, dict), default_params)
    for k, v in default_params.items():
        precondition(isinstance(k, str), k, v)
        precondition(isinstance(v, int), k, v)
    if "k" in default_params:
        self.default_encoding_param_k = default_params["k"]
    if "happy" in default_params:
        self.default_encoding_param_happy = default_params["happy"]
    if "n" in default_params:
        self.default_encoding_param_n = default_params["n"]
    if "max_segment_size" in default_params:
        self.default_max_segment_size = default_params["max_segment_size"]
# Resolve and cache the (k, happy, n, segsize) tuple, shrinking the
# segment size for small files.
1169 def get_all_encoding_parameters(self):
1170 if self._all_encoding_parameters:
1171 return defer.succeed(self._all_encoding_parameters)
# Fall back to the default_* values for anything not explicitly set.
1173 max_segsize = self.max_segment_size or self.default_max_segment_size
1174 k = self.encoding_param_k or self.default_encoding_param_k
1175 happy = self.encoding_param_happy or self.default_encoding_param_happy
1176 n = self.encoding_param_n or self.default_encoding_param_n
# NOTE(review): lines 1177-1178 elided -- presumably
# 'd = self.get_size()' feeding the _got_size callback below.
1179 def _got_size(file_size):
1180 # for small files, shrink the segment size to avoid wasting space
1181 segsize = min(max_segsize, file_size)
1182 # this must be a multiple of 'required_shares'==k
1183 segsize = mathutil.next_multiple(segsize, k)
1184 encoding_parameters = (k, happy, n, segsize)
1185 self._all_encoding_parameters = encoding_parameters
1186 return encoding_parameters
1187 d.addCallback(_got_size)
# NOTE(review): the trailing 'return d' (line 1188+) is elided.
# IUploadable that reads plaintext from an already-open filehandle.
1190 class FileHandle(BaseUploadable):
1191 implements(IUploadable)
1193 def __init__(self, filehandle, convergence):
# NOTE(review): the docstring delimiters (lines 1194/1199) are elided
# in this listing.
1195 Upload the data from the filehandle. If convergence is None then a
1196 random encryption key will be used, else the plaintext will be hashed,
1197 then the hash will be hashed together with the string in the
1198 "convergence" argument to form the encryption key.
1200 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1201 self._filehandle = filehandle
# NOTE(review): line 1202 elided -- presumably 'self._key = None',
# which the key-derivation methods test.
1203 self.convergence = convergence
# NOTE(review): lines 1204-1205 elided -- presumably
# 'self._size = None' plus a blank line; get_size tests self._size.
# Derive the AES key by hashing the whole plaintext together with the
# convergence secret, caching the result in self._key.
1206 def _get_encryption_key_convergent(self):
1207 if self._key is not None:
1208 return defer.succeed(self._key)
# NOTE(review): lines 1209-1210 elided -- presumably
# 'd = self.get_size()'.
1211 # that sets self._size as a side-effect
1212 d.addCallback(lambda size: self.get_all_encoding_parameters())
# NOTE(review): the callback header taking 'params' (line 1213) is
# elided.
1214 k, happy, n, segsize = params
1215 f = self._filehandle
# The encoding parameters are mixed into the hash, so the same
# plaintext with different k/n/segsize yields a different key.
1216 enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
# NOTE(review): lines 1217-1220 elided -- seek to start, BLOCKSIZE,
# bytes_read initialization, and the read-loop header.
1221 data = f.read(BLOCKSIZE)
# NOTE(review): lines 1222-1223 elided -- the loop-exit test on an
# empty read.
1224 enckey_hasher.update(data)
1225 # TODO: setting progress in a non-yielding loop is kind of
1226 # pointless, but I'm anticipating (perhaps prematurely) the
1227 # day when we use a slowjob or twisted's CooperatorService to
1228 # make this yield time to other jobs.
1229 bytes_read += len(data)
# NOTE(review): line 1230 elided -- presumably 'if self._status:'.
1231 self._status.set_progress(0, float(bytes_read)/self._size)
# NOTE(review): lines 1232 and 1234 elided (rewind of f; status guard).
1233 self._key = enckey_hasher.digest()
1235 self._status.set_progress(0, 1.0)
1236 assert len(self._key) == 16
# NOTE(review): the trailing return of the key / Deferred chain
# (lines 1237+) is elided.
def _get_encryption_key_random(self):
    """Return a Deferred firing with a random 16-byte encryption key.

    The key is minted with os.urandom on first use and cached in
    self._key, so every subsequent call yields the same key.
    """
    key = self._key
    if key is None:
        # no convergence secret: pick a fresh random key exactly once
        key = os.urandom(16)
        self._key = key
    return defer.succeed(key)
# Dispatch to convergent or random key generation.
1246 def get_encryption_key(self):
1247 if self.convergence is not None:
1248 return self._get_encryption_key_convergent()
# NOTE(review): the 'else:' line (1249) is elided.
1250 return self._get_encryption_key_random()
# NOTE(review): the 'def get_size(self):' header (lines 1251-1252) is
# elided; the visible body caches the measured size.
1253 if self._size is not None:
1254 return defer.succeed(self._size)
# Measure by seeking to EOF, then rewind for the upcoming reads.
1255 self._filehandle.seek(0,2)
1256 size = self._filehandle.tell()
# NOTE(review): line 1257 elided -- presumably 'self._size = size'.
1258 self._filehandle.seek(0)
1259 return defer.succeed(size)
1261 def read(self, length):
1262 return defer.succeed([self._filehandle.read(length)])
# NOTE(review): the 'def close(self):' header (lines 1263-1264) is
# elided; the comment explains why it leaves the handle open.
1265 # the originator of the filehandle reserves the right to close it
# FileHandle variant that opens (and therefore owns) the file by name.
1268 class FileName(FileHandle):
1269 def __init__(self, filename, convergence):
# NOTE(review): the docstring delimiters (lines 1270/1275) are elided.
1271 Upload the data from the filename. If convergence is None then a
1272 random encryption key will be used, else the plaintext will be hashed,
1273 then the hash will be hashed together with the string in the
1274 "convergence" argument to form the encryption key.
1276 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1277 FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
# NOTE(review): the 'def close(self):' header (line 1278) is elided.
# Unlike FileHandle, this subclass really closes the handle it opened.
1279 FileHandle.close(self)
1280 self._filehandle.close()
# FileHandle variant fed from an in-memory string via StringIO.
1282 class Data(FileHandle):
1283 def __init__(self, data, convergence):
# NOTE(review): the docstring delimiters (lines 1284/1289) are elided.
1285 Upload the data from the data argument. If convergence is None then a
1286 random encryption key will be used, else the plaintext will be hashed,
1287 then the hash will be hashed together with the string in the
1288 "convergence" argument to form the encryption key.
1290 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1291 FileHandle.__init__(self, StringIO(data), convergence=convergence)
# Client-side upload service: connects to a helper when a furl is
# configured, otherwise uploads are handled locally (see upload()).
1293 class Uploader(service.MultiService, log.PrefixingLogMixin):
1294 """I am a service that allows file uploading. I am a service-child of the
# NOTE(review): the rest of the docstring (lines 1295-1296+) is elided.
1297 implements(IUploader)
# Files at or below this size get a literal (LIT) upload instead of a
# full CHK upload (tested in upload()).
1299 URI_LIT_SIZE_THRESHOLD = 55
1301 def __init__(self, helper_furl=None, stats_provider=None):
1302 self._helper_furl = helper_furl
1303 self.stats_provider = stats_provider
# NOTE(review): line 1304 elided -- presumably 'self._helper = None',
# which get_helper_info() and upload() test.
1305 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1306 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1307 service.MultiService.__init__(self)
1309 def startService(self):
1310 service.MultiService.startService(self)
1311 if self._helper_furl:
1312 self.parent.tub.connectTo(self._helper_furl,
# NOTE(review): the continuation (line 1313, likely passing
# self._got_helper) is elided.
# Wrap a freshly-connected helper with version info, tolerating old
# helpers that predate get_version().
1315 def _got_helper(self, helper):
1316 self.log("got helper connection, getting versions")
1317 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
# NOTE(review): line 1318 (inner dict opener) is elided.
1319 "application-version": "unknown: no get_version()",
# NOTE(review): line 1320 (closing braces) is elided.
1321 d = add_version_to_remote_reference(helper, default)
1322 d.addCallback(self._got_versioned_helper)
# Reject helpers that do not speak the v1 helper protocol.
1324 def _got_versioned_helper(self, helper):
1325 needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1326 if needed not in helper.version:
1327 raise InsufficientVersionError(needed, helper.version)
1328 self._helper = helper
1329 helper.notifyOnDisconnect(self._lost_helper)
1331 def _lost_helper(self):
# NOTE(review): the body (line 1332, presumably 'self._helper = None')
# is elided.
1334 def get_helper_info(self):
1335 # return a tuple of (helper_furl_or_None, connected_bool)
1336 return (self._helper_furl, bool(self._helper))
# Entry point: upload an IUploadable.  Tiny files become LIT uploads;
# otherwise the file is encrypted and handed to an AssistedUploader
# (helper available) or a CHKUploader (direct).
1339 def upload(self, uploadable, history=None):
1341 Returns a Deferred that will fire with the UploadResults instance.
# NOTE(review): the docstring delimiters and surrounding lines
# (1340, 1342-1345) are elided.
1346 uploadable = IUploadable(uploadable)
1347 d = uploadable.get_size()
1348 def _got_size(size):
1349 default_params = self.parent.get_encoding_parameters()
1350 precondition(isinstance(default_params, dict), default_params)
1351 precondition("max_segment_size" in default_params, default_params)
1352 uploadable.set_default_encoding_parameters(default_params)
1354 if self.stats_provider:
1355 self.stats_provider.count('uploader.files_uploaded', 1)
1356 self.stats_provider.count('uploader.bytes_uploaded', size)
# Small enough to store the data inside the URI itself.
1358 if size <= self.URI_LIT_SIZE_THRESHOLD:
1359 uploader = LiteralUploader()
1360 return uploader.start(uploadable)
# NOTE(review): the 'else:' opener (line 1361) is elided.
1362 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1363 d2 = defer.succeed(None)
# NOTE(review): line 1364 elided -- presumably 'if self._helper:'.
1365 uploader = AssistedUploader(self._helper)
1366 d2.addCallback(lambda x: eu.get_storage_index())
1367 d2.addCallback(lambda si: uploader.start(eu, si))
# NOTE(review): the 'else:' (line 1368) is elided -- the helperless
# direct-upload path follows.
1369 storage_broker = self.parent.get_storage_broker()
1370 secret_holder = self.parent._secret_holder
1371 uploader = CHKUploader(storage_broker, secret_holder)
1372 d2.addCallback(lambda x: uploader.start(eu))
1374 self._all_uploads[uploader] = None
# NOTE(review): line 1375 elided -- presumably 'if history:'.
1376 history.add_upload(uploader.get_upload_status())
# The final readcap URI is derived client-side from the verifycap
# string plus the encryption key.
1377 def turn_verifycap_into_read_cap(uploadresults):
1378 # Generate the uri from the verifycap plus the key.
1379 d3 = uploadable.get_encryption_key()
1380 def put_readcap_into_results(key):
1381 v = uri.from_string(uploadresults.verifycapstr)
1382 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1383 uploadresults.uri = r.to_string()
1384 return uploadresults
1385 d3.addCallback(put_readcap_into_results)
# NOTE(review): 'return d3' (line 1386) is elided.
1387 d2.addCallback(turn_verifycap_into_read_cap)
# NOTE(review): 'return d2' (line 1388) is elided.
1389 d.addCallback(_got_size)
# NOTE(review): the trailing 'return d' of upload() is elided.