self.failUnlessIdentical(u, u3)
self.failUnlessReallyEqual(u.get_verify_cap(), None)
- he = u.to_human_encoding()
- u_h = uri.LiteralFileURI.init_from_human_encoding(he)
- self.failUnlessReallyEqual(u, u_h)
-
def test_empty(self):
data = "" # This data is some *very* small data!
return self._help_test(data)
u_ro = u.get_readonly()
self.failUnlessIdentical(u, u_ro)
- he = u.to_human_encoding()
- self.failUnlessReallyEqual(he, "http://127.0.0.1:3456/uri/" + u.to_string())
- self.failUnlessReallyEqual(uri.CHKFileURI.init_from_human_encoding(he), u)
u2 = uri.from_string(u.to_string())
self.failUnlessReallyEqual(u2.get_storage_index(), storage_index)
v2 = uri.from_string(v.to_string())
self.failUnlessReallyEqual(v, v2)
- he = v.to_human_encoding()
- v2_h = uri.CHKFileVerifierURI.init_from_human_encoding(he)
- self.failUnlessReallyEqual(v2, v2_h)
v3 = uri.CHKFileVerifierURI(storage_index="\x00"*16,
uri_extension_hash="\x00"*32,
class Constraint(testutil.ReallyEqualMixin, unittest.TestCase):
def test_constraint(self):
- good="http://127.0.0.1:3456/uri/URI%3ADIR2%3Agh3l5rbvnv2333mrfvalmjfr4i%3Alz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma/"
- uri.DirectoryURI.init_from_human_encoding(good)
- self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, good)
- bad = good + '==='
- self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_human_encoding, bad)
+ bad = "http://127.0.0.1:3456/uri/URI%3ADIR2%3Agh3l5rbvnv2333mrfvalmjfr4i%3Alz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma/"
self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, bad)
fileURI = 'URI:CHK:gh3l5rbvnv2333mrfvalmjfr4i:lz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma:3:10:345834'
uri.CHKFileURI.init_from_string(fileURI)
self.failIf(IDirnodeURI.providedBy(u))
self.failUnless("WriteableSSKFileURI" in str(u))
- he = u.to_human_encoding()
- u_h = uri.WriteableSSKFileURI.init_from_human_encoding(he)
- self.failUnlessReallyEqual(u, u_h)
-
u2 = uri.from_string(u.to_string())
self.failUnlessReallyEqual(u2.writekey, self.writekey)
self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
u3imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u3.to_string())
self.failUnless(isinstance(u3imm, uri.UnknownURI), u3imm)
- he = u3.to_human_encoding()
- u3_h = uri.ReadonlySSKFileURI.init_from_human_encoding(he)
- self.failUnlessReallyEqual(u3, u3_h)
-
u4 = uri.ReadonlySSKFileURI(readkey, self.fingerprint)
self.failUnlessReallyEqual(u4.fingerprint, self.fingerprint)
self.failUnlessReallyEqual(u4.readkey, readkey)
self.failUnless(IVerifierURI.providedBy(u7))
self.failUnlessReallyEqual(u7.get_storage_index(), u.get_storage_index())
- he = u5.to_human_encoding()
- u5_h = uri.SSKVerifierURI.init_from_human_encoding(he)
- self.failUnlessReallyEqual(u5, u5_h)
-
-
def test_writeable_mdmf_cap(self):
u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
cap = u1.to_string()
self.failUnless(u5.is_readonly())
self.failIf(u5.is_mutable())
-
- def test_mdmf_valid_human_encoding(self):
- # What's a human encoding? Well, it's of the form:
- base = "https://127.0.0.1:3456/uri/"
- # With a cap on the end. For each of the cap types, we need to
- # test that a valid cap (with and without the traditional
- # separators) is recognized and accepted by the classes.
- w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
- r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
- v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
-
- # These will yield three different caps.
- for o in (w1, r1, v1):
- url = base + o.to_string()
- o1 = o.__class__.init_from_human_encoding(url)
- self.failUnlessReallyEqual(o1, o)
-
- # Note that our cap will, by default, have : as separators.
- # But it's expected that users from, e.g., the WUI, will
- # have %3A as a separator. We need to make sure that the
- # initialization routine handles that, too.
- cap = o.to_string()
- cap = re.sub(":", "%3A", cap)
- url = base + cap
- o2 = o.__class__.init_from_human_encoding(url)
- self.failUnlessReallyEqual(o2, o)
-
-
- def test_mdmf_human_encoding_invalid_base(self):
- # What's a human encoding? Well, it's of the form:
- base = "https://127.0.0.1:3456/foo/bar/bazuri/"
- # With a cap on the end. For each of the cap types, we need to
- # test that a valid cap (with and without the traditional
- # separators) is recognized and accepted by the classes.
- w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
- r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
- v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
-
- # These will yield three different caps.
- for o in (w1, r1, v1):
- url = base + o.to_string()
- self.failUnlessRaises(uri.BadURIError,
- o.__class__.init_from_human_encoding,
- url)
-
- def test_mdmf_human_encoding_invalid_cap(self):
- base = "https://127.0.0.1:3456/uri/"
- # With a cap on the end. For each of the cap types, we need to
- # test that a valid cap (with and without the traditional
- # separators) is recognized and accepted by the classes.
- w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
- r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
- v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
-
- # These will yield three different caps.
- for o in (w1, r1, v1):
- # not exhaustive, obviously...
- url = base + o.to_string() + "foobarbaz"
- url2 = base + "foobarbaz" + o.to_string()
- url3 = base + o.to_string()[:25] + "foo" + o.to_string()[:25]
- for u in (url, url2, url3):
- self.failUnlessRaises(uri.BadURIError,
- o.__class__.init_from_human_encoding,
- u)
-
def test_mdmf_from_string(self):
# Make sure that the from_string utility function works with
# MDMF caps.
-import re, urllib
+import re
+
from zope.interface import implements
from twisted.python.components import registerAdapter
+
from allmydata.storage.server import si_a2b, si_b2a
from allmydata.util import base32, hashutil
from allmydata.util.assertutil import _assert
# - rename all of the *URI classes/interfaces to *Cap
# - make variable and method names consistently use _uri for an URI string,
# and _cap for a Cap object (decoded URI)
-# - remove the human_encoding methods?
BASE32STR_128bits = '(%s{25}%s)' % (base32.BASE32CHAR, base32.BASE32CHAR_3bits)
BASE32STR_256bits = '(%s{51}%s)' % (base32.BASE32CHAR, base32.BASE32CHAR_1bits)
-SEP='(?::|%3A)'
NUMBER='([0-9]+)'
-NUMBER_IGNORE='(?:[0-9]+)'
-
-# "human-encoded" URIs are allowed to come with a leading
-# 'http://127.0.0.1:(8123|3456)/uri/' that will be ignored.
-# Note that nothing in the Tahoe code currently uses the human encoding.
-OPTIONALHTTPLEAD=r'(?:https?://(?:[^:/]+)(?::%s)?/uri/)?' % NUMBER_IGNORE
class _BaseURI:
else:
return True
- def to_human_encoding(self):
- return 'http://127.0.0.1:3456/uri/'+self.to_string()
-
def get_storage_index(self):
return self.storage_index
STRING_RE=re.compile('^URI:CHK:'+BASE32STR_128bits+':'+
BASE32STR_256bits+':'+NUMBER+':'+NUMBER+':'+NUMBER+
'$')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'CHK'+SEP+
- BASE32STR_128bits+SEP+BASE32STR_256bits+SEP+NUMBER+
- SEP+NUMBER+SEP+NUMBER+'$')
def __init__(self, key, uri_extension_hash, needed_shares, total_shares,
size):
if not len(self.storage_index) == 16: # sha256 hash truncated to 128
raise BadURIError("storage index must be 16 bytes long")
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)),
- int(mo.group(3)), int(mo.group(4)), int(mo.group(5)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
total_shares=self.total_shares,
size=self.size)
+
class CHKFileVerifierURI(_BaseURI):
implements(IVerifierURI)
BASE_STRING='URI:CHK-Verifier:'
STRING_RE=re.compile('^URI:CHK-Verifier:'+BASE32STR_128bits+':'+
BASE32STR_256bits+':'+NUMBER+':'+NUMBER+':'+NUMBER)
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'CHK-Verifier'+SEP+
- BASE32STR_128bits+SEP+BASE32STR_256bits+SEP+NUMBER+
- SEP+NUMBER+SEP+NUMBER)
def __init__(self, storage_index, uri_extension_hash,
needed_shares, total_shares, size):
self.total_shares = total_shares
self.size = size
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)),
- int(mo.group(3)), int(mo.group(4)), int(mo.group(5)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
BASE_STRING='URI:LIT:'
STRING_RE=re.compile('^URI:LIT:'+base32.BASE32STR_anybytes+'$')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'LIT'+SEP+base32.BASE32STR_anybytes+'$')
def __init__(self, data=None):
if data is not None:
assert isinstance(data, str)
self.data = data
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(base32.a2b(mo.group(1)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
BASE_STRING='URI:SSK:'
STRING_RE=re.compile('^'+BASE_STRING+BASE32STR_128bits+':'+
BASE32STR_256bits+'$')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK'+SEP+
- BASE32STR_128bits+SEP+BASE32STR_256bits+'$')
def __init__(self, writekey, fingerprint):
self.writekey = writekey
assert len(self.storage_index) == 16
self.fingerprint = fingerprint
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
def get_verify_cap(self):
return SSKVerifierURI(self.storage_index, self.fingerprint)
+
class ReadonlySSKFileURI(_BaseURI):
implements(IURI, IMutableFileURI)
BASE_STRING='URI:SSK-RO:'
STRING_RE=re.compile('^URI:SSK-RO:'+BASE32STR_128bits+':'+BASE32STR_256bits+'$')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK-RO'+SEP+BASE32STR_128bits+SEP+BASE32STR_256bits+'$')
def __init__(self, readkey, fingerprint):
self.readkey = readkey
assert len(self.storage_index) == 16
self.fingerprint = fingerprint
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
def get_verify_cap(self):
return SSKVerifierURI(self.storage_index, self.fingerprint)
+
class SSKVerifierURI(_BaseURI):
implements(IVerifierURI)
BASE_STRING='URI:SSK-Verifier:'
STRING_RE=re.compile('^'+BASE_STRING+BASE32STR_128bits+':'+BASE32STR_256bits+'$')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK-Verifier'+SEP+BASE32STR_128bits+SEP+BASE32STR_256bits+'$')
def __init__(self, storage_index, fingerprint):
assert len(storage_index) == 16
self.storage_index = storage_index
self.fingerprint = fingerprint
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(si_a2b(mo.group(1)), base32.a2b(mo.group(2)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
def get_verify_cap(self):
return self
+
class WriteableMDMFFileURI(_BaseURI):
implements(IURI, IMutableFileURI)
BASE_STRING='URI:MDMF:'
STRING_RE=re.compile('^'+BASE_STRING+BASE32STR_128bits+':'+BASE32STR_256bits+'(:|$)')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'MDMF'+SEP+BASE32STR_128bits+SEP+BASE32STR_256bits+'(:|$)')
def __init__(self, writekey, fingerprint):
self.writekey = writekey
assert len(self.storage_index) == 16
self.fingerprint = fingerprint
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
def get_verify_cap(self):
return MDMFVerifierURI(self.storage_index, self.fingerprint)
+
class ReadonlyMDMFFileURI(_BaseURI):
implements(IURI, IMutableFileURI)
BASE_STRING='URI:MDMF-RO:'
STRING_RE=re.compile('^' +BASE_STRING+BASE32STR_128bits+':'+BASE32STR_256bits+'(:|$)')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'MDMF-RO'+SEP+BASE32STR_128bits+SEP+BASE32STR_256bits+'(:|$)')
def __init__(self, readkey, fingerprint):
self.readkey = readkey
assert len(self.storage_index) == 16
self.fingerprint = fingerprint
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
def get_verify_cap(self):
return MDMFVerifierURI(self.storage_index, self.fingerprint)
+
class MDMFVerifierURI(_BaseURI):
implements(IVerifierURI)
BASE_STRING='URI:MDMF-Verifier:'
STRING_RE=re.compile('^'+BASE_STRING+BASE32STR_128bits+':'+BASE32STR_256bits+'(:|$)')
- HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'MDMF-Verifier'+SEP+BASE32STR_128bits+SEP+BASE32STR_256bits+'(:|$)')
def __init__(self, storage_index, fingerprint):
assert len(storage_index) == 16
self.storage_index = storage_index
self.fingerprint = fingerprint
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- return cls(si_a2b(mo.group(1)), base32.a2b(mo.group(2)))
-
@classmethod
def init_from_string(cls, uri):
mo = cls.STRING_RE.search(uri)
def get_verify_cap(self):
return self
+
class _DirectoryBaseURI(_BaseURI):
implements(IURI, IDirnodeURI)
def __init__(self, filenode_uri=None):
cls.INNER_URI_CLASS.BASE_STRING+bits)
return cls(fn)
- @classmethod
- def init_from_human_encoding(cls, uri):
- mo = cls.BASE_HUMAN_RE.search(uri)
- if not mo:
- raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
- bits = uri[mo.end():]
- while bits and bits[-1] == '/':
- bits = bits[:-1]
- fn = cls.INNER_URI_CLASS.init_from_string(
- cls.INNER_URI_CLASS.BASE_STRING+urllib.unquote(bits))
- return cls(fn)
-
def to_string(self):
fnuri = self._filenode_uri.to_string()
mo = re.match(self.INNER_URI_CLASS.BASE_STRING, fnuri)
def get_storage_index(self):
return self._filenode_uri.get_storage_index()
+
class DirectoryURI(_DirectoryBaseURI):
implements(IDirectoryURI)
BASE_STRING='URI:DIR2:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2'+SEP)
INNER_URI_CLASS=WriteableSSKFileURI
def __init__(self, filenode_uri=None):
BASE_STRING='URI:DIR2-RO:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-RO'+SEP)
INNER_URI_CLASS=ReadonlySSKFileURI
def __init__(self, filenode_uri=None):
class ImmutableDirectoryURI(_ImmutableDirectoryBaseURI):
BASE_STRING='URI:DIR2-CHK:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-CHK'+SEP)
INNER_URI_CLASS=CHKFileURI
def get_verify_cap(self):
class LiteralDirectoryURI(_ImmutableDirectoryBaseURI):
BASE_STRING='URI:DIR2-LIT:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-LIT'+SEP)
INNER_URI_CLASS=LiteralFileURI
def get_verify_cap(self):
BASE_STRING='URI:DIR2-MDMF:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-MDMF'+SEP)
INNER_URI_CLASS=WriteableMDMFFileURI
def __init__(self, filenode_uri=None):
BASE_STRING='URI:DIR2-MDMF-RO:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-MDMF-RO'+SEP)
INNER_URI_CLASS=ReadonlyMDMFFileURI
def __init__(self, filenode_uri=None):
BASE_STRING='URI:DIR2-MDMF-Verifier:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-MDMF-Verifier'+SEP)
INNER_URI_CLASS=MDMFVerifierURI
def __init__(self, filenode_uri=None):
BASE_STRING='URI:DIR2-Verifier:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-Verifier'+SEP)
INNER_URI_CLASS=SSKVerifierURI
def __init__(self, filenode_uri=None):
implements(IVerifierURI)
BASE_STRING='URI:DIR2-CHK-Verifier:'
BASE_STRING_RE=re.compile('^'+BASE_STRING)
- BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-CHK-VERIFIER'+SEP)
INNER_URI_CLASS=CHKFileVerifierURI