from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
- shares_by_server, merge_peers, \
+ shares_by_server, merge_servers, \
failure_message
-from allmydata.util.assertutil import precondition
+from allmydata.util.assertutil import precondition, _assert
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
class TooFullError(Exception):
pass
-class UploadResults(Copyable, RemoteCopy):
- implements(IUploadResults)
+# HelperUploadResults are what we get from the Helper, and to retain
+# backwards compatibility with old Helpers we can't change the format. We
+# convert them into a local UploadResults upon receipt.
+class HelperUploadResults(Copyable, RemoteCopy):
# note: don't change this string, it needs to match the value used on the
# helper, and it does *not* need to match the fully-qualified
# package/module/class name
self.preexisting_shares = None # count of shares already present
self.pushed_shares = None # count of shares we pushed
+class UploadResults:
+ implements(IUploadResults)
+
+ def __init__(self, file_size,
+ ciphertext_fetched, # how much the helper fetched
+ preexisting_shares, # count of shares already present
+ pushed_shares, # count of shares we pushed
+ sharemap, # {shnum: set(server)}
+ servermap, # {server: set(shnum)}
+ timings, # dict of name to number of seconds
+ uri_extension_data,
+ uri_extension_hash,
+ verifycapstr):
+ self._file_size = file_size
+ self._ciphertext_fetched = ciphertext_fetched
+ self._preexisting_shares = preexisting_shares
+ self._pushed_shares = pushed_shares
+ self._sharemap = sharemap
+ self._servermap = servermap
+ self._timings = timings
+ self._uri_extension_data = uri_extension_data
+ self._uri_extension_hash = uri_extension_hash
+ self._verifycapstr = verifycapstr
+
+ def set_uri(self, uri):
+ self._uri = uri
+
+ def get_file_size(self):
+ return self._file_size
+ def get_uri(self):
+ return self._uri
+ def get_ciphertext_fetched(self):
+ return self._ciphertext_fetched
+ def get_preexisting_shares(self):
+ return self._preexisting_shares
+ def get_pushed_shares(self):
+ return self._pushed_shares
+ def get_sharemap(self):
+ return self._sharemap
+ def get_servermap(self):
+ return self._servermap
+ def get_timings(self):
+ return self._timings
+ def get_uri_extension_data(self):
+ return self._uri_extension_data
+ def get_verifycapstr(self):
+ return self._verifycapstr
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
class ServerTracker:
- def __init__(self, serverid, storage_server,
+ def __init__(self, server,
sharesize, blocksize, num_segments, num_share_hashes,
storage_index,
bucket_renewal_secret, bucket_cancel_secret):
- precondition(isinstance(serverid, str), serverid)
- precondition(len(serverid) == 20, serverid)
- self.serverid = serverid
- self._storageserver = storage_server # to an RIStorageServer
+ self._server = server
self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
- wbp = layout.make_write_bucket_proxy(None, sharesize,
+ wbp = layout.make_write_bucket_proxy(None, None, sharesize,
blocksize, num_segments,
num_share_hashes,
- EXTENSION_SIZE, serverid)
+ EXTENSION_SIZE)
self.wbp_class = wbp.__class__ # to create more of them
self.allocated_size = wbp.get_allocated_size()
self.blocksize = blocksize
def __repr__(self):
return ("<ServerTracker for server %s and SI %s>"
- % (idlib.shortnodeid_b2a(self.serverid),
- si_b2a(self.storage_index)[:5]))
+ % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
+
+ def get_server(self):
+ return self._server
+ def get_serverid(self):
+ return self._server.get_serverid()
+ def get_name(self):
+ return self._server.get_name()
def query(self, sharenums):
- d = self._storageserver.callRemote("allocate_buckets",
- self.storage_index,
- self.renew_secret,
- self.cancel_secret,
- sharenums,
- self.allocated_size,
- canary=Referenceable())
+ rref = self._server.get_rref()
+ d = rref.callRemote("allocate_buckets",
+ self.storage_index,
+ self.renew_secret,
+ self.cancel_secret,
+ sharenums,
+ self.allocated_size,
+ canary=Referenceable())
d.addCallback(self._got_reply)
return d
def ask_about_existing_shares(self):
- return self._storageserver.callRemote("get_buckets",
- self.storage_index)
+ rref = self._server.get_rref()
+ return rref.callRemote("get_buckets", self.storage_index)
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {}
for sharenum, rref in buckets.iteritems():
- bp = self.wbp_class(rref, self.sharesize,
+ bp = self.wbp_class(rref, self._server, self.sharesize,
self.blocksize,
self.num_segments,
self.num_share_hashes,
- EXTENSION_SIZE,
- self.serverid)
+ EXTENSION_SIZE)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
def str_shareloc(shnum, bucketwriter):
- return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
+ return "%s: %s" % (shnum, bucketwriter.get_servername(),)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
num_segments, total_shares, needed_shares,
servers_of_happiness):
"""
- @return: (upload_servers, already_servers), where upload_servers is
- a set of ServerTracker instances that have agreed to hold
+ @return: (upload_trackers, already_serverids), where upload_trackers
+ is a set of ServerTracker instances that have agreed to hold
some shares for us (the shareids are stashed inside the
- ServerTracker), and already_servers is a dict mapping shnum
- to a set of servers which claim to already have the share.
+ ServerTracker), and already_serverids is a dict mapping
+ shnum to a set of serverids for servers which claim to
+ already have the share.
"""
if self._status:
self.needed_shares = needed_shares
self.homeless_shares = set(range(total_shares))
- self.contacted_trackers = [] # servers worth asking again
- self.contacted_trackers2 = [] # servers that we have asked again
- self._started_second_pass = False
self.use_trackers = set() # ServerTrackers that have shares assigned
# to them
self.preexisting_shares = {} # shareid => set(serverids) holding shareid
# These servers have shares -- any shares -- for our SI. We keep
# track of these to write an error message with them later.
- self.servers_with_shares = set()
+ self.serverids_with_shares = set()
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
# figure out how much space to ask for
- wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
- num_share_hashes, EXTENSION_SIZE,
- None)
+ wbp = layout.make_write_bucket_proxy(None, None,
+ share_size, 0, num_segments,
+ num_share_hashes, EXTENSION_SIZE)
allocated_size = wbp.get_allocated_size()
- all_servers = [(s.get_serverid(), s.get_rref())
- for s in storage_broker.get_servers_for_psi(storage_index)]
+ all_servers = storage_broker.get_servers_for_psi(storage_index)
if not all_servers:
raise NoServersError("client gave us zero servers")
# field) from getting large shares (for files larger than about
# 12GiB). See #439 for details.
def _get_maxsize(server):
- (serverid, conn) = server
- v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
+ v0 = server.get_rref().version
+ v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
- writable_servers = [server for server in all_servers
+ writeable_servers = [server for server in all_servers
if _get_maxsize(server) >= allocated_size]
- readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
+ readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
storage_index)
def _make_trackers(servers):
- return [ServerTracker(serverid, conn,
- share_size, block_size,
- num_segments, num_share_hashes,
- storage_index,
- bucket_renewal_secret_hash(file_renewal_secret,
- serverid),
- bucket_cancel_secret_hash(file_cancel_secret,
- serverid))
- for (serverid, conn) in servers]
- self.uncontacted_trackers = _make_trackers(writable_servers)
+ trackers = []
+ for s in servers:
+ seed = s.get_lease_seed()
+ renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
+ cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
+ st = ServerTracker(s,
+ share_size, block_size,
+ num_segments, num_share_hashes,
+ storage_index,
+ renew, cancel)
+ trackers.append(st)
+ return trackers
+
+ # We assign each servers/trackers into one three lists. They all
+ # start in the "first pass" list. During the first pass, as we ask
+ # each one to hold a share, we move their tracker to the "second
+ # pass" list, until the first-pass list is empty. Then during the
+ # second pass, as we ask each to hold more shares, we move their
+ # tracker to the "next pass" list, until the second-pass list is
+ # empty. Then we move everybody from the next-pass list back to the
+ # second-pass list and repeat the "second" pass (really the third,
+ # fourth, etc pass), until all shares are assigned, or we've run out
+ # of potential servers.
+ self.first_pass_trackers = _make_trackers(writeable_servers)
+ self.second_pass_trackers = [] # servers worth asking again
+ self.next_pass_trackers = [] # servers that we have asked again
+ self._started_second_pass = False
# We don't try to allocate shares to these servers, since they've
# said that they're incapable of storing shares of the size that we'd
for tracker in readonly_trackers:
assert isinstance(tracker, ServerTracker)
d = tracker.ask_about_existing_shares()
- d.addBoth(self._handle_existing_response, tracker.serverid)
+ d.addBoth(self._handle_existing_response, tracker)
ds.append(d)
self.num_servers_contacted += 1
self.query_count += 1
self.log("asking server %s for any existing shares" %
- (idlib.shortnodeid_b2a(tracker.serverid),),
- level=log.NOISY)
+ (tracker.get_name(),), level=log.NOISY)
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._loop())
return dl
- def _handle_existing_response(self, res, server):
+ def _handle_existing_response(self, res, tracker):
"""
I handle responses to the queries sent by
Tahoe2ServerSelector._existing_shares.
"""
+ serverid = tracker.get_serverid()
if isinstance(res, failure.Failure):
self.log("%s got error during existing shares check: %s"
- % (idlib.shortnodeid_b2a(server), res),
- level=log.UNUSUAL)
+ % (tracker.get_name(), res), level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
else:
buckets = res
if buckets:
- self.servers_with_shares.add(server)
+ self.serverids_with_shares.add(serverid)
self.log("response to get_buckets() from server %s: alreadygot=%s"
- % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))),
+ % (tracker.get_name(), tuple(sorted(buckets))),
level=log.NOISY)
for bucket in buckets:
- self.preexisting_shares.setdefault(bucket, set()).add(server)
+ self.preexisting_shares.setdefault(bucket, set()).add(serverid)
self.homeless_shares.discard(bucket)
self.full_count += 1
self.bad_query_count += 1
def _loop(self):
if not self.homeless_shares:
- merged = merge_peers(self.preexisting_shares, self.use_trackers)
+ merged = merge_servers(self.preexisting_shares, self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if self.servers_of_happiness <= effective_happiness:
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
shares_to_spread = sum([len(list(sharelist)) - 1
for (server, sharelist)
in shares.items()])
- if delta <= len(self.uncontacted_trackers) and \
+ if delta <= len(self.first_pass_trackers) and \
shares_to_spread >= delta:
items = shares.items()
while len(self.homeless_shares) < delta:
return self._loop()
else:
# Redistribution won't help us; fail.
- server_count = len(self.servers_with_shares)
+ server_count = len(self.serverids_with_shares)
failmsg = failure_message(server_count,
self.needed_shares,
self.servers_of_happiness,
self.log(servmsg, level=log.INFREQUENT)
return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
- if self.uncontacted_trackers:
- tracker = self.uncontacted_trackers.pop(0)
+ if self.first_pass_trackers:
+ tracker = self.first_pass_trackers.pop(0)
# TODO: don't pre-convert all serverids to ServerTrackers
assert isinstance(tracker, ServerTracker)
if self._status:
self._status.set_status("Contacting Servers [%s] (first query),"
" %d shares left.."
- % (idlib.shortnodeid_b2a(tracker.serverid),
+ % (tracker.get_name(),
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
- self.contacted_trackers)
+ self.second_pass_trackers)
return d
- elif self.contacted_trackers:
+ elif self.second_pass_trackers:
# ask a server that we've already asked.
if not self._started_second_pass:
self.log("starting second pass",
level=log.NOISY)
self._started_second_pass = True
num_shares = mathutil.div_ceil(len(self.homeless_shares),
- len(self.contacted_trackers))
- tracker = self.contacted_trackers.pop(0)
+ len(self.second_pass_trackers))
+ tracker = self.second_pass_trackers.pop(0)
shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
self.homeless_shares -= shares_to_ask
self.query_count += 1
if self._status:
self._status.set_status("Contacting Servers [%s] (second query),"
" %d shares left.."
- % (idlib.shortnodeid_b2a(tracker.serverid),
+ % (tracker.get_name(),
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
- self.contacted_trackers2)
+ self.next_pass_trackers)
return d
- elif self.contacted_trackers2:
+ elif self.next_pass_trackers:
# we've finished the second-or-later pass. Move all the remaining
- # servers back into self.contacted_trackers for the next pass.
- self.contacted_trackers.extend(self.contacted_trackers2)
- self.contacted_trackers2[:] = []
+ # servers back into self.second_pass_trackers for the next pass.
+ self.second_pass_trackers.extend(self.next_pass_trackers)
+ self.next_pass_trackers[:] = []
return self._loop()
else:
# no more servers. If we haven't placed enough shares, we fail.
- merged = merge_peers(self.preexisting_shares, self.use_trackers)
+ merged = merge_servers(self.preexisting_shares, self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if effective_happiness < self.servers_of_happiness:
- msg = failure_message(len(self.servers_with_shares),
+ msg = failure_message(len(self.serverids_with_shares),
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
self.error_count += 1
self.bad_query_count += 1
self.homeless_shares |= shares_to_ask
- if (self.uncontacted_trackers
- or self.contacted_trackers
- or self.contacted_trackers2):
+ if (self.first_pass_trackers
+ or self.second_pass_trackers
+ or self.next_pass_trackers):
# there is still hope, so just loop
pass
else:
else:
(alreadygot, allocated) = res
self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
- % (idlib.shortnodeid_b2a(tracker.serverid),
+ % (tracker.get_name(),
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
level=log.NOISY)
progress = False
for s in alreadygot:
- self.preexisting_shares.setdefault(s, set()).add(tracker.serverid)
+ self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
if s in self.homeless_shares:
self.homeless_shares.remove(s)
progress = True
progress = True
if allocated or alreadygot:
- self.servers_with_shares.add(tracker.serverid)
+ self.serverids_with_shares.add(tracker.get_serverid())
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
CHUNKSIZE = 50*1024
def __init__(self, original, log_parent=None):
+ precondition(original.default_params_set,
+ "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
self.original = IUploadable(original)
self._log_number = log_parent
self._encryptor = None
self._secret_holder = secret_holder
self._log_number = self.log("CHKUploader starting", parent=None)
self._encoder = None
- self._results = UploadResults()
self._storage_index = None
self._upload_status = UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_active(True)
- self._upload_status.set_results(self._results)
# locate_all_shareholders() will create the following attribute:
# self._server_trackers = {} # k: shnum, v: instance of ServerTracker
d.addCallback(_done)
return d
- def set_shareholders(self, (upload_servers, already_servers), encoder):
+ def set_shareholders(self, (upload_trackers, already_serverids), encoder):
"""
- @param upload_servers: a sequence of ServerTracker objects that
- have agreed to hold some shares for us (the
- shareids are stashed inside the ServerTracker)
- @paran already_servers: a dict mapping sharenum to a set of serverids
- that claim to already have this share
+ @param upload_trackers: a sequence of ServerTracker objects that
+ have agreed to hold some shares for us (the
+ shareids are stashed inside the ServerTracker)
+
+ @paran already_serverids: a dict mapping sharenum to a set of
+ serverids for servers that claim to already
+ have this share
"""
- msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s"
- values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()])
- for s in upload_servers], already_servers)
+ msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
+ values = ([', '.join([str_shareloc(k,v)
+ for k,v in st.buckets.iteritems()])
+ for st in upload_trackers], already_serverids)
self.log(msgtempl % values, level=log.OPERATIONAL)
# record already-present shares in self._results
- self._results.preexisting_shares = len(already_servers)
+ self._count_preexisting_shares = len(already_serverids)
self._server_trackers = {} # k: shnum, v: instance of ServerTracker
- for server in upload_servers:
- assert isinstance(server, ServerTracker)
+ for tracker in upload_trackers:
+ assert isinstance(tracker, ServerTracker)
buckets = {}
- servermap = already_servers.copy()
- for server in upload_servers:
- buckets.update(server.buckets)
- for shnum in server.buckets:
- self._server_trackers[shnum] = server
- servermap.setdefault(shnum, set()).add(server.serverid)
- assert len(buckets) == sum([len(server.buckets)
- for server in upload_servers]), \
+ servermap = already_serverids.copy()
+ for tracker in upload_trackers:
+ buckets.update(tracker.buckets)
+ for shnum in tracker.buckets:
+ self._server_trackers[shnum] = tracker
+ servermap.setdefault(shnum, set()).add(tracker.get_serverid())
+ assert len(buckets) == sum([len(tracker.buckets)
+ for tracker in upload_trackers]), \
"%s (%s) != %s (%s)" % (
len(buckets),
buckets,
- sum([len(server.buckets) for server in upload_servers]),
- [(s.buckets, s.serverid) for s in upload_servers]
+ sum([len(tracker.buckets) for tracker in upload_trackers]),
+ [(t.buckets, t.get_serverid()) for t in upload_trackers]
)
encoder.set_shareholders(buckets, servermap)
def _encrypted_done(self, verifycap):
- """ Returns a Deferred that will fire with the UploadResults instance. """
- r = self._results
- for shnum in self._encoder.get_shares_placed():
- server_tracker = self._server_trackers[shnum]
- serverid = server_tracker.serverid
- r.sharemap.add(shnum, serverid)
- r.servermap.add(serverid, shnum)
- r.pushed_shares = len(self._encoder.get_shares_placed())
+ """Returns a Deferred that will fire with the UploadResults instance."""
+ e = self._encoder
+ sharemap = dictutil.DictOfSets()
+ servermap = dictutil.DictOfSets()
+ for shnum in e.get_shares_placed():
+ server = self._server_trackers[shnum].get_server()
+ sharemap.add(shnum, server)
+ servermap.add(server, shnum)
now = time.time()
- r.file_size = self._encoder.file_size
- r.timings["total"] = now - self._started
- r.timings["storage_index"] = self._storage_index_elapsed
- r.timings["peer_selection"] = self._server_selection_elapsed
- r.timings.update(self._encoder.get_times())
- r.uri_extension_data = self._encoder.get_uri_extension_data()
- r.verifycapstr = verifycap.to_string()
- return r
+ timings = {}
+ timings["total"] = now - self._started
+ timings["storage_index"] = self._storage_index_elapsed
+ timings["peer_selection"] = self._server_selection_elapsed
+ timings.update(e.get_times())
+ ur = UploadResults(file_size=e.file_size,
+ ciphertext_fetched=0,
+ preexisting_shares=self._count_preexisting_shares,
+ pushed_shares=len(e.get_shares_placed()),
+ sharemap=sharemap,
+ servermap=servermap,
+ timings=timings,
+ uri_extension_data=e.get_uri_extension_data(),
+ uri_extension_hash=e.get_uri_extension_hash(),
+ verifycapstr=verifycap.to_string())
+ self._upload_status.set_results(ur)
+ return ur
def get_upload_status(self):
return self._upload_status
class LiteralUploader:
def __init__(self):
- self._results = UploadResults()
self._status = s = UploadStatus()
s.set_storage_index(None)
s.set_helper(False)
s.set_progress(0, 1.0)
s.set_active(False)
- s.set_results(self._results)
def start(self, uploadable):
uploadable = IUploadable(uploadable)
def _got_size(size):
self._size = size
self._status.set_size(size)
- self._results.file_size = size
return read_this_many_bytes(uploadable, size)
d.addCallback(_got_size)
d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
return d
def _build_results(self, uri):
- self._results.uri = uri
+ ur = UploadResults(file_size=self._size,
+ ciphertext_fetched=0,
+ preexisting_shares=0,
+ pushed_shares=0,
+ sharemap={},
+ servermap={},
+ timings={},
+ uri_extension_data=None,
+ uri_extension_hash=None,
+ verifycapstr=None)
+ ur.set_uri(uri)
self._status.set_status("Finished")
self._status.set_progress(1, 1.0)
self._status.set_progress(2, 1.0)
- return self._results
+ self._status.set_results(ur)
+ return ur
def close(self):
pass
class AssistedUploader:
- def __init__(self, helper):
+ def __init__(self, helper, storage_broker):
self._helper = helper
+ self._storage_broker = storage_broker
self._log_number = log.msg("AssistedUploader starting")
self._storage_index = None
self._upload_status = s = UploadStatus()
d.addCallback(self._contacted_helper)
return d
- def _contacted_helper(self, (upload_results, upload_helper)):
+ def _contacted_helper(self, (helper_upload_results, upload_helper)):
now = time.time()
elapsed = now - self._time_contacting_helper_start
self._elapsed_time_contacting_helper = elapsed
return d
self.log("helper says file is already uploaded", level=log.OPERATIONAL)
self._upload_status.set_progress(1, 1.0)
- self._upload_status.set_results(upload_results)
- return upload_results
+ return helper_upload_results
def _convert_old_upload_results(self, upload_results):
# pre-1.3.0 helpers return upload results which contain a mapping
if str in [type(v) for v in sharemap.values()]:
upload_results.sharemap = None
- def _build_verifycap(self, upload_results):
+ def _build_verifycap(self, helper_upload_results):
self.log("upload finished, building readcap", level=log.OPERATIONAL)
- self._convert_old_upload_results(upload_results)
+ self._convert_old_upload_results(helper_upload_results)
self._upload_status.set_status("Building Readcap")
- r = upload_results
- assert r.uri_extension_data["needed_shares"] == self._needed_shares
- assert r.uri_extension_data["total_shares"] == self._total_shares
- assert r.uri_extension_data["segment_size"] == self._segment_size
- assert r.uri_extension_data["size"] == self._size
- r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
- uri_extension_hash=r.uri_extension_hash,
- needed_shares=self._needed_shares,
- total_shares=self._total_shares, size=self._size
- ).to_string()
+ hur = helper_upload_results
+ assert hur.uri_extension_data["needed_shares"] == self._needed_shares
+ assert hur.uri_extension_data["total_shares"] == self._total_shares
+ assert hur.uri_extension_data["segment_size"] == self._segment_size
+ assert hur.uri_extension_data["size"] == self._size
+
+ # hur.verifycap doesn't exist if already found
+ v = uri.CHKFileVerifierURI(self._storage_index,
+ uri_extension_hash=hur.uri_extension_hash,
+ needed_shares=self._needed_shares,
+ total_shares=self._total_shares,
+ size=self._size)
+ timings = {}
+ timings["storage_index"] = self._storage_index_elapsed
+ timings["contacting_helper"] = self._elapsed_time_contacting_helper
+ for key,val in hur.timings.items():
+ if key == "total":
+ key = "helper_total"
+ timings[key] = val
now = time.time()
- r.file_size = self._size
- r.timings["storage_index"] = self._storage_index_elapsed
- r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
- if "total" in r.timings:
- r.timings["helper_total"] = r.timings["total"]
- r.timings["total"] = now - self._started
+ timings["total"] = now - self._started
+
+ gss = self._storage_broker.get_stub_server
+ sharemap = {}
+ servermap = {}
+ for shnum, serverids in hur.sharemap.items():
+ sharemap[shnum] = set([gss(serverid) for serverid in serverids])
+ # if the file was already in the grid, hur.servermap is an empty dict
+ for serverid, shnums in hur.servermap.items():
+ servermap[gss(serverid)] = set(shnums)
+
+ ur = UploadResults(file_size=self._size,
+ # not if already found
+ ciphertext_fetched=hur.ciphertext_fetched,
+ preexisting_shares=hur.preexisting_shares,
+ pushed_shares=hur.pushed_shares,
+ sharemap=sharemap,
+ servermap=servermap,
+ timings=timings,
+ uri_extension_data=hur.uri_extension_data,
+ uri_extension_hash=hur.uri_extension_hash,
+ verifycapstr=v.to_string())
+
self._upload_status.set_status("Finished")
- self._upload_status.set_results(r)
- return r
+ self._upload_status.set_results(ur)
+ return ur
def get_upload_status(self):
return self._upload_status
class BaseUploadable:
# this is overridden by max_segment_size
default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
- default_encoding_param_k = 3 # overridden by encoding_parameters
- default_encoding_param_happy = 7
- default_encoding_param_n = 10
+ default_params_set = False
max_segment_size = None
encoding_param_k = None
self.default_encoding_param_n = default_params["n"]
if "max_segment_size" in default_params:
self.default_max_segment_size = default_params["max_segment_size"]
+ self.default_params_set = True
def get_all_encoding_parameters(self):
+ _assert(self.default_params_set, "set_default_encoding_parameters not called on %r" % (self,))
if self._all_encoding_parameters:
return defer.succeed(self._all_encoding_parameters)
def get_size(self):
if self._size is not None:
return defer.succeed(self._size)
- self._filehandle.seek(0,2)
+ self._filehandle.seek(0, os.SEEK_END)
size = self._filehandle.tell()
self._size = size
self._filehandle.seek(0)
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
- def __init__(self, helper_furl=None, stats_provider=None):
+ def __init__(self, helper_furl=None, stats_provider=None, history=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
+ self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
return (self._helper_furl, bool(self._helper))
- def upload(self, uploadable, history=None):
+ def upload(self, uploadable):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
d2 = defer.succeed(None)
+ storage_broker = self.parent.get_storage_broker()
if self._helper:
- uploader = AssistedUploader(self._helper)
+ uploader = AssistedUploader(self._helper, storage_broker)
d2.addCallback(lambda x: eu.get_storage_index())
d2.addCallback(lambda si: uploader.start(eu, si))
else:
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None
- if history:
- history.add_upload(uploader.get_upload_status())
+ if self._history:
+ self._history.add_upload(uploader.get_upload_status())
def turn_verifycap_into_read_cap(uploadresults):
# Generate the uri from the verifycap plus the key.
d3 = uploadable.get_encryption_key()
def put_readcap_into_results(key):
- v = uri.from_string(uploadresults.verifycapstr)
+ v = uri.from_string(uploadresults.get_verifycapstr())
r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
- uploadresults.uri = r.to_string()
+ uploadresults.set_uri(r.to_string())
return uploadresults
d3.addCallback(put_readcap_into_results)
return d3