assert IMutableFileURI.providedBy(u), u
return MutableFileNode(self).init_from_uri(u)
- def create_empty_dirnode(self, wait_for_numpeers=None):
+ def create_empty_dirnode(self):
n = NewDirectoryNode(self)
- d = n.create(wait_for_numpeers=wait_for_numpeers)
+ d = n.create()
d.addCallback(lambda res: n)
return d
- def create_mutable_file(self, contents="", wait_for_numpeers=None):
+ def create_mutable_file(self, contents=""):
n = MutableFileNode(self)
- d = n.create(contents, wait_for_numpeers=wait_for_numpeers)
+ d = n.create(contents)
d.addCallback(lambda res: n)
return d
- def upload(self, uploadable, wait_for_numpeers=None):
+ def upload(self, uploadable):
uploader = self.getServiceNamed("uploader")
- return uploader.upload(uploadable, wait_for_numpeers=wait_for_numpeers)
+ return uploader.upload(uploadable)
self._node.init_from_uri(self._uri.get_filenode_uri())
return self
- def create(self, wait_for_numpeers=None):
+ def create(self):
"""
Returns a deferred that eventually fires with self once the directory
has been created (distributed across a set of storage servers).
# URI to create our own.
self._node = self.filenode_class(self._client)
empty_contents = self._pack_contents({})
- d = self._node.create(empty_contents, wait_for_numpeers=wait_for_numpeers)
+ d = self._node.create(empty_contents)
d.addCallback(self._filenode_created)
return d
def _filenode_created(self, res):
d.addCallback(_got)
return d
- def set_uri(self, name, child_uri, metadata={}, wait_for_numpeers=None):
+ def set_uri(self, name, child_uri, metadata={}):
"""I add a child (by URI) at the specific name. I return a Deferred
that fires with the child node when the operation finishes. I will
replace any existing child of the same name.
If this directory node is read-only, the Deferred will errback with a
NotMutableError."""
- return self.set_node(name, self._create_node(child_uri), metadata,
- wait_for_numpeers)
+ return self.set_node(name, self._create_node(child_uri), metadata)
- def set_uris(self, entries, wait_for_numpeers=None):
+ def set_uris(self, entries):
node_entries = []
for e in entries:
if len(e) == 2:
assert len(e) == 3
name, child_uri, metadata = e
node_entries.append( (name,self._create_node(child_uri),metadata) )
- return self.set_nodes(node_entries, wait_for_numpeers)
+ return self.set_nodes(node_entries)
- def set_node(self, name, child, metadata={}, wait_for_numpeers=None):
+ def set_node(self, name, child, metadata={}):
"""I add a child at the specific name. I return a Deferred that fires
when the operation finishes. This Deferred will fire with the child
node that was just added. I will replace any existing child of the
If this directory node is read-only, the Deferred will errback with a
NotMutableError."""
assert IFilesystemNode.providedBy(child), child
- d = self.set_nodes( [(name, child, metadata)], wait_for_numpeers)
+ d = self.set_nodes( [(name, child, metadata)])
d.addCallback(lambda res: child)
return d
- def set_nodes(self, entries, wait_for_numpeers=None):
+ def set_nodes(self, entries):
if self.is_readonly():
return defer.fail(NotMutableError())
d = self._read()
name, child, metadata = e
children[name] = (child, metadata)
new_contents = self._pack_contents(children)
- return self._node.replace(new_contents, wait_for_numpeers=wait_for_numpeers)
+ return self._node.replace(new_contents)
d.addCallback(_add)
d.addCallback(lambda res: None)
return d
- def add_file(self, name, uploadable, wait_for_numpeers=None):
+ def add_file(self, name, uploadable):
"""I upload a file (using the given IUploadable), then attach the
resulting FileNode to the directory at the given name. I return a
Deferred that fires (with the IFileNode of the uploaded file) when
the operation completes."""
if self.is_readonly():
return defer.fail(NotMutableError())
- d = self._client.upload(uploadable, wait_for_numpeers=wait_for_numpeers)
+ d = self._client.upload(uploadable)
d.addCallback(self._client.create_node_from_uri)
- d.addCallback(lambda node: self.set_node(name, node, wait_for_numpeers=wait_for_numpeers))
+ d.addCallback(lambda node: self.set_node(name, node))
return d
def delete(self, name):
d.addCallback(_delete)
return d
- def create_empty_directory(self, name, wait_for_numpeers=None):
+ def create_empty_directory(self, name):
"""I create and attach an empty directory at the given name. I return
a Deferred that fires (with the new directory node) when the
operation finishes."""
if self.is_readonly():
return defer.fail(NotMutableError())
- d = self._client.create_empty_dirnode(wait_for_numpeers=wait_for_numpeers)
+ d = self._client.create_empty_dirnode()
def _created(child):
- d = self.set_node(name, child, wait_for_numpeers=wait_for_numpeers)
+ d = self.set_node(name, child)
d.addCallback(lambda res: child)
return d
d.addCallback(_created)
return d
def move_child_to(self, current_child_name, new_parent,
- new_child_name=None, wait_for_numpeers=None):
+ new_child_name=None):
"""I take one of my children and move them to a new parent. The child
is referenced by name. On the new parent, the child will live under
'new_child_name', which defaults to 'current_child_name'. I return a
new_child_name = current_child_name
d = self.get(current_child_name)
def sn(child):
- return new_parent.set_node(new_child_name, child,
- wait_for_numpeers=wait_for_numpeers)
+ return new_parent.set_node(new_child_name, child)
d.addCallback(sn)
d.addCallback(lambda child: self.delete(current_child_name))
return d
more advanced API will signal and provide access to the multiple
heads."""
- def replace(newdata, wait_for_numpeers=None):
+ def replace(newdata):
"""Replace the old contents with the new data. Returns a Deferred
that fires (with None) when the operation is complete.
closed."""
class IUploader(Interface):
- def upload(uploadable, wait_for_numpeers=None):
+ def upload(uploadable):
"""Upload the file. 'uploadable' must impement IUploadable. This
returns a Deferred which fires with the URI of the file."""
"""
class IClient(Interface):
- def upload(uploadable, wait_for_numpeers=None):
+ def upload(uploadable):
"""Upload some data into a CHK, get back the URI string for it.
@param uploadable: something that implements IUploadable
- @param wait_for_numpeers: don't upload anything until we have at least
- this many peers connected
@return: a Deferred that fires with the (string) URI for this file.
"""
- def create_mutable_file(contents="", wait_for_numpeers=None):
+ def create_mutable_file(contents=""):
"""Create a new mutable file with contents, get back the URI string.
@param contents: the initial contents to place in the file.
- @param wait_for_numpeers: don't upload anything until we have at least
- this many peers connected
@return: a Deferred that fires with tne (string) SSK URI for the new
file.
"""
- def create_empty_dirnode(wait_for_numpeers=None):
+ def create_empty_dirnode():
"""Create a new dirnode, empty and unattached.
- @param wait_for_numpeers: don't create anything until we have at least
- this many peers connected.
@return: a Deferred that fires with the new IDirectoryNode instance.
"""
num = log.err(*args, **kwargs)
return num
- def publish(self, newdata, wait_for_numpeers=None):
+ def publish(self, newdata):
"""Publish the filenode's current contents. Returns a Deferred that
fires (with None) when the publish has done as much work as it's ever
going to do, or errbacks with ConsistencyError if it detects a
simultaneous write.
-
- It will wait until at least wait_for_numpeers peers are connected
- before it starts uploading
-
- If wait_for_numpeers is None then it will be set to a default value
- (currently 1).
"""
- if wait_for_numpeers is None:
- wait_for_numpeers = 1
-
- self.log("starting publish")
-
- d = self._node._client.introducer_client.when_enough_peers(wait_for_numpeers)
- d.addCallback(lambda dummy: self._after_enough_peers(newdata))
- return d
- def _after_enough_peers(self, newdata):
# 1: generate shares (SDMF: files are small, so we can do it in RAM)
# 2: perform peer selection, get candidate servers
# 2a: send queries to n+epsilon servers, to determine current shares
# 4a: may need to run recovery algorithm
# 5: when enough responses are back, we're done
- self.log("got enough peers, datalen is %s" % len(newdata))
+ self.log("starting publish, datalen is %s" % len(newdata))
self._writekey = self._node.get_writekey()
assert self._writekey, "need write capability to publish"
self._encprivkey = None
return self
- def create(self, initial_contents, wait_for_numpeers=None):
+ def create(self, initial_contents):
"""Call this when the filenode is first created. This will generate
the keys, generate the initial shares, wait until at least numpeers
are connected, allocate shares, and upload the initial
# nobody knows about us yet"
self._current_seqnum = 0
self._current_roothash = "\x00"*32
- return self._publish(initial_contents, wait_for_numpeers=wait_for_numpeers)
+ return self._publish(initial_contents)
d.addCallback(_generated)
return d
verifier = signer.get_verifying_key()
return verifier, signer
- def _publish(self, initial_contents, wait_for_numpeers):
+ def _publish(self, initial_contents):
p = self.publish_class(self)
- d = p.publish(initial_contents, wait_for_numpeers=wait_for_numpeers)
+ d = p.publish(initial_contents)
d.addCallback(lambda res: self)
return d
r = Retrieve(self)
return r.retrieve()
- def replace(self, newdata, wait_for_numpeers=None):
+ def replace(self, newdata):
r = Retrieve(self)
d = r.retrieve()
- d.addCallback(lambda res: self._publish(newdata, wait_for_numpeers=wait_for_numpeers))
+ d.addCallback(lambda res: self._publish(newdata))
return d
self._log_number = self._helper.log("CHKUploadHelper starting")
self._client = helper.parent
- self._wait_for_numpeers = None
self._options = {}
self.set_params( (3,7,10) ) # GACK
self.client = client
self.my_uri = make_mutable_file_uri()
self.storage_index = self.my_uri.storage_index
- def create(self, initial_contents, wait_for_numpeers=None):
+ def create(self, initial_contents):
self.all_contents[self.storage_index] = initial_contents
return defer.succeed(self)
def init_from_uri(self, myuri):
def get_size(self):
return "?" # TODO: see mutable.MutableFileNode.get_size
- def replace(self, new_contents, wait_for_numpeers=None):
+ def replace(self, new_contents):
assert not self.is_readonly()
self.all_contents[self.storage_index] = new_contents
return defer.succeed(None)
class FakeClient:
implements(IClient)
- def upload(self, uploadable, wait_for_numpeers):
+ def upload(self, uploadable):
d = uploadable.get_size()
d.addCallback(lambda size: uploadable.read(size))
def _got_data(datav):
return FakeDirectoryNode(self).init_from_uri(u)
return Marker(u.to_string())
- def create_empty_dirnode(self, wait_for_numpeers):
+ def create_empty_dirnode(self):
n = FakeDirectoryNode(self)
- d = n.create(wait_for_numpeers)
+ d = n.create()
d.addCallback(lambda res: n)
return d
self.client = FakeClient()
def test_basic(self):
- d = self.client.create_empty_dirnode(0)
+ d = self.client.create_empty_dirnode()
def _done(res):
self.failUnless(isinstance(res, FakeDirectoryNode))
rep = str(res)
return d
def test_corrupt(self):
- d = self.client.create_empty_dirnode(0)
+ d = self.client.create_empty_dirnode()
def _created(dn):
u = make_mutable_file_uri()
d = dn.set_uri("child", u)
return d
def test_check(self):
- d = self.client.create_empty_dirnode(0)
+ d = self.client.create_empty_dirnode()
d.addCallback(lambda dn: dn.check())
def _done(res):
pass
filenode = self.client.create_node_from_uri(fileuri)
uploadable = upload.Data("some data")
- d = self.client.create_empty_dirnode(0)
+ d = self.client.create_empty_dirnode()
def _created(rw_dn):
d2 = rw_dn.set_uri("child", fileuri)
d2.addCallback(lambda res: rw_dn)
def test_create(self):
self.expected_manifest = []
- d = self.client.create_empty_dirnode(wait_for_numpeers=1)
+ d = self.client.create_empty_dirnode()
def _then(n):
self.failUnless(n.is_mutable())
u = n.get_uri()
self.expected_manifest.append(ffu_v)
d.addCallback(lambda res: n.set_uri("child", fake_file_uri))
- d.addCallback(lambda res: n.create_empty_directory("subdir", wait_for_numpeers=1))
+ d.addCallback(lambda res: n.create_empty_directory("subdir"))
def _created(subdir):
self.failUnless(isinstance(subdir, FakeDirectoryNode))
self.subdir = subdir
d.addCallback(_check_manifest)
def _add_subsubdir(res):
- return self.subdir.create_empty_directory("subsubdir", wait_for_numpeers=1)
+ return self.subdir.create_empty_directory("subsubdir")
d.addCallback(_add_subsubdir)
d.addCallback(lambda res: n.get_child_at_path("subdir/subsubdir"))
d.addCallback(lambda subsubdir:
all_contents = {}
all_rw_friends = {}
- def create(self, initial_contents, wait_for_numpeers=None):
- d = mutable.MutableFileNode.create(self, initial_contents, wait_for_numpeers=None)
+ def create(self, initial_contents):
+ d = mutable.MutableFileNode.create(self, initial_contents)
def _then(res):
self.all_contents[self.get_uri()] = initial_contents
return res
def _generate_pubprivkeys(self):
count = self.counter.next()
return FakePubKey(count), FakePrivKey(count)
- def _publish(self, initial_contents, wait_for_numpeers):
+ def _publish(self, initial_contents):
self.all_contents[self.get_uri()] = initial_contents
return defer.succeed(self)
return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]])
else:
return defer.succeed(self.all_contents[self.get_uri()])
- def replace(self, newdata, wait_for_numpeers=None):
+ def replace(self, newdata):
self.all_contents[self.get_uri()] = newdata
return defer.succeed(None)
class FakeNewDirectoryNode(dirnode.NewDirectoryNode):
filenode_class = FakeFilenode
-class FakeIntroducerClient:
- def when_enough_peers(self, numpeers):
- return defer.succeed(None)
-
class FakeClient:
def __init__(self, num_peers=10):
self._num_peers = num_peers
self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(self._num_peers)]
- self.introducer_client = FakeIntroducerClient()
self.nodeid = "fakenodeid"
def log(self, msg, **kw):
def get_cancel_secret(self):
return "I hereby permit you to cancel my leases"
- def create_empty_dirnode(self, wait_for_numpeers):
+ def create_empty_dirnode(self):
n = FakeNewDirectoryNode(self)
- d = n.create(wait_for_numpeers=wait_for_numpeers)
+ d = n.create()
d.addCallback(lambda res: n)
return d
def create_dirnode_from_uri(self, u):
return FakeNewDirectoryNode(self).init_from_uri(u)
- def create_mutable_file(self, contents="", wait_for_numpeers=None):
+ def create_mutable_file(self, contents=""):
n = FakeFilenode(self)
- d = n.create(contents, wait_for_numpeers=wait_for_numpeers)
+ d = n.create(contents)
d.addCallback(lambda res: n)
return d
results.sort()
return results
- def upload(self, uploadable, wait_for_numpeers=None):
+ def upload(self, uploadable):
assert IUploadable.providedBy(uploadable)
d = uploadable.get_size()
d.addCallback(lambda length: uploadable.read(length))
self.client = FakeClient()
def test_create(self):
- d = self.client.create_mutable_file(wait_for_numpeers=1)
+ d = self.client.create_mutable_file()
def _created(n):
d = n.replace("contents 1")
d.addCallback(lambda res: self.failUnlessIdentical(res, None))
# .create usually returns a Deferred, but we happen to know it's
# synchronous
CONTENTS = "some initial contents"
- fn.create(CONTENTS, wait_for_numpeers=1)
+ fn.create(CONTENTS)
p = mutable.Publish(fn)
target_info = None
d = defer.maybeDeferred(p._encrypt_and_encode, target_info,
# .create usually returns a Deferred, but we happen to know it's
# synchronous
CONTENTS = "some initial contents"
- fn.create(CONTENTS, wait_for_numpeers=1)
+ fn.create(CONTENTS)
p = mutable.Publish(fn)
r = mutable.Retrieve(fn)
# make some fake shares
def _create_mutable(res):
c = self.clients[0]
log.msg("starting create_mutable_file")
- d1 = c.create_mutable_file(DATA, wait_for_numpeers=self.numclients)
+ d1 = c.create_mutable_file(DATA)
def _done(res):
log.msg("DONE: %s" % (res,))
self._mutable_node_1 = res
self.failUnlessEqual(res, DATA)
# replace the data
log.msg("starting replace1")
- d1 = newnode.replace(NEWDATA, wait_for_numpeers=self.numclients)
+ d1 = newnode.replace(NEWDATA)
d1.addCallback(lambda res: newnode.download_to_data())
return d1
d.addCallback(_check_download_3)
newnode2 = self.clients[3].create_node_from_uri(uri)
self._newnode3 = self.clients[3].create_node_from_uri(uri)
log.msg("starting replace2")
- d1 = newnode1.replace(NEWERDATA, wait_for_numpeers=self.numclients)
+ d1 = newnode1.replace(NEWERDATA)
d1.addCallback(lambda res: newnode2.download_to_data())
return d1
d.addCallback(_check_download_4)
def _check_empty_file(res):
# make sure we can create empty files, this usually screws up the
# segsize math
- d1 = self.clients[2].create_mutable_file("", wait_for_numpeers=self.numclients)
+ d1 = self.clients[2].create_mutable_file("")
d1.addCallback(lambda newnode: newnode.download_to_data())
d1.addCallback(lambda res: self.failUnlessEqual("", res))
return d1
d.addCallback(_check_empty_file)
- d.addCallback(lambda res: self.clients[0].create_empty_dirnode(wait_for_numpeers=self.numclients))
+ d.addCallback(lambda res: self.clients[0].create_empty_dirnode())
def _created_dirnode(dnode):
log.msg("_created_dirnode(%s)" % (dnode,))
d1 = dnode.list()
d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
d1.addCallback(lambda res: dnode.has_child("edgar"))
d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
- d1.addCallback(lambda res: dnode.set_node("see recursive", dnode, wait_for_numpeers=self.numclients))
+ d1.addCallback(lambda res: dnode.set_node("see recursive", dnode))
d1.addCallback(lambda res: dnode.has_child("see recursive"))
d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
d1.addCallback(lambda res: dnode.build_manifest())
def _do_publish1(self, res):
ut = upload.Data(self.data)
c0 = self.clients[0]
- d = c0.create_empty_dirnode(wait_for_numpeers=self.numclients)
+ d = c0.create_empty_dirnode()
def _made_root(new_dirnode):
self._root_directory_uri = new_dirnode.get_uri()
return c0.create_node_from_uri(self._root_directory_uri)
d.addCallback(_made_root)
- d.addCallback(lambda root: root.create_empty_directory("subdir1", wait_for_numpeers=self.numclients))
+ d.addCallback(lambda root: root.create_empty_directory("subdir1"))
def _made_subdir1(subdir1_node):
self._subdir1_node = subdir1_node
- d1 = subdir1_node.add_file("mydata567", ut, wait_for_numpeers=self.numclients)
+ d1 = subdir1_node.add_file("mydata567", ut)
d1.addCallback(self.log, "publish finished")
def _stash_uri(filenode):
self.uri = filenode.get_uri()
def _do_publish2(self, res):
ut = upload.Data(self.data)
- d = self._subdir1_node.create_empty_directory("subdir2", wait_for_numpeers=self.numclients)
- d.addCallback(lambda subdir2: subdir2.add_file("mydata992", ut, wait_for_numpeers=self.numclients))
+ d = self._subdir1_node.create_empty_directory("subdir2")
+ d.addCallback(lambda subdir2: subdir2.add_file("mydata992", ut))
return d
def _bounce_client0(self, res):
def _do_publish_private(self, res):
self.smalldata = "sssh, very secret stuff"
ut = upload.Data(self.smalldata)
- d = self.clients[0].create_empty_dirnode(wait_for_numpeers=self.numclients)
+ d = self.clients[0].create_empty_dirnode()
d.addCallback(self.log, "GOT private directory")
def _got_new_dir(privnode):
rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
- d1 = privnode.create_empty_directory("personal", wait_for_numpeers=self.numclients)
+ d1 = privnode.create_empty_directory("personal")
d1.addCallback(self.log, "made P/personal")
- d1.addCallback(lambda node: node.add_file("sekrit data", ut, wait_for_numpeers=self.numclients))
+ d1.addCallback(lambda node: node.add_file("sekrit data", ut))
d1.addCallback(self.log, "made P/personal/sekrit data")
d1.addCallback(lambda res: rootnode.get_child_at_path(["subdir1", "subdir2"]))
def _got_s2(s2node):
- d2 = privnode.set_uri("s2-rw", s2node.get_uri(), wait_for_numpeers=self.numclients)
- d2.addCallback(lambda node: privnode.set_uri("s2-ro", s2node.get_readonly_uri(), wait_for_numpeers=self.numclients))
+ d2 = privnode.set_uri("s2-rw", s2node.get_uri())
+ d2.addCallback(lambda node: privnode.set_uri("s2-ro", s2node.get_readonly_uri()))
return d2
d1.addCallback(_got_s2)
d1.addCallback(lambda res: privnode)
precondition(not self.closed)
self.closed = True
-class FakeIntroducerClient:
- def when_enough_peers(self, numpeers):
- return defer.succeed(None)
-
class FakeClient:
def __init__(self, mode="good", num_servers=50):
self.mode = mode
self.num_servers = num_servers
- self.introducer_client = FakeIntroducerClient()
def log(self, *args, **kwargs):
pass
def get_permuted_peers(self, storage_index, include_myself):
assert IMutableFileURI.providedBy(u), u
return FakeMutableFileNode(self).init_from_uri(u)
- def create_empty_dirnode(self, wait_for_numpeers=None):
+ def create_empty_dirnode(self):
n = NonGridDirectoryNode(self)
- d = n.create(wait_for_numpeers)
+ d = n.create()
d.addCallback(lambda res: n)
return d
- def create_mutable_file(self, contents="", wait_for_numpeers=None):
+ def create_mutable_file(self, contents=""):
n = FakeMutableFileNode(self)
return n.create(contents)
- def upload(self, uploadable, wait_for_numpeers=None):
+ def upload(self, uploadable):
d = uploadable.get_size()
d.addCallback(lambda size: uploadable.read(size))
def _got_data(datav):
class CHKUploader:
peer_selector_class = Tahoe2PeerSelector
- def __init__(self, client, options={}, wait_for_numpeers=None):
- assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers
+ def __init__(self, client, options={}):
self._client = client
- self._wait_for_numpeers = wait_for_numpeers
self._options = options
self._log_number = self._client.log("CHKUploader starting")
class LiteralUploader:
- def __init__(self, client, wait_for_numpeers, options={}):
+ def __init__(self, client, options={}):
self._client = client
self._options = options
def _got_helper(self, helper):
self._helper = helper
- def upload(self, uploadable, options={}, wait_for_numpeers=None):
- assert wait_for_numpeers is None or isinstance(wait_for_numpeers, int), wait_for_numpeers
+ def upload(self, uploadable, options={}):
# this returns the URI
assert self.parent
assert self.running
d = uploadable.get_size()
def _got_size(size):
if size <= self.URI_LIT_SIZE_THRESHOLD:
- uploader = LiteralUploader(self.parent, options,
- wait_for_numpeers)
+ uploader = LiteralUploader(self.parent, options)
elif self._helper:
uploader = AssistedUploader(self._helper, options)
else:
- uploader = self.uploader_class(self.parent, options,
- wait_for_numpeers)
+ uploader = self.uploader_class(self.parent, options)
uploader.set_params(self.parent.get_encoding_parameters()
or self.DEFAULT_ENCODING_PARAMETERS)
return uploader.start(uploadable)