from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
-from foolscap import Referenceable, Copyable, RemoteCopy
-from foolscap import eventual
+from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
from allmydata.util.hashutil import file_renewal_secret_hash, \
file_cancel_secret_hash, bucket_renewal_secret_hash, \
bucket_cancel_secret_hash, plaintext_hasher, \
storage_index_hash, plaintext_segment_hasher, convergence_hasher
-from allmydata import storage, hashtree, uri
+from allmydata import hashtree, uri
+from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
-from allmydata.util import base32, idlib, log, mathutil
+from allmydata.util import base32, dictutil, idlib, log, mathutil
+from allmydata.util.happinessutil import servers_of_happiness, \
+ shares_by_server, merge_servers, \
+ failure_message
from allmydata.util.assertutil import precondition
-from allmydata.util.rrefutil import get_versioned_remote_reference
+from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
- NotEnoughSharesError, InsufficientVersionError
+ NoServersError, InsufficientVersionError, UploadUnhappinessError, \
+ DEFAULT_MAX_SEGMENT_SIZE
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
from cStringIO import StringIO
-KiB=1024
-MiB=1024*KiB
-GiB=1024*MiB
-TiB=1024*GiB
-PiB=1024*TiB
-
-class HaveAllPeersError(Exception):
- # we use this to jump out of the loop
- pass
-
# this wants to live in storage, not here
class TooFullError(Exception):
pass
typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
copytype = typeToCopy
+ # also, think twice about changing the shape of any existing attribute,
+ # because instances of this class are sent from the helper to its client,
+ # so changing this may break compatibility. Consider adding new fields
+ # instead of modifying existing ones.
+
def __init__(self):
self.timings = {} # dict of name to number of seconds
- self.sharemap = {} # dict of shnum to placement string
- self.servermap = {} # dict of peerid to set(shnums)
+ self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
+ self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
self.file_size = None
self.ciphertext_fetched = None # how much the helper fetched
self.uri = None
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
-class PeerTracker:
- def __init__(self, peerid, storage_server,
+def pretty_print_shnum_to_servers(s):
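+ # example output (with hypothetical abbreviated nodeids): "sh0: aaaaa, sh1: bbbbb+ccccc"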
+ return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
+
+class ServerTracker:
+ def __init__(self, server,
sharesize, blocksize, num_segments, num_share_hashes,
storage_index,
bucket_renewal_secret, bucket_cancel_secret):
- precondition(isinstance(peerid, str), peerid)
- precondition(len(peerid) == 20, peerid)
- self.peerid = peerid
- self._storageserver = storage_server # to an RIStorageServer
+ self._server = server
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
- self.allocated_size = layout.allocated_size(sharesize,
- num_segments,
- num_share_hashes,
- EXTENSION_SIZE)
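+ # build a throwaway write bucket proxy (no remote reference) just to
+ # compute the allocated size and to remember which proxy class to use
+ # for the real bucket writers later.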
+ wbp = layout.make_write_bucket_proxy(None, None, sharesize,
+ blocksize, num_segments,
+ num_share_hashes,
+ EXTENSION_SIZE)
+ self.wbp_class = wbp.__class__ # to create more of them
+ self.allocated_size = wbp.get_allocated_size()
self.blocksize = blocksize
self.num_segments = num_segments
self.num_share_hashes = num_share_hashes
self.cancel_secret = bucket_cancel_secret
def __repr__(self):
- return ("<PeerTracker for peer %s and SI %s>"
- % (idlib.shortnodeid_b2a(self.peerid),
- storage.si_b2a(self.storage_index)[:5]))
+ return ("<ServerTracker for server %s and SI %s>"
+ % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
+
+ def get_serverid(self):
+ return self._server.get_serverid()
+ def get_name(self):
+ return self._server.get_name()
def query(self, sharenums):
- d = self._storageserver.callRemote("allocate_buckets",
- self.storage_index,
- self.renew_secret,
- self.cancel_secret,
- sharenums,
- self.allocated_size,
- canary=Referenceable())
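+ # allocate_buckets() returns (alreadygot, bucketwriters): the set of
+ # share numbers this server already has, plus remote writers for the
+ # shares it just allocated for us.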
+ rref = self._server.get_rref()
+ d = rref.callRemote("allocate_buckets",
+ self.storage_index,
+ self.renew_secret,
+ self.cancel_secret,
+ sharenums,
+ self.allocated_size,
+ canary=Referenceable())
d.addCallback(self._got_reply)
return d
+ def ask_about_existing_shares(self):
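+ # get_buckets() returns a dict mapping shnum to a remote bucket reader
+ # for each share of this storage index that the server already holds.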
+ rref = self._server.get_rref()
+ return rref.callRemote("get_buckets", self.storage_index)
+
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {}
for sharenum, rref in buckets.iteritems():
- bp = layout.WriteBucketProxy(rref, self.sharesize,
- self.blocksize,
- self.num_segments,
- self.num_share_hashes,
- EXTENSION_SIZE,
- self.peerid)
+ bp = self.wbp_class(rref, self._server, self.sharesize,
+ self.blocksize,
+ self.num_segments,
+ self.num_share_hashes,
+ EXTENSION_SIZE)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
-class Tahoe2PeerSelector:
+
+ def abort(self):
+ """
+ I abort the remote bucket writers for all shares. This is a good idea
+ to conserve space on the storage server.
+ """
+ self.abort_some_buckets(self.buckets.keys())
+
+ def abort_some_buckets(self, sharenums):
+ """
+ I abort the remote bucket writers for the share numbers in sharenums.
+ """
+ for sharenum in sharenums:
+ if sharenum in self.buckets:
+ self.buckets[sharenum].abort()
+ del self.buckets[sharenum]
+
+
+def str_shareloc(shnum, bucketwriter):
+ return "%s: %s" % (shnum, bucketwriter.get_servername(),)
+
+class Tahoe2ServerSelector(log.PrefixingLogMixin):
def __init__(self, upload_id, logparent=None, upload_status=None):
self.upload_id = upload_id
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
+ # Servers that are working normally, but full.
+ self.full_count = 0
self.error_count = 0
- self.num_peers_contacted = 0
+ self.num_servers_contacted = 0
self.last_failure_msg = None
self._status = IUploadStatus(upload_status)
- self._log_parent = log.msg("%s starting" % self, parent=logparent)
+ log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
+ self.log("starting", level=log.OPERATIONAL)
def __repr__(self):
- return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
+ return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
- def get_shareholders(self, client,
+ def get_shareholders(self, storage_broker, secret_holder,
storage_index, share_size, block_size,
- num_segments, total_shares, shares_of_happiness):
+ num_segments, total_shares, needed_shares,
+ servers_of_happiness):
"""
- @return: (used_peers, already_peers), where used_peers is a set of
- PeerTracker instances that have agreed to hold some shares
- for us (the shnum is stashed inside the PeerTracker),
- and already_peers is a dict mapping shnum to a peer
- which claims to already have the share.
+ @return: (upload_trackers, already_serverids), where upload_trackers
+ is a set of ServerTracker instances that have agreed to hold
+ some shares for us (the shareids are stashed inside the
+ ServerTracker), and already_serverids is a dict mapping
+ shnum to a set of serverids for servers which claim to
+ already have the share.
"""
if self._status:
- self._status.set_status("Contacting Peers..")
+ self._status.set_status("Contacting Servers..")
self.total_shares = total_shares
- self.shares_of_happiness = shares_of_happiness
+ self.servers_of_happiness = servers_of_happiness
+ self.needed_shares = needed_shares
- self.homeless_shares = range(total_shares)
- # self.uncontacted_peers = list() # peers we haven't asked yet
- self.contacted_peers = [] # peers worth asking again
- self.contacted_peers2 = [] # peers that we have asked again
- self._started_second_pass = False
- self.use_peers = set() # PeerTrackers that have shares assigned to them
- self.preexisting_shares = {} # sharenum -> peerid holding the share
+ self.homeless_shares = set(range(total_shares))
+ self.use_trackers = set() # ServerTrackers that have shares assigned
+ # to them
+ self.preexisting_shares = {} # shareid => set(serverids) holding shareid
- peers = client.get_permuted_peers("storage", storage_index)
- if not peers:
- raise NotEnoughSharesError("client gave us zero peers")
+ # These servers have shares -- any shares -- for our SI. We keep
+ # track of these to write an error message with them later.
+ self.serverids_with_shares = set()
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
# figure out how much space to ask for
- allocated_size = layout.allocated_size(share_size,
- num_segments,
- num_share_hashes,
- EXTENSION_SIZE)
- # filter the list of peers according to which ones can accomodate
- # this request. This excludes older peers (which used a 4-byte size
+ wbp = layout.make_write_bucket_proxy(None, None,
+ share_size, 0, num_segments,
+ num_share_hashes, EXTENSION_SIZE)
+ allocated_size = wbp.get_allocated_size()
+ all_servers = storage_broker.get_servers_for_psi(storage_index)
+ if not all_servers:
+ raise NoServersError("client gave us zero servers")
+
+ # filter the list of servers according to which ones can accommodate
+ # this request. This excludes older servers (which used a 4-byte size
# field) from getting large shares (for files larger than about
# 12GiB). See #439 for details.
- def _get_maxsize(peer):
- (peerid, conn) = peer
- v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+ def _get_maxsize(server):
+ v0 = server.get_rref().version
+ v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
- peers = [peer for peer in peers
- if _get_maxsize(peer) >= allocated_size]
- if not peers:
- raise NotEnoughSharesError("no peers could accept an allocated_size of %d" % allocated_size)
+ writeable_servers = [server for server in all_servers
+ if _get_maxsize(server) >= allocated_size]
+ readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
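+ # (only the first 2*total_shares servers in the permuted list are asked,
+ # presumably because shares from an earlier upload of this SI would have
+ # been placed near the front of the permuted order.)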
# decide upon the renewal/cancel secrets, to include them in the
- # allocat_buckets query.
- client_renewal_secret = client.get_renewal_secret()
- client_cancel_secret = client.get_cancel_secret()
+ # allocate_buckets query.
+ client_renewal_secret = secret_holder.get_renewal_secret()
+ client_cancel_secret = secret_holder.get_cancel_secret()
file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
storage_index)
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
storage_index)
+ def _make_trackers(servers):
+ trackers = []
+ for s in servers:
+ seed = s.get_lease_seed()
+ renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
+ cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
+ st = ServerTracker(s,
+ share_size, block_size,
+ num_segments, num_share_hashes,
+ storage_index,
+ renew, cancel)
+ trackers.append(st)
+ return trackers
+
+ # We assign each server's tracker to one of three lists. They all
+ # start in the "first pass" list. During the first pass, as we ask
+ # each one to hold a share, we move their tracker to the "second
+ # pass" list, until the first-pass list is empty. Then during the
+ # second pass, as we ask each to hold more shares, we move their
+ # tracker to the "next pass" list, until the second-pass list is
+ # empty. Then we move everybody from the next-pass list back to the
+ # second-pass list and repeat the "second" pass (really the third,
+ # fourth, etc pass), until all shares are assigned, or we've run out
+ # of potential servers.
+ self.first_pass_trackers = _make_trackers(writeable_servers)
+ self.second_pass_trackers = [] # servers worth asking again
+ self.next_pass_trackers = [] # servers that we have asked again
+ self._started_second_pass = False
+
+ # We don't try to allocate shares to these servers, since they've
+ # said that they're incapable of storing shares of the size that we'd
+ # want to store. We ask them about existing shares for this storage
+ # index, which we want to know about for accurate
+ # servers_of_happiness accounting, then we forget about them.
+ readonly_trackers = _make_trackers(readonly_servers)
+
+ # We now ask servers that can't hold any new shares about existing
+ # shares that they might have for our SI. Once this is done, we
+ # start placing the shares that we haven't already accounted
+ # for.
+ ds = []
+ if self._status and readonly_trackers:
+ self._status.set_status("Contacting readonly servers to find "
+ "any existing shares")
+ for tracker in readonly_trackers:
+ assert isinstance(tracker, ServerTracker)
+ d = tracker.ask_about_existing_shares()
+ d.addBoth(self._handle_existing_response, tracker)
+ ds.append(d)
+ self.num_servers_contacted += 1
+ self.query_count += 1
+ self.log("asking server %s for any existing shares" %
+ (tracker.get_name(),), level=log.NOISY)
+ dl = defer.DeferredList(ds)
+ dl.addCallback(lambda ign: self._loop())
+ return dl
+
+
+ def _handle_existing_response(self, res, tracker):
+ """
+ I handle responses to the get_buckets() queries that
+ Tahoe2ServerSelector.get_shareholders sends to readonly servers.
+ """
+ serverid = tracker.get_serverid()
+ if isinstance(res, failure.Failure):
+ self.log("%s got error during existing shares check: %s"
+ % (tracker.get_name(), res), level=log.UNUSUAL)
+ self.error_count += 1
+ self.bad_query_count += 1
+ else:
+ buckets = res
+ if buckets:
+ self.serverids_with_shares.add(serverid)
+ self.log("response to get_buckets() from server %s: alreadygot=%s"
+ % (tracker.get_name(), tuple(sorted(buckets))),
+ level=log.NOISY)
+ for bucket in buckets:
+ self.preexisting_shares.setdefault(bucket, set()).add(serverid)
+ self.homeless_shares.discard(bucket)
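+ # this tracker is for a readonly server, so even a successful response
+ # cannot place any new shares; count the query as full/unproductive.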
+ self.full_count += 1
+ self.bad_query_count += 1
+
+
+ def _get_progress_message(self):
+ if not self.homeless_shares:
+ msg = "placed all %d shares, " % (self.total_shares)
+ else:
+ msg = ("placed %d shares out of %d total (%d homeless), " %
+ (self.total_shares - len(self.homeless_shares),
+ self.total_shares,
+ len(self.homeless_shares)))
+ return (msg + "want to place shares on at least %d servers such that "
+ "any %d of them have enough shares to recover the file, "
+ "sent %d queries to %d servers, "
+ "%d queries placed some shares, %d placed none "
+ "(of which %d placed none due to the server being"
+ " full and %d placed none due to an error)" %
+ (self.servers_of_happiness, self.needed_shares,
+ self.query_count, self.num_servers_contacted,
+ self.good_query_count, self.bad_query_count,
+ self.full_count, self.error_count))
- trackers = [ PeerTracker(peerid, conn,
- share_size, block_size,
- num_segments, num_share_hashes,
- storage_index,
- bucket_renewal_secret_hash(file_renewal_secret,
- peerid),
- bucket_cancel_secret_hash(file_cancel_secret,
- peerid),
- )
- for (peerid, conn) in peers ]
- self.uncontacted_peers = trackers
-
- d = defer.maybeDeferred(self._loop)
- return d
def _loop(self):
if not self.homeless_shares:
- # all done
- msg = ("placed all %d shares, "
- "sent %d queries to %d peers, "
- "%d queries placed some shares, %d placed none, "
- "got %d errors" %
- (self.total_shares,
- self.query_count, self.num_peers_contacted,
- self.good_query_count, self.bad_query_count,
- self.error_count))
- log.msg("peer selection successful for %s: %s" % (self, msg),
- parent=self._log_parent)
- return (self.use_peers, self.preexisting_shares)
-
- if self.uncontacted_peers:
- peer = self.uncontacted_peers.pop(0)
- # TODO: don't pre-convert all peerids to PeerTrackers
- assert isinstance(peer, PeerTracker)
-
- shares_to_ask = set([self.homeless_shares.pop(0)])
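+ # servers_of_happiness() computes the size of a maximum matching
+ # between servers and shares in the merged share map; the upload is
+ # "happy" when that number reaches self.servers_of_happiness.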
+ merged = merge_servers(self.preexisting_shares, self.use_trackers)
+ effective_happiness = servers_of_happiness(merged)
+ if self.servers_of_happiness <= effective_happiness:
+ msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
+ "self.use_trackers: %s, self.preexisting_shares: %s") \
+ % (self, self._get_progress_message(),
+ pretty_print_shnum_to_servers(merged),
+ [', '.join([str_shareloc(k,v)
+ for k,v in st.buckets.iteritems()])
+ for st in self.use_trackers],
+ pretty_print_shnum_to_servers(self.preexisting_shares))
+ self.log(msg, level=log.OPERATIONAL)
+ return (self.use_trackers, self.preexisting_shares)
+ else:
+ # We're not okay right now, but maybe we can fix it by
+ # redistributing some shares. In cases where one or two
+ # servers have, before the upload, all or most of the
+ # shares for a given SI, this can work by giving _loop
+ # a chance to spread those out over the other servers.
+ delta = self.servers_of_happiness - effective_happiness
+ shares = shares_by_server(self.preexisting_shares)
+ # Each server in shares maps to a set of shares stored on it.
+ # Since we want to keep at least one share on each server
+ # that has one (otherwise we'd only be making
+ # the situation worse by removing distinct servers),
+ # each server has len(its shares) - 1 to spread around.
+ shares_to_spread = sum([len(list(sharelist)) - 1
+ for (server, sharelist)
+ in shares.items()])
+ if delta <= len(self.first_pass_trackers) and \
+ shares_to_spread >= delta:
+ items = shares.items()
+ while len(self.homeless_shares) < delta:
+ # Loop through the allocated shares, removing
+ # one from each server that has more than one
+ # and putting it back into self.homeless_shares
+ # until we've done this delta times.
+ server, sharelist = items.pop()
+ if len(sharelist) > 1:
+ share = sharelist.pop()
+ self.homeless_shares.add(share)
+ self.preexisting_shares[share].remove(server)
+ if not self.preexisting_shares[share]:
+ del self.preexisting_shares[share]
+ items.append((server, sharelist))
+ for writer in self.use_trackers:
+ writer.abort_some_buckets(self.homeless_shares)
+ return self._loop()
+ else:
+ # Redistribution won't help us; fail.
+ server_count = len(self.serverids_with_shares)
+ failmsg = failure_message(server_count,
+ self.needed_shares,
+ self.servers_of_happiness,
+ effective_happiness)
+ servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
+ servmsg = servmsgtempl % (
+ self,
+ failmsg,
+ self._get_progress_message(),
+ pretty_print_shnum_to_servers(merged)
+ )
+ self.log(servmsg, level=log.INFREQUENT)
+ return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
+
+ if self.first_pass_trackers:
+ tracker = self.first_pass_trackers.pop(0)
+ # TODO: don't pre-convert all serverids to ServerTrackers
+ assert isinstance(tracker, ServerTracker)
+
+ shares_to_ask = set(sorted(self.homeless_shares)[:1])
+ self.homeless_shares -= shares_to_ask
self.query_count += 1
- self.num_peers_contacted += 1
+ self.num_servers_contacted += 1
if self._status:
- self._status.set_status("Contacting Peers [%s] (first query),"
+ self._status.set_status("Contacting Servers [%s] (first query),"
" %d shares left.."
- % (idlib.shortnodeid_b2a(peer.peerid),
+ % (tracker.get_name(),
len(self.homeless_shares)))
- d = peer.query(shares_to_ask)
- d.addBoth(self._got_response, peer, shares_to_ask,
- self.contacted_peers)
+ d = tracker.query(shares_to_ask)
+ d.addBoth(self._got_response, tracker, shares_to_ask,
+ self.second_pass_trackers)
return d
- elif self.contacted_peers:
- # ask a peer that we've already asked.
+ elif self.second_pass_trackers:
+ # ask a server that we've already asked.
if not self._started_second_pass:
- log.msg("starting second pass", parent=self._log_parent,
+ self.log("starting second pass",
level=log.NOISY)
self._started_second_pass = True
num_shares = mathutil.div_ceil(len(self.homeless_shares),
- len(self.contacted_peers))
- peer = self.contacted_peers.pop(0)
- shares_to_ask = set(self.homeless_shares[:num_shares])
- self.homeless_shares[:num_shares] = []
+ len(self.second_pass_trackers))
+ tracker = self.second_pass_trackers.pop(0)
+ shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
+ self.homeless_shares -= shares_to_ask
self.query_count += 1
if self._status:
- self._status.set_status("Contacting Peers [%s] (second query),"
+ self._status.set_status("Contacting Servers [%s] (second query),"
" %d shares left.."
- % (idlib.shortnodeid_b2a(peer.peerid),
+ % (tracker.get_name(),
len(self.homeless_shares)))
- d = peer.query(shares_to_ask)
- d.addBoth(self._got_response, peer, shares_to_ask,
- self.contacted_peers2)
+ d = tracker.query(shares_to_ask)
+ d.addBoth(self._got_response, tracker, shares_to_ask,
+ self.next_pass_trackers)
return d
- elif self.contacted_peers2:
+ elif self.next_pass_trackers:
# we've finished the second-or-later pass. Move all the remaining
- # peers back into self.contacted_peers for the next pass.
- self.contacted_peers.extend(self.contacted_peers2)
- self.contacted_peers[:] = []
+ # servers back into self.second_pass_trackers for the next pass.
+ self.second_pass_trackers.extend(self.next_pass_trackers)
+ self.next_pass_trackers[:] = []
return self._loop()
else:
- # no more peers. If we haven't placed enough shares, we fail.
- placed_shares = self.total_shares - len(self.homeless_shares)
- if placed_shares < self.shares_of_happiness:
- msg = ("placed %d shares out of %d total (%d homeless), "
- "sent %d queries to %d peers, "
- "%d queries placed some shares, %d placed none, "
- "got %d errors" %
- (self.total_shares - len(self.homeless_shares),
- self.total_shares, len(self.homeless_shares),
- self.query_count, self.num_peers_contacted,
- self.good_query_count, self.bad_query_count,
- self.error_count))
- msg = "peer selection failed for %s: %s" % (self, msg)
+ # no more servers. If we haven't placed enough shares, we fail.
+ merged = merge_servers(self.preexisting_shares, self.use_trackers)
+ effective_happiness = servers_of_happiness(merged)
+ if effective_happiness < self.servers_of_happiness:
+ msg = failure_message(len(self.serverids_with_shares),
+ self.needed_shares,
+ self.servers_of_happiness,
+ effective_happiness)
+ msg = ("server selection failed for %s: %s (%s)" %
+ (self, msg, self._get_progress_message()))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
- log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
- raise NotEnoughSharesError(msg)
+ self.log(msg, level=log.UNUSUAL)
+ return self._failed(msg)
else:
# we placed enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
- return self.use_peers
+ msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
+ self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
+ self.log(msg, level=log.OPERATIONAL)
+ return (self.use_trackers, self.preexisting_shares)
- def _got_response(self, res, peer, shares_to_ask, put_peer_here):
+ def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
if isinstance(res, failure.Failure):
# This is unusual, and probably indicates a bug or a network
# problem.
- log.msg("%s got error during peer selection: %s" % (peer, res),
- level=log.UNUSUAL, parent=self._log_parent)
+ self.log("%s got error during server selection: %s" % (tracker, res),
+ level=log.UNUSUAL)
self.error_count += 1
- self.homeless_shares = list(shares_to_ask) + self.homeless_shares
- if (self.uncontacted_peers
- or self.contacted_peers
- or self.contacted_peers2):
+ self.bad_query_count += 1
+ self.homeless_shares |= shares_to_ask
+ if (self.first_pass_trackers
+ or self.second_pass_trackers
+ or self.next_pass_trackers):
# there is still hope, so just loop
pass
else:
- # No more peers, so this upload might fail (it depends upon
- # whether we've hit shares_of_happiness or not). Log the last
- # failure we got: if a coding error causes all peers to fail
+ # No more servers, so this upload might fail (it depends upon
+ # whether we've hit servers_of_happiness or not). Log the last
+ # failure we got: if a coding error causes all servers to fail
# in the same way, this allows the common failure to be seen
# by the uploader and should help with debugging
- msg = ("last failure (from %s) was: %s" % (peer, res))
+ msg = ("last failure (from %s) was: %s" % (tracker, res))
self.last_failure_msg = msg
else:
(alreadygot, allocated) = res
- log.msg("response from peer %s: alreadygot=%s, allocated=%s"
- % (idlib.shortnodeid_b2a(peer.peerid),
+ self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
+ % (tracker.get_name(),
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
- level=log.NOISY, parent=self._log_parent)
+ level=log.NOISY)
progress = False
for s in alreadygot:
- self.preexisting_shares[s] = peer.peerid
+ self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
if s in self.homeless_shares:
self.homeless_shares.remove(s)
progress = True
+ elif s in shares_to_ask:
+ progress = True
- # the PeerTracker will remember which shares were allocated on
+ # the ServerTracker will remember which shares were allocated on
- # that peer. We just have to remember to use them.
+ # that server. We just have to remember to use them.
if allocated:
- self.use_peers.add(peer)
+ self.use_trackers.add(tracker)
progress = True
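+ # remember that this server now holds at least one share of this
+ # file; self.serverids_with_shares feeds the failure message in _loop.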
+ if allocated or alreadygot:
+ self.serverids_with_shares.add(tracker.get_serverid())
+
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
if progress:
- # they accepted or already had at least one share, so
- # progress has been made
+ # They accepted at least one of the shares that we asked
+ # them to accept, or they had a share that we didn't ask
+ # them to accept but that we hadn't placed yet, so this
+ # was a productive query
self.good_query_count += 1
else:
self.bad_query_count += 1
+ self.full_count += 1
if still_homeless:
# In networks with lots of space, this is very unusual and
- # probably indicates an error. In networks with peers that
+ # probably indicates an error. In networks with servers that
# are full, it is merely unusual. In networks that are very
# full, it is common, and many uploads will fail. In most
# cases, this is obviously not fatal, and we'll just use some
- # other peers.
+ # other servers.
# some shares are still homeless, keep trying to find them a
# home. The ones that were rejected get first priority.
- self.homeless_shares = (list(still_homeless)
- + self.homeless_shares)
+ self.homeless_shares |= still_homeless
- # Since they were unable to accept all of our requests, so it
- # is safe to assume that asking them again won't help.
+ # Since they were unable to accept all of our requests, it is
+ # safe to assume that asking them again won't help.
else:
# if they *were* able to accept everything, they might be
# willing to accept even more.
- put_peer_here.append(peer)
+ put_tracker_here.append(tracker)
# now loop
return self._loop()
+ def _failed(self, msg):
+ """
+ I am called when server selection fails. I first abort all of the
+ remote buckets that I allocated during my unsuccessful attempt to
+ place shares for this file. I then raise an
+ UploadUnhappinessError with my msg argument.
+ """
+ for tracker in self.use_trackers:
+ assert isinstance(tracker, ServerTracker)
+ tracker.abort()
+ raise UploadUnhappinessError(msg)
+
+
class EncryptAnUploadable:
"""This is a wrapper that takes an IUploadable and provides
IEncryptedUploadable."""
# actually synchronous too, we'd blow the stack unless we stall for a
# tick. Once you accept a Deferred from IUploadable.read(), you must
# be prepared to have it fire immediately too.
- d.addCallback(eventual.fireEventually)
+ d.addCallback(fireEventually)
def _good(plaintext):
# and encrypt it..
# o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
def get_plaintext_hashtree_leaves(self, first, last, num_segments):
+ # this is currently unused, but will live again when we fix #453
if len(self._plaintext_segment_hashes) < num_segments:
# close out the last one
assert len(self._plaintext_segment_hashes) == num_segments-1
self.results = value
class CHKUploader:
- peer_selector_class = Tahoe2PeerSelector
+ server_selector_class = Tahoe2ServerSelector
- def __init__(self, client):
- self._client = client
- self._log_number = self._client.log("CHKUploader starting")
+ def __init__(self, storage_broker, secret_holder):
+ # server_selector needs storage_broker and secret_holder
+ self._storage_broker = storage_broker
+ self._secret_holder = secret_holder
+ self._log_number = self.log("CHKUploader starting", parent=None)
self._encoder = None
self._results = UploadResults()
self._storage_index = None
self._upload_status.set_active(True)
self._upload_status.set_results(self._results)
+ # locate_all_shareholders() will create the following attribute:
+ # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
+
def log(self, *args, **kwargs):
if "parent" not in kwargs:
kwargs["parent"] = self._log_number
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.upload"
- return self._client.log(*args, **kwargs)
+ return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable):
"""Start uploading the file.
return d
def abort(self):
- """Call this is the upload must be abandoned before it completes.
+ """Call this if the upload must be abandoned before it completes.
This will tell the shareholders to delete their partial shares. I
return a Deferred that fires when these messages have been acked."""
if not self._encoder:
return d
def locate_all_shareholders(self, encoder, started):
- peer_selection_started = now = time.time()
+ server_selection_started = now = time.time()
self._storage_index_elapsed = now - started
+ storage_broker = self._storage_broker
+ secret_holder = self._secret_holder
storage_index = encoder.get_param("storage_index")
self._storage_index = storage_index
- upload_id = storage.si_b2a(storage_index)[:5]
+ upload_id = si_b2a(storage_index)[:5]
self.log("using storage index %s" % upload_id)
- peer_selector = self.peer_selector_class(upload_id, self._log_number,
- self._upload_status)
+ server_selector = self.server_selector_class(upload_id,
+ self._log_number,
+ self._upload_status)
share_size = encoder.get_param("share_size")
block_size = encoder.get_param("block_size")
num_segments = encoder.get_param("num_segments")
k,desired,n = encoder.get_param("share_counts")
- self._peer_selection_started = time.time()
- d = peer_selector.get_shareholders(self._client, storage_index,
- share_size, block_size,
- num_segments, n, desired)
+ self._server_selection_started = time.time()
+ d = server_selector.get_shareholders(storage_broker, secret_holder,
+ storage_index,
+ share_size, block_size,
+ num_segments, n, k, desired)
def _done(res):
- self._peer_selection_elapsed = time.time() - peer_selection_started
+ self._server_selection_elapsed = time.time() - server_selection_started
return res
d.addCallback(_done)
return d
- def set_shareholders(self, (used_peers, already_peers), encoder):
+ def set_shareholders(self, (upload_trackers, already_serverids), encoder):
"""
- @param used_peers: a sequence of PeerTracker objects
- @paran already_peers: a dict mapping sharenum to a peerid that
- claims to already have this share
+ @param upload_trackers: a sequence of ServerTracker objects that
+ have agreed to hold some shares for us (the
+ shareids are stashed inside the ServerTracker)
+
+ @param already_serverids: a dict mapping sharenum to a set of
+ serverids for servers that claim to already
+ have this share
"""
- self.log("_send_shares, used_peers is %s" % (used_peers,))
+ msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
+ values = ([', '.join([str_shareloc(k,v)
+ for k,v in st.buckets.iteritems()])
+ for st in upload_trackers], already_serverids)
+ self.log(msgtempl % values, level=log.OPERATIONAL)
# record already-present shares in self._results
- for (shnum, peerid) in already_peers.items():
- peerid_s = idlib.shortnodeid_b2a(peerid)
- self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
- if peerid not in self._results.servermap:
- self._results.servermap[peerid] = set()
- self._results.servermap[peerid].add(shnum)
- self._results.preexisting_shares = len(already_peers)
-
- self._sharemap = {}
- for peer in used_peers:
- assert isinstance(peer, PeerTracker)
+ self._results.preexisting_shares = len(already_serverids)
+
+ self._server_trackers = {} # k: shnum, v: instance of ServerTracker
+ for tracker in upload_trackers:
+ assert isinstance(tracker, ServerTracker)
buckets = {}
- for peer in used_peers:
- buckets.update(peer.buckets)
- for shnum in peer.buckets:
- self._sharemap[shnum] = peer
- assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
- encoder.set_shareholders(buckets)
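+ # servermap: shnum -> set(serverids), covering both the pre-existing
+ # shares and the shares we are placing now; it is handed to the
+ # encoder together with the bucket writers.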
+ servermap = already_serverids.copy()
+ for tracker in upload_trackers:
+ buckets.update(tracker.buckets)
+ for shnum in tracker.buckets:
+ self._server_trackers[shnum] = tracker
+ servermap.setdefault(shnum, set()).add(tracker.get_serverid())
+ assert len(buckets) == sum([len(tracker.buckets)
+ for tracker in upload_trackers]), \
+ "%s (%s) != %s (%s)" % (
+ len(buckets),
+ buckets,
+ sum([len(tracker.buckets) for tracker in upload_trackers]),
+ [(t.buckets, t.get_serverid()) for t in upload_trackers]
+ )
+ encoder.set_shareholders(buckets, servermap)
def _encrypted_done(self, verifycap):
""" Returns a Deferred that will fire with the UploadResults instance. """
r = self._results
for shnum in self._encoder.get_shares_placed():
- peer_tracker = self._sharemap[shnum]
- peerid = peer_tracker.peerid
- peerid_s = idlib.shortnodeid_b2a(peerid)
- r.sharemap[shnum] = "Placed on [%s]" % peerid_s
- if peerid not in r.servermap:
- r.servermap[peerid] = set()
- r.servermap[peerid].add(shnum)
+ server_tracker = self._server_trackers[shnum]
+ serverid = server_tracker.get_serverid()
+ r.sharemap.add(shnum, serverid)
+ r.servermap.add(serverid, shnum)
r.pushed_shares = len(self._encoder.get_shares_placed())
now = time.time()
r.file_size = self._encoder.file_size
r.timings["total"] = now - self._started
r.timings["storage_index"] = self._storage_index_elapsed
- r.timings["peer_selection"] = self._peer_selection_elapsed
+ r.timings["peer_selection"] = self._server_selection_elapsed
r.timings.update(self._encoder.get_times())
r.uri_extension_data = self._encoder.get_uri_extension_data()
r.verifycapstr = verifycap.to_string()
class LiteralUploader:
- def __init__(self, client):
- self._client = client
+ def __init__(self):
self._results = UploadResults()
self._status = s = UploadStatus()
s.set_storage_index(None)
def _build_results(self, uri):
self._results.uri = uri
- self._status.set_status("Done")
+ self._status.set_status("Finished")
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
return self._results
d.addCallback(_read)
return d
- def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
- log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
- (first, last-1, num_segments),
- level=log.NOISY)
- d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
- d.addCallback(list)
- return d
- def remote_get_plaintext_hash(self):
- return self._eu.get_plaintext_hash()
def remote_close(self):
return self._eu.close()
now = self._time_contacting_helper_start = time.time()
self._storage_index_elapsed = now - self._started
self.log(format="contacting helper for SI %(si)s..",
- si=storage.si_b2a(self._storage_index))
+ si=si_b2a(self._storage_index), level=log.NOISY)
self._upload_status.set_status("Contacting Helper")
d = self._helper.callRemote("upload_chk", self._storage_index)
d.addCallback(self._contacted_helper)
elapsed = now - self._time_contacting_helper_start
self._elapsed_time_contacting_helper = elapsed
if upload_helper:
- self.log("helper says we need to upload")
+ self.log("helper says we need to upload", level=log.NOISY)
self._upload_status.set_status("Uploading Ciphertext")
# we need to upload the file
reu = RemoteEncryptedUploadable(self._encuploadable,
upload_helper.callRemote("upload", reu))
# this Deferred will fire with the upload results
return d
- self.log("helper says file is already uploaded")
+ self.log("helper says file is already uploaded", level=log.OPERATIONAL)
self._upload_status.set_progress(1, 1.0)
self._upload_status.set_results(upload_results)
return upload_results
+ def _convert_old_upload_results(self, upload_results):
+ # pre-1.3.0 helpers return upload results which contain a mapping
+ # from shnum to a single human-readable string, containing things
+ # like "Found on [x],[y],[z]" (for healthy files that were already in
+ # the grid), "Found on [x]" (for files that needed upload but which
+ # discovered pre-existing shares), and "Placed on [x]" (for newly
+ # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
+ # set of binary serverid strings.
+
+ # the old results are too hard to deal with (they don't even contain
+ # as much information as the new results, since the nodeids are
+ # abbreviated), so if we detect old results, just clobber them.
+
+ sharemap = upload_results.sharemap
+ if str in [type(v) for v in sharemap.values()]:
+ upload_results.sharemap = None
+
def _build_verifycap(self, upload_results):
- self.log("upload finished, building readcap")
+ self.log("upload finished, building readcap", level=log.OPERATIONAL)
+ self._convert_old_upload_results(upload_results)
self._upload_status.set_status("Building Readcap")
r = upload_results
assert r.uri_extension_data["needed_shares"] == self._needed_shares
if "total" in r.timings:
r.timings["helper_total"] = r.timings["total"]
r.timings["total"] = now - self._started
- self._upload_status.set_status("Done")
+ self._upload_status.set_status("Finished")
self._upload_status.set_results(r)
return r
return self._upload_status
class BaseUploadable:
- default_max_segment_size = 128*KiB # overridden by max_segment_size
+ # this is overridden by max_segment_size
+ default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
default_encoding_param_k = 3 # overridden by encoding_parameters
default_encoding_param_happy = 7
default_encoding_param_n = 10
implements(IUploader)
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
- MAX_UPLOAD_STATUSES = 10
- def __init__(self, helper_furl=None, stats_provider=None):
+ def __init__(self, helper_furl=None, stats_provider=None, history=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
+ self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
- self._all_upload_statuses = weakref.WeakKeyDictionary()
- self._recent_upload_statuses = []
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
service.MultiService.__init__(self)
{ },
"application-version": "unknown: no get_version()",
}
- d = get_versioned_remote_reference(helper, default)
+ d = add_version_to_remote_reference(helper, default)
d.addCallback(self._got_versioned_helper)
def _got_versioned_helper(self, helper):
self.stats_provider.count('uploader.bytes_uploaded', size)
if size <= self.URI_LIT_SIZE_THRESHOLD:
- uploader = LiteralUploader(self.parent)
+ uploader = LiteralUploader()
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
d2.addCallback(lambda x: eu.get_storage_index())
d2.addCallback(lambda si: uploader.start(eu, si))
else:
- uploader = CHKUploader(self.parent)
+ storage_broker = self.parent.get_storage_broker()
+ secret_holder = self.parent._secret_holder
+ uploader = CHKUploader(storage_broker, secret_holder)
d2.addCallback(lambda x: uploader.start(eu))
- self._add_upload(uploader)
+ self._all_uploads[uploader] = None
+ if self._history:
+ self._history.add_upload(uploader.get_upload_status())
def turn_verifycap_into_read_cap(uploadresults):
# Generate the uri from the verifycap plus the key.
d3 = uploadable.get_encryption_key()
return res
d.addBoth(_done)
return d
-
- def _add_upload(self, uploader):
- s = uploader.get_upload_status()
- self._all_uploads[uploader] = None
- self._all_upload_statuses[s] = None
- self._recent_upload_statuses.append(s)
- while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
- self._recent_upload_statuses.pop(0)
-
- def list_all_upload_statuses(self):
- for us in self._all_upload_statuses:
- yield us