]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_upload.py
mutable WIP: rename NotEnoughPeersError to NotEnoughSharesError
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_upload.py
1
2 import os
3 from twisted.trial import unittest
4 from twisted.python.failure import Failure
5 from twisted.python import log
6 from cStringIO import StringIO
7
8 from allmydata import upload, encode, uri
9 from allmydata.interfaces import IFileURI
10 from allmydata.util.assertutil import precondition
11 from allmydata.util.deferredutil import DeferredListShouldSucceed
12 from foolscap import eventual
13
14 MiB = 1024*1024
15
16 def extract_uri(results):
17     return results.uri
18
19 class Uploadable(unittest.TestCase):
20     def shouldEqual(self, data, expected):
21         self.failUnless(isinstance(data, list))
22         for e in data:
23             self.failUnless(isinstance(e, str))
24         s = "".join(data)
25         self.failUnlessEqual(s, expected)
26
27     def test_filehandle_random_key(self):
28         return self._test_filehandle(convergence=None)
29
30     def test_filehandle_convergent_encryption(self):
31         return self._test_filehandle(convergence="some convergence string")
32
33     def _test_filehandle(self, convergence):
34         s = StringIO("a"*41)
35         u = upload.FileHandle(s, convergence=convergence)
36         d = u.get_size()
37         d.addCallback(self.failUnlessEqual, 41)
38         d.addCallback(lambda res: u.read(1))
39         d.addCallback(self.shouldEqual, "a")
40         d.addCallback(lambda res: u.read(80))
41         d.addCallback(self.shouldEqual, "a"*40)
42         d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
43         d.addCallback(lambda res: s.close()) # that privilege is reserved for us
44         return d
45
46     def test_filename(self):
47         basedir = "upload/Uploadable/test_filename"
48         os.makedirs(basedir)
49         fn = os.path.join(basedir, "file")
50         f = open(fn, "w")
51         f.write("a"*41)
52         f.close()
53         u = upload.FileName(fn, convergence=None)
54         d = u.get_size()
55         d.addCallback(self.failUnlessEqual, 41)
56         d.addCallback(lambda res: u.read(1))
57         d.addCallback(self.shouldEqual, "a")
58         d.addCallback(lambda res: u.read(80))
59         d.addCallback(self.shouldEqual, "a"*40)
60         d.addCallback(lambda res: u.close())
61         return d
62
63     def test_data(self):
64         s = "a"*41
65         u = upload.Data(s, convergence=None)
66         d = u.get_size()
67         d.addCallback(self.failUnlessEqual, 41)
68         d.addCallback(lambda res: u.read(1))
69         d.addCallback(self.shouldEqual, "a")
70         d.addCallback(lambda res: u.read(80))
71         d.addCallback(self.shouldEqual, "a"*40)
72         d.addCallback(lambda res: u.close())
73         return d
74
75 class FakeStorageServer:
76     def __init__(self, mode):
77         self.mode = mode
78         self.allocated = []
79         self.queries = 0
80     def callRemote(self, methname, *args, **kwargs):
81         def _call():
82             meth = getattr(self, methname)
83             return meth(*args, **kwargs)
84         d = eventual.fireEventually()
85         d.addCallback(lambda res: _call())
86         return d
87
88     def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
89                          sharenums, share_size, canary):
90         #print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
91         self.queries += 1
92         if self.mode == "full":
93             return (set(), {},)
94         elif self.mode == "already got them":
95             return (set(sharenums), {},)
96         else:
97             for shnum in sharenums:
98                 self.allocated.append( (storage_index, shnum) )
99             return (set(),
100                     dict([( shnum, FakeBucketWriter(share_size) )
101                           for shnum in sharenums]),
102                     )
103
104 class FakeBucketWriter:
105     # a diagnostic version of storageserver.BucketWriter
106     def __init__(self, size):
107         self.data = StringIO()
108         self.closed = False
109         self._size = size
110
111     def callRemote(self, methname, *args, **kwargs):
112         def _call():
113             meth = getattr(self, "remote_" + methname)
114             return meth(*args, **kwargs)
115         d = eventual.fireEventually()
116         d.addCallback(lambda res: _call())
117         return d
118
119     def remote_write(self, offset, data):
120         precondition(not self.closed)
121         precondition(offset >= 0)
122         precondition(offset+len(data) <= self._size,
123                      "offset=%d + data=%d > size=%d" %
124                      (offset, len(data), self._size))
125         self.data.seek(offset)
126         self.data.write(data)
127
128     def remote_close(self):
129         precondition(not self.closed)
130         self.closed = True
131
132     def remote_abort(self):
133         log.err("uh oh, I was asked to abort")
134
135 class FakeClient:
136     DEFAULT_ENCODING_PARAMETERS = {"k":25,
137                                    "happy": 75,
138                                    "n": 100,
139                                    "max_segment_size": 1*MiB,
140                                    }
141     def __init__(self, mode="good", num_servers=50):
142         self.mode = mode
143         self.num_servers = num_servers
144     def log(self, *args, **kwargs):
145         pass
146     def get_permuted_peers(self, storage_index, include_myself):
147         peers = [ ("%20d"%fakeid, FakeStorageServer(self.mode),)
148                   for fakeid in range(self.num_servers) ]
149         self.last_peers = [p[1] for p in peers]
150         return peers
151     def get_encoding_parameters(self):
152         return self.DEFAULT_ENCODING_PARAMETERS
153
154     def get_renewal_secret(self):
155         return ""
156     def get_cancel_secret(self):
157         return ""
158
159 DATA = """
160 Once upon a time, there was a beautiful princess named Buttercup. She lived
161 in a magical land where every file was stored securely among millions of
162 machines, and nobody ever worried about their data being lost ever again.
163 The End.
164 """
165 assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
166
167 SIZE_ZERO = 0
168 SIZE_SMALL = 16
169 SIZE_LARGE = len(DATA)
170
171 def upload_data(uploader, data):
172     u = upload.Data(data, convergence=None)
173     return uploader.upload(u)
174 def upload_filename(uploader, filename):
175     u = upload.FileName(filename, convergence=None)
176     return uploader.upload(u)
177 def upload_filehandle(uploader, fh):
178     u = upload.FileHandle(fh, convergence=None)
179     return uploader.upload(u)
180
181 class GoodServer(unittest.TestCase):
182     def setUp(self):
183         self.node = FakeClient(mode="good")
184         self.u = upload.Uploader()
185         self.u.running = True
186         self.u.parent = self.node
187
188     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
189         p = {"k": k,
190              "happy": happy,
191              "n": n,
192              "max_segment_size": max_segsize,
193              }
194         self.node.DEFAULT_ENCODING_PARAMETERS = p
195
196     def _check_small(self, newuri, size):
197         u = IFileURI(newuri)
198         self.failUnless(isinstance(u, uri.LiteralFileURI))
199         self.failUnlessEqual(len(u.data), size)
200
201     def _check_large(self, newuri, size):
202         u = IFileURI(newuri)
203         self.failUnless(isinstance(u, uri.CHKFileURI))
204         self.failUnless(isinstance(u.storage_index, str))
205         self.failUnlessEqual(len(u.storage_index), 16)
206         self.failUnless(isinstance(u.key, str))
207         self.failUnlessEqual(len(u.key), 16)
208         self.failUnlessEqual(u.size, size)
209
210     def get_data(self, size):
211         return DATA[:size]
212
213     def test_data_zero(self):
214         data = self.get_data(SIZE_ZERO)
215         d = upload_data(self.u, data)
216         d.addCallback(extract_uri)
217         d.addCallback(self._check_small, SIZE_ZERO)
218         return d
219
220     def test_data_small(self):
221         data = self.get_data(SIZE_SMALL)
222         d = upload_data(self.u, data)
223         d.addCallback(extract_uri)
224         d.addCallback(self._check_small, SIZE_SMALL)
225         return d
226
227     def test_data_large(self):
228         data = self.get_data(SIZE_LARGE)
229         d = upload_data(self.u, data)
230         d.addCallback(extract_uri)
231         d.addCallback(self._check_large, SIZE_LARGE)
232         return d
233
234     def test_data_large_odd_segments(self):
235         data = self.get_data(SIZE_LARGE)
236         segsize = int(SIZE_LARGE / 2.5)
237         # we want 3 segments, since that's not a power of two
238         self.set_encoding_parameters(25, 75, 100, segsize)
239         d = upload_data(self.u, data)
240         d.addCallback(extract_uri)
241         d.addCallback(self._check_large, SIZE_LARGE)
242         return d
243
244     def test_filehandle_zero(self):
245         data = self.get_data(SIZE_ZERO)
246         d = upload_filehandle(self.u, StringIO(data))
247         d.addCallback(extract_uri)
248         d.addCallback(self._check_small, SIZE_ZERO)
249         return d
250
251     def test_filehandle_small(self):
252         data = self.get_data(SIZE_SMALL)
253         d = upload_filehandle(self.u, StringIO(data))
254         d.addCallback(extract_uri)
255         d.addCallback(self._check_small, SIZE_SMALL)
256         return d
257
258     def test_filehandle_large(self):
259         data = self.get_data(SIZE_LARGE)
260         d = upload_filehandle(self.u, StringIO(data))
261         d.addCallback(extract_uri)
262         d.addCallback(self._check_large, SIZE_LARGE)
263         return d
264
265     def test_filename_zero(self):
266         fn = "Uploader-test_filename_zero.data"
267         f = open(fn, "wb")
268         data = self.get_data(SIZE_ZERO)
269         f.write(data)
270         f.close()
271         d = upload_filename(self.u, fn)
272         d.addCallback(extract_uri)
273         d.addCallback(self._check_small, SIZE_ZERO)
274         return d
275
276     def test_filename_small(self):
277         fn = "Uploader-test_filename_small.data"
278         f = open(fn, "wb")
279         data = self.get_data(SIZE_SMALL)
280         f.write(data)
281         f.close()
282         d = upload_filename(self.u, fn)
283         d.addCallback(extract_uri)
284         d.addCallback(self._check_small, SIZE_SMALL)
285         return d
286
287     def test_filename_large(self):
288         fn = "Uploader-test_filename_large.data"
289         f = open(fn, "wb")
290         data = self.get_data(SIZE_LARGE)
291         f.write(data)
292         f.close()
293         d = upload_filename(self.u, fn)
294         d.addCallback(extract_uri)
295         d.addCallback(self._check_large, SIZE_LARGE)
296         return d
297
298 class FullServer(unittest.TestCase):
299     def setUp(self):
300         self.node = FakeClient(mode="full")
301         self.u = upload.Uploader()
302         self.u.running = True
303         self.u.parent = self.node
304
305     def _should_fail(self, f):
306         self.failUnless(isinstance(f, Failure) and f.check(encode.NotEnoughSharesError), f)
307
308     def test_data_large(self):
309         data = DATA
310         d = upload_data(self.u, data)
311         d.addBoth(self._should_fail)
312         return d
313
314 class PeerSelection(unittest.TestCase):
315
316     def make_client(self, num_servers=50):
317         self.node = FakeClient(mode="good", num_servers=num_servers)
318         self.u = upload.Uploader()
319         self.u.running = True
320         self.u.parent = self.node
321
322     def get_data(self, size):
323         return DATA[:size]
324
325     def _check_large(self, newuri, size):
326         u = IFileURI(newuri)
327         self.failUnless(isinstance(u, uri.CHKFileURI))
328         self.failUnless(isinstance(u.storage_index, str))
329         self.failUnlessEqual(len(u.storage_index), 16)
330         self.failUnless(isinstance(u.key, str))
331         self.failUnlessEqual(len(u.key), 16)
332         self.failUnlessEqual(u.size, size)
333
334     def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
335         p = {"k": k,
336              "happy": happy,
337              "n": n,
338              "max_segment_size": max_segsize,
339              }
340         self.node.DEFAULT_ENCODING_PARAMETERS = p
341
342     def test_one_each(self):
343         # if we have 50 shares, and there are 50 peers, and they all accept a
344         # share, we should get exactly one share per peer
345
346         self.make_client()
347         data = self.get_data(SIZE_LARGE)
348         self.set_encoding_parameters(25, 30, 50)
349         d = upload_data(self.u, data)
350         d.addCallback(extract_uri)
351         d.addCallback(self._check_large, SIZE_LARGE)
352         def _check(res):
353             for p in self.node.last_peers:
354                 allocated = p.allocated
355                 self.failUnlessEqual(len(allocated), 1)
356                 self.failUnlessEqual(p.queries, 1)
357         d.addCallback(_check)
358         return d
359
360     def test_two_each(self):
361         # if we have 100 shares, and there are 50 peers, and they all accept
362         # all shares, we should get exactly two shares per peer
363
364         self.make_client()
365         data = self.get_data(SIZE_LARGE)
366         self.set_encoding_parameters(50, 75, 100)
367         d = upload_data(self.u, data)
368         d.addCallback(extract_uri)
369         d.addCallback(self._check_large, SIZE_LARGE)
370         def _check(res):
371             for p in self.node.last_peers:
372                 allocated = p.allocated
373                 self.failUnlessEqual(len(allocated), 2)
374                 self.failUnlessEqual(p.queries, 2)
375         d.addCallback(_check)
376         return d
377
378     def test_one_each_plus_one_extra(self):
379         # if we have 51 shares, and there are 50 peers, then one peer gets
380         # two shares and the rest get just one
381
382         self.make_client()
383         data = self.get_data(SIZE_LARGE)
384         self.set_encoding_parameters(24, 41, 51)
385         d = upload_data(self.u, data)
386         d.addCallback(extract_uri)
387         d.addCallback(self._check_large, SIZE_LARGE)
388         def _check(res):
389             got_one = []
390             got_two = []
391             for p in self.node.last_peers:
392                 allocated = p.allocated
393                 self.failUnless(len(allocated) in (1,2), len(allocated))
394                 if len(allocated) == 1:
395                     self.failUnlessEqual(p.queries, 1)
396                     got_one.append(p)
397                 else:
398                     self.failUnlessEqual(p.queries, 2)
399                     got_two.append(p)
400             self.failUnlessEqual(len(got_one), 49)
401             self.failUnlessEqual(len(got_two), 1)
402         d.addCallback(_check)
403         return d
404
405     def test_four_each(self):
406         # if we have 200 shares, and there are 50 peers, then each peer gets
407         # 4 shares. The design goal is to accomplish this with only two
408         # queries per peer.
409
410         self.make_client()
411         data = self.get_data(SIZE_LARGE)
412         self.set_encoding_parameters(100, 150, 200)
413         d = upload_data(self.u, data)
414         d.addCallback(extract_uri)
415         d.addCallback(self._check_large, SIZE_LARGE)
416         def _check(res):
417             for p in self.node.last_peers:
418                 allocated = p.allocated
419                 self.failUnlessEqual(len(allocated), 4)
420                 self.failUnlessEqual(p.queries, 2)
421         d.addCallback(_check)
422         return d
423
424     def test_three_of_ten(self):
425         # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
426         # 4+4+2
427
428         self.make_client(3)
429         data = self.get_data(SIZE_LARGE)
430         self.set_encoding_parameters(3, 5, 10)
431         d = upload_data(self.u, data)
432         d.addCallback(extract_uri)
433         d.addCallback(self._check_large, SIZE_LARGE)
434         def _check(res):
435             counts = {}
436             for p in self.node.last_peers:
437                 allocated = p.allocated
438                 counts[len(allocated)] = counts.get(len(allocated), 0) + 1
439             histogram = [counts.get(i, 0) for i in range(5)]
440             self.failUnlessEqual(histogram, [0,0,0,2,1])
441         d.addCallback(_check)
442         return d
443
444 class StorageIndex(unittest.TestCase):
445     def test_params_must_matter(self):
446         DATA = "I am some data"
447         u = upload.Data(DATA, convergence="")
448         eu = upload.EncryptAnUploadable(u)
449         d1 = eu.get_storage_index()
450
451         # CHK means the same data should encrypt the same way
452         u = upload.Data(DATA, convergence="")
453         eu = upload.EncryptAnUploadable(u)
454         d1a = eu.get_storage_index()
455
456         # but if we use a different convergence string it should be different
457         u = upload.Data(DATA, convergence="wheee!")
458         eu = upload.EncryptAnUploadable(u)
459         d1salt1 = eu.get_storage_index()
460
461         # and if we add yet a different convergence it should be different again
462         u = upload.Data(DATA, convergence="NOT wheee!")
463         eu = upload.EncryptAnUploadable(u)
464         d1salt2 = eu.get_storage_index()
465
466         # and if we use the first string again it should be the same as last time
467         u = upload.Data(DATA, convergence="wheee!")
468         eu = upload.EncryptAnUploadable(u)
469         d1salt1a = eu.get_storage_index()
470
471         # and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
472         u = upload.Data(DATA, convergence="")
473         u.encoding_param_k = u.default_encoding_param_k + 1
474         eu = upload.EncryptAnUploadable(u)
475         d2 = eu.get_storage_index()
476
477         # and if we use a random key, it should be different than the CHK
478         u = upload.Data(DATA, convergence=None)
479         eu = upload.EncryptAnUploadable(u)
480         d3 = eu.get_storage_index()
481         # and different from another instance
482         u = upload.Data(DATA, convergence=None)
483         eu = upload.EncryptAnUploadable(u)
484         d4 = eu.get_storage_index()
485
486         d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
487         def _done(res):
488             si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
489             self.failUnlessEqual(si1, si1a)
490             self.failIfEqual(si1, si2)
491             self.failIfEqual(si1, si3)
492             self.failIfEqual(si1, si4)
493             self.failIfEqual(si3, si4)
494             self.failIfEqual(si1salt1, si1)
495             self.failIfEqual(si1salt1, si1salt2)
496             self.failIfEqual(si1salt2, si1)
497             self.failUnlessEqual(si1salt1, si1salt1a)
498         d.addCallback(_done)
499         return d
500
501
502 # TODO:
503 #  upload with exactly 75 peers (shares_of_happiness)
504 #  have a download fail
505 #  cancel a download (need to implement more cancel stuff)