]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_uri.py
MDMF: remove extension fields from caps, tolerate arbitrary ones. Fixes #1526
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_uri.py
1
2 import os, re
3 from twisted.trial import unittest
4 from allmydata import uri
5 from allmydata.util import hashutil, base32
6 from allmydata.interfaces import IURI, IFileURI, IDirnodeURI, IMutableFileURI, \
7     IVerifierURI, CapConstraintError
8 import allmydata.test.common_util as testutil
9
10 class Literal(testutil.ReallyEqualMixin, unittest.TestCase):
11     def _help_test(self, data):
12         u = uri.LiteralFileURI(data)
13         self.failUnless(IURI.providedBy(u))
14         self.failUnless(IFileURI.providedBy(u))
15         self.failIf(IDirnodeURI.providedBy(u))
16         self.failUnlessReallyEqual(u.data, data)
17         self.failUnlessReallyEqual(u.get_size(), len(data))
18         self.failUnless(u.is_readonly())
19         self.failIf(u.is_mutable())
20
21         u2 = uri.from_string(u.to_string())
22         self.failUnless(IURI.providedBy(u2))
23         self.failUnless(IFileURI.providedBy(u2))
24         self.failIf(IDirnodeURI.providedBy(u2))
25         self.failUnlessReallyEqual(u2.data, data)
26         self.failUnlessReallyEqual(u2.get_size(), len(data))
27         self.failUnless(u2.is_readonly())
28         self.failIf(u2.is_mutable())
29
30         u2i = uri.from_string(u.to_string(), deep_immutable=True)
31         self.failUnless(IFileURI.providedBy(u2i))
32         self.failIf(IDirnodeURI.providedBy(u2i))
33         self.failUnlessReallyEqual(u2i.data, data)
34         self.failUnlessReallyEqual(u2i.get_size(), len(data))
35         self.failUnless(u2i.is_readonly())
36         self.failIf(u2i.is_mutable())
37
38         u3 = u.get_readonly()
39         self.failUnlessIdentical(u, u3)
40         self.failUnlessReallyEqual(u.get_verify_cap(), None)
41
42         he = u.to_human_encoding()
43         u_h = uri.LiteralFileURI.init_from_human_encoding(he)
44         self.failUnlessReallyEqual(u, u_h)
45
46     def test_empty(self):
47         data = "" # This data is some *very* small data!
48         return self._help_test(data)
49
50     def test_pack(self):
51         data = "This is some small data"
52         return self._help_test(data)
53
54     def test_nonascii(self):
55         data = "This contains \x00 and URI:LIT: and \n, oh my."
56         return self._help_test(data)
57
58 class Compare(testutil.ReallyEqualMixin, unittest.TestCase):
59     def test_compare(self):
60         lit1 = uri.LiteralFileURI("some data")
61         fileURI = 'URI:CHK:f5ahxa25t4qkktywz6teyfvcx4:opuioq7tj2y6idzfp6cazehtmgs5fdcebcz3cygrxyydvcozrmeq:3:10:345834'
62         chk1 = uri.CHKFileURI.init_from_string(fileURI)
63         chk2 = uri.CHKFileURI.init_from_string(fileURI)
64         unk = uri.UnknownURI("lafs://from_the_future")
65         self.failIfEqual(lit1, chk1)
66         self.failUnlessReallyEqual(chk1, chk2)
67         self.failIfEqual(chk1, "not actually a URI")
68         # these should be hashable too
69         s = set([lit1, chk1, chk2, unk])
70         self.failUnlessReallyEqual(len(s), 3) # since chk1==chk2
71
72     def test_is_uri(self):
73         lit1 = uri.LiteralFileURI("some data").to_string()
74         self.failUnless(uri.is_uri(lit1))
75         self.failIf(uri.is_uri(None))
76
77     def test_is_literal_file_uri(self):
78         lit1 = uri.LiteralFileURI("some data").to_string()
79         self.failUnless(uri.is_literal_file_uri(lit1))
80         self.failIf(uri.is_literal_file_uri(None))
81         self.failIf(uri.is_literal_file_uri("foo"))
82         self.failIf(uri.is_literal_file_uri("ro.foo"))
83         self.failIf(uri.is_literal_file_uri("URI:LITfoo"))
84         self.failUnless(uri.is_literal_file_uri("ro.URI:LIT:foo"))
85         self.failUnless(uri.is_literal_file_uri("imm.URI:LIT:foo"))
86
87     def test_has_uri_prefix(self):
88         self.failUnless(uri.has_uri_prefix("URI:foo"))
89         self.failUnless(uri.has_uri_prefix("ro.URI:foo"))
90         self.failUnless(uri.has_uri_prefix("imm.URI:foo"))
91         self.failIf(uri.has_uri_prefix(None))
92         self.failIf(uri.has_uri_prefix("foo"))
93
94 class CHKFile(testutil.ReallyEqualMixin, unittest.TestCase):
95     def test_pack(self):
96         key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
97         storage_index = hashutil.storage_index_hash(key)
98         uri_extension_hash = hashutil.uri_extension_hash("stuff")
99         needed_shares = 25
100         total_shares = 100
101         size = 1234
102         u = uri.CHKFileURI(key=key,
103                            uri_extension_hash=uri_extension_hash,
104                            needed_shares=needed_shares,
105                            total_shares=total_shares,
106                            size=size)
107         self.failUnlessReallyEqual(u.get_storage_index(), storage_index)
108         self.failUnlessReallyEqual(u.key, key)
109         self.failUnlessReallyEqual(u.uri_extension_hash, uri_extension_hash)
110         self.failUnlessReallyEqual(u.needed_shares, needed_shares)
111         self.failUnlessReallyEqual(u.total_shares, total_shares)
112         self.failUnlessReallyEqual(u.size, size)
113         self.failUnless(u.is_readonly())
114         self.failIf(u.is_mutable())
115         self.failUnless(IURI.providedBy(u))
116         self.failUnless(IFileURI.providedBy(u))
117         self.failIf(IDirnodeURI.providedBy(u))
118         self.failUnlessReallyEqual(u.get_size(), 1234)
119
120         u_ro = u.get_readonly()
121         self.failUnlessIdentical(u, u_ro)
122         he = u.to_human_encoding()
123         self.failUnlessReallyEqual(he, "http://127.0.0.1:3456/uri/" + u.to_string())
124         self.failUnlessReallyEqual(uri.CHKFileURI.init_from_human_encoding(he), u)
125
126         u2 = uri.from_string(u.to_string())
127         self.failUnlessReallyEqual(u2.get_storage_index(), storage_index)
128         self.failUnlessReallyEqual(u2.key, key)
129         self.failUnlessReallyEqual(u2.uri_extension_hash, uri_extension_hash)
130         self.failUnlessReallyEqual(u2.needed_shares, needed_shares)
131         self.failUnlessReallyEqual(u2.total_shares, total_shares)
132         self.failUnlessReallyEqual(u2.size, size)
133         self.failUnless(u2.is_readonly())
134         self.failIf(u2.is_mutable())
135         self.failUnless(IURI.providedBy(u2))
136         self.failUnless(IFileURI.providedBy(u2))
137         self.failIf(IDirnodeURI.providedBy(u2))
138         self.failUnlessReallyEqual(u2.get_size(), 1234)
139
140         u2i = uri.from_string(u.to_string(), deep_immutable=True)
141         self.failUnlessReallyEqual(u.to_string(), u2i.to_string())
142         u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
143         self.failUnlessReallyEqual(u.to_string(), u2ro.to_string())
144         u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
145         self.failUnlessReallyEqual(u.to_string(), u2imm.to_string())
146
147         v = u.get_verify_cap()
148         self.failUnless(isinstance(v.to_string(), str))
149         self.failUnless(v.is_readonly())
150         self.failIf(v.is_mutable())
151
152         v2 = uri.from_string(v.to_string())
153         self.failUnlessReallyEqual(v, v2)
154         he = v.to_human_encoding()
155         v2_h = uri.CHKFileVerifierURI.init_from_human_encoding(he)
156         self.failUnlessReallyEqual(v2, v2_h)
157
158         v3 = uri.CHKFileVerifierURI(storage_index="\x00"*16,
159                                     uri_extension_hash="\x00"*32,
160                                     needed_shares=3,
161                                     total_shares=10,
162                                     size=1234)
163         self.failUnless(isinstance(v3.to_string(), str))
164         self.failUnless(v3.is_readonly())
165         self.failIf(v3.is_mutable())
166
167     def test_pack_badly(self):
168         key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
169         storage_index = hashutil.storage_index_hash(key)
170         uri_extension_hash = hashutil.uri_extension_hash("stuff")
171         needed_shares = 25
172         total_shares = 100
173         size = 1234
174         self.failUnlessRaises(TypeError,
175                               uri.CHKFileURI,
176                               key=key,
177                               uri_extension_hash=uri_extension_hash,
178                               needed_shares=needed_shares,
179                               total_shares=total_shares,
180                               size=size,
181
182                               bogus_extra_argument="reject me",
183                               )
184         self.failUnlessRaises(TypeError,
185                               uri.CHKFileVerifierURI,
186                               bogus="bogus")
187         self.failUnlessRaises(TypeError,
188                               uri.CHKFileVerifierURI,
189                               storage_index=storage_index,
190                               uri_extension_hash=uri_extension_hash,
191                               needed_shares=3,
192                               total_shares=10,
193                               # leave size= missing
194                               )
195
196
197 class Extension(testutil.ReallyEqualMixin, unittest.TestCase):
198     def test_pack(self):
199         data = {"stuff": "value",
200                 "size": 12,
201                 "needed_shares": 3,
202                 "big_hash": hashutil.tagged_hash("foo", "bar"),
203                 }
204         ext = uri.pack_extension(data)
205         d = uri.unpack_extension(ext)
206         self.failUnlessReallyEqual(d["stuff"], "value")
207         self.failUnlessReallyEqual(d["size"], 12)
208         self.failUnlessReallyEqual(d["big_hash"], hashutil.tagged_hash("foo", "bar"))
209
210         readable = uri.unpack_extension_readable(ext)
211         self.failUnlessReallyEqual(readable["needed_shares"], 3)
212         self.failUnlessReallyEqual(readable["stuff"], "value")
213         self.failUnlessReallyEqual(readable["size"], 12)
214         self.failUnlessReallyEqual(readable["big_hash"],
215                              base32.b2a(hashutil.tagged_hash("foo", "bar")))
216         self.failUnlessReallyEqual(readable["UEB_hash"],
217                              base32.b2a(hashutil.uri_extension_hash(ext)))
218
219 class Unknown(testutil.ReallyEqualMixin, unittest.TestCase):
220     def test_from_future(self):
221         # any URI type that we don't recognize should be treated as unknown
222         future_uri = "I am a URI from the future. Whatever you do, don't "
223         u = uri.from_string(future_uri)
224         self.failUnless(isinstance(u, uri.UnknownURI))
225         self.failUnlessReallyEqual(u.to_string(), future_uri)
226         self.failUnless(u.get_readonly() is None)
227         self.failUnless(u.get_error() is None)
228
229         u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
230         self.failUnlessReallyEqual(u.to_string(), future_uri)
231         self.failUnless(u2.get_readonly() is None)
232         self.failUnless(isinstance(u2.get_error(), CapConstraintError))
233
234         # Future caps might have non-ASCII chars in them. (Or maybe not, who can tell about the future?)
235         future_uri = u"I am a cap from the \u263A future. Whatever you ".encode('utf-8')
236         u = uri.from_string(future_uri)
237         self.failUnless(isinstance(u, uri.UnknownURI))
238         self.failUnlessReallyEqual(u.to_string(), future_uri)
239         self.failUnless(u.get_readonly() is None)
240         self.failUnless(u.get_error() is None)
241
242         u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
243         self.failUnlessReallyEqual(u.to_string(), future_uri)
244         self.failUnless(u2.get_readonly() is None)
245         self.failUnless(isinstance(u2.get_error(), CapConstraintError))
246
247 class Constraint(testutil.ReallyEqualMixin, unittest.TestCase):
248     def test_constraint(self):
249         good="http://127.0.0.1:3456/uri/URI%3ADIR2%3Agh3l5rbvnv2333mrfvalmjfr4i%3Alz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma/"
250         uri.DirectoryURI.init_from_human_encoding(good)
251         self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, good)
252         bad = good + '==='
253         self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_human_encoding, bad)
254         self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, bad)
255         fileURI = 'URI:CHK:gh3l5rbvnv2333mrfvalmjfr4i:lz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma:3:10:345834'
256         uri.CHKFileURI.init_from_string(fileURI)
257
258 class Mutable(testutil.ReallyEqualMixin, unittest.TestCase):
259     def setUp(self):
260         self.writekey = "\x01" * 16
261         self.fingerprint = "\x02" * 32
262         self.readkey = hashutil.ssk_readkey_hash(self.writekey)
263         self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
264
265     def test_pack(self):
266         u = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
267         self.failUnlessReallyEqual(u.writekey, self.writekey)
268         self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
269         self.failIf(u.is_readonly())
270         self.failUnless(u.is_mutable())
271         self.failUnless(IURI.providedBy(u))
272         self.failUnless(IMutableFileURI.providedBy(u))
273         self.failIf(IDirnodeURI.providedBy(u))
274         self.failUnless("WriteableSSKFileURI" in str(u))
275
276         he = u.to_human_encoding()
277         u_h = uri.WriteableSSKFileURI.init_from_human_encoding(he)
278         self.failUnlessReallyEqual(u, u_h)
279
280         u2 = uri.from_string(u.to_string())
281         self.failUnlessReallyEqual(u2.writekey, self.writekey)
282         self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
283         self.failIf(u2.is_readonly())
284         self.failUnless(u2.is_mutable())
285         self.failUnless(IURI.providedBy(u2))
286         self.failUnless(IMutableFileURI.providedBy(u2))
287         self.failIf(IDirnodeURI.providedBy(u2))
288
289         u2i = uri.from_string(u.to_string(), deep_immutable=True)
290         self.failUnless(isinstance(u2i, uri.UnknownURI), u2i)
291         u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
292         self.failUnless(isinstance(u2ro, uri.UnknownURI), u2ro)
293         u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
294         self.failUnless(isinstance(u2imm, uri.UnknownURI), u2imm)
295
296         u3 = u2.get_readonly()
297         readkey = hashutil.ssk_readkey_hash(self.writekey)
298         self.failUnlessReallyEqual(u3.fingerprint, self.fingerprint)
299         self.failUnlessReallyEqual(u3.readkey, readkey)
300         self.failUnless(u3.is_readonly())
301         self.failUnless(u3.is_mutable())
302         self.failUnless(IURI.providedBy(u3))
303         self.failUnless(IMutableFileURI.providedBy(u3))
304         self.failIf(IDirnodeURI.providedBy(u3))
305
306         u3i = uri.from_string(u3.to_string(), deep_immutable=True)
307         self.failUnless(isinstance(u3i, uri.UnknownURI), u3i)
308         u3ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u3.to_string())
309         self.failUnlessReallyEqual(u3.to_string(), u3ro.to_string())
310         u3imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u3.to_string())
311         self.failUnless(isinstance(u3imm, uri.UnknownURI), u3imm)
312
313         he = u3.to_human_encoding()
314         u3_h = uri.ReadonlySSKFileURI.init_from_human_encoding(he)
315         self.failUnlessReallyEqual(u3, u3_h)
316
317         u4 = uri.ReadonlySSKFileURI(readkey, self.fingerprint)
318         self.failUnlessReallyEqual(u4.fingerprint, self.fingerprint)
319         self.failUnlessReallyEqual(u4.readkey, readkey)
320         self.failUnless(u4.is_readonly())
321         self.failUnless(u4.is_mutable())
322         self.failUnless(IURI.providedBy(u4))
323         self.failUnless(IMutableFileURI.providedBy(u4))
324         self.failIf(IDirnodeURI.providedBy(u4))
325
326         u4i = uri.from_string(u4.to_string(), deep_immutable=True)
327         self.failUnless(isinstance(u4i, uri.UnknownURI), u4i)
328         u4ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u4.to_string())
329         self.failUnlessReallyEqual(u4.to_string(), u4ro.to_string())
330         u4imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u4.to_string())
331         self.failUnless(isinstance(u4imm, uri.UnknownURI), u4imm)
332
333         u4a = uri.from_string(u4.to_string())
334         self.failUnlessReallyEqual(u4a, u4)
335         self.failUnless("ReadonlySSKFileURI" in str(u4a))
336         self.failUnlessIdentical(u4a.get_readonly(), u4a)
337
338         u5 = u4.get_verify_cap()
339         self.failUnless(IVerifierURI.providedBy(u5))
340         self.failUnlessReallyEqual(u5.get_storage_index(), u.get_storage_index())
341         u7 = u.get_verify_cap()
342         self.failUnless(IVerifierURI.providedBy(u7))
343         self.failUnlessReallyEqual(u7.get_storage_index(), u.get_storage_index())
344
345         he = u5.to_human_encoding()
346         u5_h = uri.SSKVerifierURI.init_from_human_encoding(he)
347         self.failUnlessReallyEqual(u5, u5_h)
348
349
350     def test_writeable_mdmf_cap(self):
351         u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
352         cap = u1.to_string()
353         u = uri.WriteableMDMFFileURI.init_from_string(cap)
354
355         self.failUnless(IMutableFileURI.providedBy(u))
356         self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
357         self.failUnlessReallyEqual(u.writekey, self.writekey)
358         self.failUnless(u.is_mutable())
359         self.failIf(u.is_readonly())
360         self.failUnlessEqual(cap, u.to_string())
361
362         # Now get a readonly cap from the writeable cap, and test that it
363         # degrades gracefully.
364         ru = u.get_readonly()
365         self.failUnlessReallyEqual(self.readkey, ru.readkey)
366         self.failUnlessReallyEqual(self.fingerprint, ru.fingerprint)
367         self.failUnless(ru.is_mutable())
368         self.failUnless(ru.is_readonly())
369
370         # Now get a verifier cap.
371         vu = ru.get_verify_cap()
372         self.failUnlessReallyEqual(self.storage_index, vu.storage_index)
373         self.failUnlessReallyEqual(self.fingerprint, vu.fingerprint)
374         self.failUnless(IVerifierURI.providedBy(vu))
375
376     def test_readonly_mdmf_cap(self):
377         u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
378         cap = u1.to_string()
379         u2 = uri.ReadonlyMDMFFileURI.init_from_string(cap)
380
381         self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
382         self.failUnlessReallyEqual(u2.readkey, self.readkey)
383         self.failUnless(u2.is_readonly())
384         self.failUnless(u2.is_mutable())
385
386         vu = u2.get_verify_cap()
387         self.failUnlessEqual(vu.storage_index, self.storage_index)
388         self.failUnlessEqual(vu.fingerprint, self.fingerprint)
389
390     def test_create_writeable_mdmf_cap_from_readcap(self):
391         # we shouldn't be able to create a writeable MDMF cap given only a
392         # readcap.
393         u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
394         cap = u1.to_string()
395         self.failUnlessRaises(uri.BadURIError,
396                               uri.WriteableMDMFFileURI.init_from_string,
397                               cap)
398
399     def test_create_writeable_mdmf_cap_from_verifycap(self):
400         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
401         cap = u1.to_string()
402         self.failUnlessRaises(uri.BadURIError,
403                               uri.WriteableMDMFFileURI.init_from_string,
404                               cap)
405
406     def test_create_readonly_mdmf_cap_from_verifycap(self):
407         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
408         cap = u1.to_string()
409         self.failUnlessRaises(uri.BadURIError,
410                               uri.ReadonlyMDMFFileURI.init_from_string,
411                               cap)
412
413     def test_mdmf_verifier_cap(self):
414         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
415         self.failUnless(u1.is_readonly())
416         self.failIf(u1.is_mutable())
417         self.failUnlessReallyEqual(self.storage_index, u1.storage_index)
418         self.failUnlessReallyEqual(self.fingerprint, u1.fingerprint)
419
420         cap = u1.to_string()
421         u2 = uri.MDMFVerifierURI.init_from_string(cap)
422         self.failUnless(u2.is_readonly())
423         self.failIf(u2.is_mutable())
424         self.failUnlessReallyEqual(self.storage_index, u2.storage_index)
425         self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
426
427         u3 = u2.get_readonly()
428         self.failUnlessReallyEqual(u3, u2)
429
430         u4 = u2.get_verify_cap()
431         self.failUnlessReallyEqual(u4, u2)
432
433     def test_mdmf_cap_ignore_extensions(self):
434         # MDMF caps can be arbitrarily extended after the fingerprint and
435         # key/storage index fields. tahoe-1.9 is supposed to ignore any
436         # extensions, and not add any itself.
437         u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
438         cap = u1.to_string()
439
440         cap2 = cap+":I COME FROM THE FUTURE"
441         u2 = uri.WriteableMDMFFileURI.init_from_string(cap2)
442         self.failUnlessReallyEqual(self.writekey, u2.writekey)
443         self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
444         self.failIf(u2.is_readonly())
445         self.failUnless(u2.is_mutable())
446
447         cap3 = cap+":"+os.urandom(40) # parse *that*!
448         u3 = uri.WriteableMDMFFileURI.init_from_string(cap3)
449         self.failUnlessReallyEqual(self.writekey, u3.writekey)
450         self.failUnlessReallyEqual(self.fingerprint, u3.fingerprint)
451         self.failIf(u3.is_readonly())
452         self.failUnless(u3.is_mutable())
453
454         cap4 = u1.get_readonly().to_string()+":ooh scary future stuff"
455         u4 = uri.from_string_mutable_filenode(cap4)
456         self.failUnlessReallyEqual(self.readkey, u4.readkey)
457         self.failUnlessReallyEqual(self.fingerprint, u4.fingerprint)
458         self.failUnless(u4.is_readonly())
459         self.failUnless(u4.is_mutable())
460
461         cap5 = u1.get_verify_cap().to_string()+":spoilers!"
462         u5 = uri.from_string(cap5)
463         self.failUnlessReallyEqual(self.storage_index, u5.storage_index)
464         self.failUnlessReallyEqual(self.fingerprint, u5.fingerprint)
465         self.failUnless(u5.is_readonly())
466         self.failIf(u5.is_mutable())
467
468
469     def test_mdmf_valid_human_encoding(self):
470         # What's a human encoding? Well, it's of the form:
471         base = "https://127.0.0.1:3456/uri/"
472         # With a cap on the end. For each of the cap types, we need to
473         # test that a valid cap (with and without the traditional
474         # separators) is recognized and accepted by the classes.
475         w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
476         r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
477         v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
478
479         # These will yield three different caps.
480         for o in (w1, r1, v1):
481             url = base + o.to_string()
482             o1 = o.__class__.init_from_human_encoding(url)
483             self.failUnlessReallyEqual(o1, o)
484
485             # Note that our cap will, by default, have : as separators.
486             # But it's expected that users from, e.g., the WUI, will
487             # have %3A as a separator. We need to make sure that the
488             # initialization routine handles that, too.
489             cap = o.to_string()
490             cap = re.sub(":", "%3A", cap)
491             url = base + cap
492             o2 = o.__class__.init_from_human_encoding(url)
493             self.failUnlessReallyEqual(o2, o)
494
495
496     def test_mdmf_human_encoding_invalid_base(self):
497         # What's a human encoding? Well, it's of the form:
498         base = "https://127.0.0.1:3456/foo/bar/bazuri/"
499         # With a cap on the end. For each of the cap types, we need to
500         # test that a valid cap (with and without the traditional
501         # separators) is recognized and accepted by the classes.
502         w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
503         r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
504         v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
505
506         # These will yield three different caps.
507         for o in (w1, r1, v1):
508             url = base + o.to_string()
509             self.failUnlessRaises(uri.BadURIError,
510                                   o.__class__.init_from_human_encoding,
511                                   url)
512
513     def test_mdmf_human_encoding_invalid_cap(self):
514         base = "https://127.0.0.1:3456/uri/"
515         # With a cap on the end. For each of the cap types, we need to
516         # test that a valid cap (with and without the traditional
517         # separators) is recognized and accepted by the classes.
518         w1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
519         r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
520         v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
521
522         # These will yield three different caps.
523         for o in (w1, r1, v1):
524             # not exhaustive, obviously...
525             url = base + o.to_string() + "foobarbaz"
526             url2 = base + "foobarbaz" + o.to_string()
527             url3 = base + o.to_string()[:25] + "foo" + o.to_string()[:25]
528             for u in (url, url2, url3):
529                 self.failUnlessRaises(uri.BadURIError,
530                                       o.__class__.init_from_human_encoding,
531                                       u)
532
533     def test_mdmf_from_string(self):
534         # Make sure that the from_string utility function works with
535         # MDMF caps.
536         u1 = uri.WriteableMDMFFileURI(self.writekey, self.fingerprint)
537         cap = u1.to_string()
538         self.failUnless(uri.is_uri(cap))
539         u2 = uri.from_string(cap)
540         self.failUnlessReallyEqual(u1, u2)
541         u3 = uri.from_string_mutable_filenode(cap)
542         self.failUnlessEqual(u3, u1)
543
544         u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
545         cap = u1.to_string()
546         self.failUnless(uri.is_uri(cap))
547         u2 = uri.from_string(cap)
548         self.failUnlessReallyEqual(u1, u2)
549         u3 = uri.from_string_mutable_filenode(cap)
550         self.failUnlessEqual(u3, u1)
551
552         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
553         cap = u1.to_string()
554         self.failUnless(uri.is_uri(cap))
555         u2 = uri.from_string(cap)
556         self.failUnlessReallyEqual(u1, u2)
557         u3 = uri.from_string_verifier(cap)
558         self.failUnlessEqual(u3, u1)
559
560
561 class Dirnode(testutil.ReallyEqualMixin, unittest.TestCase):
562     def test_pack(self):
563         writekey = "\x01" * 16
564         fingerprint = "\x02" * 32
565
566         n = uri.WriteableSSKFileURI(writekey, fingerprint)
567         u1 = uri.DirectoryURI(n)
568         self.failIf(u1.is_readonly())
569         self.failUnless(u1.is_mutable())
570         self.failUnless(IURI.providedBy(u1))
571         self.failIf(IFileURI.providedBy(u1))
572         self.failUnless(IDirnodeURI.providedBy(u1))
573         self.failUnless("DirectoryURI" in str(u1))
574         u1_filenode = u1.get_filenode_cap()
575         self.failUnless(u1_filenode.is_mutable())
576         self.failIf(u1_filenode.is_readonly())
577
578         u2 = uri.from_string(u1.to_string())
579         self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
580         self.failIf(u2.is_readonly())
581         self.failUnless(u2.is_mutable())
582         self.failUnless(IURI.providedBy(u2))
583         self.failIf(IFileURI.providedBy(u2))
584         self.failUnless(IDirnodeURI.providedBy(u2))
585
586         u2i = uri.from_string(u1.to_string(), deep_immutable=True)
587         self.failUnless(isinstance(u2i, uri.UnknownURI))
588
589         u3 = u2.get_readonly()
590         self.failUnless(u3.is_readonly())
591         self.failUnless(u3.is_mutable())
592         self.failUnless(IURI.providedBy(u3))
593         self.failIf(IFileURI.providedBy(u3))
594         self.failUnless(IDirnodeURI.providedBy(u3))
595
596         u3i = uri.from_string(u2.to_string(), deep_immutable=True)
597         self.failUnless(isinstance(u3i, uri.UnknownURI))
598
599         u3n = u3._filenode_uri
600         self.failUnless(u3n.is_readonly())
601         self.failUnless(u3n.is_mutable())
602         u3_filenode = u3.get_filenode_cap()
603         self.failUnless(u3_filenode.is_mutable())
604         self.failUnless(u3_filenode.is_readonly())
605
606         u3a = uri.from_string(u3.to_string())
607         self.failUnlessIdentical(u3a, u3a.get_readonly())
608
609         u4 = uri.ReadonlyDirectoryURI(u2._filenode_uri.get_readonly())
610         self.failUnlessReallyEqual(u4.to_string(), u3.to_string())
611         self.failUnless(u4.is_readonly())
612         self.failUnless(u4.is_mutable())
613         self.failUnless(IURI.providedBy(u4))
614         self.failIf(IFileURI.providedBy(u4))
615         self.failUnless(IDirnodeURI.providedBy(u4))
616
617         u4_verifier = u4.get_verify_cap()
618         u4_verifier_filenode = u4_verifier.get_filenode_cap()
619         self.failUnless(isinstance(u4_verifier_filenode, uri.SSKVerifierURI))
620
621         verifiers = [u1.get_verify_cap(), u2.get_verify_cap(),
622                      u3.get_verify_cap(), u4.get_verify_cap(),
623                      uri.DirectoryURIVerifier(n.get_verify_cap()),
624                      ]
625         for v in verifiers:
626             self.failUnless(IVerifierURI.providedBy(v))
627             self.failUnlessReallyEqual(v._filenode_uri,
628                                  u1.get_verify_cap()._filenode_uri)
629
630     def test_immutable(self):
631         readkey = "\x01" * 16
632         uri_extension_hash = hashutil.uri_extension_hash("stuff")
633         needed_shares = 3
634         total_shares = 10
635         size = 1234
636
637         fnuri = uri.CHKFileURI(key=readkey,
638                                uri_extension_hash=uri_extension_hash,
639                                needed_shares=needed_shares,
640                                total_shares=total_shares,
641                                size=size)
642         fncap = fnuri.to_string()
643         self.failUnlessReallyEqual(fncap, "URI:CHK:aeaqcaibaeaqcaibaeaqcaibae:nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa:3:10:1234")
644         u1 = uri.ImmutableDirectoryURI(fnuri)
645         self.failUnless(u1.is_readonly())
646         self.failIf(u1.is_mutable())
647         self.failUnless(IURI.providedBy(u1))
648         self.failIf(IFileURI.providedBy(u1))
649         self.failUnless(IDirnodeURI.providedBy(u1))
650         self.failUnless("DirectoryURI" in str(u1))
651         u1_filenode = u1.get_filenode_cap()
652         self.failIf(u1_filenode.is_mutable())
653         self.failUnless(u1_filenode.is_readonly())
654         self.failUnlessReallyEqual(u1_filenode.to_string(), fncap)
655         self.failUnless(str(u1))
656
657         u2 = uri.from_string(u1.to_string())
658         self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
659         self.failUnless(u2.is_readonly())
660         self.failIf(u2.is_mutable())
661         self.failUnless(IURI.providedBy(u2))
662         self.failIf(IFileURI.providedBy(u2))
663         self.failUnless(IDirnodeURI.providedBy(u2))
664
665         u2i = uri.from_string(u1.to_string(), deep_immutable=True)
666         self.failUnlessReallyEqual(u1.to_string(), u2i.to_string())
667
668         u3 = u2.get_readonly()
669         self.failUnlessReallyEqual(u3.to_string(), u2.to_string())
670         self.failUnless(str(u3))
671
672         u3i = uri.from_string(u2.to_string(), deep_immutable=True)
673         self.failUnlessReallyEqual(u2.to_string(), u3i.to_string())
674
675         u2_verifier = u2.get_verify_cap()
676         self.failUnless(isinstance(u2_verifier,
677                                    uri.ImmutableDirectoryURIVerifier),
678                         u2_verifier)
679         self.failUnless(IVerifierURI.providedBy(u2_verifier))
680         u2vs = u2_verifier.to_string()
681         # URI:DIR2-CHK-Verifier:$key:$ueb:$k:$n:$size
682         self.failUnless(u2vs.startswith("URI:DIR2-CHK-Verifier:"), u2vs)
683         u2_verifier_fileuri = u2_verifier.get_filenode_cap()
684         self.failUnless(IVerifierURI.providedBy(u2_verifier_fileuri))
685         u2vfs = u2_verifier_fileuri.to_string()
686         # URI:CHK-Verifier:$key:$ueb:$k:$n:$size
687         self.failUnlessReallyEqual(u2vfs, fnuri.get_verify_cap().to_string())
688         self.failUnlessReallyEqual(u2vs[len("URI:DIR2-"):], u2vfs[len("URI:"):])
689         self.failUnless(str(u2_verifier))
690
691     def test_literal(self):
692         u0 = uri.LiteralFileURI("data")
693         u1 = uri.LiteralDirectoryURI(u0)
694         self.failUnless(str(u1))
695         self.failUnlessReallyEqual(u1.to_string(), "URI:DIR2-LIT:mrqxiyi")
696         self.failUnless(u1.is_readonly())
697         self.failIf(u1.is_mutable())
698         self.failUnless(IURI.providedBy(u1))
699         self.failIf(IFileURI.providedBy(u1))
700         self.failUnless(IDirnodeURI.providedBy(u1))
701         self.failUnlessReallyEqual(u1.get_verify_cap(), None)
702         self.failUnlessReallyEqual(u1.get_storage_index(), None)
703         self.failUnlessReallyEqual(u1.abbrev_si(), "<LIT>")
704
705     def test_mdmf(self):
706         writekey = "\x01" * 16
707         fingerprint = "\x02" * 32
708         uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
709         d1 = uri.MDMFDirectoryURI(uri1)
710         self.failIf(d1.is_readonly())
711         self.failUnless(d1.is_mutable())
712         self.failUnless(IURI.providedBy(d1))
713         self.failUnless(IDirnodeURI.providedBy(d1))
714         d1_uri = d1.to_string()
715
716         d2 = uri.from_string(d1_uri)
717         self.failUnlessIsInstance(d2, uri.MDMFDirectoryURI)
718         self.failIf(d2.is_readonly())
719         self.failUnless(d2.is_mutable())
720         self.failUnless(IURI.providedBy(d2))
721         self.failUnless(IDirnodeURI.providedBy(d2))
722
723         # It doesn't make sense to ask for a deep immutable URI for a
724         # mutable directory, and we should get back a result to that
725         # effect.
726         d3 = uri.from_string(d2.to_string(), deep_immutable=True)
727         self.failUnlessIsInstance(d3, uri.UnknownURI)
728
729     def test_mdmf_attenuation(self):
730         writekey = "\x01" * 16
731         fingerprint = "\x02" * 32
732
733         uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
734         d1 = uri.MDMFDirectoryURI(uri1)
735         self.failUnless(d1.is_mutable())
736         self.failIf(d1.is_readonly())
737         self.failUnless(IURI.providedBy(d1))
738         self.failUnless(IDirnodeURI.providedBy(d1))
739
740         d1_uri = d1.to_string()
741         d1_uri_from_fn = uri.MDMFDirectoryURI(d1.get_filenode_cap()).to_string()
742         self.failUnlessEqual(d1_uri_from_fn, d1_uri)
743
744         uri2 = uri.from_string(d1_uri)
745         self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
746         self.failUnless(IURI.providedBy(uri2))
747         self.failUnless(IDirnodeURI.providedBy(uri2))
748         self.failUnless(uri2.is_mutable())
749         self.failIf(uri2.is_readonly())
750
751         ro = uri2.get_readonly()
752         self.failUnlessIsInstance(ro, uri.ReadonlyMDMFDirectoryURI)
753         self.failUnless(ro.is_mutable())
754         self.failUnless(ro.is_readonly())
755         self.failUnless(IURI.providedBy(ro))
756         self.failUnless(IDirnodeURI.providedBy(ro))
757
758         ro_uri = ro.to_string()
759         n = uri.from_string(ro_uri, deep_immutable=True)
760         self.failUnlessIsInstance(n, uri.UnknownURI)
761
762         fn_cap = ro.get_filenode_cap()
763         fn_ro_cap = fn_cap.get_readonly()
764         d3 = uri.ReadonlyMDMFDirectoryURI(fn_ro_cap)
765         self.failUnlessEqual(ro.to_string(), d3.to_string())
766         self.failUnless(ro.is_mutable())
767         self.failUnless(ro.is_readonly())
768
769     def test_mdmf_verifier(self):
770         # I'm not sure what I want to write here yet.
771         writekey = "\x01" * 16
772         fingerprint = "\x02" * 32
773         uri1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
774         d1 = uri.MDMFDirectoryURI(uri1)
775         v1 = d1.get_verify_cap()
776         self.failUnlessIsInstance(v1, uri.MDMFDirectoryURIVerifier)
777         self.failIf(v1.is_mutable())
778
779         d2 = uri.from_string(d1.to_string())
780         v2 = d2.get_verify_cap()
781         self.failUnlessIsInstance(v2, uri.MDMFDirectoryURIVerifier)
782         self.failIf(v2.is_mutable())
783         self.failUnlessEqual(v2.to_string(), v1.to_string())
784
785         # Now attenuate and make sure that works correctly.
786         r3 = d2.get_readonly()
787         v3 = r3.get_verify_cap()
788         self.failUnlessIsInstance(v3, uri.MDMFDirectoryURIVerifier)
789         self.failIf(v3.is_mutable())
790         self.failUnlessEqual(v3.to_string(), v1.to_string())
791         r4 = uri.from_string(r3.to_string())
792         v4 = r4.get_verify_cap()
793         self.failUnlessIsInstance(v4, uri.MDMFDirectoryURIVerifier)
794         self.failIf(v4.is_mutable())
795         self.failUnlessEqual(v4.to_string(), v3.to_string())