1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9 file_cancel_secret_hash, bucket_renewal_secret_hash, \
10 bucket_cancel_secret_hash, plaintext_hasher, \
11 storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import add_version_to_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19 IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20 NoServersError, InsufficientVersionError, UploadHappinessError
21 from allmydata.immutable import layout
22 from pycryptopp.cipher.aes import AES
24 from cStringIO import StringIO
class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    # (the class body was missing from the extraction; an exception
    # subclass needs at least a `pass` to be syntactically valid)
    pass
# this wants to live in storage, not here
class TooFullError(Exception):
    # marker exception: a storage server has no room for our share
    # (the class body was missing from the extraction; restore `pass`)
    pass
class UploadResults(Copyable, RemoteCopy):
    """Bundle of facts about a completed upload, copyable over foolscap
    from the helper to its client."""
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy
    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        # NOTE(review): the `def __init__(self):` header was missing from
        # the extraction; restored so the attribute initializers below are
        # valid.  `file_size` and `uri` are restored as well, since later
        # code assigns/reads them (CHKUploader._encrypted_done,
        # LiteralUploader._build_results).
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
75 def __init__(self, peerid, storage_server,
76 sharesize, blocksize, num_segments, num_share_hashes,
78 bucket_renewal_secret, bucket_cancel_secret):
79 precondition(isinstance(peerid, str), peerid)
80 precondition(len(peerid) == 20, peerid)
82 self._storageserver = storage_server # to an RIStorageServer
83 self.buckets = {} # k: shareid, v: IRemoteBucketWriter
84 self.sharesize = sharesize
86 wbp = layout.make_write_bucket_proxy(None, sharesize,
87 blocksize, num_segments,
89 EXTENSION_SIZE, peerid)
90 self.wbp_class = wbp.__class__ # to create more of them
91 self.allocated_size = wbp.get_allocated_size()
92 self.blocksize = blocksize
93 self.num_segments = num_segments
94 self.num_share_hashes = num_share_hashes
95 self.storage_index = storage_index
97 self.renew_secret = bucket_renewal_secret
98 self.cancel_secret = bucket_cancel_secret
101 return ("<PeerTracker for peer %s and SI %s>"
102 % (idlib.shortnodeid_b2a(self.peerid),
103 si_b2a(self.storage_index)[:5]))
105 def query(self, sharenums):
106 d = self._storageserver.callRemote("allocate_buckets",
112 canary=Referenceable())
113 d.addCallback(self._got_reply)
116 def query_allocated(self):
117 d = self._storageserver.callRemote("get_buckets",
121 def _got_reply(self, (alreadygot, buckets)):
122 #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
124 for sharenum, rref in buckets.iteritems():
125 bp = self.wbp_class(rref, self.sharesize,
128 self.num_share_hashes,
132 self.buckets.update(b)
133 return (alreadygot, set(b.keys()))
def servers_with_unique_shares(existing_shares, used_peers=None):
    """
    I accept a dict of shareid -> peerid mappings (and optionally a set
    of PeerTracker instances) and return a list of distinct servers that
    have shares.

    NOTE(review): the `servers = []`, `peerdict = {}` initializers, the
    `if used_peers:` guard, and the docstring delimiters were missing
    from the extraction; restored.  `has_key` replaced with `in`.
    """
    servers = []
    # work on a copy so the caller's dict is not mutated
    existing_shares = existing_shares.copy()
    if used_peers:
        peerdict = {}
        for peer in used_peers:
            peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
        for k in peerdict.keys():
            if k in existing_shares:
                # Prevent overcounting; favor the bucket, and not the
                # preexisting share.
                del(existing_shares[k])
        peers = list(used_peers.copy())
        # We do this because the preexisting shares list goes by peerid.
        peers = [x.peerid for x in peers]
        servers.extend(peers)
    servers.extend(existing_shares.values())
    return list(set(servers))
def shares_by_server(existing_shares):
    """
    I accept a dict of shareid -> peerid mappings, and return a dict
    of peerid -> set of shareid mappings.

    NOTE(review): the `servers = {}` initializer, `return servers`, and
    the docstring delimiters were missing from the extraction; restored.
    """
    servers = {}
    for server in set(existing_shares.values()):
        servers[server] = set([x for x in existing_shares.keys()
                               if existing_shares[x] == server])
    return servers
def should_add_server(existing_shares, server, bucket):
    """
    I tell my caller whether the servers_of_happiness number will be
    increased or decreased if a particular server is added as the peer
    already holding a particular share. I take a dictionary, a peerid,
    and a bucket as arguments, and return a boolean.

    NOTE(review): only the docstring delimiters were missing from the
    extraction; the logic is unchanged.
    """
    old_size = len(servers_with_unique_shares(existing_shares))
    new_candidate = existing_shares.copy()
    new_candidate[bucket] = server
    new_size = len(servers_with_unique_shares(new_candidate))
    return old_size < new_size
class Tahoe2PeerSelector:
    """
    Peer-selection strategy for immutable upload: contact peers, assign
    homeless shares, and keep looping until at least
    `servers_of_happiness` distinct servers each hold a share.

    NOTE(review): this extraction is missing many source lines; the gaps
    are flagged inline.  The code as shown is not runnable as-is and the
    logic has NOT been altered here, only annotated.
    """

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # (queries sent, queries that placed >=1 share, queries that placed none)
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        # NOTE(review): initializers for counters referenced later
        # (full_count, error_count) appear to be missing here.
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    # NOTE(review): a `def __repr__(self):` header is missing immediately
    # before this return statement.
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, servers_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """
        self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness

        # every share starts out homeless; the loop below finds homes
        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share
        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that
        # we'd want to store. We keep them around because they may have
        # existing shares for this storage index, which we want to know
        # about for accurate servers_of_happiness accounting
        self.readonly_peers = []

        peers = storage_broker.get_servers_for_index(storage_index)
        # NOTE(review): the `if not peers:` guard that conditions this
        # raise is missing from the extraction.
        raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
        # NOTE(review): the final argument and closing paren of this call
        # are missing from the extraction.
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        new_peers = [peer for peer in peers
                     if _get_maxsize(peer) >= allocated_size]
        old_peers = list(set(peers).difference(set(new_peers)))

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        # NOTE(review): the storage_index argument lines (and closing
        # parens) of these two hash calls are missing from the extraction.
        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,

        def _make_trackers(peers):
            # NOTE(review): several PeerTracker constructor argument lines
            # (storage_index, and the per-peer secret-hash arguments with
            # their closing parens) are missing from the extraction.
            return [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                 bucket_cancel_secret_hash(file_cancel_secret,
                     for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(peers)
        self.readonly_peers = _make_trackers(old_peers)
        # Talk to the readonly servers to get an idea of what servers
        # have what shares (if any) for this storage index
        d = defer.maybeDeferred(self._existing_shares)
        d.addCallback(lambda ign: self._loop())
        # NOTE(review): a trailing `return d` is missing from the
        # extraction.

    def _existing_shares(self):
        # Pop one readonly peer and ask it which buckets it already holds;
        # the response handler recurses back into this method.
        if self.readonly_peers:
            peer = self.readonly_peers.pop()
            assert isinstance(peer, PeerTracker)
            d = peer.query_allocated()
            d.addBoth(self._handle_existing_response, peer.peerid)
            self.num_peers_contacted += 1
            self.query_count += 1
            log.msg("asking peer %s for any existing shares for upload id %s"
                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                    level=log.NOISY, parent=self._log_parent)
            # NOTE(review): an `if self._status:` guard presumably preceded
            # this call; it is missing from the extraction.
            self._status.set_status("Contacting Peer %s to find "
                                    "any existing shares"
                                    % idlib.shortnodeid_b2a(peer.peerid))
            # NOTE(review): a `return d` for this branch is missing.

    def _handle_existing_response(self, res, peer):
        # Record shares the readonly peer already holds, then continue the
        # readonly-peer sweep.
        if isinstance(res, failure.Failure):
            log.msg("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
        # NOTE(review): the `else:` line and the `buckets = res` binding
        # are missing from the extraction; the lines below belong to that
        # else-branch.
            log.msg("response from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY, parent=self._log_parent)
            for bucket in buckets:
                # only record the (bucket -> peer) mapping if it raises the
                # number of distinct servers holding shares
                if should_add_server(self.preexisting_shares, peer, bucket):
                    self.preexisting_shares[bucket] = peer
                    if self.homeless_shares and bucket in self.homeless_shares:
                        self.homeless_shares.remove(bucket)
            # NOTE(review): a line is missing before this counter update
            # (presumably `self.full_count += 1`).
            self.bad_query_count += 1
        return self._existing_shares()

    # NOTE(review): the `def _loop(self):` header is missing before this
    # point; everything from here to _got_response is the body of _loop().
        if not self.homeless_shares:
            effective_happiness = servers_with_unique_shares(
                self.preexisting_shares,
            # NOTE(review): the second argument (presumably
            # `self.use_peers`) and closing paren are missing.
            if self.servers_of_happiness <= len(effective_happiness):
                msg = ("placed all %d shares, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       # NOTE(review): the remainder of this format string
                       # and the `%` operator line are missing from the
                       # extraction.
                       self.query_count, self.num_peers_contacted,
                       self.good_query_count, self.bad_query_count,
                # NOTE(review): trailing format arguments and closing
                # paren are missing.
                log.msg("peer selection successful for %s: %s" % (self, msg),
                        parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            # NOTE(review): an `else:` introducing the redistribution
            # branch below is missing from the extraction.
            delta = self.servers_of_happiness - len(effective_happiness)
            shares = shares_by_server(self.preexisting_shares)
            # Each server in shares maps to a set of shares stored on it.
            # Since we want to keep at least one share on each server
            # that has one (otherwise we'd only be making
            # the situation worse by removing distinct servers),
            # each server has len(its shares) - 1 to spread around.
            shares_to_spread = sum([len(list(sharelist)) - 1
                                    for (server, sharelist)
            # NOTE(review): the `in shares.items()])` tail of this
            # comprehension is missing.
            if delta <= len(self.uncontacted_peers) and \
               shares_to_spread >= delta:
                # Loop through the allocated shares, removing
                # NOTE(review): the rest of this comment is missing.
                items = shares.items()
                while len(self.homeless_shares) < delta:
                    servernum, sharelist = items.pop()
                    if len(sharelist) > 1:
                        share = sharelist.pop()
                        self.homeless_shares.append(share)
                        del(self.preexisting_shares[share])
                        items.append((servernum, sharelist))
                # NOTE(review): a `return self._loop()` and the `else:`
                # for the failure branch below are missing.
                raise UploadHappinessError("shares could only be placed "
                                           "on %d servers (%d were requested)" %
                                           (len(effective_happiness),
                                            self.servers_of_happiness))
        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)
            # ask this peer to hold one homeless share (first pass)
            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            # NOTE(review): an `if self._status:` guard is presumably
            # missing before this call.
            self._status.set_status("Contacting Peers [%s] (first query),"
                                    # NOTE(review): the rest of this format
                                    # string is missing.
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            # NOTE(review): a `return d` is missing here.
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                # NOTE(review): the log level argument and closing paren
                # are missing.
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            # NOTE(review): an `if self._status:` guard is presumably
            # missing before this call.
            self._status.set_status("Contacting Peers [%s] (second query),"
                                    % (idlib.shortnodeid_b2a(peer.peerid),
                                       len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            # NOTE(review): a `return d` is missing here.
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            # NOTE(review): a `return self._loop()` and the final `else:`
            # are missing here; the lines below are the no-more-peers
            # branch.
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            effective_happiness = servers_with_unique_shares(
                self.preexisting_shares,
            # NOTE(review): the second argument and closing paren are
            # missing.
            if len(effective_happiness) < self.servers_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "want to place on %d servers, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none "
                       "(of which %d placed none due to the server being"
                       " full and %d placed none due to an error)" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.servers_of_happiness,
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.full_count, self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise UploadHappinessError(msg)
            # NOTE(review): an `else:` and an `if self._status:` guard are
            # missing before the lines below.
            # we placed enough to be happy, so we're done
            self._status.set_status("Placed all shares")
            return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        # Handle the reply (or failure) from one allocate_buckets query,
        # re-queue homeless shares, and update placement bookkeeping.
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                # NOTE(review): the `pass` (or similar) and the `else:`
                # line are missing from the extraction.
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        # NOTE(review): the `else:` introducing the success branch is
        # missing from the extraction.
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            # NOTE(review): a `progress` flag initializer and the
            # `for s in alreadygot:` loop header are missing here.
            if should_add_server(self.preexisting_shares,
            # NOTE(review): the remaining arguments of this call are
            # missing.
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            # NOTE(review): an `if allocated:` guard is missing before the
            # next line.
            self.use_peers.add(peer)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            # NOTE(review): an `if progress:` guard is missing here.
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            # NOTE(review): the `else:` and a full-server counter update
            # are missing here.
                self.bad_query_count += 1

            # NOTE(review): an `if still_homeless:` guard is missing here.
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            # NOTE(review): the `else:` for the accepted-everything case is
            # missing here.
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)
            # NOTE(review): the trailing `return self._loop()` is missing
            # from the extraction.
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    # NOTE(review): the CHUNKSIZE class attribute (read by
    # _read_encrypted below) is missing from this extraction.

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        # NOTE(review): a `self._status = None` initializer is presumably
        # missing here (set_upload_status assigns it later).

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        # default to the upload.encryption facility, under our log parent
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    # NOTE(review): a `def get_size(self):` header is missing before this
    # cached-size logic.
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        # NOTE(review): a `def _got_size(size):` header is missing here.
            self._file_size = size
            # NOTE(review): an `if self._status:` guard is presumably
            # missing before this call, and a `return size` after it.
            self._status.set_size(size)
        d.addCallback(_got_size)
        # NOTE(review): a `return d` is missing here.

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
            # NOTE(review): the log level argument and closing paren are
            # missing here.
            return encoding_parameters
        # NOTE(review): `d.addCallback(_got)` and `return d` are missing
        # here.

    def _get_encryptor(self):
        # NOTE(review): an `if self._encryptor:` guard is missing before
        # this early return.
        return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        # NOTE(review): a `def _got(key):` header and the AES-encryptor
        # construction are missing here; the lines below belong to that
        # callback.
        storage_index = storage_index_hash(key)
        assert isinstance(storage_index, str)
        # There's no point to having the SI be longer than the key, so we
        # specify that it is truncated to the same 128 bits as the AES key.
        assert len(storage_index) == 16 # SHA-256 truncated to 128b
        self._storage_index = storage_index
        # NOTE(review): an `if self._status:` guard is presumably missing
        # before this call.
        self._status.set_storage_index(storage_index)
        # NOTE(review): the callback's return, `d.addCallback(_got)`, and
        # `return d` are missing here.

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        # NOTE(review): a `return d` is missing here.

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        # NOTE(review): an `if p:` guard and a `return p, left` are
        # missing around the next line.
        left = self._segment_size - self._plaintext_segment_hashed_bytes
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        # NOTE(review): an `offset = 0` initializer is missing here.
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                # NOTE(review): the log level argument and closing paren
                # are missing here.
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                # NOTE(review): the log level argument and closing paren
                # are missing here.

            offset += this_segment

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # NOTE(review): a `ciphertext = []` initializer is missing here.
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        # NOTE(review): a `return d` is missing here.

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        # NOTE(review): an `if not remaining:` guard (and a return after
        # the callback) is missing before this line.
        fire_when_done.callback(ciphertext)

        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # hash and encrypt this chunk, then recurse for the rest
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
            # NOTE(review): the fire_when_done argument, the `_err` errback
            # helper header, and the addCallback/addErrback hookup are
            # missing; the next line belongs to the errback.
            fire_when_done.errback(why)

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        # NOTE(review): the cryptdata/bytes_processed initializers and the
        # `while data:` / `chunk = data.pop(0)` loop header are missing;
        # the lines below are that loop's body.
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
        # NOTE(review): the log level argument and closing paren are
        # missing here.
        bytes_processed += len(chunk)
        self._plaintext_hasher.update(chunk)
        self._update_segment_hash(chunk)
        # TODO: we have to encrypt the data (even if hash_only==True)
        # because pycryptopp's AES-CTR implementation doesn't offer a
        # way to change the counter value. Once pycryptopp acquires
        # this ability, change this to simply update the counter
        # before each call to (hash_only==False) _encryptor.process()
        ciphertext = self._encryptor.process(chunk)
        # NOTE(review): the `if hash_only:` / `else:` structure around the
        # next two lines is missing from the extraction.
        self.log(" skipping encryption", level=log.NOISY)
        cryptdata.append(ciphertext)
        # NOTE(review): per-chunk cleanup lines are missing here.
        self._ciphertext_bytes_read += bytes_processed
        # NOTE(review): an `if self._status:` guard is presumably missing
        # before the progress update.
        progress = float(self._ciphertext_bytes_read) / self._file_size
        self._status.set_progress(1, progress)
        # NOTE(review): a `return cryptdata` is missing here.

    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
            # NOTE(review): the log level argument and closing paren are
            # missing here.
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
            # NOTE(review): the log level argument and closing paren are
            # missing here.
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    # NOTE(review): a `def close(self):` header is missing before this
    # delegation to the wrapped uploadable.
        return self.original.close()
    # NOTE(review): the `class UploadStatus:` header for this block is
    # missing from the extraction; these are its interior lines.  Many
    # one-line accessor bodies are also missing and are flagged below —
    # the code as shown is not runnable and has not been altered, only
    # annotated.
    implements(IUploadStatus)
    # monotonically-increasing id shared by all instances
    statusid_counter = itertools.count(0)

    # NOTE(review): the `def __init__(self):` header and several attribute
    # initializers (presumably size/helper/active/results) are missing
    # before these lines.
        self.storage_index = None
        self.status = "Not started"
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        # NOTE(review): body missing — presumably `return self.started`.
    def get_storage_index(self):
        return self.storage_index
    # NOTE(review): get_size / get_helper accessors appear to be missing
    # between these two methods.
    def using_helper(self):
        # NOTE(review): body missing — presumably `return self.helper`.
    def get_status(self):
        # NOTE(review): body missing — presumably `return self.status`.
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        # NOTE(review): body missing — presumably `return self.active`.
    def get_results(self):
        # NOTE(review): body missing — presumably `return self.results`.
    def get_counter(self):
        # NOTE(review): body missing — presumably `return self.counter`.

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        # NOTE(review): body missing — presumably `self.size = size`.
    def set_helper(self, helper):
        # NOTE(review): body missing — presumably `self.helper = helper`.
    def set_status(self, status):
        # NOTE(review): body missing — presumably `self.status = status`.
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        # NOTE(review): body missing — presumably `self.active = value`.
    def set_results(self, value):
        # NOTE(review): body missing — presumably `self.results = value`.
    # NOTE(review): the `class CHKUploader:` header is missing from this
    # extraction; these are its interior lines.
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        # NOTE(review): a `self._encoder = None` initializer (checked by
        # abort()) appears to be missing here.
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
811 def log(self, *args, **kwargs):
812 if "parent" not in kwargs:
813 kwargs["parent"] = self._log_number
814 if "facility" not in kwargs:
815 kwargs["facility"] = "tahoe.upload"
816 return log.msg(*args, **kwargs)
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            # NOTE(review): the callback's return, its hookup onto `d`,
            # and the final `return d` are missing from the extraction.

    # NOTE(review): a `def abort(self):` header is missing before this
    # docstring.
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
        # NOTE(review): the second constructor argument and closing paren
        # are missing from the extraction.
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # NOTE(review): a `return d` is missing here.

    def locate_all_shareholders(self, encoder, started):
        # run peer selection for the encoder's parameters; records timing
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
        # NOTE(review): the upload-status argument and closing paren are
        # missing from the extraction.

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
        # NOTE(review): the storage_index argument line is missing here.
                                           share_size, block_size,
                                           num_segments, n, desired)
        # NOTE(review): a `def _done(res):` wrapper around the next line,
        # its `return res`, the addCallback hookup, and `return d` are
        # missing from the extraction.
            self._peer_selection_elapsed = time.time() - peer_selection_started

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        # NOTE(review): a `buckets = {}` initializer is missing here.
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap[shnum] = peer.peerid
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        # NOTE(review): an `r = self._results` binding is missing here;
        # the lines below populate that results object.
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        # NOTE(review): a `now = time.time()` binding is missing here
        # (`now` is read two lines below).
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        # NOTE(review): a `return r` is missing here.

    def get_upload_status(self):
        return self._upload_status
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    """Read exactly `size` bytes from `uploadable` (issuing repeated
    read() calls as needed) and fire with a list of data strings.

    NOTE(review): the `if size == 0:` guard, the `def _got(data):`
    header, the sanity asserts, the `if remaining:` recursion guard, and
    the `d.addCallback`/`return d` lines were missing from the
    extraction; restored around the visible fragments.  The mutable
    default `prepend_data=[]` is kept for interface compatibility; it is
    never mutated (only concatenated), so it is safe.
    """
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            # short read: recurse for the rest, carrying what we have
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
class LiteralUploader:
    """Upload very small files by embedding the data directly in a LIT
    URI, with no storage servers involved."""

    # NOTE(review): the `def __init__(self):` header is missing before
    # these initializers.
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        # NOTE(review): a `s.set_helper(...)` call is missing here.
        s.set_progress(0, 1.0)
        # NOTE(review): a `s.set_active(...)` call is missing here.
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        # NOTE(review): a `def _got_size(size):` header is missing before
        # the next three lines, which form that callback's body.
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        # NOTE(review): a `return d` is missing here.

    def _build_results(self, uri):
        # record the final URI and mark all progress stages complete
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        # NOTE(review): a `return self._results` is missing here, along
        # with any methods between _build_results and the accessor below.

    def get_upload_status(self):
        # NOTE(review): body missing — presumably `return self._status`.
# Wraps a local IEncryptedUploadable in a foolscap Referenceable so a
# remote Helper can pull ciphertext from us over the wire.
# NOTE(review): this listing has gaps; several original lines (e.g.
# 992-993 presumably initializing self._offset / self._bytes_sent /
# self._size, the 'def get_size(self):' line around 999, and the bodies
# of the two '_read' callbacks) are missing — verify against upstream.
987 class RemoteEncryptedUploadable(Referenceable):
988 implements(RIEncryptedUploadable)
990 def __init__(self, encrypted_uploadable, upload_status):
991 self._eu = IEncryptedUploadable(encrypted_uploadable)
994 self._status = IUploadStatus(upload_status)
995 # we are responsible for updating the status string while we run, and
996 # for setting the ciphertext-fetch progress.
# (presumably inside def get_size(self):) cache the size on first query
1000 if self._size is not None:
1001 return defer.succeed(self._size)
1002 d = self._eu.get_size()
1003 def _got_size(size):
1006 d.addCallback(_got_size)
# remote-callable accessors simply delegate to the wrapped object
1009 def remote_get_size(self):
1010 return self.get_size()
1011 def remote_get_all_encoding_parameters(self):
1012 return self._eu.get_all_encoding_parameters()
# read ciphertext and advance our notion of the current offset; with
# hash_only=True the data is consumed (for hashing) but not returned
1014 def _read_encrypted(self, length, hash_only):
1015 d = self._eu.read_encrypted(length, hash_only)
1018 self._offset += length
1020 size = sum([len(data) for data in strings])
1021 self._offset += size
1023 d.addCallback(_read)
1026 def remote_read_encrypted(self, offset, length):
1027 # we don't support seek backwards, but we allow skipping forwards
1028 precondition(offset >= 0, offset)
1029 precondition(length >= 0, length)
1030 lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1032 precondition(offset >= self._offset, offset, self._offset)
1033 if offset > self._offset:
1034 # read the data from disk anyways, to build up the hash tree
1035 skip = offset - self._offset
1036 log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1037 (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1038 d = self._read_encrypted(skip, hash_only=True)
# no skip needed: start the callback chain with an already-fired Deferred
1040 d = defer.succeed(None)
1042 def _at_correct_offset(res):
1043 assert offset == self._offset, "%d != %d" % (offset, self._offset)
1044 return self._read_encrypted(length, hash_only=False)
1045 d.addCallback(_at_correct_offset)
# track how many ciphertext bytes have been shipped to the helper
1048 size = sum([len(data) for data in strings])
1049 self._bytes_sent += size
1051 d.addCallback(_read)
1054 def remote_close(self):
1055 return self._eu.close()
# Uploader that delegates the CHK encode-and-push work to a remote Helper
# service, streaming our ciphertext to it instead of talking to storage
# servers directly.
# NOTE(review): this listing has gaps — e.g. 1065-1067 (end of __init__),
# 1084 (presumably 'd = eu.get_size()'), 1092-1095 (end of start()),
# 1097/1099 (inside _got_size, presumably 'self._size = size'),
# 1118 (presumably 'now = time.time()'), 1128 (presumably
# 'd = reu.get_size()'), 1159 (presumably 'r = upload_results'), and
# 1178-1179 (presumably 'return r'). Verify against the upstream source.
1058 class AssistedUploader:
1060 def __init__(self, helper):
1061 self._helper = helper
1062 self._log_number = log.msg("AssistedUploader starting")
1063 self._storage_index = None
1064 self._upload_status = s = UploadStatus()
# log with this upload's log entry as the default parent
1068 def log(self, *args, **kwargs):
1069 if "parent" not in kwargs:
1070 kwargs["parent"] = self._log_number
1071 return log.msg(*args, **kwargs)
1073 def start(self, encrypted_uploadable, storage_index):
1074 """Start uploading the file.
1076 Returns a Deferred that will fire with the UploadResults instance.
1078 precondition(isinstance(storage_index, str), storage_index)
1079 self._started = time.time()
1080 eu = IEncryptedUploadable(encrypted_uploadable)
1081 eu.set_upload_status(self._upload_status)
1082 self._encuploadable = eu
1083 self._storage_index = storage_index
# pipeline: size -> encoding params -> contact helper -> build verifycap
1085 d.addCallback(self._got_size)
1086 d.addCallback(lambda res: eu.get_all_encoding_parameters())
1087 d.addCallback(self._got_all_encoding_parameters)
1088 d.addCallback(self._contact_helper)
1089 d.addCallback(self._build_verifycap)
1091 self._upload_status.set_active(False)
1096 def _got_size(self, size):
1098 self._upload_status.set_size(size)
1100 def _got_all_encoding_parameters(self, params):
1101 k, happy, n, segment_size = params
1102 # stash these for URI generation later
1103 self._needed_shares = k
1104 self._total_shares = n
1105 self._segment_size = segment_size
# ask the helper (keyed by storage index) whether it already has the file
1107 def _contact_helper(self, res):
1108 now = self._time_contacting_helper_start = time.time()
1109 self._storage_index_elapsed = now - self._started
1110 self.log(format="contacting helper for SI %(si)s..",
1111 si=si_b2a(self._storage_index))
1112 self._upload_status.set_status("Contacting Helper")
1113 d = self._helper.callRemote("upload_chk", self._storage_index)
1114 d.addCallback(self._contacted_helper)
# helper answered; upload_helper is the remote reference to send
# ciphertext to when the file still needs uploading (py2 tuple-unpacking
# parameter syntax)
1117 def _contacted_helper(self, (upload_results, upload_helper)):
1119 elapsed = now - self._time_contacting_helper_start
1120 self._elapsed_time_contacting_helper = elapsed
1122 self.log("helper says we need to upload")
1123 self._upload_status.set_status("Uploading Ciphertext")
1124 # we need to upload the file
1125 reu = RemoteEncryptedUploadable(self._encuploadable,
1126 self._upload_status)
1127 # let it pre-compute the size for progress purposes
1129 d.addCallback(lambda ignored:
1130 upload_helper.callRemote("upload", reu))
1131 # this Deferred will fire with the upload results
# already-uploaded path: mark complete and hand back the helper's results
1133 self.log("helper says file is already uploaded")
1134 self._upload_status.set_progress(1, 1.0)
1135 self._upload_status.set_results(upload_results)
1136 return upload_results
1138 def _convert_old_upload_results(self, upload_results):
1139 # pre-1.3.0 helpers return upload results which contain a mapping
1140 # from shnum to a single human-readable string, containing things
1141 # like "Found on [x],[y],[z]" (for healthy files that were already in
1142 # the grid), "Found on [x]" (for files that needed upload but which
1143 # discovered pre-existing shares), and "Placed on [x]" (for newly
1144 # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1145 # set of binary serverid strings.
1147 # the old results are too hard to deal with (they don't even contain
1148 # as much information as the new results, since the nodeids are
1149 # abbreviated), so if we detect old results, just clobber them.
1151 sharemap = upload_results.sharemap
1152 if str in [type(v) for v in sharemap.values()]:
1153 upload_results.sharemap = None
1155 def _build_verifycap(self, upload_results):
1156 self.log("upload finished, building readcap")
1157 self._convert_old_upload_results(upload_results)
1158 self._upload_status.set_status("Building Readcap")
# sanity-check the helper's encoding parameters against our own
1160 assert r.uri_extension_data["needed_shares"] == self._needed_shares
1161 assert r.uri_extension_data["total_shares"] == self._total_shares
1162 assert r.uri_extension_data["segment_size"] == self._segment_size
1163 assert r.uri_extension_data["size"] == self._size
1164 r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1165 uri_extension_hash=r.uri_extension_hash,
1166 needed_shares=self._needed_shares,
1167 total_shares=self._total_shares, size=self._size
# fill in size and the timing breakdown; the helper's own "total" is
# preserved under "helper_total"
1170 r.file_size = self._size
1171 r.timings["storage_index"] = self._storage_index_elapsed
1172 r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1173 if "total" in r.timings:
1174 r.timings["helper_total"] = r.timings["total"]
1175 r.timings["total"] = now - self._started
1176 self._upload_status.set_status("Finished")
1177 self._upload_status.set_results(r)
1180 def get_upload_status(self):
1181 return self._upload_status
# Common base for IUploadable implementations: holds the encoding
# parameters (k / happy / n and max segment size) and computes the final
# per-file parameters once the file size is known.
# NOTE(review): this listing has gaps — e.g. a '_status' class attribute
# around 1195, 'd = self.get_size()' around 1222-1223, and 'return d' at
# 1233 appear to be missing. Verify against the upstream source.
1183 class BaseUploadable:
1184 default_max_segment_size = 128*KiB # overridden by max_segment_size
1185 default_encoding_param_k = 3 # overridden by encoding_parameters
1186 default_encoding_param_happy = 7
1187 default_encoding_param_n = 10
# per-instance overrides; None means "use the default above"
1189 max_segment_size = None
1190 encoding_param_k = None
1191 encoding_param_happy = None
1192 encoding_param_n = None
# memoized (k, happy, n, segsize) tuple, filled in on first computation
1194 _all_encoding_parameters = None
1197 def set_upload_status(self, upload_status):
1198 self._status = IUploadStatus(upload_status)
# install client-wide defaults; explicit per-instance settings still win
# because the 'or' chains below consult the instance attributes first
1200 def set_default_encoding_parameters(self, default_params):
1201 assert isinstance(default_params, dict)
1202 for k,v in default_params.items():
1203 precondition(isinstance(k, str), k, v)
1204 precondition(isinstance(v, int), k, v)
1205 if "k" in default_params:
1206 self.default_encoding_param_k = default_params["k"]
1207 if "happy" in default_params:
1208 self.default_encoding_param_happy = default_params["happy"]
1209 if "n" in default_params:
1210 self.default_encoding_param_n = default_params["n"]
1211 if "max_segment_size" in default_params:
1212 self.default_max_segment_size = default_params["max_segment_size"]
# returns a Deferred firing with the memoized (k, happy, n, segsize)
1214 def get_all_encoding_parameters(self):
1215 if self._all_encoding_parameters:
1216 return defer.succeed(self._all_encoding_parameters)
1218 max_segsize = self.max_segment_size or self.default_max_segment_size
1219 k = self.encoding_param_k or self.default_encoding_param_k
1220 happy = self.encoding_param_happy or self.default_encoding_param_happy
1221 n = self.encoding_param_n or self.default_encoding_param_n
1224 def _got_size(file_size):
1225 # for small files, shrink the segment size to avoid wasting space
1226 segsize = min(max_segsize, file_size)
1227 # this must be a multiple of 'required_shares'==k
1228 segsize = mathutil.next_multiple(segsize, k)
1229 encoding_parameters = (k, happy, n, segsize)
1230 self._all_encoding_parameters = encoding_parameters
1231 return encoding_parameters
1232 d.addCallback(_got_size)
# IUploadable built around an open file-like object. Subclasses (FileName,
# Data) arrange for the file object to come from a path or from memory.
# NOTE(review): this listing has gaps — e.g. initializers around
# 1247-1249 (presumably 'self._key = None' / 'self._size = None'), the
# BLOCKSIZE constant near 1262, a 'd = self.get_size()' around 1254-1255,
# and the 'def get_size(self):' line at 1297 are missing. Verify against
# the upstream source.
1235 class FileHandle(BaseUploadable):
1236 implements(IUploadable)
1238 def __init__(self, filehandle, convergence):
1240 Upload the data from the filehandle. If convergence is None then a
1241 random encryption key will be used, else the plaintext will be hashed,
1242 then the hash will be hashed together with the string in the
1243 "convergence" argument to form the encryption key.
1245 assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1246 self._filehandle = filehandle
1248 self.convergence = convergence
# derive the AES key by hashing the whole plaintext together with the
# convergence secret and the encoding parameters (k, n, segsize), so that
# identical files encrypt identically (convergent encryption)
1251 def _get_encryption_key_convergent(self):
1252 if self._key is not None:
1253 return defer.succeed(self._key)
1256 # that sets self._size as a side-effect
1257 d.addCallback(lambda size: self.get_all_encoding_parameters())
1259 k, happy, n, segsize = params
1260 f = self._filehandle
1261 enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
# hash the file in BLOCKSIZE chunks, updating hashing progress as we go
1266 data = f.read(BLOCKSIZE)
1269 enckey_hasher.update(data)
1270 # TODO: setting progress in a non-yielding loop is kind of
1271 # pointless, but I'm anticipating (perhaps prematurely) the
1272 # day when we use a slowjob or twisted's CooperatorService to
1273 # make this yield time to other jobs.
1274 bytes_read += len(data)
1276 self._status.set_progress(0, float(bytes_read)/self._size)
1278 self._key = enckey_hasher.digest()
1280 self._status.set_progress(0, 1.0)
# AES-128 key: the convergence hasher digest is 16 bytes
1281 assert len(self._key) == 16
# non-convergent uploads just use a random 16-byte AES key
1286 def _get_encryption_key_random(self):
1287 if self._key is None:
1288 self._key = os.urandom(16)
1289 return defer.succeed(self._key)
1291 def get_encryption_key(self):
1292 if self.convergence is not None:
1293 return self._get_encryption_key_convergent()
1295 return self._get_encryption_key_random()
# (presumably inside def get_size(self):) seek to EOF to measure the
# file, then rewind to the start for the subsequent reads
1298 if self._size is not None:
1299 return defer.succeed(self._size)
1300 self._filehandle.seek(0,2)
1301 size = self._filehandle.tell()
1303 self._filehandle.seek(0)
1304 return defer.succeed(size)
1306 def read(self, length):
1307 return defer.succeed([self._filehandle.read(length)])
1310 # the originator of the filehandle reserves the right to close it
class FileName(FileHandle):
    """A FileHandle whose underlying file object is opened from a path.

    Unlike the base class, close() also closes the file object, because we
    (not the caller) created it in __init__.
    """
    def __init__(self, filename, convergence):
        """
        Upload the data found at 'filename'. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        fh = open(filename, "rb")
        FileHandle.__init__(self, fh, convergence=convergence)

    def close(self):
        # run the base-class cleanup first, then close the file object that
        # we opened ourselves
        FileHandle.close(self)
        self._filehandle.close()
class Data(FileHandle):
    """An in-memory variant of FileHandle: wraps the given string in a
    StringIO so the ordinary FileHandle machinery can read it."""
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        stream = StringIO(data)
        FileHandle.__init__(self, stream, convergence=convergence)
# Service that picks an upload strategy per file: LIT for tiny files,
# helper-assisted upload when a helper connection exists, otherwise a
# direct CHK upload. (The class continues past the end of this listing.)
1338 class Uploader(service.MultiService, log.PrefixingLogMixin):
1339 """I am a service that allows file uploading. I am a service-child of the
1342 implements(IUploader)
# files whose size is at or below this are stored inline in a LIT URI
1344 URI_LIT_SIZE_THRESHOLD = 55
1346 def __init__(self, helper_furl=None, stats_provider=None):
1347 self._helper_furl = helper_furl
1348 self.stats_provider = stats_provider
# NOTE(review): 'self._helper = None' is presumably initialized on a
# dropped line near 1349 — get_helper_info reads it unconditionally
1350 self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1351 log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1352 service.MultiService.__init__(self)
# on service start, begin (re)connecting to the configured helper, if any
1354 def startService(self):
1355 service.MultiService.startService(self)
1356 if self._helper_furl:
# NOTE(review): connectTo's callback argument (presumably
# self._got_helper, original line 1358) is missing from this listing
1357 self.parent.tub.connectTo(self._helper_furl,
# new helper connection established: probe its version before trusting it.
# 'default' is the version info to assume for old helpers that have no
# get_version() method.
# NOTE(review): original lines 1363, 1365 and 1368 (dict continuation /
# closing braces and presumably 'return'-less tail) are missing here.
1360 def _got_helper(self, helper):
1361 self.log("got helper connection, getting versions")
1362 default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1364 "application-version": "unknown: no get_version()",
1366 d = add_version_to_remote_reference(helper, default)
1367 d.addCallback(self._got_versioned_helper)
def _got_versioned_helper(self, helper):
    """Check that the helper speaks protocol v1, then adopt it.

    Raises InsufficientVersionError if the required protocol identifier is
    absent from the helper's version dict.
    """
    required = "http://allmydata.org/tahoe/protocols/helper/v1"
    if required not in helper.version:
        raise InsufficientVersionError(required, helper.version)
    # version is acceptable: remember the connection and watch for its loss
    self._helper = helper
    helper.notifyOnDisconnect(self._lost_helper)
# called by foolscap when the helper connection drops; the body (original
# lines 1377-1378, presumably logging and clearing self._helper) is
# missing from this listing — verify against the upstream source
1376 def _lost_helper(self):
def get_helper_info(self):
    """Return a (helper_furl_or_None, connected_bool) tuple: the configured
    helper FURL (if any) and whether we currently hold a live connection."""
    connected = bool(self._helper)
    return (self._helper_furl, connected)
1384 def upload(self, uploadable, history=None):
1386 Returns a Deferred that will fire with the UploadResults instance.
1391 uploadable = IUploadable(uploadable)
1392 d = uploadable.get_size()
1393 def _got_size(size):
1394 default_params = self.parent.get_encoding_parameters()
1395 precondition(isinstance(default_params, dict), default_params)
1396 precondition("max_segment_size" in default_params, default_params)
1397 uploadable.set_default_encoding_parameters(default_params)
1399 if self.stats_provider:
1400 self.stats_provider.count('uploader.files_uploaded', 1)
1401 self.stats_provider.count('uploader.bytes_uploaded', size)
1403 if size <= self.URI_LIT_SIZE_THRESHOLD:
1404 uploader = LiteralUploader()
1405 return uploader.start(uploadable)
1407 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1408 d2 = defer.succeed(None)
1410 uploader = AssistedUploader(self._helper)
1411 d2.addCallback(lambda x: eu.get_storage_index())
1412 d2.addCallback(lambda si: uploader.start(eu, si))
1414 storage_broker = self.parent.get_storage_broker()
1415 secret_holder = self.parent._secret_holder
1416 uploader = CHKUploader(storage_broker, secret_holder)
1417 d2.addCallback(lambda x: uploader.start(eu))
1419 self._all_uploads[uploader] = None
1421 history.add_upload(uploader.get_upload_status())
1422 def turn_verifycap_into_read_cap(uploadresults):
1423 # Generate the uri from the verifycap plus the key.
1424 d3 = uploadable.get_encryption_key()
1425 def put_readcap_into_results(key):
1426 v = uri.from_string(uploadresults.verifycapstr)
1427 r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1428 uploadresults.uri = r.to_string()
1429 return uploadresults
1430 d3.addCallback(put_readcap_into_results)
1432 d2.addCallback(turn_verifycap_into_read_cap)
1434 d.addCallback(_got_size)