3 from twisted.trial import unittest
4 from allmydata import uri
5 from allmydata.util import hashutil, base32
6 from allmydata.interfaces import IURI, IFileURI, IDirnodeURI, IMutableFileURI, \
7 IVerifierURI, CapConstraintError
8 import allmydata.test.common_util as testutil
class Literal(testutil.ReallyEqualMixin, unittest.TestCase):
    """Checks for ``uri.LiteralFileURI``, built directly and re-parsed."""

    def _check_literal_cap(self, cap, data):
        """Assert that *cap* is a read-only, non-mutable file cap holding *data*."""
        self.failUnless(IFileURI.providedBy(cap))
        self.failIf(IDirnodeURI.providedBy(cap))
        self.failUnlessReallyEqual(cap.data, data)
        self.failUnlessReallyEqual(cap.get_size(), len(data))
        self.failUnless(cap.is_readonly())
        self.failIf(cap.is_mutable())

    def _help_test(self, data):
        """Build a literal cap from *data* and check it and its re-parsed forms.

        Covers the cap itself, the result of ``uri.from_string`` on its string
        form (with and without ``deep_immutable=True``), its read-only and
        verify caps, and a round trip through the human encoding.
        """
        u = uri.LiteralFileURI(data)
        self.failUnless(IURI.providedBy(u))
        self._check_literal_cap(u, data)

        u2 = uri.from_string(u.to_string())
        self.failUnless(IURI.providedBy(u2))
        self._check_literal_cap(u2, data)

        u2i = uri.from_string(u.to_string(), deep_immutable=True)
        self._check_literal_cap(u2i, data)

        # NOTE(review): the assignment of u3 was missing from the reviewed
        # extract; restored by analogy with CHKFile, where the read-only cap
        # is compared by identity with the original -- confirm.
        u3 = u.get_readonly()
        self.failUnlessIdentical(u, u3)
        self.failUnlessReallyEqual(u.get_verify_cap(), None)

        he = u.to_human_encoding()
        u_h = uri.LiteralFileURI.init_from_human_encoding(he)
        self.failUnlessReallyEqual(u, u_h)

    # NOTE(review): the two "def" lines below were missing from the reviewed
    # extract; the method names are reconstructed -- confirm.
    def test_empty(self):
        """Run the shared checks on empty data."""
        data = "" # This data is some *very* small data!
        return self._help_test(data)

    def test_pack(self):
        """Run the shared checks on a short plain string."""
        data = "This is some small data"
        return self._help_test(data)

    def test_nonascii(self):
        """Run the shared checks on data holding a NUL, a newline and 'URI:LIT:'."""
        data = "This contains \x00 and URI:LIT: and \n, oh my."
        return self._help_test(data)
class Compare(testutil.ReallyEqualMixin, unittest.TestCase):
    """Equality, hashing and string-predicate checks for the ``uri`` module."""

    def test_compare(self):
        """Caps compare by value, differ across types, and are hashable."""
        literal = uri.LiteralFileURI("some data")
        chk_string = 'URI:CHK:f5ahxa25t4qkktywz6teyfvcx4:opuioq7tj2y6idzfp6cazehtmgs5fdcebcz3cygrxyydvcozrmeq:3:10:345834'
        first_chk = uri.CHKFileURI.init_from_string(chk_string)
        second_chk = uri.CHKFileURI.init_from_string(chk_string)
        unknown = uri.UnknownURI("lafs://from_the_future")

        self.failIfEqual(literal, first_chk)
        self.failUnlessReallyEqual(first_chk, second_chk)
        self.failIfEqual(first_chk, "not actually a URI")

        # Hashability: the two equal CHK caps must collapse to one set entry.
        distinct = set([literal, first_chk, second_chk, unknown])
        self.failUnlessReallyEqual(len(distinct), 3)

    def test_is_uri(self):
        """``uri.is_uri`` accepts a literal cap string and rejects None."""
        literal_string = uri.LiteralFileURI("some data").to_string()
        self.failUnless(uri.is_uri(literal_string))
        self.failIf(uri.is_uri(None))

    def test_is_literal_file_uri(self):
        """``uri.is_literal_file_uri`` accepts LIT caps, prefixed or not."""
        literal_string = uri.LiteralFileURI("some data").to_string()
        self.failUnless(uri.is_literal_file_uri(literal_string))

        for rejected in (None, "foo", "ro.foo", "URI:LITfoo"):
            self.failIf(uri.is_literal_file_uri(rejected))

        for accepted in ("ro.URI:LIT:foo", "imm.URI:LIT:foo"):
            self.failUnless(uri.is_literal_file_uri(accepted))

    def test_has_uri_prefix(self):
        """``uri.has_uri_prefix`` accepts 'URI:' with optional ro./imm. prefix."""
        for accepted in ("URI:foo", "ro.URI:foo", "imm.URI:foo"):
            self.failUnless(uri.has_uri_prefix(accepted))

        for rejected in (None, "foo"):
            self.failIf(uri.has_uri_prefix(rejected))
class CHKFile(testutil.ReallyEqualMixin, unittest.TestCase):
    """Checks for ``uri.CHKFileURI`` and ``uri.CHKFileVerifierURI``."""

    def _check_chk_cap(self, cap, key, storage_index, uri_extension_hash,
                       needed_shares, total_shares, size):
        """Assert that *cap* exposes the given fields and is a read-only file cap."""
        self.failUnlessReallyEqual(cap.get_storage_index(), storage_index)
        self.failUnlessReallyEqual(cap.key, key)
        self.failUnlessReallyEqual(cap.uri_extension_hash, uri_extension_hash)
        self.failUnlessReallyEqual(cap.needed_shares, needed_shares)
        self.failUnlessReallyEqual(cap.total_shares, total_shares)
        self.failUnlessReallyEqual(cap.size, size)
        self.failUnless(cap.is_readonly())
        self.failIf(cap.is_mutable())
        self.failUnless(IURI.providedBy(cap))
        self.failUnless(IFileURI.providedBy(cap))
        self.failIf(IDirnodeURI.providedBy(cap))
        self.failUnlessReallyEqual(cap.get_size(), 1234)

    # NOTE(review): this "def" line was missing from the reviewed extract;
    # the method name is reconstructed -- confirm.
    def test_pack(self):
        """Build a CHK cap, re-parse it, and check its verify cap."""
        key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
        storage_index = hashutil.storage_index_hash(key)
        uri_extension_hash = hashutil.uri_extension_hash("stuff")
        # NOTE(review): these three assignments were missing from the extract.
        # size must be 1234 to agree with the get_size() assertions; the share
        # counts are placeholders that only need to round-trip -- confirm.
        needed_shares = 25
        total_shares = 100
        size = 1234
        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=needed_shares,
                           total_shares=total_shares,
                           size=size)
        self._check_chk_cap(u, key, storage_index, uri_extension_hash,
                            needed_shares, total_shares, size)

        u_ro = u.get_readonly()
        self.failUnlessIdentical(u, u_ro)
        he = u.to_human_encoding()
        self.failUnlessReallyEqual(he, "http://127.0.0.1:3456/uri/" + u.to_string())
        self.failUnlessReallyEqual(uri.CHKFileURI.init_from_human_encoding(he), u)

        u2 = uri.from_string(u.to_string())
        self._check_chk_cap(u2, key, storage_index, uri_extension_hash,
                            needed_shares, total_shares, size)

        # The deep-immutable parse and the ro./imm. prefixes must all yield a
        # cap with the same string form as the original.
        u2i = uri.from_string(u.to_string(), deep_immutable=True)
        self.failUnlessReallyEqual(u.to_string(), u2i.to_string())
        u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
        self.failUnlessReallyEqual(u.to_string(), u2ro.to_string())
        u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
        self.failUnlessReallyEqual(u.to_string(), u2imm.to_string())

        v = u.get_verify_cap()
        self.failUnless(isinstance(v.to_string(), str))
        self.failUnless(v.is_readonly())
        self.failIf(v.is_mutable())

        v2 = uri.from_string(v.to_string())
        self.failUnlessReallyEqual(v, v2)
        he = v.to_human_encoding()
        v2_h = uri.CHKFileVerifierURI.init_from_human_encoding(he)
        self.failUnlessReallyEqual(v2, v2_h)

        # NOTE(review): the last three keyword arguments were missing from the
        # extract; the values are placeholders -- confirm.
        v3 = uri.CHKFileVerifierURI(storage_index="\x00"*16,
                                    uri_extension_hash="\x00"*32,
                                    needed_shares=3,
                                    total_shares=10,
                                    size=1234)
        self.failUnless(isinstance(v3.to_string(), str))
        self.failUnless(v3.is_readonly())
        self.failIf(v3.is_mutable())

    def test_pack_badly(self):
        """Constructors raise TypeError for unexpected or missing keywords."""
        key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
        storage_index = hashutil.storage_index_hash(key)
        uri_extension_hash = hashutil.uri_extension_hash("stuff")
        # NOTE(review): these three assignments, and the arguments flagged
        # below, were missing from the extract -- confirm.
        needed_shares = 25
        total_shares = 100
        size = 1234
        self.failUnlessRaises(TypeError,
                              uri.CHKFileURI,  # restored
                              key=key,  # restored
                              uri_extension_hash=uri_extension_hash,
                              needed_shares=needed_shares,
                              total_shares=total_shares,
                              size=size,  # restored
                              bogus_extra_argument="reject me",
                              )
        self.failUnlessRaises(TypeError,
                              uri.CHKFileVerifierURI,
                              bogus="bogus")  # restored
        self.failUnlessRaises(TypeError,
                              uri.CHKFileVerifierURI,
                              storage_index=storage_index,
                              uri_extension_hash=uri_extension_hash,
                              needed_shares=3,  # restored
                              total_shares=10,  # restored
                              # leave size= missing
                              )
class Extension(testutil.ReallyEqualMixin, unittest.TestCase):
    """Round-trip checks for ``uri.pack_extension`` and its unpackers."""

    # NOTE(review): this "def" line was missing from the reviewed extract;
    # the method name is reconstructed -- confirm.
    def test_pack(self):
        """Pack a dict, then check both the raw and the readable unpacking."""
        big_hash = hashutil.tagged_hash("foo", "bar")
        # NOTE(review): the "size" and "needed_shares" entries were missing
        # from the extract; their values are taken from the assertions below.
        data = {"stuff": "value",
                "size": 12,
                "needed_shares": 3,
                "big_hash": big_hash,
                }
        ext = uri.pack_extension(data)

        d = uri.unpack_extension(ext)
        self.failUnlessReallyEqual(d["stuff"], "value")
        self.failUnlessReallyEqual(d["size"], 12)
        self.failUnlessReallyEqual(d["big_hash"], big_hash)

        # The readable form base32-encodes hashes and adds a "UEB_hash" entry
        # computed over the packed extension.
        readable = uri.unpack_extension_readable(ext)
        self.failUnlessReallyEqual(readable["needed_shares"], 3)
        self.failUnlessReallyEqual(readable["stuff"], "value")
        self.failUnlessReallyEqual(readable["size"], 12)
        self.failUnlessReallyEqual(readable["big_hash"],
                                   base32.b2a(big_hash))
        self.failUnlessReallyEqual(readable["UEB_hash"],
                                   base32.b2a(hashutil.uri_extension_hash(ext)))
class Unknown(testutil.ReallyEqualMixin, unittest.TestCase):
    """Checks that unrecognized cap strings are wrapped in ``uri.UnknownURI``."""

    def _check_unknown(self, future_uri):
        """Check parsing and direct construction of an unknown cap string."""
        u = uri.from_string(future_uri)
        self.failUnless(isinstance(u, uri.UnknownURI))
        self.failUnlessReallyEqual(u.to_string(), future_uri)
        self.failUnless(u.get_readonly() is None)
        self.failUnless(u.get_error() is None)

        u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
        # Fixed: this assertion used to re-check `u`, leaving the string form
        # of the error-carrying cap untested.
        self.failUnlessReallyEqual(u2.to_string(), future_uri)
        self.failUnless(u2.get_readonly() is None)
        self.failUnless(isinstance(u2.get_error(), CapConstraintError))

    def test_from_future(self):
        """Unrecognized strings, ASCII or UTF-8 encoded, become UnknownURI."""
        # any URI type that we don't recognize should be treated as unknown
        self._check_unknown("I am a URI from the future. Whatever you do, don't ")

        # Future caps might have non-ASCII chars in them. (Or maybe not, who can tell about the future?)
        self._check_unknown(u"I am a cap from the \u263A future. Whatever you ".encode('utf-8'))
class Constraint(testutil.ReallyEqualMixin, unittest.TestCase):
    """Checks which string forms the cap constructors accept or reject."""

    def test_constraint(self):
        """A human-encoded dircap is rejected by init_from_string, and vice versa for bad input."""
        good = "http://127.0.0.1:3456/uri/URI%3ADIR2%3Agh3l5rbvnv2333mrfvalmjfr4i%3Alz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma/"
        # Must parse as a human encoding, but not as a plain cap string.
        uri.DirectoryURI.init_from_human_encoding(good)
        self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, good)

        # NOTE(review): the assignment of `bad` was missing from the reviewed
        # extract; this value (the good URL with trailing junk) is a
        # reconstruction -- confirm.
        bad = good + '==='
        self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_human_encoding, bad)
        self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, bad)

        # A well-formed CHK cap string must parse without raising.
        fileURI = 'URI:CHK:gh3l5rbvnv2333mrfvalmjfr4i:lz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma:3:10:345834'
        uri.CHKFileURI.init_from_string(fileURI)
class Mutable(testutil.ReallyEqualMixin, unittest.TestCase):
    """Checks for SDMF (SSK) and MDMF mutable-file caps and their verify caps.

    NOTE(review): the reviewed extract had dropped a number of lines from this
    class: the ``setUp``/``test_pack`` headers, the ``cap = ....to_string()``
    assignments, the extension-list arguments, and the final argument of
    several ``failUnlessRaises`` calls. They are restored here from context
    and flagged where they occur -- confirm.
    """

    def setUp(self):  # NOTE(review): header restored
        """Derive the read key and storage index from fixed key material."""
        self.writekey = "\x01" * 16
        self.fingerprint = "\x02" * 32
        self.readkey = hashutil.ssk_readkey_hash(self.writekey)
        self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)

    def _make_mdmf_caps(self):
        """Return (w1, w2, r1, r2, v1, v2): MDMF write, read and verify caps.

        The second cap of each pair carries extension fields.
        """
        # NOTE(review): the extension arguments were missing from the extract;
        # these values mirror the fields used in
        # test_mdmf_cap_extra_information -- confirm.
        w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
        w2 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint,
                                      ['131073', '3'])
        r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
        r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
                                     ['131073', '3'])
        v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
        v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
                                 ['131073', '3'])
        return (w1, w2, r1, r2, v1, v2)

    def test_pack(self):  # NOTE(review): header restored, name reconstructed
        """Exercise an SDMF write cap, its read-only form and its verify caps."""
        u = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
        self.failUnlessReallyEqual(u.writekey, self.writekey)
        self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
        self.failIf(u.is_readonly())
        self.failUnless(u.is_mutable())
        self.failUnless(IURI.providedBy(u))
        self.failUnless(IMutableFileURI.providedBy(u))
        self.failIf(IDirnodeURI.providedBy(u))
        self.failUnless("WriteableSSKFileURI" in str(u))

        he = u.to_human_encoding()
        u_h = uri.WriteableSSKFileURI.init_from_human_encoding(he)
        self.failUnlessReallyEqual(u, u_h)

        u2 = uri.from_string(u.to_string())
        self.failUnlessReallyEqual(u2.writekey, self.writekey)
        self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
        self.failIf(u2.is_readonly())
        self.failUnless(u2.is_mutable())
        self.failUnless(IURI.providedBy(u2))
        self.failUnless(IMutableFileURI.providedBy(u2))
        self.failIf(IDirnodeURI.providedBy(u2))

        # A write cap parsed as deep-immutable, or behind an ro./imm. prefix,
        # must come back as UnknownURI.
        u2i = uri.from_string(u.to_string(), deep_immutable=True)
        self.failUnless(isinstance(u2i, uri.UnknownURI), u2i)
        u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
        self.failUnless(isinstance(u2ro, uri.UnknownURI), u2ro)
        u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
        self.failUnless(isinstance(u2imm, uri.UnknownURI), u2imm)

        u3 = u2.get_readonly()
        readkey = hashutil.ssk_readkey_hash(self.writekey)
        self.failUnlessReallyEqual(u3.fingerprint, self.fingerprint)
        self.failUnlessReallyEqual(u3.readkey, readkey)
        self.failUnless(u3.is_readonly())
        self.failUnless(u3.is_mutable())
        self.failUnless(IURI.providedBy(u3))
        self.failUnless(IMutableFileURI.providedBy(u3))
        self.failIf(IDirnodeURI.providedBy(u3))

        # A read cap is accepted behind the ro. prefix, but not as
        # deep-immutable or behind the imm. prefix.
        u3i = uri.from_string(u3.to_string(), deep_immutable=True)
        self.failUnless(isinstance(u3i, uri.UnknownURI), u3i)
        u3ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u3.to_string())
        self.failUnlessReallyEqual(u3.to_string(), u3ro.to_string())
        u3imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u3.to_string())
        self.failUnless(isinstance(u3imm, uri.UnknownURI), u3imm)

        he = u3.to_human_encoding()
        u3_h = uri.ReadonlySSKFileURI.init_from_human_encoding(he)
        self.failUnlessReallyEqual(u3, u3_h)

        u4 = uri.ReadonlySSKFileURI(readkey, self.fingerprint)
        self.failUnlessReallyEqual(u4.fingerprint, self.fingerprint)
        self.failUnlessReallyEqual(u4.readkey, readkey)
        self.failUnless(u4.is_readonly())
        self.failUnless(u4.is_mutable())
        self.failUnless(IURI.providedBy(u4))
        self.failUnless(IMutableFileURI.providedBy(u4))
        self.failIf(IDirnodeURI.providedBy(u4))

        u4i = uri.from_string(u4.to_string(), deep_immutable=True)
        self.failUnless(isinstance(u4i, uri.UnknownURI), u4i)
        u4ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u4.to_string())
        self.failUnlessReallyEqual(u4.to_string(), u4ro.to_string())
        u4imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u4.to_string())
        self.failUnless(isinstance(u4imm, uri.UnknownURI), u4imm)

        u4a = uri.from_string(u4.to_string())
        self.failUnlessReallyEqual(u4a, u4)
        self.failUnless("ReadonlySSKFileURI" in str(u4a))
        self.failUnlessIdentical(u4a.get_readonly(), u4a)

        u5 = u4.get_verify_cap()
        self.failUnless(IVerifierURI.providedBy(u5))
        self.failUnlessReallyEqual(u5.get_storage_index(), u.get_storage_index())
        u7 = u.get_verify_cap()
        self.failUnless(IVerifierURI.providedBy(u7))
        self.failUnlessReallyEqual(u7.get_storage_index(), u.get_storage_index())

        he = u5.to_human_encoding()
        u5_h = uri.SSKVerifierURI.init_from_human_encoding(he)
        self.failUnlessReallyEqual(u5, u5_h)

    def test_writable_mdmf_cap(self):
        """An MDMF write cap round-trips and attenuates to read and verify caps."""
        u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
        cap = u1.to_string()  # NOTE(review): restored
        u = uri.WriteableMDMFFileURI.init_from_string(cap)

        self.failUnless(IMutableFileURI.providedBy(u))
        self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
        self.failUnlessReallyEqual(u.writekey, self.writekey)
        self.failUnless(u.is_mutable())
        self.failIf(u.is_readonly())
        self.failUnlessEqual(cap, u.to_string())

        # Now get a readonly cap from the writable cap, and test that it
        # degrades gracefully.
        ru = u.get_readonly()
        self.failUnlessReallyEqual(self.readkey, ru.readkey)
        self.failUnlessReallyEqual(self.fingerprint, ru.fingerprint)
        self.failUnless(ru.is_mutable())
        self.failUnless(ru.is_readonly())

        # Now get a verifier cap.
        vu = ru.get_verify_cap()
        self.failUnlessReallyEqual(self.storage_index, vu.storage_index)
        self.failUnlessReallyEqual(self.fingerprint, vu.fingerprint)
        self.failUnless(IVerifierURI.providedBy(vu))

    def test_readonly_mdmf_cap(self):
        """An MDMF read cap round-trips and yields a matching verify cap."""
        u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
        cap = u1.to_string()  # NOTE(review): restored
        u2 = uri.ReadonlyMDMFFileURI.init_from_string(cap)

        self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
        self.failUnlessReallyEqual(u2.readkey, self.readkey)
        self.failUnless(u2.is_readonly())
        self.failUnless(u2.is_mutable())

        vu = u2.get_verify_cap()
        self.failUnlessEqual(vu.storage_index, self.storage_index)
        self.failUnlessEqual(vu.fingerprint, self.fingerprint)

    def test_create_writable_mdmf_cap_from_readcap(self):
        """A read cap string must not parse as a write cap."""
        # we shouldn't be able to create a writable MDMF cap given only a
        # readcap.
        u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
        cap = u1.to_string()  # NOTE(review): restored
        self.failUnlessRaises(uri.BadURIError,
                              uri.WriteableMDMFFileURI.init_from_string,
                              cap)  # NOTE(review): restored

    def test_create_writable_mdmf_cap_from_verifycap(self):
        """A verify cap string must not parse as a write cap."""
        u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
        cap = u1.to_string()  # NOTE(review): restored
        self.failUnlessRaises(uri.BadURIError,
                              uri.WriteableMDMFFileURI.init_from_string,
                              cap)  # NOTE(review): restored

    def test_create_readonly_mdmf_cap_from_verifycap(self):
        """A verify cap string must not parse as a read cap."""
        u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
        cap = u1.to_string()  # NOTE(review): restored
        self.failUnlessRaises(uri.BadURIError,
                              uri.ReadonlyMDMFFileURI.init_from_string,
                              cap)  # NOTE(review): restored

    def test_mdmf_verifier_cap(self):
        """An MDMF verify cap round-trips and is its own read-only/verify cap."""
        u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
        self.failUnless(u1.is_readonly())
        self.failIf(u1.is_mutable())
        self.failUnlessReallyEqual(self.storage_index, u1.storage_index)
        self.failUnlessReallyEqual(self.fingerprint, u1.fingerprint)

        cap = u1.to_string()  # NOTE(review): restored
        u2 = uri.MDMFVerifierURI.init_from_string(cap)
        self.failUnless(u2.is_readonly())
        self.failIf(u2.is_mutable())
        self.failUnlessReallyEqual(self.storage_index, u2.storage_index)
        self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)

        u3 = u2.get_readonly()
        self.failUnlessReallyEqual(u3, u2)

        u4 = u2.get_verify_cap()
        self.failUnlessReallyEqual(u4, u2)

    def test_mdmf_cap_extra_information(self):
        """Extension fields survive parsing and attenuation of MDMF caps."""
        # MDMF caps can be arbitrarily extended after the fingerprint
        # and key/storage index fields.
        u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
        self.failUnlessEqual([], u1.get_extension_params())

        cap = u1.to_string()  # NOTE(review): restored
        # Now let's append some fields. Say, 131073 (the segment size)
        # and 3 (the "k" encoding parameter).
        expected_extensions = []
        for e in ('131073', '3'):
            cap += (":%s" % e)  # NOTE(review): restored
            expected_extensions.append(e)

        u2 = uri.WriteableMDMFFileURI.init_from_string(cap)
        self.failUnlessReallyEqual(self.writekey, u2.writekey)
        self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
        self.failIf(u2.is_readonly())
        self.failUnless(u2.is_mutable())

        c2 = u2.to_string()  # NOTE(review): restored
        u2n = uri.WriteableMDMFFileURI.init_from_string(c2)
        self.failUnlessReallyEqual(u2, u2n)

        # We should get the extra back when we ask for it.
        self.failUnlessEqual(expected_extensions, u2.get_extension_params())

        # These should be preserved through cap attenuation, too.
        u3 = u2.get_readonly()
        self.failUnlessReallyEqual(self.readkey, u3.readkey)
        self.failUnlessReallyEqual(self.fingerprint, u3.fingerprint)
        self.failUnless(u3.is_readonly())
        self.failUnless(u3.is_mutable())
        self.failUnlessEqual(expected_extensions, u3.get_extension_params())

        c3 = u3.to_string()  # NOTE(review): restored
        u3n = uri.ReadonlyMDMFFileURI.init_from_string(c3)
        self.failUnlessReallyEqual(u3, u3n)

        u4 = u3.get_verify_cap()
        self.failUnlessReallyEqual(self.storage_index, u4.storage_index)
        self.failUnlessReallyEqual(self.fingerprint, u4.fingerprint)
        self.failUnless(u4.is_readonly())
        self.failIf(u4.is_mutable())

        c4 = u4.to_string()  # NOTE(review): restored
        u4n = uri.MDMFVerifierURI.init_from_string(c4)
        self.failUnlessReallyEqual(u4n, u4)

        self.failUnlessEqual(expected_extensions, u4.get_extension_params())

    def test_sdmf_cap_extra_information(self):
        """SDMF caps report an empty extension list."""
        # For interface consistency, we define a method to get
        # extensions for SDMF files as well. This method must always
        # return no extensions, since SDMF files were not created with
        # extensions and cannot be modified to include extensions
        # without breaking older clients.
        u1 = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
        cap = u1.to_string()  # NOTE(review): restored
        u2 = uri.WriteableSSKFileURI.init_from_string(cap)
        self.failUnlessEqual([], u2.get_extension_params())

    def test_extension_character_range(self):
        """A non-numeric extension field makes every MDMF parser raise."""
        # As written now, we shouldn't put things other than numbers in
        # the extension fields.
        writecap = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint).to_string()
        readcap = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint).to_string()
        vcap = uri.MDMFVerifierURI(self.storage_index, self.fingerprint).to_string()
        self.failUnlessRaises(uri.BadURIError,
                              uri.WriteableMDMFFileURI.init_from_string,
                              ("%s:invalid" % writecap))
        self.failUnlessRaises(uri.BadURIError,
                              uri.ReadonlyMDMFFileURI.init_from_string,
                              ("%s:invalid" % readcap))
        self.failUnlessRaises(uri.BadURIError,
                              uri.MDMFVerifierURI.init_from_string,
                              ("%s:invalid" % vcap))

    def test_mdmf_valid_human_encoding(self):
        """Each MDMF cap parses from a URL, with ':' or '%3A' separators."""
        # What's a human encoding? Well, it's of the form:
        base = "https://127.0.0.1:3456/uri/"
        # With a cap on the end. For each of the cap types, we need to
        # test that a valid cap (with and without the traditional
        # separators) is recognized and accepted by the classes.

        # These will yield six different caps.
        for o in self._make_mdmf_caps():
            url = base + o.to_string()
            o1 = o.__class__.init_from_human_encoding(url)
            self.failUnlessReallyEqual(o1, o)

            # Note that our cap will, by default, have : as separators.
            # But it's expected that users from, e.g., the WUI, will
            # have %3A as a separator. We need to make sure that the
            # initialization routine handles that, too.
            # NOTE(review): the two lines building `cap` and `url` around the
            # substitution were missing from the extract -- confirm.
            # str.replace stands in for the original re.sub(":", "%3A", cap);
            # the pattern is a literal, so the result is the same.
            cap = o.to_string().replace(":", "%3A")
            url = base + cap
            o2 = o.__class__.init_from_human_encoding(url)
            self.failUnlessReallyEqual(o2, o)

    def test_mdmf_human_encoding_invalid_base(self):
        """A valid cap behind a URL base that is not '.../uri/' is rejected."""
        # This base ends in "bazuri/" rather than "/uri/", so every cap type
        # must refuse a URL built from it.
        base = "https://127.0.0.1:3456/foo/bar/bazuri/"

        # These will yield six different caps.
        for o in self._make_mdmf_caps():
            url = base + o.to_string()
            self.failUnlessRaises(uri.BadURIError,
                                  o.__class__.init_from_human_encoding,
                                  url)  # NOTE(review): restored

    def test_mdmf_human_encoding_invalid_cap(self):
        """A valid URL base followed by a corrupted cap is rejected."""
        base = "https://127.0.0.1:3456/uri/"

        # These will yield six different caps.
        for o in self._make_mdmf_caps():
            # not exhaustive, obviously...
            url = base + o.to_string() + "foobarbaz"
            url2 = base + "foobarbaz" + o.to_string()
            url3 = base + o.to_string()[:25] + "foo" + o.to_string()[:25]
            for bad_url in (url, url2, url3):
                self.failUnlessRaises(uri.BadURIError,
                                      o.__class__.init_from_human_encoding,
                                      bad_url)  # NOTE(review): restored

    def test_mdmf_from_string(self):
        """The from_string helpers recognize every MDMF cap form."""
        # Make sure that the from_string utility function works with
        # MDMF caps.
        w1, w2, r1, r2, v1, v2 = self._make_mdmf_caps()
        # Pair each cap with the type-specific parser that must accept it.
        cases = [(w1, uri.from_string_mutable_filenode),
                 (w2, uri.from_string_mutable_filenode),
                 (r1, uri.from_string_mutable_filenode),
                 (r2, uri.from_string_mutable_filenode),
                 (v1, uri.from_string_verifier),
                 (v2, uri.from_string_verifier),
                 ]
        for u1, typed_parser in cases:
            cap = u1.to_string()
            self.failUnless(uri.is_uri(cap))
            u2 = uri.from_string(cap)
            self.failUnlessReallyEqual(u1, u2)
            u3 = typed_parser(cap)
            self.failUnlessEqual(u3, u1)
class Dirnode(testutil.ReallyEqualMixin, unittest.TestCase):
    """Checks for the directory cap classes wrapping SDMF, CHK, LIT and MDMF caps.

    NOTE(review): the reviewed extract had dropped several lines from this
    class: two ``def`` headers, the share-count locals in ``test_immutable``,
    and the ``for`` loop over ``verifiers``. They are restored here from
    context and flagged where they occur -- confirm.
    """

    def test_pack(self):  # NOTE(review): header restored, name reconstructed
        """Exercise an SDMF-backed directory cap and its attenuated forms."""
        writekey = "\x01" * 16
        fingerprint = "\x02" * 32

        n = uri.WriteableSSKFileURI(writekey, fingerprint)
        u1 = uri.DirectoryURI(n)
        self.failIf(u1.is_readonly())
        self.failUnless(u1.is_mutable())
        self.failUnless(IURI.providedBy(u1))
        self.failIf(IFileURI.providedBy(u1))
        self.failUnless(IDirnodeURI.providedBy(u1))
        self.failUnless("DirectoryURI" in str(u1))
        u1_filenode = u1.get_filenode_cap()
        self.failUnless(u1_filenode.is_mutable())
        self.failIf(u1_filenode.is_readonly())

        u2 = uri.from_string(u1.to_string())
        self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
        self.failIf(u2.is_readonly())
        self.failUnless(u2.is_mutable())
        self.failUnless(IURI.providedBy(u2))
        self.failIf(IFileURI.providedBy(u2))
        self.failUnless(IDirnodeURI.providedBy(u2))

        u2i = uri.from_string(u1.to_string(), deep_immutable=True)
        self.failUnless(isinstance(u2i, uri.UnknownURI))

        u3 = u2.get_readonly()
        self.failUnless(u3.is_readonly())
        self.failUnless(u3.is_mutable())
        self.failUnless(IURI.providedBy(u3))
        self.failIf(IFileURI.providedBy(u3))
        self.failUnless(IDirnodeURI.providedBy(u3))

        # Fixed: this used to re-parse u2's string, repeating the u2i check
        # above and leaving the read-only cap untested in this context.
        u3i = uri.from_string(u3.to_string(), deep_immutable=True)
        self.failUnless(isinstance(u3i, uri.UnknownURI))

        u3n = u3._filenode_uri
        self.failUnless(u3n.is_readonly())
        self.failUnless(u3n.is_mutable())
        u3_filenode = u3.get_filenode_cap()
        self.failUnless(u3_filenode.is_mutable())
        self.failUnless(u3_filenode.is_readonly())

        u3a = uri.from_string(u3.to_string())
        self.failUnlessIdentical(u3a, u3a.get_readonly())

        u4 = uri.ReadonlyDirectoryURI(u2._filenode_uri.get_readonly())
        self.failUnlessReallyEqual(u4.to_string(), u3.to_string())
        self.failUnless(u4.is_readonly())
        self.failUnless(u4.is_mutable())
        self.failUnless(IURI.providedBy(u4))
        self.failIf(IFileURI.providedBy(u4))
        self.failUnless(IDirnodeURI.providedBy(u4))

        u4_verifier = u4.get_verify_cap()
        u4_verifier_filenode = u4_verifier.get_filenode_cap()
        self.failUnless(isinstance(u4_verifier_filenode, uri.SSKVerifierURI))

        # Every route to a verify cap must wrap the same filenode verifier.
        verifiers = [u1.get_verify_cap(), u2.get_verify_cap(),
                     u3.get_verify_cap(), u4.get_verify_cap(),
                     uri.DirectoryURIVerifier(n.get_verify_cap()),
                     ]
        for v in verifiers:  # NOTE(review): loop header restored
            self.failUnless(IVerifierURI.providedBy(v))
            self.failUnlessReallyEqual(v._filenode_uri,
                                       u1.get_verify_cap()._filenode_uri)

    def test_immutable(self):
        """Exercise a CHK-backed immutable directory cap and its verify cap."""
        readkey = "\x01" * 16
        uri_extension_hash = hashutil.uri_extension_hash("stuff")
        # NOTE(review): these three assignments and the size= argument were
        # missing from the extract; the values follow from the ":3:10:1234"
        # suffix of the expected cap string below.
        needed_shares = 3
        total_shares = 10
        size = 1234

        fnuri = uri.CHKFileURI(key=readkey,
                               uri_extension_hash=uri_extension_hash,
                               needed_shares=needed_shares,
                               total_shares=total_shares,
                               size=size)
        fncap = fnuri.to_string()
        self.failUnlessReallyEqual(fncap, "URI:CHK:aeaqcaibaeaqcaibaeaqcaibae:nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa:3:10:1234")
        u1 = uri.ImmutableDirectoryURI(fnuri)
        self.failUnless(u1.is_readonly())
        self.failIf(u1.is_mutable())
        self.failUnless(IURI.providedBy(u1))
        self.failIf(IFileURI.providedBy(u1))
        self.failUnless(IDirnodeURI.providedBy(u1))
        self.failUnless("DirectoryURI" in str(u1))
        u1_filenode = u1.get_filenode_cap()
        self.failIf(u1_filenode.is_mutable())
        self.failUnless(u1_filenode.is_readonly())
        self.failUnlessReallyEqual(u1_filenode.to_string(), fncap)
        self.failUnless(str(u1))

        u2 = uri.from_string(u1.to_string())
        self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
        self.failUnless(u2.is_readonly())
        self.failIf(u2.is_mutable())
        self.failUnless(IURI.providedBy(u2))
        self.failIf(IFileURI.providedBy(u2))
        self.failUnless(IDirnodeURI.providedBy(u2))

        u2i = uri.from_string(u1.to_string(), deep_immutable=True)
        self.failUnlessReallyEqual(u1.to_string(), u2i.to_string())

        u3 = u2.get_readonly()
        self.failUnlessReallyEqual(u3.to_string(), u2.to_string())
        self.failUnless(str(u3))

        u3i = uri.from_string(u2.to_string(), deep_immutable=True)
        self.failUnlessReallyEqual(u2.to_string(), u3i.to_string())

        u2_verifier = u2.get_verify_cap()
        self.failUnless(isinstance(u2_verifier,
                                   uri.ImmutableDirectoryURIVerifier),
                        u2_verifier)  # NOTE(review): message argument restored
        self.failUnless(IVerifierURI.providedBy(u2_verifier))
        u2vs = u2_verifier.to_string()
        # URI:DIR2-CHK-Verifier:$key:$ueb:$k:$n:$size
        self.failUnless(u2vs.startswith("URI:DIR2-CHK-Verifier:"), u2vs)
        u2_verifier_fileuri = u2_verifier.get_filenode_cap()
        self.failUnless(IVerifierURI.providedBy(u2_verifier_fileuri))
        u2vfs = u2_verifier_fileuri.to_string()
        # URI:CHK-Verifier:$key:$ueb:$k:$n:$size
        self.failUnlessReallyEqual(u2vfs, fnuri.get_verify_cap().to_string())
        # Apart from the "DIR2-" marker the two verifier strings must agree.
        self.failUnlessReallyEqual(u2vs[len("URI:DIR2-"):], u2vfs[len("URI:"):])
        self.failUnless(str(u2_verifier))

    def test_literal(self):
        """Exercise a directory cap wrapping a literal file cap."""
        u0 = uri.LiteralFileURI("data")
        u1 = uri.LiteralDirectoryURI(u0)
        self.failUnless(str(u1))
        self.failUnlessReallyEqual(u1.to_string(), "URI:DIR2-LIT:mrqxiyi")
        self.failUnless(u1.is_readonly())
        self.failIf(u1.is_mutable())
        self.failUnless(IURI.providedBy(u1))
        self.failIf(IFileURI.providedBy(u1))
        self.failUnless(IDirnodeURI.providedBy(u1))
        self.failUnlessReallyEqual(u1.get_verify_cap(), None)
        self.failUnlessReallyEqual(u1.get_storage_index(), None)
        self.failUnlessReallyEqual(u1.abbrev_si(), "<LIT>")

    def test_mdmf(self):  # NOTE(review): header restored, name reconstructed
        """An MDMF directory write cap round-trips and is never deep-immutable."""
        writekey = "\x01" * 16
        fingerprint = "\x02" * 32
        uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
        d1 = uri.MDMFDirectoryURI(uri1)
        self.failIf(d1.is_readonly())
        self.failUnless(d1.is_mutable())
        self.failUnless(IURI.providedBy(d1))
        self.failUnless(IDirnodeURI.providedBy(d1))
        d1_uri = d1.to_string()

        d2 = uri.from_string(d1_uri)
        self.failUnlessIsInstance(d2, uri.MDMFDirectoryURI)
        self.failIf(d2.is_readonly())
        self.failUnless(d2.is_mutable())
        self.failUnless(IURI.providedBy(d2))
        self.failUnless(IDirnodeURI.providedBy(d2))

        # It doesn't make sense to ask for a deep immutable URI for a
        # mutable directory, so the parse must hand back an UnknownURI.
        d3 = uri.from_string(d2.to_string(), deep_immutable=True)
        self.failUnlessIsInstance(d3, uri.UnknownURI)

    def test_mdmf_with_extensions(self):
        """Extension fields on an MDMF directory cap survive attenuation."""
        writekey = "\x01" * 16
        fingerprint = "\x02" * 32
        uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
        d1 = uri.MDMFDirectoryURI(uri1)
        d1_uri = d1.to_string()
        # Add some extensions, verify that the URI is interpreted
        # correctly.
        d1_uri += ":3:131073"
        uri2 = uri.from_string(d1_uri)
        self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
        self.failUnless(IURI.providedBy(uri2))
        self.failUnless(IDirnodeURI.providedBy(uri2))
        # Fixed: these two checks used to target uri1 (the input file cap)
        # rather than the freshly parsed directory cap.
        self.failUnless(uri2.is_mutable())
        self.failIf(uri2.is_readonly())

        d2_uri = uri2.to_string()
        self.failUnlessIn(":3:131073", d2_uri)

        # Now attenuate, verify that the extensions persist
        ro_uri = uri2.get_readonly()
        self.failUnlessIsInstance(ro_uri, uri.ReadonlyMDMFDirectoryURI)
        self.failUnless(ro_uri.is_mutable())
        self.failUnless(ro_uri.is_readonly())
        self.failUnless(IURI.providedBy(ro_uri))
        self.failUnless(IDirnodeURI.providedBy(ro_uri))
        ro_uri_str = ro_uri.to_string()
        self.failUnlessIn(":3:131073", ro_uri_str)

    def test_mdmf_attenuation(self):
        """An MDMF directory write cap attenuates to a read-only directory cap."""
        writekey = "\x01" * 16
        fingerprint = "\x02" * 32

        uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
        d1 = uri.MDMFDirectoryURI(uri1)
        self.failUnless(d1.is_mutable())
        self.failIf(d1.is_readonly())
        self.failUnless(IURI.providedBy(d1))
        self.failUnless(IDirnodeURI.providedBy(d1))

        # Rebuilding the directory cap from its own filenode cap must give
        # the same string.
        d1_uri = d1.to_string()
        d1_uri_from_fn = uri.MDMFDirectoryURI(d1.get_filenode_cap()).to_string()
        self.failUnlessEqual(d1_uri_from_fn, d1_uri)

        uri2 = uri.from_string(d1_uri)
        self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
        self.failUnless(IURI.providedBy(uri2))
        self.failUnless(IDirnodeURI.providedBy(uri2))
        self.failUnless(uri2.is_mutable())
        self.failIf(uri2.is_readonly())

        ro = uri2.get_readonly()
        self.failUnlessIsInstance(ro, uri.ReadonlyMDMFDirectoryURI)
        self.failUnless(ro.is_mutable())
        self.failUnless(ro.is_readonly())
        self.failUnless(IURI.providedBy(ro))
        self.failUnless(IDirnodeURI.providedBy(ro))

        ro_uri = ro.to_string()
        n = uri.from_string(ro_uri, deep_immutable=True)
        self.failUnlessIsInstance(n, uri.UnknownURI)

        fn_cap = ro.get_filenode_cap()
        fn_ro_cap = fn_cap.get_readonly()
        d3 = uri.ReadonlyMDMFDirectoryURI(fn_ro_cap)
        self.failUnlessEqual(ro.to_string(), d3.to_string())
        # Fixed: these two checks used to repeat the assertions on `ro`
        # instead of covering the newly built d3.
        self.failUnless(d3.is_mutable())
        self.failUnless(d3.is_readonly())

    def test_mdmf_verifier(self):
        """Verify caps from write, re-parsed and read-only dircaps all agree."""
        writekey = "\x01" * 16
        fingerprint = "\x02" * 32
        uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
        d1 = uri.MDMFDirectoryURI(uri1)
        v1 = d1.get_verify_cap()
        self.failUnlessIsInstance(v1, uri.MDMFDirectoryURIVerifier)
        self.failIf(v1.is_mutable())

        d2 = uri.from_string(d1.to_string())
        v2 = d2.get_verify_cap()
        self.failUnlessIsInstance(v2, uri.MDMFDirectoryURIVerifier)
        self.failIf(v2.is_mutable())
        self.failUnlessEqual(v2.to_string(), v1.to_string())

        # Now attenuate and make sure that works correctly.
        r3 = d2.get_readonly()
        v3 = r3.get_verify_cap()
        self.failUnlessIsInstance(v3, uri.MDMFDirectoryURIVerifier)
        self.failIf(v3.is_mutable())
        self.failUnlessEqual(v3.to_string(), v1.to_string())
        r4 = uri.from_string(r3.to_string())
        v4 = r4.get_verify_cap()
        self.failUnlessIsInstance(v4, uri.MDMFDirectoryURIVerifier)
        self.failIf(v4.is_mutable())
        self.failUnlessEqual(v4.to_string(), v3.to_string())