Signed-off-by: Daira Hopwood <daira@jacaranda.org>
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
+ self.connected_enough_d = defer.Deferred()
self.started_timestamp = time.time()
self.logSource="Client"
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
self.started_timestamp = time.time()
self.logSource="Client"
self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()
# (and everybody else who wants to use storage servers)
ps = self.get_config("client", "peers.preferred", "").split(",")
preferred_peers = tuple([p.strip() for p in ps if p != ""])
# (and everybody else who wants to use storage servers)
ps = self.get_config("client", "peers.preferred", "").split(",")
preferred_peers = tuple([p.strip() for p in ps if p != ""])
- sb = storage_client.StorageFarmBroker(self.tub, permute_peers=True, preferred_peers=preferred_peers)
+
+ connection_threshold = min(self.encoding_params["k"],
+ self.encoding_params["happy"] + 1)
+
+ sb = storage_client.StorageFarmBroker(self.tub, True, connection_threshold,
+ self.connected_enough_d, preferred_peers=preferred_peers)
self.storage_broker = sb
# load static server specifications from tahoe.cfg, if any.
self.storage_broker = sb
# load static server specifications from tahoe.cfg, if any.
s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
s.setServiceParent(self)
s.startService()
s = drop_upload.DropUploader(self, upload_dircap, local_dir_utf8)
s.setServiceParent(self)
s.startService()
+
+ # start processing the upload queue when we've connected to enough servers
+ self.connected_enough_d.addCallback(s.ready)
except Exception, e:
self.log("couldn't start drop-uploader: %r", args=(e,))
except Exception, e:
self.log("couldn't start drop-uploader: %r", args=(e,))
self._convergence = client.convergence
self._local_path = FilePath(local_dir)
self._convergence = client.convergence
self._local_path = FilePath(local_dir)
+ self.is_upload_ready = False
+
if inotify is None:
from twisted.internet import inotify
self._inotify = inotify
if inotify is None:
from twisted.internet import inotify
self._inotify = inotify
self._stats_provider.count('drop_upload.dirs_monitored', 1)
return d
self._stats_provider.count('drop_upload.dirs_monitored', 1)
return d
+ def upload_ready(self):
+ """upload_ready is used to signal us to start
+ processing the upload items...
+ """
+ self.is_upload_ready = True
+
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer.
"""
I'm also responsible for subscribing to the IntroducerClient to find out
about new servers as they are announced by the Introducer.
"""
- def __init__(self, tub, permute_peers, preferred_peers=()):
+ def __init__(self, tub, permute_peers, connected_threshold, connected_d,
+ preferred_peers=()):
self.tub = tub
assert permute_peers # False not implemented yet
self.permute_peers = permute_peers
self.tub = tub
assert permute_peers # False not implemented yet
self.permute_peers = permute_peers
+ self.connected_threshold = connected_threshold
+ self.connected_d = connected_d
self.preferred_peers = preferred_peers
# self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its
self.preferred_peers = preferred_peers
# self.servers maps serverid -> IServer, and keeps track of all the
# storage servers that we've heard about. Each descriptor manages its
# these two are used in unit tests
def test_add_rref(self, serverid, rref, ann):
# these two are used in unit tests
def test_add_rref(self, serverid, rref, ann):
- s = NativeStorageServer(serverid, ann.copy())
+ s = NativeStorageServer(serverid, ann.copy(), self)
s.rref = rref
s._is_connected = True
self.servers[serverid] = s
s.rref = rref
s._is_connected = True
self.servers[serverid] = s
precondition(isinstance(key_s, str), key_s)
precondition(key_s.startswith("v0-"), key_s)
assert ann["service-name"] == "storage"
precondition(isinstance(key_s, str), key_s)
precondition(key_s.startswith("v0-"), key_s)
assert ann["service-name"] == "storage"
- s = NativeStorageServer(key_s, ann)
+ s = NativeStorageServer(key_s, ann, self)
serverid = s.get_serverid()
old = self.servers.get(serverid)
if old:
serverid = s.get_serverid()
old = self.servers.get(serverid)
if old:
for dsc in self.servers.values():
dsc.try_to_connect()
for dsc in self.servers.values():
dsc.try_to_connect()
+ def check_enough_connected(self):
+ if (self.connected_d is not None and
+ len(self.get_connected_servers()) >= self.connected_threshold):
+ d = self.connected_d
+ self.connected_d = None
+ d.callback(None)
+
def get_servers_for_psi(self, peer_selection_index):
# return a list of server objects (IServers)
assert self.permute_peers == True
def get_servers_for_psi(self, peer_selection_index):
# return a list of server objects (IServers)
assert self.permute_peers == True
"application-version": "unknown: no get_version()",
}
"application-version": "unknown: no get_version()",
}
- def __init__(self, key_s, ann):
+ def __init__(self, key_s, ann, broker):
self.key_s = key_s
self.announcement = ann
self.key_s = key_s
self.announcement = ann
assert "anonymous-storage-FURL" in ann, ann
furl = str(ann["anonymous-storage-FURL"])
assert "anonymous-storage-FURL" in ann, ann
furl = str(ann["anonymous-storage-FURL"])
default = self.VERSION_DEFAULTS
d = add_version_to_remote_reference(rref, default)
d.addCallback(self._got_versioned_service, lp)
default = self.VERSION_DEFAULTS
d = add_version_to_remote_reference(rref, default)
d.addCallback(self._got_versioned_service, lp)
+ d.addCallback(lambda ign: self.broker.check_enough_connected())
d.addErrback(log.err, format="storageclient._got_connection",
name=self.get_name(), umid="Sdq3pg")
d.addErrback(log.err, format="storageclient._got_connection",
name=self.get_name(), umid="Sdq3pg")
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
def create_fake_client(self):
class WebResultsRendering(unittest.TestCase, WebRenderingMixin):
def create_fake_client(self):
- sb = StorageFarmBroker(None, True)
+ sb = StorageFarmBroker(None, True, 0, None)
# s.get_name() (the "short description") will be "v0-00000000".
# s.get_longname() will include the -long suffix.
# s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.."
# s.get_name() (the "short description") will be "v0-00000000".
# s.get_longname() will include the -long suffix.
# s.get_peerid() (i.e. tubid) will be "aaa.." or "777.." or "ceir.."
"my-version": "ver",
"oldest-supported": "oldest",
}
"my-version": "ver",
"oldest-supported": "oldest",
}
- s = NativeStorageServer(key_s, ann)
+ s = NativeStorageServer(key_s, ann, sb)
sb.test_add_server(peerid, s) # XXX: maybe use key_s?
c = FakeClient()
c.storage_broker = sb
sb.test_add_server(peerid, s) # XXX: maybe use key_s?
c = FakeClient()
c.storage_broker = sb
return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
def test_permute(self):
return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
def test_permute(self):
- sb = StorageFarmBroker(None, True)
+ sb = StorageFarmBroker(None, True, 0, None)
for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) }
for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) }
self.failUnlessReallyEqual(self._permute(sb, "one"), [])
def test_permute_with_preferred(self):
self.failUnlessReallyEqual(self._permute(sb, "one"), [])
def test_permute_with_preferred(self):
- sb = StorageFarmBroker(None, True, ['1','4'])
+ sb = StorageFarmBroker(None, True, 0, None, ['1','4'])
for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) }
for k in ["%d" % i for i in range(5)]:
ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
"permutation-seed-base32": base32.b2a(k) }
timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
def setUp(self):
self.s = FakeClient()
timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
def setUp(self):
self.s = FakeClient()
- self.s.storage_broker = StorageFarmBroker(None, True)
+ self.s.storage_broker = StorageFarmBroker(None, True, 0, None)
self.s.secret_holder = client.SecretHolder("lease secret", "converge")
self.s.startService()
self.s.secret_holder = client.SecretHolder("lease secret", "converge")
self.s.startService()
s = FakeStorage()
peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(num_peers)]
s = FakeStorage()
peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(num_peers)]
- storage_broker = StorageFarmBroker(None, True)
+ storage_broker = StorageFarmBroker(None, True, 0, None)
for peerid in peerids:
fss = FakeStorageServer(peerid, s)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
for peerid in peerids:
fss = FakeStorageServer(peerid, s)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
mode = dict([i,mode] for i in range(num_servers))
servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
for fakeid in range(self.num_servers) ]
mode = dict([i,mode] for i in range(num_servers))
servers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
for fakeid in range(self.num_servers) ]
- self.storage_broker = StorageFarmBroker(None, permute_peers=True)
+ self.storage_broker = StorageFarmBroker(None, True, 0, None)
for (serverid, rref) in servers:
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
"permutation-seed-base32": base32.b2a(serverid) }
for (serverid, rref) in servers:
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
"permutation-seed-base32": base32.b2a(serverid) }
self._secret_holder = SecretHolder("lease secret", "convergence secret")
self.helper = None
self.convergence = "some random string"
self._secret_holder = SecretHolder("lease secret", "convergence secret")
self.helper = None
self.convergence = "some random string"
- self.storage_broker = StorageFarmBroker(None, permute_peers=True)
+ self.storage_broker = StorageFarmBroker(None, True, 0, None)
# fake knowledge of another server
self.storage_broker.test_add_server("other_nodeid",
FakeDisplayableServer("other_nodeid", u"other_nickname \u263B"))
# fake knowledge of another server
self.storage_broker.test_add_server("other_nodeid",
FakeDisplayableServer("other_nodeid", u"other_nickname \u263B"))