from allmydata.mutable.filenode import MutableFileNode
from allmydata.unknown import UnknownNode
from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
- IFileNode, IMutableFileURI, IFilesystemNode, \
+ IFileNode, IFilesystemNode, \
ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \
CannotPackUnknownNodeError
from allmydata.check_results import DeepCheckResults, \
def __init__(self, filenode, nodemaker, uploader):
self._node = filenode
- filenode_uri = IMutableFileURI(filenode.get_uri())
- if filenode_uri.is_readonly():
- self._uri = ReadonlyDirectoryURI(filenode_uri)
+ filenode_cap = filenode.get_cap()
+ if filenode_cap.is_readonly():
+ self._uri = ReadonlyDirectoryURI(filenode_cap)
else:
- self._uri = DirectoryURI(filenode_uri)
+ self._uri = DirectoryURI(filenode_cap)
self._nodemaker = nodemaker
self._uploader = uploader
self._most_recent_size = None
from zope.interface import implements
from twisted.internet import defer, reactor
from foolscap.api import eventually
-from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
+from allmydata.interfaces import IMutableFileNode, \
ICheckable, ICheckResults, NotEnoughSharesError
from allmydata.util import hashutil, log
from allmydata.util.assertutil import precondition
def get_readcap(self):
return self._uri.get_readonly()
def get_verify_cap(self):
- return IMutableFileURI(self._uri).get_verify_cap()
+ return self._uri.get_verify_cap()
def get_repair_cap(self):
if self._uri.is_readonly():
return None
(contents, type(contents))
return contents(self)
+ def get_cap(self):
+ return self.uri
def get_uri(self):
return self.uri.to_string()
def download_best_version(self):
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner
-from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI, \
+from allmydata.interfaces import IDirectoryNode, IFileNode, \
NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotMutableError
def mangle_uri(self, gooduri):
# change the key, which changes the storage index, which means we'll
# be asking about the wrong file, so nobody will have any shares
- u = IFileURI(gooduri)
+ u = uri.from_string(gooduri)
u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
uri_extension_hash=u.uri_extension_hash,
needed_shares=u.needed_shares,
import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload
-from allmydata.interfaces import IFileURI, FileTooLargeError, NoSharesError, \
+from allmydata.interfaces import FileTooLargeError, NoSharesError, \
NotEnoughSharesError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
self.node.DEFAULT_ENCODING_PARAMETERS = p
def _check_small(self, newuri, size):
- u = IFileURI(newuri)
+ u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.LiteralFileURI))
self.failUnlessEqual(len(u.data), size)
def _check_large(self, newuri, size):
- u = IFileURI(newuri)
+ u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.CHKFileURI))
self.failUnless(isinstance(u.storage_index, str))
self.failUnlessEqual(len(u.storage_index), 16)
self.u.parent = self.node
def _check_large(self, newuri, size):
- u = IFileURI(newuri)
+ u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.CHKFileURI))
self.failUnless(isinstance(u.storage_index, str))
self.failUnlessEqual(len(u.storage_index), 16)
return DATA[:size]
def _check_large(self, newuri, size):
- u = IFileURI(newuri)
+ u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.CHKFileURI))
self.failUnless(isinstance(u.storage_index, str))
self.failUnlessEqual(len(u.storage_index), 16)
self.failIf(u.is_mutable())
u_ro = u.get_readonly()
self.failUnlessIdentical(u, u_ro)
- u1a = IFileURI(u.to_string())
- self.failUnlessEqual(u1a, u)
he = u.to_human_encoding()
self.failUnlessEqual(he, "http://127.0.0.1:3456/uri/" + u.to_string())
self.failUnlessEqual(uri.CHKFileURI.init_from_human_encoding(he), u)
self.failUnless(IMutableFileURI.providedBy(u))
self.failIf(IDirnodeURI.providedBy(u))
self.failUnless("WriteableSSKFileURI" in str(u))
- u1a = IMutableFileURI(u.to_string())
- self.failUnlessEqual(u1a, u)
he = u.to_human_encoding()
u_h = uri.WriteableSSKFileURI.init_from_human_encoding(he)
u5 = u4.get_verify_cap()
self.failUnless(IVerifierURI.providedBy(u5))
self.failUnlessEqual(u5.storage_index, u.storage_index)
- u6 = IVerifierURI(u5.to_string())
- self.failUnless(IVerifierURI.providedBy(u6))
- self.failUnlessEqual(u6.storage_index, u.storage_index)
u7 = u.get_verify_cap()
self.failUnless(IVerifierURI.providedBy(u7))
self.failUnlessEqual(u7.storage_index, u.storage_index)
u1_filenode = u1.get_filenode_cap()
self.failUnless(u1_filenode.is_mutable())
self.failIf(u1_filenode.is_readonly())
- u1a = IDirnodeURI(u1.to_string())
- self.failUnlessEqual(u1a, u1)
u2 = uri.from_string(u1.to_string())
self.failUnlessEqual(u1.to_string(), u2.to_string())
verifiers = [u1.get_verify_cap(), u2.get_verify_cap(),
u3.get_verify_cap(), u4.get_verify_cap(),
- IVerifierURI(u1.get_verify_cap().to_string()),
uri.DirectoryURIVerifier(n.get_verify_cap()),
- uri.DirectoryURIVerifier(n.get_verify_cap().to_string()),
]
for v in verifiers:
self.failUnless(IVerifierURI.providedBy(v))
def __init__(self, filenode_uri=None):
if filenode_uri:
- filenode_uri = IVerifierURI(filenode_uri)
+ assert IVerifierURI.providedBy(filenode_uri)
self._filenode_uri = filenode_uri
def get_filenode_cap(self):