3 from twisted.trial import unittest
4 from allmydata import uri
5 from allmydata.util import hashutil, base32
6 from allmydata.interfaces import IURI, IFileURI, IDirnodeURI, IMutableFileURI, \
7 IVerifierURI, CapConstraintError
8 import allmydata.test.common_util as testutil
10 class Literal(testutil.ReallyEqualMixin, unittest.TestCase):
11 def _help_test(self, data):
12 u = uri.LiteralFileURI(data)
13 self.failUnless(IURI.providedBy(u))
14 self.failUnless(IFileURI.providedBy(u))
15 self.failIf(IDirnodeURI.providedBy(u))
16 self.failUnlessReallyEqual(u.data, data)
17 self.failUnlessReallyEqual(u.get_size(), len(data))
18 self.failUnless(u.is_readonly())
19 self.failIf(u.is_mutable())
21 u2 = uri.from_string(u.to_string())
22 self.failUnless(IURI.providedBy(u2))
23 self.failUnless(IFileURI.providedBy(u2))
24 self.failIf(IDirnodeURI.providedBy(u2))
25 self.failUnlessReallyEqual(u2.data, data)
26 self.failUnlessReallyEqual(u2.get_size(), len(data))
27 self.failUnless(u2.is_readonly())
28 self.failIf(u2.is_mutable())
30 u2i = uri.from_string(u.to_string(), deep_immutable=True)
31 self.failUnless(IFileURI.providedBy(u2i))
32 self.failIf(IDirnodeURI.providedBy(u2i))
33 self.failUnlessReallyEqual(u2i.data, data)
34 self.failUnlessReallyEqual(u2i.get_size(), len(data))
35 self.failUnless(u2i.is_readonly())
36 self.failIf(u2i.is_mutable())
39 self.failUnlessIdentical(u, u3)
40 self.failUnlessReallyEqual(u.get_verify_cap(), None)
42 he = u.to_human_encoding()
43 u_h = uri.LiteralFileURI.init_from_human_encoding(he)
44 self.failUnlessReallyEqual(u, u_h)
47 data = "" # This data is some *very* small data!
48 return self._help_test(data)
51 data = "This is some small data"
52 return self._help_test(data)
54 def test_nonascii(self):
55 data = "This contains \x00 and URI:LIT: and \n, oh my."
56 return self._help_test(data)
58 class Compare(testutil.ReallyEqualMixin, unittest.TestCase):
59 def test_compare(self):
60 lit1 = uri.LiteralFileURI("some data")
61 fileURI = 'URI:CHK:f5ahxa25t4qkktywz6teyfvcx4:opuioq7tj2y6idzfp6cazehtmgs5fdcebcz3cygrxyydvcozrmeq:3:10:345834'
62 chk1 = uri.CHKFileURI.init_from_string(fileURI)
63 chk2 = uri.CHKFileURI.init_from_string(fileURI)
64 unk = uri.UnknownURI("lafs://from_the_future")
65 self.failIfEqual(lit1, chk1)
66 self.failUnlessReallyEqual(chk1, chk2)
67 self.failIfEqual(chk1, "not actually a URI")
68 # these should be hashable too
69 s = set([lit1, chk1, chk2, unk])
70 self.failUnlessReallyEqual(len(s), 3) # since chk1==chk2
72 def test_is_uri(self):
73 lit1 = uri.LiteralFileURI("some data").to_string()
74 self.failUnless(uri.is_uri(lit1))
75 self.failIf(uri.is_uri(None))
77 def test_is_literal_file_uri(self):
78 lit1 = uri.LiteralFileURI("some data").to_string()
79 self.failUnless(uri.is_literal_file_uri(lit1))
80 self.failIf(uri.is_literal_file_uri(None))
81 self.failIf(uri.is_literal_file_uri("foo"))
82 self.failIf(uri.is_literal_file_uri("ro.foo"))
83 self.failIf(uri.is_literal_file_uri("URI:LITfoo"))
84 self.failUnless(uri.is_literal_file_uri("ro.URI:LIT:foo"))
85 self.failUnless(uri.is_literal_file_uri("imm.URI:LIT:foo"))
87 def test_has_uri_prefix(self):
88 self.failUnless(uri.has_uri_prefix("URI:foo"))
89 self.failUnless(uri.has_uri_prefix("ro.URI:foo"))
90 self.failUnless(uri.has_uri_prefix("imm.URI:foo"))
91 self.failIf(uri.has_uri_prefix(None))
92 self.failIf(uri.has_uri_prefix("foo"))
94 class CHKFile(testutil.ReallyEqualMixin, unittest.TestCase):
96 key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
97 storage_index = hashutil.storage_index_hash(key)
98 uri_extension_hash = hashutil.uri_extension_hash("stuff")
102 u = uri.CHKFileURI(key=key,
103 uri_extension_hash=uri_extension_hash,
104 needed_shares=needed_shares,
105 total_shares=total_shares,
107 self.failUnlessReallyEqual(u.get_storage_index(), storage_index)
108 self.failUnlessReallyEqual(u.key, key)
109 self.failUnlessReallyEqual(u.uri_extension_hash, uri_extension_hash)
110 self.failUnlessReallyEqual(u.needed_shares, needed_shares)
111 self.failUnlessReallyEqual(u.total_shares, total_shares)
112 self.failUnlessReallyEqual(u.size, size)
113 self.failUnless(u.is_readonly())
114 self.failIf(u.is_mutable())
115 self.failUnless(IURI.providedBy(u))
116 self.failUnless(IFileURI.providedBy(u))
117 self.failIf(IDirnodeURI.providedBy(u))
118 self.failUnlessReallyEqual(u.get_size(), 1234)
120 u_ro = u.get_readonly()
121 self.failUnlessIdentical(u, u_ro)
122 he = u.to_human_encoding()
123 self.failUnlessReallyEqual(he, "http://127.0.0.1:3456/uri/" + u.to_string())
124 self.failUnlessReallyEqual(uri.CHKFileURI.init_from_human_encoding(he), u)
126 u2 = uri.from_string(u.to_string())
127 self.failUnlessReallyEqual(u2.get_storage_index(), storage_index)
128 self.failUnlessReallyEqual(u2.key, key)
129 self.failUnlessReallyEqual(u2.uri_extension_hash, uri_extension_hash)
130 self.failUnlessReallyEqual(u2.needed_shares, needed_shares)
131 self.failUnlessReallyEqual(u2.total_shares, total_shares)
132 self.failUnlessReallyEqual(u2.size, size)
133 self.failUnless(u2.is_readonly())
134 self.failIf(u2.is_mutable())
135 self.failUnless(IURI.providedBy(u2))
136 self.failUnless(IFileURI.providedBy(u2))
137 self.failIf(IDirnodeURI.providedBy(u2))
138 self.failUnlessReallyEqual(u2.get_size(), 1234)
140 u2i = uri.from_string(u.to_string(), deep_immutable=True)
141 self.failUnlessReallyEqual(u.to_string(), u2i.to_string())
142 u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
143 self.failUnlessReallyEqual(u.to_string(), u2ro.to_string())
144 u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
145 self.failUnlessReallyEqual(u.to_string(), u2imm.to_string())
147 v = u.get_verify_cap()
148 self.failUnless(isinstance(v.to_string(), str))
149 self.failUnless(v.is_readonly())
150 self.failIf(v.is_mutable())
152 v2 = uri.from_string(v.to_string())
153 self.failUnlessReallyEqual(v, v2)
154 he = v.to_human_encoding()
155 v2_h = uri.CHKFileVerifierURI.init_from_human_encoding(he)
156 self.failUnlessReallyEqual(v2, v2_h)
158 v3 = uri.CHKFileVerifierURI(storage_index="\x00"*16,
159 uri_extension_hash="\x00"*32,
163 self.failUnless(isinstance(v3.to_string(), str))
164 self.failUnless(v3.is_readonly())
165 self.failIf(v3.is_mutable())
167 def test_pack_badly(self):
168 key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
169 storage_index = hashutil.storage_index_hash(key)
170 uri_extension_hash = hashutil.uri_extension_hash("stuff")
174 self.failUnlessRaises(TypeError,
177 uri_extension_hash=uri_extension_hash,
178 needed_shares=needed_shares,
179 total_shares=total_shares,
182 bogus_extra_argument="reject me",
184 self.failUnlessRaises(TypeError,
185 uri.CHKFileVerifierURI,
187 self.failUnlessRaises(TypeError,
188 uri.CHKFileVerifierURI,
189 storage_index=storage_index,
190 uri_extension_hash=uri_extension_hash,
193 # leave size= missing
197 class Extension(testutil.ReallyEqualMixin, unittest.TestCase):
199 data = {"stuff": "value",
202 "big_hash": hashutil.tagged_hash("foo", "bar"),
204 ext = uri.pack_extension(data)
205 d = uri.unpack_extension(ext)
206 self.failUnlessReallyEqual(d["stuff"], "value")
207 self.failUnlessReallyEqual(d["size"], 12)
208 self.failUnlessReallyEqual(d["big_hash"], hashutil.tagged_hash("foo", "bar"))
210 readable = uri.unpack_extension_readable(ext)
211 self.failUnlessReallyEqual(readable["needed_shares"], 3)
212 self.failUnlessReallyEqual(readable["stuff"], "value")
213 self.failUnlessReallyEqual(readable["size"], 12)
214 self.failUnlessReallyEqual(readable["big_hash"],
215 base32.b2a(hashutil.tagged_hash("foo", "bar")))
216 self.failUnlessReallyEqual(readable["UEB_hash"],
217 base32.b2a(hashutil.uri_extension_hash(ext)))
219 class Unknown(testutil.ReallyEqualMixin, unittest.TestCase):
220 def test_from_future(self):
221 # any URI type that we don't recognize should be treated as unknown
222 future_uri = "I am a URI from the future. Whatever you do, don't "
223 u = uri.from_string(future_uri)
224 self.failUnless(isinstance(u, uri.UnknownURI))
225 self.failUnlessReallyEqual(u.to_string(), future_uri)
226 self.failUnless(u.get_readonly() is None)
227 self.failUnless(u.get_error() is None)
229 u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
230 self.failUnlessReallyEqual(u.to_string(), future_uri)
231 self.failUnless(u2.get_readonly() is None)
232 self.failUnless(isinstance(u2.get_error(), CapConstraintError))
234 # Future caps might have non-ASCII chars in them. (Or maybe not, who can tell about the future?)
235 future_uri = u"I am a cap from the \u263A future. Whatever you ".encode('utf-8')
236 u = uri.from_string(future_uri)
237 self.failUnless(isinstance(u, uri.UnknownURI))
238 self.failUnlessReallyEqual(u.to_string(), future_uri)
239 self.failUnless(u.get_readonly() is None)
240 self.failUnless(u.get_error() is None)
242 u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
243 self.failUnlessReallyEqual(u.to_string(), future_uri)
244 self.failUnless(u2.get_readonly() is None)
245 self.failUnless(isinstance(u2.get_error(), CapConstraintError))
247 class Constraint(testutil.ReallyEqualMixin, unittest.TestCase):
248 def test_constraint(self):
249 good="http://127.0.0.1:3456/uri/URI%3ADIR2%3Agh3l5rbvnv2333mrfvalmjfr4i%3Alz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma/"
250 uri.DirectoryURI.init_from_human_encoding(good)
251 self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, good)
253 self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_human_encoding, bad)
254 self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, bad)
255 fileURI = 'URI:CHK:gh3l5rbvnv2333mrfvalmjfr4i:lz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma:3:10:345834'
256 uri.CHKFileURI.init_from_string(fileURI)
258 class Mutable(testutil.ReallyEqualMixin, unittest.TestCase):
260 self.writekey = "\x01" * 16
261 self.fingerprint = "\x02" * 32
262 self.readkey = hashutil.ssk_readkey_hash(self.writekey)
263 self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
266 u = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
267 self.failUnlessReallyEqual(u.writekey, self.writekey)
268 self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
269 self.failIf(u.is_readonly())
270 self.failUnless(u.is_mutable())
271 self.failUnless(IURI.providedBy(u))
272 self.failUnless(IMutableFileURI.providedBy(u))
273 self.failIf(IDirnodeURI.providedBy(u))
274 self.failUnless("WriteableSSKFileURI" in str(u))
276 he = u.to_human_encoding()
277 u_h = uri.WriteableSSKFileURI.init_from_human_encoding(he)
278 self.failUnlessReallyEqual(u, u_h)
280 u2 = uri.from_string(u.to_string())
281 self.failUnlessReallyEqual(u2.writekey, self.writekey)
282 self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
283 self.failIf(u2.is_readonly())
284 self.failUnless(u2.is_mutable())
285 self.failUnless(IURI.providedBy(u2))
286 self.failUnless(IMutableFileURI.providedBy(u2))
287 self.failIf(IDirnodeURI.providedBy(u2))
289 u2i = uri.from_string(u.to_string(), deep_immutable=True)
290 self.failUnless(isinstance(u2i, uri.UnknownURI), u2i)
291 u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
292 self.failUnless(isinstance(u2ro, uri.UnknownURI), u2ro)
293 u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
294 self.failUnless(isinstance(u2imm, uri.UnknownURI), u2imm)
296 u3 = u2.get_readonly()
297 readkey = hashutil.ssk_readkey_hash(self.writekey)
298 self.failUnlessReallyEqual(u3.fingerprint, self.fingerprint)
299 self.failUnlessReallyEqual(u3.readkey, readkey)
300 self.failUnless(u3.is_readonly())
301 self.failUnless(u3.is_mutable())
302 self.failUnless(IURI.providedBy(u3))
303 self.failUnless(IMutableFileURI.providedBy(u3))
304 self.failIf(IDirnodeURI.providedBy(u3))
306 u3i = uri.from_string(u3.to_string(), deep_immutable=True)
307 self.failUnless(isinstance(u3i, uri.UnknownURI), u3i)
308 u3ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u3.to_string())
309 self.failUnlessReallyEqual(u3.to_string(), u3ro.to_string())
310 u3imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u3.to_string())
311 self.failUnless(isinstance(u3imm, uri.UnknownURI), u3imm)
313 he = u3.to_human_encoding()
314 u3_h = uri.ReadonlySSKFileURI.init_from_human_encoding(he)
315 self.failUnlessReallyEqual(u3, u3_h)
317 u4 = uri.ReadonlySSKFileURI(readkey, self.fingerprint)
318 self.failUnlessReallyEqual(u4.fingerprint, self.fingerprint)
319 self.failUnlessReallyEqual(u4.readkey, readkey)
320 self.failUnless(u4.is_readonly())
321 self.failUnless(u4.is_mutable())
322 self.failUnless(IURI.providedBy(u4))
323 self.failUnless(IMutableFileURI.providedBy(u4))
324 self.failIf(IDirnodeURI.providedBy(u4))
326 u4i = uri.from_string(u4.to_string(), deep_immutable=True)
327 self.failUnless(isinstance(u4i, uri.UnknownURI), u4i)
328 u4ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u4.to_string())
329 self.failUnlessReallyEqual(u4.to_string(), u4ro.to_string())
330 u4imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u4.to_string())
331 self.failUnless(isinstance(u4imm, uri.UnknownURI), u4imm)
333 u4a = uri.from_string(u4.to_string())
334 self.failUnlessReallyEqual(u4a, u4)
335 self.failUnless("ReadonlySSKFileURI" in str(u4a))
336 self.failUnlessIdentical(u4a.get_readonly(), u4a)
338 u5 = u4.get_verify_cap()
339 self.failUnless(IVerifierURI.providedBy(u5))
340 self.failUnlessReallyEqual(u5.get_storage_index(), u.get_storage_index())
341 u7 = u.get_verify_cap()
342 self.failUnless(IVerifierURI.providedBy(u7))
343 self.failUnlessReallyEqual(u7.get_storage_index(), u.get_storage_index())
345 he = u5.to_human_encoding()
346 u5_h = uri.SSKVerifierURI.init_from_human_encoding(he)
347 self.failUnlessReallyEqual(u5, u5_h)
350 def test_writeable_mdmf_cap(self):
351 u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
353 u = uri.WriteableMDMFFileURI.init_from_string(cap)
355 self.failUnless(IMutableFileURI.providedBy(u))
356 self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
357 self.failUnlessReallyEqual(u.writekey, self.writekey)
358 self.failUnless(u.is_mutable())
359 self.failIf(u.is_readonly())
360 self.failUnlessEqual(cap, u.to_string())
362 # Now get a readonly cap from the writeable cap, and test that it
363 # degrades gracefully.
364 ru = u.get_readonly()
365 self.failUnlessReallyEqual(self.readkey, ru.readkey)
366 self.failUnlessReallyEqual(self.fingerprint, ru.fingerprint)
367 self.failUnless(ru.is_mutable())
368 self.failUnless(ru.is_readonly())
370 # Now get a verifier cap.
371 vu = ru.get_verify_cap()
372 self.failUnlessReallyEqual(self.storage_index, vu.storage_index)
373 self.failUnlessReallyEqual(self.fingerprint, vu.fingerprint)
374 self.failUnless(IVerifierURI.providedBy(vu))
376 def test_readonly_mdmf_cap(self):
377 u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
379 u2 = uri.ReadonlyMDMFFileURI.init_from_string(cap)
381 self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
382 self.failUnlessReallyEqual(u2.readkey, self.readkey)
383 self.failUnless(u2.is_readonly())
384 self.failUnless(u2.is_mutable())
386 vu = u2.get_verify_cap()
387 self.failUnlessEqual(vu.storage_index, self.storage_index)
388 self.failUnlessEqual(vu.fingerprint, self.fingerprint)
390 def test_create_writeable_mdmf_cap_from_readcap(self):
391 # we shouldn't be able to create a writeable MDMF cap given only a
393 u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
395 self.failUnlessRaises(uri.BadURIError,
396 uri.WriteableMDMFFileURI.init_from_string,
399 def test_create_writeable_mdmf_cap_from_verifycap(self):
400 u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
402 self.failUnlessRaises(uri.BadURIError,
403 uri.WriteableMDMFFileURI.init_from_string,
406 def test_create_readonly_mdmf_cap_from_verifycap(self):
407 u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
409 self.failUnlessRaises(uri.BadURIError,
410 uri.ReadonlyMDMFFileURI.init_from_string,
413 def test_mdmf_verifier_cap(self):
414 u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
415 self.failUnless(u1.is_readonly())
416 self.failIf(u1.is_mutable())
417 self.failUnlessReallyEqual(self.storage_index, u1.storage_index)
418 self.failUnlessReallyEqual(self.fingerprint, u1.fingerprint)
421 u2 = uri.MDMFVerifierURI.init_from_string(cap)
422 self.failUnless(u2.is_readonly())
423 self.failIf(u2.is_mutable())
424 self.failUnlessReallyEqual(self.storage_index, u2.storage_index)
425 self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
427 u3 = u2.get_readonly()
428 self.failUnlessReallyEqual(u3, u2)
430 u4 = u2.get_verify_cap()
431 self.failUnlessReallyEqual(u4, u2)
433 def test_mdmf_cap_ignore_extensions(self):
434 # MDMF caps can be arbitrarily extended after the fingerprint and
435 # key/storage index fields. tahoe-1.9 is supposed to ignore any
436 # extensions, and not add any itself.
437 u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
440 cap2 = cap+":I COME FROM THE FUTURE"
441 u2 = uri.WriteableMDMFFileURI.init_from_string(cap2)
442 self.failUnlessReallyEqual(self.writekey, u2.writekey)
443 self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
444 self.failIf(u2.is_readonly())
445 self.failUnless(u2.is_mutable())
447 cap3 = cap+":"+os.urandom(40) # parse *that*!
448 u3 = uri.WriteableMDMFFileURI.init_from_string(cap3)
449 self.failUnlessReallyEqual(self.writekey, u3.writekey)
450 self.failUnlessReallyEqual(self.fingerprint, u3.fingerprint)
451 self.failIf(u3.is_readonly())
452 self.failUnless(u3.is_mutable())
454 cap4 = u1.get_readonly().to_string()+":ooh scary future stuff"
455 u4 = uri.from_string_mutable_filenode(cap4)
456 self.failUnlessReallyEqual(self.readkey, u4.readkey)
457 self.failUnlessReallyEqual(self.fingerprint, u4.fingerprint)
458 self.failUnless(u4.is_readonly())
459 self.failUnless(u4.is_mutable())
461 cap5 = u1.get_verify_cap().to_string()+":spoilers!"
462 u5 = uri.from_string(cap5)
463 self.failUnlessReallyEqual(self.storage_index, u5.storage_index)
464 self.failUnlessReallyEqual(self.fingerprint, u5.fingerprint)
465 self.failUnless(u5.is_readonly())
466 self.failIf(u5.is_mutable())
469 def test_mdmf_valid_human_encoding(self):
470 # What's a human encoding? Well, it's of the form:
471 base = "https://127.0.0.1:3456/uri/"
472 # With a cap on the end. For each of the cap types, we need to
473 # test that a valid cap (with and without the traditional
474 # separators) is recognized and accepted by the classes.
475 w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
476 r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
477 v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
479 # These will yield three different caps.
480 for o in (w1, r1, v1):
481 url = base + o.to_string()
482 o1 = o.__class__.init_from_human_encoding(url)
483 self.failUnlessReallyEqual(o1, o)
485 # Note that our cap will, by default, have : as separators.
486 # But it's expected that users from, e.g., the WUI, will
487 # have %3A as a separator. We need to make sure that the
488 # initialization routine handles that, too.
490 cap = re.sub(":", "%3A", cap)
492 o2 = o.__class__.init_from_human_encoding(url)
493 self.failUnlessReallyEqual(o2, o)
496 def test_mdmf_human_encoding_invalid_base(self):
497 # What's a human encoding? Well, it's of the form:
498 base = "https://127.0.0.1:3456/foo/bar/bazuri/"
499 # With a cap on the end. For each of the cap types, we need to
500 # test that a valid cap (with and without the traditional
501 # separators) is recognized and accepted by the classes.
502 w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
503 r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
504 v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
506 # These will yield three different caps.
507 for o in (w1, r1, v1):
508 url = base + o.to_string()
509 self.failUnlessRaises(uri.BadURIError,
510 o.__class__.init_from_human_encoding,
513 def test_mdmf_human_encoding_invalid_cap(self):
514 base = "https://127.0.0.1:3456/uri/"
515 # With a cap on the end. For each of the cap types, we need to
516 # test that a valid cap (with and without the traditional
517 # separators) is recognized and accepted by the classes.
518 w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
519 r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
520 v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
522 # These will yield three different caps.
523 for o in (w1, r1, v1):
524 # not exhaustive, obviously...
525 url = base + o.to_string() + "foobarbaz"
526 url2 = base + "foobarbaz" + o.to_string()
527 url3 = base + o.to_string()[:25] + "foo" + o.to_string()[:25]
528 for u in (url, url2, url3):
529 self.failUnlessRaises(uri.BadURIError,
530 o.__class__.init_from_human_encoding,
533 def test_mdmf_from_string(self):
534 # Make sure that the from_string utility function works with
536 u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
538 self.failUnless(uri.is_uri(cap))
539 u2 = uri.from_string(cap)
540 self.failUnlessReallyEqual(u1, u2)
541 u3 = uri.from_string_mutable_filenode(cap)
542 self.failUnlessEqual(u3, u1)
544 u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
546 self.failUnless(uri.is_uri(cap))
547 u2 = uri.from_string(cap)
548 self.failUnlessReallyEqual(u1, u2)
549 u3 = uri.from_string_mutable_filenode(cap)
550 self.failUnlessEqual(u3, u1)
552 u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
554 self.failUnless(uri.is_uri(cap))
555 u2 = uri.from_string(cap)
556 self.failUnlessReallyEqual(u1, u2)
557 u3 = uri.from_string_verifier(cap)
558 self.failUnlessEqual(u3, u1)
561 class Dirnode(testutil.ReallyEqualMixin, unittest.TestCase):
563 writekey = "\x01" * 16
564 fingerprint = "\x02" * 32
566 n = uri.WriteableSSKFileURI(writekey, fingerprint)
567 u1 = uri.DirectoryURI(n)
568 self.failIf(u1.is_readonly())
569 self.failUnless(u1.is_mutable())
570 self.failUnless(IURI.providedBy(u1))
571 self.failIf(IFileURI.providedBy(u1))
572 self.failUnless(IDirnodeURI.providedBy(u1))
573 self.failUnless("DirectoryURI" in str(u1))
574 u1_filenode = u1.get_filenode_cap()
575 self.failUnless(u1_filenode.is_mutable())
576 self.failIf(u1_filenode.is_readonly())
578 u2 = uri.from_string(u1.to_string())
579 self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
580 self.failIf(u2.is_readonly())
581 self.failUnless(u2.is_mutable())
582 self.failUnless(IURI.providedBy(u2))
583 self.failIf(IFileURI.providedBy(u2))
584 self.failUnless(IDirnodeURI.providedBy(u2))
586 u2i = uri.from_string(u1.to_string(), deep_immutable=True)
587 self.failUnless(isinstance(u2i, uri.UnknownURI))
589 u3 = u2.get_readonly()
590 self.failUnless(u3.is_readonly())
591 self.failUnless(u3.is_mutable())
592 self.failUnless(IURI.providedBy(u3))
593 self.failIf(IFileURI.providedBy(u3))
594 self.failUnless(IDirnodeURI.providedBy(u3))
596 u3i = uri.from_string(u2.to_string(), deep_immutable=True)
597 self.failUnless(isinstance(u3i, uri.UnknownURI))
599 u3n = u3._filenode_uri
600 self.failUnless(u3n.is_readonly())
601 self.failUnless(u3n.is_mutable())
602 u3_filenode = u3.get_filenode_cap()
603 self.failUnless(u3_filenode.is_mutable())
604 self.failUnless(u3_filenode.is_readonly())
606 u3a = uri.from_string(u3.to_string())
607 self.failUnlessIdentical(u3a, u3a.get_readonly())
609 u4 = uri.ReadonlyDirectoryURI(u2._filenode_uri.get_readonly())
610 self.failUnlessReallyEqual(u4.to_string(), u3.to_string())
611 self.failUnless(u4.is_readonly())
612 self.failUnless(u4.is_mutable())
613 self.failUnless(IURI.providedBy(u4))
614 self.failIf(IFileURI.providedBy(u4))
615 self.failUnless(IDirnodeURI.providedBy(u4))
617 u4_verifier = u4.get_verify_cap()
618 u4_verifier_filenode = u4_verifier.get_filenode_cap()
619 self.failUnless(isinstance(u4_verifier_filenode, uri.SSKVerifierURI))
621 verifiers = [u1.get_verify_cap(), u2.get_verify_cap(),
622 u3.get_verify_cap(), u4.get_verify_cap(),
623 uri.DirectoryURIVerifier(n.get_verify_cap()),
626 self.failUnless(IVerifierURI.providedBy(v))
627 self.failUnlessReallyEqual(v._filenode_uri,
628 u1.get_verify_cap()._filenode_uri)
630 def test_immutable(self):
631 readkey = "\x01" * 16
632 uri_extension_hash = hashutil.uri_extension_hash("stuff")
637 fnuri = uri.CHKFileURI(key=readkey,
638 uri_extension_hash=uri_extension_hash,
639 needed_shares=needed_shares,
640 total_shares=total_shares,
642 fncap = fnuri.to_string()
643 self.failUnlessReallyEqual(fncap, "URI:CHK:aeaqcaibaeaqcaibaeaqcaibae:nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa:3:10:1234")
644 u1 = uri.ImmutableDirectoryURI(fnuri)
645 self.failUnless(u1.is_readonly())
646 self.failIf(u1.is_mutable())
647 self.failUnless(IURI.providedBy(u1))
648 self.failIf(IFileURI.providedBy(u1))
649 self.failUnless(IDirnodeURI.providedBy(u1))
650 self.failUnless("DirectoryURI" in str(u1))
651 u1_filenode = u1.get_filenode_cap()
652 self.failIf(u1_filenode.is_mutable())
653 self.failUnless(u1_filenode.is_readonly())
654 self.failUnlessReallyEqual(u1_filenode.to_string(), fncap)
655 self.failUnless(str(u1))
657 u2 = uri.from_string(u1.to_string())
658 self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
659 self.failUnless(u2.is_readonly())
660 self.failIf(u2.is_mutable())
661 self.failUnless(IURI.providedBy(u2))
662 self.failIf(IFileURI.providedBy(u2))
663 self.failUnless(IDirnodeURI.providedBy(u2))
665 u2i = uri.from_string(u1.to_string(), deep_immutable=True)
666 self.failUnlessReallyEqual(u1.to_string(), u2i.to_string())
668 u3 = u2.get_readonly()
669 self.failUnlessReallyEqual(u3.to_string(), u2.to_string())
670 self.failUnless(str(u3))
672 u3i = uri.from_string(u2.to_string(), deep_immutable=True)
673 self.failUnlessReallyEqual(u2.to_string(), u3i.to_string())
675 u2_verifier = u2.get_verify_cap()
676 self.failUnless(isinstance(u2_verifier,
677 uri.ImmutableDirectoryURIVerifier),
679 self.failUnless(IVerifierURI.providedBy(u2_verifier))
680 u2vs = u2_verifier.to_string()
681 # URI:DIR2-CHK-Verifier:$key:$ueb:$k:$n:$size
682 self.failUnless(u2vs.startswith("URI:DIR2-CHK-Verifier:"), u2vs)
683 u2_verifier_fileuri = u2_verifier.get_filenode_cap()
684 self.failUnless(IVerifierURI.providedBy(u2_verifier_fileuri))
685 u2vfs = u2_verifier_fileuri.to_string()
686 # URI:CHK-Verifier:$key:$ueb:$k:$n:$size
687 self.failUnlessReallyEqual(u2vfs, fnuri.get_verify_cap().to_string())
688 self.failUnlessReallyEqual(u2vs[len("URI:DIR2-"):], u2vfs[len("URI:"):])
689 self.failUnless(str(u2_verifier))
691 def test_literal(self):
692 u0 = uri.LiteralFileURI("data")
693 u1 = uri.LiteralDirectoryURI(u0)
694 self.failUnless(str(u1))
695 self.failUnlessReallyEqual(u1.to_string(), "URI:DIR2-LIT:mrqxiyi")
696 self.failUnless(u1.is_readonly())
697 self.failIf(u1.is_mutable())
698 self.failUnless(IURI.providedBy(u1))
699 self.failIf(IFileURI.providedBy(u1))
700 self.failUnless(IDirnodeURI.providedBy(u1))
701 self.failUnlessReallyEqual(u1.get_verify_cap(), None)
702 self.failUnlessReallyEqual(u1.get_storage_index(), None)
703 self.failUnlessReallyEqual(u1.abbrev_si(), "<LIT>")
706 writekey = "\x01" * 16
707 fingerprint = "\x02" * 32
708 uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
709 d1 = uri.MDMFDirectoryURI(uri1)
710 self.failIf(d1.is_readonly())
711 self.failUnless(d1.is_mutable())
712 self.failUnless(IURI.providedBy(d1))
713 self.failUnless(IDirnodeURI.providedBy(d1))
714 d1_uri = d1.to_string()
716 d2 = uri.from_string(d1_uri)
717 self.failUnlessIsInstance(d2, uri.MDMFDirectoryURI)
718 self.failIf(d2.is_readonly())
719 self.failUnless(d2.is_mutable())
720 self.failUnless(IURI.providedBy(d2))
721 self.failUnless(IDirnodeURI.providedBy(d2))
723 # It doesn't make sense to ask for a deep immutable URI for a
724 # mutable directory, and we should get back a result to that
726 d3 = uri.from_string(d2.to_string(), deep_immutable=True)
727 self.failUnlessIsInstance(d3, uri.UnknownURI)
729 def test_mdmf_attenuation(self):
730 writekey = "\x01" * 16
731 fingerprint = "\x02" * 32
733 uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
734 d1 = uri.MDMFDirectoryURI(uri1)
735 self.failUnless(d1.is_mutable())
736 self.failIf(d1.is_readonly())
737 self.failUnless(IURI.providedBy(d1))
738 self.failUnless(IDirnodeURI.providedBy(d1))
740 d1_uri = d1.to_string()
741 d1_uri_from_fn = uri.MDMFDirectoryURI(d1.get_filenode_cap()).to_string()
742 self.failUnlessEqual(d1_uri_from_fn, d1_uri)
744 uri2 = uri.from_string(d1_uri)
745 self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
746 self.failUnless(IURI.providedBy(uri2))
747 self.failUnless(IDirnodeURI.providedBy(uri2))
748 self.failUnless(uri2.is_mutable())
749 self.failIf(uri2.is_readonly())
751 ro = uri2.get_readonly()
752 self.failUnlessIsInstance(ro, uri.ReadonlyMDMFDirectoryURI)
753 self.failUnless(ro.is_mutable())
754 self.failUnless(ro.is_readonly())
755 self.failUnless(IURI.providedBy(ro))
756 self.failUnless(IDirnodeURI.providedBy(ro))
758 ro_uri = ro.to_string()
759 n = uri.from_string(ro_uri, deep_immutable=True)
760 self.failUnlessIsInstance(n, uri.UnknownURI)
762 fn_cap = ro.get_filenode_cap()
763 fn_ro_cap = fn_cap.get_readonly()
764 d3 = uri.ReadonlyMDMFDirectoryURI(fn_ro_cap)
765 self.failUnlessEqual(ro.to_string(), d3.to_string())
766 self.failUnless(ro.is_mutable())
767 self.failUnless(ro.is_readonly())
769 def test_mdmf_verifier(self):
770 # I'm not sure what I want to write here yet.
771 writekey = "\x01" * 16
772 fingerprint = "\x02" * 32
773 uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
774 d1 = uri.MDMFDirectoryURI(uri1)
775 v1 = d1.get_verify_cap()
776 self.failUnlessIsInstance(v1, uri.MDMFDirectoryURIVerifier)
777 self.failIf(v1.is_mutable())
779 d2 = uri.from_string(d1.to_string())
780 v2 = d2.get_verify_cap()
781 self.failUnlessIsInstance(v2, uri.MDMFDirectoryURIVerifier)
782 self.failIf(v2.is_mutable())
783 self.failUnlessEqual(v2.to_string(), v1.to_string())
785 # Now attenuate and make sure that works correctly.
786 r3 = d2.get_readonly()
787 v3 = r3.get_verify_cap()
788 self.failUnlessIsInstance(v3, uri.MDMFDirectoryURIVerifier)
789 self.failIf(v3.is_mutable())
790 self.failUnlessEqual(v3.to_string(), v1.to_string())
791 r4 = uri.from_string(r3.to_string())
792 v4 = r4.get_verify_cap()
793 self.failUnlessIsInstance(v4, uri.MDMFDirectoryURIVerifier)
794 self.failIf(v4.is_mutable())
795 self.failUnlessEqual(v4.to_string(), v3.to_string())