+import re
from twisted.trial import unittest
from allmydata import uri
from allmydata.util import hashutil, base32
uri.CHKFileURI.init_from_string(fileURI)
class Mutable(testutil.ReallyEqualMixin, unittest.TestCase):
- def test_pack(self):
- writekey = "\x01" * 16
- fingerprint = "\x02" * 32
+ def setUp(self):
+ self.writekey = "\x01" * 16
+ self.fingerprint = "\x02" * 32
+ self.readkey = hashutil.ssk_readkey_hash(self.writekey)
+ self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
- u = uri.WriteableSSKFileURI(writekey, fingerprint)
- self.failUnlessReallyEqual(u.writekey, writekey)
- self.failUnlessReallyEqual(u.fingerprint, fingerprint)
+ def test_pack(self):
+ u = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
+ self.failUnlessReallyEqual(u.writekey, self.writekey)
+ self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
self.failIf(u.is_readonly())
self.failUnless(u.is_mutable())
self.failUnless(IURI.providedBy(u))
self.failUnlessReallyEqual(u, u_h)
u2 = uri.from_string(u.to_string())
- self.failUnlessReallyEqual(u2.writekey, writekey)
- self.failUnlessReallyEqual(u2.fingerprint, fingerprint)
+ self.failUnlessReallyEqual(u2.writekey, self.writekey)
+ self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
self.failIf(u2.is_readonly())
self.failUnless(u2.is_mutable())
self.failUnless(IURI.providedBy(u2))
self.failUnless(isinstance(u2imm, uri.UnknownURI), u2imm)
u3 = u2.get_readonly()
- readkey = hashutil.ssk_readkey_hash(writekey)
- self.failUnlessReallyEqual(u3.fingerprint, fingerprint)
+ readkey = hashutil.ssk_readkey_hash(self.writekey)
+ self.failUnlessReallyEqual(u3.fingerprint, self.fingerprint)
self.failUnlessReallyEqual(u3.readkey, readkey)
self.failUnless(u3.is_readonly())
self.failUnless(u3.is_mutable())
u3_h = uri.ReadonlySSKFileURI.init_from_human_encoding(he)
self.failUnlessReallyEqual(u3, u3_h)
- u4 = uri.ReadonlySSKFileURI(readkey, fingerprint)
- self.failUnlessReallyEqual(u4.fingerprint, fingerprint)
+ u4 = uri.ReadonlySSKFileURI(readkey, self.fingerprint)
+ self.failUnlessReallyEqual(u4.fingerprint, self.fingerprint)
self.failUnlessReallyEqual(u4.readkey, readkey)
self.failUnless(u4.is_readonly())
self.failUnless(u4.is_mutable())
self.failUnlessReallyEqual(u5, u5_h)
+ def test_writable_mdmf_cap(self):
+ u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
+ cap = u1.to_string()
+ u = uri.WritableMDMFFileURI.init_from_string(cap)
+
+ self.failUnless(IMutableFileURI.providedBy(u))
+ self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
+ self.failUnlessReallyEqual(u.writekey, self.writekey)
+ self.failUnless(u.is_mutable())
+ self.failIf(u.is_readonly())
+ self.failUnlessEqual(cap, u.to_string())
+
+ # Now get a readonly cap from the writable cap, and test that it
+ # degrades gracefully.
+ ru = u.get_readonly()
+ self.failUnlessReallyEqual(self.readkey, ru.readkey)
+ self.failUnlessReallyEqual(self.fingerprint, ru.fingerprint)
+ self.failUnless(ru.is_mutable())
+ self.failUnless(ru.is_readonly())
+
+ # Now get a verifier cap.
+ vu = ru.get_verify_cap()
+ self.failUnlessReallyEqual(self.storage_index, vu.storage_index)
+ self.failUnlessReallyEqual(self.fingerprint, vu.fingerprint)
+ self.failUnless(IVerifierURI.providedBy(vu))
+
+ def test_readonly_mdmf_cap(self):
+ u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
+ cap = u1.to_string()
+ u2 = uri.ReadonlyMDMFFileURI.init_from_string(cap)
+
+ self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
+ self.failUnlessReallyEqual(u2.readkey, self.readkey)
+ self.failUnless(u2.is_readonly())
+ self.failUnless(u2.is_mutable())
+
+ vu = u2.get_verify_cap()
+ self.failUnlessEqual(vu.storage_index, self.storage_index)
+ self.failUnlessEqual(vu.fingerprint, self.fingerprint)
+
+ def test_create_writable_mdmf_cap_from_readcap(self):
+ # we shouldn't be able to create a writable MDMF cap given only a
+ # readcap.
+ u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
+ cap = u1.to_string()
+ self.failUnlessRaises(uri.BadURIError,
+ uri.WritableMDMFFileURI.init_from_string,
+ cap)
+
+ def test_create_writable_mdmf_cap_from_verifycap(self):
+ u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
+ cap = u1.to_string()
+ self.failUnlessRaises(uri.BadURIError,
+ uri.WritableMDMFFileURI.init_from_string,
+ cap)
+
+ def test_create_readonly_mdmf_cap_from_verifycap(self):
+ u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
+ cap = u1.to_string()
+ self.failUnlessRaises(uri.BadURIError,
+ uri.ReadonlyMDMFFileURI.init_from_string,
+ cap)
+
+ def test_mdmf_verifier_cap(self):
+ u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
+ self.failUnless(u1.is_readonly())
+ self.failIf(u1.is_mutable())
+ self.failUnlessReallyEqual(self.storage_index, u1.storage_index)
+ self.failUnlessReallyEqual(self.fingerprint, u1.fingerprint)
+
+ cap = u1.to_string()
+ u2 = uri.MDMFVerifierURI.init_from_string(cap)
+ self.failUnless(u2.is_readonly())
+ self.failIf(u2.is_mutable())
+ self.failUnlessReallyEqual(self.storage_index, u2.storage_index)
+ self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
+
+ u3 = u2.get_readonly()
+ self.failUnlessReallyEqual(u3, u2)
+
+ u4 = u2.get_verify_cap()
+ self.failUnlessReallyEqual(u4, u2)
+
+ def test_mdmf_cap_extra_information(self):
+ # MDMF caps can be arbitrarily extended after the fingerprint
+ # and key/storage index fields.
+ u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
+ self.failUnlessEqual([], u1.get_extension_params())
+
+ cap = u1.to_string()
+ # Now let's append some fields. Say, 131073 (the segment size)
+ # and 3 (the "k" encoding parameter).
+ expected_extensions = []
+ for e in ('131073', '3'):
+ cap += (":%s" % e)
+ expected_extensions.append(e)
+
+ u2 = uri.WritableMDMFFileURI.init_from_string(cap)
+ self.failUnlessReallyEqual(self.writekey, u2.writekey)
+ self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
+ self.failIf(u2.is_readonly())
+ self.failUnless(u2.is_mutable())
+
+ c2 = u2.to_string()
+ u2n = uri.WritableMDMFFileURI.init_from_string(c2)
+ self.failUnlessReallyEqual(u2, u2n)
+
+ # We should get the extra back when we ask for it.
+ self.failUnlessEqual(expected_extensions, u2.get_extension_params())
+
+ # These should be preserved through cap attenuation, too.
+ u3 = u2.get_readonly()
+ self.failUnlessReallyEqual(self.readkey, u3.readkey)
+ self.failUnlessReallyEqual(self.fingerprint, u3.fingerprint)
+ self.failUnless(u3.is_readonly())
+ self.failUnless(u3.is_mutable())
+ self.failUnlessEqual(expected_extensions, u3.get_extension_params())
+
+ c3 = u3.to_string()
+ u3n = uri.ReadonlyMDMFFileURI.init_from_string(c3)
+ self.failUnlessReallyEqual(u3, u3n)
+
+ u4 = u3.get_verify_cap()
+ self.failUnlessReallyEqual(self.storage_index, u4.storage_index)
+ self.failUnlessReallyEqual(self.fingerprint, u4.fingerprint)
+ self.failUnless(u4.is_readonly())
+ self.failIf(u4.is_mutable())
+
+ c4 = u4.to_string()
+ u4n = uri.MDMFVerifierURI.init_from_string(c4)
+ self.failUnlessReallyEqual(u4n, u4)
+
+ self.failUnlessEqual(expected_extensions, u4.get_extension_params())
+
+
+ def test_sdmf_cap_extra_information(self):
+ # For interface consistency, we define a method to get
+ # extensions for SDMF files as well. This method must always
+ # return no extensions, since SDMF files were not created with
+ # extensions and cannot be modified to include extensions
+ # without breaking older clients.
+ u1 = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
+ cap = u1.to_string()
+ u2 = uri.WriteableSSKFileURI.init_from_string(cap)
+ self.failUnlessEqual([], u2.get_extension_params())
+
+ def test_extension_character_range(self):
+ # As written now, we shouldn't put things other than numbers in
+ # the extension fields.
+ writecap = uri.WritableMDMFFileURI(self.writekey, self.fingerprint).to_string()
+ readcap = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint).to_string()
+ vcap = uri.MDMFVerifierURI(self.storage_index, self.fingerprint).to_string()
+ self.failUnlessRaises(uri.BadURIError,
+ uri.WritableMDMFFileURI.init_from_string,
+ ("%s:invalid" % writecap))
+ self.failUnlessRaises(uri.BadURIError,
+ uri.ReadonlyMDMFFileURI.init_from_string,
+ ("%s:invalid" % readcap))
+ self.failUnlessRaises(uri.BadURIError,
+ uri.MDMFVerifierURI.init_from_string,
+ ("%s:invalid" % vcap))
+
+
+ def test_mdmf_valid_human_encoding(self):
+ # What's a human encoding? Well, it's of the form:
+ base = "https://127.0.0.1:3456/uri/"
+ # With a cap on the end. For each of the cap types, we need to
+ # test that a valid cap (with and without the traditional
+ # separators) is recognized and accepted by the classes.
+ w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
+ w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
+ ['131073', '3'])
+ r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
+ r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
+ ['131073', '3'])
+ v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
+ v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
+ ['131073', '3'])
+
+ # These will yield six different caps.
+ for o in (w1, w2, r1 , r2, v1, v2):
+ url = base + o.to_string()
+ o1 = o.__class__.init_from_human_encoding(url)
+ self.failUnlessReallyEqual(o1, o)
+
+ # Note that our cap will, by default, have : as separators.
+ # But it's expected that users from, e.g., the WUI, will
+ # have %3A as a separator. We need to make sure that the
+ # initialization routine handles that, too.
+ cap = o.to_string()
+ cap = re.sub(":", "%3A", cap)
+ url = base + cap
+ o2 = o.__class__.init_from_human_encoding(url)
+ self.failUnlessReallyEqual(o2, o)
+
+
+ def test_mdmf_human_encoding_invalid_base(self):
+ # What's a human encoding? Well, it's of the form:
+ base = "https://127.0.0.1:3456/foo/bar/bazuri/"
+ # With a cap on the end. For each of the cap types, we need to
+ # test that a valid cap (with and without the traditional
+ # separators) is recognized and accepted by the classes.
+ w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
+ w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
+ ['131073', '3'])
+ r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
+ r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
+ ['131073', '3'])
+ v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
+ v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
+ ['131073', '3'])
+
+ # These will yield six different caps.
+ for o in (w1, w2, r1 , r2, v1, v2):
+ url = base + o.to_string()
+ self.failUnlessRaises(uri.BadURIError,
+ o.__class__.init_from_human_encoding,
+ url)
+
+ def test_mdmf_human_encoding_invalid_cap(self):
+ base = "https://127.0.0.1:3456/uri/"
+ # With a cap on the end. For each of the cap types, we need to
+ # test that a valid cap (with and without the traditional
+ # separators) is recognized and accepted by the classes.
+ w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
+ w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
+ ['131073', '3'])
+ r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
+ r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
+ ['131073', '3'])
+ v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
+ v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
+ ['131073', '3'])
+
+ # These will yield six different caps.
+ for o in (w1, w2, r1 , r2, v1, v2):
+ # not exhaustive, obviously...
+ url = base + o.to_string() + "foobarbaz"
+ url2 = base + "foobarbaz" + o.to_string()
+ url3 = base + o.to_string()[:25] + "foo" + o.to_string()[:25]
+ for u in (url, url2, url3):
+ self.failUnlessRaises(uri.BadURIError,
+ o.__class__.init_from_human_encoding,
+ u)
+
+ def test_mdmf_from_string(self):
+ # Make sure that the from_string utility function works with
+ # MDMF caps.
+ u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
+ cap = u1.to_string()
+ self.failUnless(uri.is_uri(cap))
+ u2 = uri.from_string(cap)
+ self.failUnlessReallyEqual(u1, u2)
+ u3 = uri.from_string_mutable_filenode(cap)
+ self.failUnlessEqual(u3, u1)
+
+ # XXX: We should refactor the extension field into setUp
+ u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
+ ['131073', '3'])
+ cap = u1.to_string()
+ self.failUnless(uri.is_uri(cap))
+ u2 = uri.from_string(cap)
+ self.failUnlessReallyEqual(u1, u2)
+ u3 = uri.from_string_mutable_filenode(cap)
+ self.failUnlessEqual(u3, u1)
+
+ u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
+ cap = u1.to_string()
+ self.failUnless(uri.is_uri(cap))
+ u2 = uri.from_string(cap)
+ self.failUnlessReallyEqual(u1, u2)
+ u3 = uri.from_string_mutable_filenode(cap)
+ self.failUnlessEqual(u3, u1)
+
+ u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
+ ['131073', '3'])
+ cap = u1.to_string()
+ self.failUnless(uri.is_uri(cap))
+ u2 = uri.from_string(cap)
+ self.failUnlessReallyEqual(u1, u2)
+ u3 = uri.from_string_mutable_filenode(cap)
+ self.failUnlessEqual(u3, u1)
+
+ u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
+ cap = u1.to_string()
+ self.failUnless(uri.is_uri(cap))
+ u2 = uri.from_string(cap)
+ self.failUnlessReallyEqual(u1, u2)
+ u3 = uri.from_string_verifier(cap)
+ self.failUnlessEqual(u3, u1)
+
+ u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
+ ['131073', '3'])
+ cap = u1.to_string()
+ self.failUnless(uri.is_uri(cap))
+ u2 = uri.from_string(cap)
+ self.failUnlessReallyEqual(u1, u2)
+ u3 = uri.from_string_verifier(cap)
+ self.failUnlessEqual(u3, u1)
+
+
class Dirnode(testutil.ReallyEqualMixin, unittest.TestCase):
def test_pack(self):
writekey = "\x01" * 16
self.failUnlessReallyEqual(u1.get_verify_cap(), None)
self.failUnlessReallyEqual(u1.get_storage_index(), None)
self.failUnlessReallyEqual(u1.abbrev_si(), "<LIT>")
+
+ def test_mdmf(self):
+ writekey = "\x01" * 16
+ fingerprint = "\x02" * 32
+ uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
+ d1 = uri.MDMFDirectoryURI(uri1)
+ self.failIf(d1.is_readonly())
+ self.failUnless(d1.is_mutable())
+ self.failUnless(IURI.providedBy(d1))
+ self.failUnless(IDirnodeURI.providedBy(d1))
+ d1_uri = d1.to_string()
+
+ d2 = uri.from_string(d1_uri)
+ self.failUnlessIsInstance(d2, uri.MDMFDirectoryURI)
+ self.failIf(d2.is_readonly())
+ self.failUnless(d2.is_mutable())
+ self.failUnless(IURI.providedBy(d2))
+ self.failUnless(IDirnodeURI.providedBy(d2))
+
+ # It doesn't make sense to ask for a deep immutable URI for a
+ # mutable directory, and we should get back a result to that
+ # effect.
+ d3 = uri.from_string(d2.to_string(), deep_immutable=True)
+ self.failUnlessIsInstance(d3, uri.UnknownURI)
+
+ def test_mdmf_with_extensions(self):
+ writekey = "\x01" * 16
+ fingerprint = "\x02" * 32
+ uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
+ d1 = uri.MDMFDirectoryURI(uri1)
+ d1_uri = d1.to_string()
+ # Add some extensions, verify that the URI is interpreted
+ # correctly.
+ d1_uri += ":3:131073"
+ uri2 = uri.from_string(d1_uri)
+ self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
+ self.failUnless(IURI.providedBy(uri2))
+ self.failUnless(IDirnodeURI.providedBy(uri2))
+ self.failUnless(uri1.is_mutable())
+ self.failIf(uri1.is_readonly())
+
+ d2_uri = uri2.to_string()
+ self.failUnlessIn(":3:131073", d2_uri)
+
+ # Now attenuate, verify that the extensions persist
+ ro_uri = uri2.get_readonly()
+ self.failUnlessIsInstance(ro_uri, uri.ReadonlyMDMFDirectoryURI)
+ self.failUnless(ro_uri.is_mutable())
+ self.failUnless(ro_uri.is_readonly())
+ self.failUnless(IURI.providedBy(ro_uri))
+ self.failUnless(IDirnodeURI.providedBy(ro_uri))
+ ro_uri_str = ro_uri.to_string()
+ self.failUnlessIn(":3:131073", ro_uri_str)
+
+ def test_mdmf_attenuation(self):
+ writekey = "\x01" * 16
+ fingerprint = "\x02" * 32
+
+ uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
+ d1 = uri.MDMFDirectoryURI(uri1)
+ self.failUnless(d1.is_mutable())
+ self.failIf(d1.is_readonly())
+ self.failUnless(IURI.providedBy(d1))
+ self.failUnless(IDirnodeURI.providedBy(d1))
+
+ d1_uri = d1.to_string()
+ d1_uri_from_fn = uri.MDMFDirectoryURI(d1.get_filenode_cap()).to_string()
+ self.failUnlessEqual(d1_uri_from_fn, d1_uri)
+
+ uri2 = uri.from_string(d1_uri)
+ self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
+ self.failUnless(IURI.providedBy(uri2))
+ self.failUnless(IDirnodeURI.providedBy(uri2))
+ self.failUnless(uri2.is_mutable())
+ self.failIf(uri2.is_readonly())
+
+ ro = uri2.get_readonly()
+ self.failUnlessIsInstance(ro, uri.ReadonlyMDMFDirectoryURI)
+ self.failUnless(ro.is_mutable())
+ self.failUnless(ro.is_readonly())
+ self.failUnless(IURI.providedBy(ro))
+ self.failUnless(IDirnodeURI.providedBy(ro))
+
+ ro_uri = ro.to_string()
+ n = uri.from_string(ro_uri, deep_immutable=True)
+ self.failUnlessIsInstance(n, uri.UnknownURI)
+
+ fn_cap = ro.get_filenode_cap()
+ fn_ro_cap = fn_cap.get_readonly()
+ d3 = uri.ReadonlyMDMFDirectoryURI(fn_ro_cap)
+ self.failUnlessEqual(ro.to_string(), d3.to_string())
+ self.failUnless(ro.is_mutable())
+ self.failUnless(ro.is_readonly())
+
+ def test_mdmf_verifier(self):
+ # I'm not sure what I want to write here yet.
+ writekey = "\x01" * 16
+ fingerprint = "\x02" * 32
+ uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
+ d1 = uri.MDMFDirectoryURI(uri1)
+ v1 = d1.get_verify_cap()
+ self.failUnlessIsInstance(v1, uri.MDMFDirectoryURIVerifier)
+ self.failIf(v1.is_mutable())
+
+ d2 = uri.from_string(d1.to_string())
+ v2 = d2.get_verify_cap()
+ self.failUnlessIsInstance(v2, uri.MDMFDirectoryURIVerifier)
+ self.failIf(v2.is_mutable())
+ self.failUnlessEqual(v2.to_string(), v1.to_string())
+
+ # Now attenuate and make sure that works correctly.
+ r3 = d2.get_readonly()
+ v3 = r3.get_verify_cap()
+ self.failUnlessIsInstance(v3, uri.MDMFDirectoryURIVerifier)
+ self.failIf(v3.is_mutable())
+ self.failUnlessEqual(v3.to_string(), v1.to_string())
+ r4 = uri.from_string(r3.to_string())
+ v4 = r4.get_verify_cap()
+ self.failUnlessIsInstance(v4, uri.MDMFDirectoryURIVerifier)
+ self.failIf(v4.is_mutable())
+ self.failUnlessEqual(v4.to_string(), v3.to_string())
SEP='(?::|%3A)'
NUMBER='([0-9]+)'
NUMBER_IGNORE='(?:[0-9]+)'
+OPTIONAL_EXTENSION_FIELD = '(' + SEP + '[0-9' + SEP + ']+|)'
# "human-encoded" URIs are allowed to come with a leading
# 'http://127.0.0.1:(8123|3456)/uri/' that will be ignored.
def get_verify_cap(self):
return SSKVerifierURI(self.storage_index, self.fingerprint)
+ def get_extension_params(self):
+ return []
+
+ def set_extension_params(self, params):
+ pass
class ReadonlySSKFileURI(_BaseURI):
implements(IURI, IMutableFileURI)
def get_verify_cap(self):
return SSKVerifierURI(self.storage_index, self.fingerprint)
+ def get_extension_params(self):
+ return []
+
+ def set_extension_params(self, params):
+ pass
class SSKVerifierURI(_BaseURI):
implements(IVerifierURI)
def get_verify_cap(self):
return self
+ def get_extension_params(self):
+ return []
+
+ def set_extension_params(self, params):
+ pass
+
+class WritableMDMFFileURI(_BaseURI):
+ implements(IURI, IMutableFileURI)
+
+ BASE_STRING='URI:MDMF:'
+ STRING_RE=re.compile('^'+BASE_STRING+BASE32STR_128bits+':'+BASE32STR_256bits+OPTIONAL_EXTENSION_FIELD+'$')
+ HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'MDMF'+SEP+BASE32STR_128bits+SEP+BASE32STR_256bits+OPTIONAL_EXTENSION_FIELD+'$')
+
+ def __init__(self, writekey, fingerprint, params=[]):
+ self.writekey = writekey
+ self.readkey = hashutil.ssk_readkey_hash(writekey)
+ self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
+ assert len(self.storage_index) == 16
+ self.fingerprint = fingerprint
+ self.extension = params
+
+ @classmethod
+ def init_from_human_encoding(cls, uri):
+ mo = cls.HUMAN_RE.search(uri)
+ if not mo:
+ raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
+ params = filter(lambda x: x != '', re.split(SEP, mo.group(3)))
+ return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)), params)
+
+ @classmethod
+ def init_from_string(cls, uri):
+ mo = cls.STRING_RE.search(uri)
+ if not mo:
+ raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
+ params = mo.group(3)
+ params = filter(lambda x: x != '', params.split(":"))
+ return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)), params)
+
+ def to_string(self):
+ assert isinstance(self.writekey, str)
+ assert isinstance(self.fingerprint, str)
+ ret = 'URI:MDMF:%s:%s' % (base32.b2a(self.writekey),
+ base32.b2a(self.fingerprint))
+ if self.extension:
+ ret += ":"
+ ret += ":".join(self.extension)
+
+ return ret
+
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.abbrev())
+
+ def abbrev(self):
+ return base32.b2a(self.writekey[:5])
+
+ def abbrev_si(self):
+ return base32.b2a(self.storage_index)[:5]
+
+ def is_readonly(self):
+ return False
+
+ def is_mutable(self):
+ return True
+
+ def get_readonly(self):
+ return ReadonlyMDMFFileURI(self.readkey, self.fingerprint, self.extension)
+
+ def get_verify_cap(self):
+ return MDMFVerifierURI(self.storage_index, self.fingerprint, self.extension)
+
+ def get_extension_params(self):
+ return self.extension
+
+ def set_extension_params(self, params):
+ params = map(str, params)
+ self.extension = params
+
+class ReadonlyMDMFFileURI(_BaseURI):
+ implements(IURI, IMutableFileURI)
+
+ BASE_STRING='URI:MDMF-RO:'
+ STRING_RE=re.compile('^' +BASE_STRING+BASE32STR_128bits+':'+BASE32STR_256bits+OPTIONAL_EXTENSION_FIELD+'$')
+ HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'MDMF-RO'+SEP+BASE32STR_128bits+SEP+BASE32STR_256bits+OPTIONAL_EXTENSION_FIELD+'$')
+
+ def __init__(self, readkey, fingerprint, params=[]):
+ self.readkey = readkey
+ self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
+ assert len(self.storage_index) == 16
+ self.fingerprint = fingerprint
+ self.extension = params
+
+ @classmethod
+ def init_from_human_encoding(cls, uri):
+ mo = cls.HUMAN_RE.search(uri)
+ if not mo:
+ raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
+ params = mo.group(3)
+ params = filter(lambda x: x!= '', re.split(SEP, params))
+ return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)), params)
+
+ @classmethod
+ def init_from_string(cls, uri):
+ mo = cls.STRING_RE.search(uri)
+ if not mo:
+ raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
+
+ params = mo.group(3)
+ params = filter(lambda x: x != '', params.split(":"))
+ return cls(base32.a2b(mo.group(1)), base32.a2b(mo.group(2)), params)
+
+ def to_string(self):
+ assert isinstance(self.readkey, str)
+ assert isinstance(self.fingerprint, str)
+ ret = 'URI:MDMF-RO:%s:%s' % (base32.b2a(self.readkey),
+ base32.b2a(self.fingerprint))
+ if self.extension:
+ ret += ":"
+ ret += ":".join(self.extension)
+
+ return ret
+
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.abbrev())
+
+ def abbrev(self):
+ return base32.b2a(self.readkey[:5])
+
+ def abbrev_si(self):
+ return base32.b2a(self.storage_index)[:5]
+
+ def is_readonly(self):
+ return True
+
+ def is_mutable(self):
+ return True
+
+ def get_readonly(self):
+ return self
+
+ def get_verify_cap(self):
+ return MDMFVerifierURI(self.storage_index, self.fingerprint, self.extension)
+
+ def get_extension_params(self):
+ return self.extension
+
+ def set_extension_params(self, params):
+ params = map(str, params)
+ self.extension = params
+
+class MDMFVerifierURI(_BaseURI):
+ implements(IVerifierURI)
+
+ BASE_STRING='URI:MDMF-Verifier:'
+ STRING_RE=re.compile('^'+BASE_STRING+BASE32STR_128bits+':'+BASE32STR_256bits+OPTIONAL_EXTENSION_FIELD+'$')
+ HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'MDMF-Verifier'+SEP+BASE32STR_128bits+SEP+BASE32STR_256bits+OPTIONAL_EXTENSION_FIELD+'$')
+
+ def __init__(self, storage_index, fingerprint, params=[]):
+ assert len(storage_index) == 16
+ self.storage_index = storage_index
+ self.fingerprint = fingerprint
+ self.extension = params
+
+ @classmethod
+ def init_from_human_encoding(cls, uri):
+ mo = cls.HUMAN_RE.search(uri)
+ if not mo:
+ raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
+ params = mo.group(3)
+ params = filter(lambda x: x != '', re.split(SEP, params))
+ return cls(si_a2b(mo.group(1)), base32.a2b(mo.group(2)), params)
+
+ @classmethod
+ def init_from_string(cls, uri):
+ mo = cls.STRING_RE.search(uri)
+ if not mo:
+ raise BadURIError("'%s' doesn't look like a %s cap" % (uri, cls))
+ params = mo.group(3)
+ params = filter(lambda x: x != '', params.split(":"))
+ return cls(si_a2b(mo.group(1)), base32.a2b(mo.group(2)), params)
+
+ def to_string(self):
+ assert isinstance(self.storage_index, str)
+ assert isinstance(self.fingerprint, str)
+ ret = 'URI:MDMF-Verifier:%s:%s' % (si_b2a(self.storage_index),
+ base32.b2a(self.fingerprint))
+ if self.extension:
+ ret += ':'
+ ret += ":".join(self.extension)
+
+ return ret
+
+ def is_readonly(self):
+ return True
+
+ def is_mutable(self):
+ return False
+
+ def get_readonly(self):
+ return self
+
+ def get_verify_cap(self):
+ return self
+
+ def get_extension_params(self):
+ return self.extension
+
class _DirectoryBaseURI(_BaseURI):
implements(IURI, IDirnodeURI)
def __init__(self, filenode_uri=None):
return None
+class MDMFDirectoryURI(_DirectoryBaseURI):
+ implements(IDirectoryURI)
+
+ BASE_STRING='URI:DIR2-MDMF:'
+ BASE_STRING_RE=re.compile('^'+BASE_STRING)
+ BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-MDMF'+SEP)
+ INNER_URI_CLASS=WritableMDMFFileURI
+
+ def __init__(self, filenode_uri=None):
+ if filenode_uri:
+ assert not filenode_uri.is_readonly()
+ _DirectoryBaseURI.__init__(self, filenode_uri)
+
+ def is_readonly(self):
+ return False
+
+ def get_readonly(self):
+ return ReadonlyMDMFDirectoryURI(self._filenode_uri.get_readonly())
+
+ def get_verify_cap(self):
+ return MDMFDirectoryURIVerifier(self._filenode_uri.get_verify_cap())
+
+
+class ReadonlyMDMFDirectoryURI(_DirectoryBaseURI):
+ implements(IReadonlyDirectoryURI)
+
+ BASE_STRING='URI:DIR2-MDMF-RO:'
+ BASE_STRING_RE=re.compile('^'+BASE_STRING)
+ BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-MDMF-RO'+SEP)
+ INNER_URI_CLASS=ReadonlyMDMFFileURI
+
+ def __init__(self, filenode_uri=None):
+ if filenode_uri:
+ assert filenode_uri.is_readonly()
+ _DirectoryBaseURI.__init__(self, filenode_uri)
+
+ def is_readonly(self):
+ return True
+
+ def get_readonly(self):
+ return self
+
+ def get_verify_cap(self):
+ return MDMFDirectoryURIVerifier(self._filenode_uri.get_verify_cap())
+
def wrap_dirnode_cap(filecap):
if isinstance(filecap, WriteableSSKFileURI):
return DirectoryURI(filecap)
return ImmutableDirectoryURI(filecap)
if isinstance(filecap, LiteralFileURI):
return LiteralDirectoryURI(filecap)
+ if isinstance(filecap, WritableMDMFFileURI):
+ return MDMFDirectoryURI(filecap)
+ if isinstance(filecap, ReadonlyMDMFFileURI):
+ return ReadonlyMDMFDirectoryURI(filecap)
assert False, "cannot interpret as a directory cap: %s" % filecap.__class__
+class MDMFDirectoryURIVerifier(_DirectoryBaseURI):
+ implements(IVerifierURI)
+
+ BASE_STRING='URI:DIR2-MDMF-Verifier:'
+ BASE_STRING_RE=re.compile('^'+BASE_STRING)
+ BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-MDMF-Verifier'+SEP)
+ INNER_URI_CLASS=MDMFVerifierURI
+
+ def __init__(self, filenode_uri=None):
+ if filenode_uri:
+ assert IVerifierURI.providedBy(filenode_uri)
+ self._filenode_uri = filenode_uri
+
+ def get_filenode_cap(self):
+ return self._filenode_uri
+
+ def is_mutable(self):
+ return False
class DirectoryURIVerifier(_DirectoryBaseURI):
implements(IVerifierURI)
kind = "URI:SSK-RO readcap to a mutable file"
elif s.startswith('URI:SSK-Verifier:'):
return SSKVerifierURI.init_from_string(s)
+ elif s.startswith('URI:MDMF:'):
+ return WritableMDMFFileURI.init_from_string(s)
+ elif s.startswith('URI:MDMF-RO:'):
+ return ReadonlyMDMFFileURI.init_from_string(s)
+ elif s.startswith('URI:MDMF-Verifier:'):
+ return MDMFVerifierURI.init_from_string(s)
elif s.startswith('URI:DIR2:'):
if can_be_writeable:
return DirectoryURI.init_from_string(s)
return ImmutableDirectoryURI.init_from_string(s)
elif s.startswith('URI:DIR2-LIT:'):
return LiteralDirectoryURI.init_from_string(s)
+ elif s.startswith('URI:DIR2-MDMF:'):
+ if can_be_writeable:
+ return MDMFDirectoryURI.init_from_string(s)
+ kind = "URI:DIR2-MDMF directory writecap"
+ elif s.startswith('URI:DIR2-MDMF-RO:'):
+ if can_be_mutable:
+ return ReadonlyMDMFDirectoryURI.init_from_string(s)
+ kind = "URI:DIR2-MDMF-RO readcap to a mutable directory"
elif s.startswith('x-tahoe-future-test-writeable:') and not can_be_writeable:
# For testing how future writeable caps would behave in read-only contexts.
kind = "x-tahoe-future-test-writeable: testing cap"