From: Zooko O'Whielacronx Date: Thu, 3 Jan 2008 23:55:43 +0000 (-0700) Subject: add human-encodings of caps, refactor encodings of caps, tighten checks, fix unit... X-Git-Tag: allmydata-tahoe-0.7.0~44 X-Git-Url: https://git.rkrishnan.org/junkers?a=commitdiff_plain;h=eb373c0001814ac3adbec88189ec4c83884229af;p=tahoe-lafs%2Ftahoe-lafs.git add human-encodings of caps, refactor encodings of caps, tighten checks, fix unit tests to use values of the right size --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index d250e1a1..bd9c363f 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -128,7 +128,9 @@ class Client(node.Node, Referenceable, testutil.PollMixin): if os.path.exists(privdirfile): try: theuri = open(privdirfile, "r").read().strip() - if not uri.is_string_newdirnode_rw(theuri): + try: + uri.NewDirectoryURI.init_from_human_encoding(theuri) + except: raise EnvironmentError("not a well-formed mutable directory uri") except EnvironmentError, le: d = self.when_tub_ready() diff --git a/src/allmydata/test/test_dirnode.py b/src/allmydata/test/test_dirnode.py index 0e945c54..5e834aff 100644 --- a/src/allmydata/test/test_dirnode.py +++ b/src/allmydata/test/test_dirnode.py @@ -20,7 +20,7 @@ class Marker: if not isinstance(nodeuri, str): nodeuri = nodeuri.to_string() self.nodeuri = nodeuri - si = hashutil.tagged_hash("tag1", nodeuri) + si = hashutil.tagged_hash("tag1", nodeuri)[:16] fp = hashutil.tagged_hash("tag2", nodeuri) self.verifieruri = uri.SSKVerifierURI(storage_index=si, fingerprint=fp).to_string() diff --git a/src/allmydata/test/test_uri.py b/src/allmydata/test/test_uri.py index 4a13d204..6b3cee0c 100644 --- a/src/allmydata/test/test_uri.py +++ b/src/allmydata/test/test_uri.py @@ -45,8 +45,8 @@ class Compare(unittest.TestCase): def test_compare(self): lit1 = uri.LiteralFileURI("some data") fileURI = 'URI:CHK:f3mf6az85wpcai8ma4qayfmxuc:nnw518w5hu3t5oohwtp7ah9n81z9rfg6c1ywk33ia3m64o67nsgo:3:10:345834' - chk1 = uri.CHKFileURI().init_from_string(fileURI) - chk2 = uri.CHKFileURI().init_from_string(fileURI) + chk1 = uri.CHKFileURI.init_from_string(fileURI) + chk2 = uri.CHKFileURI.init_from_string(fileURI) self.failIfEqual(lit1, chk1) self.failUnlessEqual(chk1, chk2) self.failIfEqual(chk1, "not actually a URI") @@ -168,12 +168,13 @@ class Invalid(unittest.TestCase): class Constraint(unittest.TestCase): def test_constraint(self): good="http://127.0.0.1:8123/uri/URI%3ADIR2%3Aqo8ayna47cpw3rx3kho3mu7q4h%3Abk9qbgx76gh6eyj5ps8p6buz8fffw1ofc37e9w9d6ncsfpuz7icy/" - self.failUnless(uri.DirnodeURI_RE.search(good)) + uri.NewDirectoryURI.init_from_human_encoding(good) + self.failUnlessRaises(AssertionError, uri.NewDirectoryURI.init_from_string, good) bad = good + '===' - self.failIf(uri.DirnodeURI_RE.search(bad)) + self.failUnlessRaises(AssertionError, uri.NewDirectoryURI.init_from_human_encoding, bad) + self.failUnlessRaises(AssertionError, uri.NewDirectoryURI.init_from_string, bad) fileURI = 'URI:CHK:f3mf6az85wpcai8ma4qayfmxuc:nnw518w5hu3t5oohwtp7ah9n81z9rfg6c1ywk33ia3m64o67nsgo:3:10:345834' - self.failIf(uri.DirnodeURI_RE.search(fileURI)) - + uri.CHKFileURI.init_from_string(fileURI) class Mutable(unittest.TestCase): def test_pack(self): @@ -239,7 +240,7 @@ class Mutable(unittest.TestCase): class NewDirnode(unittest.TestCase): def test_pack(self): writekey = "\x01" * 16 - fingerprint = "\x02" * 16 + fingerprint = "\x02" * 32 n = uri.WriteableSSKFileURI(writekey, fingerprint) u1 = uri.NewDirectoryURI(n) @@ -301,16 +302,3 @@ class NewDirnode(unittest.TestCase): self.failUnless(IVerifierURI.providedBy(v)) self.failUnlessEqual(v._filenode_uri, u1.get_verifier()._filenode_uri) - - def test_is_string_newdirnode_rw(self): - writekey = "\x01" * 16 - fingerprint = "\x02" * 32 - - n = uri.WriteableSSKFileURI(writekey, fingerprint) - u1 = uri.NewDirectoryURI(n) - - self.failUnless(uri.is_string_newdirnode_rw(u1.to_string()), u1.to_string()) - - self.failIf(uri.is_string_newdirnode_rw("bogus")) - self.failIf(uri.is_string_newdirnode_rw("URI:DIR2:bogus")) - diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py index d4ad068d..e93be026 100644 --- a/src/allmydata/test/test_web.py +++ b/src/allmydata/test/test_web.py @@ -1184,7 +1184,7 @@ class Web(WebMixin, unittest.TestCase): def test_POST_mkdir_no_parentdir_noredirect(self): d = self.POST("/uri?t=mkdir") def _after_mkdir(res): - self.failUnless(uri.is_string_newdirnode_rw(res)) + uri.NewDirectoryURI.init_from_string(res) d.addCallback(_after_mkdir) return d diff --git a/src/allmydata/uri.py b/src/allmydata/uri.py index 0433d908..5cb74f21 100644 --- a/src/allmydata/uri.py +++ b/src/allmydata/uri.py @@ -1,5 +1,5 @@ -import re +import re, urllib from zope.interface import implements from twisted.python.components import registerAdapter from allmydata.util import idlib, hashutil @@ -10,25 +10,15 @@ from allmydata.interfaces import IURI, IDirnodeURI, IFileURI, IVerifierURI, \ # enough information to retrieve and validate the contents. It shall be # expressed in a limited character set (namely [TODO]). -ZBASE32CHAR = "[ybndrfg8ejkmcpqxot1uwisza345h769]" # excludes l, 0, 2, and v -ZBASE32CHAR_3bits = "[yoearcwh]" -ZBASE32CHAR_1bits = "[yo]" -ZBASE32STR_128bits = "%s{25}%s" % (ZBASE32CHAR, ZBASE32CHAR_3bits) -ZBASE32STR_256bits = "%s{51}%s" % (ZBASE32CHAR, ZBASE32CHAR_1bits) -COLON="(:|%3A)" +ZBASE32STR_128bits = '(%s{25}%s)' % (idlib.ZBASE32CHAR, idlib.ZBASE32CHAR_3bits) +ZBASE32STR_256bits = '(%s{51}%s)' % (idlib.ZBASE32CHAR, idlib.ZBASE32CHAR_1bits) -# Writeable SSK bits -WSSKBITS= "(%s)%s(%s)" % (ZBASE32STR_128bits, COLON, ZBASE32STR_256bits) +SEP='(?::|%3A)' +NUMBER='([0-9]+)' # URIs (soon to be renamed "caps") are always allowed to come with a leading -# "http://127.0.0.1:8123/uri/" that will be ignored. -OPTIONALHTTPLEAD=r'(https?://(127.0.0.1|localhost):8123/uri/)?' - -# Writeable SSK URI -WriteableSSKFileURI_RE=re.compile("^%sURI%sSSK%s%s$" % (OPTIONALHTTPLEAD, COLON, COLON, WSSKBITS)) - -# NewDirectory Read-Write URI -DirnodeURI_RE=re.compile("^%sURI%sDIR2%s%s/?$" % (OPTIONALHTTPLEAD, COLON, COLON, WSSKBITS)) +# 'http://127.0.0.1:8123/uri/' that will be ignored. +OPTIONALHTTPLEAD=r'(?:https?://(127.0.0.1|localhost):8123/uri/)?' class _BaseURI: @@ -40,60 +30,51 @@ class _BaseURI: if cmp(self.__class__, them.__class__): return cmp(self.__class__, them.__class__) return cmp(self.to_string(), them.to_string()) + def to_human_encoding(self): + return 'http://127.0.0.1:8123/uri/'+self.to_string() class CHKFileURI(_BaseURI): implements(IURI, IFileURI) - def __init__(self, **kwargs): - # construct me with kwargs, since there are so many of them - if not kwargs: - return - keys = ("key", "uri_extension_hash", - "needed_shares", "total_shares", "size") - for name in kwargs: - if name in keys: - value = kwargs[name] - setattr(self, name, value) - else: - raise TypeError("CHKFileURI does not accept '%s=' argument" - % name) + STRING_RE=re.compile('^URI:CHK:'+ZBASE32STR_128bits+':'+ + ZBASE32STR_256bits+':'+NUMBER+':'+NUMBER+':'+NUMBER+ + '$') + HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'CHK'+SEP+ + ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+SEP+NUMBER+ + SEP+NUMBER+SEP+NUMBER+'$') + + def __init__(self, key, uri_extension_hash, needed_shares, total_shares, + size): + self.key = key + self.uri_extension_hash = uri_extension_hash + self.needed_shares = needed_shares + self.total_shares = total_shares + self.size = size self.storage_index = hashutil.storage_index_chk_hash(self.key) + assert len(self.storage_index) == 16 + self.storage_index = hashutil.storage_index_chk_hash(key) + assert len(self.storage_index) == 16 # sha256 hash truncated to 128 + + @classmethod + def init_from_human_encoding(cls, uri): + mo = cls.HUMAN_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)), + int(mo.group(3)), int(mo.group(4)), int(mo.group(5))) @classmethod def init_from_string(cls, uri): - assert uri.startswith("URI:CHK:"), uri - d = {} - (header_uri, header_chk, - key_s, uri_extension_hash_s, - needed_shares_s, total_shares_s, size_s) = uri.split(":") - assert header_uri == "URI" - assert header_chk == "CHK" - - key = idlib.a2b(key_s) - assert isinstance(key, str) - assert len(key) == 16 # AES-128 - - storage_index = hashutil.storage_index_chk_hash(key) - assert isinstance(storage_index, str) - assert len(storage_index) == 16 # sha256 hash truncated to 128 - - uri_extension_hash = idlib.a2b(uri_extension_hash_s) - assert isinstance(uri_extension_hash, str) - assert len(uri_extension_hash) == 32 # sha256 hash - - needed_shares = int(needed_shares_s) - total_shares = int(total_shares_s) - size = int(size_s) - return cls(key=key, uri_extension_hash=uri_extension_hash, - needed_shares=needed_shares, total_shares=total_shares, - size=size) + mo = cls.STRING_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)), + int(mo.group(3)), int(mo.group(4)), int(mo.group(5))) def to_string(self): assert isinstance(self.needed_shares, int) assert isinstance(self.total_shares, int) assert isinstance(self.size, (int,long)) - return ("URI:CHK:%s:%s:%d:%d:%d" % + return ('URI:CHK:%s:%s:%d:%d:%d' % (idlib.b2a(self.key), idlib.b2a(self.uri_extension_hash), self.needed_shares, @@ -120,54 +101,41 @@ class CHKFileURI(_BaseURI): class CHKFileVerifierURI(_BaseURI): implements(IVerifierURI) - def __init__(self, **kwargs): - # construct me with kwargs, since there are so many of them - if not kwargs: - return - self.populate(**kwargs) + STRING_RE=re.compile('^URI:CHK-Verifier:'+ZBASE32STR_128bits+':'+ + ZBASE32STR_256bits+':'+NUMBER+':'+NUMBER+':'+NUMBER) + HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'CHK-Verifier'+SEP+ + ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+SEP+NUMBER+ + SEP+NUMBER+SEP+NUMBER) - def populate(self, storage_index, uri_extension_hash, + def __init__(self, storage_index, uri_extension_hash, needed_shares, total_shares, size): + assert len(storage_index) == 16 self.storage_index = storage_index self.uri_extension_hash = uri_extension_hash self.needed_shares = needed_shares self.total_shares = total_shares self.size = size + @classmethod + def init_from_human_encoding(cls, uri): + mo = cls.HUMAN_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)), + int(mo.group(3)), int(mo.group(4)), int(mo.group(5))) + @classmethod def init_from_string(cls, uri): - assert uri.startswith("URI:CHK-Verifier:"), uri - d = {} - (header_uri, header_chk, - storage_index_s, uri_extension_hash_s, - needed_shares_s, total_shares_s, size_s) = uri.split(":") - assert header_uri == "URI" - assert header_chk == "CHK-Verifier" - - storage_index = idlib.a2b(storage_index_s) - assert isinstance(storage_index, str) - assert len(storage_index) == 16 # sha256 hash truncated to 128 - - uri_extension_hash = idlib.a2b(uri_extension_hash_s) - assert isinstance(uri_extension_hash, str) - assert len(uri_extension_hash) == 32 # sha256 hash - - needed_shares = int(needed_shares_s) - total_shares = int(total_shares_s) - size = int(size_s) - - return cls(storage_index=storage_index, - uri_extension_hash=uri_extension_hash, - needed_shares=needed_shares, - total_shares=total_shares, - size=size) + mo = cls.STRING_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)), + int(mo.group(3)), int(mo.group(4)), int(mo.group(5))) def to_string(self): assert isinstance(self.needed_shares, int) assert isinstance(self.total_shares, int) assert isinstance(self.size, (int,long)) - return ("URI:CHK-Verifier:%s:%s:%d:%d:%d" % + return ('URI:CHK-Verifier:%s:%s:%d:%d:%d' % (idlib.b2a(self.storage_index), idlib.b2a(self.uri_extension_hash), self.needed_shares, @@ -178,18 +146,27 @@ class CHKFileVerifierURI(_BaseURI): class LiteralFileURI(_BaseURI): implements(IURI, IFileURI) + STRING_RE=re.compile('^URI:LIT:'+idlib.ZBASE32STR_anybytes+'$') + HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'LIT'+SEP+idlib.ZBASE32STR_anybytes+'$') + def __init__(self, data=None): if data is not None: self.data = data + @classmethod + def init_from_human_encoding(cls, uri): + mo = cls.HUMAN_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1))) + @classmethod def init_from_string(cls, uri): - assert uri.startswith("URI:LIT:") - data_s = uri[len("URI:LIT:"):] - return cls(idlib.a2b(data_s)) + mo = cls.STRING_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1))) def to_string(self): - return "URI:LIT:%s" % idlib.b2a(self.data) + return 'URI:LIT:%s' % idlib.b2a(self.data) def is_readonly(self): return True @@ -208,33 +185,35 @@ class LiteralFileURI(_BaseURI): class WriteableSSKFileURI(_BaseURI): implements(IURI, IMutableFileURI) + BASE_STRING='URI:SSK:' + STRING_RE=re.compile('^'+BASE_STRING+ZBASE32STR_128bits+':'+ + ZBASE32STR_256bits+'$') + HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK'+SEP+ + ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+'$') + def __init__(self, writekey, fingerprint): self.writekey = writekey self.readkey = hashutil.ssk_readkey_hash(writekey) self.storage_index = hashutil.ssk_storage_index_hash(self.readkey) + assert len(self.storage_index) == 16 self.fingerprint = fingerprint @classmethod def init_from_human_encoding(cls, uri): - mo = WriteableSSKFileURI_RE.search(uri) - assert mo - return cls(idlib.a2b(mo.group(5)), idlib.a2b(mo.group(7))) + mo = cls.HUMAN_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2))) @classmethod def init_from_string(cls, uri): - assert uri.startswith("URI:SSK:"), uri - (header_uri, header_ssk, writekey_s, fingerprint_s) = uri.split(":") - return cls(idlib.a2b(writekey_s), idlib.a2b(fingerprint_s)) - - def to_human_encoding(self): - assert isinstance(self.writekey, str) - assert isinstance(self.fingerprint, str) - return "http://127.0.0.1:8123/uri/URI:SSK:%s:%s" % (idlib.b2a(self.writekey), idlib.b2a(self.fingerprint)) + mo = cls.STRING_RE.search(uri) + assert mo, (uri, cls) + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2))) def to_string(self): assert isinstance(self.writekey, str) assert isinstance(self.fingerprint, str) - return "URI:SSK:%s:%s" % (idlib.b2a(self.writekey), + return 'URI:SSK:%s:%s' % (idlib.b2a(self.writekey), idlib.b2a(self.fingerprint)) def __repr__(self): @@ -255,21 +234,32 @@ class WriteableSSKFileURI(_BaseURI): class ReadonlySSKFileURI(_BaseURI): implements(IURI, IMutableFileURI) + BASE_STRING='URI:SSK-RO:' + STRING_RE=re.compile('^URI:SSK-RO:'+ZBASE32STR_128bits+':'+ZBASE32STR_256bits+'$') + HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK-RO'+SEP+ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+'$') + def __init__(self, readkey, fingerprint): self.readkey = readkey self.storage_index = hashutil.ssk_storage_index_hash(self.readkey) + assert len(self.storage_index) == 16 self.fingerprint = fingerprint + @classmethod + def init_from_human_encoding(cls, uri): + mo = cls.HUMAN_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2))) + @classmethod def init_from_string(cls, uri): - assert uri.startswith("URI:SSK-RO:"), uri - (header_uri, header_ssk, readkey_s, fingerprint_s) = uri.split(":") - return cls(idlib.a2b(readkey_s), idlib.a2b(fingerprint_s)) + mo = cls.STRING_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2))) def to_string(self): assert isinstance(self.readkey, str) assert isinstance(self.fingerprint, str) - return "URI:SSK-RO:%s:%s" % (idlib.b2a(self.readkey), + return 'URI:SSK-RO:%s:%s' % (idlib.b2a(self.readkey), idlib.b2a(self.fingerprint)) def __repr__(self): @@ -290,21 +280,31 @@ class ReadonlySSKFileURI(_BaseURI): class SSKVerifierURI(_BaseURI): implements(IVerifierURI) + BASE_STRING='URI:SSK-Verifier:' + STRING_RE=re.compile('^'+BASE_STRING+ZBASE32STR_128bits+':'+ZBASE32STR_256bits+'$') + HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK-RO'+SEP+ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+'$') + def __init__(self, storage_index, fingerprint): + assert len(storage_index) == 16 self.storage_index = storage_index self.fingerprint = fingerprint + @classmethod + def init_from_human_encoding(cls, uri): + mo = cls.HUMAN_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2))) + @classmethod def init_from_string(cls, uri): - assert uri.startswith("URI:SSK-Verifier:"), uri - (header_uri, header_ssk, - storage_index_s, fingerprint_s) = uri.split(":") - return cls(idlib.a2b(storage_index_s), idlib.a2b(fingerprint_s)) + mo = cls.STRING_RE.search(uri) + assert mo, uri + return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2))) def to_string(self): assert isinstance(self.storage_index, str) assert isinstance(self.fingerprint, str) - return "URI:SSK-Verifier:%s:%s" % (idlib.b2a(self.storage_index), + return 'URI:SSK-Verifier:%s:%s' % (idlib.b2a(self.storage_index), idlib.b2a(self.fingerprint)) class _NewDirectoryBaseURI(_BaseURI): @@ -315,6 +315,33 @@ class _NewDirectoryBaseURI(_BaseURI): def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.abbrev()) + @classmethod + def init_from_string(cls, uri): + mo = cls.BASE_STRING_RE.search(uri) + assert mo, (uri, cls) + bits = uri[mo.end():] + fn = cls.INNER_URI_CLASS.init_from_string( + cls.INNER_URI_CLASS.BASE_STRING+bits) + return cls(fn) + + @classmethod + def init_from_human_encoding(cls, uri): + mo = cls.BASE_HUMAN_RE.search(uri) + assert mo, uri + bits = uri[mo.end():] + while bits and bits[-1] == '/': + bits = bits[:-1] + fn = cls.INNER_URI_CLASS.init_from_string( + cls.INNER_URI_CLASS.BASE_STRING+urllib.unquote(bits)) + return cls(fn) + + def to_string(self): + fnuri = self._filenode_uri.to_string() + mo = re.match(self.INNER_URI_CLASS.BASE_STRING, fnuri) + assert mo, fnuri + bits = fnuri[mo.end():] + return self.BASE_STRING+bits + def abbrev(self): return self._filenode_uri.to_string().split(':')[2][:5] @@ -329,24 +356,17 @@ class _NewDirectoryBaseURI(_BaseURI): class NewDirectoryURI(_NewDirectoryBaseURI): implements(INewDirectoryURI) + + BASE_STRING='URI:DIR2:' + BASE_STRING_RE=re.compile('^'+BASE_STRING) + BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2'+SEP) + INNER_URI_CLASS=WriteableSSKFileURI + def __init__(self, filenode_uri=None): if filenode_uri: assert not filenode_uri.is_readonly() _NewDirectoryBaseURI.__init__(self, filenode_uri) - @classmethod - def init_from_string(cls, uri): - assert uri.startswith("URI:DIR2:") - (header_uri, header_dir2, bits) = uri.split(":", 2) - fn = WriteableSSKFileURI.init_from_string("URI:SSK:" + bits) - return cls(fn) - - def to_string(self): - assert isinstance(self._filenode_uri, WriteableSSKFileURI) - fn_u = self._filenode_uri.to_string() - (header_uri, header_ssk, bits) = fn_u.split(":", 2) - return "URI:DIR2:" + bits - def is_readonly(self): return False @@ -355,74 +375,59 @@ class NewDirectoryURI(_NewDirectoryBaseURI): class ReadonlyNewDirectoryURI(_NewDirectoryBaseURI): implements(IReadonlyNewDirectoryURI) + + BASE_STRING='URI:DIR2-RO:' + BASE_STRING_RE=re.compile('^'+BASE_STRING) + BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-RO'+SEP) + INNER_URI_CLASS=ReadonlySSKFileURI + def __init__(self, filenode_uri=None): if filenode_uri: assert filenode_uri.is_readonly() _NewDirectoryBaseURI.__init__(self, filenode_uri) - @classmethod - def init_from_string(cls, uri): - assert uri.startswith("URI:DIR2-RO:") - (header_uri, header_dir2, bits) = uri.split(":", 2) - fn = ReadonlySSKFileURI.init_from_string("URI:SSK-RO:" + bits) - return cls(fn) - - def to_string(self): - assert isinstance(self._filenode_uri, ReadonlySSKFileURI) - fn_u = self._filenode_uri.to_string() - (header_uri, header_ssk, bits) = fn_u.split(":", 2) - return "URI:DIR2-RO:" + bits - def is_readonly(self): return True def get_readonly(self): return self -class NewDirectoryURIVerifier(_BaseURI): +class NewDirectoryURIVerifier(_NewDirectoryBaseURI): implements(IVerifierURI) + BASE_STRING='URI:DIR2-Verifier:' + BASE_STRING_RE=re.compile('^'+BASE_STRING) + BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-Verifier'+SEP) + INNER_URI_CLASS=SSKVerifierURI + def __init__(self, filenode_uri=None): if filenode_uri: filenode_uri = IVerifierURI(filenode_uri) self._filenode_uri = filenode_uri - @classmethod - def init_from_string(cls, uri): - assert uri.startswith("URI:DIR2-Verifier:") - (header_uri, header_dir2, bits) = uri.split(":", 2) - fv = SSKVerifierURI.init_from_string("URI:SSK-Verifier:" + bits) - return cls(fv) - - def to_string(self): - assert isinstance(self._filenode_uri, SSKVerifierURI) - fn_u = self._filenode_uri.to_string() - (header_uri, header_ssk, bits) = fn_u.split(":", 2) - return "URI:DIR2-Verifier:" + bits - def get_filenode_uri(self): return self._filenode_uri def from_string(s): - if s.startswith("URI:CHK:"): + if s.startswith('URI:CHK:'): return CHKFileURI.init_from_string(s) - elif s.startswith("URI:CHK-Verifier:"): + elif s.startswith('URI:CHK-Verifier:'): return CHKFileVerifierURI.init_from_string(s) - elif s.startswith("URI:LIT:"): + elif s.startswith('URI:LIT:'): return LiteralFileURI.init_from_string(s) - elif s.startswith("URI:SSK:"): + elif s.startswith('URI:SSK:'): return WriteableSSKFileURI.init_from_string(s) - elif s.startswith("URI:SSK-RO:"): + elif s.startswith('URI:SSK-RO:'): return ReadonlySSKFileURI.init_from_string(s) - elif s.startswith("URI:SSK-Verifier:"): + elif s.startswith('URI:SSK-Verifier:'): return SSKVerifierURI.init_from_string(s) - elif s.startswith("URI:DIR2:"): + elif s.startswith('URI:DIR2:'): return NewDirectoryURI.init_from_string(s) - elif s.startswith("URI:DIR2-RO:"): + elif s.startswith('URI:DIR2-RO:'): return ReadonlyNewDirectoryURI.init_from_string(s) - elif s.startswith("URI:DIR2-Verifier:"): + elif s.startswith('URI:DIR2-Verifier:'): return NewDirectoryURIVerifier.init_from_string(s) else: raise TypeError("unknown URI type: %s.." % s[:12]) @@ -436,9 +441,6 @@ def from_string_dirnode(s): registerAdapter(from_string_dirnode, str, IDirnodeURI) -def is_string_newdirnode_rw(s): - return DirnodeURI_RE.search(s) - def from_string_filenode(s): u = from_string(s) assert IFileURI.providedBy(u) @@ -467,31 +469,31 @@ def pack_extension(data): value = "%d" % value assert isinstance(value, str), k assert re.match(r'^[a-zA-Z_\-]+$', k) - pieces.append(k + ":" + hashutil.netstring(value)) - uri_extension = "".join(pieces) + pieces.append(k + ':' + hashutil.netstring(value)) + uri_extension = ''.join(pieces) return uri_extension def unpack_extension(data): d = {} while data: - colon = data.index(":") + colon = data.index(':') key = data[:colon] data = data[colon+1:] - colon = data.index(":") + colon = data.index(':') number = data[:colon] length = int(number) data = data[colon+1:] value = data[:length] - assert data[length] == "," + assert data[length] == ',' data = data[length+1:] d[key] = value # convert certain things to numbers - for intkey in ("size", "segment_size", "num_segments", - "needed_shares", "total_shares"): + for intkey in ('size', 'segment_size', 'num_segments', + 'needed_shares', 'total_shares'): if intkey in d: d[intkey] = int(d[intkey]) return d @@ -500,7 +502,7 @@ def unpack_extension(data): def unpack_extension_readable(data): unpacked = unpack_extension(data) for k in sorted(unpacked.keys()): - if "hash" in k: + if 'hash' in k: unpacked[k] = idlib.b2a(unpacked[k]) return unpacked