]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_uri.py
27b98534e52d81c678736f01eb1c1e4416ffc4ce
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_uri.py
1
2 import re
3 from twisted.trial import unittest
4 from allmydata import uri
5 from allmydata.util import hashutil, base32
6 from allmydata.interfaces import IURI, IFileURI, IDirnodeURI, IMutableFileURI, \
7     IVerifierURI, CapConstraintError
8 import allmydata.test.common_util as testutil
9
10 class Literal(testutil.ReallyEqualMixin, unittest.TestCase):
11     def _help_test(self, data):
12         u = uri.LiteralFileURI(data)
13         self.failUnless(IURI.providedBy(u))
14         self.failUnless(IFileURI.providedBy(u))
15         self.failIf(IDirnodeURI.providedBy(u))
16         self.failUnlessReallyEqual(u.data, data)
17         self.failUnlessReallyEqual(u.get_size(), len(data))
18         self.failUnless(u.is_readonly())
19         self.failIf(u.is_mutable())
20
21         u2 = uri.from_string(u.to_string())
22         self.failUnless(IURI.providedBy(u2))
23         self.failUnless(IFileURI.providedBy(u2))
24         self.failIf(IDirnodeURI.providedBy(u2))
25         self.failUnlessReallyEqual(u2.data, data)
26         self.failUnlessReallyEqual(u2.get_size(), len(data))
27         self.failUnless(u2.is_readonly())
28         self.failIf(u2.is_mutable())
29
30         u2i = uri.from_string(u.to_string(), deep_immutable=True)
31         self.failUnless(IFileURI.providedBy(u2i))
32         self.failIf(IDirnodeURI.providedBy(u2i))
33         self.failUnlessReallyEqual(u2i.data, data)
34         self.failUnlessReallyEqual(u2i.get_size(), len(data))
35         self.failUnless(u2i.is_readonly())
36         self.failIf(u2i.is_mutable())
37
38         u3 = u.get_readonly()
39         self.failUnlessIdentical(u, u3)
40         self.failUnlessReallyEqual(u.get_verify_cap(), None)
41
42         he = u.to_human_encoding()
43         u_h = uri.LiteralFileURI.init_from_human_encoding(he)
44         self.failUnlessReallyEqual(u, u_h)
45
46     def test_empty(self):
47         data = "" # This data is some *very* small data!
48         return self._help_test(data)
49
50     def test_pack(self):
51         data = "This is some small data"
52         return self._help_test(data)
53
54     def test_nonascii(self):
55         data = "This contains \x00 and URI:LIT: and \n, oh my."
56         return self._help_test(data)
57
58 class Compare(testutil.ReallyEqualMixin, unittest.TestCase):
59     def test_compare(self):
60         lit1 = uri.LiteralFileURI("some data")
61         fileURI = 'URI:CHK:f5ahxa25t4qkktywz6teyfvcx4:opuioq7tj2y6idzfp6cazehtmgs5fdcebcz3cygrxyydvcozrmeq:3:10:345834'
62         chk1 = uri.CHKFileURI.init_from_string(fileURI)
63         chk2 = uri.CHKFileURI.init_from_string(fileURI)
64         unk = uri.UnknownURI("lafs://from_the_future")
65         self.failIfEqual(lit1, chk1)
66         self.failUnlessReallyEqual(chk1, chk2)
67         self.failIfEqual(chk1, "not actually a URI")
68         # these should be hashable too
69         s = set([lit1, chk1, chk2, unk])
70         self.failUnlessReallyEqual(len(s), 3) # since chk1==chk2
71
72     def test_is_uri(self):
73         lit1 = uri.LiteralFileURI("some data").to_string()
74         self.failUnless(uri.is_uri(lit1))
75         self.failIf(uri.is_uri(None))
76
77     def test_is_literal_file_uri(self):
78         lit1 = uri.LiteralFileURI("some data").to_string()
79         self.failUnless(uri.is_literal_file_uri(lit1))
80         self.failIf(uri.is_literal_file_uri(None))
81         self.failIf(uri.is_literal_file_uri("foo"))
82         self.failIf(uri.is_literal_file_uri("ro.foo"))
83         self.failIf(uri.is_literal_file_uri("URI:LITfoo"))
84         self.failUnless(uri.is_literal_file_uri("ro.URI:LIT:foo"))
85         self.failUnless(uri.is_literal_file_uri("imm.URI:LIT:foo"))
86
87     def test_has_uri_prefix(self):
88         self.failUnless(uri.has_uri_prefix("URI:foo"))
89         self.failUnless(uri.has_uri_prefix("ro.URI:foo"))
90         self.failUnless(uri.has_uri_prefix("imm.URI:foo"))
91         self.failIf(uri.has_uri_prefix(None))
92         self.failIf(uri.has_uri_prefix("foo"))
93
94 class CHKFile(testutil.ReallyEqualMixin, unittest.TestCase):
95     def test_pack(self):
96         key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
97         storage_index = hashutil.storage_index_hash(key)
98         uri_extension_hash = hashutil.uri_extension_hash("stuff")
99         needed_shares = 25
100         total_shares = 100
101         size = 1234
102         u = uri.CHKFileURI(key=key,
103                            uri_extension_hash=uri_extension_hash,
104                            needed_shares=needed_shares,
105                            total_shares=total_shares,
106                            size=size)
107         self.failUnlessReallyEqual(u.get_storage_index(), storage_index)
108         self.failUnlessReallyEqual(u.key, key)
109         self.failUnlessReallyEqual(u.uri_extension_hash, uri_extension_hash)
110         self.failUnlessReallyEqual(u.needed_shares, needed_shares)
111         self.failUnlessReallyEqual(u.total_shares, total_shares)
112         self.failUnlessReallyEqual(u.size, size)
113         self.failUnless(u.is_readonly())
114         self.failIf(u.is_mutable())
115         self.failUnless(IURI.providedBy(u))
116         self.failUnless(IFileURI.providedBy(u))
117         self.failIf(IDirnodeURI.providedBy(u))
118         self.failUnlessReallyEqual(u.get_size(), 1234)
119
120         u_ro = u.get_readonly()
121         self.failUnlessIdentical(u, u_ro)
122         he = u.to_human_encoding()
123         self.failUnlessReallyEqual(he, "http://127.0.0.1:3456/uri/" + u.to_string())
124         self.failUnlessReallyEqual(uri.CHKFileURI.init_from_human_encoding(he), u)
125
126         u2 = uri.from_string(u.to_string())
127         self.failUnlessReallyEqual(u2.get_storage_index(), storage_index)
128         self.failUnlessReallyEqual(u2.key, key)
129         self.failUnlessReallyEqual(u2.uri_extension_hash, uri_extension_hash)
130         self.failUnlessReallyEqual(u2.needed_shares, needed_shares)
131         self.failUnlessReallyEqual(u2.total_shares, total_shares)
132         self.failUnlessReallyEqual(u2.size, size)
133         self.failUnless(u2.is_readonly())
134         self.failIf(u2.is_mutable())
135         self.failUnless(IURI.providedBy(u2))
136         self.failUnless(IFileURI.providedBy(u2))
137         self.failIf(IDirnodeURI.providedBy(u2))
138         self.failUnlessReallyEqual(u2.get_size(), 1234)
139
140         u2i = uri.from_string(u.to_string(), deep_immutable=True)
141         self.failUnlessReallyEqual(u.to_string(), u2i.to_string())
142         u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
143         self.failUnlessReallyEqual(u.to_string(), u2ro.to_string())
144         u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
145         self.failUnlessReallyEqual(u.to_string(), u2imm.to_string())
146
147         v = u.get_verify_cap()
148         self.failUnless(isinstance(v.to_string(), str))
149         self.failUnless(v.is_readonly())
150         self.failIf(v.is_mutable())
151
152         v2 = uri.from_string(v.to_string())
153         self.failUnlessReallyEqual(v, v2)
154         he = v.to_human_encoding()
155         v2_h = uri.CHKFileVerifierURI.init_from_human_encoding(he)
156         self.failUnlessReallyEqual(v2, v2_h)
157
158         v3 = uri.CHKFileVerifierURI(storage_index="\x00"*16,
159                                     uri_extension_hash="\x00"*32,
160                                     needed_shares=3,
161                                     total_shares=10,
162                                     size=1234)
163         self.failUnless(isinstance(v3.to_string(), str))
164         self.failUnless(v3.is_readonly())
165         self.failIf(v3.is_mutable())
166
167     def test_pack_badly(self):
168         key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
169         storage_index = hashutil.storage_index_hash(key)
170         uri_extension_hash = hashutil.uri_extension_hash("stuff")
171         needed_shares = 25
172         total_shares = 100
173         size = 1234
174         self.failUnlessRaises(TypeError,
175                               uri.CHKFileURI,
176                               key=key,
177                               uri_extension_hash=uri_extension_hash,
178                               needed_shares=needed_shares,
179                               total_shares=total_shares,
180                               size=size,
181
182                               bogus_extra_argument="reject me",
183                               )
184         self.failUnlessRaises(TypeError,
185                               uri.CHKFileVerifierURI,
186                               bogus="bogus")
187         self.failUnlessRaises(TypeError,
188                               uri.CHKFileVerifierURI,
189                               storage_index=storage_index,
190                               uri_extension_hash=uri_extension_hash,
191                               needed_shares=3,
192                               total_shares=10,
193                               # leave size= missing
194                               )
195
196
197 class Extension(testutil.ReallyEqualMixin, unittest.TestCase):
198     def test_pack(self):
199         data = {"stuff": "value",
200                 "size": 12,
201                 "needed_shares": 3,
202                 "big_hash": hashutil.tagged_hash("foo", "bar"),
203                 }
204         ext = uri.pack_extension(data)
205         d = uri.unpack_extension(ext)
206         self.failUnlessReallyEqual(d["stuff"], "value")
207         self.failUnlessReallyEqual(d["size"], 12)
208         self.failUnlessReallyEqual(d["big_hash"], hashutil.tagged_hash("foo", "bar"))
209
210         readable = uri.unpack_extension_readable(ext)
211         self.failUnlessReallyEqual(readable["needed_shares"], 3)
212         self.failUnlessReallyEqual(readable["stuff"], "value")
213         self.failUnlessReallyEqual(readable["size"], 12)
214         self.failUnlessReallyEqual(readable["big_hash"],
215                              base32.b2a(hashutil.tagged_hash("foo", "bar")))
216         self.failUnlessReallyEqual(readable["UEB_hash"],
217                              base32.b2a(hashutil.uri_extension_hash(ext)))
218
219 class Unknown(testutil.ReallyEqualMixin, unittest.TestCase):
220     def test_from_future(self):
221         # any URI type that we don't recognize should be treated as unknown
222         future_uri = "I am a URI from the future. Whatever you do, don't "
223         u = uri.from_string(future_uri)
224         self.failUnless(isinstance(u, uri.UnknownURI))
225         self.failUnlessReallyEqual(u.to_string(), future_uri)
226         self.failUnless(u.get_readonly() is None)
227         self.failUnless(u.get_error() is None)
228
229         u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
230         self.failUnlessReallyEqual(u.to_string(), future_uri)
231         self.failUnless(u2.get_readonly() is None)
232         self.failUnless(isinstance(u2.get_error(), CapConstraintError))
233
234         # Future caps might have non-ASCII chars in them. (Or maybe not, who can tell about the future?)
235         future_uri = u"I am a cap from the \u263A future. Whatever you ".encode('utf-8')
236         u = uri.from_string(future_uri)
237         self.failUnless(isinstance(u, uri.UnknownURI))
238         self.failUnlessReallyEqual(u.to_string(), future_uri)
239         self.failUnless(u.get_readonly() is None)
240         self.failUnless(u.get_error() is None)
241
242         u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
243         self.failUnlessReallyEqual(u.to_string(), future_uri)
244         self.failUnless(u2.get_readonly() is None)
245         self.failUnless(isinstance(u2.get_error(), CapConstraintError))
246
247 class Constraint(testutil.ReallyEqualMixin, unittest.TestCase):
248     def test_constraint(self):
249         good="http://127.0.0.1:3456/uri/URI%3ADIR2%3Agh3l5rbvnv2333mrfvalmjfr4i%3Alz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma/"
250         uri.DirectoryURI.init_from_human_encoding(good)
251         self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, good)
252         bad = good + '==='
253         self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_human_encoding, bad)
254         self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, bad)
255         fileURI = 'URI:CHK:gh3l5rbvnv2333mrfvalmjfr4i:lz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma:3:10:345834'
256         uri.CHKFileURI.init_from_string(fileURI)
257
258 class Mutable(testutil.ReallyEqualMixin, unittest.TestCase):
259     def setUp(self):
260         self.writekey = "\x01" * 16
261         self.fingerprint = "\x02" * 32
262         self.readkey = hashutil.ssk_readkey_hash(self.writekey)
263         self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
264
265     def test_pack(self):
266         u = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
267         self.failUnlessReallyEqual(u.writekey, self.writekey)
268         self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
269         self.failIf(u.is_readonly())
270         self.failUnless(u.is_mutable())
271         self.failUnless(IURI.providedBy(u))
272         self.failUnless(IMutableFileURI.providedBy(u))
273         self.failIf(IDirnodeURI.providedBy(u))
274         self.failUnless("WriteableSSKFileURI" in str(u))
275
276         he = u.to_human_encoding()
277         u_h = uri.WriteableSSKFileURI.init_from_human_encoding(he)
278         self.failUnlessReallyEqual(u, u_h)
279
280         u2 = uri.from_string(u.to_string())
281         self.failUnlessReallyEqual(u2.writekey, self.writekey)
282         self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
283         self.failIf(u2.is_readonly())
284         self.failUnless(u2.is_mutable())
285         self.failUnless(IURI.providedBy(u2))
286         self.failUnless(IMutableFileURI.providedBy(u2))
287         self.failIf(IDirnodeURI.providedBy(u2))
288
289         u2i = uri.from_string(u.to_string(), deep_immutable=True)
290         self.failUnless(isinstance(u2i, uri.UnknownURI), u2i)
291         u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
292         self.failUnless(isinstance(u2ro, uri.UnknownURI), u2ro)
293         u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
294         self.failUnless(isinstance(u2imm, uri.UnknownURI), u2imm)
295
296         u3 = u2.get_readonly()
297         readkey = hashutil.ssk_readkey_hash(self.writekey)
298         self.failUnlessReallyEqual(u3.fingerprint, self.fingerprint)
299         self.failUnlessReallyEqual(u3.readkey, readkey)
300         self.failUnless(u3.is_readonly())
301         self.failUnless(u3.is_mutable())
302         self.failUnless(IURI.providedBy(u3))
303         self.failUnless(IMutableFileURI.providedBy(u3))
304         self.failIf(IDirnodeURI.providedBy(u3))
305
306         u3i = uri.from_string(u3.to_string(), deep_immutable=True)
307         self.failUnless(isinstance(u3i, uri.UnknownURI), u3i)
308         u3ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u3.to_string())
309         self.failUnlessReallyEqual(u3.to_string(), u3ro.to_string())
310         u3imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u3.to_string())
311         self.failUnless(isinstance(u3imm, uri.UnknownURI), u3imm)
312
313         he = u3.to_human_encoding()
314         u3_h = uri.ReadonlySSKFileURI.init_from_human_encoding(he)
315         self.failUnlessReallyEqual(u3, u3_h)
316
317         u4 = uri.ReadonlySSKFileURI(readkey, self.fingerprint)
318         self.failUnlessReallyEqual(u4.fingerprint, self.fingerprint)
319         self.failUnlessReallyEqual(u4.readkey, readkey)
320         self.failUnless(u4.is_readonly())
321         self.failUnless(u4.is_mutable())
322         self.failUnless(IURI.providedBy(u4))
323         self.failUnless(IMutableFileURI.providedBy(u4))
324         self.failIf(IDirnodeURI.providedBy(u4))
325
326         u4i = uri.from_string(u4.to_string(), deep_immutable=True)
327         self.failUnless(isinstance(u4i, uri.UnknownURI), u4i)
328         u4ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u4.to_string())
329         self.failUnlessReallyEqual(u4.to_string(), u4ro.to_string())
330         u4imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u4.to_string())
331         self.failUnless(isinstance(u4imm, uri.UnknownURI), u4imm)
332
333         u4a = uri.from_string(u4.to_string())
334         self.failUnlessReallyEqual(u4a, u4)
335         self.failUnless("ReadonlySSKFileURI" in str(u4a))
336         self.failUnlessIdentical(u4a.get_readonly(), u4a)
337
338         u5 = u4.get_verify_cap()
339         self.failUnless(IVerifierURI.providedBy(u5))
340         self.failUnlessReallyEqual(u5.get_storage_index(), u.get_storage_index())
341         u7 = u.get_verify_cap()
342         self.failUnless(IVerifierURI.providedBy(u7))
343         self.failUnlessReallyEqual(u7.get_storage_index(), u.get_storage_index())
344
345         he = u5.to_human_encoding()
346         u5_h = uri.SSKVerifierURI.init_from_human_encoding(he)
347         self.failUnlessReallyEqual(u5, u5_h)
348
349
350     def test_writable_mdmf_cap(self):
351         u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
352         cap = u1.to_string()
353         u = uri.WritableMDMFFileURI.init_from_string(cap)
354
355         self.failUnless(IMutableFileURI.providedBy(u))
356         self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
357         self.failUnlessReallyEqual(u.writekey, self.writekey)
358         self.failUnless(u.is_mutable())
359         self.failIf(u.is_readonly())
360         self.failUnlessEqual(cap, u.to_string())
361
362         # Now get a readonly cap from the writable cap, and test that it
363         # degrades gracefully.
364         ru = u.get_readonly()
365         self.failUnlessReallyEqual(self.readkey, ru.readkey)
366         self.failUnlessReallyEqual(self.fingerprint, ru.fingerprint)
367         self.failUnless(ru.is_mutable())
368         self.failUnless(ru.is_readonly())
369
370         # Now get a verifier cap.
371         vu = ru.get_verify_cap()
372         self.failUnlessReallyEqual(self.storage_index, vu.storage_index)
373         self.failUnlessReallyEqual(self.fingerprint, vu.fingerprint)
374         self.failUnless(IVerifierURI.providedBy(vu))
375
376     def test_readonly_mdmf_cap(self):
377         u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
378         cap = u1.to_string()
379         u2 = uri.ReadonlyMDMFFileURI.init_from_string(cap)
380
381         self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
382         self.failUnlessReallyEqual(u2.readkey, self.readkey)
383         self.failUnless(u2.is_readonly())
384         self.failUnless(u2.is_mutable())
385
386         vu = u2.get_verify_cap()
387         self.failUnlessEqual(vu.storage_index, self.storage_index)
388         self.failUnlessEqual(vu.fingerprint, self.fingerprint)
389
390     def test_create_writable_mdmf_cap_from_readcap(self):
391         # we shouldn't be able to create a writable MDMF cap given only a
392         # readcap.
393         u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
394         cap = u1.to_string()
395         self.failUnlessRaises(uri.BadURIError,
396                               uri.WritableMDMFFileURI.init_from_string,
397                               cap)
398
399     def test_create_writable_mdmf_cap_from_verifycap(self):
400         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
401         cap = u1.to_string()
402         self.failUnlessRaises(uri.BadURIError,
403                               uri.WritableMDMFFileURI.init_from_string,
404                               cap)
405
406     def test_create_readonly_mdmf_cap_from_verifycap(self):
407         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
408         cap = u1.to_string()
409         self.failUnlessRaises(uri.BadURIError,
410                               uri.ReadonlyMDMFFileURI.init_from_string,
411                               cap)
412
413     def test_mdmf_verifier_cap(self):
414         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
415         self.failUnless(u1.is_readonly())
416         self.failIf(u1.is_mutable())
417         self.failUnlessReallyEqual(self.storage_index, u1.storage_index)
418         self.failUnlessReallyEqual(self.fingerprint, u1.fingerprint)
419
420         cap = u1.to_string()
421         u2 = uri.MDMFVerifierURI.init_from_string(cap)
422         self.failUnless(u2.is_readonly())
423         self.failIf(u2.is_mutable())
424         self.failUnlessReallyEqual(self.storage_index, u2.storage_index)
425         self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
426
427         u3 = u2.get_readonly()
428         self.failUnlessReallyEqual(u3, u2)
429
430         u4 = u2.get_verify_cap()
431         self.failUnlessReallyEqual(u4, u2)
432
433     def test_mdmf_cap_extra_information(self):
434         # MDMF caps can be arbitrarily extended after the fingerprint
435         # and key/storage index fields. 
436         u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
437         self.failUnlessEqual([], u1.get_extension_params())
438
439         cap = u1.to_string()
440         # Now let's append some fields. Say, 131073 (the segment size)
441         # and 3 (the "k" encoding parameter).
442         expected_extensions = []
443         for e in ('131073', '3'):
444             cap += (":%s" % e)
445             expected_extensions.append(e)
446
447             u2 = uri.WritableMDMFFileURI.init_from_string(cap)
448             self.failUnlessReallyEqual(self.writekey, u2.writekey)
449             self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
450             self.failIf(u2.is_readonly())
451             self.failUnless(u2.is_mutable())
452
453             c2 = u2.to_string()
454             u2n = uri.WritableMDMFFileURI.init_from_string(c2)
455             self.failUnlessReallyEqual(u2, u2n)
456
457             # We should get the extra back when we ask for it.
458             self.failUnlessEqual(expected_extensions, u2.get_extension_params())
459
460             # These should be preserved through cap attenuation, too.
461             u3 = u2.get_readonly()
462             self.failUnlessReallyEqual(self.readkey, u3.readkey)
463             self.failUnlessReallyEqual(self.fingerprint, u3.fingerprint)
464             self.failUnless(u3.is_readonly())
465             self.failUnless(u3.is_mutable())
466             self.failUnlessEqual(expected_extensions, u3.get_extension_params())
467
468             c3 = u3.to_string()
469             u3n = uri.ReadonlyMDMFFileURI.init_from_string(c3)
470             self.failUnlessReallyEqual(u3, u3n)
471
472             u4 = u3.get_verify_cap()
473             self.failUnlessReallyEqual(self.storage_index, u4.storage_index)
474             self.failUnlessReallyEqual(self.fingerprint, u4.fingerprint)
475             self.failUnless(u4.is_readonly())
476             self.failIf(u4.is_mutable())
477
478             c4 = u4.to_string()
479             u4n = uri.MDMFVerifierURI.init_from_string(c4)
480             self.failUnlessReallyEqual(u4n, u4)
481
482             self.failUnlessEqual(expected_extensions, u4.get_extension_params())
483
484
485     def test_sdmf_cap_extra_information(self):
486         # For interface consistency, we define a method to get
487         # extensions for SDMF files as well. This method must always
488         # return no extensions, since SDMF files were not created with
489         # extensions and cannot be modified to include extensions
490         # without breaking older clients.
491         u1 = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
492         cap = u1.to_string()
493         u2 = uri.WriteableSSKFileURI.init_from_string(cap)
494         self.failUnlessEqual([], u2.get_extension_params())
495
496     def test_extension_character_range(self):
497         # As written now, we shouldn't put things other than numbers in
498         # the extension fields.
499         writecap = uri.WritableMDMFFileURI(self.writekey, self.fingerprint).to_string()
500         readcap  = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint).to_string()
501         vcap     = uri.MDMFVerifierURI(self.storage_index, self.fingerprint).to_string()
502         self.failUnlessRaises(uri.BadURIError,
503                               uri.WritableMDMFFileURI.init_from_string,
504                               ("%s:invalid" % writecap))
505         self.failUnlessRaises(uri.BadURIError,
506                               uri.ReadonlyMDMFFileURI.init_from_string,
507                               ("%s:invalid" % readcap))
508         self.failUnlessRaises(uri.BadURIError,
509                               uri.MDMFVerifierURI.init_from_string,
510                               ("%s:invalid" % vcap))
511
512
513     def test_mdmf_valid_human_encoding(self):
514         # What's a human encoding? Well, it's of the form:
515         base = "https://127.0.0.1:3456/uri/"
516         # With a cap on the end. For each of the cap types, we need to
517         # test that a valid cap (with and without the traditional
518         # separators) is recognized and accepted by the classes.
519         w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
520         w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
521                                      ['131073', '3'])
522         r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
523         r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
524                                      ['131073', '3'])
525         v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
526         v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
527                                  ['131073', '3'])
528
529         # These will yield six different caps.
530         for o in (w1, w2, r1 , r2, v1, v2):
531             url = base + o.to_string()
532             o1 = o.__class__.init_from_human_encoding(url)
533             self.failUnlessReallyEqual(o1, o)
534
535             # Note that our cap will, by default, have : as separators. 
536             # But it's expected that users from, e.g., the WUI, will
537             # have %3A as a separator. We need to make sure that the
538             # initialization routine handles that, too.
539             cap = o.to_string()
540             cap = re.sub(":", "%3A", cap)
541             url = base + cap
542             o2 = o.__class__.init_from_human_encoding(url)
543             self.failUnlessReallyEqual(o2, o)
544
545
546     def test_mdmf_human_encoding_invalid_base(self):
547         # What's a human encoding? Well, it's of the form:
548         base = "https://127.0.0.1:3456/foo/bar/bazuri/"
549         # With a cap on the end. For each of the cap types, we need to
550         # test that a valid cap (with and without the traditional
551         # separators) is recognized and accepted by the classes.
552         w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
553         w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
554                                      ['131073', '3'])
555         r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
556         r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
557                                      ['131073', '3'])
558         v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
559         v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
560                                  ['131073', '3'])
561
562         # These will yield six different caps.
563         for o in (w1, w2, r1 , r2, v1, v2):
564             url = base + o.to_string()
565             self.failUnlessRaises(uri.BadURIError,
566                                   o.__class__.init_from_human_encoding,
567                                   url)
568
569     def test_mdmf_human_encoding_invalid_cap(self):
570         base = "https://127.0.0.1:3456/uri/"
571         # With a cap on the end. For each of the cap types, we need to
572         # test that a valid cap (with and without the traditional
573         # separators) is recognized and accepted by the classes.
574         w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
575         w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
576                                      ['131073', '3'])
577         r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
578         r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
579                                      ['131073', '3'])
580         v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
581         v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
582                                  ['131073', '3'])
583
584         # These will yield six different caps.
585         for o in (w1, w2, r1 , r2, v1, v2):
586             # not exhaustive, obviously...
587             url = base + o.to_string() + "foobarbaz"
588             url2 = base + "foobarbaz" + o.to_string()
589             url3 = base + o.to_string()[:25] + "foo" + o.to_string()[:25]
590             for u in (url, url2, url3):
591                 self.failUnlessRaises(uri.BadURIError,
592                                       o.__class__.init_from_human_encoding,
593                                       u)
594
595     def test_mdmf_from_string(self):
596         # Make sure that the from_string utility function works with
597         # MDMF caps.
598         u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
599         cap = u1.to_string()
600         self.failUnless(uri.is_uri(cap))
601         u2 = uri.from_string(cap)
602         self.failUnlessReallyEqual(u1, u2)
603         u3 = uri.from_string_mutable_filenode(cap)
604         self.failUnlessEqual(u3, u1)
605
606         # XXX: We should refactor the extension field into setUp
607         u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
608                                      ['131073', '3'])
609         cap = u1.to_string()
610         self.failUnless(uri.is_uri(cap))
611         u2 = uri.from_string(cap)
612         self.failUnlessReallyEqual(u1, u2)
613         u3 = uri.from_string_mutable_filenode(cap)
614         self.failUnlessEqual(u3, u1)
615
616         u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
617         cap = u1.to_string()
618         self.failUnless(uri.is_uri(cap))
619         u2 = uri.from_string(cap)
620         self.failUnlessReallyEqual(u1, u2)
621         u3 = uri.from_string_mutable_filenode(cap)
622         self.failUnlessEqual(u3, u1)
623
624         u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
625                                      ['131073', '3'])
626         cap = u1.to_string()
627         self.failUnless(uri.is_uri(cap))
628         u2 = uri.from_string(cap)
629         self.failUnlessReallyEqual(u1, u2)
630         u3 = uri.from_string_mutable_filenode(cap)
631         self.failUnlessEqual(u3, u1)
632
633         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
634         cap = u1.to_string()
635         self.failUnless(uri.is_uri(cap))
636         u2 = uri.from_string(cap)
637         self.failUnlessReallyEqual(u1, u2)
638         u3 = uri.from_string_verifier(cap)
639         self.failUnlessEqual(u3, u1)
640
641         u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
642                                  ['131073', '3'])
643         cap = u1.to_string()
644         self.failUnless(uri.is_uri(cap))
645         u2 = uri.from_string(cap)
646         self.failUnlessReallyEqual(u1, u2)
647         u3 = uri.from_string_verifier(cap)
648         self.failUnlessEqual(u3, u1)
649
650
651 class Dirnode(testutil.ReallyEqualMixin, unittest.TestCase):
652     def test_pack(self):
653         writekey = "\x01" * 16
654         fingerprint = "\x02" * 32
655
656         n = uri.WriteableSSKFileURI(writekey, fingerprint)
657         u1 = uri.DirectoryURI(n)
658         self.failIf(u1.is_readonly())
659         self.failUnless(u1.is_mutable())
660         self.failUnless(IURI.providedBy(u1))
661         self.failIf(IFileURI.providedBy(u1))
662         self.failUnless(IDirnodeURI.providedBy(u1))
663         self.failUnless("DirectoryURI" in str(u1))
664         u1_filenode = u1.get_filenode_cap()
665         self.failUnless(u1_filenode.is_mutable())
666         self.failIf(u1_filenode.is_readonly())
667
668         u2 = uri.from_string(u1.to_string())
669         self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
670         self.failIf(u2.is_readonly())
671         self.failUnless(u2.is_mutable())
672         self.failUnless(IURI.providedBy(u2))
673         self.failIf(IFileURI.providedBy(u2))
674         self.failUnless(IDirnodeURI.providedBy(u2))
675
676         u2i = uri.from_string(u1.to_string(), deep_immutable=True)
677         self.failUnless(isinstance(u2i, uri.UnknownURI))
678
679         u3 = u2.get_readonly()
680         self.failUnless(u3.is_readonly())
681         self.failUnless(u3.is_mutable())
682         self.failUnless(IURI.providedBy(u3))
683         self.failIf(IFileURI.providedBy(u3))
684         self.failUnless(IDirnodeURI.providedBy(u3))
685
686         u3i = uri.from_string(u2.to_string(), deep_immutable=True)
687         self.failUnless(isinstance(u3i, uri.UnknownURI))
688
689         u3n = u3._filenode_uri
690         self.failUnless(u3n.is_readonly())
691         self.failUnless(u3n.is_mutable())
692         u3_filenode = u3.get_filenode_cap()
693         self.failUnless(u3_filenode.is_mutable())
694         self.failUnless(u3_filenode.is_readonly())
695
696         u3a = uri.from_string(u3.to_string())
697         self.failUnlessIdentical(u3a, u3a.get_readonly())
698
699         u4 = uri.ReadonlyDirectoryURI(u2._filenode_uri.get_readonly())
700         self.failUnlessReallyEqual(u4.to_string(), u3.to_string())
701         self.failUnless(u4.is_readonly())
702         self.failUnless(u4.is_mutable())
703         self.failUnless(IURI.providedBy(u4))
704         self.failIf(IFileURI.providedBy(u4))
705         self.failUnless(IDirnodeURI.providedBy(u4))
706
707         u4_verifier = u4.get_verify_cap()
708         u4_verifier_filenode = u4_verifier.get_filenode_cap()
709         self.failUnless(isinstance(u4_verifier_filenode, uri.SSKVerifierURI))
710
711         verifiers = [u1.get_verify_cap(), u2.get_verify_cap(),
712                      u3.get_verify_cap(), u4.get_verify_cap(),
713                      uri.DirectoryURIVerifier(n.get_verify_cap()),
714                      ]
715         for v in verifiers:
716             self.failUnless(IVerifierURI.providedBy(v))
717             self.failUnlessReallyEqual(v._filenode_uri,
718                                  u1.get_verify_cap()._filenode_uri)
719
720     def test_immutable(self):
721         readkey = "\x01" * 16
722         uri_extension_hash = hashutil.uri_extension_hash("stuff")
723         needed_shares = 3
724         total_shares = 10
725         size = 1234
726
727         fnuri = uri.CHKFileURI(key=readkey,
728                                uri_extension_hash=uri_extension_hash,
729                                needed_shares=needed_shares,
730                                total_shares=total_shares,
731                                size=size)
732         fncap = fnuri.to_string()
733         self.failUnlessReallyEqual(fncap, "URI:CHK:aeaqcaibaeaqcaibaeaqcaibae:nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa:3:10:1234")
734         u1 = uri.ImmutableDirectoryURI(fnuri)
735         self.failUnless(u1.is_readonly())
736         self.failIf(u1.is_mutable())
737         self.failUnless(IURI.providedBy(u1))
738         self.failIf(IFileURI.providedBy(u1))
739         self.failUnless(IDirnodeURI.providedBy(u1))
740         self.failUnless("DirectoryURI" in str(u1))
741         u1_filenode = u1.get_filenode_cap()
742         self.failIf(u1_filenode.is_mutable())
743         self.failUnless(u1_filenode.is_readonly())
744         self.failUnlessReallyEqual(u1_filenode.to_string(), fncap)
745         self.failUnless(str(u1))
746
747         u2 = uri.from_string(u1.to_string())
748         self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
749         self.failUnless(u2.is_readonly())
750         self.failIf(u2.is_mutable())
751         self.failUnless(IURI.providedBy(u2))
752         self.failIf(IFileURI.providedBy(u2))
753         self.failUnless(IDirnodeURI.providedBy(u2))
754
755         u2i = uri.from_string(u1.to_string(), deep_immutable=True)
756         self.failUnlessReallyEqual(u1.to_string(), u2i.to_string())
757
758         u3 = u2.get_readonly()
759         self.failUnlessReallyEqual(u3.to_string(), u2.to_string())
760         self.failUnless(str(u3))
761
762         u3i = uri.from_string(u2.to_string(), deep_immutable=True)
763         self.failUnlessReallyEqual(u2.to_string(), u3i.to_string())
764
765         u2_verifier = u2.get_verify_cap()
766         self.failUnless(isinstance(u2_verifier,
767                                    uri.ImmutableDirectoryURIVerifier),
768                         u2_verifier)
769         self.failUnless(IVerifierURI.providedBy(u2_verifier))
770         u2vs = u2_verifier.to_string()
771         # URI:DIR2-CHK-Verifier:$key:$ueb:$k:$n:$size
772         self.failUnless(u2vs.startswith("URI:DIR2-CHK-Verifier:"), u2vs)
773         u2_verifier_fileuri = u2_verifier.get_filenode_cap()
774         self.failUnless(IVerifierURI.providedBy(u2_verifier_fileuri))
775         u2vfs = u2_verifier_fileuri.to_string()
776         # URI:CHK-Verifier:$key:$ueb:$k:$n:$size
777         self.failUnlessReallyEqual(u2vfs, fnuri.get_verify_cap().to_string())
778         self.failUnlessReallyEqual(u2vs[len("URI:DIR2-"):], u2vfs[len("URI:"):])
779         self.failUnless(str(u2_verifier))
780
781     def test_literal(self):
782         u0 = uri.LiteralFileURI("data")
783         u1 = uri.LiteralDirectoryURI(u0)
784         self.failUnless(str(u1))
785         self.failUnlessReallyEqual(u1.to_string(), "URI:DIR2-LIT:mrqxiyi")
786         self.failUnless(u1.is_readonly())
787         self.failIf(u1.is_mutable())
788         self.failUnless(IURI.providedBy(u1))
789         self.failIf(IFileURI.providedBy(u1))
790         self.failUnless(IDirnodeURI.providedBy(u1))
791         self.failUnlessReallyEqual(u1.get_verify_cap(), None)
792         self.failUnlessReallyEqual(u1.get_storage_index(), None)
793         self.failUnlessReallyEqual(u1.abbrev_si(), "<LIT>")
794
795     def test_mdmf(self):
796         writekey = "\x01" * 16
797         fingerprint = "\x02" * 32
798         uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
799         d1 = uri.MDMFDirectoryURI(uri1)
800         self.failIf(d1.is_readonly())
801         self.failUnless(d1.is_mutable())
802         self.failUnless(IURI.providedBy(d1))
803         self.failUnless(IDirnodeURI.providedBy(d1))
804         d1_uri = d1.to_string()
805
806         d2 = uri.from_string(d1_uri)
807         self.failUnlessIsInstance(d2, uri.MDMFDirectoryURI)
808         self.failIf(d2.is_readonly())
809         self.failUnless(d2.is_mutable())
810         self.failUnless(IURI.providedBy(d2))
811         self.failUnless(IDirnodeURI.providedBy(d2))
812
813         # It doesn't make sense to ask for a deep immutable URI for a
814         # mutable directory, and we should get back a result to that
815         # effect.
816         d3 = uri.from_string(d2.to_string(), deep_immutable=True)
817         self.failUnlessIsInstance(d3, uri.UnknownURI)
818
819     def test_mdmf_with_extensions(self):
820         writekey = "\x01" * 16
821         fingerprint = "\x02" * 32
822         uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
823         d1 = uri.MDMFDirectoryURI(uri1)
824         d1_uri = d1.to_string()
825         # Add some extensions, verify that the URI is interpreted
826         # correctly.
827         d1_uri += ":3:131073"
828         uri2 = uri.from_string(d1_uri)
829         self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
830         self.failUnless(IURI.providedBy(uri2))
831         self.failUnless(IDirnodeURI.providedBy(uri2))
832         self.failUnless(uri1.is_mutable())
833         self.failIf(uri1.is_readonly())
834
835         d2_uri = uri2.to_string()
836         self.failUnlessIn(":3:131073", d2_uri)
837
838         # Now attenuate, verify that the extensions persist
839         ro_uri = uri2.get_readonly()
840         self.failUnlessIsInstance(ro_uri, uri.ReadonlyMDMFDirectoryURI)
841         self.failUnless(ro_uri.is_mutable())
842         self.failUnless(ro_uri.is_readonly())
843         self.failUnless(IURI.providedBy(ro_uri))
844         self.failUnless(IDirnodeURI.providedBy(ro_uri))
845         ro_uri_str = ro_uri.to_string()
846         self.failUnlessIn(":3:131073", ro_uri_str)
847
848     def test_mdmf_attenuation(self):
849         writekey = "\x01" * 16
850         fingerprint = "\x02" * 32
851
852         uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
853         d1 = uri.MDMFDirectoryURI(uri1)
854         self.failUnless(d1.is_mutable())
855         self.failIf(d1.is_readonly())
856         self.failUnless(IURI.providedBy(d1))
857         self.failUnless(IDirnodeURI.providedBy(d1))
858
859         d1_uri = d1.to_string()
860         d1_uri_from_fn = uri.MDMFDirectoryURI(d1.get_filenode_cap()).to_string()
861         self.failUnlessEqual(d1_uri_from_fn, d1_uri)
862
863         uri2 = uri.from_string(d1_uri)
864         self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
865         self.failUnless(IURI.providedBy(uri2))
866         self.failUnless(IDirnodeURI.providedBy(uri2))
867         self.failUnless(uri2.is_mutable())
868         self.failIf(uri2.is_readonly())
869
870         ro = uri2.get_readonly()
871         self.failUnlessIsInstance(ro, uri.ReadonlyMDMFDirectoryURI)
872         self.failUnless(ro.is_mutable())
873         self.failUnless(ro.is_readonly())
874         self.failUnless(IURI.providedBy(ro))
875         self.failUnless(IDirnodeURI.providedBy(ro))
876
877         ro_uri = ro.to_string()
878         n = uri.from_string(ro_uri, deep_immutable=True)
879         self.failUnlessIsInstance(n, uri.UnknownURI)
880
881         fn_cap = ro.get_filenode_cap()
882         fn_ro_cap = fn_cap.get_readonly()
883         d3 = uri.ReadonlyMDMFDirectoryURI(fn_ro_cap)
884         self.failUnlessEqual(ro.to_string(), d3.to_string())
885         self.failUnless(ro.is_mutable())
886         self.failUnless(ro.is_readonly())
887
888     def test_mdmf_verifier(self):
889         # I'm not sure what I want to write here yet.
890         writekey = "\x01" * 16
891         fingerprint = "\x02" * 32
892         uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
893         d1 = uri.MDMFDirectoryURI(uri1)
894         v1 = d1.get_verify_cap()
895         self.failUnlessIsInstance(v1, uri.MDMFDirectoryURIVerifier)
896         self.failIf(v1.is_mutable())
897
898         d2 = uri.from_string(d1.to_string())
899         v2 = d2.get_verify_cap()
900         self.failUnlessIsInstance(v2, uri.MDMFDirectoryURIVerifier)
901         self.failIf(v2.is_mutable())
902         self.failUnlessEqual(v2.to_string(), v1.to_string())
903
904         # Now attenuate and make sure that works correctly.
905         r3 = d2.get_readonly()
906         v3 = r3.get_verify_cap()
907         self.failUnlessIsInstance(v3, uri.MDMFDirectoryURIVerifier)
908         self.failIf(v3.is_mutable())
909         self.failUnlessEqual(v3.to_string(), v1.to_string())
910         r4 = uri.from_string(r3.to_string())
911         v4 = r4.get_verify_cap()
912         self.failUnlessIsInstance(v4, uri.MDMFDirectoryURIVerifier)
913         self.failIf(v4.is_mutable())
914         self.failUnlessEqual(v4.to_string(), v3.to_string())