ciphertext_fetched, # how much the helper fetched
preexisting_shares, # count of shares already present
pushed_shares, # count of shares we pushed
- sharemap, # {shnum: set(serverid)}
- servermap, # {serverid: set(shnum)}
+ sharemap, # {shnum: set(server)}
+ servermap, # {server: set(shnum)}
timings, # dict of name to number of seconds
uri_extension_data,
uri_extension_hash,
def get_pushed_shares(self):
return self._pushed_shares
def get_sharemap(self):
- return self._sharemap
+ # returns {shnum: set(serverid)}
+ sharemap = {}
+ for shnum, servers in self._sharemap.items():
+ sharemap[shnum] = set([s.get_serverid() for s in servers])
+ return sharemap
def get_servermap(self):
- return self._servermap
+ # returns {serverid: set(shnum)}
+ servermap = {}
+ for server, shnums in self._servermap.items():
+ servermap[server.get_serverid()] = set(shnums)
+ return servermap
def get_timings(self):
return self._timings
def get_uri_extension_data(self):
return ("<ServerTracker for server %s and SI %s>"
% (self._server.get_name(), si_b2a(self.storage_index)[:5]))
+ def get_server(self):
+ return self._server
def get_serverid(self):
return self._server.get_serverid()
def get_name(self):
sharemap = dictutil.DictOfSets()
servermap = dictutil.DictOfSets()
for shnum in e.get_shares_placed():
- server_tracker = self._server_trackers[shnum]
- serverid = server_tracker.get_serverid()
- sharemap.add(shnum, serverid)
- servermap.add(serverid, shnum)
+ server = self._server_trackers[shnum].get_server()
+ sharemap.add(shnum, server)
+ servermap.add(server, shnum)
now = time.time()
timings = {}
timings["total"] = now - self._started
class AssistedUploader:
- def __init__(self, helper):
+ def __init__(self, helper, storage_broker):
self._helper = helper
+ self._storage_broker = storage_broker
self._log_number = log.msg("AssistedUploader starting")
self._storage_index = None
self._upload_status = s = UploadStatus()
now = time.time()
timings["total"] = now - self._started
+ gss = self._storage_broker.get_stub_server
+ sharemap = {}
+ servermap = {}
+ for shnum, serverids in hur.sharemap.items():
+ sharemap[shnum] = set([gss(serverid) for serverid in serverids])
+ # if the file was already in the grid, hur.servermap is an empty dict
+ for serverid, shnums in hur.servermap.items():
+ servermap[gss(serverid)] = set(shnums)
+
ur = UploadResults(file_size=self._size,
# not if already found
ciphertext_fetched=hur.ciphertext_fetched,
preexisting_shares=hur.preexisting_shares,
pushed_shares=hur.pushed_shares,
- sharemap=hur.sharemap,
- servermap=hur.servermap, # not if already found
+ sharemap=sharemap,
+ servermap=servermap,
timings=timings,
uri_extension_data=hur.uri_extension_data,
uri_extension_hash=hur.uri_extension_hash,
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
d2 = defer.succeed(None)
+ storage_broker = self.parent.get_storage_broker()
if self._helper:
- uploader = AssistedUploader(self._helper)
+ uploader = AssistedUploader(self._helper, storage_broker)
d2.addCallback(lambda x: eu.get_storage_index())
d2.addCallback(lambda si: uploader.start(eu, si))
else:
def get_encoding_parameters(self):
return self.DEFAULT_ENCODING_PARAMETERS
+ def get_storage_broker(self):
+ return self.storage_broker
def flush_but_dont_ignore(res):
d = flushEventualQueue()
timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
def setUp(self):
self.s = FakeClient()
- self.storage_broker = StorageFarmBroker(None, True)
- self.secret_holder = client.SecretHolder("lease secret", "convergence")
+ self.s.storage_broker = StorageFarmBroker(None, True)
+ self.s.secret_holder = client.SecretHolder("lease secret", "converge")
self.s.startService()
self.tub = t = Tub()
def setUpHelper(self, basedir, helper_class=Helper_fake_upload):
fileutil.make_dirs(basedir)
self.helper = h = helper_class(basedir,
- self.storage_broker,
- self.secret_holder,
+ self.s.storage_broker,
+ self.s.secret_holder,
None, None)
self.helper_furl = self.tub.registerReference(h)