self.buckets = {} # k: shareid, v: IRemoteBucketWriter
self.sharesize = sharesize
- wbp = layout.make_write_bucket_proxy(None, sharesize,
+ wbp = layout.make_write_bucket_proxy(None, None, sharesize,
blocksize, num_segments,
num_share_hashes,
- EXTENSION_SIZE, server.get_serverid())
+ EXTENSION_SIZE)
self.wbp_class = wbp.__class__ # to create more of them
self.allocated_size = wbp.get_allocated_size()
self.blocksize = blocksize
def __repr__(self):
return ("<ServerTracker for server %s and SI %s>"
- % (self._server.name(), si_b2a(self.storage_index)[:5]))
+ % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
def get_serverid(self):
return self._server.get_serverid()
- def name(self):
- return self._server.name()
+ def get_name(self):
+ return self._server.get_name()
def query(self, sharenums):
rref = self._server.get_rref()
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {}
for sharenum, rref in buckets.iteritems():
- bp = self.wbp_class(rref, self.sharesize,
+ bp = self.wbp_class(rref, self._server, self.sharesize,
self.blocksize,
self.num_segments,
self.num_share_hashes,
- EXTENSION_SIZE,
- self._server.get_serverid())
+ EXTENSION_SIZE)
b[sharenum] = bp
self.buckets.update(b)
return (alreadygot, set(b.keys()))
def str_shareloc(shnum, bucketwriter):
- return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
+ return "%s: %s" % (shnum, bucketwriter.get_servername(),)
class Tahoe2ServerSelector(log.PrefixingLogMixin):
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
# figure out how much space to ask for
- wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
- num_share_hashes, EXTENSION_SIZE,
- None)
+ wbp = layout.make_write_bucket_proxy(None, None,
+ share_size, 0, num_segments,
+ num_share_hashes, EXTENSION_SIZE)
allocated_size = wbp.get_allocated_size()
all_servers = storage_broker.get_servers_for_psi(storage_index)
if not all_servers:
v0 = server.get_rref().version
v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
- writable_servers = [server for server in all_servers
+ writeable_servers = [server for server in all_servers
if _get_maxsize(server) >= allocated_size]
- readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
+ readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
# second-pass list and repeat the "second" pass (really the third,
# fourth, etc pass), until all shares are assigned, or we've run out
# of potential servers.
- self.first_pass_trackers = _make_trackers(writable_servers)
+ self.first_pass_trackers = _make_trackers(writeable_servers)
self.second_pass_trackers = [] # servers worth asking again
self.next_pass_trackers = [] # servers that we have asked again
self._started_second_pass = False
self.num_servers_contacted += 1
self.query_count += 1
self.log("asking server %s for any existing shares" %
- (tracker.name(),), level=log.NOISY)
+ (tracker.get_name(),), level=log.NOISY)
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._loop())
return dl
serverid = tracker.get_serverid()
if isinstance(res, failure.Failure):
self.log("%s got error during existing shares check: %s"
- % (tracker.name(), res), level=log.UNUSUAL)
+ % (tracker.get_name(), res), level=log.UNUSUAL)
self.error_count += 1
self.bad_query_count += 1
else:
if buckets:
self.serverids_with_shares.add(serverid)
self.log("response to get_buckets() from server %s: alreadygot=%s"
- % (tracker.name(), tuple(sorted(buckets))),
+ % (tracker.get_name(), tuple(sorted(buckets))),
level=log.NOISY)
for bucket in buckets:
self.preexisting_shares.setdefault(bucket, set()).add(serverid)
if self._status:
self._status.set_status("Contacting Servers [%s] (first query),"
" %d shares left.."
- % (tracker.name(),
+ % (tracker.get_name(),
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
if self._status:
self._status.set_status("Contacting Servers [%s] (second query),"
" %d shares left.."
- % (tracker.name(),
+ % (tracker.get_name(),
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
else:
(alreadygot, allocated) = res
self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
- % (tracker.name(),
+ % (tracker.get_name(),
tuple(sorted(alreadygot)), tuple(sorted(allocated))),
level=log.NOISY)
progress = False
name = "uploader"
URI_LIT_SIZE_THRESHOLD = 55
- def __init__(self, helper_furl=None, stats_provider=None):
+ def __init__(self, helper_furl=None, stats_provider=None, history=None):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
+ self._history = history
self._helper = None
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
return (self._helper_furl, bool(self._helper))
- def upload(self, uploadable, history=None):
+ def upload(self, uploadable):
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None
- if history:
- history.add_upload(uploader.get_upload_status())
+ if self._history:
+ self._history.add_upload(uploader.get_upload_status())
def turn_verifycap_into_read_cap(uploadresults):
# Generate the uri from the verifycap plus the key.
d3 = uploadable.get_encryption_key()